diff --git a/config.default.yml b/config.default.yml
index b3b48fe..70045c8 100644
--- a/config.default.yml
+++ b/config.default.yml
@@ -15,9 +15,11 @@ app:
   aggregation_time: '0 15 2 * * *' # time at which to run daily aggregation batch jobs
   leaderboard_generation_time: '0 0 6 * * *,0 0 18 * * *' # times at which to re-calculate the leaderboard
   report_time_weekly: '0 0 18 * * 5' # time at which to fan out weekly reports (extended cron)
+  data_cleanup_time: '0 0 6 * * 0' # time at which to run old data cleanup (if enabled through data_retention_months)
   inactive_days: 7 # time of previous days within a user must have logged in to be considered active
   import_batch_size: 50 # maximum number of heartbeats to insert into the database within one transaction
   heartbeat_max_age: '4320h' # maximum acceptable age of a heartbeat (see https://pkg.go.dev/time#ParseDuration)
+  data_retention_months: -1 # maximum retention period in months for user data (heartbeats) (-1 for infinity)
   custom_languages:
     vue: Vue
     jsx: JSX
diff --git a/config/config.go b/config/config.go
index 4d18c64..adc6f22 100644
--- a/config/config.go
+++ b/config/config.go
@@ -68,14 +68,16 @@ var cFlag = flag.String("config", defaultConfigPath, "config file location")
 var env string
 
 type appConfig struct {
-	AggregationTime           string                       `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"`
-	LeaderboardGenerationTime string                       `yaml:"leaderboard_generation_time" default:"06:00;18:00" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
-	ReportTimeWeekly          string                       `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"`
+	AggregationTime           string                       `yaml:"aggregation_time" default:"0 15 2 * * *" env:"WAKAPI_AGGREGATION_TIME"`
+	LeaderboardGenerationTime string                       `yaml:"leaderboard_generation_time" default:"0 0 6 * * *,0 0 18 * * *" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
+	ReportTimeWeekly          string                       `yaml:"report_time_weekly" default:"0 0 18 * * 5" env:"WAKAPI_REPORT_TIME_WEEKLY"`
+	DataCleanupTime           string                       `yaml:"data_cleanup_time" default:"0 0 6 * * 0" env:"WAKAPI_DATA_CLEANUP_TIME"`
 	ImportBackoffMin          int                          `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"`
 	ImportBatchSize           int                          `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"`
 	InactiveDays              int                          `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"`
 	HeartbeatMaxAge           string                       `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"`
 	CountCacheTTLMin          int                          `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"`
+	DataRetentionMonths       int                          `yaml:"data_retention_months" default:"-1" env:"WAKAPI_DATA_RETENTION_MONTHS"`
 	AvatarURLTemplate         string                       `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"`
 	CustomLanguages           map[string]string            `yaml:"custom_languages"`
 	Colors                    map[string]map[string]string `yaml:"-"`
@@ -442,6 +444,12 @@ func Load(version string) *Config {
 		initSentry(config.Sentry, config.IsDev())
 	}
 
+	if config.App.DataRetentionMonths <= 0 {
+		logbuch.Info("disabling data retention policy, keeping data forever")
+	} else {
+		logbuch.Info("data retention policy set to keep data for %d months at max", config.App.DataRetentionMonths)
+	}
+
 	// some validation checks
 	if config.Server.ListenIpV4 == "" && config.Server.ListenIpV6 == "" && config.Server.ListenSocket == "" {
 		logbuch.Fatal("either of listen_ipv4 or listen_ipv6 or listen_socket must be set")
diff --git a/config/jobqueue.go b/config/jobqueue.go
index d6c2f94..55ae422 100644
--- a/config/jobqueue.go
+++ b/config/jobqueue.go
@@ -12,10 +12,11 @@ var jobQueues map[string]*artifex.Dispatcher
 var jobCounts map[string]int
 
 const (
-	QueueDefault    = "wakapi.default"
-	QueueProcessing = "wakapi.processing"
-	QueueReports    = "wakapi.reports"
-	QueueImports    = "wakapi.imports"
+	QueueDefault      = "wakapi.default"
+	QueueProcessing   = "wakapi.processing"
+	QueueReports      = "wakapi.reports"
+	QueueImports      = "wakapi.imports"
+	QueueHousekeeping = "wakapi.housekeeping"
 )
 
 type JobQueueMetrics struct {
@@ -28,9 +29,10 @@ func init() {
 	jobQueues = make(map[string]*artifex.Dispatcher)
 
 	InitQueue(QueueDefault, 1)
-	InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0)))
+	InitQueue(QueueProcessing, halfCPUs())
 	InitQueue(QueueReports, 1)
 	InitQueue(QueueImports, 1)
+	InitQueue(QueueHousekeeping, halfCPUs())
 }
 
 func InitQueue(name string, workers int) error {
@@ -71,3 +73,13 @@ func CloseQueues() {
 		q.Stop()
 	}
 }
