diff --git a/config/config.go b/config/config.go index 9754e1d..1302010 100644 --- a/config/config.go +++ b/config/config.go @@ -4,10 +4,12 @@ import ( "encoding/json" "flag" "fmt" + "github.com/robfig/cron/v3" "io/ioutil" "net/http" "os" "regexp" + "strconv" "strings" "time" @@ -242,13 +244,81 @@ func (c *appConfig) GetOSColors() map[string]string { return cloneStringMap(c.Colors["operating_systems"], true) } -func (c *appConfig) GetWeeklyReportDay() time.Weekday { - s := strings.Split(c.ReportTimeWeekly, ",")[0] - return parseWeekday(s) +func (c *appConfig) GetAggregationTimeCron() string { + if strings.Contains(c.AggregationTime, ":") { + // old format, e.g. "15:04" + timeParts := strings.Split(c.AggregationTime, ":") + h, err := strconv.Atoi(timeParts[0]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + m, err := strconv.Atoi(timeParts[1]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + return fmt.Sprintf("0 %d %d * * *", m, h) + } + + return cronPadToSecondly(c.AggregationTime) } -func (c *appConfig) GetWeeklyReportTime() string { - return strings.Split(c.ReportTimeWeekly, ",")[1] +func (c *appConfig) GetWeeklyReportCron() string { + if strings.Contains(c.ReportTimeWeekly, ",") { + // old format, e.g. "fri,18:00" + split := strings.Split(c.ReportTimeWeekly, ",") + weekday := parseWeekday(split[0]) + timeParts := strings.Split(split[1], ":") + + h, err := strconv.Atoi(timeParts[0]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + m, err := strconv.Atoi(timeParts[1]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + return fmt.Sprintf("0 %d %d * * %d", m, h, weekday) + } + + return cronPadToSecondly(c.ReportTimeWeekly) +} + +func (c *appConfig) GetLeaderboardGenerationTimeCron() []string { + crons := []string{} + + var parse func(string) string + + if strings.Contains(c.LeaderboardGenerationTime, ":") { + // old format, e.g. "15:04" + parse = func(s string) string { + timeParts := strings.Split(s, ":") + h, err := strconv.Atoi(timeParts[0]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + m, err := strconv.Atoi(timeParts[1]) + if err != nil { + logbuch.Fatal(err.Error()) + } + + return fmt.Sprintf("0 %d %d * * *", m, h) + } + } else { + parse = func(s string) string { + return cronPadToSecondly(s) + } + } + + for _, s := range strings.Split(c.LeaderboardGenerationTime, ";") { + crons = append(crons, parse(strings.TrimSpace(s))) + } + + return crons } func (c *appConfig) HeartbeatsMaxAge() time.Duration { @@ -352,6 +422,14 @@ func parseWeekday(s string) time.Weekday { return time.Monday } +func cronPadToSecondly(expr string) string { + parts := strings.Split(expr, " ") + if len(parts) == 6 { + return expr + } + return "0 " + expr +} + func Set(config *Config) { cfg = config } @@ -414,16 +492,35 @@ func Load(version string) *Config { if config.Mail.Provider != "" && findString(config.Mail.Provider, emailProviders, "") == "" { logbuch.Fatal("unknown mail provider '%s'", config.Mail.Provider) } - if _, err := time.Parse("15:04", config.App.GetWeeklyReportTime()); err != nil { - logbuch.Fatal("invalid interval set for report_time_weekly") - } - if _, err := time.Parse("15:04", config.App.AggregationTime); err != nil { - logbuch.Fatal("invalid interval set for aggregation_time") - } if _, err := time.ParseDuration(config.App.HeartbeatMaxAge); err != nil { logbuch.Fatal("invalid duration set for heartbeat_max_age") } + cronParser := cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) + + if _, err := cronParser.Parse(config.App.GetWeeklyReportCron()); err != nil { + logbuch.Fatal("invalid cron expression for report_time_weekly") + } + if _, err := cronParser.Parse(config.App.GetAggregationTimeCron()); err != nil { + logbuch.Fatal("invalid cron expression for aggregation_time") + } + for _, c := range config.App.GetLeaderboardGenerationTimeCron() { + if _, err := cronParser.Parse(c); err != nil { + logbuch.Fatal("invalid cron expression for leaderboard_generation_time") + } + } + + // deprecation notices + if strings.Contains(config.App.AggregationTime, ":") { + logbuch.Warn("you're using deprecated syntax for 'aggregation_time', please change it to a valid cron expression") + } + if strings.Contains(config.App.ReportTimeWeekly, ":") { + logbuch.Warn("you're using deprecated syntax for 'report_time_weekly', please change it to a valid cron expression") + } + if strings.Contains(config.App.LeaderboardGenerationTime, ":") { + logbuch.Warn("you're using deprecated syntax for 'leaderboard_generation_time', please change it to a semicolon-separated list if valid cron expressions") + } + Set(config) return Get() } diff --git a/config/jobqueue.go b/config/jobqueue.go new file mode 100644 index 0000000..baf3c0d --- /dev/null +++ b/config/jobqueue.go @@ -0,0 +1,44 @@ +package config + +import ( + "fmt" + "github.com/muety/artifex" +) + +var jobqueues map[string]*artifex.Dispatcher + +const ( + QueueDefault = "" + QueueProcessing = "processing" + QueueMails = "mails" +) + +func init() { + jobqueues = make(map[string]*artifex.Dispatcher) +} + +func InitQueue(name string, workers int) error { + if _, ok := jobqueues[name]; ok { + return fmt.Errorf("queue '%s' already existing", name) + } + jobqueues[name] = artifex.NewDispatcher(workers, 4096) + jobqueues[name].Start() + return nil +} + +func GetDefaultQueue() *artifex.Dispatcher { + return GetQueue("") +} + +func GetQueue(name string) *artifex.Dispatcher { + if _, ok := jobqueues[name]; !ok { + InitQueue(name, 1) + } + return jobqueues[name] +} + +func CloseQueues() { + for _, q := range jobqueues { + q.Stop() + } +} diff --git a/go.mod b/go.mod index 0a118cf..311afb6 100644 --- a/go.mod +++ b/go.mod @@ -63,6 +63,7 @@ require ( github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect + github.com/muety/artifex v0.0.0-20221119195407-0ccdcf919cae // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20220927061507-ef77025ab5aa // indirect github.com/robfig/cron/v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 937577b..a552912 100644 --- a/go.sum +++ b/go.sum @@ -179,6 +179,8 @@ github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJK github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mitchellh/hashstructure/v2 v2.0.2 h1:vGKWl0YJqUNxE8d+h8f6NJLcCJrgbhC4NcD46KavDd4= github.com/mitchellh/hashstructure/v2 v2.0.2/go.mod h1:MG3aRVU/N29oo/V/IhBX8GR/zz4kQkprJgF2EVszyDE= +github.com/muety/artifex v0.0.0-20221119195407-0ccdcf919cae h1:f9HA6uXm/MuI1zqsLTUioA2awahxdHv6LtKH8P9njKM= +github.com/muety/artifex v0.0.0-20221119195407-0ccdcf919cae/go.mod h1:ohgA8vHxRH0ErHcJejxICnAp49PK/6ZezEJ69zrx0/A= github.com/narqo/go-badge v0.0.0-20220127184443-140af28a266e h1:bR8DQ4ZfItytLJwRlrLOPUHd5z18V6tECwYQFy8W+8g= github.com/narqo/go-badge v0.0.0-20220127184443-140af28a266e/go.mod h1:m9BzkaxwU4IfPQi9ko23cmuFltayFe8iS0dlRlnEWiM= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= diff --git a/services/report.go b/services/report.go index c332c29..b2d473c 100644 --- a/services/report.go +++ b/services/report.go @@ -1,21 +1,20 @@ package services import ( + "github.com/duke-git/lancet/v2/slice" "github.com/emvi/logbuch" - "github.com/go-co-op/gocron" "github.com/leandro-lugaresi/hub" "github.com/muety/wakapi/config" "github.com/muety/wakapi/models" "math/rand" - "sync" "time" ) -var reportLock = sync.Mutex{} +// delay between evey report generation task (to throttle email sending frequency) +const reportDelay = 15 * time.Second -// range for random offset to add / subtract when scheduling a new job -// to avoid all mails being sent at once, but distributed over 2*offsetIntervalMin minutes -const offsetIntervalMin = 15 +// past time range to cover in the report +const reportRange = 7 * 24 * time.Hour type ReportService struct { config *config.Config @@ -23,7 +22,6 @@ type ReportService struct { summaryService ISummaryService userService IUserService mailService IMailService - scheduler *gocron.Scheduler rand *rand.Rand } @@ -34,80 +32,55 @@ func NewReportService(summaryService ISummaryService, userService IUserService, summaryService: summaryService, userService: userService, mailService: mailService, - scheduler: gocron.NewScheduler(time.Local), rand: rand.New(rand.NewSource(time.Now().Unix())), } - srv.scheduler.StartAsync() - - sub := srv.eventBus.Subscribe(0, config.EventUserUpdate) - go func(sub *hub.Subscription) { - for m := range sub.Receiver { - srv.SyncSchedule(m.Fields[config.FieldPayload].(*models.User)) - } - }(&sub) - return srv } func (srv *ReportService) Schedule() { logbuch.Info("initializing report service") - users, err := srv.userService.GetAllByReports(true) - if err != nil { - config.Log().Fatal("%v", err) - } - - logbuch.Info("scheduling reports for %d users", len(users)) - for _, u := range users { - srv.SyncSchedule(u) - } -} - -// SyncSchedule syncs the currently active schedulers with the user's wish about whether or not to receive reports. -// Returns whether a scheduler is active after this operation has run. -func (srv *ReportService) SyncSchedule(u *models.User) bool { - reportLock.Lock() - defer reportLock.Unlock() - - // unschedule - if !u.ReportsWeekly { - _ = srv.scheduler.RemoveByTag(u.ID) - logbuch.Info("disabled scheduled reports for user %s", u.ID) - return false - } - - // schedule - if job := srv.getJobByTag(u.ID); job == nil && u.ReportsWeekly { - t, _ := time.ParseInLocation("15:04", srv.config.App.GetWeeklyReportTime(), u.TZ()) - t = t.Add(time.Duration(srv.rand.Intn(offsetIntervalMin*60)) * time.Second) - if job, err := srv.scheduler. - SingletonMode(). - Every(1). - Week(). - Weekday(srv.config.App.GetWeeklyReportDay()). - At(t). - Tag(u.ID). - Do(srv.Run, u, 7*24*time.Hour); err != nil { - config.Log().Error("failed to schedule report job for user '%s' - %v", u.ID, err) - } else { - logbuch.Info("next report for user %s is scheduled for %v", u.ID, job.NextRun()) + _, err := config.GetDefaultQueue().DispatchCron(func() { + // fetch all users with reports enabled + users, err := srv.userService.GetAllByReports(true) + if err != nil { + config.Log().Error("failed to get users for report generation, %v", err) + return } - } - return u.ReportsWeekly + // filter users who have their email set + users = slice.Filter[*models.User](users, func(i int, u *models.User) bool { + return u.Email != "" + }) + + // schedule jobs, throttled by one job per 15 seconds + logbuch.Info("scheduling report generation for %d users", len(users)) + for i, u := range users { + err := config.GetQueue(config.QueueMails).DispatchIn(func() { + if err := srv.SendReport(u, reportRange); err != nil { + config.Log().Error("failed to generate report for '%s', %v", u.ID, err) + } + }, time.Duration(i)*reportDelay) + + if err != nil { + config.Log().Error("failed to dispatch report generation job for user '%s', %v", u.ID, err) + } + } + }, srv.config.App.GetWeeklyReportCron()) + + if err != nil { + config.Log().Error("failed to dispatch report generation jobs, %v", err) + } } -func (srv *ReportService) Run(user *models.User, duration time.Duration) error { +func (srv *ReportService) SendReport(user *models.User, duration time.Duration) error { if user.Email == "" { logbuch.Warn("not generating report for '%s' as no e-mail address is set") return nil } - if !srv.SyncSchedule(user) { - logbuch.Info("reports for user '%s' were turned off in the meanwhile since last report job ran") - return nil - } + logbuch.Info("generating report for '%s'", user.ID) end := time.Now().In(user.TZ()) start := time.Now().Add(-1 * duration) @@ -126,21 +99,10 @@ func (srv *ReportService) Run(user *models.User, duration time.Duration) error { } if err := srv.mailService.SendReport(user, report); err != nil { - config.Log().Error("failed to send report for '%s' - %v", user.ID, err) + config.Log().Error("failed to send report for '%s', %v", user.ID, err) return err } logbuch.Info("sent report to user '%s'", user.ID) return nil } - -func (srv *ReportService) getJobByTag(tag string) *gocron.Job { - for _, j := range srv.scheduler.Jobs() { - for _, t := range j.Tags() { - if t == tag { - return j - } - } - } - return nil -} diff --git a/services/services.go b/services/services.go index 867e7bf..4920e30 100644 --- a/services/services.go +++ b/services/services.go @@ -93,8 +93,7 @@ type ISummaryService interface { type IReportService interface { Schedule() - SyncSchedule(user *models.User) bool - Run(*models.User, time.Duration) error + SendReport(*models.User, time.Duration) error } type ILeaderboardService interface { diff --git a/utils/cron.go b/utils/cron.go new file mode 100644 index 0000000..d2444f3 --- /dev/null +++ b/utils/cron.go @@ -0,0 +1,11 @@ +package utils + +import "strings" + +func CronPadToSecondly(expr string) string { + parts := strings.Split(expr, " ") + if len(parts) == 6 { + return expr + } + return "0 " + expr +}