2019-05-05 23:36:49 +03:00
|
|
|
package services
|
|
|
|
|
|
|
|
import (
|
2021-05-01 14:52:03 +03:00
|
|
|
"fmt"
|
2022-03-20 18:27:21 +03:00
|
|
|
datastructure "github.com/duke-git/lancet/v2/datastructure/set"
|
2021-06-27 12:33:14 +03:00
|
|
|
"github.com/leandro-lugaresi/hub"
|
2020-09-29 19:55:07 +03:00
|
|
|
"github.com/muety/wakapi/config"
|
2020-11-01 18:56:36 +03:00
|
|
|
"github.com/muety/wakapi/repositories"
|
2021-05-01 14:52:03 +03:00
|
|
|
"github.com/patrickmn/go-cache"
|
2021-06-12 13:01:20 +03:00
|
|
|
"strings"
|
2021-10-14 11:22:59 +03:00
|
|
|
"sync"
|
2019-05-09 01:07:38 +03:00
|
|
|
"time"
|
2019-05-05 23:36:49 +03:00
|
|
|
|
2020-03-31 13:22:17 +03:00
|
|
|
"github.com/muety/wakapi/models"
|
2019-05-05 23:36:49 +03:00
|
|
|
)
|
|
|
|
|
|
|
|
type HeartbeatService struct {
|
2020-11-01 22:14:10 +03:00
|
|
|
config *config.Config
|
2021-05-01 14:52:03 +03:00
|
|
|
cache *cache.Cache
|
2021-06-27 12:33:14 +03:00
|
|
|
eventBus *hub.Hub
|
2020-11-08 12:12:49 +03:00
|
|
|
repository repositories.IHeartbeatRepository
|
|
|
|
languageMappingSrvc ILanguageMappingService
|
2021-10-14 11:22:59 +03:00
|
|
|
entityCacheLock *sync.RWMutex
|
2019-05-05 23:36:49 +03:00
|
|
|
}
|
|
|
|
|
2020-11-08 12:12:49 +03:00
|
|
|
func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, languageMappingService ILanguageMappingService) *HeartbeatService {
|
2021-06-27 12:33:14 +03:00
|
|
|
srv := &HeartbeatService{
|
2020-11-01 22:14:10 +03:00
|
|
|
config: config.Get(),
|
2021-05-01 14:52:03 +03:00
|
|
|
cache: cache.New(24*time.Hour, 24*time.Hour),
|
2021-06-27 12:33:14 +03:00
|
|
|
eventBus: config.EventBus(),
|
2020-11-01 22:14:10 +03:00
|
|
|
repository: heartbeatRepo,
|
|
|
|
languageMappingSrvc: languageMappingService,
|
2021-10-14 11:22:59 +03:00
|
|
|
entityCacheLock: &sync.RWMutex{},
|
2020-05-24 18:32:26 +03:00
|
|
|
}
|
2021-06-27 12:33:14 +03:00
|
|
|
|
|
|
|
// using event hub is an unnecessary indirection here, however, we might
|
|
|
|
// potentially need heartbeat events elsewhere throughout the application some time
|
|
|
|
// so it's more consistent to already have it this way
|
|
|
|
sub1 := srv.eventBus.Subscribe(0, config.EventHeartbeatCreate)
|
|
|
|
go func(sub *hub.Subscription) {
|
|
|
|
for m := range sub.Receiver {
|
|
|
|
heartbeat := m.Fields[config.FieldPayload].(*models.Heartbeat)
|
|
|
|
srv.cache.IncrementInt64(srv.countByUserCacheKey(heartbeat.UserID), 1) // increment doesn't update expiration time
|
|
|
|
srv.cache.IncrementInt64(srv.countTotalCacheKey(), 1)
|
|
|
|
}
|
|
|
|
}(&sub1)
|
|
|
|
|
|
|
|
return srv
|
2020-05-24 18:32:26 +03:00
|
|
|
}
|
2020-02-20 16:28:55 +03:00
|
|
|
|
2021-02-05 20:47:28 +03:00
|
|
|
func (srv *HeartbeatService) Insert(heartbeat *models.Heartbeat) error {
|
2021-10-14 11:22:59 +03:00
|
|
|
go srv.updateEntityUserCacheByHeartbeat(heartbeat)
|
2021-02-05 20:47:28 +03:00
|
|
|
return srv.repository.InsertBatch([]*models.Heartbeat{heartbeat})
|
|
|
|
}
|
|
|
|
|
2019-05-17 09:40:03 +03:00
|
|
|
func (srv *HeartbeatService) InsertBatch(heartbeats []*models.Heartbeat) error {
|
2022-05-13 17:12:18 +03:00
|
|
|
if len(heartbeats) == 0 {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-03-20 18:27:21 +03:00
|
|
|
hashes := datastructure.NewSet[string]()
|
2021-03-05 23:39:21 +03:00
|
|
|
|
|
|
|
// https://github.com/muety/wakapi/issues/139
|
|
|
|
filteredHeartbeats := make([]*models.Heartbeat, 0, len(heartbeats))
|
|
|
|
for _, hb := range heartbeats {
|
2022-03-20 18:27:21 +03:00
|
|
|
if !hashes.Contain(hb.Hash) {
|
2023-07-09 11:28:23 +03:00
|
|
|
hb = hb.Sanitize()
|
2021-03-05 23:39:21 +03:00
|
|
|
filteredHeartbeats = append(filteredHeartbeats, hb)
|
2022-03-20 18:27:21 +03:00
|
|
|
hashes.Add(hb.Hash)
|
2021-03-05 23:39:21 +03:00
|
|
|
}
|
2021-10-14 11:22:59 +03:00
|
|
|
go srv.updateEntityUserCacheByHeartbeat(hb)
|
2021-03-05 23:39:21 +03:00
|
|
|
}
|
|
|
|
|
2021-06-27 12:33:14 +03:00
|
|
|
err := srv.repository.InsertBatch(filteredHeartbeats)
|
|
|
|
if err == nil {
|
|
|
|
go srv.notifyBatch(filteredHeartbeats)
|
|
|
|
}
|
|
|
|
return err
|
2019-05-05 23:36:49 +03:00
|
|
|
}
|
2019-05-09 01:07:38 +03:00
|
|
|
|
2022-03-19 12:30:32 +03:00
|
|
|
func (srv *HeartbeatService) Count(approximate bool) (int64, error) {
|
2021-06-27 12:33:14 +03:00
|
|
|
result, ok := srv.cache.Get(srv.countTotalCacheKey())
|
|
|
|
if ok {
|
|
|
|
return result.(int64), nil
|
|
|
|
}
|
2022-03-19 12:30:32 +03:00
|
|
|
count, err := srv.repository.Count(approximate)
|
2021-06-27 12:33:14 +03:00
|
|
|
if err == nil {
|
2021-06-27 13:08:11 +03:00
|
|
|
srv.cache.Set(srv.countTotalCacheKey(), count, srv.countCacheTtl())
|
2021-06-27 12:33:14 +03:00
|
|
|
}
|
|
|
|
return count, err
|
2021-02-12 20:37:30 +03:00
|
|
|
}
|
|
|
|
|
2021-02-05 20:47:28 +03:00
|
|
|
func (srv *HeartbeatService) CountByUser(user *models.User) (int64, error) {
|
2021-06-27 12:33:14 +03:00
|
|
|
key := srv.countByUserCacheKey(user.ID)
|
|
|
|
result, ok := srv.cache.Get(key)
|
|
|
|
if ok {
|
|
|
|
return result.(int64), nil
|
|
|
|
}
|
|
|
|
count, err := srv.repository.CountByUser(user)
|
|
|
|
if err == nil {
|
2021-06-27 13:08:11 +03:00
|
|
|
srv.cache.Set(key, count, srv.countCacheTtl())
|
2021-06-27 12:33:14 +03:00
|
|
|
}
|
|
|
|
return count, err
|
2021-02-05 20:47:28 +03:00
|
|
|
}
|
|
|
|
|
2021-02-13 13:23:58 +03:00
|
|
|
func (srv *HeartbeatService) CountByUsers(users []*models.User) ([]*models.CountByUser, error) {
|
2021-06-27 12:33:14 +03:00
|
|
|
missingUsers := make([]*models.User, 0, len(users))
|
|
|
|
userCounts := make([]*models.CountByUser, 0, len(users))
|
|
|
|
|
|
|
|
for _, u := range users {
|
|
|
|
key := srv.countByUserCacheKey(u.ID)
|
|
|
|
result, ok := srv.cache.Get(key)
|
|
|
|
if ok {
|
|
|
|
userCounts = append(userCounts, &models.CountByUser{User: u.ID, Count: result.(int64)})
|
|
|
|
} else {
|
|
|
|
missingUsers = append(missingUsers, u)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
counts, err := srv.repository.CountByUsers(missingUsers)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, uc := range counts {
|
|
|
|
key := srv.countByUserCacheKey(uc.User)
|
2021-06-27 13:08:11 +03:00
|
|
|
srv.cache.Set(key, uc.Count, srv.countCacheTtl())
|
2021-06-27 12:33:14 +03:00
|
|
|
userCounts = append(userCounts, uc)
|
|
|
|
}
|
|
|
|
|
|
|
|
return userCounts, nil
|
2021-02-13 13:23:58 +03:00
|
|
|
}
|
|
|
|
|
2019-05-19 20:49:27 +03:00
|
|
|
func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Heartbeat, error) {
|
2020-11-01 22:14:10 +03:00
|
|
|
heartbeats, err := srv.repository.GetAllWithin(from, to, user)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return srv.augmented(heartbeats, user.ID)
|
2019-05-09 01:07:38 +03:00
|
|
|
}
|
2019-10-10 00:26:28 +03:00
|
|
|
|
2022-03-13 10:17:50 +03:00
|
|
|
func (srv *HeartbeatService) GetAllWithinByFilters(from, to time.Time, user *models.User, filters *models.Filters) ([]*models.Heartbeat, error) {
|
|
|
|
heartbeats, err := srv.repository.GetAllWithinByFilters(from, to, user, srv.filtersToColumnMap(filters))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return srv.augmented(heartbeats, user.ID)
|
|
|
|
}
|
|
|
|
|
2021-04-30 11:12:28 +03:00
|
|
|
func (srv *HeartbeatService) GetLatestByUser(user *models.User) (*models.Heartbeat, error) {
|
|
|
|
return srv.repository.GetLatestByUser(user)
|
|
|
|
}
|
|
|
|
|
2021-02-06 02:31:30 +03:00
|
|
|
func (srv *HeartbeatService) GetLatestByOriginAndUser(origin string, user *models.User) (*models.Heartbeat, error) {
|
|
|
|
return srv.repository.GetLatestByOriginAndUser(origin, user)
|
|
|
|
}
|
|
|
|
|
2020-11-07 14:01:35 +03:00
|
|
|
func (srv *HeartbeatService) GetFirstByUsers() ([]*models.TimeByUser, error) {
|
|
|
|
return srv.repository.GetFirstByUsers()
|
2019-10-10 00:26:28 +03:00
|
|
|
}
|
2020-03-09 19:30:23 +03:00
|
|
|
|
2021-05-01 14:52:03 +03:00
|
|
|
func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
|
|
|
|
cacheKey := srv.getEntityUserCacheKey(entityType, user)
|
2021-10-14 11:22:59 +03:00
|
|
|
if results, found := srv.cache.Get(cacheKey); found {
|
|
|
|
srv.entityCacheLock.RLock()
|
|
|
|
defer srv.entityCacheLock.RUnlock()
|
2022-03-20 18:27:21 +03:00
|
|
|
return results.(datastructure.Set[string]).Values(), nil
|
2021-05-01 14:52:03 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
results, err := srv.repository.GetEntitySetByUser(entityType, user)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2021-06-12 13:01:20 +03:00
|
|
|
|
|
|
|
filtered := make([]string, 0, len(results))
|
|
|
|
for _, r := range results {
|
|
|
|
if strings.TrimSpace(r) != "" {
|
|
|
|
filtered = append(filtered, r)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-03-20 18:27:21 +03:00
|
|
|
srv.cache.Set(cacheKey, datastructure.NewSet(filtered...), cache.NoExpiration)
|
2021-06-12 13:01:20 +03:00
|
|
|
return filtered, nil
|
2021-05-01 14:52:03 +03:00
|
|
|
}
|
|
|
|
|
2020-03-09 19:30:23 +03:00
|
|
|
func (srv *HeartbeatService) DeleteBefore(t time.Time) error {
|
2022-03-17 13:55:13 +03:00
|
|
|
go srv.cache.Flush()
|
2020-11-01 18:56:36 +03:00
|
|
|
return srv.repository.DeleteBefore(t)
|
2020-03-09 19:30:23 +03:00
|
|
|
}
|
|
|
|
|
2022-03-17 13:55:13 +03:00
|
|
|
func (srv *HeartbeatService) DeleteByUser(user *models.User) error {
|
|
|
|
go srv.cache.Flush()
|
|
|
|
return srv.repository.DeleteByUser(user)
|
|
|
|
}
|
|
|
|
|
2022-12-01 22:26:03 +03:00
|
|
|
func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error {
|
|
|
|
go srv.cache.Flush()
|
|
|
|
return srv.repository.DeleteByUserBefore(user, t)
|
|
|
|
}
|
|
|
|
|
2020-11-01 22:14:10 +03:00
|
|
|
func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) {
|
|
|
|
languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for i := range heartbeats {
|
|
|
|
heartbeats[i].Augment(languageMapping)
|
|
|
|
}
|
|
|
|
|
|
|
|
return heartbeats, nil
|
|
|
|
}
|
2021-05-01 14:52:03 +03:00
|
|
|
|
|
|
|
func (srv *HeartbeatService) getEntityUserCacheKey(entityType uint8, user *models.User) string {
|
|
|
|
return fmt.Sprintf("entity_set_%d_%s", entityType, user.ID)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (srv *HeartbeatService) updateEntityUserCache(entityType uint8, entityKey string, user *models.User) {
|
|
|
|
cacheKey := srv.getEntityUserCacheKey(entityType, user)
|
2021-10-14 11:22:59 +03:00
|
|
|
if entities, found := srv.cache.Get(cacheKey); found {
|
2022-03-20 18:27:21 +03:00
|
|
|
entitySet := entities.(datastructure.Set[string])
|
2021-10-14 11:22:59 +03:00
|
|
|
|
|
|
|
srv.entityCacheLock.Lock()
|
|
|
|
defer srv.entityCacheLock.Unlock()
|
|
|
|
|
2022-03-20 18:27:21 +03:00
|
|
|
if !entitySet.Contain(entityKey) {
|
|
|
|
entitySet.Add(entityKey)
|
2021-05-01 14:52:03 +03:00
|
|
|
// new project / language / ..., which is not yet present in cache, arrived as part of a heartbeats
|
2021-10-14 11:22:59 +03:00
|
|
|
// -> update cache instead of just invalidating it, because rebuilding is expensive here
|
|
|
|
srv.cache.Set(cacheKey, entitySet, cache.NoExpiration)
|
2021-05-01 14:52:03 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (srv *HeartbeatService) updateEntityUserCacheByHeartbeat(hb *models.Heartbeat) {
|
2021-10-14 11:22:59 +03:00
|
|
|
go srv.updateEntityUserCache(models.SummaryProject, hb.Project, hb.User)
|
|
|
|
go srv.updateEntityUserCache(models.SummaryLanguage, hb.Language, hb.User)
|
|
|
|
go srv.updateEntityUserCache(models.SummaryEditor, hb.Editor, hb.User)
|
|
|
|
go srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.User)
|
|
|
|
go srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.User)
|
2022-01-02 15:39:20 +03:00
|
|
|
go srv.updateEntityUserCache(models.SummaryBranch, hb.Branch, hb.User)
|
2023-03-22 22:45:27 +03:00
|
|
|
go srv.updateEntityUserCache(models.SummaryEntity, hb.Entity, hb.User)
|
2021-05-01 14:52:03 +03:00
|
|
|
}
|
2021-06-27 12:33:14 +03:00
|
|
|
|
|
|
|
func (srv *HeartbeatService) notifyBatch(heartbeats []*models.Heartbeat) {
|
|
|
|
for _, hb := range heartbeats {
|
|
|
|
srv.eventBus.Publish(hub.Message{
|
|
|
|
Name: config.EventHeartbeatCreate,
|
|
|
|
Fields: map[string]interface{}{config.FieldPayload: hb},
|
|
|
|
})
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (srv *HeartbeatService) countByUserCacheKey(userId string) string {
|
|
|
|
return fmt.Sprintf("%s--hearbeat-count", userId)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (srv *HeartbeatService) countTotalCacheKey() string {
|
|
|
|
return "heartbeat-count"
|
|
|
|
}
|
2021-06-27 13:08:11 +03:00
|
|
|
|
|
|
|
func (srv *HeartbeatService) countCacheTtl() time.Duration {
|
|
|
|
return time.Duration(srv.config.App.CountCacheTTLMin) * time.Minute
|
|
|
|
}
|
2022-03-13 10:17:50 +03:00
|
|
|
|
|
|
|
func (srv *HeartbeatService) filtersToColumnMap(filters *models.Filters) map[string][]string {
|
|
|
|
columnMap := map[string][]string{}
|
2022-05-17 00:23:18 +03:00
|
|
|
for _, t := range models.NativeSummaryTypes() {
|
2022-03-13 10:17:50 +03:00
|
|
|
f := filters.ResolveEntity(t)
|
|
|
|
if len(*f) > 0 {
|
|
|
|
columnMap[models.GetEntityColumn(t)] = *f
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return columnMap
|
|
|
|
}
|