Skip to content

Commit

Permalink
PRT-614-improve-provider-startup-times (#414)
Browse files Browse the repository at this point in the history
* lint

* make set up parallel for all chains and apis

* lint

* added debug print

* closing old connections after 2 epochs

* added connection recovery if it was shut down

* reduce prints

---------

Co-authored-by: Ran Mishael <ran@lavanet.xyz>
  • Loading branch information
omerlavanet and ranlavanet authored Apr 13, 2023
1 parent 47705e9 commit 4ea37db
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 74 deletions.
7 changes: 0 additions & 7 deletions protocol/rpcprovider/provider_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,6 @@ func (rs *relayServer) Relay(ctx context.Context, request *pairingtypes.RelayReq
}

func (rs *relayServer) Probe(ctx context.Context, probeReq *wrapperspb.UInt64Value) (*wrapperspb.UInt64Value, error) {
guid, found := utils.GetUniqueIdentifier(ctx)
attributes := []utils.Attribute{{Key: "probe", Value: probeReq.Value}}
if found {
attributes = append(attributes, utils.Attribute{Key: "GUID", Value: guid})
}
utils.LavaFormatDebug("Provider got probe", attributes...)

return probeReq, nil
}

Expand Down
153 changes: 87 additions & 66 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"strconv"
"strings"
"sync"
"time"

"github.com/cosmos/cosmos-sdk/client"
Expand Down Expand Up @@ -64,7 +65,7 @@ type RPCProvider struct {
providerStateTracker ProviderStateTrackerInf
rpcProviderServers map[string]*RPCProviderServer
rpcProviderListeners map[string]*ProviderListener
disabledEndpoints []*lavasession.RPCProviderEndpoint
lock sync.Mutex
}

func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, rpcProviderEndpoints []*lavasession.RPCProviderEndpoint, cache *performance.Cache, parallelConnections uint) (err error) {
Expand All @@ -77,7 +78,6 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client
}()
rpcp.rpcProviderServers = make(map[string]*RPCProviderServer)
rpcp.rpcProviderListeners = make(map[string]*ProviderListener)
rpcp.disabledEndpoints = []*lavasession.RPCProviderEndpoint{}
// single state tracker
lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, clientCtx)
providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, txFactory, clientCtx, lavaChainFetcher)
Expand Down Expand Up @@ -111,79 +111,100 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client
if err != nil {
utils.LavaFormatFatal("Failed fetching GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment in RPCProvider Start", err)
}
var wg sync.WaitGroup
parallelJobs := len(rpcProviderEndpoints)
wg.Add(parallelJobs)
disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs)

for _, rpcProviderEndpoint := range rpcProviderEndpoints {
err := rpcProviderEndpoint.Validate()
if err != nil {
utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()})
rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint)
continue
}
providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize)
key := rpcProviderEndpoint.Key()
rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, providerSessionManager)
chainParser, err := chainlib.NewChainParser(rpcProviderEndpoint.ApiInterface)
if err != nil {
return err
}
providerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, rpcProviderEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
chainProxy, err := chainlib.GetChainProxy(ctx, parallelConnections, rpcProviderEndpoint, averageBlockTime)
if err != nil {
utils.LavaFormatError("panic severity critical error, failed creating chain proxy, continuing with others endpoints", err, utils.Attribute{Key: "parallelConnections", Value: uint64(parallelConnections)}, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint})
rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint)
continue
}
go func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint) error {
defer wg.Done()
err := rpcProviderEndpoint.Validate()
if err != nil {
return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()})
}
providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize)
key := rpcProviderEndpoint.Key()
rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, providerSessionManager)
chainParser, err := chainlib.NewChainParser(rpcProviderEndpoint.ApiInterface)
if err != nil {
disabledEndpoints <- rpcProviderEndpoint
return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid chain parser, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()})
}
providerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, rpcProviderEndpoint.ChainID)
_, averageBlockTime, _, _ := chainParser.ChainBlockStats()
chainProxy, err := chainlib.GetChainProxy(ctx, parallelConnections, rpcProviderEndpoint, averageBlockTime)
if err != nil {
disabledEndpoints <- rpcProviderEndpoint
return utils.LavaFormatError("panic severity critical error, failed creating chain proxy, continuing with others endpoints", err, utils.Attribute{Key: "parallelConnections", Value: uint64(parallelConnections)}, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint})
}

_, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats()
blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData)
chainTrackerConfig := chaintracker.ChainTrackerConfig{
BlocksToSave: blocksToSaveChainTracker,
AverageBlockTime: averageBlockTime,
ServerBlockMemory: ChainTrackerDefaultMemory + blocksToSaveChainTracker,
}
chainFetcher := chainlib.NewChainFetcher(ctx, chainProxy, chainParser, rpcProviderEndpoint)
chainTracker, err := chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig)
if err != nil {
utils.LavaFormatError("panic severity critical error, aborting support for chain api due to node access, continuing with other endpoints", err, utils.Attribute{Key: "chainTrackerConfig", Value: chainTrackerConfig}, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint})
rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint)
continue
}
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, providerStateTracker, addr.String(), chainProxy, chainParser)
providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint)
_, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats()
blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData)
chainTrackerConfig := chaintracker.ChainTrackerConfig{
BlocksToSave: blocksToSaveChainTracker,
AverageBlockTime: averageBlockTime,
ServerBlockMemory: ChainTrackerDefaultMemory + blocksToSaveChainTracker,
}
chainFetcher := chainlib.NewChainFetcher(ctx, chainProxy, chainParser, rpcProviderEndpoint)
chainTracker, err := chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig)
if err != nil {
disabledEndpoints <- rpcProviderEndpoint
return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to node access, continuing with other endpoints", err, utils.Attribute{Key: "chainTrackerConfig", Value: chainTrackerConfig}, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint})
}
reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, providerStateTracker, addr.String(), chainProxy, chainParser)
providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint)

rpcProviderServer := &RPCProviderServer{}
if _, ok := rpcp.rpcProviderServers[key]; ok {
utils.LavaFormatFatal("Trying to add the same key twice to rpcProviderServers check config file.", nil,
utils.Attribute{Key: "key", Value: key})
}
rpcp.rpcProviderServers[key] = rpcProviderServer
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rewardServer, providerSessionManager, reliabilityManager, privKey, cache, chainProxy, providerStateTracker, addr, lavaChainID, DEFAULT_ALLOWED_MISSING_CU)
rpcProviderServer := &RPCProviderServer{}
if _, ok := rpcp.rpcProviderServers[key]; ok {
utils.LavaFormatFatal("Trying to add the same key twice to rpcProviderServers check config file.", nil,
utils.Attribute{Key: "key", Value: key})
}
rpcp.rpcProviderServers[key] = rpcProviderServer
rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rewardServer, providerSessionManager, reliabilityManager, privKey, cache, chainProxy, providerStateTracker, addr, lavaChainID, DEFAULT_ALLOWED_MISSING_CU)

// set up grpc listener
var listener *ProviderListener
if rpcProviderEndpoint.NetworkAddress == "" && len(rpcp.rpcProviderListeners) > 0 {
// handle case only one network address was defined
for _, listener_p := range rpcp.rpcProviderListeners {
listener = listener_p
break
// set up grpc listener
var listener *ProviderListener
if rpcProviderEndpoint.NetworkAddress == "" && len(rpcp.rpcProviderListeners) > 0 {
// handle case only one network address was defined
for _, listener_p := range rpcp.rpcProviderListeners {
listener = listener_p
break
}
} else {
func() {
rpcp.lock.Lock()
defer rpcp.lock.Unlock()
var ok bool
listener, ok = rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress]
if !ok {
utils.LavaFormatDebug("creating new listener", utils.Attribute{Key: "NetworkAddress", Value: rpcProviderEndpoint.NetworkAddress})
listener = NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress)
rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress] = listener
}
}()
}
} else {
var ok bool
listener, ok = rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress]
if !ok {
listener = NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress)
rpcp.rpcProviderListeners[listener.Key()] = listener
if listener == nil {
utils.LavaFormatFatal("listener not defined, cant register RPCProviderServer", nil, utils.Attribute{Key: "RPCProviderEndpoint", Value: rpcProviderEndpoint.String()})
}
}
if listener == nil {
utils.LavaFormatFatal("listener not defined, cant register RPCProviderServer", nil, utils.Attribute{Key: "RPCProviderEndpoint", Value: rpcProviderEndpoint.String()})
}
listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint)
return nil
}(rpcProviderEndpoint)
}
wg.Wait()
close(disabledEndpoints)
utils.LavaFormatInfo("RPCProvider done setting up endpoints, ready for service")
if len(rpcp.disabledEndpoints) > 0 {
utils.LavaFormatError(utils.FormatStringerList("RPCProvider Runnig with disabled Endpoints:", rpcp.disabledEndpoints), nil)
disabledEndpointsList := []*lavasession.RPCProviderEndpoint{}
for disabledEndpoint := range disabledEndpoints {
disabledEndpointsList = append(disabledEndpointsList, disabledEndpoint)
}
if len(disabledEndpointsList) > 0 {
utils.LavaFormatError(utils.FormatStringerList("RPCProvider Runnig with disabled Endpoints:", disabledEndpointsList), nil)
if len(disabledEndpointsList) == parallelJobs {
utils.LavaFormatFatal("all endpoints are disabled", nil)
}
}
// tearing down
select {
case <-ctx.Done():
utils.LavaFormatInfo("Provider Server ctx.Done")
Expand Down
2 changes: 1 addition & 1 deletion scripts/init_chain_commands.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ lavad query pairing providers "ETH1"
lavad query pairing clients "ETH1"

# we need to wait for the next epoch for the stake to take action.
# sleep_until_next_epoch
sleep_until_next_epoch

. ${__dir}/setup_providers.sh

0 comments on commit 4ea37db

Please sign in to comment.