1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

chore: enhanced caching for user entity sets (resolve #264)

This commit is contained in:
Ferdinand Mütsch
2021-10-14 10:22:59 +02:00
parent 5fc87dd143
commit 7cae3c43d0
3 changed files with 453 additions and 434 deletions

View File

@ -8,6 +8,7 @@ import (
"github.com/muety/wakapi/utils"
"github.com/patrickmn/go-cache"
"strings"
"sync"
"time"
"github.com/muety/wakapi/models"
@ -16,20 +17,20 @@ import (
type HeartbeatService struct {
config *config.Config
cache *cache.Cache
cache2 *cache.Cache
eventBus *hub.Hub
repository repositories.IHeartbeatRepository
languageMappingSrvc ILanguageMappingService
entityCacheLock *sync.RWMutex
}
func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, languageMappingService ILanguageMappingService) *HeartbeatService {
srv := &HeartbeatService{
config: config.Get(),
cache: cache.New(24*time.Hour, 24*time.Hour),
cache2: cache.New(cache.NoExpiration, cache.NoExpiration),
eventBus: config.EventBus(),
repository: heartbeatRepo,
languageMappingSrvc: languageMappingService,
entityCacheLock: &sync.RWMutex{},
}
// using event hub is an unnecessary indirection here, however, we might
@ -48,7 +49,7 @@ func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, langua
}
func (srv *HeartbeatService) Insert(heartbeat *models.Heartbeat) error {
srv.updateEntityUserCacheByHeartbeat(heartbeat)
go srv.updateEntityUserCacheByHeartbeat(heartbeat)
return srv.repository.InsertBatch([]*models.Heartbeat{heartbeat})
}
@ -62,7 +63,7 @@ func (srv *HeartbeatService) InsertBatch(heartbeats []*models.Heartbeat) error {
filteredHeartbeats = append(filteredHeartbeats, hb)
hashes[hb.Hash] = true
}
srv.updateEntityUserCacheByHeartbeat(hb)
go srv.updateEntityUserCacheByHeartbeat(hb)
}
err := srv.repository.InsertBatch(filteredHeartbeats)
@ -147,7 +148,9 @@ func (srv *HeartbeatService) GetFirstByUsers() ([]*models.TimeByUser, error) {
func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, user *models.User) ([]string, error) {
cacheKey := srv.getEntityUserCacheKey(entityType, user)
if results, found := srv.cache2.Get(cacheKey); found {
if results, found := srv.cache.Get(cacheKey); found {
srv.entityCacheLock.RLock()
defer srv.entityCacheLock.RUnlock()
return utils.SetToStrings(results.(map[string]bool)), nil
}
@ -163,7 +166,7 @@ func (srv *HeartbeatService) GetEntitySetByUser(entityType uint8, user *models.U
}
}
srv.cache2.Set(cacheKey, utils.StringsToSet(filtered), cache.DefaultExpiration)
srv.cache.Set(cacheKey, utils.StringsToSet(filtered), cache.NoExpiration)
return filtered, nil
}
@ -190,21 +193,27 @@ func (srv *HeartbeatService) getEntityUserCacheKey(entityType uint8, user *model
func (srv *HeartbeatService) updateEntityUserCache(entityType uint8, entityKey string, user *models.User) {
cacheKey := srv.getEntityUserCacheKey(entityType, user)
if entities, found := srv.cache2.Get(cacheKey); found {
if _, ok := entities.(map[string]bool)[entityKey]; !ok {
if entities, found := srv.cache.Get(cacheKey); found {
entitySet := entities.(map[string]bool)
srv.entityCacheLock.Lock()
defer srv.entityCacheLock.Unlock()
if _, ok := entitySet[entityKey]; !ok {
entitySet[entityKey] = true
// new project / language / ..., which is not yet present in cache, arrived as part of a heartbeats
// -> invalidate cache
srv.cache2.Delete(cacheKey)
// -> update cache instead of just invalidating it, because rebuilding is expensive here
srv.cache.Set(cacheKey, entitySet, cache.NoExpiration)
}
}
}
func (srv *HeartbeatService) updateEntityUserCacheByHeartbeat(hb *models.Heartbeat) {
srv.updateEntityUserCache(models.SummaryProject, hb.Project, hb.User)
srv.updateEntityUserCache(models.SummaryLanguage, hb.Language, hb.User)
srv.updateEntityUserCache(models.SummaryEditor, hb.Editor, hb.User)
srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.User)
srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.User)
go srv.updateEntityUserCache(models.SummaryProject, hb.Project, hb.User)
go srv.updateEntityUserCache(models.SummaryLanguage, hb.Language, hb.User)
go srv.updateEntityUserCache(models.SummaryEditor, hb.Editor, hb.User)
go srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.User)
go srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.User)
}
func (srv *HeartbeatService) notifyBatch(heartbeats []*models.Heartbeat) {