Skip to content

Commit

Permalink
PRT-554 adding listener with retry to RPC Consumer (#376)
Browse files Browse the repository at this point in the history
* adding listener with retry

* fixing merge conflict

* adding provider reconnect retry mechanism

* recover from address already in use error
  • Loading branch information
ranlavanet authored Mar 29, 2023
1 parent 646ee7b commit 6317b10
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 43 deletions.
20 changes: 0 additions & 20 deletions docs/static/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29674,16 +29674,6 @@ paths:
in: query
required: false
type: boolean
- name: pagination.reverse
description: >-
reverse is set to true if results are to be returned in the
descending order.


Since: cosmos-sdk 0.43
in: query
required: false
type: boolean
tags:
- Query
'/lavanet/lava/epochstorage/fixated_params/{index}':
Expand Down Expand Up @@ -29943,16 +29933,6 @@ paths:
in: query
required: false
type: boolean
- name: pagination.reverse
description: >-
reverse is set to true if results are to be returned in the
descending order.


Since: cosmos-sdk 0.43
in: query
required: false
type: boolean
tags:
- Query
'/lavanet/lava/epochstorage/stake_storage/{index}':
Expand Down
24 changes: 24 additions & 0 deletions protocol/chainlib/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainlib
import (
"encoding/json"
"fmt"
"net"
"net/url"
"regexp"
"strings"
Expand All @@ -19,6 +20,7 @@ import (

const (
ContextUserValueKeyDappID = "dappID"
RetryListeningInterval = 10 // seconds
)

type BaseChainParser struct {
Expand Down Expand Up @@ -201,6 +203,28 @@ func verifyTendermintEndpoint(endpoints []common.NodeUrl) (websocketEndpoint com
return websocketEndpoint, httpEndpoint
}

func ListenWithRetry(app *fiber.App, address string) {
for {
err := app.Listen(address)
if err != nil {
utils.LavaFormatError("app.Listen(listenAddr)", err)
}
time.Sleep(RetryListeningInterval * time.Second)
}
}

func GetListenerWithRetryGrpc(protocol string, addr string) net.Listener {
for {
lis, err := net.Listen(protocol, addr)
if err == nil {
return lis
}
utils.LavaFormatError("failure setting up listener, net.Listen(protocol, addr)", err, utils.Attribute{Key: "listenAddr", Value: addr})
time.Sleep(RetryListeningInterval * time.Second)
utils.LavaFormatWarning("Attempting connection retry", nil)
}
}

func GetApiInterfaceFromServiceApi(serviceApi *spectypes.ServiceApi, connectionType string) *spectypes.ApiInterface {
var apiInterface *spectypes.ApiInterface = nil
for i := range serviceApi.ApiInterfaces {
Expand Down
8 changes: 2 additions & 6 deletions protocol/chainlib/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -203,10 +202,7 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {

utils.LavaFormatInfo("gRPC PortalStart")

lis, err := net.Listen("tcp", apil.endpoint.NetworkAddress)
if err != nil {
utils.LavaFormatFatal("provider failure setting up listener", err, utils.Attribute{Key: "listenAddr", Value: apil.endpoint.NetworkAddress})
}
lis := GetListenerWithRetryGrpc("tcp", apil.endpoint.NetworkAddress)
apiInterface := apil.endpoint.ApiInterface
sendRelayCallback := func(ctx context.Context, method string, reqBody []byte) ([]byte, error) {
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
Expand All @@ -215,7 +211,7 @@ func (apil *GrpcChainListener) Serve(ctx context.Context) {
utils.LavaFormatInfo("GRPC Got Relay ", utils.Attribute{Key: "GUID", Value: ctx}, utils.Attribute{Key: "method", Value: method})
var relayReply *pairingtypes.RelayReply
metricsData := metrics.NewRelayAnalytics("NoDappID", apil.endpoint.ChainID, apiInterface)
relayReply, _, err = apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData)
relayReply, _, err := apil.relaySender.SendRelay(ctx, method, string(reqBody), "", "NoDappID", metricsData)
go apil.logger.AddMetricForGrpc(metricsData, err, &metadataValues)

if err != nil {
Expand Down
5 changes: 1 addition & 4 deletions protocol/chainlib/jsonRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,7 @@ func (apil *JsonRPCChainListener) Serve(ctx context.Context) {
})

// Go
err := app.Listen(apil.endpoint.NetworkAddress)
if err != nil {
utils.LavaFormatError("app.Listen(listenAddr)", err)
}
ListenWithRetry(app, apil.endpoint.NetworkAddress)
}

type JrpcChainProxy struct {
Expand Down
5 changes: 1 addition & 4 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,7 @@ func (apil *RestChainListener) Serve(ctx context.Context) {
})

// Go
err := app.Listen(apil.endpoint.NetworkAddress)
if err != nil {
utils.LavaFormatError("app.Listen(listenAddr)", err)
}
ListenWithRetry(app, apil.endpoint.NetworkAddress)
}

type RestChainProxy struct {
Expand Down
5 changes: 1 addition & 4 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,7 @@ func (apil *TendermintRpcChainListener) Serve(ctx context.Context) {
})
//
// Go
err := app.Listen(apil.endpoint.NetworkAddress)
if err != nil {
utils.LavaFormatError("app.Listen(listenAddr)", err)
}
ListenWithRetry(app, apil.endpoint.NetworkAddress)
}

type tendermintRpcChainProxy struct {
Expand Down
7 changes: 2 additions & 5 deletions protocol/rpcprovider/provider_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package rpcprovider
import (
"context"
"errors"
"net"
"net/http"
"strings"
"sync"

"github.com/lavanet/lava/protocol/chainlib"
"github.com/lavanet/lava/protocol/lavasession"

"github.com/improbable-eng/grpc-web/go/grpcweb"
Expand Down Expand Up @@ -53,10 +53,7 @@ func NewProviderListener(ctx context.Context, networkAddress string) *ProviderLi
pl := &ProviderListener{networkAddress: networkAddress}

// GRPC
lis, err := net.Listen("tcp", networkAddress)
if err != nil {
utils.LavaFormatFatal("provider failure setting up listener", err, utils.Attribute{Key: "listenAddr", Value: networkAddress})
}
lis := chainlib.GetListenerWithRetryGrpc("tcp", networkAddress)
grpcServer := grpc.NewServer()

wrappedServer := grpcweb.WrapServer(grpcServer)
Expand Down
4 changes: 4 additions & 0 deletions scripts/pre_setups/init_lava_only_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ __dir=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
source "$__dir"/../useful_commands.sh
. "${__dir}"/../vars/variables.sh

LOGS_DIR=${__dir}/../../testutil/debugging/logs
mkdir -p $LOGS_DIR
rm $LOGS_DIR/*.log

killall screen
screen -wipe
GASPRICE="0.000000001ulava"
Expand Down

0 comments on commit 6317b10

Please sign in to comment.