Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GROW-519 Update the relay latency calculation, Add guard shouldCountMetric #301

Merged
merged 10 commits into from
Feb 16, 2023
6 changes: 5 additions & 1 deletion protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
common "github.com/lavanet/lava/protocol/common"
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
spectypes "github.com/lavanet/lava/x/spec/types"
Expand Down Expand Up @@ -59,9 +60,12 @@ func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) {
return dappID
}

func constructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler, isMetricEnabled bool) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
if isMetricEnabled {
c.Locals(common.RefererHeaderKey, c.Get(common.RefererHeaderKey, ""))
}
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestConstructFiberCallbackWithDappIDExtraction(t *testing.T) {
return nil
}

handler := constructFiberCallbackWithDappIDExtraction(callbackToBeCalled)
handler := constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled, false)
ctx := &fiber.Ctx{}

err := handler(ctx)
Expand Down
8 changes: 6 additions & 2 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"errors"
"fmt"
"google.golang.org/grpc/metadata"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -196,11 +197,14 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
msgSeed := apil.logger.GetMessageSeed()
metadataValues, _ := metadata.FromIncomingContext(ctx)
utils.LavaFormatInfo("GRPC Got Relay: "+method, nil)
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", apil.endpoint.ChainID, apiInterface)
if relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go apil.logger.AddMetric(metricsData, err != nil)
relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData)
go apil.logger.AddMetricForGrpc(metricsData, err, &metadataValues)

if err != nil {
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
apil.logger.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err, c)

if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -271,7 +272,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, apil.logger.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -283,7 +284,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
9 changes: 5 additions & 4 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
// TODO: handle contentType, in case its not application/json currently we set it to application/json in the Send() method
// contentType := string(c.Context().Request.Header.ContentType())
dappID := extractDappIDFromFiberContext(c)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
utils.LavaFormatInfo("in <<<", &map[string]string{"path": path, "dappID": dappID, "msgSeed": msgSeed})
requestBody := string(c.Body())
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, analytics)
go apil.logger.AddMetricForHttp(analytics, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)

reply, _, err := apil.relaySender.SendRelay(ctx, path, query, http.MethodGet, dappID, analytics)
go apil.logger.AddMetric(analytics, err != nil)
go apil.logger.AddMetricForHttp(analytics, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
10 changes: 6 additions & 4 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err, c)
if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, "tendermint")
continue
Expand Down Expand Up @@ -314,7 +314,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, apil.logger.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -325,7 +325,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -359,7 +360,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("urirpc in <<<", &map[string]string{"seed": msgSeed, "msg": path, "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, path+query, "", http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err, c.GetReqHeaders())

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
64 changes: 57 additions & 7 deletions protocol/common/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
Expand All @@ -13,16 +14,21 @@ import (
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
"github.com/newrelic/go-agent/v3/newrelic"
"google.golang.org/grpc/metadata"
)

var ReturnMaskedErrors = "false"

const webSocketCloseMessage = "websocket: close 1005 (no status)"
const (
webSocketCloseMessage = "websocket: close 1005 (no status)"
RefererHeaderKey = "Referer"
)

type RPCConsumerLogs struct {
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
excludeMetricsReferrers string
}

func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
Expand All @@ -48,6 +54,7 @@ func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
if isMetricEnabled {
portal.StoreMetricData = true
portal.MetricService = metrics.NewMetricService()
portal.excludeMetricsReferrers = os.Getenv("TO_EXCLUDE_METRICS_REFERRERS")
}
return portal, err
}
Expand Down Expand Up @@ -110,9 +117,52 @@ func (pl *RPCConsumerLogs) LogStartTransaction(name string) {
}
}

