diff --git a/protocol/performance/cache.go b/protocol/performance/cache.go index abce3be96f..638644fbff 100644 --- a/protocol/performance/cache.go +++ b/protocol/performance/cache.go @@ -2,22 +2,56 @@ package performance import ( "context" + "sync" + "sync/atomic" "time" "github.com/lavanet/lava/v4/protocol/lavasession" + "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" ) -type Cache struct { - client pairingtypes.RelayerCacheClient - address string +type relayerCacheClientStore struct { + client pairingtypes.RelayerCacheClient + lock sync.RWMutex + ctx context.Context + address string + reconnecting atomic.Bool } -func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) { - connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second) +const ( + reconnectInterval = 5 * time.Second +) + +func newRelayerCacheClientStore(ctx context.Context, address string) (*relayerCacheClientStore, error) { + clientStore := &relayerCacheClientStore{ + client: nil, + ctx: ctx, + address: address, + } + return clientStore, clientStore.connectClient() +} + +func (r *relayerCacheClientStore) getClient() pairingtypes.RelayerCacheClient { + if r == nil { + return nil + } + + r.lock.RLock() + defer r.lock.RUnlock() + + if r.client == nil { + go r.reconnectClient() + } + + return r.client // might be nil +} + +func (r *relayerCacheClientStore) connectGRPCConnectionToRelayerCacheService() (*pairingtypes.RelayerCacheClient, error) { + connectCtx, cancel := context.WithTimeout(r.ctx, 3*time.Second) defer cancel() - conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false) + conn, err := lavasession.ConnectGRPCClient(connectCtx, r.address, false, true, false) if err != nil { return nil, err } @@ -27,40 +61,91 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string return &c, nil } -func InitCache(ctx context.Context, addr string) 
(*Cache, error) { - relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(ctx, addr) - if err != nil { - return &Cache{client: nil, address: addr}, err +func (r *relayerCacheClientStore) connectClient() error { + relayerCacheClient, err := r.connectGRPCConnectionToRelayerCacheService() + if err == nil { + utils.LavaFormatInfo("Connected to cache service", utils.LogAttr("address", r.address)) + func() { + r.lock.Lock() + defer r.lock.Unlock() + r.client = *relayerCacheClient + }() + + r.reconnecting.Store(false) + return nil // connected } - cache := Cache{client: *relayerCacheClient, address: addr} - return &cache, nil + + utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", r.address), utils.LogAttr("error", err)) + return err +} + +func (r *relayerCacheClientStore) reconnectClient() { + // reconnectClient retries the connection every reconnectInterval until it succeeds or r.ctx is done; the atomic CompareAndSwap below ensures that only one goroutine is reconnecting at a time. + // reconnecting.CompareAndSwap(false, true): + // if reconnecting == false { + // reconnecting = true + // return true -> reconnect + // } + // return false -> already reconnecting + if !r.reconnecting.CompareAndSwap(false, true) { + return + } + + for { + select { + case <-r.ctx.Done(): + return + case <-time.After(reconnectInterval): + if r.connectClient() == nil { + return // connected; connectClient already cleared the reconnecting flag, a failure keeps retrying + } + } + } +} + +type Cache struct { + clientStore *relayerCacheClientStore + address string + serviceCtx context.Context +} + +func InitCache(ctx context.Context, addr string) (*Cache, error) { + clientStore, err := newRelayerCacheClientStore(ctx, addr) + return &Cache{ + clientStore: clientStore, + address: addr, + serviceCtx: ctx, + }, err } func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (reply *pairingtypes.CacheRelayReply, err error) { if cache == nil { - // TODO: try to connect again once in a while return nil, NotInitializedError } - if cache.client == nil { - return nil, NotConnectedError.Wrapf("No client 
connected to address: %s", cache.address) + + client := cache.clientStore.getClient() + if client == nil { + return nil, NotConnectedError } - // TODO: handle disconnections and error types here - return cache.client.GetRelay(ctx, relayCacheGet) + + reply, err = client.GetRelay(ctx, relayCacheGet) + return reply, err } func (cache *Cache) CacheActive() bool { - return cache != nil + return cache != nil && cache.clientStore.getClient() != nil } func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCacheSet) error { if cache == nil { - // TODO: try to connect again once in a while return NotInitializedError } - if cache.client == nil { - return NotConnectedError.Wrapf("No client connected to address: %s", cache.address) + + client := cache.clientStore.getClient() + if client == nil { + return NotConnectedError } - // TODO: handle disconnections and SetRelay error types here - _, err := cache.client.SetRelay(ctx, cacheSet) + + _, err := client.SetRelay(ctx, cacheSet) return err } diff --git a/protocol/performance/errors.go b/protocol/performance/errors.go index bb69cd3f51..1170f0b67f 100644 --- a/protocol/performance/errors.go +++ b/protocol/performance/errors.go @@ -6,5 +6,5 @@ import ( var ( NotConnectedError = sdkerrors.New("Not Connected Error", 700, "No Connection To grpc server") - NotInitializedError = sdkerrors.New("Not Initialised Error", 701, "to use cache run initCache") + NotInitializedError = sdkerrors.New("Not Initialized Error", 701, "to use cache run initCache") ) diff --git a/testutil/e2e/allowedErrorList.go b/testutil/e2e/allowedErrorList.go index 5b083bae70..192e3eac1b 100644 --- a/testutil/e2e/allowedErrorList.go +++ b/testutil/e2e/allowedErrorList.go @@ -9,6 +9,7 @@ var allowedErrors = map[string]string{ "purging provider after all endpoints are disabled provider": "This error is allowed because it is caused by the initial bootup, continuous failure would be caught by the e2e so we can allowed this error.", "Provider Side 
Failed Sending Message, Reason: Unavailable": "This error is allowed because it is caused by the lavad restart to turn on emergency mode", "Maximum cu exceeded PrepareSessionForUsage": "This error is allowed because it is caused by switching between providers, continuous failure would be caught by the e2e so we can allowed this error.", + "Failed To Connect to cache at address": "This error is allowed because it is caused by cache being connected only during the test and not during the bootup", } var allowedErrorsDuringEmergencyMode = map[string]string{ diff --git a/testutil/e2e/protocolE2E.go b/testutil/e2e/protocolE2E.go index bb19f16e8c..541eb66a2c 100644 --- a/testutil/e2e/protocolE2E.go +++ b/testutil/e2e/protocolE2E.go @@ -77,6 +77,8 @@ type lavaTest struct { wg sync.WaitGroup logPath string tokenDenom string + consumerCacheAddress string + providerCacheAddress string } func init() { @@ -339,8 +341,8 @@ func (lt *lavaTest) startJSONRPCProxy(ctx context.Context) { func (lt *lavaTest) startJSONRPCProvider(ctx context.Context) { for idx := 1; idx <= 5; idx++ { command := fmt.Sprintf( - "%s rpcprovider %s/jsonrpcProvider%d.yml --chain-id=lava --from servicer%d %s", - lt.protocolPath, providerConfigsFolder, idx, idx, lt.lavadArgs, + "%s rpcprovider %s/jsonrpcProvider%d.yml --cache-be %s --chain-id=lava --from servicer%d %s", + lt.protocolPath, providerConfigsFolder, idx, lt.providerCacheAddress, idx, lt.lavadArgs, ) logName := "03_EthProvider_" + fmt.Sprintf("%02d", idx) funcName := fmt.Sprintf("startJSONRPCProvider (provider %02d)", idx) @@ -358,8 +360,8 @@ func (lt *lavaTest) startJSONRPCProvider(ctx context.Context) { func (lt *lavaTest) startJSONRPCConsumer(ctx context.Context) { for idx, u := range []string{"user1"} { command := fmt.Sprintf( - "%s rpcconsumer %s/ethConsumer%d.yml --chain-id=lava --from %s %s", - lt.protocolPath, consumerConfigsFolder, idx+1, u, lt.lavadArgs+lt.consumerArgs, + "%s rpcconsumer %s/ethConsumer%d.yml --cache-be %s --chain-id=lava 
--from %s %s", + lt.protocolPath, consumerConfigsFolder, idx+1, lt.consumerCacheAddress, u, lt.lavadArgs+lt.consumerArgs, ) logName := "04_jsonConsumer_" + fmt.Sprintf("%02d", idx+1) funcName := fmt.Sprintf("startJSONRPCConsumer (consumer %02d)", idx+1) @@ -533,8 +535,8 @@ func jsonrpcTests(rpcURL string, testDuration time.Duration) error { func (lt *lavaTest) startLavaProviders(ctx context.Context) { for idx := 6; idx <= 10; idx++ { command := fmt.Sprintf( - "%s rpcprovider %s/lavaProvider%d --chain-id=lava --from servicer%d %s", - lt.protocolPath, providerConfigsFolder, idx, idx, lt.lavadArgs, + "%s rpcprovider %s/lavaProvider%d --cache-be %s --chain-id=lava --from servicer%d %s", + lt.protocolPath, providerConfigsFolder, idx, lt.providerCacheAddress, idx, lt.lavadArgs, ) logName := "05_LavaProvider_" + fmt.Sprintf("%02d", idx-5) funcName := fmt.Sprintf("startLavaProviders (provider %02d)", idx-5) @@ -552,8 +554,8 @@ func (lt *lavaTest) startLavaProviders(ctx context.Context) { func (lt *lavaTest) startLavaConsumer(ctx context.Context) { for idx, u := range []string{"user3"} { command := fmt.Sprintf( - "%s rpcconsumer %s/lavaConsumer%d.yml --chain-id=lava --from %s %s", - lt.protocolPath, consumerConfigsFolder, idx+1, u, lt.lavadArgs+lt.consumerArgs, + "%s rpcconsumer %s/lavaConsumer%d.yml --cache-be %s --chain-id=lava --from %s %s", + lt.protocolPath, consumerConfigsFolder, idx+1, lt.consumerCacheAddress, u, lt.lavadArgs+lt.consumerArgs, ) logName := "06_RPCConsumer_" + fmt.Sprintf("%02d", idx+1) funcName := fmt.Sprintf("startRPCConsumer (consumer %02d)", idx+1) @@ -562,6 +564,43 @@ func (lt *lavaTest) startLavaConsumer(ctx context.Context) { utils.LavaFormatInfo("startRPCConsumer OK") } +func (lt *lavaTest) startConsumerCache(ctx context.Context) { + command := fmt.Sprintf("%s cache %s --log_level debug", lt.protocolPath, lt.consumerCacheAddress) + logName := "08_Consumer_Cache" + funcName := "startConsumerCache" + + lt.execCommand(ctx, funcName, logName, 
command, false) + lt.checkCacheIsUp(ctx, lt.consumerCacheAddress, time.Minute) + utils.LavaFormatInfo(funcName + OKstr) +} + +func (lt *lavaTest) startProviderCache(ctx context.Context) { + command := fmt.Sprintf("%s cache %s --log_level debug", lt.protocolPath, lt.providerCacheAddress) + logName := "09_Provider_Cache" + funcName := "startProviderCache" + + lt.execCommand(ctx, funcName, logName, command, false) + lt.checkCacheIsUp(ctx, lt.providerCacheAddress, time.Minute) + utils.LavaFormatInfo(funcName + OKstr) +} + +func (lt *lavaTest) checkCacheIsUp(ctx context.Context, cacheAddress string, timeout time.Duration) { + for start := time.Now(); time.Since(start) < timeout; { + utils.LavaFormatInfo("Waiting Cache " + cacheAddress) + nctx, cancel := context.WithTimeout(ctx, time.Second) + grpcClient, err := grpc.DialContext(nctx, cacheAddress, grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + cancel() + time.Sleep(time.Second) + continue + } + cancel() + grpcClient.Close() + return + } + panic("checkCacheIsUp: Check Failed Cache didn't respond " + cacheAddress) +} + func (lt *lavaTest) startLavaEmergencyConsumer(ctx context.Context) { for idx, u := range []string{"user5"} { command := fmt.Sprintf( @@ -1394,16 +1433,18 @@ func runProtocolE2E(timeout time.Duration) { fmt.Println(err) } lt := &lavaTest{ - grpcConn: grpcConn, - lavadPath: gopath + "/bin/lavad", - protocolPath: gopath + "/bin/lavap", - lavadArgs: "--geolocation 1 --log_level debug", - consumerArgs: " --allow-insecure-provider-dialing", - logs: 
make(map[string]*sdk.SafeBuffer), + commands: make(map[string]*exec.Cmd), + providerType: make(map[string][]epochStorageTypes.Endpoint), + logPath: protocolLogsFolder, + tokenDenom: commonconsts.TestTokenDenom, + consumerCacheAddress: "127.0.0.1:2778", + providerCacheAddress: "127.0.0.1:2777", } // use defer to save logs in case the tests fail defer func() { @@ -1465,63 +1506,73 @@ func runProtocolE2E(timeout time.Duration) { lt.startLavaProviders(ctx) lt.startLavaConsumer(ctx) - // staked client then with subscription - repeat(1, func(n int) { - url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) - lt.checkTendermintConsumer(url, time.Second*30) - url = fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3+1) - lt.checkRESTConsumer(url, time.Second*30) - url = fmt.Sprintf("127.0.0.1:334%d", (n-1)*3+2) - lt.checkGRPCConsumer(url, time.Second*30) - }) + runChecksAndTests := func() { + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) + lt.checkTendermintConsumer(url, time.Second*15) + url = fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3+1) + lt.checkRESTConsumer(url, time.Second*15) + url = fmt.Sprintf("127.0.0.1:334%d", (n-1)*3+2) + lt.checkGRPCConsumer(url, time.Second*15) + }) - // staked client then with subscription - repeat(1, func(n int) { - url := fmt.Sprintf("http://127.0.0.1:333%d", n) - if err := jsonrpcTests(url, time.Second*30); err != nil { - panic(err) - } - }) - utils.LavaFormatInfo("JSONRPC TEST OK") + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("http://127.0.0.1:333%d", n) + if err := jsonrpcTests(url, time.Second*15); err != nil { + panic(err) + } + }) + utils.LavaFormatInfo("JSONRPC TEST OK") - // staked client then with subscription - repeat(1, func(n int) { - url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) - if err := tendermintTests(url, time.Second*30); err != nil { - panic(err) - } - }) - utils.LavaFormatInfo("TENDERMINTRPC TEST 
OK") + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) + if err := tendermintTests(url, time.Second*15); err != nil { + panic(err) + } + }) + utils.LavaFormatInfo("TENDERMINTRPC TEST OK") - // staked client then with subscription - repeat(1, func(n int) { - url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) - if err := tendermintURITests(url, time.Second*30); err != nil { - panic(err) - } - }) - utils.LavaFormatInfo("TENDERMINTRPC URI TEST OK") + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3) + if err := tendermintURITests(url, time.Second*15); err != nil { + panic(err) + } + }) + utils.LavaFormatInfo("TENDERMINTRPC URI TEST OK") - lt.lavaOverLava(ctx) + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3+1) + if err := restTests(url, time.Second*15); err != nil { + panic(err) + } + }) + utils.LavaFormatInfo("REST TEST OK") - // staked client then with subscription - repeat(1, func(n int) { - url := fmt.Sprintf("http://127.0.0.1:334%d", (n-1)*3+1) - if err := restTests(url, time.Second*30); err != nil { - panic(err) - } - }) - utils.LavaFormatInfo("REST TEST OK") + // staked client then with subscription + repeat(1, func(n int) { + url := fmt.Sprintf("127.0.0.1:334%d", (n-1)*3+2) + if err := grpcTests(url, time.Second*15); err != nil { + panic(err) + } + }) + utils.LavaFormatInfo("GRPC TEST OK") + } - // staked client then with subscription - // TODO: if set to 30 secs fails e2e need to investigate why. 
currently blocking PR's - repeat(1, func(n int) { - url := fmt.Sprintf("127.0.0.1:334%d", (n-1)*3+2) - if err := grpcTests(url, time.Second*5); err != nil { - panic(err) - } - }) - utils.LavaFormatInfo("GRPC TEST OK") + // run tests without cache + runChecksAndTests() + + lt.startConsumerCache(ctx) + lt.startProviderCache(ctx) + + // run tests with cache + runChecksAndTests() + + lt.lavaOverLava(ctx) lt.checkResponse("http://127.0.0.1:3340", "http://127.0.0.1:3341", "127.0.0.1:3342")