1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

refactor: use job queue for data imports

This commit is contained in:
Ferdinand Mütsch 2022-11-20 11:09:51 +01:00
parent 61f13fce20
commit c13fc96a16
2 changed files with 16 additions and 2 deletions

View File

@ -15,6 +15,7 @@ const (
QueueDefault = "wakapi.default" QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing" QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports" QueueReports = "wakapi.reports"
QueueImports = "wakapi.imports"
) )
type JobQueueMetrics struct { type JobQueueMetrics struct {
@ -29,6 +30,7 @@ func init() {
InitQueue(QueueDefault, 1) InitQueue(QueueDefault, 1)
InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0))) InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0)))
InitQueue(QueueReports, 1) InitQueue(QueueReports, 1)
InitQueue(QueueImports, 1)
} }
func InitQueue(name string, workers int) error { func InitQueue(name string, workers int) error {

View File

@ -7,6 +7,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/duke-git/lancet/v2/datetime" "github.com/duke-git/lancet/v2/datetime"
"github.com/muety/artifex"
"github.com/muety/wakapi/utils" "github.com/muety/wakapi/utils"
"net/http" "net/http"
"strings" "strings"
@ -32,19 +33,23 @@ const (
type WakatimeHeartbeatImporter struct { type WakatimeHeartbeatImporter struct {
ApiKey string ApiKey string
httpClient *http.Client httpClient *http.Client
queue *artifex.Dispatcher
} }
func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatImporter { func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatImporter {
return &WakatimeHeartbeatImporter{ return &WakatimeHeartbeatImporter{
ApiKey: apiKey, ApiKey: apiKey,
httpClient: &http.Client{Timeout: 10 * time.Second}, httpClient: &http.Client{Timeout: 10 * time.Second},
queue: config.GetQueue(config.QueueImports),
} }
} }
func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) <-chan *models.Heartbeat { func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) <-chan *models.Heartbeat {
out := make(chan *models.Heartbeat) out := make(chan *models.Heartbeat)
go func(user *models.User, out chan *models.Heartbeat) { process := func(user *models.User, minFrom time.Time, maxTo time.Time, out chan *models.Heartbeat) {
logbuch.Info("running wakatime import for user '%s'", user.ID)
baseUrl := user.WakaTimeURL(config.WakatimeApiUrl) baseUrl := user.WakaTimeURL(config.WakatimeApiUrl)
startDate, endDate, err := w.fetchRange(baseUrl) startDate, endDate, err := w.fetchRange(baseUrl)
@ -109,7 +114,14 @@ func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time,
} }
}(d) }(d)
} }
}(user, out) }
logbuch.Info("scheduling wakatime import for user '%s'", user.ID)
if err := w.queue.Dispatch(func() {
process(user, minFrom, maxTo, out)
}); err != nil {
config.Log().Error("failed to dispatch wakatime import job for user '%s', %v", user.ID, err)
}
return out return out
} }