From 45a003185e2836b36d5fc9ff8495ef18fd25ab9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Sun, 9 Jul 2023 20:16:34 +0200 Subject: [PATCH] chore: minor code style and cleanup --- routes/settings.go | 22 ++++++++----------- services/imports/wakatime_dump.go | 29 ++++++++++--------------- services/imports/wakatime_heartbeats.go | 16 +++++++++----- services/mail/mailwhale.go | 6 ++--- utils/http.go | 11 ++++++++++ 5 files changed, 43 insertions(+), 41 deletions(-) diff --git a/routes/settings.go b/routes/settings.go index 86d941f..907c9a4 100644 --- a/routes/settings.go +++ b/routes/settings.go @@ -515,28 +515,25 @@ func (h *SettingsHandler) actionImportWakatime(w http.ResponseWriter, r *http.Re start := time.Now() importer := imports.NewWakatimeImporter(user.WakatimeApiKey) - countBefore, err := h.heartbeatSrvc.CountByUser(user) - if err != nil { - println(err) - } + countBefore, _ := h.heartbeatSrvc.CountByUser(user) var ( - stream <-chan *models.Heartbeat - importErr error + stream <-chan *models.Heartbeat + importError error ) if latest, err := h.heartbeatSrvc.GetLatestByOriginAndUser(imports.OriginWakatime, user); latest == nil || err != nil { - stream, importErr = importer.ImportAll(user) + stream, importError = importer.ImportAll(user) } else { // if an import has happened before, only import heartbeats newer than the latest of the last import - stream, importErr = importer.Import(user, latest.Time.T(), time.Now()) + stream, importError = importer.Import(user, latest.Time.T(), time.Now()) } - if importErr != nil { - conf.Log().Error("wakatime import for user '%s' failed - %v", user.ID, importErr) + if importError != nil { + conf.Log().Error("wakatime import for user '%s' failed - %v", user.ID, importError) return } count := 0 - batch := make([]*models.Heartbeat, 0) + batch := make([]*models.Heartbeat, 0, h.config.App.ImportBatchSize) insert := func(batch []*models.Heartbeat) { if err := h.heartbeatSrvc.InsertBatch(batch); err != nil { @@ -550,10 +547,9 @@ func (h *SettingsHandler) actionImportWakatime(w http.ResponseWriter, r *http.Re if len(batch) == h.config.App.ImportBatchSize { insert(batch) - batch = make([]*models.Heartbeat, 0) + batch = make([]*models.Heartbeat, 0, h.config.App.ImportBatchSize) } } - if len(batch) > 0 { insert(batch) } diff --git a/services/imports/wakatime_dump.go b/services/imports/wakatime_dump.go index 02b51ec..36a9a0e 100644 --- a/services/imports/wakatime_dump.go +++ b/services/imports/wakatime_dump.go @@ -12,11 +12,12 @@ import ( "github.com/muety/wakapi/config" "github.com/muety/wakapi/models" wakatime "github.com/muety/wakapi/models/compat/wakatime/v1" + "github.com/muety/wakapi/utils" "net/http" "time" ) -// data example: https://pastr.de/p/0viiv8e0rwq27dim8gyq1jrc +// data example: https://github.com/muety/wakapi/issues/323#issuecomment-1627467052 type WakatimeDumpImporter struct { apiKey string @@ -37,13 +38,10 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT logbuch.Info("running wakatime dump import for user '%s'", user.ID) url := config.WakatimeApiUrl + config.WakatimeApiDataDumpUrl // this importer only works with wakatime currently, so no point in using user's custom wakatime api url - req, _ := http.NewRequest(http.MethodPost, url, bytes.NewBuffer([]byte(`{ "type": "heartbeats", "email_when_finished": "false" }`))) - - res, err := w.httpClient.Do(w.withHeaders(req)) + req, _ := http.NewRequest(http.MethodPost, url, bytes.NewBuffer([]byte(`{ "type": "heartbeats", "email_when_finished": false }`))) + res, err := utils.RaiseForStatus(w.httpClient.Do(w.withHeaders(req))) if err != nil { return nil, err - } else if res.StatusCode >= 400 { - return nil, errors.New(fmt.Sprintf("got status %d from wakatime data dump api (post)", res.StatusCode)) } defer res.Body.Close() @@ -52,14 +50,14 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT return nil, err } + var readyPollTimer *artifex.DispatchTicker + + // callbacks checkDumpReady := func(dumpId string, user *models.User) (bool, *wakatime.DataDumpData, error) { req, _ := http.NewRequest(http.MethodGet, url, nil) - - res, err := w.httpClient.Do(w.withHeaders(req)) + res, err := utils.RaiseForStatus(w.httpClient.Do(w.withHeaders(req))) if err != nil { return false, nil, err - } else if res.StatusCode >= 400 { - return false, nil, errors.New(fmt.Sprintf("got status %d from wakatime data dump api (get)", res.StatusCode)) } var datadumpData wakatime.DataDumpViewModel @@ -77,9 +75,6 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT return dump.Status == "Completed", dump, nil } - // start polling for dump to be ready - var readyPollTimer *artifex.DispatchTicker - onDumpFailed := func(err error, user *models.User) { config.Log().Error("fetching data dump for user '%s' failed - %v", user.ID, err) readyPollTimer.Stop() @@ -89,18 +84,14 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT onDumpReady := func(dump *wakatime.DataDumpData, user *models.User, out chan *models.Heartbeat) { config.Log().Info("data dump for user '%s' is available for download", user.ID) readyPollTimer.Stop() - defer close(out) // download req, _ := http.NewRequest(http.MethodGet, dump.DownloadUrl, nil) - res, err := w.httpClient.Do(req) + res, err := utils.RaiseForStatus((&http.Client{Timeout: 5 * time.Minute}).Do(req)) if err != nil { config.Log().Error("failed to download %s - %v", dump.DownloadUrl, err) return - } else if res.StatusCode >= 400 { - config.Log().Error("failed to download %s - %v", dump.DownloadUrl, errors.New(fmt.Sprintf("got status %d from wakatime", res.StatusCode))) - return } defer res.Body.Close() @@ -125,6 +116,7 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT return } + // stream for _, d := range data.Days { for _, h := range d.Heartbeats { hb := mapHeartbeat(h, userAgents, machinesNames, user) @@ -136,6 +128,7 @@ func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxT } } + // start polling for dump to be ready readyPollTimer, err = w.queue.DispatchEvery(func() { u := *user ok, dump, err := checkDumpReady(datadumpData.Data.Id, &u) diff --git a/services/imports/wakatime_heartbeats.go b/services/imports/wakatime_heartbeats.go index f53fcac..1e4bab6 100644 --- a/services/imports/wakatime_heartbeats.go +++ b/services/imports/wakatime_heartbeats.go @@ -31,14 +31,14 @@ const ( ) type WakatimeHeartbeatsImporter struct { - ApiKey string + apiKey string httpClient *http.Client queue *artifex.Dispatcher } func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatsImporter { return &WakatimeHeartbeatsImporter{ - ApiKey: apiKey, + apiKey: apiKey, httpClient: &http.Client{Timeout: 10 * time.Second}, queue: config.GetQueue(config.QueueImports), } @@ -66,7 +66,7 @@ func (w *WakatimeHeartbeatsImporter) Import(user *models.User, minFrom time.Time } userAgents := map[string]*wakatime.UserAgentEntry{} - if data, err := fetchUserAgents(baseUrl, w.ApiKey); err == nil { + if data, err := fetchUserAgents(baseUrl, w.apiKey); err == nil { userAgents = data } else if strings.Contains(baseUrl, "wakatime.com") { // when importing from wakatime, resolving user agents is mandatorily required @@ -75,7 +75,7 @@ func (w *WakatimeHeartbeatsImporter) Import(user *models.User, minFrom time.Time } machinesNames := map[string]*wakatime.MachineEntry{} - if data, err := fetchMachineNames(baseUrl, w.ApiKey); err == nil { + if data, err := fetchMachineNames(baseUrl, w.apiKey); err == nil { machinesNames = data } else if strings.Contains(baseUrl, "wakatime.com") { // when importing from wakatime, resolving machine names is mandatorily required @@ -106,7 +106,11 @@ func (w *WakatimeHeartbeatsImporter) Import(user *models.User, minFrom time.Time } for _, h := range heartbeats { - out <- mapHeartbeat(h, userAgents, machinesNames, user) + hb := mapHeartbeat(h, userAgents, machinesNames, user) + if hb.Time.T().Before(minFrom) || hb.Time.T().After(maxTo) { + continue + } + out <- hb } if c.Dec() == 0 { @@ -201,7 +205,7 @@ func (w *WakatimeHeartbeatsImporter) fetchRange(baseUrl string) (time.Time, time } func (w *WakatimeHeartbeatsImporter) withHeaders(req *http.Request) *http.Request { - req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.ApiKey)))) + req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.apiKey)))) return req } diff --git a/services/mail/mailwhale.go b/services/mail/mailwhale.go index 2fee253..93a1f18 100644 --- a/services/mail/mailwhale.go +++ b/services/mail/mailwhale.go @@ -7,6 +7,7 @@ import ( "fmt" conf "github.com/muety/wakapi/config" "github.com/muety/wakapi/models" + "github.com/muety/wakapi/utils" "net/http" "time" ) @@ -58,13 +59,10 @@ func (s *MailWhaleSendingService) Send(mail *models.Mail) error { req.SetBasicAuth(s.config.ClientId, s.config.ClientSecret) req.Header.Set("Content-Type", "application/json") - res, err := s.httpClient.Do(req) + _, err = utils.RaiseForStatus(s.httpClient.Do(req)) if err != nil { return err } - if res.StatusCode >= 400 { - return errors.New(fmt.Sprintf("got status %d from mailwhale", res.StatusCode)) - } return nil } diff --git a/utils/http.go b/utils/http.go index 19dba80..9dad46a 100644 --- a/utils/http.go +++ b/utils/http.go @@ -2,6 +2,7 @@ package utils import ( "errors" + "fmt" "net/http" "regexp" "strconv" @@ -85,3 +86,13 @@ func ParseUserAgent(ua string) (string, string, error) { } return groups[0][1], groups[0][2], nil } + +func RaiseForStatus(res *http.Response, err error) (*http.Response, error) { + if err != nil { + return res, err + } + if res.StatusCode >= 400 { + return res, fmt.Errorf("got response status %d for '%s %s'", res.StatusCode, res.Request.Method, res.Request.URL.String()) + } + return res, nil +}