feat: PRT- adding provider retry mechanism on node error for better QOS #1660

Merged · 22 commits · Sep 1, 2024
Changes from 19 commits
7 changes: 0 additions & 7 deletions protocol/chaintracker/chain_tracker.go
@@ -378,7 +378,6 @@ func (cs *ChainTracker) start(ctx context.Context, pollingTime time.Duration) er
for {
select {
case <-cs.timer.C:
utils.LavaFormatTrace("chain tracker fetch triggered", utils.LogAttr("currTime", time.Now()))
fetchCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // protect this flow from hanging code
err := cs.fetchAllPreviousBlocksIfNecessary(fetchCtx)
cancel()
@@ -425,12 +424,6 @@ func (cs *ChainTracker) updateTimer(tickerBaseTime time.Duration, fetchFails uin
newTickerDuration /= time.Duration(PollingMultiplier)
}

utils.LavaFormatTrace("state tracker ticker set",
utils.LogAttr("timeSinceLastUpdate", timeSinceLastUpdate),
utils.LogAttr("time", time.Now()),
utils.LogAttr("newTickerDuration", newTickerDuration),
)

cs.timer = time.NewTimer(newTickerDuration)
}

@@ -1,4 +1,4 @@
package rpcconsumer
package lavaprotocol

import (
"time"
@@ -8,7 +8,11 @@ import (
)

// entries ttl duration
const RetryEntryTTL = 6 * time.Hour
const (
CacheMaxCost = 10 * 1024 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
RetryEntryTTL = 6 * time.Hour
)

// On node errors we try to send a relay again.
// If this relay failed all retries we ban it from retries to avoid spam and save resources
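For context on the constants above: the names suggest the retry-ban list is a dgraph-io/ristretto cache where every entry costs 1, entries expire after RetryEntryTTL, and roughly 2,000 hashes are expected at a time. The sketch below is not part of the diff; the method names match those used elsewhere in this PR (CheckHashInCache, AddHashToCache, RemoveHashFromCache), but the ristretto backing and the exact implementation are assumptions, not the merged code.

package lavaprotocol

import (
	"time"

	"github.com/dgraph-io/ristretto"
)

const (
	CacheMaxCost     = 10 * 1024 // each item cost would be 1
	CacheNumCounters = 20000     // expect 2000 items
	RetryEntryTTL    = 6 * time.Hour
)

// RelayRetriesManager tracks request hashes that exhausted their retries,
// so repeated node errors for the same request are not retried again.
type RelayRetriesManager struct {
	cache *ristretto.Cache
}

func NewRelayRetriesManager() *RelayRetriesManager {
	cache, err := ristretto.NewCache(&ristretto.Config{
		NumCounters: CacheNumCounters,
		MaxCost:     CacheMaxCost,
		BufferItems: 64,
	})
	if err != nil {
		panic(err) // static configuration, should never fail
	}
	return &RelayRetriesManager{cache: cache}
}

// CheckHashInCache reports whether this request hash is currently banned from retries.
func (rrm *RelayRetriesManager) CheckHashInCache(hash string) bool {
	_, found := rrm.cache.Get(hash)
	return found
}

// AddHashToCache bans a hash from retries for RetryEntryTTL.
func (rrm *RelayRetriesManager) AddHashToCache(hash string) {
	rrm.cache.SetWithTTL(hash, struct{}{}, 1, RetryEntryTTL)
}

// RemoveHashFromCache lifts the ban after a successful relay.
func (rrm *RelayRetriesManager) RemoveHashFromCache(hash string) {
	rrm.cache.Del(hash)
}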
6 changes: 4 additions & 2 deletions protocol/metrics/consumer_referrer_client_test.go
@@ -15,6 +15,7 @@ func TestReferrerClientFlows(t *testing.T) {
t.Run("one-shot", func(t *testing.T) {
messages := []map[string]interface{}{}
reqMap := []map[string]interface{}{}
ch := make(chan bool, 1)
serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Handle the incoming request and provide the desired response
data := make([]byte, r.ContentLength)
@@ -25,6 +26,7 @@ func TestReferrerClientFlows(t *testing.T) {
reqMap = []map[string]interface{}{}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`)
ch <- true
})

mockServer := httptest.NewServer(serverHandle)
@@ -34,7 +36,7 @@ func TestReferrerClientFlows(t *testing.T) {
serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
serverClient.AppendReferrer(NewReferrerRequest("banana", "COSMOSHUB", "Message-2", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
serverClient.AppendReferrer(NewReferrerRequest("papaya", "ETH1", "Message-3", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
time.Sleep(110 * time.Millisecond)
<-ch
require.Len(t, messages, 2)
bananas := 0
papayas := 0
@@ -56,7 +58,7 @@ func TestReferrerClientNull(t *testing.T) {
serverClient := NewConsumerReferrerClient("")
require.Nil(t, serverClient)
serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
time.Sleep(110 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
getSender := func() ReferrerSender {
return serverClient
}
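The test change above swaps a fixed time.Sleep for a buffered channel that the HTTP handler writes to, so the assertions run only once the referrer batch has actually reached the mock server. Below is a self-contained sketch of that synchronization pattern, with a generic handler and request standing in for the referrer client:

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	ch := make(chan bool, 1) // buffered so the handler never blocks
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
		ch <- true // signal that the request actually arrived
	}))
	defer server.Close()

	go func() {
		// Stands in for the asynchronous batch flush done by the client under test.
		resp, err := http.Get(server.URL)
		if err == nil {
			resp.Body.Close()
		}
	}()

	// Wait for the handler's signal instead of sleeping a fixed duration;
	// the timeout keeps a broken flush from hanging the test forever.
	select {
	case <-ch:
		fmt.Println("request observed, safe to assert")
	case <-time.After(5 * time.Second):
		fmt.Println("timed out waiting for request")
	}
}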
2 changes: 1 addition & 1 deletion protocol/provideroptimizer/provider_optimizer.go
@@ -18,7 +18,7 @@ import (
)

const (
CacheMaxCost = 2000 // each item cost would be 1
CacheMaxCost = 20000 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
INITIAL_DATA_STALENESS = 24
HALF_LIFE_TIME = time.Hour
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/consumer_consistency.go
@@ -10,7 +10,7 @@ import (

// this class handles seen block values in requests
const (
CacheMaxCost = 2000 // each item cost would be 1
CacheMaxCost = 20000 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
EntryTTL = 5 * time.Minute
)
5 changes: 3 additions & 2 deletions protocol/rpcconsumer/relay_processor.go
@@ -13,6 +13,7 @@ import (
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/protocol/lavasession"
"github.com/lavanet/lava/v2/utils"
)
@@ -55,7 +56,7 @@ type RelayProcessor struct {
allowSessionDegradation uint32 // used in the scenario where extension was previously used.
metricsInf MetricsInterface
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
relayRetriesManager *RelayRetriesManager
relayRetriesManager *lavaprotocol.RelayRetriesManager
ResultsManager
}

@@ -68,7 +69,7 @@ func NewRelayProcessor(
debugRelay bool,
metricsInf MetricsInterface,
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter,
relayRetriesManager *RelayRetriesManager,
relayRetriesManager *lavaprotocol.RelayRetriesManager,
) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
selection := Quorum // select the majority of node responses
3 changes: 2 additions & 1 deletion protocol/rpcconsumer/relay_processor_test.go
@@ -10,6 +10,7 @@ import (
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chainlib/extensionslib"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/protocol/lavasession"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
spectypes "github.com/lavanet/lava/v2/x/spec/types"
@@ -31,7 +32,7 @@ func (romm *relayProcessorMetricsMock) GetChainIdAndApiInterface() (string, stri
}

var (
relayRetriesManagerInstance = NewRelayRetriesManager()
relayRetriesManagerInstance = lavaprotocol.NewRelayRetriesManager()
relayProcessorMetrics = &relayProcessorMetricsMock{}
)

4 changes: 2 additions & 2 deletions protocol/rpcconsumer/rpcconsumer_server.go
@@ -76,7 +76,7 @@ type RPCConsumerServer struct {
connectedSubscriptionsContexts map[string]*CancelableContextHolder
chainListener chainlib.ChainListener
connectedSubscriptionsLock sync.RWMutex
relayRetriesManager *RelayRetriesManager
relayRetriesManager *lavaprotocol.RelayRetriesManager
}

type relayResponse struct {
@@ -126,7 +126,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp
rpccs.debugRelays = cmdFlags.DebugRelays
rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder)
rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10)
rpccs.relayRetriesManager = NewRelayRetriesManager()
rpccs.relayRetriesManager = lavaprotocol.NewRelayRetriesManager()
rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager)
if err != nil {
return err
83 changes: 83 additions & 0 deletions protocol/rpcprovider/provider_state_machine.go
@@ -0,0 +1,83 @@
package rpcprovider

import (
"context"
"time"

"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/utils"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

type RelaySender interface {
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error)
}

type ProviderStateMachine struct {
relayRetriesManager *lavaprotocol.RelayRetriesManager
chainId string
}

func NewProviderStateMachine(chainId string) *ProviderStateMachine {
return &ProviderStateMachine{
relayRetriesManager: lavaprotocol.NewRelayRetriesManager(),
chainId: chainId,
}
}

func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, relaySender RelaySender, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) {
hash, err := chainMsg.GetRawRequestHash()
requestHashString := ""
if err != nil {
utils.LavaFormatWarning("Failed converting message to hash", err, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data)))
} else {
requestHashString = string(hash)
}

var replyWrapper *chainlib.RelayReplyWrapper
var isNodeError bool
for retryAttempt := 0; retryAttempt <= numberOfRetriesAllowedOnNodeErrors; retryAttempt++ {
sendTime := time.Now()
replyWrapper, _, _, _, _, err = relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
if err != nil {
return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

if replyWrapper == nil || replyWrapper.RelayReply == nil {
return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

if debugLatency {
utils.LavaFormatDebug("node reply received", utils.LogAttr("attempt", retryAttempt), utils.LogAttr("timeTaken", time.Since(sendTime)), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

// Failed fetching hash return the reply.
if requestHashString == "" {
break // We can't perform the retries as we failed fetching the request hash.
}

// Check for node errors
isNodeError, _ = chainMsg.CheckResponseError(replyWrapper.RelayReply.Data, replyWrapper.StatusCode)
if !isNodeError {
// Successful relay, remove it from the cache if we have it and return a valid response.
go psm.relayRetriesManager.RemoveHashFromCache(requestHashString)
return replyWrapper, nil
}

// On the first retry, check if this hash has already failed previously
if retryAttempt == 0 && psm.relayRetriesManager.CheckHashInCache(requestHashString) {
utils.LavaFormatTrace("received node error, request hash was already in cache, skipping retry")
break
}
utils.LavaFormatTrace("Errored Node Message, retrying message", utils.LogAttr("retry", retryAttempt))
}

if isNodeError {
utils.LavaFormatTrace("failed all relay retries for message")
go psm.relayRetriesManager.AddHashToCache(requestHashString)
}
return replyWrapper, nil
}
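The loop in SendNodeMessage above retries node errors up to numberOfRetriesAllowedOnNodeErrors times, skips retries entirely for request hashes that already exhausted their attempts, and bans or un-bans the hash depending on the final outcome. The following is a simplified, self-contained sketch of that decision flow, with plain functions and a map standing in for the lava chainlib types and the ristretto-backed cache — illustrative only, not the merged code:

package main

import "fmt"

// retryDecision mirrors the loop in SendNodeMessage: retry node errors up to
// maxRetries times, skip retries for hashes that already failed all attempts
// before, and ban or un-ban the hash based on the final outcome.
func retryDecision(send func() (nodeError bool), hash string, banned map[string]bool, maxRetries int) {
	var nodeError bool
	for attempt := 0; attempt <= maxRetries; attempt++ {
		nodeError = send()
		if !nodeError {
			delete(banned, hash) // success: lift any previous ban
			return
		}
		if attempt == 0 && banned[hash] {
			break // this request already exhausted its retries before, don't spam the node
		}
	}
	if nodeError {
		banned[hash] = true // failed all retries: ban the hash for future relays
	}
}

func main() {
	banned := map[string]bool{}
	calls := 0
	// A node that errors twice and then succeeds.
	flakyNode := func() bool { calls++; return calls < 3 }
	retryDecision(flakyNode, "0xabc", banned, 2)
	fmt.Println("calls:", calls, "banned:", banned["0xabc"]) // calls: 3 banned: false
}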
1 change: 1 addition & 0 deletions protocol/rpcprovider/rpcprovider.go
@@ -772,6 +772,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check")
cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

common.AddRollingLogConfig(cmdRPCProvider)
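The new flag above wires the package-level numberOfRetriesAllowedOnNodeErrors variable (default 2), so each relay is sent at most 1 + 2 = 3 times on node errors. A hedged sketch of that cobra wiring pattern follows; the flag string used here is illustrative, as the real name comes from common.SetRelayCountOnNodeErrorFlag:

package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

// Package-level knob, mirroring numberOfRetriesAllowedOnNodeErrors in rpcprovider.
var retriesOnNodeError = 2

func main() {
	cmd := &cobra.Command{
		Use: "rpcprovider-sketch",
		Run: func(cmd *cobra.Command, args []string) {
			// The relay is attempted once plus this many retries, i.e. up to 1+N sends.
			fmt.Println("max sends per relay:", 1+retriesOnNodeError)
		},
	}
	// Illustrative flag name only; not the actual value of common.SetRelayCountOnNodeErrorFlag.
	cmd.Flags().IntVar(&retriesOnNodeError, "retries-on-node-error", 2, "set the number of retry attempts on node errors")
	cmd.Execute()
}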
25 changes: 8 additions & 17 deletions protocol/rpcprovider/rpcprovider_server.go
@@ -43,7 +43,10 @@ const (
debugLatency = false
)

var RPCProviderStickinessHeaderName = "X-Node-Sticky"
var (
RPCProviderStickinessHeaderName = "X-Node-Sticky"
numberOfRetriesAllowedOnNodeErrors = 2
)

const (
RPCProviderAddressHeader = "Lava-Provider-Address"
@@ -67,6 +70,7 @@ type RPCProviderServer struct {
providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager
providerUniqueId string
StaticProvider bool
providerStateMachine *ProviderStateMachine
}

type ReliabilityManagerInf interface {
@@ -129,6 +133,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests(
rpcps.metrics = providerMetrics
rpcps.relaysMonitor = relaysMonitor
rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager
rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID)

rpcps.initRelaysMonitor(ctx)
}
@@ -869,7 +874,6 @@ func (rpcps *RPCProviderServer) trySetRelayReplyInCache(ctx context.Context, req
}

func (rpcps *RPCProviderServer) sendRelayMessageToNode(ctx context.Context, request *pairingtypes.RelayRequest, chainMsg chainlib.ChainMessage, consumerAddr sdk.AccAddress) (*chainlib.RelayReplyWrapper, error) {
sendTime := time.Now()
if debugLatency {
utils.LavaFormatDebug("sending relay to node", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID})
}
@@ -879,21 +883,8 @@ func (rpcps *RPCProviderServer) sendRelayMessageToNode(ctx context.Context, requ
if debugConsistency {
utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(common.UserData{DappId: consumerAddr.String(), ConsumerIp: common.GetIpFromGrpcContext(ctx)})))
}

replyWrapper, _, _, _, _, err := rpcps.chainRouter.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
if err != nil {
return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID})
}

if replyWrapper == nil || replyWrapper.RelayReply == nil {
return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID})
}

if debugLatency {
utils.LavaFormatDebug("node reply received", utils.Attribute{Key: "timeTaken", Value: time.Since(sendTime)}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID})
}

return replyWrapper, nil
// use the provider state machine to send the messages
return rpcps.providerStateMachine.SendNodeMessage(ctx, rpcps.chainRouter, chainMsg, request)
}

func (rpcps *RPCProviderServer) TryRelayUnsubscribe(ctx context.Context, request *pairingtypes.RelayRequest, consumerAddress sdk.AccAddress, chainMessage chainlib.ChainMessage) (*pairingtypes.RelayReply, error) {
2 changes: 1 addition & 1 deletion protocol/statetracker/updaters/state_query.go
@@ -24,7 +24,7 @@ import (

const (
CacheMaxCost = 10 * 1024 // 10K cost
CacheNumCounters = 100000 // expect 10K items
CacheNumCounters = 10 * 1025 // expect 10K items
DefaultTimeToLiveExpiration = 30 * time.Minute
PairingRespKey = "pairing-resp"
VerifyPairingRespKey = "verify-pairing-resp"