mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
refactor: use wakatime dump api for data imports (resolve #323)
This commit is contained in:
parent
38286c7f3a
commit
3063e80692
@ -51,6 +51,7 @@ const (
|
||||
WakatimeApiHeartbeatsBulkUrl = "/users/current/heartbeats.bulk"
|
||||
WakatimeApiUserAgentsUrl = "/users/current/user_agents"
|
||||
WakatimeApiMachineNamesUrl = "/users/current/machine_names"
|
||||
WakatimeApiDataDumpUrl = "/users/current/data_dumps"
|
||||
)
|
||||
|
||||
const (
|
||||
|
File diff suppressed because it is too large
Load Diff
24
models/compat/wakatime/v1/data_dump.go
Normal file
24
models/compat/wakatime/v1/data_dump.go
Normal file
@ -0,0 +1,24 @@
|
||||
package v1
|
||||
|
||||
type DataDumpViewModel struct {
|
||||
Data []*DataDumpData `json:"data"`
|
||||
Total int `json:"total"`
|
||||
TotalPages int `json:"total_pages"`
|
||||
}
|
||||
|
||||
type DataDumpResultViewModel struct {
|
||||
Data *DataDumpData `json:"data"`
|
||||
}
|
||||
|
||||
// DataDumpData describes a single data dump as it is decoded from JSON.
type DataDumpData struct {
	// Identification and kind of the dump.
	Id   string `json:"id"`
	Type string `json:"type"`

	// DownloadUrl is the location the finished dump is fetched from.
	DownloadUrl string `json:"download_url"`

	// Progress information.
	Status          string  `json:"status"`
	PercentComplete float32 `json:"percent_complete"`

	// Timestamps are kept as the raw strings found in the payload.
	Expires   string `json:"expires"`
	CreatedAt string `json:"created_at"`

	// State flags.
	HasFailed    bool `json:"has_failed"`
	IsStuck      bool `json:"is_stuck"`
	IsProcessing bool `json:"is_processing"`
}
|
17
models/compat/wakatime/v1/json_export.go
Normal file
17
models/compat/wakatime/v1/json_export.go
Normal file
@ -0,0 +1,17 @@
|
||||
package v1
|
||||
|
||||
type JsonExportViewModel struct {
|
||||
//User *User `json:"user"`
|
||||
Range *JsonExportRange `json:"range"`
|
||||
Days []*JsonExportDay `json:"days"`
|
||||
}
|
||||
|
||||
// JsonExportRange carries the numeric "start" and "end" bounds of an export.
type JsonExportRange struct {
	Start int64 `json:"start"` // lower bound, as found in the payload
	End   int64 `json:"end"`   // upper bound, as found in the payload
}
|
||||
|
||||
type JsonExportDay struct {
|
||||
Date string `json:"date"`
|
||||
Heartbeats []*HeartbeatEntry `json:"heartbeats"`
|
||||
}
|
@ -513,19 +513,26 @@ func (h *SettingsHandler) actionImportWakatime(w http.ResponseWriter, r *http.Re
|
||||
|
||||
go func(user *models.User) {
|
||||
start := time.Now()
|
||||
importer := imports.NewWakatimeHeartbeatImporter(user.WakatimeApiKey)
|
||||
importer := imports.NewWakatimeImporter(user.WakatimeApiKey)
|
||||
|
||||
countBefore, err := h.heartbeatSrvc.CountByUser(user)
|
||||
if err != nil {
|
||||
println(err)
|
||||
}
|
||||
|
||||
var stream <-chan *models.Heartbeat
|
||||
var (
|
||||
stream <-chan *models.Heartbeat
|
||||
importErr error
|
||||
)
|
||||
if latest, err := h.heartbeatSrvc.GetLatestByOriginAndUser(imports.OriginWakatime, user); latest == nil || err != nil {
|
||||
stream = importer.ImportAll(user)
|
||||
stream, importErr = importer.ImportAll(user)
|
||||
} else {
|
||||
// if an import has happened before, only import heartbeats newer than the latest of the last import
|
||||
stream = importer.Import(user, latest.Time.T(), time.Now())
|
||||
stream, importErr = importer.Import(user, latest.Time.T(), time.Now())
|
||||
}
|
||||
if importErr != nil {
|
||||
conf.Log().Error("wakatime import for user '%s' failed - %v", user.ID, importErr)
|
||||
return
|
||||
}
|
||||
|
||||
count := 0
|
||||
|
@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type HeartbeatImporter interface {
|
||||
Import(*models.User, time.Time, time.Time) <-chan *models.Heartbeat
|
||||
ImportAll(*models.User) <-chan *models.Heartbeat
|
||||
type DataImporter interface {
|
||||
Import(*models.User, time.Time, time.Time) (<-chan *models.Heartbeat, error)
|
||||
ImportAll(*models.User) (<-chan *models.Heartbeat, error)
|
||||
}
|
||||
|
@ -1,342 +1,30 @@
|
||||
package imports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/duke-git/lancet/v2/datetime"
|
||||
"github.com/muety/artifex/v2"
|
||||
"github.com/muety/wakapi/utils"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/emvi/logbuch"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/models"
|
||||
wakatime "github.com/muety/wakapi/models/compat/wakatime/v1"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/sync/semaphore"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const OriginWakatime = "wakatime"
|
||||
const (
|
||||
// wakatime api permits a max. rate of 10 req / sec
|
||||
// https://github.com/wakatime/wakatime/issues/261
|
||||
// with 5 workers, each sleeping slightly over 1/2 sec after every req., we should stay well below that limit
|
||||
maxWorkers = 5
|
||||
throttleDelay = 550 * time.Millisecond
|
||||
)
|
||||
|
||||
type WakatimeHeartbeatImporter struct {
|
||||
ApiKey string
|
||||
httpClient *http.Client
|
||||
queue *artifex.Dispatcher
|
||||
type WakatimeImporter struct {
|
||||
apiKey string
|
||||
}
|
||||
|
||||
func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatImporter {
|
||||
return &WakatimeHeartbeatImporter{
|
||||
ApiKey: apiKey,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
queue: config.GetQueue(config.QueueImports),
|
||||
}
|
||||
func NewWakatimeImporter(apiKey string) *WakatimeImporter {
|
||||
return &WakatimeImporter{apiKey: apiKey}
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) <-chan *models.Heartbeat {
|
||||
out := make(chan *models.Heartbeat)
|
||||
|
||||
process := func(user *models.User, minFrom time.Time, maxTo time.Time, out chan *models.Heartbeat) {
|
||||
logbuch.Info("running wakatime import for user '%s'", user.ID)
|
||||
|
||||
baseUrl := user.WakaTimeURL(config.WakatimeApiUrl)
|
||||
|
||||
startDate, endDate, err := w.fetchRange(baseUrl)
|
||||
if err != nil {
|
||||
config.Log().Error("failed to fetch date range while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if startDate.Before(minFrom) {
|
||||
startDate = minFrom
|
||||
}
|
||||
if endDate.After(maxTo) {
|
||||
endDate = maxTo
|
||||
}
|
||||
|
||||
userAgents := map[string]*wakatime.UserAgentEntry{}
|
||||
if data, err := w.fetchUserAgents(baseUrl); err == nil {
|
||||
userAgents = data
|
||||
} else if strings.Contains(baseUrl, "wakatime.com") {
|
||||
// when importing from wakatime, resolving user agents is mandatorily required
|
||||
config.Log().Error("failed to fetch user agents while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
machinesNames := map[string]*wakatime.MachineEntry{}
|
||||
if data, err := w.fetchMachineNames(baseUrl); err == nil {
|
||||
machinesNames = data
|
||||
} else if strings.Contains(baseUrl, "wakatime.com") {
|
||||
// when importing from wakatime, resolving machine names is mandatorily required
|
||||
config.Log().Error("failed to fetch machine names while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
days := generateDays(startDate, endDate)
|
||||
|
||||
c := atomic.NewUint32(uint32(len(days)))
|
||||
ctx := context.TODO()
|
||||
sem := semaphore.NewWeighted(maxWorkers)
|
||||
|
||||
for _, d := range days {
|
||||
if err := sem.Acquire(ctx, 1); err != nil {
|
||||
logbuch.Error("failed to acquire semaphore - %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
go func(day time.Time) {
|
||||
defer sem.Release(1)
|
||||
defer time.Sleep(throttleDelay)
|
||||
|
||||
d := day.Format(config.SimpleDateFormat)
|
||||
heartbeats, err := w.fetchHeartbeats(d, baseUrl)
|
||||
if err != nil {
|
||||
config.Log().Error("failed to fetch heartbeats for day '%s' and user '%s' - %v", d, user.ID, err)
|
||||
}
|
||||
|
||||
for _, h := range heartbeats {
|
||||
out <- mapHeartbeat(h, userAgents, machinesNames, user)
|
||||
}
|
||||
|
||||
if c.Dec() == 0 {
|
||||
close(out)
|
||||
}
|
||||
}(d)
|
||||
}
|
||||
func (w *WakatimeImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) (<-chan *models.Heartbeat, error) {
|
||||
if strings.Contains(user.WakaTimeURL(config.WakatimeApiUrl), "wakatime.com") {
|
||||
return NewWakatimeDumpImporter(w.apiKey).Import(user, minFrom, maxTo)
|
||||
}
|
||||
|
||||
if minDataAge := user.MinDataAge(); minFrom.Before(minDataAge) {
|
||||
logbuch.Info("wakatime data import for user '%s' capped to [%v, &v]", user.ID, minDataAge, maxTo)
|
||||
}
|
||||
|
||||
logbuch.Info("scheduling wakatime import for user '%s' (interval [%v, %v])", user.ID, minFrom, maxTo)
|
||||
if err := w.queue.Dispatch(func() {
|
||||
process(user, minFrom, maxTo, out)
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to dispatch wakatime import job for user '%s', %v", user.ID, err)
|
||||
}
|
||||
|
||||
return out
|
||||
return NewWakatimeHeartbeatImporter(w.apiKey).Import(user, minFrom, maxTo)
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatImporter) ImportAll(user *models.User) <-chan *models.Heartbeat {
|
||||
return w.Import(user, time.Time{}, time.Now())
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/heartbeats?date=2021-02-05
|
||||
// https://pastr.de/p/b5p4od5s8w0pfntmwoi117jy
|
||||
func (w *WakatimeHeartbeatImporter) fetchHeartbeats(day string, baseUrl string) ([]*wakatime.HeartbeatEntry, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, baseUrl+config.WakatimeApiHeartbeatsUrl, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q := req.URL.Query()
|
||||
q.Add("date", day)
|
||||
req.URL.RawQuery = q.Encode()
|
||||
|
||||
var empty []*wakatime.HeartbeatEntry
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return empty, err
|
||||
} else if res.StatusCode == 402 {
|
||||
return empty, nil // date outside free plan range -> return empty data, but do not throw error
|
||||
} else if res.StatusCode >= 400 {
|
||||
return empty, errors.New(fmt.Sprintf("got status %d from wakatime api", res.StatusCode))
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var heartbeatsData wakatime.HeartbeatsViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&heartbeatsData); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
return heartbeatsData.Data, nil
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/all_time_since_today
|
||||
// https://pastr.de/p/w8xb4biv575pu32pox7jj2gr
|
||||
func (w *WakatimeHeartbeatImporter) fetchRange(baseUrl string) (time.Time, time.Time, error) {
|
||||
notime := time.Time{}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, baseUrl+config.WakatimeApiAllTimeUrl, nil)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
// see https://github.com/muety/wakapi/issues/370
|
||||
allTimeData, err := utils.ParseJsonDropKeys[wakatime.AllTimeViewModel](res.Body, "text")
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
startDate, err := time.Parse(config.SimpleDateFormat, allTimeData.Data.Range.StartDate)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
endDate, err := time.Parse(config.SimpleDateFormat, allTimeData.Data.Range.EndDate)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
return startDate, endDate, nil
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/user_agents
|
||||
// https://pastr.de/p/05k5do8q108k94lic4lfl3pc
|
||||
func (w *WakatimeHeartbeatImporter) fetchUserAgents(baseUrl string) (map[string]*wakatime.UserAgentEntry, error) {
|
||||
userAgents := make(map[string]*wakatime.UserAgentEntry)
|
||||
|
||||
for page := 1; ; page++ {
|
||||
url := fmt.Sprintf("%s%s?page=%d", baseUrl, config.WakatimeApiUserAgentsUrl, page)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var userAgentsData wakatime.UserAgentsViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&userAgentsData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, ua := range userAgentsData.Data {
|
||||
userAgents[ua.Id] = ua
|
||||
}
|
||||
|
||||
if page == userAgentsData.TotalPages {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return userAgents, nil
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/machine_names
|
||||
// https://pastr.de/p/v58cv0xrupp3zvyyv8o6973j
|
||||
func (w *WakatimeHeartbeatImporter) fetchMachineNames(baseUrl string) (map[string]*wakatime.MachineEntry, error) {
|
||||
httpClient := &http.Client{Timeout: 10 * time.Second}
|
||||
|
||||
machines := make(map[string]*wakatime.MachineEntry)
|
||||
|
||||
for page := 1; ; page++ {
|
||||
url := fmt.Sprintf("%s%s?page=%d", baseUrl, config.WakatimeApiMachineNamesUrl, page)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var machineData wakatime.MachineViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&machineData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, ma := range machineData.Data {
|
||||
machines[ma.Id] = ma
|
||||
}
|
||||
|
||||
if page == machineData.TotalPages {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return machines, nil
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatImporter) withHeaders(req *http.Request) *http.Request {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.ApiKey))))
|
||||
return req
|
||||
}
|
||||
|
||||
func mapHeartbeat(
|
||||
entry *wakatime.HeartbeatEntry,
|
||||
userAgents map[string]*wakatime.UserAgentEntry,
|
||||
machineNames map[string]*wakatime.MachineEntry,
|
||||
user *models.User,
|
||||
) *models.Heartbeat {
|
||||
ua := userAgents[entry.UserAgentId]
|
||||
if ua == nil {
|
||||
// try to parse id as an actual user agent string (as returned by wakapi)
|
||||
if opSys, editor, err := utils.ParseUserAgent(entry.UserAgentId); err == nil {
|
||||
ua = &wakatime.UserAgentEntry{
|
||||
Editor: editor,
|
||||
Os: opSys,
|
||||
}
|
||||
} else {
|
||||
ua = &wakatime.UserAgentEntry{
|
||||
Editor: "unknown",
|
||||
Os: "unknown",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ma := machineNames[entry.MachineNameId]
|
||||
if ma == nil {
|
||||
ma = &wakatime.MachineEntry{
|
||||
Id: entry.MachineNameId,
|
||||
Value: entry.MachineNameId,
|
||||
}
|
||||
}
|
||||
|
||||
return (&models.Heartbeat{
|
||||
User: user,
|
||||
UserID: user.ID,
|
||||
Entity: entry.Entity,
|
||||
Type: entry.Type,
|
||||
Category: entry.Category,
|
||||
Project: entry.Project,
|
||||
Branch: entry.Branch,
|
||||
Language: entry.Language,
|
||||
IsWrite: entry.IsWrite,
|
||||
Editor: ua.Editor,
|
||||
OperatingSystem: ua.Os,
|
||||
Machine: ma.Value,
|
||||
UserAgent: ua.Value,
|
||||
Time: models.CustomTime(time.Unix(0, int64(entry.Time*1e9))),
|
||||
Origin: OriginWakatime,
|
||||
OriginId: entry.Id,
|
||||
CreatedAt: models.CustomTime(entry.CreatedAt),
|
||||
}).Hashed()
|
||||
}
|
||||
|
||||
func generateDays(from, to time.Time) []time.Time {
|
||||
days := make([]time.Time, 0)
|
||||
|
||||
from = datetime.BeginOfDay(from)
|
||||
to = datetime.BeginOfDay(to.AddDate(0, 0, 1))
|
||||
|
||||
for d := from; d.Before(to); d = d.AddDate(0, 0, 1) {
|
||||
days = append(days, d)
|
||||
}
|
||||
|
||||
return days
|
||||
func (w *WakatimeImporter) ImportAll(user *models.User) (<-chan *models.Heartbeat, error) {
|
||||
if strings.Contains(user.WakaTimeURL(config.WakatimeApiUrl), "wakatime.com") {
|
||||
return NewWakatimeDumpImporter(w.apiKey).ImportAll(user)
|
||||
}
|
||||
return NewWakatimeHeartbeatImporter(w.apiKey).ImportAll(user)
|
||||
}
|
||||
|
162
services/imports/wakatime_dump.go
Normal file
162
services/imports/wakatime_dump.go
Normal file
@ -0,0 +1,162 @@
|
||||
package imports
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/duke-git/lancet/v2/slice"
|
||||
"github.com/emvi/logbuch"
|
||||
"github.com/muety/artifex/v2"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/models"
|
||||
wakatime "github.com/muety/wakapi/models/compat/wakatime/v1"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// data example: https://pastr.de/p/0viiv8e0rwq27dim8gyq1jrc
|
||||
|
||||
type WakatimeDumpImporter struct {
|
||||
apiKey string
|
||||
httpClient *http.Client
|
||||
queue *artifex.Dispatcher
|
||||
}
|
||||
|
||||
func NewWakatimeDumpImporter(apiKey string) *WakatimeDumpImporter {
|
||||
return &WakatimeDumpImporter{
|
||||
apiKey: apiKey,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
queue: config.GetQueue(config.QueueImports),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WakatimeDumpImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) (<-chan *models.Heartbeat, error) {
|
||||
out := make(chan *models.Heartbeat)
|
||||
logbuch.Info("running wakatime dump import for user '%s'", user.ID)
|
||||
|
||||
url := config.WakatimeApiUrl + config.WakatimeApiDataDumpUrl // this importer only works with wakatime currently, so no point in using user's custom wakatime api url
|
||||
req, _ := http.NewRequest(http.MethodPost, url, bytes.NewBuffer([]byte(`{ "type": "heartbeats", "email_when_finished": "false" }`)))
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
} else if res.StatusCode >= 400 {
|
||||
return nil, errors.New(fmt.Sprintf("got status %d from wakatime data dump api (post)", res.StatusCode))
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var datadumpData wakatime.DataDumpResultViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&datadumpData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
checkDumpReady := func(dumpId string, user *models.User) (bool, *wakatime.DataDumpData, error) {
|
||||
req, _ := http.NewRequest(http.MethodGet, url, nil)
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return false, nil, err
|
||||
} else if res.StatusCode >= 400 {
|
||||
return false, nil, errors.New(fmt.Sprintf("got status %d from wakatime data dump api (get)", res.StatusCode))
|
||||
}
|
||||
|
||||
var datadumpData wakatime.DataDumpViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&datadumpData); err != nil {
|
||||
return false, nil, err
|
||||
}
|
||||
|
||||
dump, ok := slice.FindBy[*wakatime.DataDumpData](datadumpData.Data, func(i int, item *wakatime.DataDumpData) bool {
|
||||
return item.Id == dumpId
|
||||
})
|
||||
if !ok {
|
||||
return false, nil, errors.New(fmt.Sprintf("data dump with id '%s' for user '%s' not found", dumpId, user.ID))
|
||||
}
|
||||
|
||||
return dump.Status == "Completed", dump, nil
|
||||
}
|
||||
|
||||
// start polling for dump to be ready
|
||||
var readyPollTimer *artifex.DispatchTicker
|
||||
|
||||
onDumpFailed := func(err error, user *models.User) {
|
||||
config.Log().Error("fetching data dump for user '%s' failed - %v", user.ID, err)
|
||||
readyPollTimer.Stop()
|
||||
close(out)
|
||||
}
|
||||
|
||||
onDumpReady := func(dump *wakatime.DataDumpData, user *models.User, out chan *models.Heartbeat) {
|
||||
config.Log().Info("data dump for user '%s' is available for download", user.ID)
|
||||
readyPollTimer.Stop()
|
||||
|
||||
defer close(out)
|
||||
|
||||
// download
|
||||
req, _ := http.NewRequest(http.MethodGet, dump.DownloadUrl, nil)
|
||||
res, err := w.httpClient.Do(req)
|
||||
if err != nil {
|
||||
config.Log().Error("failed to download %s - %v", dump.DownloadUrl, err)
|
||||
return
|
||||
} else if res.StatusCode >= 400 {
|
||||
config.Log().Error("failed to download %s - %v", dump.DownloadUrl, errors.New(fmt.Sprintf("got status %d from wakatime", res.StatusCode)))
|
||||
return
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
logbuch.Info("fetched %d bytes data dump for user '%s'", res.ContentLength, user.ID)
|
||||
|
||||
// decode
|
||||
var data wakatime.JsonExportViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&data); err != nil {
|
||||
config.Log().Error("failed to decode data dump for user '%s' ('%s') - %v", user.ID, dump.DownloadUrl, err)
|
||||
return
|
||||
}
|
||||
|
||||
// fetch user agents and machine names
|
||||
var userAgents map[string]*wakatime.UserAgentEntry
|
||||
if userAgents, err = fetchUserAgents(config.WakatimeApiUrl, w.apiKey); err != nil {
|
||||
config.Log().Error("failed to fetch user agents while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
var machinesNames map[string]*wakatime.MachineEntry
|
||||
if machinesNames, err = fetchMachineNames(config.WakatimeApiUrl, w.apiKey); err != nil {
|
||||
config.Log().Error("failed to fetch machine names while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, d := range data.Days {
|
||||
for _, h := range d.Heartbeats {
|
||||
hb := mapHeartbeat(h, userAgents, machinesNames, user)
|
||||
if hb.Time.T().Before(minFrom) || hb.Time.T().After(maxTo) {
|
||||
continue
|
||||
}
|
||||
out <- hb
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
readyPollTimer, err = w.queue.DispatchEvery(func() {
|
||||
u := *user
|
||||
ok, dump, err := checkDumpReady(datadumpData.Data.Id, &u)
|
||||
logbuch.Info("waiting for data dump '%s' for user '%s' to become downloadable (%.2f percent complete)", datadumpData.Data.Id, u.ID, dump.PercentComplete)
|
||||
if err != nil {
|
||||
onDumpFailed(err, &u)
|
||||
} else if ok {
|
||||
onDumpReady(dump, &u, out)
|
||||
}
|
||||
}, 10*time.Second)
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (w *WakatimeDumpImporter) ImportAll(user *models.User) (<-chan *models.Heartbeat, error) {
|
||||
return w.Import(user, time.Time{}, time.Now())
|
||||
}
|
||||
|
||||
func (w *WakatimeDumpImporter) withHeaders(req *http.Request) *http.Request {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.apiKey))))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
return req
|
||||
}
|
219
services/imports/wakatime_heartbeats.go
Normal file
219
services/imports/wakatime_heartbeats.go
Normal file
@ -0,0 +1,219 @@
|
||||
package imports
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/duke-git/lancet/v2/datetime"
|
||||
"github.com/muety/artifex/v2"
|
||||
"github.com/muety/wakapi/utils"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/emvi/logbuch"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/models"
|
||||
wakatime "github.com/muety/wakapi/models/compat/wakatime/v1"
|
||||
"go.uber.org/atomic"
|
||||
"golang.org/x/sync/semaphore"
|
||||
)
|
||||
|
||||
const (
	// OriginWakatime is the origin label used for imported heartbeats.
	OriginWakatime = "wakatime"

	// wakatime api permits a max. rate of 10 req / sec
	// https://github.com/wakatime/wakatime/issues/261
	// with 5 workers, each sleeping slightly over 1/2 sec after every req., we should stay well below that limit

	// maxWorkers is the number of concurrent fetch workers.
	maxWorkers = 5
	// throttleDelay is how long a worker sleeps after each request.
	throttleDelay = 550 * time.Millisecond
)
|
||||
|
||||
type WakatimeHeartbeatsImporter struct {
|
||||
ApiKey string
|
||||
httpClient *http.Client
|
||||
queue *artifex.Dispatcher
|
||||
}
|
||||
|
||||
func NewWakatimeHeartbeatImporter(apiKey string) *WakatimeHeartbeatsImporter {
|
||||
return &WakatimeHeartbeatsImporter{
|
||||
ApiKey: apiKey,
|
||||
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||
queue: config.GetQueue(config.QueueImports),
|
||||
}
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatsImporter) Import(user *models.User, minFrom time.Time, maxTo time.Time) (<-chan *models.Heartbeat, error) {
|
||||
out := make(chan *models.Heartbeat)
|
||||
|
||||
process := func(user *models.User, minFrom time.Time, maxTo time.Time, out chan *models.Heartbeat) {
|
||||
logbuch.Info("running wakatime import for user '%s'", user.ID)
|
||||
|
||||
baseUrl := user.WakaTimeURL(config.WakatimeApiUrl)
|
||||
|
||||
startDate, endDate, err := w.fetchRange(baseUrl)
|
||||
if err != nil {
|
||||
config.Log().Error("failed to fetch date range while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
if startDate.Before(minFrom) {
|
||||
startDate = minFrom
|
||||
}
|
||||
if endDate.After(maxTo) {
|
||||
endDate = maxTo
|
||||
}
|
||||
|
||||
userAgents := map[string]*wakatime.UserAgentEntry{}
|
||||
if data, err := fetchUserAgents(baseUrl, w.ApiKey); err == nil {
|
||||
userAgents = data
|
||||
} else if strings.Contains(baseUrl, "wakatime.com") {
|
||||
// when importing from wakatime, resolving user agents is mandatorily required
|
||||
config.Log().Error("failed to fetch user agents while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
machinesNames := map[string]*wakatime.MachineEntry{}
|
||||
if data, err := fetchMachineNames(baseUrl, w.ApiKey); err == nil {
|
||||
machinesNames = data
|
||||
} else if strings.Contains(baseUrl, "wakatime.com") {
|
||||
// when importing from wakatime, resolving machine names is mandatorily required
|
||||
config.Log().Error("failed to fetch machine names while importing wakatime heartbeats for user '%s' - %v", user.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
days := generateDays(startDate, endDate)
|
||||
|
||||
c := atomic.NewUint32(uint32(len(days)))
|
||||
ctx := context.TODO()
|
||||
sem := semaphore.NewWeighted(maxWorkers)
|
||||
|
||||
for _, d := range days {
|
||||
if err := sem.Acquire(ctx, 1); err != nil {
|
||||
logbuch.Error("failed to acquire semaphore - %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
go func(day time.Time) {
|
||||
defer sem.Release(1)
|
||||
defer time.Sleep(throttleDelay)
|
||||
|
||||
d := day.Format(config.SimpleDateFormat)
|
||||
heartbeats, err := w.fetchHeartbeats(d, baseUrl)
|
||||
if err != nil {
|
||||
config.Log().Error("failed to fetch heartbeats for day '%s' and user '%s' - %v", d, user.ID, err)
|
||||
}
|
||||
|
||||
for _, h := range heartbeats {
|
||||
out <- mapHeartbeat(h, userAgents, machinesNames, user)
|
||||
}
|
||||
|
||||
if c.Dec() == 0 {
|
||||
close(out)
|
||||
}
|
||||
}(d)
|
||||
}
|
||||
}
|
||||
|
||||
if minDataAge := user.MinDataAge(); minFrom.Before(minDataAge) {
|
||||
logbuch.Info("wakatime data import for user '%s' capped to [%v, &v]", user.ID, minDataAge, maxTo)
|
||||
}
|
||||
|
||||
logbuch.Info("scheduling wakatime import for user '%s' (interval [%v, %v])", user.ID, minFrom, maxTo)
|
||||
if err := w.queue.Dispatch(func() {
|
||||
process(user, minFrom, maxTo, out)
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to dispatch wakatime import job for user '%s', %v", user.ID, err)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatsImporter) ImportAll(user *models.User) (<-chan *models.Heartbeat, error) {
|
||||
return w.Import(user, time.Time{}, time.Now())
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/heartbeats?date=2021-02-05
|
||||
// https://pastr.de/p/b5p4od5s8w0pfntmwoi117jy
|
||||
func (w *WakatimeHeartbeatsImporter) fetchHeartbeats(day string, baseUrl string) ([]*wakatime.HeartbeatEntry, error) {
|
||||
req, err := http.NewRequest(http.MethodGet, baseUrl+config.WakatimeApiHeartbeatsUrl, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
q := req.URL.Query()
|
||||
q.Add("date", day)
|
||||
req.URL.RawQuery = q.Encode()
|
||||
|
||||
var empty []*wakatime.HeartbeatEntry
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return empty, err
|
||||
} else if res.StatusCode == 402 {
|
||||
return empty, nil // date outside free plan range -> return empty data, but do not throw error
|
||||
} else if res.StatusCode >= 400 {
|
||||
return empty, errors.New(fmt.Sprintf("got status %d from wakatime api", res.StatusCode))
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var heartbeatsData wakatime.HeartbeatsViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&heartbeatsData); err != nil {
|
||||
return empty, err
|
||||
}
|
||||
|
||||
return heartbeatsData.Data, nil
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/all_time_since_today
|
||||
// https://pastr.de/p/w8xb4biv575pu32pox7jj2gr
|
||||
func (w *WakatimeHeartbeatsImporter) fetchRange(baseUrl string) (time.Time, time.Time, error) {
|
||||
notime := time.Time{}
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, baseUrl+config.WakatimeApiAllTimeUrl, nil)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
res, err := w.httpClient.Do(w.withHeaders(req))
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
// see https://github.com/muety/wakapi/issues/370
|
||||
allTimeData, err := utils.ParseJsonDropKeys[wakatime.AllTimeViewModel](res.Body, "text")
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
startDate, err := time.Parse(config.SimpleDateFormat, allTimeData.Data.Range.StartDate)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
endDate, err := time.Parse(config.SimpleDateFormat, allTimeData.Data.Range.EndDate)
|
||||
if err != nil {
|
||||
return notime, notime, err
|
||||
}
|
||||
|
||||
return startDate, endDate, nil
|
||||
}
|
||||
|
||||
func (w *WakatimeHeartbeatsImporter) withHeaders(req *http.Request) *http.Request {
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(w.ApiKey))))
|
||||
return req
|
||||
}
|
||||
|
||||
func generateDays(from, to time.Time) []time.Time {
|
||||
days := make([]time.Time, 0)
|
||||
|
||||
from = datetime.BeginOfDay(from)
|
||||
to = datetime.BeginOfDay(to.AddDate(0, 0, 1))
|
||||
|
||||
for d := from; d.Before(to); d = d.AddDate(0, 0, 1) {
|
||||
days = append(days, d)
|
||||
}
|
||||
|
||||
return days
|
||||
}
|
140
services/imports/wakatime_utils.go
Normal file
140
services/imports/wakatime_utils.go
Normal file
@ -0,0 +1,140 @@
|
||||
package imports
|
||||
|
||||
import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/models"
|
||||
wakatime "github.com/muety/wakapi/models/compat/wakatime/v1"
|
||||
"github.com/muety/wakapi/utils"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/machine_names
|
||||
// https://pastr.de/p/v58cv0xrupp3zvyyv8o6973j
|
||||
func fetchMachineNames(baseUrl, apiKey string) (map[string]*wakatime.MachineEntry, error) {
|
||||
httpClient := &http.Client{Timeout: 10 * time.Second}
|
||||
|
||||
machines := make(map[string]*wakatime.MachineEntry)
|
||||
|
||||
for page := 1; ; page++ {
|
||||
url := fmt.Sprintf("%s%s?page=%d", baseUrl, config.WakatimeApiMachineNamesUrl, page)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(apiKey))))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var machineData wakatime.MachineViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&machineData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, ma := range machineData.Data {
|
||||
machines[ma.Id] = ma
|
||||
}
|
||||
|
||||
if page == machineData.TotalPages {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return machines, nil
|
||||
}
|
||||
|
||||
// https://wakatime.com/api/v1/users/current/user_agents
|
||||
// https://pastr.de/p/05k5do8q108k94lic4lfl3pc
|
||||
func fetchUserAgents(baseUrl, apiKey string) (map[string]*wakatime.UserAgentEntry, error) {
|
||||
httpClient := &http.Client{Timeout: 10 * time.Second}
|
||||
|
||||
userAgents := make(map[string]*wakatime.UserAgentEntry)
|
||||
|
||||
for page := 1; ; page++ {
|
||||
url := fmt.Sprintf("%s%s?page=%d", baseUrl, config.WakatimeApiUserAgentsUrl, page)
|
||||
req, err := http.NewRequest(http.MethodGet, url, nil)
|
||||
req.Header.Set("Authorization", fmt.Sprintf("Basic %s", base64.StdEncoding.EncodeToString([]byte(apiKey))))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
var userAgentsData wakatime.UserAgentsViewModel
|
||||
if err := json.NewDecoder(res.Body).Decode(&userAgentsData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, ua := range userAgentsData.Data {
|
||||
userAgents[ua.Id] = ua
|
||||
}
|
||||
|
||||
if page == userAgentsData.TotalPages {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return userAgents, nil
|
||||
}
|
||||
|
||||
func mapHeartbeat(
|
||||
entry *wakatime.HeartbeatEntry,
|
||||
userAgents map[string]*wakatime.UserAgentEntry,
|
||||
machineNames map[string]*wakatime.MachineEntry,
|
||||
user *models.User,
|
||||
) *models.Heartbeat {
|
||||
ua := userAgents[entry.UserAgentId]
|
||||
if ua == nil {
|
||||
// try to parse id as an actual user agent string (as returned by wakapi)
|
||||
if opSys, editor, err := utils.ParseUserAgent(entry.UserAgentId); err == nil {
|
||||
ua = &wakatime.UserAgentEntry{
|
||||
Editor: editor,
|
||||
Os: opSys,
|
||||
}
|
||||
} else {
|
||||
ua = &wakatime.UserAgentEntry{
|
||||
Editor: "unknown",
|
||||
Os: "unknown",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ma := machineNames[entry.MachineNameId]
|
||||
if ma == nil {
|
||||
ma = &wakatime.MachineEntry{
|
||||
Id: entry.MachineNameId,
|
||||
Value: entry.MachineNameId,
|
||||
}
|
||||
}
|
||||
|
||||
return (&models.Heartbeat{
|
||||
User: user,
|
||||
UserID: user.ID,
|
||||
Entity: entry.Entity,
|
||||
Type: entry.Type,
|
||||
Category: entry.Category,
|
||||
Project: entry.Project,
|
||||
Branch: entry.Branch,
|
||||
Language: entry.Language,
|
||||
IsWrite: entry.IsWrite,
|
||||
Editor: ua.Editor,
|
||||
OperatingSystem: ua.Os,
|
||||
Machine: ma.Value,
|
||||
UserAgent: ua.Value,
|
||||
Time: models.CustomTime(time.Unix(0, int64(entry.Time*1e9))),
|
||||
Origin: OriginWakatime,
|
||||
OriginId: entry.Id,
|
||||
CreatedAt: models.CustomTime(entry.CreatedAt),
|
||||
}).Hashed()
|
||||
}
|
Loading…
Reference in New Issue
Block a user