mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
Add database schema to persist summaries with their accompanying summary items.
Add basic methods to generate summary aggregation jobs.
This commit is contained in:
parent
680475d466
commit
851f378684
7
main.go
7
main.go
@ -89,7 +89,10 @@ func main() {
|
|||||||
// Migrate database schema
|
// Migrate database schema
|
||||||
db.AutoMigrate(&models.User{})
|
db.AutoMigrate(&models.User{})
|
||||||
db.AutoMigrate(&models.Alias{})
|
db.AutoMigrate(&models.Alias{})
|
||||||
|
db.AutoMigrate(&models.Summary{})
|
||||||
|
db.AutoMigrate(&models.SummaryItem{})
|
||||||
db.AutoMigrate(&models.Heartbeat{}).AddForeignKey("user_id", "users(id)", "RESTRICT", "RESTRICT")
|
db.AutoMigrate(&models.Heartbeat{}).AddForeignKey("user_id", "users(id)", "RESTRICT", "RESTRICT")
|
||||||
|
db.AutoMigrate(&models.SummaryItem{}).AddForeignKey("summary_id", "summaries(id)", "CASCADE", "CASCADE")
|
||||||
|
|
||||||
// Custom migrations and initial data
|
// Custom migrations and initial data
|
||||||
addDefaultUser(db, config)
|
addDefaultUser(db, config)
|
||||||
@ -100,6 +103,10 @@ func main() {
|
|||||||
heartbeatSrvc := &services.HeartbeatService{config, db}
|
heartbeatSrvc := &services.HeartbeatService{config, db}
|
||||||
userSrvc := &services.UserService{config, db}
|
userSrvc := &services.UserService{config, db}
|
||||||
summarySrvc := &services.SummaryService{config, db, heartbeatSrvc, aliasSrvc}
|
summarySrvc := &services.SummaryService{config, db, heartbeatSrvc, aliasSrvc}
|
||||||
|
aggregationSrvc := &services.AggregationService{config, db, userSrvc, summarySrvc, heartbeatSrvc}
|
||||||
|
|
||||||
|
sums, err := aggregationSrvc.GenerateJobs()
|
||||||
|
fmt.Println(sums)
|
||||||
|
|
||||||
// Handlers
|
// Handlers
|
||||||
heartbeatHandler := &routes.HeartbeatHandler{HeartbeatSrvc: heartbeatSrvc}
|
heartbeatHandler := &routes.HeartbeatHandler{HeartbeatSrvc: heartbeatSrvc}
|
||||||
|
@ -13,9 +13,10 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type Summary struct {
|
type Summary struct {
|
||||||
UserID string `json:"user_id"`
|
ID uint `json:"-" gorm:"primary_key"`
|
||||||
FromTime *time.Time `json:"from"`
|
UserID string `json:"user_id" gorm:"not null; index:idx_time_summary_user"`
|
||||||
ToTime *time.Time `json:"to"`
|
FromTime *time.Time `json:"from" gorm:"not null; type:timestamp; default:now(); index:idx_time_summary_user"`
|
||||||
|
ToTime *time.Time `json:"to" gorm:"not null; type:timestamp; default:now(); index:idx_time_summary_user"`
|
||||||
Projects []SummaryItem `json:"projects"`
|
Projects []SummaryItem `json:"projects"`
|
||||||
Languages []SummaryItem `json:"languages"`
|
Languages []SummaryItem `json:"languages"`
|
||||||
Editors []SummaryItem `json:"editors"`
|
Editors []SummaryItem `json:"editors"`
|
||||||
@ -23,8 +24,10 @@ type Summary struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type SummaryItem struct {
|
type SummaryItem struct {
|
||||||
Key string `json:"key"`
|
ID uint `json:"-" gorm:"primary_key"`
|
||||||
Total time.Duration `json:"total"`
|
SummaryID uint `json:"-"`
|
||||||
|
Key string `json:"key"`
|
||||||
|
Total time.Duration `json:"total"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SummaryItemContainer struct {
|
type SummaryItemContainer struct {
|
||||||
|
97
services/aggregation.go
Normal file
97
services/aggregation.go
Normal file
@ -0,0 +1,97 @@
|
|||||||
|
/*
|
||||||
|
<< WORK IN PROGRESS >>
|
||||||
|
Don't use theses classes, yet.
|
||||||
|
|
||||||
|
This aims to implement https://github.com/n1try/wakapi/issues/1.
|
||||||
|
Idea is to have regularly running, cron-like background jobs that request a summary
|
||||||
|
from SummaryService for a pre-defined time interval, e.g. 24 hours. Those are persisted
|
||||||
|
to the database. Once a user request a summary for a certain time frame that partilly
|
||||||
|
overlaps with pre-generated summaries, those will be aggregated together with actual heartbeats
|
||||||
|
for the non-overlapping time frames left and right.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package services
|
||||||
|
|
||||||
|
import (
|
||||||
|
"container/list"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jinzhu/gorm"
|
||||||
|
"github.com/n1try/wakapi/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type AggregationService struct {
|
||||||
|
Config *models.Config
|
||||||
|
Db *gorm.DB
|
||||||
|
UserService *UserService
|
||||||
|
SummaryService *SummaryService
|
||||||
|
HeartbeatService *HeartbeatService
|
||||||
|
}
|
||||||
|
|
||||||
|
type AggregationJob struct {
|
||||||
|
UserId string
|
||||||
|
From time.Time
|
||||||
|
To time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use https://godoc.org/github.com/jasonlvhit/gocron to trigger jobs on a regular basis.
|
||||||
|
func (srv *AggregationService) Start(interval time.Duration) {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (srv *AggregationService) generateJobs() (*list.List, error) {
|
||||||
|
var aggregationJobs *list.List = list.New()
|
||||||
|
|
||||||
|
users, err := srv.UserService.GetAll()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
latestSummaries, err := srv.SummaryService.GetLatestUserSummaries()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
userSummaryTimes := make(map[string]*time.Time)
|
||||||
|
for _, s := range latestSummaries {
|
||||||
|
userSummaryTimes[s.UserID] = s.ToTime
|
||||||
|
}
|
||||||
|
|
||||||
|
missingUserIds := make([]string, 0)
|
||||||
|
for _, u := range users {
|
||||||
|
if _, ok := userSummaryTimes[u.ID]; !ok {
|
||||||
|
missingUserIds = append(missingUserIds, u.ID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
firstHeartbeats, err := srv.HeartbeatService.GetFirstUserHeartbeats(missingUserIds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for id, t := range userSummaryTimes {
|
||||||
|
var from time.Time
|
||||||
|
if t.Hour() == 0 {
|
||||||
|
from = *t
|
||||||
|
} else {
|
||||||
|
nextDay := t.Add(24 * time.Hour)
|
||||||
|
from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location())
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregationJobs.PushBack(&AggregationJob{id, from, from.Add(24 * time.Hour)})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, h := range firstHeartbeats {
|
||||||
|
var from time.Time
|
||||||
|
var t time.Time = time.Time(*(h.Time))
|
||||||
|
if t.Hour() == 0 {
|
||||||
|
from = time.Time(*(h.Time))
|
||||||
|
} else {
|
||||||
|
nextDay := t.Add(24 * time.Hour)
|
||||||
|
from = time.Date(nextDay.Year(), nextDay.Month(), nextDay.Day(), 0, 0, 0, 0, t.Location())
|
||||||
|
}
|
||||||
|
|
||||||
|
aggregationJobs.PushBack(&AggregationJob{h.UserID, from, from.Add(24 * time.Hour)})
|
||||||
|
}
|
||||||
|
|
||||||
|
return aggregationJobs, nil
|
||||||
|
}
|
@ -39,3 +39,16 @@ func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User)
|
|||||||
}
|
}
|
||||||
return heartbeats, nil
|
return heartbeats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (srv *HeartbeatService) GetFirstUserHeartbeats(userIds []string) ([]*models.Heartbeat, error) {
|
||||||
|
var heartbeats []*models.Heartbeat
|
||||||
|
if err := srv.Db.
|
||||||
|
Table("heartbeats").
|
||||||
|
Select("user_id, min(time) as time").
|
||||||
|
Where("user_id IN (?)", userIds).
|
||||||
|
Group("user_id").
|
||||||
|
Scan(&heartbeats).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return heartbeats, nil
|
||||||
|
}
|
||||||
|
@ -66,6 +66,18 @@ func (srv *SummaryService) GetSummary(from, to time.Time, user *models.User) (*m
|
|||||||
return summary, nil
|
return summary, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (srv *SummaryService) GetLatestUserSummaries() ([]*models.Summary, error) {
|
||||||
|
var summaries []*models.Summary
|
||||||
|
if err := srv.Db.
|
||||||
|
Table("summaries").
|
||||||
|
Select("user_id, max(to_time) as to_time").
|
||||||
|
Group("user_id").
|
||||||
|
Scan(&summaries).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return summaries, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, user *models.User, c chan models.SummaryItemContainer) {
|
func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, user *models.User, c chan models.SummaryItemContainer) {
|
||||||
durations := make(map[string]time.Duration)
|
durations := make(map[string]time.Duration)
|
||||||
|
|
||||||
|
@ -25,3 +25,13 @@ func (srv *UserService) GetUserByKey(key string) (*models.User, error) {
|
|||||||
}
|
}
|
||||||
return u, nil
|
return u, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (srv *UserService) GetAll() ([]*models.User, error) {
|
||||||
|
var users []*models.User
|
||||||
|
if err := srv.Db.
|
||||||
|
Table("users").
|
||||||
|
Find(&users).Error; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return users, nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user