1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

Unstable. Further work on aggregations.

This commit is contained in:
Ferdinand Mütsch 2019-05-17 02:05:38 +02:00
parent de65ab1814
commit adb5abd4d2
5 changed files with 158 additions and 15 deletions

View File

@ -63,6 +63,7 @@ func main() {
// Connect to database // Connect to database
db, err := gorm.Open(config.DbDialect, utils.MakeConnectionString(config)) db, err := gorm.Open(config.DbDialect, utils.MakeConnectionString(config))
db.LogMode(true)
if err != nil { if err != nil {
// log.Fatal("Could not connect to database.") // log.Fatal("Could not connect to database.")
log.Fatal(err) log.Fatal(err)

View File

@ -1,31 +1,45 @@
package models package models
import ( import (
"database/sql/driver"
"time" "time"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
) )
const ( const (
AggregationProject int = 0 NAggregationTypes uint8 = 4
AggregationLanguage int = 1 AggregationProject uint8 = 0
AggregationEditor int = 2 AggregationLanguage uint8 = 1
AggregationOS int = 3 AggregationEditor uint8 = 2
AggregationOS uint8 = 3
) )
type Aggregation struct { type Aggregation struct {
gorm.Model gorm.Model
User *User `gorm:"not null; association_foreignkey:ID"` User *User `gorm:"not null; association_foreignkey:ID"`
UserID string `gorm:"not null; index:idx_user,idx_type_time_user"` UserID string `gorm:"not null; index:idx_user,idx_type_time_user"`
From time.Time `gorm:"not null; index:idx_from,idx_type_time_user; default:now()"` FromTime *time.Time `gorm:"not null; index:idx_from,idx_type_time_user; default:now()"`
To time.Time `gorm:"not null; index:idx_to,idx_type_time_user; default:now()"` ToTime *time.Time `gorm:"not null; index:idx_to,idx_type_time_user; default:now()"`
Duration time.Duration `gorm:"-"` Duration time.Duration `gorm:"-"`
Type uint8 `gorm:"not null; index:idx_type,idx_type_time_user"` Type uint8 `gorm:"not null; index:idx_type,idx_type_time_user"`
Items []AggregationItem Items []AggregationItem
} }
type AggregationItem struct { type AggregationItem struct {
ID uint `gorm:"primary_key; auto_increment"`
AggregationID uint `gorm:"not null; association_foreignkey:ID"` AggregationID uint `gorm:"not null; association_foreignkey:ID"`
Key string `gorm:"not null"` Key string `gorm:"not null"`
Total time.Duration Total ScannableDuration
}
type ScannableDuration time.Duration
func (d *ScannableDuration) Scan(value interface{}) error {
*d = ScannableDuration(*d) * ScannableDuration(time.Second)
return nil
}
func (d ScannableDuration) Value() (driver.Value, error) {
return int64(time.Duration(d) / time.Second), nil
} }

View File

@ -45,7 +45,6 @@ func (j *HeartbeatReqTime) UnmarshalJSON(b []byte) error {
} }
func (j *HeartbeatReqTime) Scan(value interface{}) error { func (j *HeartbeatReqTime) Scan(value interface{}) error {
fmt.Printf("%T", value)
switch value.(type) { switch value.(type) {
case int64: case int64:
*j = HeartbeatReqTime(time.Unix(123456, 0)) *j = HeartbeatReqTime(time.Unix(123456, 0))
@ -67,3 +66,7 @@ func (j HeartbeatReqTime) String() string {
t := time.Time(j) t := time.Time(j)
return t.Format("2006-01-02 15:04:05") return t.Format("2006-01-02 15:04:05")
} }
func (j HeartbeatReqTime) Time() time.Time {
return time.Time(j)
}

View File

@ -33,7 +33,17 @@ func (h *AggregationHandler) Get(w http.ResponseWriter, r *http.Request) {
to = time.Now() to = time.Now()
} }
h.AggregationSrvc.Aggregate(from, to, user) aggregations, err := h.AggregationSrvc.FindOrAggregate(from, to, user)
if err != nil {
w.WriteHeader(500)
return
}
for i := 0; i < len(aggregations); i++ {
if err := h.AggregationSrvc.SaveAggregation(aggregations[i]); err != nil {
w.WriteHeader(500)
return
}
}
w.WriteHeader(200) w.WriteHeader(200)
} }

View File

@ -1,8 +1,8 @@
package services package services
import ( import (
"fmt"
"log" "log"
"math"
"time" "time"
"github.com/jinzhu/gorm" "github.com/jinzhu/gorm"
@ -15,16 +15,131 @@ type AggregationService struct {
HeartbeatService *HeartbeatService HeartbeatService *HeartbeatService
} }
func (srv *AggregationService) Aggregate(from time.Time, to time.Time, user *models.User) { func (srv *AggregationService) SaveAggregation(aggregation *models.Aggregation) error {
if err := srv.Db.Save(aggregation).Error; err != nil {
return err
}
return nil
}
func (srv *AggregationService) DeleteAggregations(from, to time.Time) error {
// TODO
return nil
}
func (srv *AggregationService) FindOrAggregate(from, to time.Time, user *models.User) ([]*models.Aggregation, error) {
var existingAggregations []*models.Aggregation
if err := srv.Db.
Where(&models.Aggregation{UserID: user.ID}).
Where("from_time <= ?", from).
Where("to_time <= ?", to).
Order("to_time desc").
Limit(models.NAggregationTypes).
Find(&existingAggregations).Error; err != nil {
return nil, err
}
maxTo := getMaxTo(existingAggregations)
if len(existingAggregations) == 0 {
newAggregations := srv.aggregate(from, to, user)
for i := 0; i < len(newAggregations); i++ {
srv.SaveAggregation(newAggregations[i])
}
return newAggregations, nil
} else if maxTo.Before(to) {
// newAggregations := srv.aggregate(maxTo, to, user)
// TODO: compute aggregation(s) for remaining heartbeats
// TODO: if these aggregations are more than 24h, save them
// NOTE: never save aggregations that are less than 24h -> no need to delete some later
} else if maxTo.Equal(to) {
return existingAggregations, nil
}
// Should never occur
return make([]*models.Aggregation, 0), nil
}
func (srv *AggregationService) aggregate(from, to time.Time, user *models.User) []*models.Aggregation {
// TODO: Handle case that a time frame >= 24h is requested -> more than 4 will be returned
types := []uint8{models.AggregationProject, models.AggregationLanguage, models.AggregationEditor, models.AggregationOS}
heartbeats, err := srv.HeartbeatService.GetAllFrom(from, user) heartbeats, err := srv.HeartbeatService.GetAllFrom(from, user)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
for _, h := range *heartbeats {
fmt.Printf("%+v\n", h) var aggregations []*models.Aggregation
for _, t := range types {
aggregation := &models.Aggregation{
UserID: user.ID,
FromTime: &from,
ToTime: &to,
Duration: to.Sub(from),
Type: t,
Items: srv.aggregateBy(heartbeats, t)[0:1], //make([]*models.AggregationItem, 0),
}
aggregations = append(aggregations, aggregation)
} }
return aggregations
} }
func (srv *AggregationService) aggregateBy(heartbeats *[]models.Heartbeat, aggregationType int) *models.Aggregation { func (srv *AggregationService) aggregateBy(heartbeats *[]models.Heartbeat, aggregationType uint8) []models.AggregationItem {
return &models.Aggregation{} beats := *heartbeats
durations := make(map[string]time.Duration)
for i := 0; i < len(beats); i++ {
h := &beats[i]
var key string
switch aggregationType {
case models.AggregationProject:
key = h.Project
case models.AggregationEditor:
key = h.Editor
case models.AggregationLanguage:
key = h.Language
case models.AggregationOS:
key = h.OperatingSystem
}
if _, ok := durations[key]; !ok {
durations[key] = time.Duration(0)
}
if i == 0 {
continue
}
timePassed := h.Time.Time().Sub((&beats[i-1]).Time.Time())
timeThresholded := math.Min(float64(timePassed), float64(time.Duration(2)*time.Minute))
durations[key] += time.Duration(int64(timeThresholded))
}
var items []models.AggregationItem
for k, v := range durations {
items = append(items, models.AggregationItem{
AggregationID: 9,
Key: k,
Total: models.ScannableDuration(v),
})
}
return items
}
func (srv *AggregationService) MergeAggregations(aggregations []*models.Aggregation) []*models.Aggregation {
// TODO
return make([]*models.Aggregation, 0)
}
func getMaxTo(aggregations []*models.Aggregation) time.Time {
var maxTo time.Time
for i := 0; i < len(aggregations); i++ {
agg := aggregations[i]
if agg.ToTime.After(maxTo) {
maxTo = *agg.ToTime
}
}
return maxTo
} }