Skip to content

Commit

Permalink
GROW-519 Update the relay latency calculation, Add guard shouldCountM…
Browse files Browse the repository at this point in the history
…etric (#301)

* GROW-519 update the relay latency to count only success relays, Add a guard when storing the metrics to explude our internal tests.

* GROW-519 Update the relay latency calculation, Add guard shouldCountMetric for old portal and new one

* fix tests

* make latency uint in relays DTO

* refactoring the relays according to pr comments

* fix http bug

* fix lint issues
  • Loading branch information
tabakuj authored Feb 16, 2023
1 parent 9fdde19 commit 13c24ee
Show file tree
Hide file tree
Showing 16 changed files with 180 additions and 55 deletions.
6 changes: 5 additions & 1 deletion protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
common "github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
spectypes "github.com/lavanet/lava/x/spec/types"
Expand Down Expand Up @@ -59,9 +60,12 @@ func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) {
return dappID
}

func constructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler, isMetricEnabled bool) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
if isMetricEnabled {
c.Locals(common.RefererHeaderKey, c.Get(common.RefererHeaderKey, ""))
}
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestConstructFiberCallbackWithDappIDExtraction(t *testing.T) {
return nil
}

handler := constructFiberCallbackWithDappIDExtraction(callbackToBeCalled)
handler := constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled, false)
ctx := &fiber.Ctx{}

err := handler(ctx)
Expand Down
8 changes: 6 additions & 2 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/metadata"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -196,11 +197,14 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
msgSeed := apil.logger.GetMessageSeed()
metadataValues, _ := metadata.FromIncomingContext(ctx)
utils.LavaFormatInfo("GRPC Got Relay: "+method, nil)
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", apil.endpoint.ChainID, apiInterface)
if relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go apil.logger.AddMetric(metricsData, err != nil)
relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData)
go apil.logger.AddMetricForGrpc(metricsData, err, &metadataValues)

if err != nil {
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
apil.logger.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err, c)

if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -271,7 +272,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, apil.logger.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -283,7 +284,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
9 changes: 5 additions & 4 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
// TODO: handle contentType, in case its not application/json currently we set it to application/json in the Send() method
// contentType := string(c.Context().Request.Header.ContentType())
dappID := extractDappIDFromFiberContext(c)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
utils.LavaFormatInfo("in <<<", &map[string]string{"path": path, "dappID": dappID, "msgSeed": msgSeed})
requestBody := string(c.Body())
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, analytics)
go apil.logger.AddMetricForHttp(analytics, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)

reply, _, err := apil.relaySender.SendRelay(ctx, path, query, http.MethodGet, dappID, analytics)
go apil.logger.AddMetric(analytics, err != nil)
go apil.logger.AddMetricForHttp(analytics, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
10 changes: 6 additions & 4 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err, c)
if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, "tendermint")
continue
Expand Down Expand Up @@ -314,7 +314,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, apil.logger.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -325,7 +325,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -359,7 +360,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("urirpc in <<<", &map[string]string{"seed": msgSeed, "msg": path, "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, path+query, "", http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
64 changes: 57 additions & 7 deletions protocol/common/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
Expand All @@ -13,16 +14,21 @@ import (
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
"github.com/newrelic/go-agent/v3/newrelic"
"google.golang.org/grpc/metadata"
)

var ReturnMaskedErrors = "false"

const webSocketCloseMessage = "websocket: close 1005 (no status)"
const (
webSocketCloseMessage = "websocket: close 1005 (no status)"
RefererHeaderKey = "Referer"
)

type RPCConsumerLogs struct {
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
excludeMetricsReferrers string
}

func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
Expand Down Expand Up @@ -60,6 +66,7 @@ func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
if isMetricEnabled {
portal.StoreMetricData = true
portal.MetricService = metrics.NewMetricService()
portal.excludeMetricsReferrers = os.Getenv("TO_EXCLUDE_METRICS_REFERRERS")
}
return portal, err
}
Expand Down Expand Up @@ -122,9 +129,52 @@ func (pl *RPCConsumerLogs) LogStartTransaction(name string) {
}
}

func (pl *RPCConsumerLogs) AddMetric(data *metrics.RelayMetrics, isNotSuccessful bool) {
if pl.StoreMetricData {
data.Success = !isNotSuccessful
func (pl *RPCConsumerLogs) AddMetricForHttp(data *metrics.RelayMetrics, err error, headers map[string]string) {
if pl.StoreMetricData && pl.shouldCountMetricForHttp(headers) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForWebSocket(data *metrics.RelayMetrics, err error, c *websocket.Conn) {
if pl.StoreMetricData && pl.shouldCountMetricForWebSocket(c) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForGrpc(data *metrics.RelayMetrics, err error, metadataValues *metadata.MD) {
if pl.StoreMetricData && pl.shouldCountMetricForGrpc(metadataValues) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) shouldCountMetricForHttp(headers map[string]string) bool {
refererHeaderValue := headers[RefererHeaderKey]
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForWebSocket(c *websocket.Conn) bool {
refererHeaderValue, isHeaderFound := c.Locals(RefererHeaderKey).(string)
if !isHeaderFound {
return true
}
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForGrpc(metadataValues *metadata.MD) bool {
if metadataValues != nil {
refererHeaderValue := metadataValues.Get(RefererHeaderKey)
result := len(refererHeaderValue) > 0 && pl.shouldCountMetrics(refererHeaderValue[0])
return !result
}
return true
}

func (pl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string) bool {
if len(pl.excludeMetricsReferrers) > 0 && len(refererHeaderValue) > 0 {
return !strings.Contains(refererHeaderValue, pl.excludeMetricsReferrers)
}
return true
}
5 changes: 4 additions & 1 deletion relayer/chainproxy/chainproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ func SendRelay(
return reply, replyServer, err
}

func ConstructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler, isMetricEnabled bool) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
dappId := ExtractDappIDFromFiberContext(c)
c.Locals("dappId", dappId)
if isMetricEnabled {
c.Locals(RefererHeaderKey, c.Get(RefererHeaderKey, ""))
}
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down
7 changes: 5 additions & 2 deletions relayer/chainproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
reflectionpbo "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -293,10 +294,12 @@ func (cp *GrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
msgSeed := cp.portalLogs.GetMessageSeed()
utils.LavaFormatInfo("GRPC Got Relay: "+method, nil)
metadataValues, _ := metadata.FromIncomingContext(ctx)
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", cp.chainID, apiInterface)
if relayReply, _, err = SendRelay(ctx, cp, privKey, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go cp.portalLogs.AddMetric(metricsData, err != nil)
relayReply, _, err = SendRelay(ctx, cp, privKey, method, string(reqBody), "", "NoDappID", metricsData)
go cp.portalLogs.AddMetricForGrpc(metricsData, err, &metadataValues)
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
Expand Down
6 changes: 3 additions & 3 deletions relayer/chainproxy/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := SendRelay(ctx, cp, privKey, "", string(msg), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForWebSocket(metricsData, err, c)
if err != nil {
cp.portalLogs.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -373,7 +373,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
}
}
})
websocketCallbackWithDappID := ConstructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, cp.portalLogs.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -385,7 +385,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := SendRelay(ctx, cp, privKey, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForHttp(metricsData, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
64 changes: 57 additions & 7 deletions relayer/chainproxy/portalLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"math/rand"
"os"
"strconv"
"strings"

"google.golang.org/grpc/metadata"

"github.com/lavanet/lava/relayer/metrics"

Expand All @@ -17,12 +20,16 @@ import (

var ReturnMaskedErrors = "false"

const webSocketCloseMessage = "websocket: close 1005 (no status)"
const (
webSocketCloseMessage = "websocket: close 1005 (no status)"
RefererHeaderKey = "Referer"
)

type PortalLogs struct {
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
excludeMetricsReferrers string
}

func NewPortalLogs() (*PortalLogs, error) {
Expand Down Expand Up @@ -124,9 +131,52 @@ func (pl *PortalLogs) LogStartTransaction(name string) {
}
}

func (pl *PortalLogs) AddMetric(data *metrics.RelayMetrics, isNotSuccessful bool) {
if pl.StoreMetricData {
data.Success = !isNotSuccessful
func (pl *PortalLogs) AddMetricForHttp(data *metrics.RelayMetrics, err error, headers map[string]string) {
if pl.StoreMetricData && pl.shouldCountMetricForHttp(headers) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) AddMetricForWebSocket(data *metrics.RelayMetrics, err error, c *websocket.Conn) {
if pl.StoreMetricData && pl.shouldCountMetricForWebSocket(c) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) AddMetricForGrpc(data *metrics.RelayMetrics, err error, metadataValues *metadata.MD) {
if pl.StoreMetricData && pl.shouldCountMetricForGrpc(metadataValues) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) shouldCountMetricForHttp(headers map[string]string) bool {
refererHeaderValue := headers[RefererHeaderKey]
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *PortalLogs) shouldCountMetricForWebSocket(c *websocket.Conn) bool {
refererHeaderValue, isHeaderFound := c.Locals(RefererHeaderKey).(string)
if !isHeaderFound {
return true
}
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *PortalLogs) shouldCountMetricForGrpc(metadataValues *metadata.MD) bool {
if metadataValues != nil {
refererHeaderValue := metadataValues.Get(RefererHeaderKey)
result := len(refererHeaderValue) > 0 && pl.shouldCountMetrics(refererHeaderValue[0])
return !result
}
return true
}

func (pl *PortalLogs) shouldCountMetrics(refererHeaderValue string) bool {
if len(pl.excludeMetricsReferrers) > 0 && len(refererHeaderValue) > 0 {
return !strings.Contains(refererHeaderValue, pl.excludeMetricsReferrers)
}
return true
}
Loading

0 comments on commit 13c24ee

Please sign in to comment.