From 4ea37dbe3d0865f8b85bbd612bb64ca389716d0a Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Thu, 13 Apr 2023 13:21:49 +0300 Subject: [PATCH] PRT-614-improve-provider-startup-times (#414) * lint * make set up parallel for all chains and apis * lint * added debug print * closing old connections after 2 epochs * added connection recovery if it was shut down * reduce prints --------- Co-authored-by: Ran Mishael --- protocol/rpcprovider/provider_listener.go | 7 - protocol/rpcprovider/rpcprovider.go | 153 ++++++++++++---------- scripts/init_chain_commands.sh | 2 +- 3 files changed, 88 insertions(+), 74 deletions(-) diff --git a/protocol/rpcprovider/provider_listener.go b/protocol/rpcprovider/provider_listener.go index 95fc55d45e..8918c6fa13 100644 --- a/protocol/rpcprovider/provider_listener.go +++ b/protocol/rpcprovider/provider_listener.go @@ -101,13 +101,6 @@ func (rs *relayServer) Relay(ctx context.Context, request *pairingtypes.RelayReq } func (rs *relayServer) Probe(ctx context.Context, probeReq *wrapperspb.UInt64Value) (*wrapperspb.UInt64Value, error) { - guid, found := utils.GetUniqueIdentifier(ctx) - attributes := []utils.Attribute{{Key: "probe", Value: probeReq.Value}} - if found { - attributes = append(attributes, utils.Attribute{Key: "GUID", Value: guid}) - } - utils.LavaFormatDebug("Provider got probe", attributes...) - return probeReq, nil } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 82d5df5793..d1c0e9b5ef 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -8,6 +8,7 @@ import ( "os/signal" "strconv" "strings" + "sync" "time" "github.com/cosmos/cosmos-sdk/client" @@ -64,7 +65,7 @@ type RPCProvider struct { providerStateTracker ProviderStateTrackerInf rpcProviderServers map[string]*RPCProviderServer rpcProviderListeners map[string]*ProviderListener - disabledEndpoints []*lavasession.RPCProviderEndpoint + lock sync.Mutex } func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, clientCtx client.Context, rpcProviderEndpoints []*lavasession.RPCProviderEndpoint, cache *performance.Cache, parallelConnections uint) (err error) { @@ -77,7 +78,6 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client }() rpcp.rpcProviderServers = make(map[string]*RPCProviderServer) rpcp.rpcProviderListeners = make(map[string]*ProviderListener) - rpcp.disabledEndpoints = []*lavasession.RPCProviderEndpoint{} // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, clientCtx) providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, txFactory, clientCtx, lavaChainFetcher) @@ -111,79 +111,100 @@ func (rpcp *RPCProvider) Start(ctx context.Context, txFactory tx.Factory, client if err != nil { utils.LavaFormatFatal("Failed fetching GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment in RPCProvider Start", err) } + var wg sync.WaitGroup + parallelJobs := len(rpcProviderEndpoints) + wg.Add(parallelJobs) + disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs) + for _, rpcProviderEndpoint := range rpcProviderEndpoints { - err := rpcProviderEndpoint.Validate() - if err != nil { - utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) - rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint) - continue - } - providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize) - key := rpcProviderEndpoint.Key() - rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, providerSessionManager) - chainParser, err := chainlib.NewChainParser(rpcProviderEndpoint.ApiInterface) - if err != nil { - return err - } - providerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, rpcProviderEndpoint.ChainID) - _, averageBlockTime, _, _ := chainParser.ChainBlockStats() - chainProxy, err := chainlib.GetChainProxy(ctx, parallelConnections, rpcProviderEndpoint, averageBlockTime) - if err != nil { - utils.LavaFormatError("panic severity critical error, failed creating chain proxy, continuing with others endpoints", err, utils.Attribute{Key: "parallelConnections", Value: uint64(parallelConnections)}, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint}) - rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint) - continue - } + go func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint) error { + defer wg.Done() + err := rpcProviderEndpoint.Validate() + if err != nil { + return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid node url definition, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) + } + providerSessionManager := lavasession.NewProviderSessionManager(rpcProviderEndpoint, blockMemorySize) + key := rpcProviderEndpoint.Key() + rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, providerSessionManager) + chainParser, err := chainlib.NewChainParser(rpcProviderEndpoint.ApiInterface) + if err != nil { + disabledEndpoints <- rpcProviderEndpoint + return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to invalid chain parser, continuing with others", err, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint.String()}) + } + providerStateTracker.RegisterChainParserForSpecUpdates(ctx, chainParser, rpcProviderEndpoint.ChainID) + _, averageBlockTime, _, _ := chainParser.ChainBlockStats() + chainProxy, err := chainlib.GetChainProxy(ctx, parallelConnections, rpcProviderEndpoint, averageBlockTime) + if err != nil { + disabledEndpoints <- rpcProviderEndpoint + return utils.LavaFormatError("panic severity critical error, failed creating chain proxy, continuing with others endpoints", err, utils.Attribute{Key: "parallelConnections", Value: uint64(parallelConnections)}, utils.Attribute{Key: "rpcProviderEndpoint", Value: rpcProviderEndpoint}) + } - _, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats() - blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData) - chainTrackerConfig := chaintracker.ChainTrackerConfig{ - BlocksToSave: blocksToSaveChainTracker, - AverageBlockTime: averageBlockTime, - ServerBlockMemory: ChainTrackerDefaultMemory + blocksToSaveChainTracker, - } - chainFetcher := chainlib.NewChainFetcher(ctx, chainProxy, chainParser, rpcProviderEndpoint) - chainTracker, err := chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig) - if err != nil { - utils.LavaFormatError("panic severity critical error, aborting support for chain api due to node access, continuing with other endpoints", err, utils.Attribute{Key: "chainTrackerConfig", Value: chainTrackerConfig}, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint}) - rpcp.disabledEndpoints = append(rpcp.disabledEndpoints, rpcProviderEndpoint) - continue - } - reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, providerStateTracker, addr.String(), chainProxy, chainParser) - providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint) + _, averageBlockTime, blocksToFinalization, blocksInFinalizationData := chainParser.ChainBlockStats() + blocksToSaveChainTracker := uint64(blocksToFinalization + blocksInFinalizationData) + chainTrackerConfig := chaintracker.ChainTrackerConfig{ + BlocksToSave: blocksToSaveChainTracker, + AverageBlockTime: averageBlockTime, + ServerBlockMemory: ChainTrackerDefaultMemory + blocksToSaveChainTracker, + } + chainFetcher := chainlib.NewChainFetcher(ctx, chainProxy, chainParser, rpcProviderEndpoint) + chainTracker, err := chaintracker.NewChainTracker(ctx, chainFetcher, chainTrackerConfig) + if err != nil { + disabledEndpoints <- rpcProviderEndpoint + return utils.LavaFormatError("panic severity critical error, aborting support for chain api due to node access, continuing with other endpoints", err, utils.Attribute{Key: "chainTrackerConfig", Value: chainTrackerConfig}, utils.Attribute{Key: "endpoint", Value: rpcProviderEndpoint}) + } + reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, providerStateTracker, addr.String(), chainProxy, chainParser) + providerStateTracker.RegisterReliabilityManagerForVoteUpdates(ctx, reliabilityManager, rpcProviderEndpoint) - rpcProviderServer := &RPCProviderServer{} - if _, ok := rpcp.rpcProviderServers[key]; ok { - utils.LavaFormatFatal("Trying to add the same key twice to rpcProviderServers check config file.", nil, - utils.Attribute{Key: "key", Value: key}) - } - rpcp.rpcProviderServers[key] = rpcProviderServer - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rewardServer, providerSessionManager, reliabilityManager, privKey, cache, chainProxy, providerStateTracker, addr, lavaChainID, DEFAULT_ALLOWED_MISSING_CU) + rpcProviderServer := &RPCProviderServer{} + if _, ok := rpcp.rpcProviderServers[key]; ok { + utils.LavaFormatFatal("Trying to add the same key twice to rpcProviderServers check config file.", nil, + utils.Attribute{Key: "key", Value: key}) + } + rpcp.rpcProviderServers[key] = rpcProviderServer + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rewardServer, providerSessionManager, reliabilityManager, privKey, cache, chainProxy, providerStateTracker, addr, lavaChainID, DEFAULT_ALLOWED_MISSING_CU) - // set up grpc listener - var listener *ProviderListener - if rpcProviderEndpoint.NetworkAddress == "" && len(rpcp.rpcProviderListeners) > 0 { - // handle case only one network address was defined - for _, listener_p := range rpcp.rpcProviderListeners { - listener = listener_p - break + // set up grpc listener + var listener *ProviderListener + if rpcProviderEndpoint.NetworkAddress == "" && len(rpcp.rpcProviderListeners) > 0 { + // handle case only one network address was defined + for _, listener_p := range rpcp.rpcProviderListeners { + listener = listener_p + break + } + } else { + func() { + rpcp.lock.Lock() + defer rpcp.lock.Unlock() + var ok bool + listener, ok = rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress] + if !ok { + utils.LavaFormatDebug("creating new listener", utils.Attribute{Key: "NetworkAddress", Value: rpcProviderEndpoint.NetworkAddress}) + listener = NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress) + rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress] = listener + } + }() } - } else { - var ok bool - listener, ok = rpcp.rpcProviderListeners[rpcProviderEndpoint.NetworkAddress] - if !ok { - listener = NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress) - rpcp.rpcProviderListeners[listener.Key()] = listener + if listener == nil { + utils.LavaFormatFatal("listener not defined, cant register RPCProviderServer", nil, utils.Attribute{Key: "RPCProviderEndpoint", Value: rpcProviderEndpoint.String()}) } - } - if listener == nil { - utils.LavaFormatFatal("listener not defined, cant register RPCProviderServer", nil, utils.Attribute{Key: "RPCProviderEndpoint", Value: rpcProviderEndpoint.String()}) - } - listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) + listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) + return nil + }(rpcProviderEndpoint) } + wg.Wait() + close(disabledEndpoints) utils.LavaFormatInfo("RPCProvider done setting up endpoints, ready for service") - if len(rpcp.disabledEndpoints) > 0 { - utils.LavaFormatError(utils.FormatStringerList("RPCProvider Runnig with disabled Endpoints:", rpcp.disabledEndpoints), nil) + disabledEndpointsList := []*lavasession.RPCProviderEndpoint{} + for disabledEndpoint := range disabledEndpoints { + disabledEndpointsList = append(disabledEndpointsList, disabledEndpoint) + } + if len(disabledEndpointsList) > 0 { + utils.LavaFormatError(utils.FormatStringerList("RPCProvider Runnig with disabled Endpoints:", disabledEndpointsList), nil) + if len(disabledEndpointsList) == parallelJobs { + utils.LavaFormatFatal("all endpoints are disabled", nil) + } } + // tearing down select { case <-ctx.Done(): utils.LavaFormatInfo("Provider Server ctx.Done") diff --git a/scripts/init_chain_commands.sh b/scripts/init_chain_commands.sh index 90b1b4e751..c0ac6b7214 100755 --- a/scripts/init_chain_commands.sh +++ b/scripts/init_chain_commands.sh @@ -58,7 +58,7 @@ lavad query pairing providers "ETH1" lavad query pairing clients "ETH1" # we need to wait for the next epoch for the stake to take action. -# sleep_until_next_epoch +sleep_until_next_epoch . ${__dir}/setup_providers.sh