mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
refactor: simplify summary generation (resolve #68)
This commit is contained in:
@@ -4,14 +4,12 @@ import (
|
||||
"crypto/md5"
|
||||
"errors"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/models"
|
||||
"github.com/muety/wakapi/repositories"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"math"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/muety/wakapi/models"
|
||||
)
|
||||
|
||||
const HeartbeatDiffThreshold = 2 * time.Minute
|
||||
@@ -24,6 +22,8 @@ type SummaryService struct {
|
||||
aliasService *AliasService
|
||||
}
|
||||
|
||||
type SummaryRetriever func(f, t time.Time, u *models.User) (*models.Summary, error)
|
||||
|
||||
func NewSummaryService(summaryRepo *repositories.SummaryRepository, heartbeatService *HeartbeatService, aliasService *AliasService) *SummaryService {
|
||||
return &SummaryService{
|
||||
config: config.Get(),
|
||||
@@ -34,60 +34,98 @@ func NewSummaryService(summaryRepo *repositories.SummaryRepository, heartbeatSer
|
||||
}
|
||||
}
|
||||
|
||||
type Interval struct {
|
||||
Start time.Time
|
||||
End time.Time
|
||||
}
|
||||
// Public summary generation methods
|
||||
|
||||
// TODO: simplify!
|
||||
func (srv *SummaryService) Construct(from, to time.Time, user *models.User, recompute bool) (*models.Summary, error) {
|
||||
var existingSummaries []*models.Summary
|
||||
var cacheKey string
|
||||
|
||||
if recompute {
|
||||
existingSummaries = make([]*models.Summary, 0)
|
||||
} else {
|
||||
cacheKey = getHash([]time.Time{from, to}, user)
|
||||
if result, ok := srv.cache.Get(cacheKey); ok {
|
||||
return result.(*models.Summary), nil
|
||||
}
|
||||
summaries, err := srv.GetByUserWithin(user, from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
existingSummaries = summaries
|
||||
func (srv *SummaryService) Aliased(from, to time.Time, user *models.User, f SummaryRetriever) (*models.Summary, error) {
|
||||
// Check cache
|
||||
cacheKey := srv.getHash(from.String(), to.String(), user.ID, "--aliased")
|
||||
if cacheResult, ok := srv.cache.Get(cacheKey); ok {
|
||||
return cacheResult.(*models.Summary), nil
|
||||
}
|
||||
|
||||
missingIntervals := getMissingIntervals(from, to, existingSummaries)
|
||||
// Wrap alias resolution
|
||||
resolve := func(t uint8, k string) string {
|
||||
s, _ := srv.aliasService.GetAliasOrDefault(user.ID, t, k)
|
||||
return s
|
||||
}
|
||||
|
||||
heartbeats := make([]*models.Heartbeat, 0)
|
||||
// Initialize alias resolver service
|
||||
if err := srv.aliasService.LoadUserAliases(user.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Get actual summary
|
||||
s, err := f(from, to, user)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Post-process summary and cache it
|
||||
summary := s.WithResolvedAliases(resolve)
|
||||
srv.cache.SetDefault(cacheKey, summary)
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (srv *SummaryService) Retrieve(from, to time.Time, user *models.User) (*models.Summary, error) {
|
||||
// Check cache
|
||||
cacheKey := srv.getHash(from.String(), to.String(), user.ID, "--aliased")
|
||||
if cacheResult, ok := srv.cache.Get(cacheKey); ok {
|
||||
return cacheResult.(*models.Summary), nil
|
||||
}
|
||||
|
||||
// Get all already existing, pre-generated summaries that fall into the requested interval
|
||||
summaries, err := srv.repository.GetByUserWithin(user, from, to)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Generate missing slots (especially before and after existing summaries) from raw heartbeats
|
||||
missingIntervals := srv.getMissingIntervals(from, to, summaries)
|
||||
for _, interval := range missingIntervals {
|
||||
hb, err := srv.heartbeatService.GetAllWithin(interval.Start, interval.End, user)
|
||||
if err != nil {
|
||||
if s, err := srv.Summarize(interval.Start, interval.End, user); err == nil {
|
||||
summaries = append(summaries, s)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
heartbeats = append(heartbeats, hb...)
|
||||
}
|
||||
|
||||
// Merge existing and newly generated summary snippets
|
||||
summary, err := srv.mergeSummaries(summaries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Cache 'em
|
||||
srv.cache.SetDefault(cacheKey, summary)
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (srv *SummaryService) Summarize(from, to time.Time, user *models.User) (*models.Summary, error) {
|
||||
// Initialize and fetch data
|
||||
var heartbeats models.Heartbeats
|
||||
if rawHeartbeats, err := srv.heartbeatService.GetAllWithin(from, to, user); err == nil {
|
||||
heartbeats = rawHeartbeats
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
types := models.SummaryTypes()
|
||||
|
||||
typedAggregations := make(chan models.SummaryItemContainer)
|
||||
defer close(typedAggregations)
|
||||
for _, t := range types {
|
||||
go srv.aggregateBy(heartbeats, t, typedAggregations)
|
||||
}
|
||||
|
||||
// Aggregate raw heartbeats by types in parallel and collect them
|
||||
var projectItems []*models.SummaryItem
|
||||
var languageItems []*models.SummaryItem
|
||||
var editorItems []*models.SummaryItem
|
||||
var osItems []*models.SummaryItem
|
||||
var machineItems []*models.SummaryItem
|
||||
|
||||
if err := srv.aliasService.LoadUserAliases(user.ID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
c := make(chan models.SummaryItemContainer)
|
||||
for _, t := range types {
|
||||
go srv.aggregateBy(heartbeats, t, user, c)
|
||||
}
|
||||
|
||||
for i := 0; i < len(types); i++ {
|
||||
item := <-c
|
||||
item := <-typedAggregations
|
||||
switch item.Type {
|
||||
case models.SummaryProject:
|
||||
projectItems = item.Items
|
||||
@@ -101,31 +139,16 @@ func (srv *SummaryService) Construct(from, to time.Time, user *models.User, reco
|
||||
machineItems = item.Items
|
||||
}
|
||||
}
|
||||
close(c)
|
||||
|
||||
realFrom, realTo := from, to
|
||||
if len(existingSummaries) > 0 {
|
||||
realFrom = existingSummaries[0].FromTime.T()
|
||||
realTo = existingSummaries[len(existingSummaries)-1].ToTime.T()
|
||||
|
||||
for _, summary := range existingSummaries {
|
||||
summary.FillUnknown()
|
||||
}
|
||||
}
|
||||
if len(heartbeats) > 0 {
|
||||
t1, t2 := time.Time(heartbeats[0].Time), time.Time(heartbeats[len(heartbeats)-1].Time)
|
||||
if t1.After(realFrom) && t1.Before(time.Date(realFrom.Year(), realFrom.Month(), realFrom.Day()+1, 0, 0, 0, 0, realFrom.Location())) {
|
||||
realFrom = t1
|
||||
}
|
||||
if t2.Before(realTo) && t2.After(time.Date(realTo.Year(), realTo.Month(), realTo.Day()-1, 0, 0, 0, 0, realTo.Location())) {
|
||||
realTo = t2
|
||||
}
|
||||
if heartbeats.Len() > 0 {
|
||||
from = time.Time(heartbeats.First().Time)
|
||||
to = time.Time(heartbeats.Last().Time)
|
||||
}
|
||||
|
||||
aggregatedSummary := &models.Summary{
|
||||
summary := &models.Summary{
|
||||
UserID: user.ID,
|
||||
FromTime: models.CustomTime(realFrom),
|
||||
ToTime: models.CustomTime(realTo),
|
||||
FromTime: models.CustomTime(from),
|
||||
ToTime: models.CustomTime(to),
|
||||
Projects: projectItems,
|
||||
Languages: languageItems,
|
||||
Editors: editorItems,
|
||||
@@ -133,123 +156,32 @@ func (srv *SummaryService) Construct(from, to time.Time, user *models.User, reco
|
||||
Machines: machineItems,
|
||||
}
|
||||
|
||||
allSummaries := []*models.Summary{aggregatedSummary}
|
||||
allSummaries = append(allSummaries, existingSummaries...)
|
||||
|
||||
summary, err := mergeSummaries(allSummaries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cacheKey != "" {
|
||||
srv.cache.SetDefault(cacheKey, summary)
|
||||
}
|
||||
summary.FillUnknown()
|
||||
|
||||
return summary, nil
|
||||
}
|
||||
|
||||
func (srv *SummaryService) PostProcessWrapped(summary *models.Summary, err error) (*models.Summary, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return srv.PostProcess(summary), nil
|
||||
}
|
||||
// CRUD methods
|
||||
|
||||
func (srv *SummaryService) PostProcess(summary *models.Summary) *models.Summary {
|
||||
updatedSummary := &models.Summary{
|
||||
ID: summary.ID,
|
||||
UserID: summary.UserID,
|
||||
FromTime: summary.FromTime,
|
||||
ToTime: summary.ToTime,
|
||||
}
|
||||
|
||||
processAliases := func(origin []*models.SummaryItem) []*models.SummaryItem {
|
||||
target := make([]*models.SummaryItem, 0)
|
||||
|
||||
findItem := func(key string) *models.SummaryItem {
|
||||
for _, item := range target {
|
||||
if item.Key == key {
|
||||
return item
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, item := range origin {
|
||||
// Add all "top-level" items, i.e. such without aliases
|
||||
if key, _ := srv.aliasService.GetAliasOrDefault(summary.UserID, item.Type, item.Key); key == item.Key {
|
||||
target = append(target, item)
|
||||
}
|
||||
}
|
||||
|
||||
for _, item := range origin {
|
||||
// Add all remaining projects and merge with their alias
|
||||
if key, _ := srv.aliasService.GetAliasOrDefault(summary.UserID, item.Type, item.Key); key != item.Key {
|
||||
if targetItem := findItem(key); targetItem != nil {
|
||||
targetItem.Total += item.Total
|
||||
} else {
|
||||
target = append(target, &models.SummaryItem{
|
||||
ID: item.ID,
|
||||
SummaryID: item.SummaryID,
|
||||
Type: item.Type,
|
||||
Key: key,
|
||||
Total: item.Total,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return target
|
||||
}
|
||||
|
||||
// Resolve aliases
|
||||
updatedSummary.Projects = processAliases(summary.Projects)
|
||||
updatedSummary.Editors = processAliases(summary.Editors)
|
||||
updatedSummary.Languages = processAliases(summary.Languages)
|
||||
updatedSummary.OperatingSystems = processAliases(summary.OperatingSystems)
|
||||
updatedSummary.Machines = processAliases(summary.Machines)
|
||||
|
||||
return updatedSummary
|
||||
}
|
||||
|
||||
func (srv *SummaryService) Insert(summary *models.Summary) error {
|
||||
return srv.repository.Insert(summary)
|
||||
}
|
||||
|
||||
func (srv *SummaryService) GetByUserWithin(user *models.User, from, to time.Time) ([]*models.Summary, error) {
|
||||
return srv.repository.GetByUserWithin(user, from, to)
|
||||
}
|
||||
|
||||
// Will return *models.Index objects with only user_id and to_time filled
|
||||
func (srv *SummaryService) GetLatestByUser() ([]*models.Summary, error) {
|
||||
return srv.repository.GetLatestByUser()
|
||||
func (srv *SummaryService) GetLatestByUser() ([]*models.TimeByUser, error) {
|
||||
return srv.repository.GetLastByUser()
|
||||
}
|
||||
|
||||
func (srv *SummaryService) DeleteByUser(userId string) error {
|
||||
return srv.repository.DeleteByUser(userId)
|
||||
}
|
||||
|
||||
func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, user *models.User, c chan models.SummaryItemContainer) {
|
||||
func (srv *SummaryService) Insert(summary *models.Summary) error {
|
||||
return srv.repository.Insert(summary)
|
||||
}
|
||||
|
||||
// Private summary generation and utility methods
|
||||
|
||||
func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, c chan models.SummaryItemContainer) {
|
||||
durations := make(map[string]time.Duration)
|
||||
|
||||
for i, h := range heartbeats {
|
||||
var key string
|
||||
switch summaryType {
|
||||
case models.SummaryProject:
|
||||
key = h.Project
|
||||
case models.SummaryEditor:
|
||||
key = h.Editor
|
||||
case models.SummaryLanguage:
|
||||
key = h.Language
|
||||
case models.SummaryOS:
|
||||
key = h.OperatingSystem
|
||||
case models.SummaryMachine:
|
||||
key = h.Machine
|
||||
}
|
||||
|
||||
if key == "" {
|
||||
key = models.UnknownSummaryKey
|
||||
}
|
||||
key := h.GetKey(summaryType)
|
||||
|
||||
if _, ok := durations[key]; !ok {
|
||||
durations[key] = time.Duration(0)
|
||||
@@ -287,43 +219,7 @@ func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryTy
|
||||
c <- models.SummaryItemContainer{Type: summaryType, Items: items}
|
||||
}
|
||||
|
||||
func getMissingIntervals(from, to time.Time, existingSummaries []*models.Summary) []*Interval {
|
||||
if len(existingSummaries) == 0 {
|
||||
return []*Interval{{from, to}}
|
||||
}
|
||||
|
||||
intervals := make([]*Interval, 0)
|
||||
|
||||
// Pre
|
||||
if from.Before(existingSummaries[0].FromTime.T()) {
|
||||
intervals = append(intervals, &Interval{from, existingSummaries[0].FromTime.T()})
|
||||
}
|
||||
|
||||
// Between
|
||||
for i := 0; i < len(existingSummaries)-1; i++ {
|
||||
t1, t2 := existingSummaries[i].ToTime.T(), existingSummaries[i+1].FromTime.T()
|
||||
if t1.Equal(t2) {
|
||||
continue
|
||||
}
|
||||
|
||||
// round to end of day / start of day, assuming that summaries are always generated on a per-day basis
|
||||
td1 := time.Date(t1.Year(), t1.Month(), t1.Day()+1, 0, 0, 0, 0, t1.Location())
|
||||
td2 := time.Date(t2.Year(), t2.Month(), t2.Day(), 0, 0, 0, 0, t2.Location())
|
||||
// one or more day missing in between?
|
||||
if td1.Before(td2) {
|
||||
intervals = append(intervals, &Interval{existingSummaries[i].ToTime.T(), existingSummaries[i+1].FromTime.T()})
|
||||
}
|
||||
}
|
||||
|
||||
// Post
|
||||
if to.After(existingSummaries[len(existingSummaries)-1].ToTime.T()) {
|
||||
intervals = append(intervals, &Interval{existingSummaries[len(existingSummaries)-1].ToTime.T(), to})
|
||||
}
|
||||
|
||||
return intervals
|
||||
}
|
||||
|
||||
func mergeSummaries(summaries []*models.Summary) (*models.Summary, error) {
|
||||
func (srv *SummaryService) mergeSummaries(summaries []*models.Summary) (*models.Summary, error) {
|
||||
if len(summaries) < 1 {
|
||||
return nil, errors.New("no summaries given")
|
||||
}
|
||||
@@ -353,11 +249,11 @@ func mergeSummaries(summaries []*models.Summary) (*models.Summary, error) {
|
||||
maxTime = s.ToTime.T()
|
||||
}
|
||||
|
||||
finalSummary.Projects = mergeSummaryItems(finalSummary.Projects, s.Projects)
|
||||
finalSummary.Languages = mergeSummaryItems(finalSummary.Languages, s.Languages)
|
||||
finalSummary.Editors = mergeSummaryItems(finalSummary.Editors, s.Editors)
|
||||
finalSummary.OperatingSystems = mergeSummaryItems(finalSummary.OperatingSystems, s.OperatingSystems)
|
||||
finalSummary.Machines = mergeSummaryItems(finalSummary.Machines, s.Machines)
|
||||
finalSummary.Projects = srv.mergeSummaryItems(finalSummary.Projects, s.Projects)
|
||||
finalSummary.Languages = srv.mergeSummaryItems(finalSummary.Languages, s.Languages)
|
||||
finalSummary.Editors = srv.mergeSummaryItems(finalSummary.Editors, s.Editors)
|
||||
finalSummary.OperatingSystems = srv.mergeSummaryItems(finalSummary.OperatingSystems, s.OperatingSystems)
|
||||
finalSummary.Machines = srv.mergeSummaryItems(finalSummary.Machines, s.Machines)
|
||||
}
|
||||
|
||||
finalSummary.FromTime = models.CustomTime(minTime)
|
||||
@@ -366,7 +262,7 @@ func mergeSummaries(summaries []*models.Summary) (*models.Summary, error) {
|
||||
return finalSummary, nil
|
||||
}
|
||||
|
||||
func mergeSummaryItems(existing []*models.SummaryItem, new []*models.SummaryItem) []*models.SummaryItem {
|
||||
func (srv *SummaryService) mergeSummaryItems(existing []*models.SummaryItem, new []*models.SummaryItem) []*models.SummaryItem {
|
||||
items := make(map[string]*models.SummaryItem)
|
||||
|
||||
// Build map from existing
|
||||
@@ -396,11 +292,46 @@ func mergeSummaryItems(existing []*models.SummaryItem, new []*models.SummaryItem
|
||||
return itemList
|
||||
}
|
||||
|
||||
func getHash(times []time.Time, user *models.User) string {
|
||||
digest := md5.New()
|
||||
for _, t := range times {
|
||||
digest.Write([]byte(strconv.Itoa(int(t.Unix()))))
|
||||
func (srv *SummaryService) getMissingIntervals(from, to time.Time, summaries []*models.Summary) []*models.Interval {
|
||||
if len(summaries) == 0 {
|
||||
return []*models.Interval{{from, to}}
|
||||
}
|
||||
|
||||
intervals := make([]*models.Interval, 0)
|
||||
|
||||
// Pre
|
||||
if from.Before(summaries[0].FromTime.T()) {
|
||||
intervals = append(intervals, &models.Interval{from, summaries[0].FromTime.T()})
|
||||
}
|
||||
|
||||
// Between
|
||||
for i := 0; i < len(summaries)-1; i++ {
|
||||
t1, t2 := summaries[i].ToTime.T(), summaries[i+1].FromTime.T()
|
||||
if t1.Equal(t2) {
|
||||
continue
|
||||
}
|
||||
|
||||
// round to end of day / start of day, assuming that summaries are always generated on a per-day basis
|
||||
td1 := time.Date(t1.Year(), t1.Month(), t1.Day()+1, 0, 0, 0, 0, t1.Location())
|
||||
td2 := time.Date(t2.Year(), t2.Month(), t2.Day(), 0, 0, 0, 0, t2.Location())
|
||||
// one or more day missing in between?
|
||||
if td1.Before(td2) {
|
||||
intervals = append(intervals, &models.Interval{summaries[i].ToTime.T(), summaries[i+1].FromTime.T()})
|
||||
}
|
||||
}
|
||||
|
||||
// Post
|
||||
if to.After(summaries[len(summaries)-1].ToTime.T()) {
|
||||
intervals = append(intervals, &models.Interval{summaries[len(summaries)-1].ToTime.T(), to})
|
||||
}
|
||||
|
||||
return intervals
|
||||
}
|
||||
|
||||
func (srv *SummaryService) getHash(args ...string) string {
|
||||
digest := md5.New()
|
||||
for _, a := range args {
|
||||
digest.Write([]byte(a))
|
||||
}
|
||||
digest.Write([]byte(user.ID))
|
||||
return string(digest.Sum(nil))
|
||||
}
|
||||
|
Reference in New Issue
Block a user