Skip to content

Commit

Permalink
feat: PRT - Implement cache reconnect mechanism + Cache E2E (#1734)
Browse files Browse the repository at this point in the history
* Fix small things in protocol e2e

* Add cache reconnect mechanism

* Add cache to protocol E2E

* Fix lint

* Add missing line

* Separate the cache from the relayer for auto reconnection

* Fix lint
  • Loading branch information
shleikes authored Oct 30, 2024
1 parent fc67ad2 commit ab29fb6
Show file tree
Hide file tree
Showing 4 changed files with 230 additions and 93 deletions.
131 changes: 108 additions & 23 deletions protocol/performance/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,56 @@ package performance

import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
)

type Cache struct {
client pairingtypes.RelayerCacheClient
address string
type relayerCacheClientStore struct {
client pairingtypes.RelayerCacheClient
lock sync.RWMutex
ctx context.Context
address string
reconnecting atomic.Bool
}

func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string) (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
const (
reconnectInterval = 5 * time.Second
)

func newRelayerCacheClientStore(ctx context.Context, address string) (*relayerCacheClientStore, error) {
clientStore := &relayerCacheClientStore{
client: nil,
ctx: ctx,
address: address,
}
return clientStore, clientStore.connectClient()
}

func (r *relayerCacheClientStore) getClient() pairingtypes.RelayerCacheClient {
if r == nil {
return nil
}

r.lock.RLock()
defer r.lock.RUnlock()

if r.client == nil {
go r.reconnectClient()
}

return r.client // might be nil
}

func (r *relayerCacheClientStore) connectGRPCConnectionToRelayerCacheService() (*pairingtypes.RelayerCacheClient, error) {
connectCtx, cancel := context.WithTimeout(r.ctx, 3*time.Second)
defer cancel()

conn, err := lavasession.ConnectGRPCClient(connectCtx, addr, false, true, false)
conn, err := lavasession.ConnectGRPCClient(connectCtx, r.address, false, true, false)
if err != nil {
return nil, err
}
Expand All @@ -27,40 +61,91 @@ func ConnectGRPCConnectionToRelayerCacheService(ctx context.Context, addr string
return &c, nil
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
relayerCacheClient, err := ConnectGRPCConnectionToRelayerCacheService(ctx, addr)
if err != nil {
return &Cache{client: nil, address: addr}, err
func (r *relayerCacheClientStore) connectClient() error {
relayerCacheClient, err := r.connectGRPCConnectionToRelayerCacheService()
if err == nil {
utils.LavaFormatInfo("Connected to cache service", utils.LogAttr("address", r.address))
func() {
r.lock.Lock()
defer r.lock.Unlock()
r.client = *relayerCacheClient
}()

r.reconnecting.Store(false)
return nil // connected
}
cache := Cache{client: *relayerCacheClient, address: addr}
return &cache, nil

utils.LavaFormatDebug("Failed to connect to cache service", utils.LogAttr("address", r.address), utils.LogAttr("error", err))
return err
}

func (r *relayerCacheClientStore) reconnectClient() {
// This is a simple atomic operation to ensure that only one goroutine is reconnecting at a time.
// reconnecting.CompareAndSwap(false, true):
// if reconnecting == false {
// reconnecting = true
// return true -> reconnect
// }
// return false -> already reconnecting
if !r.reconnecting.CompareAndSwap(false, true) {
return
}

for {
select {
case <-r.ctx.Done():
return
case <-time.After(reconnectInterval):
if r.connectClient() != nil {
return
}
}
}
}

type Cache struct {
clientStore *relayerCacheClientStore
address string
serviceCtx context.Context
}

func InitCache(ctx context.Context, addr string) (*Cache, error) {
clientStore, err := newRelayerCacheClientStore(ctx, addr)
return &Cache{
clientStore: clientStore,
address: addr,
serviceCtx: ctx,
}, err
}

func (cache *Cache) GetEntry(ctx context.Context, relayCacheGet *pairingtypes.RelayCacheGet) (reply *pairingtypes.CacheRelayReply, err error) {
if cache == nil {
// TODO: try to connect again once in a while
return nil, NotInitializedError
}
if cache.client == nil {
return nil, NotConnectedError.Wrapf("No client connected to address: %s", cache.address)

client := cache.clientStore.getClient()
if client == nil {
return nil, NotConnectedError
}
// TODO: handle disconnections and error types here
return cache.client.GetRelay(ctx, relayCacheGet)

reply, err = client.GetRelay(ctx, relayCacheGet)
return reply, err
}

func (cache *Cache) CacheActive() bool {
return cache != nil
return cache != nil && cache.clientStore.getClient() != nil
}

func (cache *Cache) SetEntry(ctx context.Context, cacheSet *pairingtypes.RelayCacheSet) error {
if cache == nil {
// TODO: try to connect again once in a while
return NotInitializedError
}
if cache.client == nil {
return NotConnectedError.Wrapf("No client connected to address: %s", cache.address)

client := cache.clientStore.getClient()
if client == nil {
return NotConnectedError
}
// TODO: handle disconnections and SetRelay error types here
_, err := cache.client.SetRelay(ctx, cacheSet)

_, err := client.SetRelay(ctx, cacheSet)
return err
}
2 changes: 1 addition & 1 deletion protocol/performance/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ import (

var (
NotConnectedError = sdkerrors.New("Not Connected Error", 700, "No Connection To grpc server")
NotInitializedError = sdkerrors.New("Not Initialised Error", 701, "to use cache run initCache")
NotInitializedError = sdkerrors.New("Not Initialized Error", 701, "to use cache run initCache")
)
1 change: 1 addition & 0 deletions testutil/e2e/allowedErrorList.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ var allowedErrors = map[string]string{
"purging provider after all endpoints are disabled provider": "This error is allowed because it is caused by the initial bootup, continuous failure would be caught by the e2e so we can allowed this error.",
"Provider Side Failed Sending Message, Reason: Unavailable": "This error is allowed because it is caused by the lavad restart to turn on emergency mode",
"Maximum cu exceeded PrepareSessionForUsage": "This error is allowed because it is caused by switching between providers, continuous failure would be caught by the e2e so we can allowed this error.",
"Failed To Connect to cache at address": "This error is allowed because it is caused by cache being connected only during the test and not during the bootup",
}

var allowedErrorsDuringEmergencyMode = map[string]string{
Expand Down
Loading

0 comments on commit ab29fb6

Please sign in to comment.