1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

refactor: include generics based utility lib and refactor some parts accordingly [ci-skip]

This commit is contained in:
Ferdinand Mütsch
2022-03-20 16:27:21 +01:00
parent a675417ab9
commit 8fc0d78f64
16 changed files with 543 additions and 797 deletions

View File

@@ -2,6 +2,7 @@ package services
import (
"errors"
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
"github.com/emvi/logbuch"
"github.com/muety/wakapi/config"
"runtime"
@@ -23,7 +24,7 @@ type AggregationService struct {
userService IUserService
summaryService ISummaryService
heartbeatService IHeartbeatService
inProgress map[string]bool
inProgress datastructure.Set[string]
}
func NewAggregationService(userService IUserService, summaryService ISummaryService, heartbeatService IHeartbeatService) *AggregationService {
@@ -32,7 +33,7 @@ func NewAggregationService(userService IUserService, summaryService ISummaryServ
userService: userService,
summaryService: summaryService,
heartbeatService: heartbeatService,
inProgress: map[string]bool{},
inProgress: datastructure.NewSet[string](),
}
}
@@ -45,16 +46,16 @@ type AggregationJob struct {
// Schedule a job to (re-)generate summaries every day shortly after midnight
func (srv *AggregationService) Schedule() {
// Run once initially
if err := srv.Run(nil); err != nil {
if err := srv.Run(datastructure.NewSet[string]()); err != nil {
logbuch.Fatal("failed to run AggregationJob: %v", err)
}
s := gocron.NewScheduler(time.Local)
s.Every(1).Day().At(srv.config.App.AggregationTime).Do(srv.Run, map[string]bool{})
s.Every(1).Day().At(srv.config.App.AggregationTime).Do(srv.Run, datastructure.NewSet[string]())
s.StartBlocking()
}
func (srv *AggregationService) Run(userIds map[string]bool) error {
func (srv *AggregationService) Run(userIds datastructure.Set[string]) error {
if err := srv.lockUsers(userIds); err != nil {
return err
}
@@ -100,24 +101,9 @@ func (srv *AggregationService) persistWorker(summaries <-chan *models.Summary) {
}
}
func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[string]bool) error {
func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds datastructure.Set[string]) error {
logbuch.Info("generating summaries")
var users []*models.User
if allUsers, err := srv.userService.GetAll(); err != nil {
config.Log().Error(err.Error())
return err
} else if userIds != nil && len(userIds) > 0 {
users = make([]*models.User, 0)
for _, u := range allUsers {
if yes, ok := userIds[u.ID]; yes && ok {
users = append(users, u)
}
}
} else {
users = allUsers
}
// Get a map from user ids to the time of their latest summary or nil if none exists yet
lastUserSummaryTimes, err := srv.summaryService.GetLatestByUser()
if err != nil {
@@ -140,6 +126,10 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[
// Generate summary aggregation jobs
for _, e := range lastUserSummaryTimes {
if userIds != nil && !userIds.IsEmpty() && !userIds.Contain(e.User) {
continue
}
if e.Time.Valid() {
// Case 1: User has aggregated summaries already
// -> Spawn jobs to create summaries from their latest aggregation to now
@@ -156,25 +146,23 @@ func (srv *AggregationService) trigger(jobs chan<- *AggregationJob, userIds map[
return nil
}
func (srv *AggregationService) lockUsers(userIds map[string]bool) error {
func (srv *AggregationService) lockUsers(userIds datastructure.Set[string]) error {
aggregationLock.Lock()
defer aggregationLock.Unlock()
for uid := range userIds {
if _, ok := srv.inProgress[uid]; ok {
if srv.inProgress.Contain(uid) {
return errors.New("aggregation already in progress for at least of the request users")
}
}
for uid := range userIds {
srv.inProgress[uid] = true
}
srv.inProgress = srv.inProgress.Union(userIds)
return nil
}
func (srv *AggregationService) unlockUsers(userIds map[string]bool) {
func (srv *AggregationService) unlockUsers(userIds datastructure.Set[string]) {
aggregationLock.Lock()
defer aggregationLock.Unlock()
for uid := range userIds {
delete(srv.inProgress, uid)
srv.inProgress.Delete(uid)
}
}