1
0
mirror of https://github.com/muety/wakapi.git synced 2023-08-10 21:12:56 +03:00

chore: further optimizations and caching to speed up metrics endpoint (resolve #215)

This commit is contained in:
Ferdinand Mütsch 2021-06-27 11:33:14 +02:00
parent 407925ec53
commit 181aefa2f9
2 changed files with 94 additions and 8 deletions

View File

@ -8,9 +8,11 @@ type ApplicationEvent struct {
}
const (
TopicUser = "user.*"
EventUserUpdate = "user.update"
FieldPayload = "payload"
TopicUser = "user.*"
TopicHeartbeat = "heartbeat.*"
EventUserUpdate = "user.update"
EventHeartbeatCreate = "heartbeat.create"
FieldPayload = "payload"
)
var eventHub *hub.Hub

View File

@ -2,6 +2,7 @@ package services
import (
"fmt"
"github.com/leandro-lugaresi/hub"
"github.com/muety/wakapi/config"
"github.com/muety/wakapi/repositories"
"github.com/muety/wakapi/utils"
@ -12,20 +13,41 @@ import (
"github.com/muety/wakapi/models"
)
const (
// re-read heartbeat counters from database every 10 min, in between, rely on local state
countersTtl = 10 * time.Minute
)
type HeartbeatService struct {
config *config.Config
cache *cache.Cache
eventBus *hub.Hub
repository repositories.IHeartbeatRepository
languageMappingSrvc ILanguageMappingService
}
func NewHeartbeatService(heartbeatRepo repositories.IHeartbeatRepository, languageMappingService ILanguageMappingService) *HeartbeatService {
return &HeartbeatService{
srv := &HeartbeatService{
config: config.Get(),
cache: cache.New(24*time.Hour, 24*time.Hour),
eventBus: config.EventBus(),
repository: heartbeatRepo,
languageMappingSrvc: languageMappingService,
}
// using event hub is an unnecessary indirection here, however, we might
// potentially need heartbeat events elsewhere throughout the application some time
// so it's more consistent to already have it this way
sub1 := srv.eventBus.Subscribe(0, config.EventHeartbeatCreate)
go func(sub *hub.Subscription) {
for m := range sub.Receiver {
heartbeat := m.Fields[config.FieldPayload].(*models.Heartbeat)
srv.cache.IncrementInt64(srv.countByUserCacheKey(heartbeat.UserID), 1) // increment doesn't update expiration time
srv.cache.IncrementInt64(srv.countTotalCacheKey(), 1)
}
}(&sub1)
return srv
}
func (srv *HeartbeatService) Insert(heartbeat *models.Heartbeat) error {
@ -46,19 +68,64 @@ func (srv *HeartbeatService) InsertBatch(heartbeats []*models.Heartbeat) error {
srv.updateEntityUserCacheByHeartbeat(hb)
}
return srv.repository.InsertBatch(filteredHeartbeats)
err := srv.repository.InsertBatch(filteredHeartbeats)
if err == nil {
go srv.notifyBatch(filteredHeartbeats)
}
return err
}
func (srv *HeartbeatService) Count() (int64, error) {
return srv.repository.Count()
result, ok := srv.cache.Get(srv.countTotalCacheKey())
if ok {
return result.(int64), nil
}
count, err := srv.repository.Count()
if err == nil {
srv.cache.Set(srv.countTotalCacheKey(), count, countersTtl)
}
return count, err
}
func (srv *HeartbeatService) CountByUser(user *models.User) (int64, error) {
return srv.repository.CountByUser(user)
key := srv.countByUserCacheKey(user.ID)
result, ok := srv.cache.Get(key)
if ok {
return result.(int64), nil
}
count, err := srv.repository.CountByUser(user)
if err == nil {
srv.cache.Set(key, count, countersTtl)
}
return count, err
}
func (srv *HeartbeatService) CountByUsers(users []*models.User) ([]*models.CountByUser, error) {
return srv.repository.CountByUsers(users)
missingUsers := make([]*models.User, 0, len(users))
userCounts := make([]*models.CountByUser, 0, len(users))
for _, u := range users {
key := srv.countByUserCacheKey(u.ID)
result, ok := srv.cache.Get(key)
if ok {
userCounts = append(userCounts, &models.CountByUser{User: u.ID, Count: result.(int64)})
} else {
missingUsers = append(missingUsers, u)
}
}
counts, err := srv.repository.CountByUsers(missingUsers)
if err != nil {
return nil, err
}
for _, uc := range counts {
key := srv.countByUserCacheKey(uc.User)
srv.cache.Set(key, uc.Count, countersTtl)
userCounts = append(userCounts, uc)
}
return userCounts, nil
}
func (srv *HeartbeatService) GetAllWithin(from, to time.Time, user *models.User) ([]*models.Heartbeat, error) {
@ -142,3 +209,20 @@ func (srv *HeartbeatService) updateEntityUserCacheByHeartbeat(hb *models.Heartbe
srv.updateEntityUserCache(models.SummaryOS, hb.OperatingSystem, hb.User)
srv.updateEntityUserCache(models.SummaryMachine, hb.Machine, hb.User)
}
func (srv *HeartbeatService) notifyBatch(heartbeats []*models.Heartbeat) {
for _, hb := range heartbeats {
srv.eventBus.Publish(hub.Message{
Name: config.EventHeartbeatCreate,
Fields: map[string]interface{}{config.FieldPayload: hb},
})
}
}
func (srv *HeartbeatService) countByUserCacheKey(userId string) string {
return fmt.Sprintf("%s--hearbeat-count", userId)
}
func (srv *HeartbeatService) countTotalCacheKey() string {
return "heartbeat-count"
}