From b676e8f65b437055939f6645a654d169e9863e6a Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Tue, 11 Apr 2023 11:18:32 +0200 Subject: [PATCH 1/4] closing old connections after 2 epochs --- .../lavasession/consumer_session_manager.go | 12 ++++++++++++ .../consumer_session_manager_test.go | 19 +++++++++++++++++++ protocol/lavasession/consumer_types.go | 12 +++++++----- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 863ecd4c87..e1cb1a9a86 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -70,6 +70,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList // Reset the pairingPurge. // This happens only after an entire epoch. so its impossible to have session connected to the old purged list + csm.closePurgedUnusedPairingsConnections() // this must be before updating csm.pairingPurge as we want to close the connections of older sessions (prev 2 epochs) csm.pairingPurge = csm.pairing csm.pairing = make(map[string]*ConsumerSessionsWithProvider, pairingListLength) for idx, provider := range pairingList { @@ -81,6 +82,17 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList return nil } +// After 2 epochs we need to close all open connections. +// otherwise golang garbage collector is not closing network connections and they +// will remain open forever. +func (csm *ConsumerSessionManager) closePurgedUnusedPairingsConnections() { + for _, purgedPairing := range csm.pairingPurge { + for _, endpoint := range purgedPairing.Endpoints { + endpoint.connection.Close() + } + } +} + func (csm *ConsumerSessionManager) validAddressesLen() int { csm.lock.RLock() defer csm.lock.RUnlock() diff --git a/protocol/lavasession/consumer_session_manager_test.go b/protocol/lavasession/consumer_session_manager_test.go index db211a5617..75ffbcbdb2 100644 --- a/protocol/lavasession/consumer_session_manager_test.go +++ b/protocol/lavasession/consumer_session_manager_test.go @@ -12,8 +12,11 @@ import ( "github.com/lavanet/lava/protocol/provideroptimizer" "github.com/lavanet/lava/utils" + pairingtypes "github.com/lavanet/lava/x/pairing/types" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/types/known/wrapperspb" ) const ( @@ -516,3 +519,19 @@ func TestContext(t *testing.T) { require.Equal(t, ctxTO.Err(), context.DeadlineExceeded) cancel() } + +func TestGrpcClientHang(t *testing.T) { + ctx := context.Background() + s := createGRPCServer(t) // create a grpcServer so we can connect to its endpoint and validate everything works. + defer s.Stop() // stop the server when finished. + conn, err := grpc.DialContext(ctx, grpcListener, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) + require.NoError(t, err) + client := pairingtypes.NewRelayerClient(conn) + err = conn.Close() + require.NoError(t, err) + err = conn.Close() + require.Error(t, err) + _, err = client.Probe(ctx, &wrapperspb.UInt64Value{}) + fmt.Println(err) + require.Error(t, err) +} diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index b59dc51ffb..4b6feb9857 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -59,6 +59,7 @@ type Endpoint struct { NetworkAddress string // change at the end to NetworkAddress Enabled bool Client *pairingtypes.RelayerClient + connection *grpc.ClientConn ConnectionRefusals uint64 } @@ -184,18 +185,18 @@ func (cswp *ConsumerSessionsWithProvider) decreaseUsedComputeUnits(cu uint64) er return nil } -func (cswp *ConsumerSessionsWithProvider) connectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, error) { +func (cswp *ConsumerSessionsWithProvider) connectRawClientWithTimeout(ctx context.Context, addr string) (*pairingtypes.RelayerClient, *grpc.ClientConn, error) { connectCtx, cancel := context.WithTimeout(ctx, TimeoutForEstablishingAConnection) defer cancel() conn, err := grpc.DialContext(connectCtx, addr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()) if err != nil { - return nil, err + return nil, nil, err } /*defer conn.Close()*/ c := pairingtypes.NewRelayerClient(conn) - return &c, nil + return &c, conn, nil } func (cswp *ConsumerSessionsWithProvider) getConsumerSessionInstanceFromEndpoint(endpoint *Endpoint, numberOfResets uint64) (singleConsumerSession *SingleConsumerSession, pairingEpoch uint64, err error) { @@ -263,7 +264,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes continue } if endpoint.Client == nil { - conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.NetworkAddress) + client, conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.NetworkAddress) if err != nil { endpoint.ConnectionRefusals++ utils.LavaFormatError("error connecting to provider", err, utils.Attribute{Key: "provider endpoint", Value: endpoint.NetworkAddress}, utils.Attribute{Key: "provider address", Value: cswp.PublicLavaAddress}, utils.Attribute{Key: "endpoint", Value: endpoint}) @@ -274,7 +275,8 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes continue } endpoint.ConnectionRefusals = 0 - endpoint.Client = conn + endpoint.Client = client + endpoint.connection = conn } cswp.Endpoints[idx] = endpoint return true, endpoint, false From f729c19767bc6091ecf1f0923a6f0f9f5ee65d8e Mon Sep 17 00:00:00 2001 From: omer mishael Date: Tue, 11 Apr 2023 02:10:51 +0300 Subject: [PATCH 2/4] added connection recovery if it was shut down --- protocol/lavasession/consumer_types.go | 28 ++++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 4b6feb9857..6f914c862a 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -13,6 +13,7 @@ import ( "github.com/lavanet/lava/utils" pairingtypes "github.com/lavanet/lava/x/pairing/types" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" ) @@ -263,7 +264,10 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes if !endpoint.Enabled { continue } - if endpoint.Client == nil { + connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected bool) { + if endpoint.Client != nil && endpoint.connection.GetState() != connectivity.Shutdown { + return true + } client, conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.NetworkAddress) if err != nil { endpoint.ConnectionRefusals++ @@ -272,11 +276,31 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes endpoint.Enabled = false utils.LavaFormatWarning("disabling provider endpoint for the duration of current epoch.", nil, utils.Attribute{Key: "Endpoint", Value: endpoint.NetworkAddress}, utils.Attribute{Key: "address", Value: cswp.PublicLavaAddress}) } - continue + return false } endpoint.ConnectionRefusals = 0 endpoint.Client = client + if endpoint.connection != nil { + endpoint.connection.Close() // just to be safe + } endpoint.connection = conn + return true + } + if endpoint.Client == nil { + connected := connectEndpoint(cswp, ctx, endpoint) + if !connected { + continue + } + } else { + if endpoint.connection.GetState() == connectivity.Shutdown { + // connection was shut down, so we need to create a new one + endpoint.connection.Close() + endpoint.Client = nil + connected := connectEndpoint(cswp, ctx, endpoint) + if !connected { + continue + } + } } cswp.Endpoints[idx] = endpoint return true, endpoint, false From f434c49dd976a73930d05e476ee58c8400507cc5 Mon Sep 17 00:00:00 2001 From: omer mishael Date: Tue, 11 Apr 2023 02:15:35 +0300 Subject: [PATCH 3/4] lint --- protocol/lavasession/consumer_types.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index 6f914c862a..ecbc4af1ed 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -264,7 +264,7 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes if !endpoint.Enabled { continue } - connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected bool) { + connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected_ bool) { if endpoint.Client != nil && endpoint.connection.GetState() != connectivity.Shutdown { return true } @@ -287,20 +287,19 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes return true } if endpoint.Client == nil { - connected := connectEndpoint(cswp, ctx, endpoint) - if !connected { + connected_ := connectEndpoint(cswp, ctx, endpoint) + if !connected_ { continue } - } else { - if endpoint.connection.GetState() == connectivity.Shutdown { - // connection was shut down, so we need to create a new one - endpoint.connection.Close() - endpoint.Client = nil - connected := connectEndpoint(cswp, ctx, endpoint) - if !connected { - continue - } + } else if endpoint.connection.GetState() == connectivity.Shutdown { + // connection was shut down, so we need to create a new one + endpoint.connection.Close() + endpoint.Client = nil + connected_ := connectEndpoint(cswp, ctx, endpoint) + if !connected_ { + continue } + } cswp.Endpoints[idx] = endpoint return true, endpoint, false From c8505c57139c21202ce33ddfd4e9a7fcd474baac Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Tue, 11 Apr 2023 12:09:07 +0200 Subject: [PATCH 4/4] white space lint fix --- protocol/lavasession/consumer_types.go | 1 - 1 file changed, 1 deletion(-) diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index ecbc4af1ed..dbb4d745c6 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -299,7 +299,6 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes if !connected_ { continue } - } cswp.Endpoints[idx] = endpoint return true, endpoint, false