diff --git a/protocol/chaintracker/chain_tracker.go b/protocol/chaintracker/chain_tracker.go index 303b88615b..7fb2ec0e5f 100644 --- a/protocol/chaintracker/chain_tracker.go +++ b/protocol/chaintracker/chain_tracker.go @@ -378,7 +378,6 @@ func (cs *ChainTracker) start(ctx context.Context, pollingTime time.Duration) er for { select { case <-cs.timer.C: - utils.LavaFormatTrace("chain tracker fetch triggered", utils.LogAttr("currTime", time.Now())) fetchCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // protect this flow from hanging code err := cs.fetchAllPreviousBlocksIfNecessary(fetchCtx) cancel() @@ -425,12 +424,6 @@ func (cs *ChainTracker) updateTimer(tickerBaseTime time.Duration, fetchFails uin newTickerDuration /= time.Duration(PollingMultiplier) } - utils.LavaFormatTrace("state tracker ticker set", - utils.LogAttr("timeSinceLastUpdate", timeSinceLastUpdate), - utils.LogAttr("time", time.Now()), - utils.LogAttr("newTickerDuration", newTickerDuration), - ) - cs.timer = time.NewTimer(newTickerDuration) } diff --git a/protocol/rpcconsumer/relay_retries_manager.go b/protocol/lavaprotocol/relay_retries_manager.go similarity index 78% rename from protocol/rpcconsumer/relay_retries_manager.go rename to protocol/lavaprotocol/relay_retries_manager.go index aa14324dbe..31be090b67 100644 --- a/protocol/rpcconsumer/relay_retries_manager.go +++ b/protocol/lavaprotocol/relay_retries_manager.go @@ -1,4 +1,4 @@ -package rpcconsumer +package lavaprotocol import ( "time" @@ -8,7 +8,17 @@ import ( ) // entries ttl duration -const RetryEntryTTL = 6 * time.Hour +const ( + CacheMaxCost = 10 * 1024 // each item cost would be 1 + CacheNumCounters = 20000 // expect 2000 items + RetryEntryTTL = 6 * time.Hour +) + +type RelayRetriesManagerInf interface { + AddHashToCache(hash string) + CheckHashInCache(hash string) bool + RemoveHashFromCache(hash string) +} // On node errors we try to send a relay again. 
// If this relay failed all retries we ban it from retries to avoid spam and save resources diff --git a/protocol/metrics/consumer_referrer_client_test.go b/protocol/metrics/consumer_referrer_client_test.go index 6f0990a659..c8b4fe0173 100644 --- a/protocol/metrics/consumer_referrer_client_test.go +++ b/protocol/metrics/consumer_referrer_client_test.go @@ -15,6 +15,7 @@ func TestReferrerClientFlows(t *testing.T) { t.Run("one-shot", func(t *testing.T) { messages := []map[string]interface{}{} reqMap := []map[string]interface{}{} + ch := make(chan bool, 1) serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle the incoming request and provide the desired response data := make([]byte, r.ContentLength) @@ -25,6 +26,7 @@ func TestReferrerClientFlows(t *testing.T) { reqMap = []map[string]interface{}{} w.WriteHeader(http.StatusOK) fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`) + ch <- true }) mockServer := httptest.NewServer(serverHandle) @@ -34,7 +36,7 @@ func TestReferrerClientFlows(t *testing.T) { serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1")) serverClient.AppendReferrer(NewReferrerRequest("banana", "COSMOSHUB", "Message-2", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1")) serverClient.AppendReferrer(NewReferrerRequest("papaya", "ETH1", "Message-3", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1")) - time.Sleep(110 * time.Millisecond) + <-ch require.Len(t, messages, 2) bananas := 0 papayas := 0 @@ -56,7 +58,7 @@ func TestReferrerClientNull(t *testing.T) { serverClient := NewConsumerReferrerClient("") require.Nil(t, serverClient) 
serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1")) - time.Sleep(110 * time.Millisecond) + time.Sleep(500 * time.Millisecond) getSender := func() ReferrerSender { return serverClient } diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 38f53b034c..b7cfd2413a 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -18,7 +18,7 @@ import ( ) const ( - CacheMaxCost = 2000 // each item cost would be 1 + CacheMaxCost = 20000 // each item cost would be 1 CacheNumCounters = 20000 // expect 2000 items INITIAL_DATA_STALENESS = 24 HALF_LIFE_TIME = time.Hour diff --git a/protocol/rpcconsumer/consumer_consistency.go b/protocol/rpcconsumer/consumer_consistency.go index 30c2decfd3..09fbcaa2c5 100644 --- a/protocol/rpcconsumer/consumer_consistency.go +++ b/protocol/rpcconsumer/consumer_consistency.go @@ -10,7 +10,7 @@ import ( // this class handles seen block values in requests const ( - CacheMaxCost = 2000 // each item cost would be 1 + CacheMaxCost = 20000 // each item cost would be 1 CacheNumCounters = 20000 // expect 2000 items EntryTTL = 5 * time.Minute ) diff --git a/protocol/rpcconsumer/relay_processor.go b/protocol/rpcconsumer/relay_processor.go index bc671776b8..65fffc4637 100644 --- a/protocol/rpcconsumer/relay_processor.go +++ b/protocol/rpcconsumer/relay_processor.go @@ -13,6 +13,7 @@ import ( sdktypes "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/v2/protocol/chainlib" "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/protocol/lavaprotocol" "github.com/lavanet/lava/v2/protocol/lavasession" "github.com/lavanet/lava/v2/utils" ) @@ -55,7 +56,7 @@ type RelayProcessor struct { allowSessionDegradation uint32 // used in the scenario where 
extension was previously used. metricsInf MetricsInterface chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter - relayRetriesManager *RelayRetriesManager + relayRetriesManager *lavaprotocol.RelayRetriesManager ResultsManager } @@ -68,7 +69,7 @@ func NewRelayProcessor( debugRelay bool, metricsInf MetricsInterface, chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter, - relayRetriesManager *RelayRetriesManager, + relayRetriesManager *lavaprotocol.RelayRetriesManager, ) *RelayProcessor { guid, _ := utils.GetUniqueIdentifier(ctx) selection := Quorum // select the majority of node responses diff --git a/protocol/rpcconsumer/relay_processor_test.go b/protocol/rpcconsumer/relay_processor_test.go index a323614428..847001836f 100644 --- a/protocol/rpcconsumer/relay_processor_test.go +++ b/protocol/rpcconsumer/relay_processor_test.go @@ -10,6 +10,7 @@ import ( "github.com/lavanet/lava/v2/protocol/chainlib" "github.com/lavanet/lava/v2/protocol/chainlib/extensionslib" "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/protocol/lavaprotocol" "github.com/lavanet/lava/v2/protocol/lavasession" pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" spectypes "github.com/lavanet/lava/v2/x/spec/types" @@ -31,7 +32,7 @@ func (romm *relayProcessorMetricsMock) GetChainIdAndApiInterface() (string, stri } var ( - relayRetriesManagerInstance = NewRelayRetriesManager() + relayRetriesManagerInstance = lavaprotocol.NewRelayRetriesManager() relayProcessorMetrics = &relayProcessorMetricsMock{} ) diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 48a6d82e2d..80a0cf0637 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -76,7 +76,7 @@ type RPCConsumerServer struct { connectedSubscriptionsContexts map[string]*CancelableContextHolder chainListener chainlib.ChainListener connectedSubscriptionsLock sync.RWMutex - relayRetriesManager 
*RelayRetriesManager + relayRetriesManager *lavaprotocol.RelayRetriesManager } type relayResponse struct { @@ -126,7 +126,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp rpccs.debugRelays = cmdFlags.DebugRelays rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder) rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10) - rpccs.relayRetriesManager = NewRelayRetriesManager() + rpccs.relayRetriesManager = lavaprotocol.NewRelayRetriesManager() rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager) if err != nil { return err diff --git a/protocol/rpcprovider/provider_state_machine.go b/protocol/rpcprovider/provider_state_machine.go new file mode 100644 index 0000000000..44a126af1d --- /dev/null +++ b/protocol/rpcprovider/provider_state_machine.go @@ -0,0 +1,86 @@ +package rpcprovider + +import ( + "context" + "time" + + "github.com/lavanet/lava/v2/protocol/chainlib" + "github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" + "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/protocol/lavaprotocol" + "github.com/lavanet/lava/v2/utils" + pairingtypes "github.com/lavanet/lava/v2/x/pairing/types" +) + +type RelaySender interface { + SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) +} + +type ProviderStateMachine struct { + relayRetriesManager lavaprotocol.RelayRetriesManagerInf + chainId string + relaySender RelaySender +} + +func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender RelaySender) *ProviderStateMachine { + return &ProviderStateMachine{ + 
relayRetriesManager: relayRetriesManager, + chainId: chainId, + relaySender: relaySender, + } +} + +func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) { + hash, err := chainMsg.GetRawRequestHash() + requestHashString := "" + if err != nil { + utils.LavaFormatWarning("Failed converting message to hash", err, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data))) + } else { + requestHashString = string(hash) + } + + var replyWrapper *chainlib.RelayReplyWrapper + var isNodeError bool + for retryAttempt := 0; retryAttempt <= numberOfRetriesAllowedOnNodeErrors; retryAttempt++ { + sendTime := time.Now() + replyWrapper, _, _, _, _, err = psm.relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions) + if err != nil { + return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) + } + + if replyWrapper == nil || replyWrapper.RelayReply == nil { + return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) + } + + if debugLatency { + utils.LavaFormatDebug("node reply received", utils.LogAttr("attempt", retryAttempt), utils.LogAttr("timeTaken", time.Since(sendTime)), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId)) + } + + // Failed fetching hash return the reply. + if requestHashString == "" { + utils.LavaFormatWarning("Failed to hash request, shouldn't happen", nil, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data))) + break // We can't perform the retries as we failed fetching the request hash. 
+ } + + // Check for node errors + isNodeError, _ = chainMsg.CheckResponseError(replyWrapper.RelayReply.Data, replyWrapper.StatusCode) + if !isNodeError { + // Successful relay, remove it from the cache if we have it and return a valid response. + go psm.relayRetriesManager.RemoveHashFromCache(requestHashString) + return replyWrapper, nil + } + + // On the first retry, check if this hash has already failed previously + if retryAttempt == 0 && psm.relayRetriesManager.CheckHashInCache(requestHashString) { + utils.LavaFormatTrace("received node error, request hash was already in cache, skipping retry") + break + } + utils.LavaFormatTrace("Errored Node Message, retrying message", utils.LogAttr("retry", retryAttempt)) + } + + if isNodeError { + utils.LavaFormatTrace("failed all relay retries for message") + go psm.relayRetriesManager.AddHashToCache(requestHashString) + } + return replyWrapper, nil +} diff --git a/protocol/rpcprovider/provider_state_machine_test.go b/protocol/rpcprovider/provider_state_machine_test.go new file mode 100644 index 0000000000..eebaa77c10 --- /dev/null +++ b/protocol/rpcprovider/provider_state_machine_test.go @@ -0,0 +1,132 @@ +package rpcprovider + +import ( + "context" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/lavanet/lava/v2/protocol/chainlib" + "github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient" + "github.com/lavanet/lava/v2/protocol/common" + "github.com/lavanet/lava/v2/protocol/lavaprotocol" + types "github.com/lavanet/lava/v2/x/pairing/types" + "github.com/stretchr/testify/require" +) + +type relaySenderMock struct { + numberOfTimesHitSendNodeMsg int +} + +func (rs *relaySenderMock) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) { + 
rs.numberOfTimesHitSendNodeMsg++ + return &chainlib.RelayReplyWrapper{RelayReply: &types.RelayReply{}}, "", nil, common.NodeUrl{}, "", nil +} + +func TestStateMachineHappyFlow(t *testing.T) { + relaySender := &relaySenderMock{} + stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) + chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) + chainMsgMock. + EXPECT(). + GetRawRequestHash(). + Return([]byte{1, 2, 3}, nil). + AnyTimes() + chainMsgMock. + EXPECT(). + CheckResponseError(gomock.Any(), gomock.Any()). + DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { + if relaySender.numberOfTimesHitSendNodeMsg < numberOfRetriesAllowedOnNodeErrors { + return true, "" + } + return false, "" + }). + AnyTimes() + stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) + hash, _ := chainMsgMock.GetRawRequestHash() + require.Equal(t, relaySender.numberOfTimesHitSendNodeMsg, numberOfRetriesAllowedOnNodeErrors) + require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) +} + +func TestStateMachineAllFailureFlows(t *testing.T) { + relaySender := &relaySenderMock{} + stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) + chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) + returnFalse := false + chainMsgMock. + EXPECT(). + GetRawRequestHash(). + Return([]byte{1, 2, 3}, nil). + AnyTimes() + chainMsgMock. + EXPECT(). + CheckResponseError(gomock.Any(), gomock.Any()). + DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { + if returnFalse { + return false, "" + } + return true, "" + }). 
+ AnyTimes() + stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) + hash, _ := chainMsgMock.GetRawRequestHash() + require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg) + for i := 0; i < 10; i++ { + // wait for routine to end.. + if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { + break + } + time.Sleep(100 * time.Millisecond) + } + require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) + + // send second relay with same hash. + stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) + require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries. +} + +func TestStateMachineFailureAndRecoveryFlow(t *testing.T) { + relaySender := &relaySenderMock{} + stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender) + chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t)) + returnFalse := false + chainMsgMock. + EXPECT(). + GetRawRequestHash(). + Return([]byte{1, 2, 3}, nil). + AnyTimes() + chainMsgMock. + EXPECT(). + CheckResponseError(gomock.Any(), gomock.Any()). + DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) { + if returnFalse { + return false, "" + } + return true, "" + }). + AnyTimes() + stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) + hash, _ := chainMsgMock.GetRawRequestHash() + require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg) + for i := 0; i < 10; i++ { + // wait for routine to end.. 
+ if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { + break + } + time.Sleep(100 * time.Millisecond) + } + require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) + + // send second relay with same hash. + returnFalse = true + stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}}) + require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries, first success. + // wait for routine to end.. + for i := 0; i < 10; i++ { + if !stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) { + break + } + time.Sleep(100 * time.Millisecond) + } + require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash))) +} diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 5a9ac144b6..e6cd4fbc7e 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -772,6 +772,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check") cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks") cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") + cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, 
example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") common.AddRollingLogConfig(cmdRPCProvider) diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index f1f239eea4..9c71410c3b 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -43,7 +43,10 @@ const ( debugLatency = false ) -var RPCProviderStickinessHeaderName = "X-Node-Sticky" +var ( + RPCProviderStickinessHeaderName = "X-Node-Sticky" + numberOfRetriesAllowedOnNodeErrors = 2 +) const ( RPCProviderAddressHeader = "Lava-Provider-Address" @@ -67,6 +70,7 @@ type RPCProviderServer struct { providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager providerUniqueId string StaticProvider bool + providerStateMachine *ProviderStateMachine } type ReliabilityManagerInf interface { @@ -129,6 +133,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.metrics = providerMetrics rpcps.relaysMonitor = relaysMonitor rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager + rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter) rpcps.initRelaysMonitor(ctx) } @@ -869,7 +874,6 @@ func (rpcps *RPCProviderServer) trySetRelayReplyInCache(ctx context.Context, req } func (rpcps *RPCProviderServer) sendRelayMessageToNode(ctx context.Context, request *pairingtypes.RelayRequest, chainMsg chainlib.ChainMessage, consumerAddr sdk.AccAddress) (*chainlib.RelayReplyWrapper, error) { - sendTime := time.Now() if debugLatency { utils.LavaFormatDebug("sending relay to node", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) } @@ -879,21 +883,8 @@ func (rpcps *RPCProviderServer) 
sendRelayMessageToNode(ctx context.Context, requ if debugConsistency { utils.LavaFormatDebug("adding stickiness header", utils.LogAttr("tokenFromContext", common.GetTokenFromGrpcContext(ctx)), utils.LogAttr("unique_token", common.GetUniqueToken(common.UserData{DappId: consumerAddr.String(), ConsumerIp: common.GetIpFromGrpcContext(ctx)}))) } - - replyWrapper, _, _, _, _, err := rpcps.chainRouter.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions) - if err != nil { - return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) - } - - if replyWrapper == nil || replyWrapper.RelayReply == nil { - return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) - } - - if debugLatency { - utils.LavaFormatDebug("node reply received", utils.Attribute{Key: "timeTaken", Value: time.Since(sendTime)}, utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "specID", Value: rpcps.rpcProviderEndpoint.ChainID}) - } - - return replyWrapper, nil + // use the provider state machine to send the messages + return rpcps.providerStateMachine.SendNodeMessage(ctx, chainMsg, request) } func (rpcps *RPCProviderServer) TryRelayUnsubscribe(ctx context.Context, request *pairingtypes.RelayRequest, consumerAddress sdk.AccAddress, chainMessage chainlib.ChainMessage) (*pairingtypes.RelayReply, error) { diff --git a/protocol/statetracker/updaters/state_query.go b/protocol/statetracker/updaters/state_query.go index 31b1fb7206..135edf0c73 100644 --- a/protocol/statetracker/updaters/state_query.go +++ b/protocol/statetracker/updaters/state_query.go @@ -24,7 +24,7 @@ import ( const ( CacheMaxCost = 10 * 1024 // 10K cost - CacheNumCounters = 100000 // expect 10K items + CacheNumCounters = 100 * 1024 // expect 10K items (ristretto recommends counters ~10x expected items)
DefaultTimeToLiveExpiration = 30 * time.Minute PairingRespKey = "pairing-resp" VerifyPairingRespKey = "verify-pairing-resp"