From c13fc96a169838dc1463e2d54d5e2af88350f213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Sun, 20 Nov 2022 11:09:51 +0100 Subject: [PATCH] refactor: use job queue for data imports --- config/jobqueue.go | 2 ++ services/imports/wakatime.go | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/config/jobqueue.go b/config/jobqueue.go index b96a39c..2e82741 100644 --- a/config/jobqueue.go +++ b/config/jobqueue.go @@ -15,6 +15,7 @@ const ( QueueDefault = "wakapi.default" QueueProcessing = "wakapi.processing" QueueReports = "wakapi.reports" + QueueImports = "wakapi.imports" ) type JobQueueMetrics struct { @@ -29,6 +30,7 @@ func init() { InitQueue(QueueDefault, 1) InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0))) InitQueue(QueueReports, 1) + InitQueue(QueueImports, 1) } func InitQueue(name string, workers int) error { diff --git a/services/imports/wakatime.go b/services/imports/wakatime.go index 94dd354..50a318b 100644 --- a/services/imports/wakatime.go +++ b/services/imports/wakatime.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "github.com/duke-git/lancet/v2/datetime" + "github.com/muety/artifex" "github.com/muety/wakapi/utils" "net/http" "strings" @@ -32,19 +33,23 @@ const ( type WakatimeHeartbeatImporter struct { ApiKey string httpClient *http.Client + queue *artifex.Dispatcher } func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatImporter { return &WakatimeHeartbeatImporter{ ApiKey: apiKey, httpClient: &http.Client{Timeout: 10 * time.Second}, + queue: config.GetQueue(config.QueueImports), } } func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) <-chan *models.Heartbeat { out := make(chan *models.Heartbeat) - go func(user *models.User, out chan *models.Heartbeat) { + process := func(user *models.User, minFrom time.Time, maxTo time.Time, out chan *models.Heartbeat) { + logbuch.Info("running wakatime import for user '%s'", user.ID) + baseUrl := user.WakaTimeURL(config.WakatimeApiUrl) startDate, endDate, err := w.fetchRange(baseUrl) @@ -109,7 +114,14 @@ func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time, } }(d) } - }(user, out) + } + + logbuch.Info("scheduling wakatime import for user '%s'", user.ID) + if err := w.queue.Dispatch(func() { + process(user, minFrom, maxTo, out) + }); err != nil { + config.Log().Error("failed to dispatch wakatime import job for user '%s', %v", user.ID, err) + } return out }