Skip to content

Commit

Permalink
feat: PRT- adding provider retry mechanism on node error for better Q…
Browse files Browse the repository at this point in the history
…OS (#1660)

* feat: PRT - adding relay processor retry options

* rename

* improving the features

* removed user data

* fix lint

* fix test

* fixed all comments

* llinty

* bugberan

* fix comment

* feat: PRT- adding provider retry mechanism on node error for better QOS

* fix race on tests.

* remove spam, and add logs

* fix retry 0

* implement provider relay state machine

* fixed all comments

* fix all comments

* lint
  • Loading branch information
ranlavanet committed Sep 1, 2024
1 parent 0f407c2 commit 4793c0b
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 36 deletions.
7 changes: 0 additions & 7 deletions protocol/chaintracker/chain_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,6 @@ func (cs *ChainTracker) start(ctx context.Context, pollingTime time.Duration) er
for {
select {
case <-cs.timer.C:
utils.LavaFormatTrace("chain tracker fetch triggered", utils.LogAttr("currTime", time.Now()))
fetchCtx, cancel := context.WithTimeout(ctx, 3*time.Second) // protect this flow from hanging code
err := cs.fetchAllPreviousBlocksIfNecessary(fetchCtx)
cancel()
Expand Down Expand Up @@ -425,12 +424,6 @@ func (cs *ChainTracker) updateTimer(tickerBaseTime time.Duration, fetchFails uin
newTickerDuration /= time.Duration(PollingMultiplier)
}

utils.LavaFormatTrace("state tracker ticker set",
utils.LogAttr("timeSinceLastUpdate", timeSinceLastUpdate),
utils.LogAttr("time", time.Now()),
utils.LogAttr("newTickerDuration", newTickerDuration),
)

cs.timer = time.NewTimer(newTickerDuration)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rpcconsumer
package lavaprotocol

import (
"time"
Expand All @@ -8,7 +8,17 @@ import (
)

// entries ttl duration
const RetryEntryTTL = 6 * time.Hour
const (
CacheMaxCost = 10 * 1024 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
RetryEntryTTL = 6 * time.Hour
)

type RelayRetriesManagerInf interface {
AddHashToCache(hash string)
CheckHashInCache(hash string) bool
RemoveHashFromCache(hash string)
}

// On node errors we try to send a relay again.
// If this relay failed all retries we ban it from retries to avoid spam and save resources
Expand Down
6 changes: 4 additions & 2 deletions protocol/metrics/consumer_referrer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func TestReferrerClientFlows(t *testing.T) {
t.Run("one-shot", func(t *testing.T) {
messages := []map[string]interface{}{}
reqMap := []map[string]interface{}{}
ch := make(chan bool, 1)
serverHandle := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Handle the incoming request and provide the desired response
data := make([]byte, r.ContentLength)
Expand All @@ -25,6 +26,7 @@ func TestReferrerClientFlows(t *testing.T) {
reqMap = []map[string]interface{}{}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, `{"jsonrpc":"2.0","id":1,"result":"0x10a7a08"}`)
ch <- true
})

mockServer := httptest.NewServer(serverHandle)
Expand All @@ -34,7 +36,7 @@ func TestReferrerClientFlows(t *testing.T) {
serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
serverClient.AppendReferrer(NewReferrerRequest("banana", "COSMOSHUB", "Message-2", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
serverClient.AppendReferrer(NewReferrerRequest("papaya", "ETH1", "Message-3", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
time.Sleep(110 * time.Millisecond)
<-ch
require.Len(t, messages, 2)
bananas := 0
papayas := 0
Expand All @@ -56,7 +58,7 @@ func TestReferrerClientNull(t *testing.T) {
serverClient := NewConsumerReferrerClient("")
require.Nil(t, serverClient)
serverClient.AppendReferrer(NewReferrerRequest("banana", "ETH1", "Message-1", "https://referer.com", "https://origin.com", "Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:47.0) Gecko/20100101 Firefox/47.0", "127.0.0.1"))
time.Sleep(110 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
getSender := func() ReferrerSender {
return serverClient
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/provideroptimizer/provider_optimizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
)

const (
CacheMaxCost = 2000 // each item cost would be 1
CacheMaxCost = 20000 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
INITIAL_DATA_STALENESS = 24
HALF_LIFE_TIME = time.Hour
Expand Down
2 changes: 1 addition & 1 deletion protocol/rpcconsumer/consumer_consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

// this class handles seen block values in requests
const (
CacheMaxCost = 2000 // each item cost would be 1
CacheMaxCost = 20000 // each item cost would be 1
CacheNumCounters = 20000 // expect 2000 items
EntryTTL = 5 * time.Minute
)
Expand Down
5 changes: 3 additions & 2 deletions protocol/rpcconsumer/relay_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
sdktypes "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/protocol/lavasession"
"github.com/lavanet/lava/v2/utils"
)
Expand Down Expand Up @@ -55,7 +56,7 @@ type RelayProcessor struct {
allowSessionDegradation uint32 // used in the scenario where extension was previously used.
metricsInf MetricsInterface
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
relayRetriesManager *RelayRetriesManager
relayRetriesManager *lavaprotocol.RelayRetriesManager
ResultsManager
}

Expand All @@ -68,7 +69,7 @@ func NewRelayProcessor(
debugRelay bool,
metricsInf MetricsInterface,
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter,
relayRetriesManager *RelayRetriesManager,
relayRetriesManager *lavaprotocol.RelayRetriesManager,
) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
selection := Quorum // select the majority of node responses
Expand Down
3 changes: 2 additions & 1 deletion protocol/rpcconsumer/relay_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chainlib/extensionslib"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/protocol/lavasession"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
spectypes "github.com/lavanet/lava/v2/x/spec/types"
Expand All @@ -31,7 +32,7 @@ func (romm *relayProcessorMetricsMock) GetChainIdAndApiInterface() (string, stri
}

var (
relayRetriesManagerInstance = NewRelayRetriesManager()
relayRetriesManagerInstance = lavaprotocol.NewRelayRetriesManager()
relayProcessorMetrics = &relayProcessorMetricsMock{}
)

Expand Down
4 changes: 2 additions & 2 deletions protocol/rpcconsumer/rpcconsumer_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type RPCConsumerServer struct {
connectedSubscriptionsContexts map[string]*CancelableContextHolder
chainListener chainlib.ChainListener
connectedSubscriptionsLock sync.RWMutex
relayRetriesManager *RelayRetriesManager
relayRetriesManager *lavaprotocol.RelayRetriesManager
}

type relayResponse struct {
Expand Down Expand Up @@ -126,7 +126,7 @@ func (rpccs *RPCConsumerServer) ServeRPCRequests(ctx context.Context, listenEndp
rpccs.debugRelays = cmdFlags.DebugRelays
rpccs.connectedSubscriptionsContexts = make(map[string]*CancelableContextHolder)
rpccs.consumerProcessGuid = strconv.FormatUint(utils.GenerateUniqueIdentifier(), 10)
rpccs.relayRetriesManager = NewRelayRetriesManager()
rpccs.relayRetriesManager = lavaprotocol.NewRelayRetriesManager()
rpccs.chainListener, err = chainlib.NewChainListener(ctx, listenEndpoint, rpccs, rpccs, rpcConsumerLogs, chainParser, refererData, consumerWsSubscriptionManager)
if err != nil {
return err
Expand Down
86 changes: 86 additions & 0 deletions protocol/rpcprovider/provider_state_machine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package rpcprovider

import (
"context"
"time"

"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
"github.com/lavanet/lava/v2/utils"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

type RelaySender interface {
SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error)
}

type ProviderStateMachine struct {
relayRetriesManager lavaprotocol.RelayRetriesManagerInf
chainId string
relaySender RelaySender
}

func NewProviderStateMachine(chainId string, relayRetriesManager lavaprotocol.RelayRetriesManagerInf, relaySender RelaySender) *ProviderStateMachine {
return &ProviderStateMachine{
relayRetriesManager: relayRetriesManager,
chainId: chainId,
relaySender: relaySender,
}
}

func (psm *ProviderStateMachine) SendNodeMessage(ctx context.Context, chainMsg chainlib.ChainMessage, request *pairingtypes.RelayRequest) (*chainlib.RelayReplyWrapper, error) {
hash, err := chainMsg.GetRawRequestHash()
requestHashString := ""
if err != nil {
utils.LavaFormatWarning("Failed converting message to hash", err, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data)))
} else {
requestHashString = string(hash)
}

var replyWrapper *chainlib.RelayReplyWrapper
var isNodeError bool
for retryAttempt := 0; retryAttempt <= numberOfRetriesAllowedOnNodeErrors; retryAttempt++ {
sendTime := time.Now()
replyWrapper, _, _, _, _, err = psm.relaySender.SendNodeMsg(ctx, nil, chainMsg, request.RelayData.Extensions)
if err != nil {
return nil, utils.LavaFormatError("Sending chainMsg failed", err, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

if replyWrapper == nil || replyWrapper.RelayReply == nil {
return nil, utils.LavaFormatError("Relay Wrapper returned nil without an error", nil, utils.LogAttr("attempt", retryAttempt), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

if debugLatency {
utils.LavaFormatDebug("node reply received", utils.LogAttr("attempt", retryAttempt), utils.LogAttr("timeTaken", time.Since(sendTime)), utils.LogAttr("GUID", ctx), utils.LogAttr("specID", psm.chainId))
}

// Failed fetching hash return the reply.
if requestHashString == "" {
utils.LavaFormatWarning("Failed to hash request, shouldn't happen", nil, utils.LogAttr("url", request.RelayData.ApiUrl), utils.LogAttr("data", string(request.RelayData.Data)))
break // We can't perform the retries as we failed fetching the request hash.
}

// Check for node errors
isNodeError, _ = chainMsg.CheckResponseError(replyWrapper.RelayReply.Data, replyWrapper.StatusCode)
if !isNodeError {
// Successful relay, remove it from the cache if we have it and return a valid response.
go psm.relayRetriesManager.RemoveHashFromCache(requestHashString)
return replyWrapper, nil
}

// On the first retry, check if this hash has already failed previously
if retryAttempt == 0 && psm.relayRetriesManager.CheckHashInCache(requestHashString) {
utils.LavaFormatTrace("received node error, request hash was already in cache, skipping retry")
break
}
utils.LavaFormatTrace("Errored Node Message, retrying message", utils.LogAttr("retry", retryAttempt))
}

if isNodeError {
utils.LavaFormatTrace("failed all relay retries for message")
go psm.relayRetriesManager.AddHashToCache(requestHashString)
}
return replyWrapper, nil
}
132 changes: 132 additions & 0 deletions protocol/rpcprovider/provider_state_machine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package rpcprovider

import (
"context"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/lavanet/lava/v2/protocol/chainlib"
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/v2/protocol/common"
"github.com/lavanet/lava/v2/protocol/lavaprotocol"
types "github.com/lavanet/lava/v2/x/pairing/types"
"github.com/stretchr/testify/require"
)

type relaySenderMock struct {
numberOfTimesHitSendNodeMsg int
}

func (rs *relaySenderMock) SendNodeMsg(ctx context.Context, ch chan interface{}, chainMessage chainlib.ChainMessageForSend, extensions []string) (relayReply *chainlib.RelayReplyWrapper, subscriptionID string, relayReplyServer *rpcclient.ClientSubscription, proxyUrl common.NodeUrl, chainId string, err error) {
rs.numberOfTimesHitSendNodeMsg++
return &chainlib.RelayReplyWrapper{RelayReply: &types.RelayReply{}}, "", nil, common.NodeUrl{}, "", nil
}

func TestStateMachineHappyFlow(t *testing.T) {
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
chainMsgMock.
EXPECT().
GetRawRequestHash().
Return([]byte{1, 2, 3}, nil).
AnyTimes()
chainMsgMock.
EXPECT().
CheckResponseError(gomock.Any(), gomock.Any()).
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) {
if relaySender.numberOfTimesHitSendNodeMsg < numberOfRetriesAllowedOnNodeErrors {
return true, ""
}
return false, ""
}).
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, relaySender.numberOfTimesHitSendNodeMsg, numberOfRetriesAllowedOnNodeErrors)
require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))
}

func TestStateMachineAllFailureFlows(t *testing.T) {
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
returnFalse := false
chainMsgMock.
EXPECT().
GetRawRequestHash().
Return([]byte{1, 2, 3}, nil).
AnyTimes()
chainMsgMock.
EXPECT().
CheckResponseError(gomock.Any(), gomock.Any()).
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) {
if returnFalse {
return false, ""
}
return true, ""
}).
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg)
for i := 0; i < 10; i++ {
// wait for routine to end..
if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) {
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))

// send second relay with same hash.
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries.
}

func TestStateMachineFailureAndRecoveryFlow(t *testing.T) {
relaySender := &relaySenderMock{}
stateMachine := NewProviderStateMachine("test", lavaprotocol.NewRelayRetriesManager(), relaySender)
chainMsgMock := chainlib.NewMockChainMessage(gomock.NewController(t))
returnFalse := false
chainMsgMock.
EXPECT().
GetRawRequestHash().
Return([]byte{1, 2, 3}, nil).
AnyTimes()
chainMsgMock.
EXPECT().
CheckResponseError(gomock.Any(), gomock.Any()).
DoAndReturn(func(msg interface{}, msg2 interface{}) (interface{}, interface{}) {
if returnFalse {
return false, ""
}
return true, ""
}).
AnyTimes()
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
hash, _ := chainMsgMock.GetRawRequestHash()
require.Equal(t, numberOfRetriesAllowedOnNodeErrors+1, relaySender.numberOfTimesHitSendNodeMsg)
for i := 0; i < 10; i++ {
// wait for routine to end..
if stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) {
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))

// send second relay with same hash.
returnFalse = true
stateMachine.SendNodeMessage(context.Background(), chainMsgMock, &types.RelayRequest{RelayData: &types.RelayPrivateData{Extensions: []string{}}})
require.Equal(t, 4, relaySender.numberOfTimesHitSendNodeMsg) // no retries, first success.
// wait for routine to end..
for i := 0; i < 10; i++ {
if !stateMachine.relayRetriesManager.CheckHashInCache(string(hash)) {
break
}
time.Sleep(100 * time.Millisecond)
}
require.False(t, stateMachine.relayRetriesManager.CheckHashInCache(string(hash)))
}
1 change: 1 addition & 0 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().String(HealthCheckURLPathFlagName, HealthCheckURLPathFlagDefault, "the url path for the provider's grpc health check")
cmdRPCProvider.Flags().DurationVar(&updaters.TimeOutForFetchingLavaBlocks, common.TimeOutForFetchingLavaBlocksFlag, time.Second*5, "setting the timeout for fetching lava blocks")
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec")
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")

common.AddRollingLogConfig(cmdRPCProvider)
Expand Down
Loading

0 comments on commit 4793c0b

Please sign in to comment.