diff --git a/config/config.go b/config/config.go index be82c86..a15e6de 100644 --- a/config/config.go +++ b/config/config.go @@ -65,16 +65,17 @@ var cFlag = flag.String("config", defaultConfigPath, "config file location") var env string type appConfig struct { - AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"` - ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"` - ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"` - ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"` - InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"` - HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"` - CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"` - AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"` - CustomLanguages map[string]string `yaml:"custom_languages"` - Colors map[string]map[string]string `yaml:"-"` + AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"` + LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"06:00,18:00" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"` + ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"` + ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"` + ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"` + InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"` + HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"` + CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"` + AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"` + CustomLanguages map[string]string `yaml:"custom_languages"` + Colors map[string]map[string]string `yaml:"-"` } type securityConfig struct { diff --git a/main.go b/main.go index 795c185..c289d36 100644 --- a/main.go +++ b/main.go @@ -59,6 +59,7 @@ var ( languageMappingRepository repositories.ILanguageMappingRepository projectLabelRepository repositories.IProjectLabelRepository summaryRepository repositories.ISummaryRepository + leaderboardRepository *repositories.LeaderboardRepository keyValueRepository repositories.IKeyValueRepository diagnosticsRepository repositories.IDiagnosticsRepository metricsRepository *repositories.MetricsRepository @@ -72,6 +73,7 @@ var ( projectLabelService services.IProjectLabelService durationService services.IDurationService summaryService services.ISummaryService + leaderboardService services.ILeaderboardService aggregationService services.IAggregationService mailService services.IMailService keyValueService services.IKeyValueService @@ -159,6 +161,7 @@ func main() { languageMappingRepository = repositories.NewLanguageMappingRepository(db) projectLabelRepository = repositories.NewProjectLabelRepository(db) summaryRepository = repositories.NewSummaryRepository(db) + leaderboardRepository = repositories.NewLeaderboardRepository(db) keyValueRepository = repositories.NewKeyValueRepository(db) diagnosticsRepository = repositories.NewDiagnosticsRepository(db) metricsRepository = repositories.NewMetricsRepository(db) @@ -172,6 +175,7 @@ func main() { heartbeatService = services.NewHeartbeatService(heartbeatRepository, languageMappingService) durationService = services.NewDurationService(heartbeatService) summaryService = services.NewSummaryService(summaryRepository, durationService, aliasService, projectLabelService) + leaderboardService = services.NewLeaderboardService(leaderboardRepository, summaryService, userService) aggregationService = services.NewAggregationService(userService, summaryService, heartbeatService) keyValueService = services.NewKeyValueService(keyValueRepository) reportService = services.NewReportService(summaryService, userService, mailService) @@ -180,6 +184,7 @@ func main() { // Schedule background tasks go aggregationService.Schedule() + go leaderboardService.ScheduleDefault() go miscService.ScheduleCountTotalTime() go reportService.Schedule() diff --git a/models/leaderboard.go b/models/leaderboard.go index 32949f2..085347b 100644 --- a/models/leaderboard.go +++ b/models/leaderboard.go @@ -6,10 +6,10 @@ type LeaderboardItem struct { ID uint `json:"-" gorm:"primary_key; size:32"` User *User `json:"-" gorm:"not null; constraint:OnUpdate:CASCADE,OnDelete:CASCADE"` UserID string `json:"user_id" gorm:"not null; index:idx_leaderboard_user"` - Rank uint `json:"rank" gorm:"not null; size:16"` + Rank uint `json:"rank" gorm:"->"` Interval string `json:"interval" gorm:"not null; size:32; index:idx_leaderboard_combined"` - By uint8 `json:"aggregated_by" gorm:"index:idx_leaderboard_combined"` + By *uint8 `json:"aggregated_by" gorm:"index:idx_leaderboard_combined"` // pointer because nullable Total time.Duration `json:"total" gorm:"not null" swaggertype:"primitive,integer"` - Key string `json:"key" gorm:"size:255"` + Key *string `json:"key" gorm:"size:255"` // pointer because nullable CreatedAt CustomTime `gorm:"type:timestamp; default:CURRENT_TIMESTAMP" swaggertype:"string" format:"date" example:"2006-01-02 15:04:05.000"` } diff --git a/repositories/leaderboard.go b/repositories/leaderboard.go new file mode 100644 index 0000000..366873a --- /dev/null +++ b/repositories/leaderboard.go @@ -0,0 +1,63 @@ +package repositories + +import ( + "github.com/muety/wakapi/models" + "github.com/muety/wakapi/utils" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type LeaderboardRepository struct { + db *gorm.DB +} + +func NewLeaderboardRepository(db *gorm.DB) *LeaderboardRepository { + return &LeaderboardRepository{db: db} +} + +func (r *LeaderboardRepository) InsertBatch(items []*models.LeaderboardItem) error { + if err := r.db. + Clauses(clause.OnConflict{DoNothing: true}). + Create(&items).Error; err != nil { + return err + } + return nil +} + +func (r *LeaderboardRepository) GetAllAggregatedByInterval(key *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) { + // TODO: distinct by (user, key) to filter out potential duplicates ? + var items []*models.LeaderboardItem + q := r.db. + Select("*, rank() over (partition by `key` order by total desc) as `rank`"). + Where("`interval` in ?", *key) + q = utils.WhereNullable(q, "`by`", by) + + if err := q.Find(&items).Error; err != nil { + return nil, err + } + return items, nil +} + +func (r *LeaderboardRepository) GetAggregatedByUserAndInterval(userId string, key *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) { + var items []*models.LeaderboardItem + q := r.db. + Select("*, rank() over (partition by `key` order by total desc) as `rank`"). + Where("user_id = ?", userId). + Where("`interval` in ?", *key) + q = utils.WhereNullable(q, "`by`", by) + + if err := q.Find(&items).Error; err != nil { + return nil, err + } + return items, nil +} + +func (r *LeaderboardRepository) DeleteByUserAndInterval(userId string, key *models.IntervalKey) error { + if err := r.db. + Where("user_id = ?", userId). + Where("`interval` in ?", *key). + Delete(models.LeaderboardItem{}).Error; err != nil { + return err + } + return nil +} diff --git a/repositories/repositories.go b/repositories/repositories.go index 7a4af1f..e8361b2 100644 --- a/repositories/repositories.go +++ b/repositories/repositories.go @@ -76,6 +76,7 @@ type IUserRepository interface { GetByResetToken(string) (*models.User, error) GetAll() ([]*models.User, error) GetAllByReports(bool) ([]*models.User, error) + GetAllByLeaderboard(bool) ([]*models.User, error) GetByLoggedInAfter(time.Time) ([]*models.User, error) GetByLastActiveAfter(time.Time) ([]*models.User, error) Count() (int64, error) @@ -84,3 +85,10 @@ type IUserRepository interface { UpdateField(*models.User, string, interface{}) (*models.User, error) Delete(*models.User) error } + +type ILeaderboardRepository interface { + InsertBatch([]*models.LeaderboardItem) error + DeleteByUserAndInterval(string, *models.IntervalKey) error + GetAllAggregatedByInterval(*models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error) + GetAggregatedByUserAndInterval(string, *models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error) +} diff --git a/repositories/user.go b/repositories/user.go index 4b1a3d0..cca7b76 100644 --- a/repositories/user.go +++ b/repositories/user.go @@ -85,6 +85,14 @@ func (r *UserRepository) GetAllByReports(reportsEnabled bool) ([]*models.User, e return users, nil } +func (r *UserRepository) GetAllByLeaderboard(leaderboardEnabled bool) ([]*models.User, error) { + var users []*models.User + if err := r.db.Where(&models.User{PublicLeaderboard: leaderboardEnabled}).Find(&users).Error; err != nil { + return nil, err + } + return users, nil +} + func (r *UserRepository) GetByLoggedInAfter(t time.Time) ([]*models.User, error) { var users []*models.User if err := r.db. diff --git a/services/leaderboard.go b/services/leaderboard.go new file mode 100644 index 0000000..db015cd --- /dev/null +++ b/services/leaderboard.go @@ -0,0 +1,149 @@ +package services + +import ( + "github.com/emvi/logbuch" + "github.com/leandro-lugaresi/hub" + "github.com/muety/wakapi/config" + "github.com/muety/wakapi/models" + "github.com/muety/wakapi/repositories" + "github.com/muety/wakapi/utils" + "github.com/patrickmn/go-cache" + "time" +) + +type LeaderboardService struct { + config *config.Config + cache *cache.Cache + eventBus *hub.Hub + repository repositories.ILeaderboardRepository + summaryService ISummaryService + userService IUserService +} + +func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, summaryService ISummaryService, userService IUserService) *LeaderboardService { + return &LeaderboardService{ + config: config.Get(), + cache: cache.New(24*time.Hour, 24*time.Hour), + eventBus: config.EventBus(), + repository: leaderboardRepo, + summaryService: summaryService, + userService: userService, + } +} + +func (srv *LeaderboardService) ScheduleDefault() { + runAllUsers := func(interval *models.IntervalKey, by []uint8) { + users, err := srv.userService.GetAllByLeaderboard(true) + if err != nil { + config.Log().Error("failed to get users for leaderboard generation - %v", err) + return + } + + srv.Run(users, interval, by) + } + + runAllUsers(models.IntervalPast7Days, []uint8{models.SummaryLanguage}) + + //s := gocron.NewScheduler(time.Local) + //s.Every(1).Day().At(srv.config.App.LeaderboardGenerationTime).Do(runAllUsers, models.IntervalPast7Days, []uint8{models.SummaryLanguage}) + //s.StartBlocking() +} + +func (srv *LeaderboardService) Run(users []*models.User, interval *models.IntervalKey, by []uint8) error { + logbuch.Info("generating leaderboard (%s) for %d users (%d aggregations)", (*interval)[0], len(users), len(by)) + + for _, user := range users { + if err := srv.repository.DeleteByUserAndInterval(user.ID, interval); err != nil { + config.Log().Error("failed to delete leaderboard items for user %s (interval %s) - %v", user.ID, (*interval)[0], err) + continue + } + + item, err := srv.GenerateByUser(user, interval) + if err != nil { + config.Log().Error("failed to generate general leaderboard for user %s - %v", user.ID, err) + continue + } + + if err := srv.repository.InsertBatch([]*models.LeaderboardItem{item}); err != nil { + config.Log().Error("failed to persist general leaderboard for user %s - %v", user.ID, err) + continue + } + + for _, by := range by { + items, err := srv.GenerateAggregatedByUser(user, interval, by) + if err != nil { + config.Log().Error("failed to generate aggregated (by %s) leaderboard for user %s - %v", models.GetEntityColumn(by), user.ID, err) + continue + } + + if len(items) == 0 { + continue + } + + if err := srv.repository.InsertBatch(items); err != nil { + config.Log().Error("failed to persist aggregated (by %s) leaderboard for user %s - %v", models.GetEntityColumn(by), user.ID, err) + continue + } + } + } + + logbuch.Info("finished leaderboard generation") + + return nil +} + +func (srv *LeaderboardService) GetByInterval(interval *models.IntervalKey) ([]*models.LeaderboardItem, error) { + return srv.GetAggregatedByInterval(interval, nil) +} + +func (srv *LeaderboardService) GetAggregatedByInterval(interval *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) { + return srv.repository.GetAllAggregatedByInterval(interval, by) +} + +func (srv *LeaderboardService) GenerateByUser(user *models.User, interval *models.IntervalKey) (*models.LeaderboardItem, error) { + err, from, to := utils.ResolveIntervalTZ(interval, user.TZ()) + if err != nil { + return nil, err + } + + summary, err := srv.summaryService.Aliased(from, to, user, srv.summaryService.Retrieve, nil, false) + if err != nil { + return nil, err + } + + return &models.LeaderboardItem{ + User: user, + UserID: user.ID, + Interval: (*interval)[0], + Total: summary.TotalTime(), + }, nil +} + +func (srv *LeaderboardService) GenerateAggregatedByUser(user *models.User, interval *models.IntervalKey, by uint8) ([]*models.LeaderboardItem, error) { + err, from, to := utils.ResolveIntervalTZ(interval, user.TZ()) + if err != nil { + return nil, err + } + + summary, err := srv.summaryService.Aliased(from, to, user, srv.summaryService.Retrieve, nil, false) + if err != nil { + return nil, err + } + + summaryItems := *summary.ItemsByType(by) + items := make([]*models.LeaderboardItem, summaryItems.Len()) + + for i := 0; i < summaryItems.Len(); i++ { + key := summaryItems[i].Key + items[i] = &models.LeaderboardItem{ + User: user, + UserID: user.ID, + Interval: (*interval)[0], + By: &by, + Total: summary.TotalTimeByKey(by, key), + Key: &key, + } + } + + return items, nil +} diff --git a/services/services.go b/services/services.go index b37ad44..75320a8 100644 --- a/services/services.go +++ b/services/services.go @@ -97,6 +97,15 @@ type IReportService interface { Run(*models.User, time.Duration) error } +type ILeaderboardService interface { + ScheduleDefault() + Run([]*models.User, *models.IntervalKey, []uint8) error + GetByInterval(*models.IntervalKey) ([]*models.LeaderboardItem, error) + GetAggregatedByInterval(*models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error) + GenerateByUser(*models.User, *models.IntervalKey) (*models.LeaderboardItem, error) + GenerateAggregatedByUser(*models.User, *models.IntervalKey, uint8) ([]*models.LeaderboardItem, error) +} + type IUserService interface { GetUserById(string) (*models.User, error) GetUserByKey(string) (*models.User, error) @@ -104,6 +113,7 @@ type IUserService interface { GetUserByResetToken(string) (*models.User, error) GetAll() ([]*models.User, error) GetAllByReports(bool) ([]*models.User, error) + GetAllByLeaderboard(bool) ([]*models.User, error) GetActive(bool) ([]*models.User, error) Count() (int64, error) CreateOrGet(*models.Signup, bool) (*models.User, bool, error) diff --git a/services/user.go b/services/user.go index 3e1483b..c1ce616 100644 --- a/services/user.go +++ b/services/user.go @@ -100,6 +100,10 @@ func (srv *UserService) GetAllByReports(reportsEnabled bool) ([]*models.User, er return srv.repository.GetAllByReports(reportsEnabled) } +func (srv *UserService) GetAllByLeaderboard(leaderboardEnabled bool) ([]*models.User, error) { + return srv.repository.GetAllByLeaderboard(leaderboardEnabled) +} + func (srv *UserService) GetActive(exact bool) ([]*models.User, error) { minDate := time.Now().AddDate(0, 0, -1*srv.config.App.InactiveDays) if !exact { diff --git a/utils/db.go b/utils/db.go index e8469a4..beaa009 100644 --- a/utils/db.go +++ b/utils/db.go @@ -1,8 +1,10 @@ package utils import ( + "fmt" "github.com/emvi/logbuch" "gorm.io/gorm" + "reflect" ) func IsCleanDB(db *gorm.DB) bool { @@ -30,3 +32,10 @@ func HasConstraints(db *gorm.DB) bool { logbuch.Warn("HasForeignKeyConstraints is not yet implemented for dialect '%s'", db.Dialector.Name()) return false } + +func WhereNullable(query *gorm.DB, col string, val any) *gorm.DB { + if val == nil || reflect.ValueOf(val).IsNil() { + return query.Where(fmt.Sprintf("%s is null", col)) + } + return query.Where(fmt.Sprintf("%s = ?", col), val) +}