mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
feat: implement computation of users first heartbeats data time
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/emvi/logbuch"
|
||||
"github.com/muety/artifex/v2"
|
||||
"github.com/muety/wakapi/config"
|
||||
@@ -14,36 +15,57 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
countUsersEvery = 1 * time.Hour
|
||||
countUsersEvery = 1 * time.Hour
|
||||
computeOldestDataEvery = 6 * time.Hour
|
||||
)
|
||||
|
||||
var countLock = sync.Mutex{}
|
||||
var firstDataLock = sync.Mutex{}
|
||||
|
||||
type MiscService struct {
|
||||
config *config.Config
|
||||
userService IUserService
|
||||
summaryService ISummaryService
|
||||
keyValueService IKeyValueService
|
||||
queueDefault *artifex.Dispatcher
|
||||
queueWorkers *artifex.Dispatcher
|
||||
config *config.Config
|
||||
userService IUserService
|
||||
heartbeatService IHeartbeatService
|
||||
summaryService ISummaryService
|
||||
keyValueService IKeyValueService
|
||||
queueDefault *artifex.Dispatcher
|
||||
queueWorkers *artifex.Dispatcher
|
||||
}
|
||||
|
||||
func NewMiscService(userService IUserService, summaryService ISummaryService, keyValueService IKeyValueService) *MiscService {
|
||||
func NewMiscService(userService IUserService, heartbeatService IHeartbeatService, summaryService ISummaryService, keyValueService IKeyValueService) *MiscService {
|
||||
return &MiscService{
|
||||
config: config.Get(),
|
||||
userService: userService,
|
||||
summaryService: summaryService,
|
||||
keyValueService: keyValueService,
|
||||
queueDefault: config.GetDefaultQueue(),
|
||||
queueWorkers: config.GetQueue(config.QueueProcessing),
|
||||
config: config.Get(),
|
||||
userService: userService,
|
||||
heartbeatService: heartbeatService,
|
||||
summaryService: summaryService,
|
||||
keyValueService: keyValueService,
|
||||
queueDefault: config.GetDefaultQueue(),
|
||||
queueWorkers: config.GetQueue(config.QueueProcessing),
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *MiscService) ScheduleCountTotalTime() {
|
||||
func (srv *MiscService) Schedule() {
|
||||
logbuch.Info("scheduling total time counting")
|
||||
if _, err := srv.queueDefault.DispatchEvery(srv.CountTotalTime, countUsersEvery); err != nil {
|
||||
config.Log().Error("failed to schedule user counting jobs, %v", err)
|
||||
}
|
||||
|
||||
logbuch.Info("scheduling first data computing")
|
||||
if _, err := srv.queueDefault.DispatchEvery(srv.ComputeOldestHeartbeats, computeOldestDataEvery); err != nil {
|
||||
config.Log().Error("failed to schedule first data computing jobs, %v", err)
|
||||
}
|
||||
|
||||
// run once initially for a fresh instance
|
||||
if !srv.existsUsersTotalTime() {
|
||||
if err := srv.queueDefault.Dispatch(srv.CountTotalTime); err != nil {
|
||||
config.Log().Error("failed to dispatch user counting jobs, %v", err)
|
||||
}
|
||||
}
|
||||
if !srv.existsUsersFirstData() {
|
||||
if err := srv.queueDefault.Dispatch(srv.ComputeOldestHeartbeats); err != nil {
|
||||
config.Log().Error("failed to dispatch first data computing jobs, %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *MiscService) CountTotalTime() {
|
||||
@@ -95,6 +117,39 @@ func (srv *MiscService) CountTotalTime() {
|
||||
}(&pendingJobs)
|
||||
}
|
||||
|
||||
func (srv *MiscService) ComputeOldestHeartbeats() {
|
||||
logbuch.Info("computing users' first data")
|
||||
|
||||
if err := srv.queueWorkers.Dispatch(func() {
|
||||
if ok := firstDataLock.TryLock(); !ok {
|
||||
config.Log().Warn("couldn't acquire lock for computing users' first data, job is still pending")
|
||||
return
|
||||
}
|
||||
defer firstDataLock.Unlock()
|
||||
|
||||
results, err := srv.heartbeatService.GetFirstByUsers()
|
||||
if err != nil {
|
||||
config.Log().Error("failed to compute users' first data, %v", err)
|
||||
}
|
||||
|
||||
for _, entry := range results {
|
||||
if entry.Time.T().IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
kvKey := fmt.Sprintf("%s_%s", config.KeyFirstHeartbeat, entry.User)
|
||||
if err := srv.keyValueService.PutString(&models.KeyStringValue{
|
||||
Key: kvKey,
|
||||
Value: entry.Time.T().Format(time.RFC822Z),
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to save user's first heartbeat time: %v", err)
|
||||
}
|
||||
}
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to enqueue computing first data for user, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *MiscService) countUserTotalTime(userId string) time.Duration {
|
||||
result, err := srv.summaryService.Aliased(time.Time{}, time.Now(), &models.User{ID: userId}, srv.summaryService.Retrieve, nil, false)
|
||||
if err != nil {
|
||||
@@ -103,3 +158,13 @@ func (srv *MiscService) countUserTotalTime(userId string) time.Duration {
|
||||
}
|
||||
return result.TotalTime()
|
||||
}
|
||||
|
||||
func (srv *MiscService) existsUsersTotalTime() bool {
|
||||
results, _ := srv.keyValueService.GetByPrefix(config.KeyLatestTotalTime)
|
||||
return len(results) > 0
|
||||
}
|
||||
|
||||
func (srv *MiscService) existsUsersFirstData() bool {
|
||||
results, _ := srv.keyValueService.GetByPrefix(config.KeyFirstHeartbeat)
|
||||
return len(results) > 0
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user