From 851f3786846150c6ad6eb253011a4ef49eb7dcc2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Wed, 9 Oct 2019 23:26:28 +0200 Subject: [PATCH] Add database schema to persist summaries with their accompanying summary items. Add basic methods to generate summary aggregation jobs. --- main.go | 7 +++ models/summary.go | 13 +++--- services/aggregation.go | 97 +++++++++++++++++++++++++++++++++++++++++ services/heartbeat.go | 13 ++++++ services/summary.go | 12 +++++ services/user.go | 10 +++++ 6 files changed, 147 insertions(+), 5 deletions(-) create mode 100644 services/aggregation.go diff --git a/main.go b/main.go index 9ff6a2e..7da3a4a 100644 --- a/main.go +++ b/main.go @@ -89,7 +89,10 @@ func main() { // Migrate database schema db.AutoMigrate(&models.User{}) db.AutoMigrate(&models.Alias{}) + db.AutoMigrate(&models.Summary{}) + db.AutoMigrate(&models.SummaryItem{}) db.AutoMigrate(&models.Heartbeat{}).AddForeignKey("user_id", "users(id)", "RESTRICT", "RESTRICT") + db.AutoMigrate(&models.SummaryItem{}).AddForeignKey("summary_id", "summaries(id)", "CASCADE", "CASCADE") // Custom migrations and initial data addDefaultUser(db, config) @@ -100,6 +103,10 @@ func main() { heartbeatSrvc := &services.HeartbeatService{config, db} userSrvc := &services.UserService{config, db} summarySrvc := &services.SummaryService{config, db, heartbeatSrvc, aliasSrvc} + aggregationSrvc := &services.AggregationService{config, db, userSrvc, summarySrvc, heartbeatSrvc} + + sums, err := aggregationSrvc.GenerateJobs() + fmt.Println(sums) // Handlers heartbeatHandler := &routes.HeartbeatHandler{HeartbeatSrvc: heartbeatSrvc} diff --git a/models/summary.go b/models/summary.go index d34c26a..5b97e5a 100644 --- a/models/summary.go +++ b/models/summary.go @@ -13,9 +13,10 @@ const ( ) type Summary struct { - UserID string `json:"user_id"` - FromTime *time.Time `json:"from"` - ToTime *time.Time `json:"to"` + ID uint `json:"-" gorm:"primary_key"` + UserID string `json:"user_id" gorm:"not null; index:idx_time_summary_user"` + FromTime *time.Time `json:"from" gorm:"not null; type:timestamp; default:now(); index:idx_time_summary_user"` + ToTime *time.Time `json:"to" gorm:"not null; type:timestamp; default:now(); index:idx_time_summary_user"` Projects []SummaryItem `json:"projects"` Languages []SummaryItem `json:"languages"` Editors []SummaryItem `json:"editors"` @@ -23,8 +24,10 @@ type Summary struct { } type SummaryItem struct { - Key string `json:"key"` - Total time.Duration `json:"total"` + ID uint `json:"-" gorm:"primary_key"` + SummaryID uint `json:"-"` + Key string `json:"key"` + Total time.Duration `json:"total"` } type SummaryItemContainer struct { diff --git a/services/aggregation.go b/services/aggregation.go new file mode 100644 index 0000000..d6f828b --- /dev/null +++ b/services/aggregation.go @@ -0,0 +1,97 @@ +/* + << WORK IN PROGRESS >> + Don't use theses classes, yet. + + This aims to implement https://github.com/n1try/wakapi/issues/1. + Idea is to have regularly running, cron-like background jobs that request a summary + from SummaryService for a pre-defined time interval, e.g. 24 hours. Those are persisted + to the database. Once a user request a summary for a certain time frame that partilly + overlaps with pre-generated summaries, those will be aggregated together with actual heartbeats + for the non-overlapping time frames left and right. +*/ + +package services + +import ( + "container/list" + "time" + + "github.com/jinzhu/gorm" + "github.com/n1try/wakapi/models" +) + +type AggregationService struct { + Config *models.Config + Db *gorm.DB + UserService *UserService + SummaryService *SummaryService + HeartbeatService *HeartbeatService +} + +type AggregationJob struct { + UserId string + From time.Time + To time.Time +} + +// Use https://godoc.org/github.com/jasonlvhit/gocron to trigger jobs on a regular basis. +func (srv *AggregationService) Start(interval time.Duration) { +} + +func (srv *AggregationService) generateJobs() (*list.List, error) { + var aggregationJobs *list.List = list.New() + + users, err := srv.UserService.GetAll() + if err != nil { + return nil, err + } + + latestSummaries, err := srv.SummaryService.GetLatestUserSummaries() + if err != nil { + return nil, err + } + + userSummaryTimes := make(map[string]*time.Time) + for _, s := range latestSummaries { + userSummaryTimes[s.UserID] = s.ToTime + } + + missingUserIds := make([]string, 0) + for _, u := range users { + if _, ok := userSummaryTimes[u.ID]; !ok { + missingUserIds = append(missingUserIds, u.ID) + } + } + + firstHeartbeats, err := srv.HeartbeatService.GetFirstUserHeartbeats(missingUserIds) + if err != nil { + return nil, err + } + + for id, t := range userSummaryTimes { + var from time.Time + if t.Hour() == 0 { + from = *t + } else { + nextDay := t.Add(24 * time.Hour) + from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location()) + } + + aggregationJobs.PushBack(&AggregationJob{id, from, from.Add(24 * time.Hour)}) + } + + for _, h := range firstHeartbeats { + var from time.Time + var t time.Time = time.Time(*(h.Time)) + if t.Hour() == 0 { + from = time.Time(*(h.Time)) + } else { + nextDay := t.Add(24 * time.Hour) + from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location()) + } + + aggregationJobs.PushBack(&AggregationJob{h.UserID, from, from.Add(24 * time.Hour)}) + } + + return aggregationJobs, nil +} diff --git a/services/heartbeat.go b/services/heartbeat.go index 267cb0e..6239aa6 100644 --- a/services/heartbeat.go +++ b/services/heartbeat.go @@ -39,3 +39,16 @@ func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User) } return heartbeats, nil } + +func (srv *HeartbeatService) GetFirstUserHeartbeats(userIds []string) ([]*models.Heartbeat, error) { + var heartbeats []*models.Heartbeat + if err := srv.Db. + Table("heartbeats"). + Select("user_id, min(time) as time"). + Where("user_id IN (?)", userIds). + Group("user_id"). + Scan(&heartbeats).Error; err != nil { + return nil, err + } + return heartbeats, nil +} diff --git a/services/summary.go b/services/summary.go index f8c6576..20d1afe 100644 --- a/services/summary.go +++ b/services/summary.go @@ -66,6 +66,18 @@ func (srv *SummaryService) GetSummary(from, to time.Time, user *models.User) (*m return summary, nil } +func (srv *SummaryService) GetLatestUserSummaries() ([]*models.Summary, error) { + var summaries []*models.Summary + if err := srv.Db. + Table("summaries"). + Select("user_id, max(to_time) as to_time"). + Group("user_id"). + Scan(&summaries).Error; err != nil { + return nil, err + } + return summaries, nil +} + func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, user *models.User, c chan models.SummaryItemContainer) { durations := make(map[string]time.Duration) diff --git a/services/user.go b/services/user.go index 3377032..d4ff1ab 100644 --- a/services/user.go +++ b/services/user.go @@ -25,3 +25,13 @@ func (srv *UserService) GetUserByKey(key string) (*models.User, error) { } return u, nil } + +func (srv *UserService) GetAll() ([]*models.User, error) { + var users []*models.User + if err := srv.Db. + Table("users"). + Find(&users).Error; err != nil { + return nil, err + } + return users, nil +}