From adb5abd4d2b71a13f937e021f771aefcbbda4cd6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ferdinand=20M=C3=BCtsch?= Date: Fri, 17 May 2019 02:05:38 +0200 Subject: [PATCH] Unstable. Further work on aggregations. --- main.go | 1 + models/aggregation.go | 28 ++++++--- models/heartbeat.go | 5 +- routes/aggregation.go | 12 +++- services/aggregation.go | 127 ++++++++++++++++++++++++++++++++++++++-- 5 files changed, 158 insertions(+), 15 deletions(-) diff --git a/main.go b/main.go index 49a55fa..e009031 100644 --- a/main.go +++ b/main.go @@ -63,6 +63,7 @@ func main() { // Connect to database db, err := gorm.Open(config.DbDialect, utils.MakeConnectionString(config)) + db.LogMode(true) if err != nil { // log.Fatal("Could not connect to database.") log.Fatal(err) diff --git a/models/aggregation.go b/models/aggregation.go index bba68ea..fffea6c 100644 --- a/models/aggregation.go +++ b/models/aggregation.go @@ -1,31 +1,45 @@ package models import ( + "database/sql/driver" "time" "github.com/jinzhu/gorm" ) const ( - AggregationProject int = 0 - AggregationLanguage int = 1 - AggregationEditor int = 2 - AggregationOS int = 3 + NAggregationTypes uint8 = 4 + AggregationProject uint8 = 0 + AggregationLanguage uint8 = 1 + AggregationEditor uint8 = 2 + AggregationOS uint8 = 3 ) type Aggregation struct { gorm.Model User *User `gorm:"not null; association_foreignkey:ID"` UserID string `gorm:"not null; index:idx_user,idx_type_time_user"` - From time.Time `gorm:"not null; index:idx_from,idx_type_time_user; default:now()"` - To time.Time `gorm:"not null; index:idx_to,idx_type_time_user; default:now()"` + FromTime *time.Time `gorm:"not null; index:idx_from,idx_type_time_user; default:now()"` + ToTime *time.Time `gorm:"not null; index:idx_to,idx_type_time_user; default:now()"` Duration time.Duration `gorm:"-"` Type uint8 `gorm:"not null; index:idx_type,idx_type_time_user"` Items []AggregationItem } type AggregationItem struct { + ID uint `gorm:"primary_key; auto_increment"` AggregationID uint `gorm:"not null; association_foreignkey:ID"` Key string `gorm:"not null"` - Total time.Duration + Total ScannableDuration +} + +type ScannableDuration time.Duration + +func (d *ScannableDuration) Scan(value interface{}) error { + *d = ScannableDuration(*d) * ScannableDuration(time.Second) + return nil +} + +func (d ScannableDuration) Value() (driver.Value, error) { + return int64(time.Duration(d) / time.Second), nil } diff --git a/models/heartbeat.go b/models/heartbeat.go index 11ba61c..8070411 100644 --- a/models/heartbeat.go +++ b/models/heartbeat.go @@ -45,7 +45,6 @@ func (j *HeartbeatReqTime) UnmarshalJSON(b []byte) error { } func (j *HeartbeatReqTime) Scan(value interface{}) error { - fmt.Printf("%T", value) switch value.(type) { case int64: *j = HeartbeatReqTime(time.Unix(123456, 0)) @@ -67,3 +66,7 @@ func (j HeartbeatReqTime) String() string { t := time.Time(j) return t.Format("2006-01-02 15:04:05") } + +func (j HeartbeatReqTime) Time() time.Time { + return time.Time(j) +} diff --git a/routes/aggregation.go b/routes/aggregation.go index 2d83040..f80ca56 100644 --- a/routes/aggregation.go +++ b/routes/aggregation.go @@ -33,7 +33,17 @@ func (h *AggregationHandler) Get(w http.ResponseWriter, r *http.Request) { to = time.Now() } - h.AggregationSrvc.Aggregate(from, to, user) + aggregations, err := h.AggregationSrvc.FindOrAggregate(from, to, user) + if err != nil { + w.WriteHeader(500) + return + } + for i := 0; i < len(aggregations); i++ { + if err := h.AggregationSrvc.SaveAggregation(aggregations[i]); err != nil { + w.WriteHeader(500) + return + } + } w.WriteHeader(200) } diff --git a/services/aggregation.go b/services/aggregation.go index eaedafc..c62bd2f 100644 --- a/services/aggregation.go +++ b/services/aggregation.go @@ -1,8 +1,8 @@ package services import ( - "fmt" "log" + "math" "time" "github.com/jinzhu/gorm" @@ -15,16 +15,131 @@ type AggregationService struct { HeartbeatService *HeartbeatService } -func (srv *AggregationService) Aggregate(from time.Time, to time.Time, user *models.User) { +func (srv *AggregationService) SaveAggregation(aggregation *models.Aggregation) error { + if err := srv.Db.Save(aggregation).Error; err != nil { + return err + } + return nil +} + +func (srv *AggregationService) DeleteAggregations(from, to time.Time) error { + // TODO + return nil +} + +func (srv *AggregationService) FindOrAggregate(from, to time.Time, user *models.User) ([]*models.Aggregation, error) { + var existingAggregations []*models.Aggregation + if err := srv.Db. + Where(&models.Aggregation{UserID: user.ID}). + Where("from_time <= ?", from). + Where("to_time <= ?", to). + Order("to_time desc"). + Limit(models.NAggregationTypes). + Find(&existingAggregations).Error; err != nil { + return nil, err + } + + maxTo := getMaxTo(existingAggregations) + + if len(existingAggregations) == 0 { + newAggregations := srv.aggregate(from, to, user) + for i := 0; i < len(newAggregations); i++ { + srv.SaveAggregation(newAggregations[i]) + } + return newAggregations, nil + } else if maxTo.Before(to) { + // newAggregations := srv.aggregate(maxTo, to, user) + // TODO: compute aggregation(s) for remaining heartbeats + // TODO: if these aggregations are more than 24h, save them + // NOTE: never save aggregations that are less than 24h -> no need to delete some later + } else if maxTo.Equal(to) { + return existingAggregations, nil + } + + // Should never occur + return make([]*models.Aggregation, 0), nil +} + +func (srv *AggregationService) aggregate(from, to time.Time, user *models.User) []*models.Aggregation { + // TODO: Handle case that a time frame >= 24h is requested -> more than 4 will be returned + types := []uint8{models.AggregationProject, models.AggregationLanguage, models.AggregationEditor, models.AggregationOS} heartbeats, err := srv.HeartbeatService.GetAllFrom(from, user) if err != nil { log.Fatal(err) } - for _, h := range *heartbeats { - fmt.Printf("%+v\n", h) + + var aggregations []*models.Aggregation + for _, t := range types { + aggregation := &models.Aggregation{ + UserID: user.ID, + FromTime: &from, + ToTime: &to, + Duration: to.Sub(from), + Type: t, + Items: srv.aggregateBy(heartbeats, t)[0:1], //make([]*models.AggregationItem, 0), + } + aggregations = append(aggregations, aggregation) } + + return aggregations } -func (srv *AggregationService) aggregateBy(heartbeats *[]models.Heartbeat, aggregationType int) *models.Aggregation { - return &models.Aggregation{} +func (srv *AggregationService) aggregateBy(heartbeats *[]models.Heartbeat, aggregationType uint8) []models.AggregationItem { + beats := *heartbeats + durations := make(map[string]time.Duration) + + for i := 0; i < len(beats); i++ { + h := &beats[i] + + var key string + switch aggregationType { + case models.AggregationProject: + key = h.Project + case models.AggregationEditor: + key = h.Editor + case models.AggregationLanguage: + key = h.Language + case models.AggregationOS: + key = h.OperatingSystem + } + + if _, ok := durations[key]; !ok { + durations[key] = time.Duration(0) + } + + if i == 0 { + continue + } + + timePassed := h.Time.Time().Sub((&beats[i-1]).Time.Time()) + timeThresholded := math.Min(float64(timePassed), float64(time.Duration(2)*time.Minute)) + durations[key] += time.Duration(int64(timeThresholded)) + } + + var items []models.AggregationItem + for k, v := range durations { + items = append(items, models.AggregationItem{ + AggregationID: 9, + Key: k, + Total: models.ScannableDuration(v), + }) + } + + return items +} + +func (srv *AggregationService) MergeAggregations(aggregations []*models.Aggregation) []*models.Aggregation { + // TODO + return make([]*models.Aggregation, 0) +} + +func getMaxTo(aggregations []*models.Aggregation) time.Time { + var maxTo time.Time + for i := 0; i < len(aggregations); i++ { + agg := aggregations[i] + if agg.ToTime.After(maxTo) { + maxTo = *agg.ToTime + } + } + return maxTo }