diff --git a/config/jobqueue.go b/config/jobqueue.go index baf3c0d..6db17c7 100644 --- a/config/jobqueue.go +++ b/config/jobqueue.go @@ -2,25 +2,33 @@ package config import ( "fmt" + "github.com/emvi/logbuch" "github.com/muety/artifex" + "math" + "runtime" ) var jobqueues map[string]*artifex.Dispatcher const ( - QueueDefault = "" - QueueProcessing = "processing" - QueueMails = "mails" + QueueDefault = "wakapi.default" + QueueProcessing = "wakapi.processing" + QueueReports = "wakapi.reports" ) func init() { jobqueues = make(map[string]*artifex.Dispatcher) + + InitQueue(QueueDefault, 1) + InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0))) + InitQueue(QueueReports, 1) } func InitQueue(name string, workers int) error { if _, ok := jobqueues[name]; ok { return fmt.Errorf("queue '%s' already existing", name) } + logbuch.Info("creating job queue '%s' (%d workers)", name, workers) jobqueues[name] = artifex.NewDispatcher(workers, 4096) jobqueues[name].Start() return nil diff --git a/main.go b/main.go index 51fc4bd..237a135 100644 --- a/main.go +++ b/main.go @@ -186,9 +186,9 @@ func main() { // Schedule background tasks go aggregationService.Schedule() - go leaderboardService.ScheduleDefault() - go miscService.ScheduleCountTotalTime() + go leaderboardService.Schedule() go reportService.Schedule() + go miscService.ScheduleCountTotalTime() routes.Init() diff --git a/routes/settings.go b/routes/settings.go index bf2066f..5e8ad9c 100644 --- a/routes/settings.go +++ b/routes/settings.go @@ -661,7 +661,7 @@ func (h *SettingsHandler) regenerateSummaries(user *models.User) error { return err } - if err := h.aggregationSrvc.Run(datastructure.NewSet(user.ID)); err != nil { + if err := h.aggregationSrvc.AggregateSummaries(datastructure.NewSet(user.ID)); err != nil { logbuch.Error("failed to regenerate summaries: %v", err) return err } diff --git a/services/aggregation.go b/services/aggregation.go index f11653a..d2276a4 100644 --- a/services/aggregation.go +++ b/services/aggregation.go @@ -4,12 +4,11 @@ import ( "errors" datastructure "github.com/duke-git/lancet/v2/datastructure/set" "github.com/emvi/logbuch" + "github.com/muety/artifex" "github.com/muety/wakapi/config" - "runtime" "sync" "time" - "github.com/go-co-op/gocron" "github.com/muety/wakapi/models" ) @@ -25,6 +24,8 @@ type AggregationService struct { summaryService ISummaryService heartbeatService IHeartbeatService inProgress datastructure.Set[string] + queueDefault *artifex.Dispatcher + queueWorkers *artifex.Dispatcher } func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService { @@ -34,6 +35,8 @@ func NewAggregationService(userService IUserService, summaryService ISummaryServ summaryService: summaryService, heartbeatService: heartbeatService, inProgress: datastructure.NewSet[string](), + queueDefault: config.GetDefaultQueue(), + queueWorkers: config.GetQueue(config.QueueProcessing), } } @@ -45,58 +48,23 @@ type AggregationJob struct { // Schedule a job to (re-)generate summaries every day shortly after midnight func (srv *AggregationService) Schedule() { - s := gocron.NewScheduler(time.Local) - s.Every(1).Day().At(srv.config.App.AggregationTime).WaitForSchedule().Do(srv.Run, datastructure.NewSet[string]()) - s.StartBlocking() + logbuch.Info("scheduling summary aggregation") + + if _, err := srv.queueDefault.DispatchCron(func() { + if err := srv.AggregateSummaries(datastructure.NewSet[string]()); err != nil { + config.Log().Error("failed to generate summaries, %v", err) + } + }, srv.config.App.GetAggregationTimeCron()); err != nil { + config.Log().Error("failed to schedule summary generation, %v", err) + } } -func (srv *AggregationService) Run(userIds datastructure.Set[string]) error { +func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[string]) error { if err := srv.lockUsers(userIds); err != nil { return err } defer srv.unlockUsers(userIds) - jobs := make(chan *AggregationJob) - summaries := make(chan *models.Summary) - - for i := 0; i < runtime.NumCPU(); i++ { - go srv.summaryWorker(jobs, summaries) - } - - for i := 0; i < int(srv.config.Db.MaxConn); i++ { - go srv.persistWorker(summaries) - } - - // don't leak open channels - go func(c1 chan *AggregationJob, c2 chan *models.Summary) { - defer close(c1) - defer close(c2) - time.Sleep(1 * time.Hour) - }(jobs, summaries) - - return srv.trigger(jobs, userIds) -} - -func (srv *AggregationService) summaryWorker(jobs <-chan *AggregationJob, summaries chan<- *models.Summary) { - for job := range jobs { - if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil { - config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err) - } else { - logbuch.Info("successfully generated summary (%v, %v, %s)", job.From, job.To, job.UserID) - summaries <- summary - } - } -} - -func (srv *AggregationService) persistWorker(summaries <-chan *models.Summary) { - for summary := range summaries { - if err := srv.summaryService.Insert(summary); err != nil { - config.Log().Error("failed to save summary (%v, %v, %s) - %v", summary.UserID, summary.FromTime, summary.ToTime, err) - } - } -} - -func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds datastructure.Set[string]) error { logbuch.Info("generating summaries") // Get a map from user ids to the time of their latest summary or nil if none exists yet @@ -119,6 +87,19 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds data firstUserHeartbeatLookup[e.User] = e.Time } + // Dispatch summary generation jobs + jobs := make(chan *AggregationJob) + defer close(jobs) + go func() { + for job := range jobs { + if err := srv.queueWorkers.Dispatch(func() { + srv.process(job) + }); err != nil { + config.Log().Error("failed to dispatch summary generation job for user '%s'", job.UserID) + } + } + }() + // Generate summary aggregation jobs for _, e := range lastUserSummaryTimes { if userIds != nil && !userIds.IsEmpty() && !userIds.Contain(e.User) { @@ -141,24 +122,15 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds data return nil } -func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error { - aggregationLock.Lock() - defer aggregationLock.Unlock() - for uid := range userIds { - if srv.inProgress.Contain(uid) { - return errors.New("aggregation already in progress for at least of the request users") +func (srv *AggregationService) process(job *AggregationJob) { + if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil { + config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err) + } else { + logbuch.Info("successfully generated summary (%v, %v, %s)", job.From, job.To, job.UserID) + if err := srv.summaryService.Insert(summary); err != nil { + config.Log().Error("failed to save summary (%v, %v, %s) - %v", summary.UserID, summary.FromTime, summary.ToTime, err) } } - srv.inProgress = srv.inProgress.Union(userIds) - return nil -} - -func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) { - aggregationLock.Lock() - defer aggregationLock.Unlock() - for uid := range userIds { - srv.inProgress.Delete(uid) - } } func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob) { @@ -189,6 +161,26 @@ func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob } } +func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error { + aggregationLock.Lock() + defer aggregationLock.Unlock() + for uid := range userIds { + if srv.inProgress.Contain(uid) { + return errors.New("aggregation already in progress for at least of the request users") + } + } + srv.inProgress = srv.inProgress.Union(userIds) + return nil +} + +func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) { + aggregationLock.Lock() + defer aggregationLock.Unlock() + for uid := range userIds { + srv.inProgress.Delete(uid) + } +} + func getStartOfToday() time.Time { now := time.Now() return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 1, now.Location()) diff --git a/services/leaderboard.go b/services/leaderboard.go index 0dd2cb1..8bea809 100644 --- a/services/leaderboard.go +++ b/services/leaderboard.go @@ -2,8 +2,8 @@ package services import ( "github.com/emvi/logbuch" - "github.com/go-co-op/gocron" "github.com/leandro-lugaresi/hub" + "github.com/muety/artifex" "github.com/muety/wakapi/config" "github.com/muety/wakapi/models" "github.com/muety/wakapi/repositories" @@ -22,6 +22,8 @@ type LeaderboardService struct { repository repositories.ILeaderboardRepository summaryService ISummaryService userService IUserService + queueDefault *artifex.Dispatcher + queueWorkers *artifex.Dispatcher } func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, summaryService ISummaryService, userService IUserService) *LeaderboardService { @@ -32,6 +34,8 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, repository: leaderboardRepo, summaryService: summaryService, userService: userService, + queueDefault: config.GetDefaultQueue(), + queueWorkers: config.GetQueue(config.QueueProcessing), } onUserUpdate := srv.eventBus.Subscribe(0, config.EventUserUpdate) @@ -48,7 +52,7 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, if user.PublicLeaderboard && !exists { logbuch.Info("generating leaderboard for '%s' after settings update", user.ID) - srv.Run([]*models.User{user}, models.IntervalPast7Days, []uint8{models.SummaryLanguage}) + srv.ComputeLeaderboard([]*models.User{user}, models.IntervalPast7Days, []uint8{models.SummaryLanguage}) } else if !user.PublicLeaderboard && exists { logbuch.Info("clearing leaderboard for '%s' after settings update", user.ID) if err := srv.repository.DeleteByUser(user.ID); err != nil { @@ -62,23 +66,26 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, return srv } -func (srv *LeaderboardService) ScheduleDefault() { - runAllUsers := func(interval *models.IntervalKey, by []uint8) { +func (srv *LeaderboardService) Schedule() { + logbuch.Info("scheduling leaderboard generation") + + generate := func() { users, err := srv.userService.GetAllByLeaderboard(true) if err != nil { config.Log().Error("failed to get users for leaderboard generation - %v", err) return } - - srv.Run(users, interval, by) + srv.ComputeLeaderboard(users, models.IntervalPast7Days, []uint8{models.SummaryLanguage}) } - s := gocron.NewScheduler(time.Local) - s.Every(1).Day().At(srv.config.App.LeaderboardGenerationTime).Do(runAllUsers, models.IntervalPast7Days, []uint8{models.SummaryLanguage}) - s.StartBlocking() + for _, cronExp := range srv.config.App.GetLeaderboardGenerationTimeCron() { + if _, err := srv.queueDefault.DispatchCron(generate, cronExp); err != nil { + config.Log().Error("failed to schedule leaderboard generation (%s), %v", cronExp, err) + } + } } -func (srv *LeaderboardService) Run(users []*models.User, interval *models.IntervalKey, by []uint8) error { +func (srv *LeaderboardService) ComputeLeaderboard(users []*models.User, interval *models.IntervalKey, by []uint8) error { logbuch.Info("generating leaderboard (%s) for %d users (%d aggregations)", (*interval)[0], len(users), len(by)) for _, user := range users { diff --git a/services/misc.go b/services/misc.go index f493209..5d2f72f 100644 --- a/services/misc.go +++ b/services/misc.go @@ -2,12 +2,13 @@ package services import ( "github.com/emvi/logbuch" + "github.com/muety/artifex" "github.com/muety/wakapi/config" - "runtime" + "github.com/muety/wakapi/utils" "strconv" + "sync" "time" - "github.com/go-co-op/gocron" "github.com/muety/wakapi/models" ) @@ -16,6 +17,8 @@ type MiscService struct { userService IUserService summaryService ISummaryService keyValueService IKeyValueService + queueDefault *artifex.Dispatcher + queueWorkers *artifex.Dispatcher } func NewMiscService(userService IUserService, summaryService ISummaryService, keyValueService IKeyValueService) *MiscService { @@ -24,81 +27,64 @@ func NewMiscService(userService IUserService, summaryService ISummaryService, ke userService: userService, summaryService: summaryService, keyValueService: keyValueService, + queueDefault: config.GetDefaultQueue(), + queueWorkers: config.GetQueue(config.QueueProcessing), } } -type CountTotalTimeJob struct { - UserID string - NumJobs int -} - -type CountTotalTimeResult struct { - UserId string - Total time.Duration -} - func (srv *MiscService) ScheduleCountTotalTime() { - s := gocron.NewScheduler(time.Local) - s.Every(1).Hour().WaitForSchedule().Do(srv.runCountTotalTime) - s.StartBlocking() + if _, err := srv.queueDefault.DispatchEvery(srv.CountTotalTime, 1*time.Hour); err != nil { + config.Log().Error("failed to schedule user counting jobs, %v", err) + } } -func (srv *MiscService) runCountTotalTime() error { +func (srv *MiscService) CountTotalTime() { users, err := srv.userService.GetAll() if err != nil { - return err + logbuch.Error("failed to fetch users for time counting, %v", err) } - jobs := make(chan *CountTotalTimeJob, len(users)) - results := make(chan *CountTotalTimeResult, len(users)) + var totalTime time.Duration = 0 + var pendingJobs sync.WaitGroup + pendingJobs.Add(len(users)) for _, u := range users { - jobs <- &CountTotalTimeJob{ - UserID: u.ID, - NumJobs: len(users), + if err := srv.queueWorkers.Dispatch(func() { + defer pendingJobs.Done() + totalTime += srv.countUserTotalTime(u.ID) + }); err != nil { + config.Log().Error("failed to enqueue counting job for user '%s'", u.ID) + pendingJobs.Done() } } - close(jobs) - - for i := 0; i < runtime.NumCPU(); i++ { - go srv.countTotalTimeWorker(jobs, results) - } // persist - var i int - var total time.Duration - for i = 0; i < len(users); i++ { - result := <-results - total += result.Total - } - close(results) - - if err := srv.keyValueService.PutString(&models.KeyStringValue{ - Key: config.KeyLatestTotalTime, - Value: total.String(), - }); err != nil { - logbuch.Error("failed to save total time count: %v", err) - } - - if err := srv.keyValueService.PutString(&models.KeyStringValue{ - Key: config.KeyLatestTotalUsers, - Value: strconv.Itoa(i), - }); err != nil { - logbuch.Error("failed to save total users count: %v", err) - } - - return nil -} - -func (srv *MiscService) countTotalTimeWorker(jobs <-chan *CountTotalTimeJob, results chan<- *CountTotalTimeResult) { - for job := range jobs { - if result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: job.UserID}, srv.summaryService.Retrieve, nil, false); err != nil { - config.Log().Error("failed to count total for user %s: %v", job.UserID, err) - } else { - results <- &CountTotalTimeResult{ - UserId: job.UserID, - Total: result.TotalTime(), + go func(wg *sync.WaitGroup) { + if utils.WaitTimeout(&pendingJobs, 10*time.Minute) { + if err := srv.keyValueService.PutString(&models.KeyStringValue{ + Key: config.KeyLatestTotalTime, + Value: totalTime.String(), + }); err != nil { + logbuch.Error("failed to save total time count: %v", err) } + + if err := srv.keyValueService.PutString(&models.KeyStringValue{ + Key: config.KeyLatestTotalUsers, + Value: strconv.Itoa(len(users)), + }); err != nil { + logbuch.Error("failed to save total users count: %v", err) + } + } else { + config.Log().Error("waiting for user counting jobs timed out") } - } + }(&pendingJobs) +} + +func (srv *MiscService) countUserTotalTime(userId string) time.Duration { + result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: userId}, srv.summaryService.Retrieve, nil, false) + if err != nil { + config.Log().Error("failed to count total for user %s: %v", userId, err) + return 0 + } + return result.TotalTime() } diff --git a/services/report.go b/services/report.go index b2d473c..ce4ca40 100644 --- a/services/report.go +++ b/services/report.go @@ -4,6 +4,7 @@ import ( "github.com/duke-git/lancet/v2/slice" "github.com/emvi/logbuch" "github.com/leandro-lugaresi/hub" + "github.com/muety/artifex" "github.com/muety/wakapi/config" "github.com/muety/wakapi/models" "math/rand" @@ -11,7 +12,7 @@ import ( ) // delay between evey report generation task (to throttle email sending frequency) -const reportDelay = 15 * time.Second +const reportDelay = 5 * time.Second // past time range to cover in the report const reportRange = 7 * 24 * time.Hour @@ -23,6 +24,8 @@ type ReportService struct { userService IUserService mailService IMailService rand *rand.Rand + queueDefault *artifex.Dispatcher + queueWorkers *artifex.Dispatcher } func NewReportService(summaryService ISummaryService, userService IUserService, mailService IMailService) *ReportService { @@ -33,15 +36,27 @@ func NewReportService(summaryService ISummaryService, userService IUserService, userService: userService, mailService: mailService, rand: rand.New(rand.NewSource(time.Now().Unix())), + queueDefault: config.GetDefaultQueue(), + queueWorkers: config.GetQueue(config.QueueReports), } return srv } func (srv *ReportService) Schedule() { - logbuch.Info("initializing report service") + logbuch.Info("scheduling report generation") - _, err := config.GetDefaultQueue().DispatchCron(func() { + scheduleUserReport := func(u *models.User, index int) { + if err := srv.queueWorkers.DispatchIn(func() { + if err := srv.SendReport(u, reportRange); err != nil { + config.Log().Error("failed to generate report for '%s', %v", u.ID, err) + } + }, time.Duration(index)*reportDelay); err != nil { + config.Log().Error("failed to dispatch report generation job for user '%s', %v", u.ID, err) + } + } + + _, err := srv.queueDefault.DispatchCron(func() { // fetch all users with reports enabled users, err := srv.userService.GetAllByReports(true) if err != nil { @@ -54,18 +69,10 @@ func (srv *ReportService) Schedule() { return u.Email != "" }) - // schedule jobs, throttled by one job per 15 seconds + // schedule jobs, throttled by one job per x seconds logbuch.Info("scheduling report generation for %d users", len(users)) for i, u := range users { - err := config.GetQueue(config.QueueMails).DispatchIn(func() { - if err := srv.SendReport(u, reportRange); err != nil { - config.Log().Error("failed to generate report for '%s', %v", u.ID, err) - } - }, time.Duration(i)*reportDelay) - - if err != nil { - config.Log().Error("failed to dispatch report generation job for user '%s', %v", u.ID, err) - } + scheduleUserReport(u, i) } }, srv.config.App.GetWeeklyReportCron()) diff --git a/services/services.go b/services/services.go index 4920e30..cbee61a 100644 --- a/services/services.go +++ b/services/services.go @@ -8,7 +8,7 @@ import ( type IAggregationService interface { Schedule() - Run(set datastructure.Set[string]) error + AggregateSummaries(set datastructure.Set[string]) error } type IMiscService interface { @@ -97,8 +97,8 @@ type IReportService interface { } type ILeaderboardService interface { - ScheduleDefault() - Run([]*models.User, *models.IntervalKey, []uint8) error + Schedule() + ComputeLeaderboard([]*models.User, *models.IntervalKey, []uint8) error ExistsAnyByUser(string) (bool, error) CountUsers() (int64, error) GetByInterval(*models.IntervalKey, *models.PageParams, bool) (models.Leaderboard, error) diff --git a/utils/sync.go b/utils/sync.go new file mode 100644 index 0000000..0536970 --- /dev/null +++ b/utils/sync.go @@ -0,0 +1,23 @@ +package utils + +import ( + "sync" + "time" +) + +// WaitTimeout waits for the waitgroup for the specified max timeout. +// Returns true if waiting timed out. +// See // https://stackoverflow.com/a/32843750/3112139. +func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +}