From 0e3ce1e9e4ae9a4f2d602b54d8ef8e1c5cff8671 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Mon, 19 Apr 2021 20:36:37 +0200 Subject: [PATCH] fix: lock aggregation jobs to one at a time on a per-user basis (resolve #180) --- repositories/summary.go | 9 ++++++++- services/aggregation.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/repositories/summary.go b/repositories/summary.go index f151780..603d24b 100644 --- a/repositories/summary.go +++ b/repositories/summary.go @@ -16,7 +16,14 @@ func NewSummaryRepository(db *gorm.DB) *SummaryRepository { func (r *SummaryRepository) GetAll() ([]*models.Summary, error) { var summaries []*models.Summary - if err := r.db.Find(&summaries).Error; err != nil { + if err := r.db. + Order("from_time asc"). + Preload("Projects", "type = ?", models.SummaryProject). + Preload("Languages", "type = ?", models.SummaryLanguage). + Preload("Editors", "type = ?", models.SummaryEditor). + Preload("OperatingSystems", "type = ?", models.SummaryOS). + Preload("Machines", "type = ?", models.SummaryMachine). + Find(&summaries).Error; err != nil { return nil, err } return summaries, nil diff --git a/services/aggregation.go b/services/aggregation.go index 32f0122..415b6eb 100644 --- a/services/aggregation.go +++ b/services/aggregation.go @@ -1,9 +1,11 @@ package services import ( + "errors" "github.com/emvi/logbuch" "github.com/muety/wakapi/config" "runtime" + "sync" "time" "github.com/go-co-op/gocron" @@ -14,11 +16,14 @@ const ( aggregateIntervalDays int = 1 ) +var lock = sync.Mutex{} + type AggregationService struct { config *config.Config userService IUserService summaryService ISummaryService heartbeatService IHeartbeatService + inProgress map[string]bool } func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService { @@ -27,6 +32,7 @@ func NewAggregationService(userService IUserService, summaryService ISummaryServ userService: userService, summaryService: summaryService, heartbeatService: heartbeatService, + inProgress: map[string]bool{}, } } @@ -49,6 +55,11 @@ func (srv *AggregationService) Schedule() { } func (srv *AggregationService) Run(userIds map[string]bool) error { + if err := srv.lockUsers(userIds); err != nil { + return err + } + defer srv.unlockUsers(userIds) + jobs := make(chan *AggregationJob) summaries := make(chan *models.Summary) @@ -145,6 +156,28 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[ return nil } +func (srv *AggregationService) lockUsers(userIds map[string]bool) error { + lock.Lock() + defer lock.Unlock() + for uid := range userIds { + if _, ok := srv.inProgress[uid]; ok { + return errors.New("aggregation already in progress for at least of the request users") + } + } + for uid := range userIds { + srv.inProgress[uid] = true + } + return nil +} + +func (srv *AggregationService) unlockUsers(userIds map[string]bool) { + lock.Lock() + defer lock.Unlock() + for uid := range userIds { + delete(srv.inProgress, uid) + } +} + func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob) { var to time.Time