2021-01-17 11:24:09 +03:00
package services
import (
2022-12-23 15:41:32 +03:00
"fmt"
2022-12-29 19:12:34 +03:00
"github.com/duke-git/lancet/v2/slice"
2021-01-30 13:17:37 +03:00
"github.com/emvi/logbuch"
2022-12-01 17:31:19 +03:00
"github.com/muety/artifex/v2"
2021-01-17 11:24:09 +03:00
"github.com/muety/wakapi/config"
2022-11-20 12:10:24 +03:00
"github.com/muety/wakapi/utils"
2022-12-01 15:46:06 +03:00
"go.uber.org/atomic"
2021-01-17 11:24:09 +03:00
"strconv"
2022-12-29 19:12:34 +03:00
"strings"
2022-11-20 12:10:24 +03:00
"sync"
2021-01-17 11:24:09 +03:00
"time"
"github.com/muety/wakapi/models"
)
2022-12-01 15:46:06 +03:00
const (
2022-12-29 19:12:34 +03:00
countUsersEvery = 1 * time . Hour
computeOldestDataEvery = 6 * time . Hour
notifyExpiringSubscriptionsEvery = 12 * time . Hour
)
const (
notifyBeforeSubscriptionExpiry = 7 * 24 * time . Hour
2022-12-01 15:46:06 +03:00
)
var countLock = sync . Mutex { }
2022-12-23 15:41:32 +03:00
var firstDataLock = sync . Mutex { }
2022-12-01 15:46:06 +03:00
2021-01-17 11:24:09 +03:00
type MiscService struct {
2022-12-23 15:41:32 +03:00
config * config . Config
userService IUserService
heartbeatService IHeartbeatService
summaryService ISummaryService
keyValueService IKeyValueService
2022-12-29 19:12:34 +03:00
mailService IMailService
2022-12-23 15:41:32 +03:00
queueDefault * artifex . Dispatcher
queueWorkers * artifex . Dispatcher
2022-12-29 19:12:34 +03:00
queueMails * artifex . Dispatcher
2021-01-17 11:24:09 +03:00
}
2022-12-29 19:12:34 +03:00
func NewMiscService ( userService IUserService , heartbeatService IHeartbeatService , summaryService ISummaryService , keyValueService IKeyValueService , mailService IMailService ) * MiscService {
2021-01-17 11:24:09 +03:00
return & MiscService {
2022-12-23 15:41:32 +03:00
config : config . Get ( ) ,
userService : userService ,
heartbeatService : heartbeatService ,
summaryService : summaryService ,
keyValueService : keyValueService ,
2022-12-29 19:12:34 +03:00
mailService : mailService ,
2022-12-23 15:41:32 +03:00
queueDefault : config . GetDefaultQueue ( ) ,
queueWorkers : config . GetQueue ( config . QueueProcessing ) ,
2022-12-29 19:12:34 +03:00
queueMails : config . GetQueue ( config . QueueMails ) ,
2021-01-17 11:24:09 +03:00
}
}
2022-12-23 15:41:32 +03:00
func ( srv * MiscService ) Schedule ( ) {
2022-11-20 12:12:34 +03:00
logbuch . Info ( "scheduling total time counting" )
2022-12-01 15:46:06 +03:00
if _ , err := srv . queueDefault . DispatchEvery ( srv . CountTotalTime , countUsersEvery ) ; err != nil {
2022-11-20 12:10:24 +03:00
config . Log ( ) . Error ( "failed to schedule user counting jobs, %v" , err )
}
2022-12-23 15:41:32 +03:00
logbuch . Info ( "scheduling first data computing" )
if _ , err := srv . queueDefault . DispatchEvery ( srv . ComputeOldestHeartbeats , computeOldestDataEvery ) ; err != nil {
config . Log ( ) . Error ( "failed to schedule first data computing jobs, %v" , err )
}
2022-12-29 19:12:34 +03:00
if srv . config . Subscriptions . Enabled && srv . config . Subscriptions . ExpiryNotifications && srv . config . App . DataRetentionMonths > 0 {
logbuch . Info ( "scheduling subscription notifications" )
2022-12-31 18:03:44 +03:00
if _ , err := srv . queueDefault . DispatchEvery ( srv . NotifyExpiringSubscription , notifyExpiringSubscriptionsEvery ) ; err != nil {
2022-12-29 19:12:34 +03:00
config . Log ( ) . Error ( "failed to schedule subscription notification jobs, %v" , err )
}
}
2022-12-23 15:41:32 +03:00
// run once initially for a fresh instance
if ! srv . existsUsersTotalTime ( ) {
if err := srv . queueDefault . Dispatch ( srv . CountTotalTime ) ; err != nil {
config . Log ( ) . Error ( "failed to dispatch user counting jobs, %v" , err )
}
}
if ! srv . existsUsersFirstData ( ) {
if err := srv . queueDefault . Dispatch ( srv . ComputeOldestHeartbeats ) ; err != nil {
config . Log ( ) . Error ( "failed to dispatch first data computing jobs, %v" , err )
}
}
2022-12-29 19:12:34 +03:00
if ! srv . existsSubscriptionNotifications ( ) && srv . config . Subscriptions . Enabled && srv . config . Subscriptions . ExpiryNotifications && srv . config . App . DataRetentionMonths > 0 {
if err := srv . queueDefault . Dispatch ( srv . NotifyExpiringSubscription ) ; err != nil {
config . Log ( ) . Error ( "failed to schedule subscription notification jobs, %v" , err )
}
}
2021-01-17 11:24:09 +03:00
}
2022-11-20 12:10:24 +03:00
func ( srv * MiscService ) CountTotalTime ( ) {
2022-12-01 15:46:06 +03:00
logbuch . Info ( "counting users total time" )
if ok := countLock . TryLock ( ) ; ! ok {
config . Log ( ) . Warn ( "couldn't acquire lock for counting users total time, job is still pending" )
}
defer countLock . Unlock ( )
2021-04-30 18:19:17 +03:00
users , err := srv . userService . GetAll ( )
if err != nil {
2022-11-20 12:12:34 +03:00
config . Log ( ) . Error ( "failed to fetch users for time counting, %v" , err )
2022-12-29 19:12:34 +03:00
return
2021-01-17 11:24:09 +03:00
}
2022-12-01 15:46:06 +03:00
var totalTime = atomic . NewDuration ( 0 )
2022-11-20 12:10:24 +03:00
var pendingJobs sync . WaitGroup
pendingJobs . Add ( len ( users ) )
2021-01-17 11:24:09 +03:00
2021-04-30 18:19:17 +03:00
for _ , u := range users {
2022-12-01 16:13:52 +03:00
user := * u
2022-11-20 12:10:24 +03:00
if err := srv . queueWorkers . Dispatch ( func ( ) {
defer pendingJobs . Done ( )
2022-12-01 16:13:52 +03:00
totalTime . Add ( srv . countUserTotalTime ( user . ID ) )
2022-11-20 12:10:24 +03:00
} ) ; err != nil {
2022-12-01 16:13:52 +03:00
config . Log ( ) . Error ( "failed to enqueue counting job for user '%s'" , user . ID )
2022-11-20 12:10:24 +03:00
pendingJobs . Done ( )
2021-01-17 11:24:09 +03:00
}
}
2021-04-30 18:19:17 +03:00
// persist
2022-11-20 12:10:24 +03:00
go func ( wg * sync . WaitGroup ) {
2022-12-01 15:46:06 +03:00
if ! utils . WaitTimeout ( & pendingJobs , 2 * countUsersEvery ) {
2022-11-20 12:10:24 +03:00
if err := srv . keyValueService . PutString ( & models . KeyStringValue {
Key : config . KeyLatestTotalTime ,
2022-12-01 15:46:06 +03:00
Value : totalTime . Load ( ) . String ( ) ,
2022-11-20 12:10:24 +03:00
} ) ; err != nil {
2022-11-20 12:12:34 +03:00
config . Log ( ) . Error ( "failed to save total time count: %v" , err )
2022-11-20 12:10:24 +03:00
}
2021-04-30 18:19:17 +03:00
2022-11-20 12:10:24 +03:00
if err := srv . keyValueService . PutString ( & models . KeyStringValue {
Key : config . KeyLatestTotalUsers ,
Value : strconv . Itoa ( len ( users ) ) ,
} ) ; err != nil {
2022-11-20 12:12:34 +03:00
config . Log ( ) . Error ( "failed to save total users count: %v" , err )
2021-04-30 18:19:17 +03:00
}
2022-11-20 12:10:24 +03:00
} else {
config . Log ( ) . Error ( "waiting for user counting jobs timed out" )
2021-04-30 18:19:17 +03:00
}
2022-11-20 12:10:24 +03:00
} ( & pendingJobs )
}
2022-12-23 15:41:32 +03:00
func ( srv * MiscService ) ComputeOldestHeartbeats ( ) {
logbuch . Info ( "computing users' first data" )
if err := srv . queueWorkers . Dispatch ( func ( ) {
if ok := firstDataLock . TryLock ( ) ; ! ok {
config . Log ( ) . Warn ( "couldn't acquire lock for computing users' first data, job is still pending" )
return
}
defer firstDataLock . Unlock ( )
results , err := srv . heartbeatService . GetFirstByUsers ( )
if err != nil {
config . Log ( ) . Error ( "failed to compute users' first data, %v" , err )
2022-12-29 19:12:34 +03:00
return
2022-12-23 15:41:32 +03:00
}
for _ , entry := range results {
if entry . Time . T ( ) . IsZero ( ) {
continue
}
kvKey := fmt . Sprintf ( "%s_%s" , config . KeyFirstHeartbeat , entry . User )
if err := srv . keyValueService . PutString ( & models . KeyStringValue {
Key : kvKey ,
Value : entry . Time . T ( ) . Format ( time . RFC822Z ) ,
} ) ; err != nil {
config . Log ( ) . Error ( "failed to save user's first heartbeat time: %v" , err )
}
}
} ) ; err != nil {
config . Log ( ) . Error ( "failed to enqueue computing first data for user, %v" , err )
}
}
2022-12-29 19:12:34 +03:00
// NotifyExpiringSubscription sends a reminder e-mail to all users, notifying them if their subscription has expired or is about to, given these conditions:
// - Data cleanup is enabled on the server (non-zero retention time)
// - Subscriptions are enabled on the server (aka. users can do something about their old data getting cleaned up)
// - User has an e-mail address configured
// - User's subscription has expired or is about to expire soon
// - The user has gotten no such e-mail before recently
// Note: only one mail will be sent for either "expired" or "about to expire" state.
func ( srv * MiscService ) NotifyExpiringSubscription ( ) {
if srv . config . App . DataRetentionMonths <= 0 || ! srv . config . Subscriptions . Enabled {
return
}
logbuch . Info ( "notifying users about soon to expire subscriptions" )
users , err := srv . userService . GetAll ( )
if err != nil {
config . Log ( ) . Error ( "failed to fetch users for subscription notifications, %v" , err )
return
}
var subscriptionReminders map [ string ] [ ] * models . KeyStringValue
if result , err := srv . keyValueService . GetByPrefix ( config . KeySubscriptionNotificationSent ) ; err == nil {
subscriptionReminders = slice . GroupWith [ * models . KeyStringValue , string ] ( result , func ( kv * models . KeyStringValue ) string {
return strings . TrimPrefix ( kv . Key , config . KeySubscriptionNotificationSent + "_" )
} )
} else {
config . Log ( ) . Error ( "failed to fetch key-values for subscription notifications, %v" , err )
return
}
for _ , u := range users {
if u . HasActiveSubscription ( ) && u . Email == "" {
2022-12-31 18:03:44 +03:00
config . Log ( ) . Warn ( "invalid state: user '%s' has active subscription but no e-mail address set" , u . ID )
2022-12-29 19:12:34 +03:00
}
// skip users without e-mail address
// skip users who already received a notification before
// skip users who either never had a subscription before or intentionally deleted it
if _ , ok := subscriptionReminders [ u . ID ] ; ok || u . Email == "" || u . SubscribedUntil == nil {
continue
}
expired , expiredSince := u . SubscriptionExpiredSince ( )
if expired || ( expiredSince < 0 && expiredSince * - 1 <= notifyBeforeSubscriptionExpiry ) {
srv . sendSubscriptionNotificationScheduled ( u , expired )
}
}
}
2022-11-20 12:10:24 +03:00
func ( srv * MiscService ) countUserTotalTime ( userId string ) time . Duration {
result , err := srv . summaryService . Aliased ( time . Time { } , time . Now ( ) , & models . User { ID : userId } , srv . summaryService . Retrieve , nil , false )
if err != nil {
config . Log ( ) . Error ( "failed to count total for user %s: %v" , userId , err )
return 0
2021-04-30 18:19:17 +03:00
}
2022-11-20 12:10:24 +03:00
return result . TotalTime ( )
2021-01-17 11:24:09 +03:00
}
2022-12-23 15:41:32 +03:00
2022-12-29 19:12:34 +03:00
func ( srv * MiscService ) sendSubscriptionNotificationScheduled ( user * models . User , hasExpired bool ) {
u := * user
srv . queueMails . Dispatch ( func ( ) {
logbuch . Info ( "sending subscription expiry notification mail to %s (expired: %v)" , u . ID , hasExpired )
defer time . Sleep ( 10 * time . Second )
if err := srv . mailService . SendSubscriptionNotification ( & u , hasExpired ) ; err != nil {
config . Log ( ) . Error ( "failed to send subscription notification mail to user '%s', %v" , u . ID , err )
return
}
if err := srv . keyValueService . PutString ( & models . KeyStringValue {
Key : fmt . Sprintf ( "%s_%s" , config . KeySubscriptionNotificationSent , u . ID ) ,
Value : time . Now ( ) . Format ( time . RFC822Z ) ,
} ) ; err != nil {
config . Log ( ) . Error ( "failed to update subscription notification status key-value for user %s, %v" , u . ID , err )
}
} )
}
2022-12-23 15:41:32 +03:00
func ( srv * MiscService ) existsUsersTotalTime ( ) bool {
results , _ := srv . keyValueService . GetByPrefix ( config . KeyLatestTotalTime )
return len ( results ) > 0
}
func ( srv * MiscService ) existsUsersFirstData ( ) bool {
results , _ := srv . keyValueService . GetByPrefix ( config . KeyFirstHeartbeat )
return len ( results ) > 0
}
2022-12-29 19:12:34 +03:00
func ( srv * MiscService ) existsSubscriptionNotifications ( ) bool {
results , _ := srv . keyValueService . GetByPrefix ( config . KeySubscriptionNotificationSent )
return len ( results ) > 0
}