+
+// allCPUs returns runtime.NumCPU(). NOTE(review): not referenced anywhere within this patch.
+func allCPUs() int {
+	return runtime.NumCPU()
+}
+
+// halfCPUs returns half of runtime.NumCPU(), rounded up (so it is at least 1).
+func halfCPUs() int {
+	return int(math.Ceil(float64(runtime.NumCPU()) / 2.0))
+}
diff --git a/main.go b/main.go
index 237a135..883d3cb 100644
--- a/main.go
+++ b/main.go
@@ -81,6 +81,7 @@ var (
 	keyValueService        services.IKeyValueService
 	reportService          services.IReportService
 	diagnosticsService     services.IDiagnosticsService
+	housekeepingService    services.IHousekeepingService
 	miscService            services.IMiscService
 )
 
@@ -182,12 +183,14 @@ func main() {
 	keyValueService = services.NewKeyValueService(keyValueRepository)
 	reportService = services.NewReportService(summaryService, userService, mailService)
 	diagnosticsService = services.NewDiagnosticsService(diagnosticsRepository)
+	housekeepingService = services.NewHousekeepingService(userService, heartbeatService, summaryService)
 	miscService = services.NewMiscService(userService, summaryService, keyValueService)
 
 	// Schedule background tasks
 	go aggregationService.Schedule()
 	go leaderboardService.Schedule()
 	go reportService.Schedule()
+	go housekeepingService.Schedule()
 	go miscService.ScheduleCountTotalTime()
 
 	routes.Init()
diff --git a/mocks/heartbeat_service.go b/mocks/heartbeat_service.go
index 390cb0b..ea984ae 100644
--- a/mocks/heartbeat_service.go
+++ b/mocks/heartbeat_service.go
@@ -10,13 +10,13 @@ type HeartbeatServiceMock struct {
 	mock.Mock
 }
 
-func (m *HeartbeatServiceMock) Insert(heartbeat *models.Heartbeat) error {
-	args := m.Called(heartbeat)
+func (m *HeartbeatServiceMock) Insert(h *models.Heartbeat) error {
+	args := m.Called(h)
 	return args.Error(0)
 }
 
-func (m *HeartbeatServiceMock) InsertBatch(heartbeats []*models.Heartbeat) error {
-	args := m.Called(heartbeats)
+func (m *HeartbeatServiceMock) InsertBatch(h []*models.Heartbeat) error {
+	args := m.Called(h)
 	return args.Error(0)
 }
 
@@ -74,3 +74,8 @@ func (m *HeartbeatServiceMock) DeleteByUser(u *models.User) error {
 	args := m.Called(u)
 	return args.Error(0)
 }
+
+func (m *HeartbeatServiceMock) DeleteByUserBefore(u *models.User, t time.Time) error {
+	args := m.Called(u, t)
+	return args.Error(0)
+}
diff --git a/mocks/summary_repository.go b/mocks/summary_repository.go
index 420e748..65ff043 100644
--- a/mocks/summary_repository.go
+++ b/mocks/summary_repository.go
@@ -10,8 +10,8 @@ type SummaryRepositoryMock struct {
 	mock.Mock
 }
 
-func (m *SummaryRepositoryMock) Insert(summary *models.Summary) error {
-	args := m.Called(summary)
+func (m *SummaryRepositoryMock) Insert(s *models.Summary) error {
+	args := m.Called(s)
 	return args.Error(0)
 }
 
@@ -20,8 +20,8 @@ func (m *SummaryRepositoryMock) GetAll() ([]*models.Summary, error) {
 	return args.Get(0).([]*models.Summary), args.Error(1)
 }
 
-func (m *SummaryRepositoryMock) GetByUserWithin(user *models.User, time time.Time, time2 time.Time) ([]*models.Summary, error) {
-	args := m.Called(user, time, time2)
+func (m *SummaryRepositoryMock) GetByUserWithin(u *models.User, t1 time.Time, t2 time.Time) ([]*models.Summary, error) {
+	args := m.Called(u, t1, t2)
 	return args.Get(0).([]*models.Summary), args.Error(1)
 }
 
@@ -34,3 +34,8 @@ func (m *SummaryRepositoryMock) DeleteByUser(s string) error {
 	args := m.Called(s)
 	return args.Error(0)
 }
+
+func (m *SummaryRepositoryMock) DeleteByUserBefore(s string, t time.Time) error {
+	args := m.Called(s, t)
+	return args.Error(0)
+}
diff --git a/repositories/heartbeat.go b/repositories/heartbeat.go
index cbddcdb..d91fab1 100644
--- a/repositories/heartbeat.go
+++ b/repositories/heartbeat.go
@@ -170,7 +170,7 @@ func (r *HeartbeatRepository) CountByUsers(users []*models.User) ([]*models.Coun
 	return counts, nil
 }
 
-func (r HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
+func (r *HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
 	var results []string
 	if err := r.db.
 		Model(&models.Heartbeat{}).
@@ -199,3 +199,14 @@ func (r *HeartbeatRepository) DeleteByUser(user *models.User) error {
 	}
 	return nil
 }
+
+// DeleteByUserBefore deletes all heartbeats of the given user whose time is at or before t (converted to local time).
+func (r *HeartbeatRepository) DeleteByUserBefore(user *models.User, t time.Time) error {
+	if err := r.db.
+		Where("user_id = ?", user.ID).
+		Where("time <= ?", t.Local()).
+		Delete(models.Heartbeat{}).Error; err != nil {
+		return err
+	}
+	return nil
+}
diff --git a/repositories/repositories.go b/repositories/repositories.go
index 9b9bbe0..31ffbd9 100644
--- a/repositories/repositories.go
+++ b/repositories/repositories.go
@@ -31,6 +31,7 @@ type IHeartbeatRepository interface {
 	GetEntitySetByUser(uint8, *models.User) ([]string, error)
 	DeleteBefore(time.Time) error
 	DeleteByUser(*models.User) error
+	DeleteByUserBefore(*models.User, time.Time) error
 }
 
 type IDiagnosticsRepository interface {
@@ -66,6 +67,7 @@ type ISummaryRepository interface {
 	GetByUserWithin(*models.User, time.Time, time.Time) ([]*models.Summary, error)
 	GetLastByUser() ([]*models.TimeByUser, error)
 	DeleteByUser(string) error
+	DeleteByUserBefore(string, time.Time) error
 }
 
 type IUserRepository interface {
diff --git a/repositories/summary.go b/repositories/summary.go
index 1f0286c..9225158 100644
--- a/repositories/summary.go
+++ b/repositories/summary.go
@@ -86,6 +86,17 @@ func (r *SummaryRepository) DeleteByUser(userId string) error {
 	return nil
 }
 
+// DeleteByUserBefore deletes all summaries of the given user whose to_time is at or before t (converted to local time).
+func (r *SummaryRepository) DeleteByUserBefore(userId string, t time.Time) error {
+	if err := r.db.
+		Where("user_id = ?", userId).
+		Where("to_time <= ?", t.Local()).
+		Delete(models.Summary{}).Error; err != nil {
+		return err
+	}
+	return nil
+}
+
 // inplace
 func (r *SummaryRepository) populateItems(summaries []*models.Summary, conditions []clause.Interface) error {
 	var items []*models.SummaryItem
diff --git a/routes/settings.go b/routes/settings.go
index 5e8ad9c..92f384b 100644
--- a/routes/settings.go
+++ b/routes/settings.go
@@ -657,12 +657,12 @@ func (h *SettingsHandler) validateWakatimeKey(apiKey string, baseUrl string) boo
 func (h *SettingsHandler) regenerateSummaries(user *models.User) error {
 	logbuch.Info("clearing summaries for user '%s'", user.ID)
 	if err := h.summarySrvc.DeleteByUser(user.ID); err != nil {
-		logbuch.Error("failed to clear summaries: %v", err)
+		conf.Log().Error("failed to clear summaries: %v", err)
 		return err
 	}
 
 	if err := h.aggregationSrvc.AggregateSummaries(datastructure.NewSet(user.ID)); err != nil {
-		logbuch.Error("failed to regenerate summaries: %v", err)
+		conf.Log().Error("failed to regenerate summaries: %v", err)
 		return err
 	}
 
diff --git a/services/heartbeat.go b/services/heartbeat.go
index c95c619..b9a3ed3 100644
--- a/services/heartbeat.go
+++ b/services/heartbeat.go
@@ -192,6 +192,12 @@ func (srv *HeartbeatService) DeleteByUser(user *models.User) error {
 	return srv.repository.DeleteByUser(user)
 }
 
+// DeleteByUserBefore flushes the service's cache in a separate goroutine and deletes the user's heartbeats up to time t.
+func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error {
+	go srv.cache.Flush()
+	return srv.repository.DeleteByUserBefore(user, t)
+}
+
 func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) {
 	languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId)
 	if err != nil {
diff --git a/services/housekeeping.go b/services/housekeeping.go
new file mode 100644
index 0000000..b83a679
--- /dev/null
+++ b/services/housekeeping.go
@@ -0,0 +1,90 @@
+package services
+
+import (
+	"github.com/emvi/logbuch"
+	"github.com/muety/artifex/v2"
+	"github.com/muety/wakapi/config"
+	"github.com/muety/wakapi/models"
+	"time"
+)
+
+// HousekeepingService periodically deletes user data (heartbeats and summaries)
+// that is older than the configured retention period.
+type HousekeepingService struct {
+	config        *config.Config
+	userSrvc      IUserService
+	heartbeatSrvc IHeartbeatService
+	summarySrvc   ISummaryService
+	queueDefault  *artifex.Dispatcher
+	queueWorkers  *artifex.Dispatcher
+}
+
+// NewHousekeepingService creates a HousekeepingService that uses the global config,
+// the default job queue for scheduling and the housekeeping queue for per-user cleanup jobs.
+func NewHousekeepingService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService) *HousekeepingService {
+	return &HousekeepingService{
+		config:        config.Get(),
+		userSrvc:      userService,
+		heartbeatSrvc: heartbeatService,
+		summarySrvc:   summaryService,
+		queueDefault:  config.GetDefaultQueue(),
+		queueWorkers:  config.GetQueue(config.QueueHousekeeping),
+	}
+}
+
+// Schedule registers a cron job (config's DataCleanupTime) on the default queue that dispatches one
+// cleanup job per user to the housekeeping queue. It does nothing if DataRetentionMonths is <= 0.
+func (s *HousekeepingService) Schedule() {
+	if s.config.App.DataRetentionMonths <= 0 {
+		return
+	}
+
+	logbuch.Info("scheduling data cleanup")
+
+	// this is not exactly precise, because of summer / winter time, etc.
+	retentionDuration := time.Now().Sub(time.Now().AddDate(0, -s.config.App.DataRetentionMonths, 0))
+
+	_, err := s.queueDefault.DispatchCron(func() {
+		// fetch all users
+		users, err := s.userSrvc.GetAll()
+		if err != nil {
+			config.Log().Error("failed to get users for data cleanup, %v", err)
+			return
+		}
+
+		// schedule jobs
+		for _, u := range users {
+			// copy the user so that the dispatched job gets its own value
+			user := *u
+			s.queueWorkers.Dispatch(func() {
+				if err := s.ClearOldUserData(&user, retentionDuration); err != nil {
+					config.Log().Error("failed to clear old user data for '%s', %v", user.ID, err)
+				}
+			})
+		}
+	}, s.config.App.DataCleanupTime)
+
+	if err != nil {
+		config.Log().Error("failed to dispatch data cleanup jobs, %v", err)
+	}
+}
+
+// ClearOldUserData deletes the given user's heartbeats and summaries that are older than maxAge.
+// It returns the first error encountered; if deleting heartbeats fails, summaries are left untouched.
+func (s *HousekeepingService) ClearOldUserData(user *models.User, maxAge time.Duration) error {
+	before := time.Now().Add(-maxAge)
+	logbuch.Warn("cleaning up user data for '%s' older than %v", user.ID, before)
+
+	// clear old heartbeats
+	if err := s.heartbeatSrvc.DeleteByUserBefore(user, before); err != nil {
+		return err
+	}
+
+	// clear old summaries
+	logbuch.Info("clearing summaries for user '%s' older than %v", user.ID, before)
+	if err := s.summarySrvc.DeleteByUserBefore(user.ID, before); err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/services/services.go b/services/services.go
index d3db4ed..dc4a653 100644
--- a/services/services.go
+++ b/services/services.go
@@ -42,6 +42,7 @@ type IHeartbeatService interface {
 	GetEntitySetByUser(uint8, *models.User) ([]string, error)
 	DeleteBefore(time.Time) error
 	DeleteByUser(*models.User) error
+	DeleteByUserBefore(*models.User, time.Time) error
 }
 
 type IDiagnosticsService interface {
@@ -89,6 +90,7 @@ type ISummaryService interface {
 	Summarize(time.Time, time.Time, *models.User, *models.Filters) (*models.Summary, error)
 	GetLatestByUser() ([]*models.TimeByUser, error)
 	DeleteByUser(string) error
+	DeleteByUserBefore(string, time.Time) error
 	Insert(*models.Summary) error
 }
 
@@ -97,6 +99,11 @@ type IReportService interface {
 	SendReport(*models.User, time.Duration) error
 }
 
+type IHousekeepingService interface {
+	Schedule()
+	ClearOldUserData(*models.User, time.Duration) error
+}
+
 type ILeaderboardService interface {
 	Schedule()
 	ComputeLeaderboard([]*models.User, *models.IntervalKey, []uint8) error
diff --git a/services/summary.go b/services/summary.go
index afa4fd7..d302e1d 100644
--- a/services/summary.go
+++ b/services/summary.go
@@ -208,6 +208,12 @@ func (srv *SummaryService) DeleteByUser(userId string) error {
 	return srv.repository.DeleteByUser(userId)
 }
 
+// DeleteByUserBefore invalidates the user's cache (invalidateUserCache) and deletes the user's summaries up to time t.
+func (srv *SummaryService) DeleteByUserBefore(userId string, t time.Time) error {
+	srv.invalidateUserCache(userId)
+	return srv.repository.DeleteByUserBefore(userId, t)
+}
+
 func (srv *SummaryService) Insert(summary *models.Summary) error {
 	srv.invalidateUserCache(summary.UserID)
 	return srv.repository.Insert(summary)