mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
fix: concurrency bugs with summary aggregation and user counting
This commit is contained in:
parent
aab9e98ebd
commit
a4b89d3a69
@ -91,7 +91,8 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri
|
|||||||
jobs := make(chan *AggregationJob)
|
jobs := make(chan *AggregationJob)
|
||||||
defer close(jobs)
|
defer close(jobs)
|
||||||
go func() {
|
go func() {
|
||||||
for job := range jobs {
|
for jobRef := range jobs {
|
||||||
|
job := *jobRef
|
||||||
if err := srv.queueWorkers.Dispatch(func() {
|
if err := srv.queueWorkers.Dispatch(func() {
|
||||||
srv.process(job)
|
srv.process(job)
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
@ -122,7 +123,7 @@ func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[stri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (srv *AggregationService) process(job *AggregationJob) {
|
func (srv *AggregationService) process(job AggregationJob) {
|
||||||
if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil {
|
if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil {
|
||||||
config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err)
|
config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err)
|
||||||
} else {
|
} else {
|
||||||
|
@ -63,11 +63,12 @@ func (srv *MiscService) CountTotalTime() {
|
|||||||
pendingJobs.Add(len(users))
|
pendingJobs.Add(len(users))
|
||||||
|
|
||||||
for _, u := range users {
|
for _, u := range users {
|
||||||
|
user := *u
|
||||||
if err := srv.queueWorkers.Dispatch(func() {
|
if err := srv.queueWorkers.Dispatch(func() {
|
||||||
defer pendingJobs.Done()
|
defer pendingJobs.Done()
|
||||||
totalTime.Add(srv.countUserTotalTime(u.ID))
|
totalTime.Add(srv.countUserTotalTime(user.ID))
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
config.Log().Error("failed to enqueue counting job for user '%s'", u.ID)
|
config.Log().Error("failed to enqueue counting job for user '%s'", user.ID)
|
||||||
pendingJobs.Done()
|
pendingJobs.Done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user