package services import ( "fmt" datastructure "github.com/duke-git/lancet/v2/datastructure/set" "github.com/leandro-lugaresi/hub" "github.com/muety/wakapi/config" "github.com/muety/wakapi/repositories" "github.com/patrickmn/go-cache" "strings" "sync" "time" "github.com/muety/wakapi/models" ) type HeartbeatService struct { config *config.Config cache *cache.Cache eventBus *hub.Hub repository repositories.IHeartbeatRepository languageMappingSrvc ILanguageMappingService entityCacheLock *sync.RWMutex } func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, languageMappingService ILanguageMappingService) *HeartbeatService { srv := &HeartbeatService{ config: config.Get(), cache: cache.New(24*time.Hour, 24*time.Hour), eventBus: config.EventBus(), repository: heartbeatRepo, languageMappingSrvc: languageMappingService, entityCacheLock: &sync.RWMutex{}, } // using event hub is an unnecessary indirection here, however, we might // potentially need heartbeat events elsewhere throughout the application some time // so it's more consistent to already have it this way sub1 := srv.eventBus.Subscribe(0, config.EventHeartbeatCreate) go func(sub *hub.Subscription) { for m := range sub.Receiver { heartbeat := m.Fields[config.FieldPayload].(*models.Heartbeat) srv.cache.IncrementInt64(srv.countByUserCacheKey(heartbeat.UserID), 1) // increment doesn't update expiration time srv.cache.IncrementInt64(srv.countTotalCacheKey(), 1) } }(&sub1) return srv } func (srv *HeartbeatService) Insert(heartbeat *models.Heartbeat) error { go srv.updateEntityUserCacheByHeartbeat(heartbeat) return srv.repository.InsertBatch([]*models.Heartbeat{heartbeat}) } func (srv *HeartbeatService) InsertBatch(heartbeats []*models.Heartbeat) error { if len(heartbeats) == 0 { return nil } hashes := datastructure.NewSet[string]() // https://github.com/muety/wakapi/issues/139 filteredHeartbeats := make([]*models.Heartbeat, 0, len(heartbeats)) for _, hb := range heartbeats { if !hashes.Contain(hb.Hash) { hb = hb.Sanitize() filteredHeartbeats = append(filteredHeartbeats, hb) hashes.Add(hb.Hash) } go srv.updateEntityUserCacheByHeartbeat(hb) } err := srv.repository.InsertBatch(filteredHeartbeats) if err == nil { go srv.notifyBatch(filteredHeartbeats) } return err } func (srv *HeartbeatService) Count(approximate bool) (int64, error) { result, ok := srv.cache.Get(srv.countTotalCacheKey()) if ok { return result.(int64), nil } count, err := srv.repository.Count(approximate) if err == nil { srv.cache.Set(srv.countTotalCacheKey(), count, srv.countCacheTtl()) } return count, err } func (srv *HeartbeatService) CountByUser(user *models.User) (int64, error) { key := srv.countByUserCacheKey(user.ID) result, ok := srv.cache.Get(key) if ok { return result.(int64), nil } count, err := srv.repository.CountByUser(user) if err == nil { srv.cache.Set(key, count, srv.countCacheTtl()) } return count, err } func (srv *HeartbeatService) CountByUsers(users []*models.User) ([]*models.CountByUser, error) { missingUsers := make([]*models.User, 0, len(users)) userCounts := make([]*models.CountByUser, 0, len(users)) for _, u := range users { key := srv.countByUserCacheKey(u.ID) result, ok := srv.cache.Get(key) if ok { userCounts = append(userCounts, &models.CountByUser{User: u.ID, Count: result.(int64)}) } else { missingUsers = append(missingUsers, u) } } counts, err := srv.repository.CountByUsers(missingUsers) if err != nil { return nil, err } for _, uc := range counts { key := srv.countByUserCacheKey(uc.User) srv.cache.Set(key, uc.Count, srv.countCacheTtl()) userCounts = append(userCounts, uc) } return userCounts, nil } func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Heartbeat, error) { heartbeats, err := srv.repository.GetAllWithin(from, to, user) if err != nil { return nil, err } return srv.augmented(heartbeats, user.ID) } func (srv *HeartbeatService) GetAllWithinByFilters(from, to time.Time, user *models.User, filters *models.Filters) ([]*models.Heartbeat, error) { heartbeats, err := srv.repository.GetAllWithinByFilters(from, to, user, srv.filtersToColumnMap(filters)) if err != nil { return nil, err } return srv.augmented(heartbeats, user.ID) } func (srv *HeartbeatService) GetLatestByUser(user *models.User) (*models.Heartbeat, error) { return srv.repository.GetLatestByUser(user) } func (srv *HeartbeatService) GetLatestByOriginAndUser(origin string, user *models.User) (*models.Heartbeat, error) { return srv.repository.GetLatestByOriginAndUser(origin, user) } func (srv *HeartbeatService) GetFirstByUsers() ([]*models.TimeByUser, error) { return srv.repository.GetFirstByUsers() } func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) { cacheKey := srv.getEntityUserCacheKey(entityType, user) if results, found := srv.cache.Get(cacheKey); found { srv.entityCacheLock.RLock() defer srv.entityCacheLock.RUnlock() return results.(datastructure.Set[string]).Values(), nil } results, err := srv.repository.GetEntitySetByUser(entityType, user) if err != nil { return nil, err } filtered := make([]string, 0, len(results)) for _, r := range results { if strings.TrimSpace(r) != "" { filtered = append(filtered, r) } } srv.cache.Set(cacheKey, datastructure.NewSet(filtered...), cache.NoExpiration) return filtered, nil } func (srv *HeartbeatService) DeleteBefore(t time.Time) error { go srv.cache.Flush() return srv.repository.DeleteBefore(t) } func (srv *HeartbeatService) DeleteByUser(user *models.User) error { go srv.cache.Flush() return srv.repository.DeleteByUser(user) } func (srv *HeartbeatService) DeleteByUserBefore(user *models.User, t time.Time) error { go srv.cache.Flush() return srv.repository.DeleteByUserBefore(user, t) } func (srv *HeartbeatService) augmented(heartbeats []*models.Heartbeat, userId string) ([]*models.Heartbeat, error) { languageMapping, err := srv.languageMappingSrvc.ResolveByUser(userId) if err != nil { return nil, err } for i := range heartbeats { heartbeats[i].Augment(languageMapping) } return heartbeats, nil } func (srv *HeartbeatService) getEntityUserCacheKey(entityType uint8, user *models.User) string { return fmt.Sprintf("entity_set_%d_%s", entityType, user.ID) } func (srv *HeartbeatService) updateEntityUserCache(entityType uint8, entityKey string, user *models.User) { cacheKey := srv.getEntityUserCacheKey(entityType, user) if entities, found := srv.cache.Get(cacheKey); found { entitySet := entities.(datastructure.Set[string]) srv.entityCacheLock.Lock() defer srv.entityCacheLock.Unlock() if !entitySet.Contain(entityKey) { entitySet.Add(entityKey) // new project / language / ..., which is not yet present in cache, arrived as part of a heartbeats // -> update cache instead of just invalidating it, because rebuilding is expensive here srv.cache.Set(cacheKey, entitySet, cache.NoExpiration) } } } func (srv *HeartbeatService) updateEntityUserCacheByHeartbeat(hb *models.Heartbeat) { go srv.updateEntityUserCache(models.SummaryProject, hb.Project, hb.User) go srv.updateEntityUserCache(models.SummaryLanguage, hb.Language, hb.User) go srv.updateEntityUserCache(models.SummaryEditor, hb.Editor, hb.User) go srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.User) go srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.User) go srv.updateEntityUserCache(models.SummaryBranch, hb.Branch, hb.User) go srv.updateEntityUserCache(models.SummaryEntity, hb.Entity, hb.User) } func (srv *HeartbeatService) notifyBatch(heartbeats []*models.Heartbeat) { for _, hb := range heartbeats { srv.eventBus.Publish(hub.Message{ Name: config.EventHeartbeatCreate, Fields: map[string]interface{}{config.FieldPayload: hb}, }) } } func (srv *HeartbeatService) countByUserCacheKey(userId string) string { return fmt.Sprintf("%s--hearbeat-count", userId) } func (srv *HeartbeatService) countTotalCacheKey() string { return "heartbeat-count" } func (srv *HeartbeatService) countCacheTtl() time.Duration { return time.Duration(srv.config.App.CountCacheTTLMin) * time.Minute } func (srv *HeartbeatService) filtersToColumnMap(filters *models.Filters) map[string][]string { columnMap := map[string][]string{} for _, t := range models.NativeSummaryTypes() { f := filters.ResolveEntity(t) if len(*f) > 0 { columnMap[models.GetEntityColumn(t)] = *f } } return columnMap }