From a4b89d3a6961fefe75e777120d8f6b244682452e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Thu, 1 Dec 2022 14:13:52 +0100 Subject: [PATCH] fix: concurrency bugs with summary aggregation and user counting --- services/aggregation.go | 5 +++-- services/misc.go | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/services/aggregation.go b/services/aggregation.go index d2276a4..d39454a 100644 --- a/services/aggregation.go +++ b/services/aggregation.go @@ -91,7 +91,8 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri jobs := make(chan *AggregationJob) defer close(jobs) go func() { - for job := range jobs { + for jobRef := range jobs { + job := *jobRef if err := srv.queueWorkers.Dispatch(func() { srv.process(job) }); err != nil { @@ -122,7 +123,7 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri return nil } -func (srv *AggregationService) process(job *AggregationJob) { +func (srv *AggregationService) process(job AggregationJob) { if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil { config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err) } else { diff --git a/services/misc.go b/services/misc.go index bb7fb24..9ad8e54 100644 --- a/services/misc.go +++ b/services/misc.go @@ -63,11 +63,12 @@ func (srv *MiscService) CountTotalTime() { pendingJobs.Add(len(users)) for _, u := range users { + user := *u if err := srv.queueWorkers.Dispatch(func() { defer pendingJobs.Done() - totalTime.Add(srv.countUserTotalTime(u.ID)) + totalTime.Add(srv.countUserTotalTime(user.ID)) }); err != nil { - config.Log().Error("failed to enqueue counting job for user '%s'", u.ID) + config.Log().Error("failed to enqueue counting job for user '%s'", user.ID) pendingJobs.Done() } }