Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GROW-519 Update the relay latency calculation, Add guard shouldCountMetric #301

Merged
merged 10 commits into from
Feb 16, 2023
13 changes: 12 additions & 1 deletion protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"encoding/json"
"fmt"
"net/url"
"os"
"regexp"
"strconv"
"strings"
"time"

Expand All @@ -17,6 +19,7 @@ import (

const (
ContextUserValueKeyDappID = "dappID"
refererHeaderKey = "Referer"
)

type parsedMessage struct {
Expand Down Expand Up @@ -59,9 +62,10 @@ func extractDappIDFromFiberContext(c *fiber.Ctx) (dappID string) {
return dappID
}

func constructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
storeRefererHeaderIfNeeded(c)
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down Expand Up @@ -177,3 +181,10 @@ func verifyTendermintEndpoint(endpoints []string) (websocketEndpoint string, htt
}
return websocketEndpoint, httpEndpoint
}

func storeRefererHeaderIfNeeded(c *fiber.Ctx) {
isMetricEnabled, _ := strconv.ParseBool(os.Getenv("IS_METRICS_ENABLED"))
tabakuj marked this conversation as resolved.
Show resolved Hide resolved
if isMetricEnabled {
c.Locals(refererHeaderKey, c.Get(refererHeaderKey, ""))
}
}
2 changes: 1 addition & 1 deletion protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func TestConstructFiberCallbackWithDappIDExtraction(t *testing.T) {
return nil
}

handler := constructFiberCallbackWithDappIDExtraction(callbackToBeCalled)
handler := constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled)
ctx := &fiber.Ctx{}

err := handler(ctx)
Expand Down
3 changes: 2 additions & 1 deletion protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", apil.endpoint.ChainID, apiInterface)
if relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForGrpc(metricsData, false, ctx)
tabakuj marked this conversation as resolved.
Show resolved Hide resolved
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
apil.logger.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
}
go apil.logger.AddMetricForGrpc(metricsData, true, ctx)
tabakuj marked this conversation as resolved.
Show resolved Hide resolved
apil.logger.LogRequestAndResponse("http in/out", false, method, string(reqBody), "", "", msgSeed, nil)
return relayReply.Data, nil
}
Expand Down
7 changes: 4 additions & 3 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err == nil, c)
tabakuj marked this conversation as resolved.
Show resolved Hide resolved

if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -271,7 +272,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -283,7 +284,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err == nil, c)
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
9 changes: 5 additions & 4 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,12 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
// TODO: handle contentType, in case its not application/json currently we set it to application/json in the Send() method
// contentType := string(c.Context().Request.Header.ContentType())
dappID := extractDappIDFromFiberContext(c)
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
utils.LavaFormatInfo("in <<<", &map[string]string{"path": path, "dappID": dappID, "msgSeed": msgSeed})
requestBody := string(c.Body())
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
reply, _, err := apil.relaySender.SendRelay(ctx, path, requestBody, http.MethodPost, dappID, analytics)
go apil.logger.AddMetricForHttp(analytics, err == nil, c)

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -240,7 +241,7 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
analytics := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)

reply, _, err := apil.relaySender.SendRelay(ctx, path, query, http.MethodGet, dappID, analytics)
go apil.logger.AddMetric(analytics, err != nil)
go apil.logger.AddMetricForHttp(analytics, err == nil, c)
if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
10 changes: 6 additions & 4 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := apil.relaySender.SendRelay(ctx, "", string(msg), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForWebSocket(metricsData, err == nil, c)
if err != nil {
apil.logger.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, "tendermint")
continue
Expand Down Expand Up @@ -314,7 +314,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
}
}
})
websocketCallbackWithDappID := constructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -325,7 +325,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err == nil, c)

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down Expand Up @@ -359,7 +360,8 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("urirpc in <<<", &map[string]string{"seed": msgSeed, "msg": path, "dappID": dappID})
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, _, err := apil.relaySender.SendRelay(ctx, path+query, "", http.MethodGet, dappID, metricsData)
go apil.logger.AddMetric(metricsData, err != nil)
go apil.logger.AddMetricForHttp(metricsData, err == nil, c)

if err != nil {
// Get unique GUID response
errMasking := apil.logger.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
66 changes: 59 additions & 7 deletions protocol/common/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/rand"
"os"
"strconv"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/websocket/v2"
Expand All @@ -13,16 +14,22 @@ import (
"github.com/lavanet/lava/relayer/parser"
"github.com/lavanet/lava/utils"
"github.com/newrelic/go-agent/v3/newrelic"
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)

var ReturnMaskedErrors = "false"

const webSocketCloseMessage = "websocket: close 1005 (no status)"
const (
webSocketCloseMessage = "websocket: close 1005 (no status)"
refererHeaderKey = "Referer"
)

type RPCConsumerLogs struct {
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
newRelicApplication *newrelic.Application
MetricService *metrics.MetricService
StoreMetricData bool
excludeMetricsReferrers string
}

func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
Expand All @@ -48,6 +55,7 @@ func NewRPCConsumerLogs() (*RPCConsumerLogs, error) {
if isMetricEnabled {
portal.StoreMetricData = true
portal.MetricService = metrics.NewMetricService()
portal.excludeMetricsReferrers = os.Getenv("TO_EXCLUDE_METRICS_REFERRERS")
}
return portal, err
}
Expand Down Expand Up @@ -110,9 +118,53 @@ func (pl *RPCConsumerLogs) LogStartTransaction(name string) {
}
}

