2019-05-19 20:49:27 +03:00
package services
import (
2019-10-10 19:10:14 +03:00
"errors"
2021-08-06 18:07:36 +03:00
"fmt"
2021-04-19 21:48:07 +03:00
"github.com/emvi/logbuch"
2021-08-06 18:07:36 +03:00
"github.com/leandro-lugaresi/hub"
2020-09-29 19:55:07 +03:00
"github.com/muety/wakapi/config"
2020-11-07 14:01:35 +03:00
"github.com/muety/wakapi/models"
2020-11-01 18:56:36 +03:00
"github.com/muety/wakapi/repositories"
2020-02-20 16:28:55 +03:00
"github.com/patrickmn/go-cache"
2019-05-19 20:49:27 +03:00
"math"
2019-07-07 11:37:17 +03:00
"sort"
2021-06-12 11:43:56 +03:00
"strings"
2019-05-19 20:49:27 +03:00
"time"
)
2020-10-16 13:00:20 +03:00
// HeartbeatDiffThreshold is the maximum amount of time that the gap between two
// consecutive heartbeats may contribute to a summary's duration. Longer gaps are
// capped at this value when heartbeats are aggregated.
const HeartbeatDiffThreshold = 2 * time.Minute
2019-05-19 20:49:27 +03:00
type SummaryService struct {
2021-06-11 21:59:34 +03:00
config * config . Config
cache * cache . Cache
2021-08-06 18:07:36 +03:00
eventBus * hub . Hub
2021-06-11 21:59:34 +03:00
repository repositories . ISummaryRepository
heartbeatService IHeartbeatService
aliasService IAliasService
projectLabelService IProjectLabelService
2019-05-19 20:49:27 +03:00
}
2020-11-07 14:01:35 +03:00
// SummaryRetriever is any function that produces a summary for the given user
// and time range, e.g. SummaryService.Retrieve or SummaryService.Summarize.
type SummaryRetriever func(from, to time.Time, user *models.User) (*models.Summary, error)
2021-06-11 21:59:34 +03:00
func NewSummaryService ( summaryRepo repositories . ISummaryRepository , heartbeatService IHeartbeatService , aliasService IAliasService , projectLabelService IProjectLabelService ) * SummaryService {
2021-08-06 18:07:36 +03:00
srv := & SummaryService {
2021-06-11 21:59:34 +03:00
config : config . Get ( ) ,
cache : cache . New ( 24 * time . Hour , 24 * time . Hour ) ,
2021-08-06 18:07:36 +03:00
eventBus : config . EventBus ( ) ,
2021-06-11 21:59:34 +03:00
repository : summaryRepo ,
heartbeatService : heartbeatService ,
aliasService : aliasService ,
projectLabelService : projectLabelService ,
2020-05-24 18:32:26 +03:00
}
2021-08-06 18:07:36 +03:00
sub1 := srv . eventBus . Subscribe ( 0 , config . TopicProjectLabel )
go func ( sub * hub . Subscription ) {
for m := range sub . Receiver {
userId := m . Fields [ config . FieldUserId ] . ( string )
for key := range srv . cache . Items ( ) {
if strings . HasSuffix ( key , fmt . Sprintf ( "__%s__--aliased" , userId ) ) {
srv . cache . Delete ( key )
}
}
}
} ( & sub1 )
return srv
2020-05-24 18:32:26 +03:00
}
2020-11-07 14:01:35 +03:00
// Public summary generation methods
2019-10-10 19:10:14 +03:00
2021-08-06 18:07:36 +03:00
// Aliased retrieves or computes a new summary based on the given SummaryRetriever and augments it with entity aliases and project labels
2021-03-25 01:31:04 +03:00
func ( srv * SummaryService ) Aliased ( from , to time . Time , user * models . User , f SummaryRetriever , skipCache bool ) ( * models . Summary , error ) {
2020-11-07 14:01:35 +03:00
// Check cache
cacheKey := srv . getHash ( from . String ( ) , to . String ( ) , user . ID , "--aliased" )
2021-03-25 01:31:04 +03:00
if cacheResult , ok := srv . cache . Get ( cacheKey ) ; ok && ! skipCache {
2020-11-07 14:01:35 +03:00
return cacheResult . ( * models . Summary ) , nil
}
2020-02-20 16:28:55 +03:00
2020-11-07 14:01:35 +03:00
// Wrap alias resolution
resolve := func ( t uint8 , k string ) string {
s , _ := srv . aliasService . GetAliasOrDefault ( user . ID , t , k )
return s
}
// Initialize alias resolver service
2021-01-21 02:26:52 +03:00
if err := srv . aliasService . InitializeUser ( user . ID ) ; err != nil {
2020-11-07 14:01:35 +03:00
return nil , err
}
// Get actual summary
s , err := f ( from , to , user )
if err != nil {
return nil , err
2019-05-19 20:49:27 +03:00
}
2020-11-07 14:01:35 +03:00
// Post-process summary and cache it
summary := s . WithResolvedAliases ( resolve )
2021-08-06 18:07:36 +03:00
summary = srv . withProjectLabels ( summary )
2021-06-11 21:59:34 +03:00
summary . FillBy ( models . SummaryProject , models . SummaryLabel ) // first fill up labels from projects
summary . FillMissing ( ) // then, full up types which are entirely missing
2020-11-07 14:01:35 +03:00
srv . cache . SetDefault ( cacheKey , summary )
2020-11-22 00:30:56 +03:00
return summary . Sorted ( ) , nil
2020-11-07 14:01:35 +03:00
}
2019-10-10 19:10:14 +03:00
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) Retrieve ( from , to time . Time , user * models . User ) ( * models . Summary , error ) {
// Get all already existing, pre-generated summaries that fall into the requested interval
summaries , err := srv . repository . GetByUserWithin ( user , from , to )
if err != nil {
return nil , err
}
// Generate missing slots (especially before and after existing summaries) from raw heartbeats
missingIntervals := srv . getMissingIntervals ( from , to , summaries )
2019-10-10 19:10:14 +03:00
for _ , interval := range missingIntervals {
2020-11-07 14:01:35 +03:00
if s , err := srv . Summarize ( interval . Start , interval . End , user ) ; err == nil {
summaries = append ( summaries , s )
} else {
2019-10-10 19:10:14 +03:00
return nil , err
}
}
2020-11-07 14:01:35 +03:00
// Merge existing and newly generated summary snippets
summary , err := srv . mergeSummaries ( summaries )
if err != nil {
return nil , err
}
2019-05-19 22:00:19 +03:00
2020-11-22 00:30:56 +03:00
return summary . Sorted ( ) , nil
2020-11-07 14:01:35 +03:00
}
2019-05-19 22:00:19 +03:00
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) Summarize ( from , to time . Time , user * models . User ) ( * models . Summary , error ) {
// Initialize and fetch data
var heartbeats models . Heartbeats
if rawHeartbeats , err := srv . heartbeatService . GetAllWithin ( from , to , user ) ; err == nil {
heartbeats = rawHeartbeats
} else {
2019-07-07 11:32:28 +03:00
return nil , err
}
2021-06-11 21:59:34 +03:00
types := models . NativeSummaryTypes ( )
2020-11-07 14:01:35 +03:00
typedAggregations := make ( chan models . SummaryItemContainer )
defer close ( typedAggregations )
2019-05-19 22:00:19 +03:00
for _ , t := range types {
2020-11-07 14:01:35 +03:00
go srv . aggregateBy ( heartbeats , t , typedAggregations )
2019-05-19 22:00:19 +03:00
}
2020-11-07 14:01:35 +03:00
// Aggregate raw heartbeats by types in parallel and collect them
var projectItems [ ] * models . SummaryItem
var languageItems [ ] * models . SummaryItem
var editorItems [ ] * models . SummaryItem
var osItems [ ] * models . SummaryItem
var machineItems [ ] * models . SummaryItem
2019-05-19 22:00:19 +03:00
for i := 0 ; i < len ( types ) ; i ++ {
2020-11-07 14:01:35 +03:00
item := <- typedAggregations
2019-05-19 22:00:19 +03:00
switch item . Type {
case models . SummaryProject :
projectItems = item . Items
case models . SummaryLanguage :
languageItems = item . Items
case models . SummaryEditor :
editorItems = item . Items
case models . SummaryOS :
osItems = item . Items
2020-08-29 23:03:01 +03:00
case models . SummaryMachine :
machineItems = item . Items
2019-05-19 22:00:19 +03:00
}
}
2020-11-07 14:01:35 +03:00
if heartbeats . Len ( ) > 0 {
from = time . Time ( heartbeats . First ( ) . Time )
to = time . Time ( heartbeats . Last ( ) . Time )
2020-08-23 14:10:58 +03:00
}
2020-11-07 14:01:35 +03:00
summary := & models . Summary {
2019-05-19 20:49:27 +03:00
UserID : user . ID ,
2020-11-07 14:01:35 +03:00
FromTime : models . CustomTime ( from ) ,
ToTime : models . CustomTime ( to ) ,
2019-05-19 22:00:19 +03:00
Projects : projectItems ,
Languages : languageItems ,
Editors : editorItems ,
OperatingSystems : osItems ,
2020-08-29 23:03:01 +03:00
Machines : machineItems ,
2019-05-19 20:49:27 +03:00
}
2020-02-20 16:28:55 +03:00
2020-11-22 00:30:56 +03:00
return summary . Sorted ( ) , nil
2019-05-19 20:49:27 +03:00
}
2020-11-07 14:01:35 +03:00
// CRUD methods
2020-11-01 14:50:59 +03:00
2020-11-07 14:01:35 +03:00
// GetLatestByUser returns the repository's GetLastByUser result unchanged.
func (srv *SummaryService) GetLatestByUser() ([]*models.TimeByUser, error) {
	latest, err := srv.repository.GetLastByUser()
	return latest, err
}
2020-11-01 14:50:59 +03:00
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) DeleteByUser ( userId string ) error {
2021-06-12 11:43:56 +03:00
srv . invalidateUserCache ( userId )
2020-11-07 14:01:35 +03:00
return srv . repository . DeleteByUser ( userId )
2020-11-01 14:50:59 +03:00
}
2019-10-11 09:00:02 +03:00
func ( srv * SummaryService ) Insert ( summary * models . Summary ) error {
2021-06-12 11:43:56 +03:00
srv . invalidateUserCache ( summary . UserID )
2020-11-01 18:56:36 +03:00
return srv . repository . Insert ( summary )
2019-10-10 17:47:19 +03:00
}
2020-11-07 14:01:35 +03:00
// Private summary generation and utility methods
2020-11-06 19:09:41 +03:00
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) aggregateBy ( heartbeats [ ] * models . Heartbeat , summaryType uint8 , c chan models . SummaryItemContainer ) {
2019-05-19 20:49:27 +03:00
durations := make ( map [ string ] time . Duration )
for i , h := range heartbeats {
2020-11-07 14:01:35 +03:00
key := h . GetKey ( summaryType )
2019-05-20 19:44:16 +03:00
2019-05-19 20:49:27 +03:00
if _ , ok := durations [ key ] ; ! ok {
durations [ key ] = time . Duration ( 0 )
}
if i == 0 {
continue
}
2020-10-16 13:49:36 +03:00
t1 , t2 , tdiff := h . Time . T ( ) , heartbeats [ i - 1 ] . Time . T ( ) , time . Duration ( 0 )
2020-10-16 13:00:20 +03:00
// This is a hack. The time difference between two heartbeats from two subsequent day (e.g. 23:59:59 and 00:00:01) are ignored.
// This is to prevent a discrepancy between summaries computed solely from heartbeats and summaries involving pre-aggregated per-day summaries.
// For the latter, a duration is already pre-computed and information about individual heartbeats is lost, so there can be no cross-day overflow.
// Essentially, we simply ignore such edge-case heartbeats here, which makes the eventual total duration potentially a bit shorter.
if t1 . Day ( ) == t2 . Day ( ) {
timePassed := t1 . Sub ( t2 )
tdiff = time . Duration ( int64 ( math . Min ( float64 ( timePassed ) , float64 ( HeartbeatDiffThreshold ) ) ) )
}
durations [ key ] += tdiff
2019-05-19 20:49:27 +03:00
}
2019-10-11 09:00:02 +03:00
items := make ( [ ] * models . SummaryItem , 0 )
2019-05-19 20:49:27 +03:00
for k , v := range durations {
2019-10-11 09:00:02 +03:00
items = append ( items , & models . SummaryItem {
2019-05-19 20:49:27 +03:00
Key : k ,
Total : v / time . Second ,
2019-10-10 19:32:17 +03:00
Type : summaryType ,
2019-05-19 20:49:27 +03:00
} )
}
2019-07-07 11:37:17 +03:00
sort . Slice ( items , func ( i , j int ) bool {
return items [ i ] . Total > items [ j ] . Total
} )
2019-05-19 22:00:19 +03:00
c <- models . SummaryItemContainer { Type : summaryType , Items : items }
2019-05-19 20:49:27 +03:00
}
2019-10-10 19:10:14 +03:00
2021-06-11 21:59:34 +03:00
func ( srv * SummaryService ) withProjectLabels ( summary * models . Summary ) * models . Summary {
newEntry := func ( key string , total time . Duration ) * models . SummaryItem {
return & models . SummaryItem {
Type : models . SummaryLabel ,
Key : key ,
Total : total ,
}
}
allLabels , err := srv . projectLabelService . GetByUser ( summary . UserID )
if err != nil {
logbuch . Error ( "failed to retrieve project labels for user summary ('%s', '%s', '%s')" , summary . UserID , summary . FromTime . String ( ) , summary . ToTime . String ( ) )
return summary
}
mappedProjects := make ( map [ string ] * models . SummaryItem , len ( summary . Projects ) )
for _ , p := range summary . Projects {
mappedProjects [ p . Key ] = p
}
var totalLabelTime time . Duration
labelMap := make ( map [ string ] * models . SummaryItem , 0 )
for _ , l := range allLabels {
if p , ok := mappedProjects [ l . ProjectKey ] ; ok {
if _ , ok2 := labelMap [ l . Label ] ; ! ok2 {
labelMap [ l . Label ] = newEntry ( l . Label , 0 )
}
labelMap [ l . Label ] . Total += p . Total
totalLabelTime += p . Total
}
}
//labelMap[models.DefaultProjectLabel] = newEntry(models.DefaultProjectLabel, summary.TotalTimeBy(models.SummaryProject) / time.Second-totalLabelTime)
labels := make ( [ ] * models . SummaryItem , 0 , len ( labelMap ) )
for _ , v := range labelMap {
2021-06-12 13:06:24 +03:00
if v . Total > 0 {
labels = append ( labels , v )
}
2021-06-11 21:59:34 +03:00
}
summary . Labels = labels
return summary
}
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) mergeSummaries ( summaries [ ] * models . Summary ) ( * models . Summary , error ) {
2019-10-11 09:00:02 +03:00
if len ( summaries ) < 1 {
return nil , errors . New ( "no summaries given" )
}
var minTime , maxTime time . Time
minTime = time . Now ( )
finalSummary := & models . Summary {
UserID : summaries [ 0 ] . UserID ,
Projects : make ( [ ] * models . SummaryItem , 0 ) ,
Languages : make ( [ ] * models . SummaryItem , 0 ) ,
Editors : make ( [ ] * models . SummaryItem , 0 ) ,
OperatingSystems : make ( [ ] * models . SummaryItem , 0 ) ,
2020-08-29 23:03:01 +03:00
Machines : make ( [ ] * models . SummaryItem , 0 ) ,
2021-06-11 21:59:34 +03:00
Labels : make ( [ ] * models . SummaryItem , 0 ) ,
2019-10-11 09:00:02 +03:00
}
2021-04-19 22:14:35 +03:00
var processed = map [ time . Time ] bool { }
2021-04-19 21:48:07 +03:00
2019-10-11 09:00:02 +03:00
for _ , s := range summaries {
2021-04-19 22:14:35 +03:00
hash := s . FromTime . T ( )
2021-04-19 21:48:07 +03:00
if _ , found := processed [ hash ] ; found {
logbuch . Warn ( "summary from %v to %v (user '%s') was attempted to be processed more often than once" , s . FromTime , s . ToTime , s . UserID )
continue
}
2019-10-11 09:00:02 +03:00
if s . UserID != finalSummary . UserID {
return nil , errors . New ( "users don't match" )
}
2020-10-16 15:49:22 +03:00
if s . FromTime . T ( ) . Before ( minTime ) {
minTime = s . FromTime . T ( )
2019-10-11 09:00:02 +03:00
}
2020-10-16 15:49:22 +03:00
if s . ToTime . T ( ) . After ( maxTime ) {
maxTime = s . ToTime . T ( )
2019-10-11 09:00:02 +03:00
}
2020-11-07 14:01:35 +03:00
finalSummary . Projects = srv . mergeSummaryItems ( finalSummary . Projects , s . Projects )
finalSummary . Languages = srv . mergeSummaryItems ( finalSummary . Languages , s . Languages )
finalSummary . Editors = srv . mergeSummaryItems ( finalSummary . Editors , s . Editors )
finalSummary . OperatingSystems = srv . mergeSummaryItems ( finalSummary . OperatingSystems , s . OperatingSystems )
finalSummary . Machines = srv . mergeSummaryItems ( finalSummary . Machines , s . Machines )
2021-06-11 21:59:34 +03:00
finalSummary . Labels = srv . mergeSummaryItems ( finalSummary . Labels , s . Labels )
2021-04-19 21:48:07 +03:00
processed [ hash ] = true
2019-10-11 09:00:02 +03:00
}
2020-10-16 15:49:22 +03:00
finalSummary . FromTime = models . CustomTime ( minTime )
finalSummary . ToTime = models . CustomTime ( maxTime )
2019-10-11 09:00:02 +03:00
return finalSummary , nil
}
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) mergeSummaryItems ( existing [ ] * models . SummaryItem , new [ ] * models . SummaryItem ) [ ] * models . SummaryItem {
2019-10-11 09:00:02 +03:00
items := make ( map [ string ] * models . SummaryItem )
// Build map from existing
for _ , item := range existing {
items [ item . Key ] = item
}
for _ , item := range new {
if it , ok := items [ item . Key ] ; ! ok {
items [ item . Key ] = item
} else {
( * it ) . Total += item . Total
}
}
var i int
itemList := make ( [ ] * models . SummaryItem , len ( items ) )
for k , v := range items {
itemList [ i ] = & models . SummaryItem { Key : k , Total : v . Total , Type : v . Type }
i ++
}
sort . Slice ( itemList , func ( i , j int ) bool {
return itemList [ i ] . Total > itemList [ j ] . Total
} )
return itemList
}
2020-02-20 16:28:55 +03:00
2020-11-07 14:01:35 +03:00
func ( srv * SummaryService ) getMissingIntervals ( from , to time . Time , summaries [ ] * models . Summary ) [ ] * models . Interval {
if len ( summaries ) == 0 {
return [ ] * models . Interval { { from , to } }
}
intervals := make ( [ ] * models . Interval , 0 )
// Pre
if from . Before ( summaries [ 0 ] . FromTime . T ( ) ) {
intervals = append ( intervals , & models . Interval { from , summaries [ 0 ] . FromTime . T ( ) } )
}
// Between
for i := 0 ; i < len ( summaries ) - 1 ; i ++ {
t1 , t2 := summaries [ i ] . ToTime . T ( ) , summaries [ i + 1 ] . FromTime . T ( )
if t1 . Equal ( t2 ) {
continue
}
// round to end of day / start of day, assuming that summaries are always generated on a per-day basis
2021-04-12 23:57:15 +03:00
// we assume that, if summary for any time range within a day is present, no further heartbeats exist on that day before 'from' and after 'to' time of that summary
// this requires that a summary exists for every single day in a year and none is skipped, which shouldn't ever happen
2020-11-07 14:01:35 +03:00
td1 := time . Date ( t1 . Year ( ) , t1 . Month ( ) , t1 . Day ( ) + 1 , 0 , 0 , 0 , 0 , t1 . Location ( ) )
td2 := time . Date ( t2 . Year ( ) , t2 . Month ( ) , t2 . Day ( ) , 0 , 0 , 0 , 0 , t2 . Location ( ) )
2021-04-12 23:57:15 +03:00
// we always want to jump to beginning of next day
// however, if left summary ends already at midnight, we would instead jump to beginning of second-next day -> go back again
if td1 . Sub ( t1 ) == 24 * time . Hour {
td1 = td1 . Add ( - 1 * time . Hour )
}
2020-11-07 14:01:35 +03:00
// one or more day missing in between?
if td1 . Before ( td2 ) {
intervals = append ( intervals , & models . Interval { summaries [ i ] . ToTime . T ( ) , summaries [ i + 1 ] . FromTime . T ( ) } )
}
}
// Post
if to . After ( summaries [ len ( summaries ) - 1 ] . ToTime . T ( ) ) {
intervals = append ( intervals , & models . Interval { summaries [ len ( summaries ) - 1 ] . ToTime . T ( ) , to } )
}
return intervals
}
func ( srv * SummaryService ) getHash ( args ... string ) string {
2021-06-12 11:43:56 +03:00
return strings . Join ( args , "__" )
}
func ( srv * SummaryService ) invalidateUserCache ( userId string ) {
for key := range srv . cache . Items ( ) {
if strings . Contains ( key , userId ) {
srv . cache . Delete ( key )
}
2020-02-20 16:28:55 +03:00
}
}