mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
refactor: introduce concept of durations (resolve #261)
This commit is contained in:
@@ -9,33 +9,30 @@ import (
|
||||
"github.com/muety/wakapi/models"
|
||||
"github.com/muety/wakapi/repositories"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"math"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
const HeartbeatDiffThreshold = 2 * time.Minute
|
||||
|
||||
type SummaryService struct {
|
||||
config *config.Config
|
||||
cache *cache.Cache
|
||||
eventBus *hub.Hub
|
||||
repository repositories.ISummaryRepository
|
||||
heartbeatService IHeartbeatService
|
||||
durationService IDurationService
|
||||
aliasService IAliasService
|
||||
projectLabelService IProjectLabelService
|
||||
}
|
||||
|
||||
type SummaryRetriever func(f, t time.Time, u *models.User) (*models.Summary, error)
|
||||
|
||||
func NewSummaryService(summaryRepo repositories.ISummaryRepository, heartbeatService IHeartbeatService, aliasService IAliasService, projectLabelService IProjectLabelService) *SummaryService {
|
||||
func NewSummaryService(summaryRepo repositories.ISummaryRepository, durationService IDurationService, aliasService IAliasService, projectLabelService IProjectLabelService) *SummaryService {
|
||||
srv := &SummaryService{
|
||||
config: config.Get(),
|
||||
cache: cache.New(24*time.Hour, 24*time.Hour),
|
||||
eventBus: config.EventBus(),
|
||||
repository: summaryRepo,
|
||||
heartbeatService: heartbeatService,
|
||||
durationService: durationService,
|
||||
aliasService: aliasService,
|
||||
projectLabelService: projectLabelService,
|
||||
}
|
||||
@@ -99,7 +96,7 @@ func (srv *SummaryService) Retrieve(from, to time.Time, user *models.User) (*mod
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Generate missing slots (especially before and after existing summaries) from raw heartbeats
|
||||
// Generate missing slots (especially before and after existing summaries) from durations (formerly raw heartbeats)
|
||||
missingIntervals := srv.getMissingIntervals(from, to, summaries)
|
||||
for _, interval := range missingIntervals {
|
||||
if s, err := srv.Summarize(interval.Start, interval.End, user); err == nil {
|
||||
@@ -120,9 +117,9 @@ func (srv *SummaryService) Retrieve(from, to time.Time, user *models.User) (*mod
|
||||
|
||||
func (srv *SummaryService) Summarize(from, to time.Time, user *models.User) (*models.Summary, error) {
|
||||
// Initialize and fetch data
|
||||
var heartbeats models.Heartbeats
|
||||
if rawHeartbeats, err := srv.heartbeatService.GetAllWithin(from, to, user); err == nil {
|
||||
heartbeats = rawHeartbeats
|
||||
var durations models.Durations
|
||||
if result, err := srv.durationService.Get(from, to, user); err == nil {
|
||||
durations = result
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
@@ -132,10 +129,10 @@ func (srv *SummaryService) Summarize(from, to time.Time, user *models.User) (*mo
|
||||
typedAggregations := make(chan models.SummaryItemContainer)
|
||||
defer close(typedAggregations)
|
||||
for _, t := range types {
|
||||
go srv.aggregateBy(heartbeats, t, typedAggregations)
|
||||
go srv.aggregateBy(durations, t, typedAggregations)
|
||||
}
|
||||
|
||||
// Aggregate raw heartbeats by types in parallel and collect them
|
||||
// Aggregate durations (formerly raw heartbeats) by types in parallel and collect them
|
||||
var projectItems []*models.SummaryItem
|
||||
var languageItems []*models.SummaryItem
|
||||
var editorItems []*models.SummaryItem
|
||||
@@ -158,9 +155,9 @@ func (srv *SummaryService) Summarize(from, to time.Time, user *models.User) (*mo
|
||||
}
|
||||
}
|
||||
|
||||
if heartbeats.Len() > 0 {
|
||||
from = time.Time(heartbeats.First().Time)
|
||||
to = time.Time(heartbeats.Last().Time)
|
||||
if durations.Len() > 0 {
|
||||
from = time.Time(durations.First().Time)
|
||||
to = time.Time(durations.Last().Time)
|
||||
}
|
||||
|
||||
summary := &models.Summary{
|
||||
@@ -195,34 +192,15 @@ func (srv *SummaryService) Insert(summary *models.Summary) error {
|
||||
|
||||
// Private summary generation and utility methods
|
||||
|
||||
func (srv *SummaryService) aggregateBy(heartbeats []*models.Heartbeat, summaryType uint8, c chan models.SummaryItemContainer) {
|
||||
durations := make(map[string]time.Duration)
|
||||
func (srv *SummaryService) aggregateBy(durations []*models.Duration, summaryType uint8, c chan models.SummaryItemContainer) {
|
||||
mapping := make(map[string]time.Duration)
|
||||
|
||||
for i, h := range heartbeats {
|
||||
key := h.GetKey(summaryType)
|
||||
|
||||
if _, ok := durations[key]; !ok {
|
||||
durations[key] = time.Duration(0)
|
||||
}
|
||||
|
||||
if i == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
t1, t2, tdiff := h.Time.T(), heartbeats[i-1].Time.T(), time.Duration(0)
|
||||
// This is a hack. The time difference between two heartbeats from two subsequent day (e.g. 23:59:59 and 00:00:01) are ignored.
|
||||
// This is to prevent a discrepancy between summaries computed solely from heartbeats and summaries involving pre-aggregated per-day summaries.
|
||||
// For the latter, a duration is already pre-computed and information about individual heartbeats is lost, so there can be no cross-day overflow.
|
||||
// Essentially, we simply ignore such edge-case heartbeats here, which makes the eventual total duration potentially a bit shorter.
|
||||
if t1.Day() == t2.Day() {
|
||||
timePassed := t1.Sub(t2)
|
||||
tdiff = time.Duration(int64(math.Min(float64(timePassed), float64(HeartbeatDiffThreshold))))
|
||||
}
|
||||
durations[key] += tdiff
|
||||
for _, d := range durations {
|
||||
mapping[d.GetKey(summaryType)] += d.Duration
|
||||
}
|
||||
|
||||
items := make([]*models.SummaryItem, 0)
|
||||
for k, v := range durations {
|
||||
for k, v := range mapping {
|
||||
items = append(items, &models.SummaryItem{
|
||||
Key: k,
|
||||
Total: v / time.Second,
|
||||
|
Reference in New Issue
Block a user