func (pl *RPCConsumerLogs) AddMetric(data *metrics.RelayMetrics, isNotSuccessful bool) {
if pl.StoreMetricData {
data.Success = !isNotSuccessful
func (pl *RPCConsumerLogs) AddMetricForHttp(data *metrics.RelayMetrics, err error, headers map[string]string) {
if pl.StoreMetricData && pl.shouldCountMetricForHttp(headers) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForWebSocket(data *metrics.RelayMetrics, err error, c *websocket.Conn) {
if pl.StoreMetricData && pl.shouldCountMetricForWebSocket(c) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForGrpc(data *metrics.RelayMetrics, err error, metadataValues *metadata.MD) {
if pl.StoreMetricData && pl.shouldCountMetricForGrpc(metadataValues) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) shouldCountMetricForHttp(headers map[string]string) bool {
refererHeaderValue := headers[RefererHeaderKey]
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForWebSocket(c *websocket.Conn) bool {
refererHeaderValue, isHeaderFound := c.Locals(RefererHeaderKey).(string)
if !isHeaderFound {
return true
}
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForGrpc(metadataValues *metadata.MD) bool {
if metadataValues != nil {
refererHeaderValue := metadataValues.Get(RefererHeaderKey)
result := len(refererHeaderValue) > 0 && pl.shouldCountMetrics(refererHeaderValue[0])
return !result
}
return true
}

func (pl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string) bool {
if len(pl.excludeMetricsReferrers) > 0 && len(refererHeaderValue) > 0 {
return !strings.Contains(refererHeaderValue, pl.excludeMetricsReferrers)
}
return true
}
5 changes: 4 additions & 1 deletion relayer/chainproxy/chainproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,11 +318,14 @@ func SendRelay(
return reply, replyServer, err
}

func ConstructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler, isMetricEnabled bool) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
dappId := ExtractDappIDFromFiberContext(c)
c.Locals("dappId", dappId)
if isMetricEnabled {
c.Locals(RefererHeaderKey, c.Get(RefererHeaderKey, ""))
}
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down
7 changes: 5 additions & 2 deletions relayer/chainproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
reflectionpbo "google.golang.org/grpc/reflection/grpc_reflection_v1alpha"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -293,10 +294,12 @@ func (cp *GrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
msgSeed := cp.portalLogs.GetMessageSeed()
utils.LavaFormatInfo("GRPC Got Relay: "+method, nil)
metadataValues, _ := metadata.FromIncomingContext(ctx)
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", cp.chainID, apiInterface)
if relayReply, _, err = SendRelay(ctx, cp, privKey, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go cp.portalLogs.AddMetric(metricsData, err != nil)
relayReply, _, err = SendRelay(ctx, cp, privKey, method, string(reqBody), "", "NoDappID", metricsData)
go cp.portalLogs.AddMetricForGrpc(metricsData, err, &metadataValues)
if err != nil {
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
Expand Down
6 changes: 3 additions & 3 deletions relayer/chainproxy/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := SendRelay(ctx, cp, privKey, "", string(msg), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForWebSocket(metricsData, err, c)
if err != nil {
cp.portalLogs.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -373,7 +373,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
}
}
})
websocketCallbackWithDappID := ConstructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback, cp.portalLogs.StoreMetricData)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -385,7 +385,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := SendRelay(ctx, cp, privKey, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForHttp(metricsData, err, c.GetReqHeaders())
if err != nil {
// Get unique GUID response
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
64 changes: 57 additions & 7 deletions relayer/chainproxy/portalLogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"math/rand"
"os"
"strconv"
"strings"

"google.golang.org/grpc/metadata"

"github.com/lavanet/lava/relayer/metrics"

Expand All @@ -17,12 +20,16 @@ import (

var ReturnMaskedErrors = "false"

const webSocketCloseMessage = "websocket: close 1005 (no status)"
const (
webSocketCloseMessage = "websocket: close 1005 (no status)"
RefererHeaderKey = "Referer"
)

type PortalLogs struct {
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
excludeMetricsReferrers string
}

func NewPortalLogs() (*PortalLogs, error) {
Expand Down Expand Up @@ -114,9 +121,52 @@ func (pl *PortalLogs) LogStartTransaction(name string) {
}
}

func (pl *PortalLogs) AddMetric(data *metrics.RelayMetrics, isNotSuccessful bool) {
if pl.StoreMetricData {
data.Success = !isNotSuccessful
func (pl *PortalLogs) AddMetricForHttp(data *metrics.RelayMetrics, err error, headers map[string]string) {
if pl.StoreMetricData && pl.shouldCountMetricForHttp(headers) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) AddMetricForWebSocket(data *metrics.RelayMetrics, err error, c *websocket.Conn) {
if pl.StoreMetricData && pl.shouldCountMetricForWebSocket(c) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) AddMetricForGrpc(data *metrics.RelayMetrics, err error, metadataValues *metadata.MD) {
if pl.StoreMetricData && pl.shouldCountMetricForGrpc(metadataValues) {
data.Success = err == nil
pl.MetricService.SendData(*data)
}
}

func (pl *PortalLogs) shouldCountMetricForHttp(headers map[string]string) bool {
refererHeaderValue := headers[RefererHeaderKey]
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *PortalLogs) shouldCountMetricForWebSocket(c *websocket.Conn) bool {
refererHeaderValue, isHeaderFound := c.Locals(RefererHeaderKey).(string)
if !isHeaderFound {
return true
}
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *PortalLogs) shouldCountMetricForGrpc(metadataValues *metadata.MD) bool {
if metadataValues != nil {
refererHeaderValue := metadataValues.Get(RefererHeaderKey)
result := len(refererHeaderValue) > 0 && pl.shouldCountMetrics(refererHeaderValue[0])
return !result
}
return true
}

func (pl *PortalLogs) shouldCountMetrics(refererHeaderValue string) bool {
if len(pl.excludeMetricsReferrers) > 0 && len(refererHeaderValue) > 0 {
return !strings.Contains(refererHeaderValue, pl.excludeMetricsReferrers)
}
return true
}
Loading