refactor: move more background jobs to using job queue

This commit is contained in:
Ferdinand Mütsch 2022-11-20 10:10:24 +01:00
parent e2ef54152d
commit fcca881cfc
9 changed files with 179 additions and 156 deletions

View File

@ -2,25 +2,33 @@ package config
import (
"fmt"
"github.com/emvi/logbuch"
"github.com/muety/artifex"
"math"
"runtime"
)
var jobqueues map[string]*artifex.Dispatcher
const (
QueueDefault = ""
QueueProcessing = "processing"
QueueMails = "mails"
QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports"
)
// init creates the package-level queue registry and brings up the built-in
// job queues: the default queue, the processing queue and the reports queue.
func init() {
	jobqueues = make(map[string]*artifex.Dispatcher)

	// the processing queue gets half of the available CPUs, rounded up
	processingWorkers := int(math.Ceil(float64(runtime.NumCPU()) / 2.0))

	InitQueue(QueueDefault, 1)
	InitQueue(QueueProcessing, processingWorkers)
	InitQueue(QueueReports, 1)
}
func InitQueue(name string, workers int) error {
if _, ok := jobqueues[name]; ok {
return fmt.Errorf("queue '%s' already existing", name)
}
logbuch.Info("creating job queue '%s' (%d workers)", name, workers)
jobqueues[name] = artifex.NewDispatcher(workers, 4096)
jobqueues[name].Start()
return nil

View File

@ -186,9 +186,9 @@ func main() {
// Schedule background tasks
go aggregationService.Schedule()
go leaderboardService.ScheduleDefault()
go miscService.ScheduleCountTotalTime()
go leaderboardService.Schedule()
go reportService.Schedule()
go miscService.ScheduleCountTotalTime()
routes.Init()

View File

@ -661,7 +661,7 @@ func (h *SettingsHandler) regenerateSummaries(user *models.User) error {
return err
}
if err := h.aggregationSrvc.Run(datastructure.NewSet(user.ID)); err != nil {
if err := h.aggregationSrvc.AggregateSummaries(datastructure.NewSet(user.ID)); err != nil {
logbuch.Error("failed to regenerate summaries: %v", err)
return err
}

View File

@ -4,12 +4,11 @@ import (
"errors"
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
"github.com/emvi/logbuch"
"github.com/muety/artifex"
"github.com/muety/wakapi/config"
"runtime"
"sync"
"time"
"github.com/go-co-op/gocron"
"github.com/muety/wakapi/models"
)
@ -25,6 +24,8 @@ type AggregationService struct {
summaryService ISummaryService
heartbeatService IHeartbeatService
inProgress datastructure.Set[string]
queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher
}
func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService {
@ -34,6 +35,8 @@ func NewAggregationService(userService IUserService, summaryService ISummaryServ
summaryService: summaryService,
heartbeatService: heartbeatService,
inProgress: datastructure.NewSet[string](),
queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueProcessing),
}
}
@ -45,58 +48,23 @@ type AggregationJob struct {
// Schedule a job to (re-)generate summaries every day shortly after midnight
func (srv *AggregationService) Schedule() {
s := gocron.NewScheduler(time.Local)
s.Every(1).Day().At(srv.config.App.AggregationTime).WaitForSchedule().Do(srv.Run, datastructure.NewSet[string]())
s.StartBlocking()
logbuch.Info("scheduling summary aggregation")
if _, err := srv.queueDefault.DispatchCron(func() {
if err := srv.AggregateSummaries(datastructure.NewSet[string]()); err != nil {
config.Log().Error("failed to generate summaries, %v", err)
}
}, srv.config.App.GetAggregationTimeCron()); err != nil {
config.Log().Error("failed to schedule summary generation, %v", err)
}
}
func (srv *AggregationService) Run(userIds datastructure.Set[string]) error {
func (srv *AggregationService) AggregateSummaries(userIds datastructure.Set[string]) error {
if err := srv.lockUsers(userIds); err != nil {
return err
}
defer srv.unlockUsers(userIds)
jobs := make(chan *AggregationJob)
summaries := make(chan *models.Summary)
for i := 0; i < runtime.NumCPU(); i++ {
go srv.summaryWorker(jobs, summaries)
}
for i := 0; i < int(srv.config.Db.MaxConn); i++ {
go srv.persistWorker(summaries)
}
// don't leak open channels
go func(c1 chan *AggregationJob, c2 chan *models.Summary) {
defer close(c1)
defer close(c2)
time.Sleep(1 * time.Hour)
}(jobs, summaries)
return srv.trigger(jobs, userIds)
}
func (srv *AggregationService) summaryWorker(jobs <-chan *AggregationJob, summaries chan<- *models.Summary) {
for job := range jobs {
if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil {
config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err)
} else {
logbuch.Info("successfully generated summary (%v, %v, %s)", job.From, job.To, job.UserID)
summaries <- summary
}
}
}
func (srv *AggregationService) persistWorker(summaries <-chan *models.Summary) {
for summary := range summaries {
if err := srv.summaryService.Insert(summary); err != nil {
config.Log().Error("failed to save summary (%v, %v, %s) - %v", summary.UserID, summary.FromTime, summary.ToTime, err)
}
}
}
func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds datastructure.Set[string]) error {
logbuch.Info("generating summaries")
// Get a map from user ids to the time of their latest summary or nil if none exists yet
@ -119,6 +87,19 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds data
firstUserHeartbeatLookup[e.User] = e.Time
}
// Dispatch summary generation jobs
jobs := make(chan *AggregationJob)
defer close(jobs)
go func() {
for job := range jobs {
if err := srv.queueWorkers.Dispatch(func() {
srv.process(job)
}); err != nil {
config.Log().Error("failed to dispatch summary generation job for user '%s'", job.UserID)
}
}
}()
// Generate summary aggregation jobs
for _, e := range lastUserSummaryTimes {
if userIds != nil && !userIds.IsEmpty() && !userIds.Contain(e.User) {
@ -141,24 +122,15 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds data
return nil
}
func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error {
aggregationLock.Lock()
defer aggregationLock.Unlock()
for uid := range userIds {
if srv.inProgress.Contain(uid) {
return errors.New("aggregation already in progress for at least of the request users")
func (srv *AggregationService) process(job *AggregationJob) {
if summary, err := srv.summaryService.Summarize(job.From, job.To, &models.User{ID: job.UserID}, nil); err != nil {
config.Log().Error("failed to generate summary (%v, %v, %s) - %v", job.From, job.To, job.UserID, err)
} else {
logbuch.Info("successfully generated summary (%v, %v, %s)", job.From, job.To, job.UserID)
if err := srv.summaryService.Insert(summary); err != nil {
config.Log().Error("failed to save summary (%v, %v, %s) - %v", summary.UserID, summary.FromTime, summary.ToTime, err)
}
}
srv.inProgress = srv.inProgress.Union(userIds)
return nil
}
func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) {
aggregationLock.Lock()
defer aggregationLock.Unlock()
for uid := range userIds {
srv.inProgress.Delete(uid)
}
}
func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob) {
@ -189,6 +161,26 @@ func generateUserJobs(userId string, from time.Time, jobs chan<- *AggregationJob
}
}
// lockUsers marks the given users as having an aggregation in progress.
// If any of them is already marked, nothing is changed and an error is returned.
// The in-progress set is only touched while holding aggregationLock.
func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error {
	aggregationLock.Lock()
	defer aggregationLock.Unlock()

	// refuse the whole request as soon as a single user is already being processed
	for userID := range userIds {
		if !srv.inProgress.Contain(userID) {
			continue
		}
		return errors.New("aggregation already in progress for at least of the request users")
	}

	srv.inProgress = srv.inProgress.Union(userIds)
	return nil
}
// unlockUsers removes the given users from the set of in-progress aggregations.
// The in-progress set is only touched while holding aggregationLock.
func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) {
	aggregationLock.Lock()
	defer aggregationLock.Unlock()

	for userID := range userIds {
		srv.inProgress.Delete(userID)
	}
}
func getStartOfToday() time.Time {
now := time.Now()
return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 1, now.Location())

View File

@ -2,8 +2,8 @@ package services
import (
"github.com/emvi/logbuch"
"github.com/go-co-op/gocron"
"github.com/leandro-lugaresi/hub"
"github.com/muety/artifex"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"github.com/muety/wakapi/repositories"
@ -22,6 +22,8 @@ type LeaderboardService struct {
repository repositories.ILeaderboardRepository
summaryService ISummaryService
userService IUserService
queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher
}
func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, summaryService ISummaryService, userService IUserService) *LeaderboardService {
@ -32,6 +34,8 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository,
repository: leaderboardRepo,
summaryService: summaryService,
userService: userService,
queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueProcessing),
}
onUserUpdate := srv.eventBus.Subscribe(0, config.EventUserUpdate)
@ -48,7 +52,7 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository,
if user.PublicLeaderboard && !exists {
logbuch.Info("generating leaderboard for '%s' after settings update", user.ID)
srv.Run([]*models.User{user}, models.IntervalPast7Days, []uint8{models.SummaryLanguage})
srv.ComputeLeaderboard([]*models.User{user}, models.IntervalPast7Days, []uint8{models.SummaryLanguage})
} else if !user.PublicLeaderboard && exists {
logbuch.Info("clearing leaderboard for '%s' after settings update", user.ID)
if err := srv.repository.DeleteByUser(user.ID); err != nil {
@ -62,23 +66,26 @@ func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository,
return srv
}
func (srv *LeaderboardService) ScheduleDefault() {
runAllUsers := func(interval *models.IntervalKey, by []uint8) {
func (srv *LeaderboardService) Schedule() {
logbuch.Info("scheduling leaderboard generation")
generate := func() {
users, err := srv.userService.GetAllByLeaderboard(true)
if err != nil {
config.Log().Error("failed to get users for leaderboard generation - %v", err)
return
}
srv.Run(users, interval, by)
srv.ComputeLeaderboard(users, models.IntervalPast7Days, []uint8{models.SummaryLanguage})
}
s := gocron.NewScheduler(time.Local)
s.Every(1).Day().At(srv.config.App.LeaderboardGenerationTime).Do(runAllUsers, models.IntervalPast7Days, []uint8{models.SummaryLanguage})
s.StartBlocking()
for _, cronExp := range srv.config.App.GetLeaderboardGenerationTimeCron() {
if _, err := srv.queueDefault.DispatchCron(generate, cronExp); err != nil {
config.Log().Error("failed to schedule leaderboard generation (%s), %v", cronExp, err)
}
}
}
func (srv *LeaderboardService) Run(users []*models.User, interval *models.IntervalKey, by []uint8) error {
func (srv *LeaderboardService) ComputeLeaderboard(users []*models.User, interval *models.IntervalKey, by []uint8) error {
logbuch.Info("generating leaderboard (%s) for %d users (%d aggregations)", (*interval)[0], len(users), len(by))
for _, user := range users {

View File

@ -2,12 +2,13 @@ package services
import (
"github.com/emvi/logbuch"
"github.com/muety/artifex"
"github.com/muety/wakapi/config"
"runtime"
"github.com/muety/wakapi/utils"
"strconv"
"sync"
"time"
"github.com/go-co-op/gocron"
"github.com/muety/wakapi/models"
)
@ -16,6 +17,8 @@ type MiscService struct {
userService IUserService
summaryService ISummaryService
keyValueService IKeyValueService
queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher
}
func NewMiscService(userService IUserService, summaryService ISummaryService, keyValueService IKeyValueService) *MiscService {
@ -24,81 +27,64 @@ func NewMiscService(userService IUserService, summaryService ISummaryService, ke
userService: userService,
summaryService: summaryService,
keyValueService: keyValueService,
queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueProcessing),
}
}
type CountTotalTimeJob struct {
UserID string
NumJobs int
}
type CountTotalTimeResult struct {
UserId string
Total time.Duration
}
func (srv *MiscService) ScheduleCountTotalTime() {
s := gocron.NewScheduler(time.Local)
s.Every(1).Hour().WaitForSchedule().Do(srv.runCountTotalTime)
s.StartBlocking()
if _, err := srv.queueDefault.DispatchEvery(srv.CountTotalTime, 1*time.Hour); err != nil {
config.Log().Error("failed to schedule user counting jobs, %v", err)
}
}
func (srv *MiscService) runCountTotalTime() error {
func (srv *MiscService) CountTotalTime() {
users, err := srv.userService.GetAll()
if err != nil {
return err
logbuch.Error("failed to fetch users for time counting, %v", err)
}
jobs := make(chan *CountTotalTimeJob, len(users))
results := make(chan *CountTotalTimeResult, len(users))
var totalTime time.Duration = 0
var pendingJobs sync.WaitGroup
pendingJobs.Add(len(users))
for _, u := range users {
jobs <- &CountTotalTimeJob{
UserID: u.ID,
NumJobs: len(users),
if err := srv.queueWorkers.Dispatch(func() {
defer pendingJobs.Done()
totalTime += srv.countUserTotalTime(u.ID)
}); err != nil {
config.Log().Error("failed to enqueue counting job for user '%s'", u.ID)
pendingJobs.Done()
}
}
close(jobs)
for i := 0; i < runtime.NumCPU(); i++ {
go srv.countTotalTimeWorker(jobs, results)
}
// persist
var i int
var total time.Duration
for i = 0; i < len(users); i++ {
result := <-results
total += result.Total
}
close(results)
if err := srv.keyValueService.PutString(&models.KeyStringValue{
Key: config.KeyLatestTotalTime,
Value: total.String(),
}); err != nil {
logbuch.Error("failed to save total time count: %v", err)
}
if err := srv.keyValueService.PutString(&models.KeyStringValue{
Key: config.KeyLatestTotalUsers,
Value: strconv.Itoa(i),
}); err != nil {
logbuch.Error("failed to save total users count: %v", err)
}
return nil
}
func (srv *MiscService) countTotalTimeWorker(jobs <-chan *CountTotalTimeJob, results chan<- *CountTotalTimeResult) {
for job := range jobs {
if result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: job.UserID}, srv.summaryService.Retrieve, nil, false); err != nil {
config.Log().Error("failed to count total for user %s: %v", job.UserID, err)
} else {
results <- &CountTotalTimeResult{
UserId: job.UserID,
Total: result.TotalTime(),
go func(wg *sync.WaitGroup) {
if utils.WaitTimeout(&pendingJobs, 10*time.Minute) {
if err := srv.keyValueService.PutString(&models.KeyStringValue{
Key: config.KeyLatestTotalTime,
Value: totalTime.String(),
}); err != nil {
logbuch.Error("failed to save total time count: %v", err)
}
if err := srv.keyValueService.PutString(&models.KeyStringValue{
Key: config.KeyLatestTotalUsers,
Value: strconv.Itoa(len(users)),
}); err != nil {
logbuch.Error("failed to save total users count: %v", err)
}
} else {
config.Log().Error("waiting for user counting jobs timed out")
}
}
}(&pendingJobs)
}
// countUserTotalTime returns the total time of the given user's aliased summary,
// covering the range from the zero time up to now. If the summary cannot be
// retrieved, the error is logged and 0 is returned.
func (srv *MiscService) countUserTotalTime(userId string) time.Duration {
	user := &models.User{ID: userId}

	summary, err := srv.summaryService.Aliased(time.Time{}, time.Now(), user, srv.summaryService.Retrieve, nil, false)
	if err == nil {
		return summary.TotalTime()
	}

	config.Log().Error("failed to count total for user %s: %v", userId, err)
	return 0
}

View File

@ -4,6 +4,7 @@ import (
"github.com/duke-git/lancet/v2/slice"
"github.com/emvi/logbuch"
"github.com/leandro-lugaresi/hub"
"github.com/muety/artifex"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"math/rand"
@ -11,7 +12,7 @@ import (
)
// delay between every report generation task (to throttle email sending frequency)
const reportDelay = 15 * time.Second
const reportDelay = 5 * time.Second
// past time range to cover in the report
const reportRange = 7 * 24 * time.Hour
@ -23,6 +24,8 @@ type ReportService struct {
userService IUserService
mailService IMailService
rand *rand.Rand
queueDefault *artifex.Dispatcher
queueWorkers *artifex.Dispatcher
}
func NewReportService(summaryService ISummaryService, userService IUserService, mailService IMailService) *ReportService {
@ -33,15 +36,27 @@ func NewReportService(summaryService ISummaryService, userService IUserService,
userService: userService,
mailService: mailService,
rand: rand.New(rand.NewSource(time.Now().Unix())),
queueDefault: config.GetDefaultQueue(),
queueWorkers: config.GetQueue(config.QueueReports),
}
return srv
}
func (srv *ReportService) Schedule() {
logbuch.Info("initializing report service")
logbuch.Info("scheduling report generation")
_, err := config.GetDefaultQueue().DispatchCron(func() {
scheduleUserReport := func(u *models.User, index int) {
if err := srv.queueWorkers.DispatchIn(func() {
if err := srv.SendReport(u, reportRange); err != nil {
config.Log().Error("failed to generate report for '%s', %v", u.ID, err)
}
}, time.Duration(index)*reportDelay); err != nil {
config.Log().Error("failed to dispatch report generation job for user '%s', %v", u.ID, err)
}
}
_, err := srv.queueDefault.DispatchCron(func() {
// fetch all users with reports enabled
users, err := srv.userService.GetAllByReports(true)
if err != nil {
@ -54,18 +69,10 @@ func (srv *ReportService) Schedule() {
return u.Email != ""
})
// schedule jobs, throttled by one job per 15 seconds
// schedule jobs, throttled by one job per x seconds
logbuch.Info("scheduling report generation for %d users", len(users))
for i, u := range users {
err := config.GetQueue(config.QueueMails).DispatchIn(func() {
if err := srv.SendReport(u, reportRange); err != nil {
config.Log().Error("failed to generate report for '%s', %v", u.ID, err)
}
}, time.Duration(i)*reportDelay)
if err != nil {
config.Log().Error("failed to dispatch report generation job for user '%s', %v", u.ID, err)
}
scheduleUserReport(u, i)
}
}, srv.config.App.GetWeeklyReportCron())

View File

@ -8,7 +8,7 @@ import (
type IAggregationService interface {
Schedule()
Run(set datastructure.Set[string]) error
AggregateSummaries(set datastructure.Set[string]) error
}
type IMiscService interface {
@ -97,8 +97,8 @@ type IReportService interface {
}
type ILeaderboardService interface {
ScheduleDefault()
Run([]*models.User, *models.IntervalKey, []uint8) error
Schedule()
ComputeLeaderboard([]*models.User, *models.IntervalKey, []uint8) error
ExistsAnyByUser(string) (bool, error)
CountUsers() (int64, error)
GetByInterval(*models.IntervalKey, *models.PageParams, bool) (models.Leaderboard, error)

23
utils/sync.go Normal file
View File

@ -0,0 +1,23 @@
package utils
import (
"sync"
"time"
)
// WaitTimeout blocks until the wait group is done or the timeout has elapsed,
// whichever happens first.
// Returns true if waiting timed out and false if the wait group completed in time.
//
// Note that sync.WaitGroup offers no way to abort a pending Wait, so after a
// timeout the internal helper goroutine keeps waiting until the group completes.
// See https://stackoverflow.com/a/32843750/3112139.
func WaitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()

	// use an explicit timer (instead of time.After), so that it can be released
	// right away when the wait group completes before the timeout
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		return false // completed normally
	case <-timer.C:
		return true // timed out
	}
}