1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

fix: lock aggregation jobs to one at a time on a per-user basis (resolve #180)

This commit is contained in:
Ferdinand Mütsch 2021-04-19 20:36:37 +02:00
parent 50a54bde22
commit 0e3ce1e9e4
2 changed files with 41 additions and 1 deletions

View File

@ -16,7 +16,14 @@ func NewSummaryRepository(db *gorm.DB) *SummaryRepository {
func (r *SummaryRepository) GetAll() ([]*models.Summary, error) {
var summaries []*models.Summary
if err := r.db.Find(&summaries).Error; err != nil {
if err := r.db.
Order("from_time asc").
Preload("Projects", "type = ?", models.SummaryProject).
Preload("Languages", "type = ?", models.SummaryLanguage).
Preload("Editors", "type = ?", models.SummaryEditor).
Preload("OperatingSystems", "type = ?", models.SummaryOS).
Preload("Machines", "type = ?", models.SummaryMachine).
Find(&summaries).Error; err != nil {
return nil, err
}
return summaries, nil

View File

@ -1,9 +1,11 @@
package services
import (
"errors"
"github.com/emvi/logbuch"
"github.com/muety/wakapi/config"
"runtime"
"sync"
"time"
"github.com/go-co-op/gocron"
@ -14,11 +16,14 @@ const (
aggregateIntervalDays int = 1
)
var lock = sync.Mutex{}
type AggregationService struct {
config *config.Config
userService IUserService
summaryService ISummaryService
heartbeatService IHeartbeatService
inProgress map[string]bool
}
func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService {
@ -27,6 +32,7 @@ func NewAggregationService(userService IUserService, summaryService ISummaryServ
userService: userService,
summaryService: summaryService,
heartbeatService: heartbeatService,
inProgress: map[string]bool{},
}
}
@ -49,6 +55,11 @@ func (srv *AggregationService) Schedule() {
}
func (srv *AggregationService) Run(userIds map[string]bool) error {
if err := srv.lockUsers(userIds); err != nil {
return err
}
defer srv.unlockUsers(userIds)
jobs := make(chan *AggregationJob)
summaries := make(chan *models.Summary)
@ -145,6 +156,28 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[
return nil
}
func (srv *AggregationService) lockUsers(userIds map[string]bool) error {
lock.Lock()
defer lock.Unlock()
for uid := range userIds {
if _, ok := srv.inProgress[uid]; ok {
return errors.New("aggregation already in progress for at least of the request users")
}
}
for uid := range userIds {
srv.inProgress[uid] = true
}
return nil
}
func (srv *AggregationService) unlockUsers(userIds map[string]bool) {
lock.Lock()
defer lock.Unlock()
for uid := range userIds {
delete(srv.inProgress, uid)
}
}
func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob) {
var to time.Time