2021-01-22 00:17:32 +03:00
package relay
import (
"bytes"
"encoding/base64"
2022-01-21 14:35:05 +03:00
"encoding/json"
"errors"
2021-01-22 00:17:32 +03:00
"fmt"
2021-01-30 13:17:37 +03:00
"github.com/emvi/logbuch"
2021-08-07 00:28:03 +03:00
"github.com/leandro-lugaresi/hub"
2021-01-22 00:17:32 +03:00
"github.com/muety/wakapi/config"
2021-03-26 15:10:10 +03:00
"github.com/muety/wakapi/middlewares"
2021-08-06 23:38:57 +03:00
"github.com/muety/wakapi/models"
2022-01-21 14:35:05 +03:00
routeutils "github.com/muety/wakapi/routes/utils"
2021-08-07 00:28:03 +03:00
"github.com/patrickmn/go-cache"
2021-01-22 00:17:32 +03:00
"io"
"io/ioutil"
"net/http"
"time"
)
2021-08-07 00:28:03 +03:00
const (
	// maxFailuresPerDay is the number of failed relay attempts per user at which
	// send publishes a failure event on the event bus. The attempts are counted
	// in failureCache, which is created with a 24 hour expiration.
	maxFailuresPerDay = 100
)
2022-01-21 14:35:05 +03:00
// WakatimeRelayMiddleware is a middleware to conditionally relay heartbeats to Wakatime (and other compatible services)
2021-01-22 00:17:32 +03:00
type WakatimeRelayMiddleware struct {
2021-08-07 00:28:03 +03:00
httpClient * http . Client
2022-01-21 14:35:05 +03:00
hashCache * cache . Cache
2021-08-07 00:28:03 +03:00
failureCache * cache . Cache
eventBus * hub . Hub
2021-01-22 00:17:32 +03:00
}
func NewWakatimeRelayMiddleware ( ) * WakatimeRelayMiddleware {
return & WakatimeRelayMiddleware {
httpClient : & http . Client {
Timeout : 10 * time . Second ,
} ,
2022-01-21 14:35:05 +03:00
hashCache : cache . New ( 10 * time . Minute , 10 * time . Minute ) ,
2021-08-07 00:28:03 +03:00
failureCache : cache . New ( 24 * time . Hour , 1 * time . Hour ) ,
eventBus : config . EventBus ( ) ,
2021-01-22 00:17:32 +03:00
}
}
// Handler wraps h so that every request first passes through the relay logic
// in ServeHTTP, which in turn invokes h.ServeHTTP as the next handler.
func (m *WakatimeRelayMiddleware) Handler(h http.Handler) http.Handler {
	wrapped := func(w http.ResponseWriter, r *http.Request) {
		m.ServeHTTP(w, r, h.ServeHTTP)
	}
	return http.HandlerFunc(wrapped)
}
func ( m * WakatimeRelayMiddleware ) ServeHTTP ( w http . ResponseWriter , r * http . Request , next http . HandlerFunc ) {
defer next ( w , r )
2022-01-21 14:35:05 +03:00
ownInstanceId := config . Get ( ) . InstanceId
originInstanceId := r . Header . Get ( "X-Origin-Instance" )
if r . Method != http . MethodPost || originInstanceId == ownInstanceId {
2021-01-22 00:17:32 +03:00
return
}
2021-03-26 15:10:10 +03:00
user := middlewares . GetPrincipal ( r )
2021-01-22 00:17:32 +03:00
if user == nil || user . WakatimeApiKey == "" {
return
}
2022-01-21 14:35:05 +03:00
err := m . filterByCache ( r )
if err != nil {
logbuch . Warn ( "%v" , err )
return
}
2021-01-22 00:17:32 +03:00
body , _ := ioutil . ReadAll ( r . Body )
r . Body . Close ( )
r . Body = ioutil . NopCloser ( bytes . NewBuffer ( body ) )
2022-01-21 14:35:05 +03:00
// prevent cycles
downstreamInstanceId := ownInstanceId
if originInstanceId != "" {
downstreamInstanceId = originInstanceId
}
2021-01-22 00:17:32 +03:00
headers := http . Header {
"X-Machine-Name" : r . Header . Values ( "X-Machine-Name" ) ,
"Content-Type" : r . Header . Values ( "Content-Type" ) ,
"Accept" : r . Header . Values ( "Accept" ) ,
"User-Agent" : r . Header . Values ( "User-Agent" ) ,
"X-Origin" : [ ] string {
fmt . Sprintf ( "wakapi v%s" , config . Get ( ) . Version ) ,
} ,
2022-01-21 14:35:05 +03:00
"X-Origin-Instance" : [ ] string { downstreamInstanceId } ,
2021-01-22 00:17:32 +03:00
"Authorization" : [ ] string {
fmt . Sprintf ( "Basic %s" , base64 . StdEncoding . EncodeToString ( [ ] byte ( user . WakatimeApiKey ) ) ) ,
} ,
}
2022-01-21 14:35:05 +03:00
url := user . WakaTimeURL ( config . WakatimeApiUrl ) + config . WakatimeApiHeartbeatsBulkUrl
2021-01-22 00:17:32 +03:00
go m . send (
http . MethodPost ,
2022-01-21 14:35:05 +03:00
url ,
2021-01-22 00:17:32 +03:00
bytes . NewReader ( body ) ,
headers ,
2021-08-06 23:38:57 +03:00
user ,
2021-01-22 00:17:32 +03:00
)
}
2021-08-06 23:38:57 +03:00
func ( m * WakatimeRelayMiddleware ) send ( method , url string , body io . Reader , headers http . Header , forUser * models . User ) {
2021-01-22 00:17:32 +03:00
request , err := http . NewRequest ( method , url , body )
if err != nil {
2022-02-17 14:20:22 +03:00
logbuch . Warn ( "error constructing relayed request - %v" , err )
2021-01-24 23:39:35 +03:00
return
2021-01-22 00:17:32 +03:00
}
for k , v := range headers {
for _ , h := range v {
request . Header . Set ( k , h )
}
}
response , err := m . httpClient . Do ( request )
if err != nil {
2022-02-17 14:20:22 +03:00
logbuch . Warn ( "error executing relayed request - %v" , err )
2021-01-24 23:39:35 +03:00
return
2021-01-22 00:17:32 +03:00
}
if response . StatusCode < 200 || response . StatusCode >= 300 {
2021-08-06 23:38:57 +03:00
logbuch . Warn ( "failed to relay request for user %s, got status %d" , forUser . ID , response . StatusCode )
2021-08-07 00:28:03 +03:00
// TODO: use leaky bucket instead of expiring cache?
if _ , found := m . failureCache . Get ( forUser . ID ) ; ! found {
m . failureCache . SetDefault ( forUser . ID , 0 )
}
if n , _ := m . failureCache . IncrementInt ( forUser . ID , 1 ) ; n == maxFailuresPerDay {
m . eventBus . Publish ( hub . Message {
Name : config . EventWakatimeFailure ,
Fields : map [ string ] interface { } { config . FieldUser : forUser , config . FieldPayload : n } ,
} )
} else if n % 10 == 0 {
logbuch . Warn ( "%d / %d failed wakatime heartbeat relaying attempts for user %s within last 24 hours" , n , maxFailuresPerDay , forUser . ID )
}
2021-01-22 00:17:32 +03:00
}
}
2022-01-21 14:35:05 +03:00
// filterByCache takes an HTTP request, tries to parse the body contents as heartbeats, checks against a local cache for whether a heartbeat has already been relayed before according to its hash and in-place filters these from the request's raw json body.
// This method operates on the raw body data (interface{}), because serialization of models.Heartbeat is not necessarily identical to what the CLI has actually sent.
// Purpose of this mechanism is mainly to prevent cyclic relays / loops.
// Caution: this method does in-place changes to the request.
func ( m * WakatimeRelayMiddleware ) filterByCache ( r * http . Request ) error {
heartbeats , err := routeutils . ParseHeartbeats ( r )
if err != nil {
return err
}
body , _ := ioutil . ReadAll ( r . Body )
r . Body . Close ( )
r . Body = ioutil . NopCloser ( bytes . NewBuffer ( body ) )
var rawData interface { }
if err := json . NewDecoder ( ioutil . NopCloser ( bytes . NewBuffer ( body ) ) ) . Decode ( & rawData ) ; err != nil {
return err
}
newData := make ( [ ] interface { } , 0 , len ( heartbeats ) )
2023-01-02 13:33:47 +03:00
process := func ( heartbeat * models . Heartbeat , rawData interface { } ) {
heartbeat = heartbeat . Hashed ( )
2022-01-21 14:35:05 +03:00
// we didn't see this particular heartbeat before
2023-01-02 13:33:47 +03:00
if _ , found := m . hashCache . Get ( heartbeat . Hash ) ; ! found {
m . hashCache . SetDefault ( heartbeat . Hash , true )
newData = append ( newData , rawData )
}
}
if _ , isList := rawData . ( [ ] interface { } ) ; isList {
for i , hb := range heartbeats {
process ( hb , rawData . ( [ ] interface { } ) [ i ] )
2022-01-21 14:35:05 +03:00
}
2023-01-02 13:33:47 +03:00
} else if len ( heartbeats ) > 0 {
process ( heartbeats [ 0 ] , rawData . ( interface { } ) )
2022-01-21 14:35:05 +03:00
}
if len ( newData ) == 0 {
return errors . New ( "no new heartbeats to relay" )
}
if len ( newData ) != len ( heartbeats ) {
user := middlewares . GetPrincipal ( r )
logbuch . Warn ( "only relaying %d of %d heartbeats for user %s" , len ( newData ) , len ( heartbeats ) , user . ID )
}
buf := bytes . Buffer { }
if err := json . NewEncoder ( & buf ) . Encode ( newData ) ; err != nil {
return err
}
r . Body = ioutil . NopCloser ( & buf )
return nil
}