feat: implement data retention mechanism

This commit is contained in:
Ferdinand Mütsch 2022-12-01 20:26:03 +01:00
parent 2db065d47a
commit 5ae7527b7b
14 changed files with 172 additions and 19 deletions

View File

@ -15,9 +15,11 @@ app:
aggregation_time: '0 15 2 * * *' # time at which to run daily aggregation batch jobs
leaderboard_generation_time: '0 0 6 * * *,0 0 18 * * *' # times at which to re-calculate the leaderboard
report_time_weekly: '0 0 18 * * 5' # time at which to fan out weekly reports (extended cron)
data_cleanup_time: '0 0 6 * * 7' # time at which to run old data cleanup (if enabled through data_retention_months)
inactive_days: 7 # number of previous days within which a user must have logged in to be considered active
import_batch_size: 50 # maximum number of heartbeats to insert into the database within one transaction
heartbeat_max_age: '4320h' # maximum acceptable age of a heartbeat (see https://pkg.go.dev/time#ParseDuration)
data_retention_months: -1 # maximum retention period in months for user data (heartbeats) (-1 for infinity)
custom_languages:
vue: Vue
jsx: JSX

View File

@ -68,14 +68,16 @@ var cFlag = flag.String("config", defaultConfigPath, "config file location")
var env string
type appConfig struct {
AggregationTime string `yaml:"aggregation_time" default:"02:15" env:"WAKAPI_AGGREGATION_TIME"`
LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"06:00;18:00" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"fri,18:00" env:"WAKAPI_REPORT_TIME_WEEKLY"`
AggregationTime string `yaml:"aggregation_time" default:"0 15 2 * * *" env:"WAKAPI_AGGREGATION_TIME"`
LeaderboardGenerationTime string `yaml:"leaderboard_generation_time" default:"0 0 6 * * *,0 0 18 * * *" env:"WAKAPI_LEADERBOARD_GENERATION_TIME"`
ReportTimeWeekly string `yaml:"report_time_weekly" default:"0 0 18 * * 5" env:"WAKAPI_REPORT_TIME_WEEKLY"`
DataCleanupTime string `yaml:"data_cleanup_time" default:"0 0 6 * * 7" env:"WAKAPI_DATA_CLEANUP_TIME"`
ImportBackoffMin int `yaml:"import_backoff_min" default:"5" env:"WAKAPI_IMPORT_BACKOFF_MIN"`
ImportBatchSize int `yaml:"import_batch_size" default:"50" env:"WAKAPI_IMPORT_BATCH_SIZE"`
InactiveDays int `yaml:"inactive_days" default:"7" env:"WAKAPI_INACTIVE_DAYS"`
HeartbeatMaxAge string `yaml:"heartbeat_max_age" default:"4320h" env:"WAKAPI_HEARTBEAT_MAX_AGE"`
CountCacheTTLMin int `yaml:"count_cache_ttl_min" default:"30" env:"WAKAPI_COUNT_CACHE_TTL_MIN"`
DataRetentionMonths int `yaml:"data_retention_months" default:"-1" env:"WAKAPI_DATA_RETENTION_MONTHS"`
AvatarURLTemplate string `yaml:"avatar_url_template" default:"api/avatar/{username_hash}.svg" env:"WAKAPI_AVATAR_URL_TEMPLATE"`
CustomLanguages map[string]string `yaml:"custom_languages"`
Colors map[string]map[string]string `yaml:"-"`
@ -442,6 +444,12 @@ func Load(version string) *Config {
initSentry(config.Sentry, config.IsDev())
}
if config.App.DataRetentionMonths <= 0 {
logbuch.Info("disabling data retention policy, keeping data forever")
} else {
logbuch.Info("data retention policy set to keep data for %d months at max", config.App.DataRetentionMonths)
}
// some validation checks
if config.Server.ListenIpV4 == "" && config.Server.ListenIpV6 == "" && config.Server.ListenSocket == "" {
logbuch.Fatal("either of listen_ipv4 or listen_ipv6 or listen_socket must be set")

View File

@ -12,10 +12,11 @@ var jobQueues map[string]*artifex.Dispatcher
var jobCounts map[string]int
const (
QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports"
QueueImports = "wakapi.imports"
QueueDefault = "wakapi.default"
QueueProcessing = "wakapi.processing"
QueueReports = "wakapi.reports"
QueueImports = "wakapi.imports"
QueueHousekeeping = "wakapi.housekeeping"
)
type JobQueueMetrics struct {
@ -28,9 +29,10 @@ func init() {
jobQueues = make(map[string]*artifex.Dispatcher)
InitQueue(QueueDefault, 1)
InitQueue(QueueProcessing, int(math.Ceil(float64(runtime.NumCPU())/2.0)))
InitQueue(QueueProcessing, halfCPUs())
InitQueue(QueueReports, 1)
InitQueue(QueueImports, 1)
InitQueue(QueueHousekeeping, halfCPUs())
}
func InitQueue(name string, workers int) error {
@ -71,3 +73,11 @@ func CloseQueues() {
q.Stop()
}
}
// allCPUs reports the value of runtime.NumCPU, i.e. the number of logical
// CPUs usable by the current process.
func allCPUs() int {
	n := runtime.NumCPU()
	return n
}
// halfCPUs reports half of runtime.NumCPU, rounded up to the next integer,
// so the result is at least 1 on any machine with one or more CPUs.
func halfCPUs() int {
	cpus := float64(runtime.NumCPU())
	half := math.Ceil(cpus / 2.0)
	return int(half)
}

View File

@ -81,6 +81,7 @@ var (
keyValueService services.IKeyValueService
reportService services.IReportService
diagnosticsService services.IDiagnosticsService
housekeepingService services.IHousekeepingService
miscService services.IMiscService
)
@ -182,12 +183,14 @@ func main() {
keyValueService = services.NewKeyValueService(keyValueRepository)
reportService = services.NewReportService(summaryService, userService, mailService)
diagnosticsService = services.NewDiagnosticsService(diagnosticsRepository)
housekeepingService = services.NewHousekeepingService(userService, heartbeatService, summaryService)
miscService = services.NewMiscService(userService, summaryService, keyValueService)
// Schedule background tasks
go aggregationService.Schedule()
go leaderboardService.Schedule()
go reportService.Schedule()
go housekeepingService.Schedule()
go miscService.ScheduleCountTotalTime()
routes.Init()

View File

@ -10,13 +10,13 @@ type HeartbeatServiceMock struct {
mock.Mock
}
func (m *HeartbeatServiceMock) Insert(heartbeat *models.Heartbeat) error {
args := m.Called(heartbeat)
func (m *HeartbeatServiceMock) Insert(h *models.Heartbeat) error {
args := m.Called(h)
return args.Error(0)
}
func (m *HeartbeatServiceMock) InsertBatch(heartbeats []*models.Heartbeat) error {
args := m.Called(heartbeats)
func (m *HeartbeatServiceMock) InsertBatch(h []*models.Heartbeat) error {
args := m.Called(h)
return args.Error(0)
}
@ -74,3 +74,8 @@ func (m *HeartbeatServiceMock) DeleteByUser(u *models.User) error {
args := m.Called(u)
return args.Error(0)
}
// DeleteByUserBefore records the call with both arguments on the mock and
// returns the error configured as its first return value.
func (m *HeartbeatServiceMock) DeleteByUserBefore(u *models.User, t time.Time) error {
	return m.Called(u, t).Error(0)
}

View File

@ -10,8 +10,8 @@ type SummaryRepositoryMock struct {
mock.Mock
}
func (m *SummaryRepositoryMock) Insert(summary *models.Summary) error {
args := m.Called(summary)
func (m *SummaryRepositoryMock) Insert(s *models.Summary) error {
args := m.Called(s)
return args.Error(0)
}
@ -20,8 +20,8 @@ func (m *SummaryRepositoryMock) GetAll() ([]*models.Summary, error) {
return args.Get(0).([]*models.Summary), args.Error(1)
}
func (m *SummaryRepositoryMock) GetByUserWithin(user *models.User, time time.Time, time2 time.Time) ([]*models.Summary, error) {
args := m.Called(user, time, time2)
func (m *SummaryRepositoryMock) GetByUserWithin(u *models.User, t1 time.Time, t2 time.Time) ([]*models.Summary, error) {
args := m.Called(u, t1, t2)
return args.Get(0).([]*models.Summary), args.Error(1)
}
@ -34,3 +34,8 @@ func (m *SummaryRepositoryMock) DeleteByUser(s string) error {
args := m.Called(s)
return args.Error(0)
}
// DeleteByUserBefore records the call with both arguments on the mock and
// returns the error configured as its first return value.
func (m *SummaryRepositoryMock) DeleteByUserBefore(s string, t time.Time) error {
	return m.Called(s, t).Error(0)
}

View File

@ -170,7 +170,7 @@ func (r *HeartbeatRepository) CountByUsers(users []*models.User) ([]*models.Coun
return counts, nil
}
func (r HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
func (r *HeartbeatRepository) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
var results []string
if err := r.db.
Model(&models.Heartbeat{}).
@ -199,3 +199,13 @@ func (r *HeartbeatRepository) DeleteByUser(user *models.User) error {
}
return nil
}
// DeleteByUserBefore deletes all heartbeats belonging to the given user whose
// time is less than or equal to t (t is converted to local time for the
// comparison). Note that the bound is inclusive. It returns the error reported
// by the database layer, or nil on success.
func (r *HeartbeatRepository) DeleteByUserBefore(user *models.User, t time.Time) error {
	result := r.db.
		Where("user_id = ?", user.ID).
		Where("time <= ?", t.Local()).
		Delete(models.Heartbeat{})
	return result.Error
}

View File

@ -31,6 +31,7 @@ type IHeartbeatRepository interface {
GetEntitySetByUser(uint8, *models.User) ([]string, error)
DeleteBefore(time.Time) error
DeleteByUser(*models.User) error
DeleteByUserBefore(*models.User, time.Time) error
}
type IDiagnosticsRepository interface {
@ -66,6 +67,7 @@ type ISummaryRepository interface {
GetByUserWithin(*models.User, time.Time, time.Time) ([]*models.Summary, error)
GetLastByUser() ([]*models.TimeByUser, error)
DeleteByUser(string) error
DeleteByUserBefore(string, time.Time) error
}
type IUserRepository interface {

View File

@ -86,6 +86,16 @@ func (r *SummaryRepository) DeleteByUser(userId string) error {
return nil
}
// DeleteByUserBefore deletes all summaries of the user with the given id whose
// to_time is less than or equal to t (t is converted to local time for the
// comparison). Note that the bound is inclusive. It returns the error reported
// by the database layer, or nil on success.
func (r *SummaryRepository) DeleteByUserBefore(userId string, t time.Time) error {
	result := r.db.
		Where("user_id = ?", userId).
		Where("to_time <= ?", t.Local()).
		Delete(models.Summary{})
	return result.Error
}
// inplace
func (r *SummaryRepository) populateItems(summaries []*models.Summary, conditions []clause.Interface) error {
var items []*models.SummaryItem

View File

@ -657,12 +657,12 @@ func (h *SettingsHandler) validateWakatimeKey(apiKey string, baseUrl string) boo
func (h *SettingsHandler) regenerateSummaries(user *models.User) error {
logbuch.Info("clearing summaries for user '%s'", user.ID)
if err := h.summarySrvc.DeleteByUser(user.ID); err != nil {
logbuch.Error("failed to clear summaries: %v", err)
conf.Log().Error("failed to clear summaries: %v", err)
return err
}
if err := h.aggregationSrvc.AggregateSummaries(datastructure.NewSet(user.ID)); err != nil {
logbuch.Error("failed to regenerate summaries: %v", err)
conf.Log().Error("failed to regenerate summaries: %v", err)
return err
}

View File

@ -192,6 +192,11 @@ func (srv *HeartbeatService) DeleteByUser(user *models.User) error {
return srv.repository.DeleteByUser(user)
}
// DeleteByUserBefore asks the repository to delete the given user's heartbeats
// up to time t and returns the repository's error, if any. A flush of the
// service cache is started in a separate goroutine beforehand.
func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error {
	// fire-and-forget: the flush is not awaited before the delete is issued
	go srv.cache.Flush()

	err := srv.repository.DeleteByUserBefore(user, t)
	return err
}
func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) {
languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId)
if err != nil {

81
services/housekeeping.go Normal file
View File

@ -0,0 +1,81 @@
package services
import (
"github.com/emvi/logbuch"
"github.com/muety/artifex/v2"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/models"
"time"
)
// HousekeepingService bundles the dependencies needed to schedule and run the
// periodic cleanup of old user data (heartbeats and summaries).
type HousekeepingService struct {
config *config.Config // application configuration, taken from config.Get() by the constructor
userSrvc IUserService // used to fetch all users when a cleanup run starts
heartbeatSrvc IHeartbeatService // used to delete old heartbeats per user
summarySrvc ISummaryService // used to delete old summaries per user
queueDefault *artifex.Dispatcher // default queue; the cron-triggered cleanup job is registered here
queueWorkers *artifex.Dispatcher // housekeeping queue; the per-user cleanup jobs are dispatched here
}
// NewHousekeepingService builds a HousekeepingService from the given services.
// The configuration, the default queue and the housekeeping queue are looked
// up from the config package.
func NewHousekeepingService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService) *HousekeepingService {
	srv := new(HousekeepingService)

	srv.config = config.Get()
	srv.userSrvc = userService
	srv.heartbeatSrvc = heartbeatService
	srv.summarySrvc = summaryService
	srv.queueDefault = config.GetDefaultQueue()
	srv.queueWorkers = config.GetQueue(config.QueueHousekeeping)

	return srv
}
// Schedule registers the recurring data cleanup job on the default queue,
// using the cron expression from App.DataCleanupTime. It returns immediately
// without scheduling anything when App.DataRetentionMonths is zero or
// negative.
//
// On every cron tick all users are fetched and one cleanup job per user is
// dispatched to the housekeeping worker queue. Failures are logged; they never
// abort the processing of the remaining users.
func (s *HousekeepingService) Schedule() {
	if s.config.App.DataRetentionMonths <= 0 {
		return
	}

	logbuch.Info("scheduling data cleanup")

	_, err := s.queueDefault.DispatchCron(func() {
		// Derive the retention window on every run instead of once at
		// scheduling time: calendar months differ in length, so a duration
		// computed at startup would drift in a long-running process. A single
		// reference time is used for both ends of the subtraction.
		// This is still not exactly precise, because of summer / winter time, etc.
		now := time.Now()
		retentionDuration := now.Sub(now.AddDate(0, -s.config.App.DataRetentionMonths, 0))

		// fetch all users
		users, err := s.userSrvc.GetAll()
		if err != nil {
			config.Log().Error("failed to get users for data cleanup, %v", err)
			return
		}

		// schedule jobs
		for _, u := range users {
			// copy the user so that every job closure owns its own value
			user := *u
			if err := s.queueWorkers.Dispatch(func() {
				if err := s.ClearOldUserData(&user, retentionDuration); err != nil {
					config.Log().Error("failed to clear old user data for '%s', %v", user.ID, err)
				}
			}); err != nil {
				config.Log().Error("failed to dispatch data cleanup job for '%s', %v", user.ID, err)
			}
		}
	}, s.config.App.DataCleanupTime)

	if err != nil {
		config.Log().Error("failed to dispatch data cleanup jobs, %v", err)
	}
}
// ClearOldUserData deletes the given user's heartbeats and summaries that are
// older than maxAge, measured back from the current time. Heartbeats are
// removed first; if that fails, the summaries are left untouched and the error
// is returned. Otherwise the result of the summary deletion is returned.
func (s *HousekeepingService) ClearOldUserData(user *models.User, maxAge time.Duration) error {
	cutoff := time.Now().Add(-maxAge)
	logbuch.Warn("cleaning up user data for '%s' older than %v", user.ID, cutoff)

	// clear old heartbeats
	err := s.heartbeatSrvc.DeleteByUserBefore(user, cutoff)
	if err != nil {
		return err
	}

	// clear old summaries
	logbuch.Info("clearing summaries for user '%s' older than %v", user.ID, cutoff)
	return s.summarySrvc.DeleteByUserBefore(user.ID, cutoff)
}

View File

@ -42,6 +42,7 @@ type IHeartbeatService interface {
GetEntitySetByUser(uint8, *models.User) ([]string, error)
DeleteBefore(time.Time) error
DeleteByUser(*models.User) error
DeleteByUserBefore(*models.User, time.Time) error
}
type IDiagnosticsService interface {
@ -89,6 +90,7 @@ type ISummaryService interface {
Summarize(time.Time, time.Time, *models.User, *models.Filters) (*models.Summary, error)
GetLatestByUser() ([]*models.TimeByUser, error)
DeleteByUser(string) error
DeleteByUserBefore(string, time.Time) error
Insert(*models.Summary) error
}
@ -97,6 +99,11 @@ type IReportService interface {
SendReport(*models.User, time.Duration) error
}
// IHousekeepingService is the interface of the service responsible for
// cleaning up old user data.
type IHousekeepingService interface {
// Schedule sets up the recurring cleanup job.
Schedule()
// ClearOldUserData removes data of the given user that is older than the
// given maximum age and reports any error encountered while doing so.
ClearOldUserData(*models.User, time.Duration) error
}
type ILeaderboardService interface {
Schedule()
ComputeLeaderboard([]*models.User, *models.IntervalKey, []uint8) error

View File

@ -208,6 +208,11 @@ func (srv *SummaryService) DeleteByUser(userId string) error {
return srv.repository.DeleteByUser(userId)
}
// DeleteByUserBefore invalidates the cache entries of the given user and then
// asks the repository to delete that user's summaries up to time t, returning
// the repository's error, if any.
func (srv *SummaryService) DeleteByUserBefore(userId string, t time.Time) error {
	srv.invalidateUserCache(userId)

	err := srv.repository.DeleteByUserBefore(userId, t)
	return err
}
func (srv *SummaryService) Insert(summary *models.Summary) error {
srv.invalidateUserCache(summary.UserID)
return srv.repository.Insert(summary)