diff --git a/main.go b/main.go index 7da3a4a..a805ba0 100644 --- a/main.go +++ b/main.go @@ -105,8 +105,7 @@ func main() { summarySrvc := &services.SummaryService{config, db, heartbeatSrvc, aliasSrvc} aggregationSrvc := &services.AggregationService{config, db, userSrvc, summarySrvc, heartbeatSrvc} - sums, err := aggregationSrvc.GenerateJobs() - fmt.Println(sums) + aggregationSrvc.Start(time.Second) // Handlers heartbeatHandler := &routes.HeartbeatHandler{HeartbeatSrvc: heartbeatSrvc} diff --git a/services/aggregation.go b/services/aggregation.go index d6f828b..2cc2141 100644 --- a/services/aggregation.go +++ b/services/aggregation.go @@ -13,13 +13,19 @@ package services import ( - "container/list" + "log" "time" "github.com/jinzhu/gorm" "github.com/n1try/wakapi/models" ) +const ( + summaryInterval time.Duration = 24 * time.Hour + nSummaryWorkers int = 8 + nPersistWorkers int = 8 +) + type AggregationService struct { Config *models.Config Db *gorm.DB @@ -29,26 +35,54 @@ type AggregationService struct { } type AggregationJob struct { - UserId string + UserID string From time.Time To time.Time } // Use https://godoc.org/github.com/jasonlvhit/gocron to trigger jobs on a regular basis. func (srv *AggregationService) Start(interval time.Duration) { + jobs := make(chan *AggregationJob) + summaries := make(chan *models.Summary) + + for i := 0; i < nSummaryWorkers; i++ { + go srv.summaryWorker(jobs, summaries) + } + + for i := 0; i < nPersistWorkers; i++ { + go srv.persistWorker(summaries) + } + + srv.generateJobs(jobs) } -func (srv *AggregationService) generateJobs() (*list.List, error) { - var aggregationJobs *list.List = list.New() +func (srv *AggregationService) summaryWorker(jobs <-chan *AggregationJob, summaries chan<- *models.Summary) { + for job := range jobs { + if summary, err := srv.SummaryService.GetSummary(job.From, job.To, &models.User{ID: job.UserID}); err != nil { + log.Printf("Failed to generate summary (%v, %v, %s) – %v.", job.From, job.To, job.UserID, err) + } else { + summaries <- summary + } + } +} +func (srv *AggregationService) persistWorker(summaries <-chan *models.Summary) { + for summary := range summaries { + if err := srv.SummaryService.SaveSummary(summary); err != nil { + log.Printf("Failed to save summary (%v, %v, %s) – %v.", summary.UserID, summary.FromTime, summary.ToTime, err) + } + } +} + +func (srv *AggregationService) generateJobs(jobs chan<- *AggregationJob) error { users, err := srv.UserService.GetAll() if err != nil { - return nil, err + return err } latestSummaries, err := srv.SummaryService.GetLatestUserSummaries() if err != nil { - return nil, err + return err } userSummaryTimes := make(map[string]*time.Time) @@ -56,42 +90,48 @@ func (srv *AggregationService) generateJobs() (*list.List, error) { userSummaryTimes[s.UserID] = s.ToTime } - missingUserIds := make([]string, 0) + missingUserIDs := make([]string, 0) for _, u := range users { if _, ok := userSummaryTimes[u.ID]; !ok { - missingUserIds = append(missingUserIds, u.ID) + missingUserIDs = append(missingUserIDs, u.ID) } } - firstHeartbeats, err := srv.HeartbeatService.GetFirstUserHeartbeats(missingUserIds) + firstHeartbeats, err := srv.HeartbeatService.GetFirstUserHeartbeats(missingUserIDs) if err != nil { - return nil, err + return err } for id, t := range userSummaryTimes { - var from time.Time - if t.Hour() == 0 { - from = *t - } else { - nextDay := t.Add(24 * time.Hour) - from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location()) - } - - aggregationJobs.PushBack(&AggregationJob{id, from, from.Add(24 * time.Hour)}) + generateUserJobs(id, *t, jobs) } for _, h := range firstHeartbeats { - var from time.Time - var t time.Time = time.Time(*(h.Time)) - if t.Hour() == 0 { - from = time.Time(*(h.Time)) - } else { - nextDay := t.Add(24 * time.Hour) - from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location()) - } - - aggregationJobs.PushBack(&AggregationJob{h.UserID, from, from.Add(24 * time.Hour)}) + generateUserJobs(h.UserID, time.Time(*(h.Time)), jobs) } - return aggregationJobs, nil + return nil +} + +func generateUserJobs(userId string, lastAggregation time.Time, jobs chan<- *AggregationJob) { + var from, to time.Time + end := getStartOfToday().Add(-1 * time.Second) + + if lastAggregation.Hour() == 0 { + from = lastAggregation + } else { + nextDay := lastAggregation.Add(24 * time.Hour) + from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, lastAggregation.Location()) + } + + for from.Before(end) && to.Before(end) { + to = from.Add(24 * time.Hour) + jobs <- &AggregationJob{userId, from, to} + from = to + } +} + +func getStartOfToday() time.Time { + now := time.Now() + return time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 1, now.Location()) } diff --git a/services/alias.go b/services/alias.go index 496075f..1786ad5 100644 --- a/services/alias.go +++ b/services/alias.go @@ -2,6 +2,7 @@ package services import ( "errors" + "sync" "github.com/jinzhu/gorm" "github.com/n1try/wakapi/models" @@ -12,13 +13,9 @@ type AliasService struct { Db *gorm.DB } -var userAliases map[string][]*models.Alias +var userAliases sync.Map func (srv *AliasService) LoadUserAliases(userId string) error { - if userAliases == nil { - userAliases = make(map[string][]*models.Alias) - } - var aliases []*models.Alias if err := srv.Db. Where(&models.Alias{UserID: userId}). @@ -26,13 +23,13 @@ func (srv *AliasService) LoadUserAliases(userId string) error { return err } - userAliases[userId] = aliases + userAliases.Store(userId, aliases) return nil } func (srv *AliasService) GetAliasOrDefault(userId string, summaryType uint8, value string) (string, error) { - if userAliases, ok := userAliases[userId]; ok { - for _, a := range userAliases { + if ua, ok := userAliases.Load(userId); ok { + for _, a := range ua.([]*models.Alias) { if a.Type == summaryType && a.Value == value { return a.Key, nil } @@ -43,7 +40,7 @@ func (srv *AliasService) GetAliasOrDefault(userId string, summaryType uint8, val } func (src *AliasService) IsInitialized(userId string) bool { - if _, ok := userAliases[userId]; ok { + if _, ok := userAliases.Load(userId); ok { return true } return false diff --git a/services/summary.go b/services/summary.go index 20d1afe..6efe4cb 100644 --- a/services/summary.go +++ b/services/summary.go @@ -1,6 +1,7 @@ package services import ( + "fmt" "math" "sort" "time" @@ -66,6 +67,14 @@ func (srv *SummaryService) GetSummary(from, to time.Time, user *models.User) (*m return summary, nil } +func (srv *SummaryService) SaveSummary(summary *models.Summary) error { + fmt.Println("Saving summary", summary) + if err := srv.Db.Create(summary).Error; err != nil { + return err + } + return nil +} + func (srv *SummaryService) GetLatestUserSummaries() ([]*models.Summary, error) { var summaries []*models.Summary if err := srv.Db.