Skip to content

Commit

Permalink
added connection recovery if it was shut down
Browse files Browse the repository at this point in the history
  • Loading branch information
omerlavanet committed Apr 10, 2023
1 parent b676e8f commit f729c19
Showing 1 changed file with 26 additions and 2 deletions.
28 changes: 26 additions & 2 deletions protocol/lavasession/consumer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/lavanet/lava/utils"
pairingtypes "github.com/lavanet/lava/x/pairing/types"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
)

Expand Down Expand Up @@ -263,7 +264,10 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
if !endpoint.Enabled {
continue
}
if endpoint.Client == nil {
connectEndpoint := func(cswp *ConsumerSessionsWithProvider, ctx context.Context, endpoint *Endpoint) (connected bool) {
if endpoint.Client != nil && endpoint.connection.GetState() != connectivity.Shutdown {
return true
}
client, conn, err := cswp.connectRawClientWithTimeout(ctx, endpoint.NetworkAddress)
if err != nil {
endpoint.ConnectionRefusals++
Expand All @@ -272,11 +276,31 @@ func (cswp *ConsumerSessionsWithProvider) fetchEndpointConnectionFromConsumerSes
endpoint.Enabled = false
utils.LavaFormatWarning("disabling provider endpoint for the duration of current epoch.", nil, utils.Attribute{Key: "Endpoint", Value: endpoint.NetworkAddress}, utils.Attribute{Key: "address", Value: cswp.PublicLavaAddress})
}
continue
return false
}
endpoint.ConnectionRefusals = 0
endpoint.Client = client
if endpoint.connection != nil {
endpoint.connection.Close() // just to be safe
}
endpoint.connection = conn
return true
}
if endpoint.Client == nil {
connected := connectEndpoint(cswp, ctx, endpoint)
if !connected {
continue
}
} else {
if endpoint.connection.GetState() == connectivity.Shutdown {
// connection was shut down, so we need to create a new one
endpoint.connection.Close()
endpoint.Client = nil
connected := connectEndpoint(cswp, ctx, endpoint)
if !connected {
continue
}
}
}
cswp.Endpoints[idx] = endpoint
return true, endpoint, false
Expand Down

0 comments on commit f729c19

Please sign in to comment.