feat: leaderboard generation and querying

This commit is contained in:
Ferdinand Mütsch 2022-10-02 00:01:39 +02:00
parent beffe71ea6
commit 13a3d9f03a
10 changed files with 270 additions and 13 deletions

View File

@ -65,16 +65,17 @@ var cFlag = flag.String("config", defaultConfigPath, "config file location")
var env string
type appConfig struct {
AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"`
ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"`
ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"`
InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"`
HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"`
CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"`
AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"`
CustomLanguages map[string]string `yaml:"custom_languages"`
Colors map[string]map[string]string `yaml:"-"`
AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"`
LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"06:00,18:00" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"`
ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"`
ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"`
InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"`
HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"`
CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"`
AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"`
CustomLanguages map[string]string `yaml:"custom_languages"`
Colors map[string]map[string]string `yaml:"-"`
}
type securityConfig struct {

View File

@ -59,6 +59,7 @@ var (
languageMappingRepository repositories.ILanguageMappingRepository
projectLabelRepository repositories.IProjectLabelRepository
summaryRepository repositories.ISummaryRepository
leaderboardRepository *repositories.LeaderboardRepository
keyValueRepository repositories.IKeyValueRepository
diagnosticsRepository repositories.IDiagnosticsRepository
metricsRepository *repositories.MetricsRepository
@ -72,6 +73,7 @@ var (
projectLabelService services.IProjectLabelService
durationService services.IDurationService
summaryService services.ISummaryService
leaderboardService services.ILeaderboardService
aggregationService services.IAggregationService
mailService services.IMailService
keyValueService services.IKeyValueService
@ -159,6 +161,7 @@ func main() {
languageMappingRepository = repositories.NewLanguageMappingRepository(db)
projectLabelRepository = repositories.NewProjectLabelRepository(db)
summaryRepository = repositories.NewSummaryRepository(db)
leaderboardRepository = repositories.NewLeaderboardRepository(db)
keyValueRepository = repositories.NewKeyValueRepository(db)
diagnosticsRepository = repositories.NewDiagnosticsRepository(db)
metricsRepository = repositories.NewMetricsRepository(db)
@ -172,6 +175,7 @@ func main() {
heartbeatService = services.NewHeartbeatService(heartbeatRepository, languageMappingService)
durationService = services.NewDurationService(heartbeatService)
summaryService = services.NewSummaryService(summaryRepository, durationService, aliasService, projectLabelService)
leaderboardService = services.NewLeaderboardService(leaderboardRepository, summaryService, userService)
aggregationService = services.NewAggregationService(userService, summaryService, heartbeatService)
keyValueService = services.NewKeyValueService(keyValueRepository)
reportService = services.NewReportService(summaryService, userService, mailService)
@ -180,6 +184,7 @@ func main() {
// Schedule background tasks
go aggregationService.Schedule()
go leaderboardService.ScheduleDefault()
go miscService.ScheduleCountTotalTime()
go reportService.Schedule()

View File

@ -6,10 +6,10 @@ type LeaderboardItem struct {
ID uint `json:"-" gorm:"primary_key; size:32"`
User *User `json:"-" gorm:"not null; constraint:OnUpdate:CASCADE,OnDelete:CASCADE"`
UserID string `json:"user_id" gorm:"not null; index:idx_leaderboard_user"`
Rank uint `json:"rank" gorm:"not null; size:16"`
Rank uint `json:"rank" gorm:"->"`
Interval string `json:"interval" gorm:"not null; size:32; index:idx_leaderboard_combined"`
By uint8 `json:"aggregated_by" gorm:"index:idx_leaderboard_combined"`
By *uint8 `json:"aggregated_by" gorm:"index:idx_leaderboard_combined"` // pointer because nullable
Total time.Duration `json:"total" gorm:"not null" swaggertype:"primitive,integer"`
Key string `json:"key" gorm:"size:255"`
Key *string `json:"key" gorm:"size:255"` // pointer because nullable
CreatedAt CustomTime `gorm:"type:timestamp; default:CURRENT_TIMESTAMP" swaggertype:"string" format:"date" example:"2006-01-02 15:04:05.000"`
}

View File

@ -0,0 +1,63 @@
package repositories
import (
"github.com/muety/wakapi/models"
"github.com/muety/wakapi/utils"
"gorm.io/gorm"
"gorm.io/gorm/clause"
)
type LeaderboardRepository struct {
db *gorm.DB
}
func NewLeaderboardRepository(db *gorm.DB) *LeaderboardRepository {
return &LeaderboardRepository{db: db}
}
func (r *LeaderboardRepository) InsertBatch(items []*models.LeaderboardItem) error {
if err := r.db.
Clauses(clause.OnConflict{DoNothing: true}).
Create(&items).Error; err != nil {
return err
}
return nil
}
func (r *LeaderboardRepository) GetAllAggregatedByInterval(key *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) {
// TODO: distinct by (user, key) to filter out potential duplicates ?
var items []*models.LeaderboardItem
q := r.db.
Select("*, rank() over (partition by `key` order by total desc) as `rank`").
Where("`interval` in ?", *key)
q = utils.WhereNullable(q, "`by`", by)
if err := q.Find(&items).Error; err != nil {
return nil, err
}
return items, nil
}
func (r *LeaderboardRepository) GetAggregatedByUserAndInterval(userId string, key *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) {
var items []*models.LeaderboardItem
q := r.db.
Select("*, rank() over (partition by `key` order by total desc) as `rank`").
Where("user_id = ?", userId).
Where("`interval` in ?", *key)
q = utils.WhereNullable(q, "`by`", by)
if err := q.Find(&items).Error; err != nil {
return nil, err
}
return items, nil
}
func (r *LeaderboardRepository) DeleteByUserAndInterval(userId string, key *models.IntervalKey) error {
if err := r.db.
Where("user_id = ?", userId).
Where("`interval` in ?", *key).
Delete(models.LeaderboardItem{}).Error; err != nil {
return err
}
return nil
}

View File

@ -76,6 +76,7 @@ type IUserRepository interface {
GetByResetToken(string) (*models.User, error)
GetAll() ([]*models.User, error)
GetAllByReports(bool) ([]*models.User, error)
GetAllByLeaderboard(bool) ([]*models.User, error)
GetByLoggedInAfter(time.Time) ([]*models.User, error)
GetByLastActiveAfter(time.Time) ([]*models.User, error)
Count() (int64, error)
@ -84,3 +85,10 @@ type IUserRepository interface {
UpdateField(*models.User, string, interface{}) (*models.User, error)
Delete(*models.User) error
}
type ILeaderboardRepository interface {
InsertBatch([]*models.LeaderboardItem) error
DeleteByUserAndInterval(string, *models.IntervalKey) error
GetAllAggregatedByInterval(*models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error)
GetAggregatedByUserAndInterval(string, *models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error)
}

View File

@ -85,6 +85,14 @@ func (r *UserRepository) GetAllByReports(reportsEnabled bool) ([]*models.User, e
return users, nil
}
func (r *UserRepository) GetAllByLeaderboard(leaderboardEnabled bool) ([]*models.User, error) {
var users []*models.User
if err := r.db.Where(&models.User{PublicLeaderboard: leaderboardEnabled}).Find(&users).Error; err != nil {
return nil, err
}
return users, nil
}
func (r *UserRepository) GetByLoggedInAfter(t time.Time) ([]*models.User, error) {
var users []*models.User
if err := r.db.

149
services/leaderboard.go Normal file
View File

@ -0,0 +1,149 @@
package services
import (
"github.com/emvi/logbuch"
"github.com/leandro-lugaresi/hub"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"github.com/muety/wakapi/repositories"
"github.com/muety/wakapi/utils"
"github.com/patrickmn/go-cache"
"time"
)
type LeaderboardService struct {
config *config.Config
cache *cache.Cache
eventBus *hub.Hub
repository repositories.ILeaderboardRepository
summaryService ISummaryService
userService IUserService
}
func NewLeaderboardService(leaderboardRepo repositories.ILeaderboardRepository, summaryService ISummaryService, userService IUserService) *LeaderboardService {
return &LeaderboardService{
config: config.Get(),
cache: cache.New(24*time.Hour, 24*time.Hour),
eventBus: config.EventBus(),
repository: leaderboardRepo,
summaryService: summaryService,
userService: userService,
}
}
func (srv *LeaderboardService) ScheduleDefault() {
runAllUsers := func(interval *models.IntervalKey, by []uint8) {
users, err := srv.userService.GetAllByLeaderboard(true)
if err != nil {
config.Log().Error("failed to get users for leaderboard generation - %v", err)
return
}
srv.Run(users, interval, by)
}
runAllUsers(models.IntervalPast7Days, []uint8{models.SummaryLanguage})
//s := gocron.NewScheduler(time.Local)
//s.Every(1).Day().At(srv.config.App.LeaderboardGenerationTime).Do(runAllUsers, models.IntervalPast7Days, []uint8{models.SummaryLanguage})
//s.StartBlocking()
}
func (srv *LeaderboardService) Run(users []*models.User, interval *models.IntervalKey, by []uint8) error {
logbuch.Info("generating leaderboard (%s) for %d users (%d aggregations)", (*interval)[0], len(users), len(by))
for _, user := range users {
if err := srv.repository.DeleteByUserAndInterval(user.ID, interval); err != nil {
config.Log().Error("failed to delete leaderboard items for user %s (interval %s) - %v", user.ID, (*interval)[0], err)
continue
}
item, err := srv.GenerateByUser(user, interval)
if err != nil {
config.Log().Error("failed to generate general leaderboard for user %s - %v", user.ID, err)
continue
}
if err := srv.repository.InsertBatch([]*models.LeaderboardItem{item}); err != nil {
config.Log().Error("failed to persist general leaderboard for user %s - %v", user.ID, err)
continue
}
for _, by := range by {
items, err := srv.GenerateAggregatedByUser(user, interval, by)
if err != nil {
config.Log().Error("failed to generate aggregated (by %s) leaderboard for user %s - %v", models.GetEntityColumn(by), user.ID, err)
continue
}
if len(items) == 0 {
continue
}
if err := srv.repository.InsertBatch(items); err != nil {
config.Log().Error("failed to persist aggregated (by %s) leaderboard for user %s - %v", models.GetEntityColumn(by), user.ID, err)
continue
}
}
}
logbuch.Info("finished leaderboard generation")
return nil
}
func (srv *LeaderboardService) GetByInterval(interval *models.IntervalKey) ([]*models.LeaderboardItem, error) {
return srv.GetAggregatedByInterval(interval, nil)
}
func (srv *LeaderboardService) GetAggregatedByInterval(interval *models.IntervalKey, by *uint8) ([]*models.LeaderboardItem, error) {
return srv.repository.GetAllAggregatedByInterval(interval, by)
}
func (srv *LeaderboardService) GenerateByUser(user *models.User, interval *models.IntervalKey) (*models.LeaderboardItem, error) {
err, from, to := utils.ResolveIntervalTZ(interval, user.TZ())
if err != nil {
return nil, err
}
summary, err := srv.summaryService.Aliased(from, to, user, srv.summaryService.Retrieve, nil, false)
if err != nil {
return nil, err
}
return &models.LeaderboardItem{
User: user,
UserID: user.ID,
Interval: (*interval)[0],
Total: summary.TotalTime(),
}, nil
}
func (srv *LeaderboardService) GenerateAggregatedByUser(user *models.User, interval *models.IntervalKey, by uint8) ([]*models.LeaderboardItem, error) {
err, from, to := utils.ResolveIntervalTZ(interval, user.TZ())
if err != nil {
return nil, err
}
summary, err := srv.summaryService.Aliased(from, to, user, srv.summaryService.Retrieve, nil, false)
if err != nil {
return nil, err
}
summaryItems := *summary.ItemsByType(by)
items := make([]*models.LeaderboardItem, summaryItems.Len())
for i := 0; i < summaryItems.Len(); i++ {
key := summaryItems[i].Key
items[i] = &models.LeaderboardItem{
User: user,
UserID: user.ID,
Interval: (*interval)[0],
By: &by,
Total: summary.TotalTimeByKey(by, key),
Key: &key,
}
}
return items, nil
}

View File

@ -97,6 +97,15 @@ type IReportService interface {
Run(*models.User, time.Duration) error
}
type ILeaderboardService interface {
ScheduleDefault()
Run([]*models.User, *models.IntervalKey, []uint8) error
GetByInterval(*models.IntervalKey) ([]*models.LeaderboardItem, error)
GetAggregatedByInterval(*models.IntervalKey, *uint8) ([]*models.LeaderboardItem, error)
GenerateByUser(*models.User, *models.IntervalKey) (*models.LeaderboardItem, error)
GenerateAggregatedByUser(*models.User, *models.IntervalKey, uint8) ([]*models.LeaderboardItem, error)
}
type IUserService interface {
GetUserById(string) (*models.User, error)
GetUserByKey(string) (*models.User, error)
@ -104,6 +113,7 @@ type IUserService interface {
GetUserByResetToken(string) (*models.User, error)
GetAll() ([]*models.User, error)
GetAllByReports(bool) ([]*models.User, error)
GetAllByLeaderboard(bool) ([]*models.User, error)
GetActive(bool) ([]*models.User, error)
Count() (int64, error)
CreateOrGet(*models.Signup, bool) (*models.User, bool, error)

View File

@ -100,6 +100,10 @@ func (srv *UserService) GetAllByReports(reportsEnabled bool) ([]*models.User, er
return srv.repository.GetAllByReports(reportsEnabled)
}
func (srv *UserService) GetAllByLeaderboard(leaderboardEnabled bool) ([]*models.User, error) {
return srv.repository.GetAllByLeaderboard(leaderboardEnabled)
}
func (srv *UserService) GetActive(exact bool) ([]*models.User, error) {
minDate := time.Now().AddDate(0, 0, -1*srv.config.App.InactiveDays)
if !exact {

View File

@ -1,8 +1,10 @@
package utils
import (
"fmt"
"github.com/emvi/logbuch"
"gorm.io/gorm"
"reflect"
)
func IsCleanDB(db *gorm.DB) bool {
@ -30,3 +32,10 @@ func HasConstraints(db *gorm.DB) bool {
logbuch.Warn("HasForeignKeyConstraints is not yet implemented for dialect '%s'", db.Dialector.Name())
return false
}
func WhereNullable(query *gorm.DB, col string, val any) *gorm.DB {
if val == nil || reflect.ValueOf(val).IsNil() {
return query.Where(fmt.Sprintf("%s is null", col))
}
return query.Where(fmt.Sprintf("%s = ?", col), val)
}