wakapi/services/aggregation.go

170 lines
4.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package services
import (
	"log"
	"runtime"
	"sync"
	"time"

	"github.com/jasonlvhit/gocron"
	"github.com/muety/wakapi/config"
	"github.com/muety/wakapi/models"
)
// aggregateIntervalDays is the length, in calendar days, of the time range
// covered by a single aggregation job.
const aggregateIntervalDays int = 1
// AggregationService generates and persists summaries for users. It bundles
// the application configuration with the user, summary and heartbeat services
// its methods call.
type AggregationService struct {
// config supplies the daily aggregation time and the database connection
// limit that determines the number of persist workers.
config *config.Config
// userService is used to load all users.
userService *UserService
// summaryService constructs and inserts summaries and reports the latest
// summary per user.
summaryService *SummaryService
// heartbeatService looks up first heartbeats for users without any summary.
heartbeatService *HeartbeatService
}
// NewAggregationService creates an AggregationService that uses the given
// services and the configuration returned by config.Get.
func NewAggregationService(userService *UserService, summaryService *SummaryService, heartbeatService *HeartbeatService) *AggregationService {
	service := new(AggregationService)
	service.config = config.Get()
	service.userService = userService
	service.summaryService = summaryService
	service.heartbeatService = heartbeatService
	return service
}
// AggregationJob describes one summary to be generated: the user it belongs to
// and the time range from From to To that it covers.
//
// The struct is also built with an unkeyed composite literal, so the field
// order must not change.
type AggregationJob struct {
// UserID identifies the user the summary is generated for.
UserID string
// From is the start of the time range to summarize.
From time.Time
// To is the end of the time range to summarize.
To time.Time
}
// Schedule runs the aggregation once immediately and then registers it with
// gocron to (re-)generate summaries every day at the configured aggregation
// time. It blocks on the channel returned by gocron.Start, and terminates the
// process via log.Fatalf if the initial run fails.
func (srv *AggregationService) Schedule() {
	// Run once initially
	if err := srv.Run(nil); err != nil {
		log.Fatalf("failed to run aggregation jobs: %v\n", err)
	}

	// Hand the scheduler a typed nil map instead of an untyped nil. The
	// argument travels as an interface value, and an untyped nil carries no
	// type information from which a map[string]bool argument for Run could be
	// produced. An empty/nil map still means "all users".
	var allUsers map[string]bool
	gocron.Every(1).Day().At(srv.config.App.AggregationTime).Do(srv.Run, allUsers)
	<-gocron.Start()
}
// Run generates and persists missing summaries. If userIds is nil or empty,
// all users are processed; otherwise only users whose ID maps to true.
//
// It starts one summary worker per CPU and one persist worker per configured
// database connection (at least one), then feeds them jobs via trigger. The
// error returned is the one reported by trigger. As before, Run returns once
// all jobs have been handed to a worker; generation and persistence of the
// last summaries may still be in flight at that point.
func (srv *AggregationService) Run(userIds map[string]bool) error {
	jobs := make(chan *AggregationJob)
	summaries := make(chan *models.Summary)

	// Closing jobs once trigger is done (successfully or not) lets the summary
	// workers leave their range loop instead of blocking forever.
	defer close(jobs)

	var generators sync.WaitGroup
	for i := 0; i < runtime.NumCPU(); i++ {
		generators.Add(1)
		go func() {
			defer generators.Done()
			srv.summaryWorker(jobs, summaries)
		}()
	}

	// Without any persist worker the unbuffered summaries channel would never
	// be drained and the whole pipeline would stall.
	persistWorkers := int(srv.config.Db.MaxConn)
	if persistWorkers < 1 {
		persistWorkers = 1
	}
	for i := 0; i < persistWorkers; i++ {
		go srv.persistWorker(summaries)
	}

	// Only after every summary worker has exited is it safe to close the
	// summaries channel, which in turn ends the persist workers.
	go func() {
		generators.Wait()
		close(summaries)
	}()

	return srv.trigger(jobs, userIds)
}
// summaryWorker consumes aggregation jobs until the jobs channel is closed.
// For each job it asks the summary service to construct the summary for the
// job's user and time range and forwards successful results to summaries.
// Failures are logged and the job is dropped.
func (srv *AggregationService) summaryWorker(jobs <-chan *AggregationJob, summaries chan<- *models.Summary) {
	for job := range jobs {
		user := &models.User{ID: job.UserID}
		result, err := srv.summaryService.Construct(job.From, job.To, user, true)
		if err != nil {
			log.Printf("Failed to generate summary (%v, %v, %s) %v.\n", job.From, job.To, job.UserID, err)
			continue
		}

		log.Printf("Successfully generated summary (%v, %v, %s).\n", job.From, job.To, job.UserID)
		summaries <- result
	}
}
// persistWorker inserts every summary received on summaries through the
// summary service until the channel is closed. A failed insert is logged and
// does not stop the worker.
func (srv *AggregationService) persistWorker(summaries <-chan *models.Summary) {
	for summary := range summaries {
		if err := srv.summaryService.Insert(summary); err != nil {
			// Arguments are ordered (from, to, user) to match the verbs of the
			// format string, which is the layout the generation log lines use.
			log.Printf("Failed to save summary (%v, %v, %s) %v.\n", summary.FromTime, summary.ToTime, summary.UserID, err)
		}
	}
}
// trigger works out which summaries are missing and sends one aggregation job
// per missing interval to jobs.
//
// If userIds is nil or empty, all users are considered; otherwise only users
// whose ID maps to true. For a user that already has summaries, jobs start at
// the end of the latest one; for a user without summaries they start at the
// user's first heartbeat.
//
// Sends on jobs block until a worker receives them. An error from loading
// users, latest summaries or first heartbeats is logged and returned before
// any job is sent.
func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[string]bool) error {
	log.Println("Generating summaries.")

	allUsers, err := srv.userService.GetAll()
	if err != nil {
		log.Println(err)
		return err
	}

	users := allUsers
	if len(userIds) > 0 {
		// Append matches rather than writing by index: positions in allUsers
		// are unrelated to the size of userIds, so indexing would go out of
		// range or leave nil entries behind.
		users = make([]*models.User, 0, len(userIds))
		for _, u := range allUsers {
			if userIds[u.ID] {
				users = append(users, u)
			}
		}
	}

	latestSummaries, err := srv.summaryService.GetLatestByUser()
	if err != nil {
		log.Println(err)
		return err
	}

	// End time of the most recent summary, keyed by user ID.
	userSummaryTimes := make(map[string]time.Time)
	for _, s := range latestSummaries {
		userSummaryTimes[s.UserID] = s.ToTime.T()
	}

	// Selected users that have no summary yet.
	missingUserIDs := make([]string, 0)
	for _, u := range users {
		if _, ok := userSummaryTimes[u.ID]; !ok {
			missingUserIDs = append(missingUserIDs, u.ID)
		}
	}

	firstHeartbeats, err := srv.heartbeatService.GetFirstUserHeartbeats(missingUserIDs)
	if err != nil {
		log.Println(err)
		return err
	}

	// Iterate over the selected users, not over every known summary time, so
	// that the userIds filter also applies to users who already have summaries.
	for _, u := range users {
		if t, ok := userSummaryTimes[u.ID]; ok {
			generateUserJobs(u.ID, t, jobs)
		}
	}

	for _, h := range firstHeartbeats {
		generateUserJobs(h.UserID, time.Time(h.Time), jobs)
	}

	return nil
}
// generateUserJobs sends consecutive aggregation jobs for the given user to
// jobs, covering the time from lastAggregation up to the end of yesterday.
//
// If lastAggregation falls within the first hour of a day, the first job
// starts exactly there; otherwise it starts at midnight aggregateIntervalDays
// after lastAggregation's calendar day. Every job ends at the next midnight
// boundary, which is also where the following job begins. Each send blocks
// until a receiver takes the job.
func generateUserJobs(userId string, lastAggregation time.Time, jobs chan<- *AggregationJob) {
	// nextBoundary returns midnight, in t's location, aggregateIntervalDays
	// after t's calendar day. time.Date normalizes day overflow into the
	// following month/year.
	nextBoundary := func(t time.Time) time.Time {
		year, month, day := t.Date()
		return time.Date(year, month, day+aggregateIntervalDays, 0, 0, 0, 0, t.Location())
	}

	// Just before the start of today: intervals beginning at or after this
	// point are not generated.
	cutoff := getStartOfToday().Add(-1 * time.Second)

	start := lastAggregation
	if lastAggregation.Hour() != 0 {
		start = nextBoundary(lastAggregation)
	}

	for start.Before(cutoff) {
		stop := nextBoundary(start)
		jobs <- &AggregationJob{userId, start, stop}
		start = stop
	}
}
// getStartOfToday returns the instant one nanosecond after midnight of the
// current day, in the location of time.Now.
func getStartOfToday() time.Time {
	current := time.Now()
	year, month, day := current.Date()
	return time.Date(year, month, day, 0, 0, 0, 1, current.Location())
}