2022-11-20 00:21:51 +03:00
|
|
|
package config
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2022-11-20 12:10:24 +03:00
|
|
|
"math"
|
|
|
|
"runtime"
|
2023-04-03 15:40:57 +03:00
|
|
|
|
|
|
|
"github.com/emvi/logbuch"
|
|
|
|
"github.com/muety/artifex/v2"
|
2022-11-20 00:21:51 +03:00
|
|
|
)
|
|
|
|
|
2022-11-20 12:59:06 +03:00
|
|
|
var jobQueues map[string]*artifex.Dispatcher
|
|
|
|
var jobCounts map[string]int
|
2022-11-20 00:21:51 +03:00
|
|
|
|
|
|
|
const (
|
2022-12-01 22:26:03 +03:00
|
|
|
QueueDefault = "wakapi.default"
|
|
|
|
QueueProcessing = "wakapi.processing"
|
|
|
|
QueueReports = "wakapi.reports"
|
2022-12-29 19:12:34 +03:00
|
|
|
QueueMails = "wakapi.mail"
|
2022-12-01 22:26:03 +03:00
|
|
|
QueueImports = "wakapi.imports"
|
|
|
|
QueueHousekeeping = "wakapi.housekeeping"
|
2022-11-20 00:21:51 +03:00
|
|
|
)
|
|
|
|
|
2022-11-20 12:59:06 +03:00
|
|
|
type JobQueueMetrics struct {
|
|
|
|
Queue string
|
|
|
|
EnqueuedJobs int
|
|
|
|
FinishedJobs int
|
|
|
|
}
|
|
|
|
|
2022-11-20 00:21:51 +03:00
|
|
|
func init() {
|
2022-11-20 12:59:06 +03:00
|
|
|
jobQueues = make(map[string]*artifex.Dispatcher)
|
2023-04-03 15:40:57 +03:00
|
|
|
}
|
2022-11-20 12:10:24 +03:00
|
|
|
|
2023-04-03 15:40:57 +03:00
|
|
|
func StartJobs() {
|
2022-11-20 12:10:24 +03:00
|
|
|
InitQueue(QueueDefault, 1)
|
2022-12-01 22:26:03 +03:00
|
|
|
InitQueue(QueueProcessing, halfCPUs())
|
2022-11-20 12:10:24 +03:00
|
|
|
InitQueue(QueueReports, 1)
|
2022-12-29 19:12:34 +03:00
|
|
|
InitQueue(QueueMails, 1)
|
2022-11-20 13:09:51 +03:00
|
|
|
InitQueue(QueueImports, 1)
|
2022-12-01 22:26:03 +03:00
|
|
|
InitQueue(QueueHousekeeping, halfCPUs())
|
2022-11-20 00:21:51 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func InitQueue(name string, workers int) error {
|
2022-11-20 12:59:06 +03:00
|
|
|
if _, ok := jobQueues[name]; ok {
|
2022-11-20 00:21:51 +03:00
|
|
|
return fmt.Errorf("queue '%s' already existing", name)
|
|
|
|
}
|
2022-11-20 12:10:24 +03:00
|
|
|
logbuch.Info("creating job queue '%s' (%d workers)", name, workers)
|
2022-11-20 12:59:06 +03:00
|
|
|
jobQueues[name] = artifex.NewDispatcher(workers, 4096)
|
|
|
|
jobQueues[name].Start()
|
2022-11-20 00:21:51 +03:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetDefaultQueue() *artifex.Dispatcher {
|
2022-11-20 12:59:06 +03:00
|
|
|
return GetQueue(QueueDefault)
|
2022-11-20 00:21:51 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func GetQueue(name string) *artifex.Dispatcher {
|
2022-11-20 12:59:06 +03:00
|
|
|
if _, ok := jobQueues[name]; !ok {
|
2022-11-20 00:21:51 +03:00
|
|
|
InitQueue(name, 1)
|
|
|
|
}
|
2022-11-20 12:59:06 +03:00
|
|
|
return jobQueues[name]
|
|
|
|
}
|
|
|
|
|
|
|
|
func GetQueueMetrics() []*JobQueueMetrics {
|
|
|
|
metrics := make([]*JobQueueMetrics, 0, len(jobQueues))
|
|
|
|
for name, queue := range jobQueues {
|
|
|
|
metrics = append(metrics, &JobQueueMetrics{
|
|
|
|
Queue: name,
|
|
|
|
EnqueuedJobs: queue.CountEnqueued(),
|
|
|
|
FinishedJobs: queue.CountDispatched(),
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return metrics
|
2022-11-20 00:21:51 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func CloseQueues() {
|
2022-11-20 12:59:06 +03:00
|
|
|
for _, q := range jobQueues {
|
2022-11-20 00:21:51 +03:00
|
|
|
q.Stop()
|
|
|
|
}
|
|
|
|
}
|
2022-12-01 22:26:03 +03:00
|
|
|
|
|
|
|
func allCPUs() int {
|
|
|
|
return runtime.NumCPU()
|
|
|
|
}
|
|
|
|
|
|
|
|
func halfCPUs() int {
|
|
|
|
return int(math.Ceil(float64(runtime.NumCPU()) / 2.0))
|
|
|
|
}
|