func (pl *RPCConsumerLogs) AddMetric(data *metrics.RelayMetrics, isNotSuccessful bool) {
if pl.StoreMetricData {
data.Success = !isNotSuccessful
func (pl *RPCConsumerLogs) AddMetricForHttp(data *metrics.RelayMetrics, isSuccessful bool, c *fiber.Ctx) {
if pl.StoreMetricData && pl.shouldCountMetricForHttp(c) {
data.Success = isSuccessful
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForWebSocket(data *metrics.RelayMetrics, isSuccessful bool, c *websocket.Conn) {
if pl.StoreMetricData && pl.shouldCountMetricForWebSocket(c) {
data.Success = isSuccessful
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) AddMetricForGrpc(data *metrics.RelayMetrics, isSuccessful bool, ctx context.Context) {
if pl.StoreMetricData && pl.shouldCountMetricForGrpc(ctx) {
data.Success = isSuccessful
pl.MetricService.SendData(*data)
}
}

func (pl *RPCConsumerLogs) shouldCountMetricForHttp(c *fiber.Ctx) bool {
refererHeaderValue := c.Get(refererHeaderKey, "")
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForWebSocket(c *websocket.Conn) bool {
refererHeaderValue, isHeaderFound := c.Locals(refererHeaderKey).(string)
if !isHeaderFound {
return true
}
return pl.shouldCountMetrics(refererHeaderValue)
}

func (pl *RPCConsumerLogs) shouldCountMetricForGrpc(ctx context.Context) bool {
headersValues, ok := metadata.FromIncomingContext(ctx)
if ok {
refererHeaderValue := headersValues.Get(refererHeaderKey)
result := len(refererHeaderValue) > 0 && pl.shouldCountMetrics(refererHeaderValue[0])
return !result
}
return true
}

func (pl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string) bool {
if len(pl.excludeMetricsReferrers) > 0 && len(refererHeaderValue) > 0 {
return !strings.Contains(refererHeaderValue, pl.excludeMetricsReferrers)
}
return true
}
12 changes: 11 additions & 1 deletion relayer/chainproxy/chainproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"fmt"
"net/url"
"os"
"strconv"
"time"

"github.com/lavanet/lava/relayer/metrics"
Expand Down Expand Up @@ -318,11 +320,12 @@ func SendRelay(
return reply, replyServer, err
}

func ConstructFiberCallbackWithDappIDExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
func constructFiberCallbackWithHeaderAndParameterExtraction(callbackToBeCalled fiber.Handler) fiber.Handler {
webSocketCallback := callbackToBeCalled
handler := func(c *fiber.Ctx) error {
dappId := ExtractDappIDFromFiberContext(c)
c.Locals("dappId", dappId)
storeRefererHeaderIfNeeded(c)
return webSocketCallback(c) // uses external dappID
}
return handler
Expand Down Expand Up @@ -376,3 +379,10 @@ func verifyRPCendpoint(endpoint string) {
utils.LavaFormatWarning("URL scheme should be websocket (ws/wss), got: "+u.Scheme, nil, nil)
}
}

func storeRefererHeaderIfNeeded(c *fiber.Ctx) {
isMetricEnabled, _ := strconv.ParseBool(os.Getenv("IS_METRICS_ENABLED"))
if isMetricEnabled {
c.Locals(refererHeaderKey, c.Get(refererHeaderKey, ""))
}
}
3 changes: 2 additions & 1 deletion relayer/chainproxy/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,11 +296,12 @@ func (cp *GrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", cp.chainID, apiInterface)
if relayReply, _, err = SendRelay(ctx, cp, privKey, method, string(reqBody), "", "NoDappID", metricsData); err != nil {
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForGrpc(metricsData, err == nil, ctx)
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
cp.portalLogs.LogRequestAndResponse("http in/out", true, method, string(reqBody), "", errMasking, msgSeed, err)
return nil, utils.LavaFormatError("Failed to SendRelay", fmt.Errorf(errMasking), nil)
}
go cp.portalLogs.AddMetricForGrpc(metricsData, err == nil, ctx)
cp.portalLogs.LogRequestAndResponse("http in/out", false, method, string(reqBody), "", "", msgSeed, nil)
return relayReply.Data, nil
}
Expand Down
6 changes: 3 additions & 3 deletions relayer/chainproxy/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
defer cancel() // incase there's a problem make sure to cancel the connection
metricsData := metrics.NewRelayAnalytics(dappID, chainID, apiInterface)
reply, replyServer, err := SendRelay(ctx, cp, privKey, "", string(msg), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForWebSocket(metricsData, err == nil, c)
if err != nil {
cp.portalLogs.AnalyzeWebSocketErrorAndWriteMessage(c, mt, err, msgSeed, msg, spectypes.APIInterfaceJsonRPC)
continue
Expand Down Expand Up @@ -373,7 +373,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
}
}
})
websocketCallbackWithDappID := ConstructFiberCallbackWithDappIDExtraction(webSocketCallback)
websocketCallbackWithDappID := constructFiberCallbackWithHeaderAndParameterExtraction(webSocketCallback)
app.Get("/ws/:dappId", websocketCallbackWithDappID)
app.Get("/:dappId/websocket", websocketCallbackWithDappID) // catching http://ip:port/1/websocket requests.

Expand All @@ -385,7 +385,7 @@ func (cp *JrpcChainProxy) PortalStart(ctx context.Context, privKey *btcec.Privat
utils.LavaFormatInfo("in <<<", &map[string]string{"seed": msgSeed, "msg": string(c.Body()), "dappID": dappID})

reply, _, err := SendRelay(ctx, cp, privKey, "", string(c.Body()), http.MethodGet, dappID, metricsData)
go cp.portalLogs.AddMetric(metricsData, err != nil)
go cp.portalLogs.AddMetricForHttp(metricsData, err == nil, c)
if err != nil {
// Get unique GUID response
errMasking := cp.portalLogs.GetUniqueGuidResponseForError(err, msgSeed)
Expand Down
Loading