mirror of
https://github.com/muety/wakapi.git
synced 2023-08-10 21:12:56 +03:00
fix: error handling for user counting
fix: make user counting thread-safe
This commit is contained in:
parent
d4945c982f
commit
aab9e98ebd
@ -5,6 +5,7 @@ import (
|
||||
"github.com/muety/artifex"
|
||||
"github.com/muety/wakapi/config"
|
||||
"github.com/muety/wakapi/utils"
|
||||
"go.uber.org/atomic"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
@ -12,6 +13,12 @@ import (
|
||||
"github.com/muety/wakapi/models"
|
||||
)
|
||||
|
||||
const (
|
||||
countUsersEvery = 1 * time.Hour
|
||||
)
|
||||
|
||||
var countLock = sync.Mutex{}
|
||||
|
||||
type MiscService struct {
|
||||
config *config.Config
|
||||
userService IUserService
|
||||
@ -34,25 +41,31 @@ func NewMiscService(userService IUserService, summaryService ISummaryService, ke
|
||||
|
||||
func (srv *MiscService) ScheduleCountTotalTime() {
|
||||
logbuch.Info("scheduling total time counting")
|
||||
if _, err := srv.queueDefault.DispatchEvery(srv.CountTotalTime, 1*time.Hour); err != nil {
|
||||
if _, err := srv.queueDefault.DispatchEvery(srv.CountTotalTime, countUsersEvery); err != nil {
|
||||
config.Log().Error("failed to schedule user counting jobs, %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func (srv *MiscService) CountTotalTime() {
|
||||
logbuch.Info("counting users total time")
|
||||
if ok := countLock.TryLock(); !ok {
|
||||
config.Log().Warn("couldn't acquire lock for counting users total time, job is still pending")
|
||||
}
|
||||
defer countLock.Unlock()
|
||||
|
||||
users, err := srv.userService.GetAll()
|
||||
if err != nil {
|
||||
config.Log().Error("failed to fetch users for time counting, %v", err)
|
||||
}
|
||||
|
||||
var totalTime time.Duration = 0
|
||||
var totalTime = atomic.NewDuration(0)
|
||||
var pendingJobs sync.WaitGroup
|
||||
pendingJobs.Add(len(users))
|
||||
|
||||
for _, u := range users {
|
||||
if err := srv.queueWorkers.Dispatch(func() {
|
||||
defer pendingJobs.Done()
|
||||
totalTime += srv.countUserTotalTime(u.ID)
|
||||
totalTime.Add(srv.countUserTotalTime(u.ID))
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to enqueue counting job for user '%s'", u.ID)
|
||||
pendingJobs.Done()
|
||||
@ -61,10 +74,10 @@ func (srv *MiscService) CountTotalTime() {
|
||||
|
||||
// persist
|
||||
go func(wg *sync.WaitGroup) {
|
||||
if utils.WaitTimeout(&pendingJobs, 10*time.Minute) {
|
||||
if !utils.WaitTimeout(&pendingJobs, 2*countUsersEvery) {
|
||||
if err := srv.keyValueService.PutString(&models.KeyStringValue{
|
||||
Key: config.KeyLatestTotalTime,
|
||||
Value: totalTime.String(),
|
||||
Value: totalTime.Load().String(),
|
||||
}); err != nil {
|
||||
config.Log().Error("failed to save total time count: %v", err)
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ type IAggregationService interface {
|
||||
|
||||
type IMiscService interface {
|
||||
ScheduleCountTotalTime()
|
||||
CountTotalTime()
|
||||
}
|
||||
|
||||
type IAliasService interface {
|
||||
|
Loading…
Reference in New Issue
Block a user