From 4a7ed067c74704cb3663a82466dd0d50bcaf3d34 Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 22 Jan 2025 17:41:38 +0200 Subject: [PATCH 01/42] do not crash if one client fails version check --- beacon/goclient/goclient.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index d7c77a48cf..f81c9945e5 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -167,19 +167,24 @@ func New( return nil, err } + consensusClients = append(consensusClients, httpClient) + consensusClientsAsServices = append(consensusClientsAsServices, httpClient) + nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{}) if err != nil { logger.Error(clResponseErrMsg, + zap.String("address", httpClient.Address()), zap.String("api", "NodeVersion"), zap.Error(err), ) - return nil, fmt.Errorf("failed to get node version: %w", err) + continue } if nodeVersionResp == nil { logger.Error(clNilResponseErrMsg, + zap.String("address", httpClient.Address()), zap.String("api", "NodeVersion"), ) - return nil, fmt.Errorf("node version response is nil") + continue } logger.Info("consensus client connected", @@ -188,9 +193,6 @@ func New( zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), zap.String("version", nodeVersionResp.Data), ) - - consensusClients = append(consensusClients, httpClient) - consensusClientsAsServices = append(consensusClientsAsServices, httpClient) } err := assertSameGenesis(opt.Context, consensusClients...) 
From 17849073361efa4a4c4e9a43f77ca718ff6212e1 Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 22 Jan 2025 17:43:20 +0200 Subject: [PATCH 02/42] fix help note on multiple addresses --- eth/executionclient/config.go | 2 +- protocol/v2/blockchain/beacon/client.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/config.go b/eth/executionclient/config.go index 7f40854d50..ea99a89789 100644 --- a/eth/executionclient/config.go +++ b/eth/executionclient/config.go @@ -8,7 +8,7 @@ import ( // ExecutionOptions contains config configurations related to Ethereum execution client. type ExecutionOptions struct { - Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address. Supports multiple comma-separated addresses"` + Addr string `yaml:"ETH1Addr" env:"ETH_1_ADDR" env-required:"true" env-description:"Execution client WebSocket address. Supports multiple semicolon separated addresses. ex: ws://localhost:8546;ws://localhost:8547"` ConnectionTimeout time.Duration `yaml:"ETH1ConnectionTimeout" env:"ETH_1_CONNECTION_TIMEOUT" env-default:"10s" env-description:"Execution client connection timeout"` SyncDistanceTolerance uint64 `yaml:"ETH1SyncDistanceTolerance" env:"ETH_1_SYNC_DISTANCE_TOLERANCE" env-default:"5" env-description:"The number of out-of-sync blocks we can tolerate"` } diff --git a/protocol/v2/blockchain/beacon/client.go b/protocol/v2/blockchain/beacon/client.go index 7bbbf1212c..fbea06c5bb 100644 --- a/protocol/v2/blockchain/beacon/client.go +++ b/protocol/v2/blockchain/beacon/client.go @@ -63,7 +63,7 @@ type BeaconNode interface { type Options struct { Context context.Context Network Network - BeaconNodeAddr string `yaml:"BeaconNodeAddr" env:"BEACON_NODE_ADDR" env-required:"true" env-description:"Beacon node address. 
Supports multiple comma-separated addresses'"` + BeaconNodeAddr string `yaml:"BeaconNodeAddr" env:"BEACON_NODE_ADDR" env-required:"true" env-description:"Beacon node address. Supports multiple semicolon separated addresses. ex: http://localhost:5052;http://localhost:5053"` SyncDistanceTolerance uint64 `yaml:"SyncDistanceTolerance" env:"BEACON_SYNC_DISTANCE_TOLERANCE" env-default:"4" env-description:"The number of out-of-sync slots we can tolerate"` CommonTimeout time.Duration // Optional. From 613974836a578b75b508c2cc5df5d7ff77741df4 Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 22 Jan 2025 18:33:17 +0200 Subject: [PATCH 03/42] don't compare genensis values --- beacon/goclient/goclient.go | 9 +++++---- beacon/goclient/goclient_test.go | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index f81c9945e5..62cb5a5d8f 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -195,10 +195,11 @@ func New( ) } - err := assertSameGenesis(opt.Context, consensusClients...) - if err != nil { - return nil, fmt.Errorf("assert same spec: %w", err) - } + // TODO: assert genesis against info in networkconfig + //err := assertSameGenesis(opt.Context, consensusClients...) + //if err != nil { + // return nil, fmt.Errorf("assert same spec: %w", err) + //} multiClient, err := eth2clientmulti.New( opt.Context, diff --git a/beacon/goclient/goclient_test.go b/beacon/goclient/goclient_test.go index 7f608d9b1e..1e313a0eee 100644 --- a/beacon/goclient/goclient_test.go +++ b/beacon/goclient/goclient_test.go @@ -88,7 +88,7 @@ func TestTimeouts(t *testing.T) { return nil }) _, err := mockClient(ctx, undialableServer.URL, commonTimeout, longTimeout) - require.ErrorContains(t, err, "context deadline exceeded") + require.ErrorContains(t, err, "client is not active") } // Too slow to respond to the Validators request. 
From 54f982f8e84adb6bca033f36a296181559782768 Mon Sep 17 00:00:00 2001 From: y0sher Date: Wed, 22 Jan 2025 18:37:13 +0200 Subject: [PATCH 04/42] remove old assertSameGenesis code --- beacon/goclient/goclient.go | 41 ------------------------------------- 1 file changed, 41 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 62cb5a5d8f..0abcc81fb7 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -267,47 +267,6 @@ func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commo return httpClient.(*eth2clienthttp.Service), nil } -// assertSameGenesis should receive a non-empty list -func assertSameGenesis(ctx context.Context, services ...Client) error { - firstGenesis, err := services[0].Genesis(ctx, &api.GenesisOpts{}) - if err != nil { - return fmt.Errorf("get first genesis: %w", err) - } - - for _, service := range services[1:] { - srvGenesis, err := service.Genesis(ctx, &api.GenesisOpts{}) - if err != nil { - return fmt.Errorf("get service genesis: %w", err) - } - - if err := sameGenesis(firstGenesis.Data, srvGenesis.Data); err != nil { - return fmt.Errorf("different genesis: %w", err) - } - } - - return nil -} - -func sameGenesis(a, b *apiv1.Genesis) error { - if a == nil || b == nil { // Input parameters should never be nil, so the check may fail if both are nil - return fmt.Errorf("genesis is nil") - } - - if !a.GenesisTime.Equal(b.GenesisTime) { - return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime) - } - - if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot { - return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot) - } - - if a.GenesisForkVersion != b.GenesisForkVersion { - return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion) - } - - return nil -} - func (gc *GoClient) nodeSyncing(ctx context.Context, opts 
*api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) { return gc.multiClient.NodeSyncing(ctx, opts) } From 7a4248d4c1da1f231d35560a430dead0485644a5 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 22 Jan 2025 20:52:53 -0300 Subject: [PATCH 05/42] beacon/goclient: set up connection hooks, assert genesis --- beacon/goclient/goclient.go | 214 ++++++++++++++++++++++++------------ 1 file changed, 146 insertions(+), 68 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 0abcc81fb7..e45be623f1 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -114,6 +114,9 @@ type GoClient struct { clients []Client multiClient MultiClient + genesisMu sync.Mutex + genesis *apiv1.Genesis + syncDistanceTolerance phase0.Slot nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) @@ -154,85 +157,41 @@ func New( longTimeout = DefaultLongTimeout } + client := &GoClient{ + log: logger, + ctx: opt.Context, + network: opt.Network, + syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), + operatorDataStore: operatorDataStore, + registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{}, + attestationDataCache: ttlcache.New( + // we only fetch attestation data during the slot of the relevant duty (and never later), + // hence caching it for 2 slots is sufficient + ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()), + ), + commonTimeout: commonTimeout, + longTimeout: longTimeout, + } + beaconAddrList := strings.Split(opt.BeaconNodeAddr, ";") // TODO: Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",". 
if len(beaconAddrList) == 0 { return nil, fmt.Errorf("no beacon node address provided") } - var consensusClients []Client - var consensusClientsAsServices []eth2client.Service for _, beaconAddr := range beaconAddrList { - httpClient, err := setupHTTPClient(opt.Context, logger, beaconAddr, commonTimeout) - if err != nil { + if err := client.addSingleClient(opt.Context, beaconAddr); err != nil { return nil, err } - - consensusClients = append(consensusClients, httpClient) - consensusClientsAsServices = append(consensusClientsAsServices, httpClient) - - nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{}) - if err != nil { - logger.Error(clResponseErrMsg, - zap.String("address", httpClient.Address()), - zap.String("api", "NodeVersion"), - zap.Error(err), - ) - continue - } - if nodeVersionResp == nil { - logger.Error(clNilResponseErrMsg, - zap.String("address", httpClient.Address()), - zap.String("api", "NodeVersion"), - ) - continue - } - - logger.Info("consensus client connected", - fields.Name(httpClient.Name()), - fields.Address(httpClient.Address()), - zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), - zap.String("version", nodeVersionResp.Data), - ) } - // TODO: assert genesis against info in networkconfig - //err := assertSameGenesis(opt.Context, consensusClients...) 
- //if err != nil { - // return nil, fmt.Errorf("assert same spec: %w", err) - //} - - multiClient, err := eth2clientmulti.New( - opt.Context, - eth2clientmulti.WithClients(consensusClientsAsServices), - eth2clientmulti.WithLogLevel(zerolog.DebugLevel), - eth2clientmulti.WithTimeout(commonTimeout), - ) + err := client.initMultiClient(opt.Context) if err != nil { logger.Error("Consensus multi client initialization failed", zap.String("address", opt.BeaconNodeAddr), zap.Error(err), ) - return nil, fmt.Errorf("create multi client: %w", err) - } - consensusClient := multiClient.(*eth2clientmulti.Service) - - client := &GoClient{ - log: logger, - ctx: opt.Context, - network: opt.Network, - clients: consensusClients, - multiClient: consensusClient, - syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), - operatorDataStore: operatorDataStore, - registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{}, - attestationDataCache: ttlcache.New( - // we only fetch attestation data during the slot of the relevant duty (and never later), - // hence caching it for 2 slots is sufficient - ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()), - ), - commonTimeout: commonTimeout, - longTimeout: longTimeout, + return nil, err } client.nodeSyncingFn = client.nodeSyncing @@ -244,27 +203,146 @@ func New( return client, nil } -func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commonTimeout time.Duration) (*eth2clienthttp.Service, error) { +func (gc *GoClient) initMultiClient(ctx context.Context) error { + var services []eth2client.Service + for _, client := range gc.clients { + services = append(services, client) + } + + multiClient, err := eth2clientmulti.New( + ctx, + eth2clientmulti.WithClients(services), + eth2clientmulti.WithLogLevel(zerolog.DebugLevel), + eth2clientmulti.WithTimeout(gc.commonTimeout), + ) + if err != nil { + return fmt.Errorf("create multi client: %w", err) + } + + 
gc.multiClient = multiClient.(*eth2clientmulti.Service) + return nil +} + +func (gc *GoClient) addSingleClient(ctx context.Context, addr string) error { httpClient, err := eth2clienthttp.New( ctx, // WithAddress supplies the address of the beacon node, in host:port format. eth2clienthttp.WithAddress(addr), // LogLevel supplies the level of logging to carry out. eth2clienthttp.WithLogLevel(zerolog.DebugLevel), - eth2clienthttp.WithTimeout(commonTimeout), + eth2clienthttp.WithTimeout(gc.commonTimeout), eth2clienthttp.WithReducedMemoryUsage(true), eth2clienthttp.WithAllowDelayedStart(true), + eth2clienthttp.WithHooks(gc.singleClientHooks()), ) if err != nil { - logger.Error("Consensus http client initialization failed", + gc.log.Error("Consensus http client initialization failed", zap.String("address", addr), zap.Error(err), ) - return nil, fmt.Errorf("create http client: %w", err) + return fmt.Errorf("create http client: %w", err) + } + + gc.clients = append(gc.clients, httpClient.(*eth2clienthttp.Service)) + + return nil +} + +func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { + return &eth2clienthttp.Hooks{ + OnActive: func(ctx context.Context, s *eth2clienthttp.Service) { + // If err is nil, nodeVersionResp is never nil. 
+ nodeVersionResp, err := s.NodeVersion(ctx, &api.NodeVersionOpts{}) + if err != nil { + gc.log.Error(clResponseErrMsg, + zap.String("address", s.Address()), + zap.String("api", "NodeVersion"), + zap.Error(err), + ) + return + } + + gc.log.Info("consensus client connected", + fields.Name(s.Name()), + fields.Address(s.Address()), + zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))), + zap.String("version", nodeVersionResp.Data), + ) + + genesis, err := s.Genesis(ctx, &api.GenesisOpts{}) + if err != nil { + gc.log.Error(clResponseErrMsg, + zap.String("address", s.Address()), + zap.String("api", "Genesis"), + zap.Error(err), + ) + return + } + + if err := gc.assertSameGenesis(genesis.Data); err != nil { + gc.genesisMu.Lock() + defer gc.genesisMu.Unlock() + gc.log.Fatal("client genesis differs", + zap.String("address", s.Address()), + zap.Any("client_genesis", genesis.Data), + zap.Any("expected_genesis", gc.genesis), + ) + return // Tests may override Fatal's behavior + } + }, + OnInactive: func(ctx context.Context, s *eth2clienthttp.Service) { + gc.log.Warn("consensus client disconnected", + fields.Name(s.Name()), + fields.Address(s.Address()), + ) + }, + OnSynced: func(ctx context.Context, s *eth2clienthttp.Service) { + gc.log.Info("consensus client synced", + fields.Name(s.Name()), + fields.Address(s.Address()), + ) + }, + OnDesynced: func(ctx context.Context, s *eth2clienthttp.Service) { + gc.log.Warn("consensus client desynced", + fields.Name(s.Name()), + fields.Address(s.Address()), + ) + }, + } +} + +// assertSameGenesis should receive a non-empty list +func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { + gc.genesisMu.Lock() + defer gc.genesisMu.Unlock() + + if gc.genesis == nil { + gc.genesis = genesis + return nil + } + + if err := sameGenesis(gc.genesis, genesis); err != nil { + return fmt.Errorf("different genesis: %w", err) } - return httpClient.(*eth2clienthttp.Service), nil + return nil +} + +func sameGenesis(a, b 
*apiv1.Genesis) error { + if !a.GenesisTime.Equal(b.GenesisTime) { + return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime) + } + + if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot { + return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot) + } + + if a.GenesisForkVersion != b.GenesisForkVersion { + return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion) + } + + return nil } func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) { From 1025641125ec553d3b5a6f55b7fdaf33f082120d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 22 Jan 2025 21:03:06 -0300 Subject: [PATCH 06/42] beacon/goclient: remove outdated comment --- beacon/goclient/goclient.go | 1 - 1 file changed, 1 deletion(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index e45be623f1..c1377b4e7f 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -312,7 +312,6 @@ func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { } } -// assertSameGenesis should receive a non-empty list func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { gc.genesisMu.Lock() defer gc.genesisMu.Unlock() From af5c455ca8d60bbaf0a8ec548bea865516126984 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 22 Jan 2025 22:25:42 -0300 Subject: [PATCH 07/42] eth/executionclient: allow starting with unhealthy client --- eth/executionclient/multi_client.go | 191 ++++++++++++++++------- eth/executionclient/multi_client_test.go | 141 +++-------------- 2 files changed, 162 insertions(+), 170 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 2f29cfc8bf..8fbf0eefbd 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -4,6 +4,7 @@ import ( "context" 
"fmt" "math/big" + "sync" "sync/atomic" "time" @@ -34,22 +35,32 @@ type MultiClient struct { syncDistanceTolerance uint64 contractAddress ethcommon.Address + chainIDMu sync.Mutex chainID *big.Int closed chan struct{} nodeAddrs []string - clients []SingleClientProvider + clientsMu []sync.Mutex // each client has own mutex + clients []SingleClientProvider // nil if not connected + connectedCount atomic.Uint64 currentClientIndex atomic.Int64 } // NewMulti creates a new instance of MultiClient. -func NewMulti(ctx context.Context, nodeAddrs []string, contractAddr ethcommon.Address, opts ...OptionMulti) (*MultiClient, error) { +func NewMulti( + ctx context.Context, + nodeAddrs []string, + contractAddr ethcommon.Address, + opts ...OptionMulti, +) (*MultiClient, error) { if len(nodeAddrs) == 0 { return nil, fmt.Errorf("no node address provided") } multiClient := &MultiClient{ nodeAddrs: nodeAddrs, + clients: make([]SingleClientProvider, len(nodeAddrs)), // initialized with nil values (not connected) + clientsMu: make([]sync.Mutex, len(nodeAddrs)), contractAddress: contractAddr, logger: zap.NewNop(), followDistance: DefaultFollowDistance, @@ -63,66 +74,86 @@ func NewMulti(ctx context.Context, nodeAddrs []string, contractAddr ethcommon.Ad opt(multiClient) } - // The underlying client may call Fatal on unsuccessful reconnection attempt. - // Therefore, we need to override Fatal's behavior to avoid crashing. 
- logger := multiClient.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop)) - - for _, nodeAddr := range nodeAddrs { - singleClient, err := New( - ctx, - nodeAddr, - contractAddr, - WithLogger(logger), - WithFollowDistance(multiClient.followDistance), - WithConnectionTimeout(multiClient.connectionTimeout), - WithReconnectionInitialInterval(multiClient.reconnectionInitialInterval), - WithReconnectionMaxInterval(multiClient.reconnectionMaxInterval), - WithHealthInvalidationInterval(multiClient.healthInvalidationInterval), - WithSyncDistanceTolerance(multiClient.syncDistanceTolerance), - ) - if err != nil { - return nil, fmt.Errorf("create single client: %w", err) + var multiErr error + for clientIndex := range nodeAddrs { + if err := multiClient.connect(ctx, clientIndex); err != nil { + multiClient.logger.Error("failed to connect to node", + zap.String("address", nodeAddrs[clientIndex]), + zap.Error(err)) + + multiErr = errors.Join(multiErr, err) + continue } - multiClient.clients = append(multiClient.clients, singleClient) + multiClient.connectedCount.Add(1) + } + + if multiClient.connectedCount.Load() == 0 { + return nil, fmt.Errorf("no available clients: %w", multiErr) } - same, err := multiClient.assertSameChainIDs(ctx) + return multiClient, nil +} + +func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { + // The underlying client may call Fatal on unsuccessful reconnection attempt. + // Therefore, we need to override Fatal's behavior to avoid crashing. 
+ logger := mc.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop)) + + singleClient, err := New( + ctx, + mc.nodeAddrs[clientIndex], + mc.contractAddress, + WithLogger(logger), + WithFollowDistance(mc.followDistance), + WithConnectionTimeout(mc.connectionTimeout), + WithReconnectionInitialInterval(mc.reconnectionInitialInterval), + WithReconnectionMaxInterval(mc.reconnectionMaxInterval), + WithHealthInvalidationInterval(mc.healthInvalidationInterval), + WithSyncDistanceTolerance(mc.syncDistanceTolerance), + ) if err != nil { - return nil, fmt.Errorf("assert same chain IDs: %w", err) + return fmt.Errorf("create single client: %w", err) } - if !same { - return nil, fmt.Errorf("execution clients' chain IDs are not same") + + chainID, err := singleClient.ChainID(ctx) + if err != nil { + mc.logger.Error("failed to get chain ID", + zap.String("address", mc.nodeAddrs[clientIndex]), + zap.Error(err)) + return fmt.Errorf("get chain ID: %w", err) } - return multiClient, nil + if err := mc.assertSameChainID(chainID); err != nil { + mc.logger.Fatal("chain ID mismatch", + zap.String("observed_chain_id", mc.chainID.String()), + zap.String("checked_chain_id", chainID.String()), + zap.String("address", mc.nodeAddrs[clientIndex])) + } + + mc.clientsMu[clientIndex].Lock() + mc.clients[clientIndex] = singleClient + mc.clientsMu[clientIndex].Unlock() + return nil } -// assertSameChainIDs checks if all healthy clients have the same chain ID. -// It sets firstChainID to the chain ID of the first healthy client encountered. -func (mc *MultiClient) assertSameChainIDs(ctx context.Context) (bool, error) { - for i, client := range mc.clients { - addr := mc.nodeAddrs[i] +// assertSameChainID checks if client has the same chain ID. +// It sets mc.chainID to the chain ID of the first healthy client encountered. 
+func (mc *MultiClient) assertSameChainID(chainID *big.Int) error { + mc.chainIDMu.Lock() + defer mc.chainIDMu.Unlock() - chainID, err := client.ChainID(ctx) - if err != nil { - mc.logger.Error("failed to get chain ID", zap.String("address", addr), zap.Error(err)) - return false, fmt.Errorf("get chain ID: %w", err) - } - if mc.chainID == nil { - mc.chainID = chainID - continue - } - if mc.chainID.Cmp(chainID) != 0 { - mc.logger.Error("chain ID mismatch", - zap.String("observed_chain_id", mc.chainID.String()), - zap.String("checked_chain_id", chainID.String()), - zap.String("address", addr)) - return false, nil - } + if mc.chainID == nil { + mc.chainID = chainID + return nil + } + + if mc.chainID.Cmp(chainID) != 0 { + return fmt.Errorf("different chain ID, expected %v, got %v", + mc.chainID.String(), chainID.String()) } - return true, nil + return nil } // FetchHistoricalLogs retrieves historical logs emitted by the contract starting from fromBlock. @@ -203,16 +234,35 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan func (mc *MultiClient) Healthy(ctx context.Context) error { healthyClients := atomic.Int32{} p := pool.New().WithErrors().WithContext(ctx) - for i, client := range mc.clients { + + for i := range mc.clients { + i := i p.Go(func(ctx context.Context) error { - err := client.Healthy(ctx) + mc.clientsMu[i].Lock() + + if mc.clients[i] == nil { + if err := mc.connect(ctx, i); err != nil { + mc.logger.Warn("failed to connect to client", + zap.String("addr", mc.nodeAddrs[i]), + zap.Error(err)) + + mc.clientsMu[i].Unlock() + return err + } + } + + err := mc.clients[i].Healthy(ctx) if err != nil { mc.logger.Warn("client is not healthy", zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) + + mc.clientsMu[i].Unlock() return err } healthyClients.Add(1) + + mc.clientsMu[i].Unlock() return nil }) } @@ -284,12 +334,23 @@ func (mc *MultiClient) ChainID(_ context.Context) (*big.Int, error) { func (mc *MultiClient) Close() error { 
close(mc.closed) + mc.connectedCount.Store(0) + var multiErr error - for i, client := range mc.clients { - if err := client.Close(); err != nil { + for i := range mc.clients { + mc.clientsMu[i].Lock() + + if mc.clients[i] == nil { + mc.clientsMu[i].Unlock() + continue + } + if err := mc.clients[i].Close(); err != nil { mc.logger.Debug("Failed to close client", zap.String("address", mc.nodeAddrs[i]), zap.Error(err)) multiErr = errors.Join(multiErr, err) } + mc.clients[i] = nil + + mc.clientsMu[i].Unlock() } return multiErr @@ -297,18 +358,33 @@ func (mc *MultiClient) Close() error { func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error)) (any, error) { if len(mc.clients) == 1 { - return f(mc.clients[0]) + return f(mc.clients[0]) // no need for mutex because one client is always non-nil } // Iterate over the clients in round-robin fashion, // starting from the most likely healthy client (currentClientIndex). var startingIndex = int(mc.currentClientIndex.Load()) var allErrs error - for i := 0; i < len(mc.clients); i++ { + for i := range mc.clients { + mc.clientsMu[i].Lock() + clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. client := mc.clients[clientIndex] + if client == nil { + if err := mc.connect(ctx, i); err != nil { + mc.logger.Warn("failed to connect to client", + zap.String("addr", mc.nodeAddrs[i]), + zap.Error(err)) + + allErrs = errors.Join(allErrs, err) + mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. 
+ mc.clientsMu[i].Unlock() + continue + } + } + logger := mc.logger.With( zap.String("addr", mc.nodeAddrs[clientIndex]), zap.String("method", methodFromContext(ctx))) @@ -319,8 +395,10 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi logger.Warn("client is not healthy, switching to next client", zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), zap.Error(err)) + allErrs = errors.Join(allErrs, err) mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. + mc.clientsMu[i].Unlock() continue } @@ -329,6 +407,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi v, err := f(client) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { mc.logger.Debug("received graceful closure from client", zap.Error(err)) + mc.clientsMu[i].Unlock() return v, err } @@ -339,11 +418,13 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi allErrs = errors.Join(allErrs, err) mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. + mc.clientsMu[i].Unlock() continue } // Update currentClientIndex to the successful client. mc.currentClientIndex.Store(int64(clientIndex)) + mc.clientsMu[i].Unlock() return v, nil } diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index 7edf729662..e4e29b230e 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -42,37 +42,6 @@ func TestNewMulti(t *testing.T) { require.Error(t, err) require.Contains(t, err.Error(), "create single client") }) - - t.Run("chain ID mismatch", func(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - mockClient1. - EXPECT(). - ChainID(gomock.Any()). - Return(big.NewInt(1), nil). - AnyTimes() - - mockClient2. - EXPECT(). - ChainID(gomock.Any()). - Return(big.NewInt(2), nil). 
- AnyTimes() - - mc := &MultiClient{ - nodeAddrs: []string{"mock1", "mock2"}, - clients: []SingleClientProvider{mockClient1, mockClient2}, - logger: zap.NewNop(), - } - - same, err := mc.assertSameChainIDs(context.Background()) - - require.NoError(t, err) - require.False(t, same) - }) } func TestNewMulti_WithOptions(t *testing.T) { @@ -123,90 +92,6 @@ func TestNewMulti_WithOptions(t *testing.T) { require.EqualValues(t, customSyncDistanceTolerance, mc.syncDistanceTolerance) } -func TestMultiClient_assertSameChainIDs(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - mockClient1. - EXPECT(). - ChainID(gomock.Any()). - Return(big.NewInt(5), nil). - Times(1) - - mockClient2. - EXPECT(). - ChainID(gomock.Any()). - Return(big.NewInt(5), nil). - Times(1) - - mc := &MultiClient{ - nodeAddrs: []string{"mock1", "mock2"}, - clients: []SingleClientProvider{mockClient1, mockClient2}, - logger: zap.NewNop(), - } - - same, err := mc.assertSameChainIDs(context.Background()) - require.NoError(t, err) - require.True(t, same, "expected chain IDs to match") - - chainID, err := mc.ChainID(context.Background()) - require.NoError(t, err) - require.NotNil(t, chainID) - require.Equal(t, int64(5), chainID.Int64()) -} - -func TestMultiClient_assertSameChainIDs_Error(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - nodeAddr1 := "mockNode1" - nodeAddr2 := "mockNode2" - - mockClient1. - EXPECT(). - ChainID(gomock.Any()). - Return(big.NewInt(1), nil). - Times(1) - - mockClient1. - EXPECT(). - Healthy(gomock.Any()). - Return(nil). - AnyTimes() - - fetchErr := fmt.Errorf("failed to get chain ID") - mockClient2. - EXPECT(). - ChainID(gomock.Any()). - Return(nil, fetchErr). - Times(1) - - mockClient2. - EXPECT(). - Healthy(gomock.Any()). 
- Return(nil). - AnyTimes() - - mc := &MultiClient{ - nodeAddrs: []string{nodeAddr1, nodeAddr2}, - clients: []SingleClientProvider{mockClient1, mockClient2}, - logger: zap.NewNop(), - closed: make(chan struct{}), - } - - same, err := mc.assertSameChainIDs(context.Background()) - - require.False(t, same, "Expected chain IDs to not match due to an error") - require.Error(t, err, "Expected an error when fetching chain ID from a client") - require.Contains(t, err.Error(), "get chain ID: failed to get chain ID", "Error message should indicate the chain ID retrieval failure") -} - func TestMultiClient_FetchHistoricalLogs(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -243,6 +128,7 @@ func TestMultiClient_FetchHistoricalLogs(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockaddr"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -301,6 +187,7 @@ func TestMultiClient_FetchHistoricalLogs_AllClientsNothingToSync(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -349,6 +236,7 @@ func TestMultiClient_FetchHistoricalLogs_MixedErrors(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -407,6 +295,7 @@ func TestMultiClient_StreamLogs(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), closed: make(chan struct{}), } @@ -473,6 +362,7 @@ func TestMultiClient_StreamLogs_Success(t *testing.T) { mc := &MultiClient{ 
nodeAddrs: []string{"mockNode1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), closed: make(chan struct{}), } @@ -559,6 +449,7 @@ func TestMultiClient_StreamLogs_Failover(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockClient2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -638,6 +529,7 @@ func TestMultiClient_StreamLogs_AllClientsFail(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), closed: make(chan struct{}), } @@ -717,6 +609,7 @@ func TestMultiClient_StreamLogs_SameFromBlock(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -795,6 +688,7 @@ func TestMultiClient_StreamLogs_MultipleFailoverAttempts(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2", "mockNode3"}, clients: []SingleClientProvider{mockClient1, mockClient2, mockClient3}, + clientsMu: make([]sync.Mutex, 3), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -831,6 +725,7 @@ func TestMultiClient_StreamLogs_NoHealthyClients(t *testing.T) { logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), nodeAddrs: nodeAddrs, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), closed: make(chan struct{}), contractAddress: contractAddr, } @@ -858,6 +753,7 @@ func TestMultiClient_Healthy(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: 
zap.NewNop(), closed: make(chan struct{}), } @@ -889,6 +785,7 @@ func TestMultiClient_Healthy_MultiClient(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -919,6 +816,7 @@ func TestMultiClient_Healthy_AllClientsUnhealthy(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -944,6 +842,7 @@ func TestMultiClient_BlockByNumber(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -968,6 +867,7 @@ func TestMultiClient_BlockByNumber_Error(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -993,6 +893,7 @@ func TestMultiClient_HeaderByNumber(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1017,6 +918,7 @@ func TestMultiClient_HeaderByNumber_Error(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1052,6 +954,7 @@ func TestMultiClient_SubscribeFilterLogs(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1084,6 +987,7 @@ func 
TestMultiClient_SubscribeFilterLogs_Error(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1114,6 +1018,7 @@ func TestMultiClient_FilterLogs(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1139,6 +1044,7 @@ func TestMultiClient_FilterLogs_Error(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1245,6 +1151,7 @@ func TestMultiClient_Close(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1", "mock2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1284,6 +1191,7 @@ func TestMultiClient_Close_MultiClient(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2", "mockNode3"}, clients: []SingleClientProvider{mockClient1, mockClient2, mockClient3}, + clientsMu: make([]sync.Mutex, 3), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1317,6 +1225,7 @@ func TestMultiClient_Call_Concurrency(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mock1"}, clients: []SingleClientProvider{mockClient}, + clientsMu: make([]sync.Mutex, 1), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1369,6 +1278,7 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) { mc := &MultiClient{ nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop(), closed: make(chan struct{}), } @@ -1422,6 +1332,7 @@ func TestMultiClient_ReconnectionLimit(t *testing.T) { mc := &MultiClient{ 
nodeAddrs: []string{"mockNode1", "mockNode2"}, clients: []SingleClientProvider{mockClient1, mockClient2}, + clientsMu: make([]sync.Mutex, 2), logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), closed: make(chan struct{}), } From 3b9efcfe98052ae40ce852e5ed9435f847fae80e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 23 Jan 2025 13:35:37 -0300 Subject: [PATCH 08/42] eth/executionclient: simplify mutex usage --- eth/executionclient/multi_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 8fbf0eefbd..dad72a79d4 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -239,6 +239,7 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { i := i p.Go(func(ctx context.Context) error { mc.clientsMu[i].Lock() + defer mc.clientsMu[i].Unlock() if mc.clients[i] == nil { if err := mc.connect(ctx, i); err != nil { @@ -246,7 +247,6 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) - mc.clientsMu[i].Unlock() return err } } @@ -257,12 +257,10 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) - mc.clientsMu[i].Unlock() return err } healthyClients.Add(1) - mc.clientsMu[i].Unlock() return nil }) } From c6592ad91bd9ada56556918c52916bbb8d16cf7d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 23 Jan 2025 14:03:38 -0300 Subject: [PATCH 09/42] eth/executionclient: fix tests for assertSameChainID --- eth/executionclient/multi_client_test.go | 35 ++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index e4e29b230e..a84cf04a5a 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -92,6 +92,41 @@ func TestNewMulti_WithOptions(t 
*testing.T) { require.EqualValues(t, customSyncDistanceTolerance, mc.syncDistanceTolerance) } +func TestMultiClient_assertSameChainIDs(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mc := &MultiClient{ + logger: zap.NewNop(), + } + + require.NoError(t, mc.assertSameChainID(big.NewInt(5))) + require.NoError(t, mc.assertSameChainID(big.NewInt(5))) + + chainID, err := mc.ChainID(context.Background()) + require.NoError(t, err) + require.NotNil(t, chainID) + require.Equal(t, int64(5), chainID.Int64()) +} + +func TestMultiClient_assertSameChainIDs_Error(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mc := &MultiClient{ + logger: zap.NewNop(), + closed: make(chan struct{}), + } + + require.NoError(t, mc.assertSameChainID(big.NewInt(5))) + require.Error(t, mc.assertSameChainID(big.NewInt(6))) + + chainID, err := mc.ChainID(context.Background()) + require.NoError(t, err) + require.NotNil(t, chainID) + require.Equal(t, int64(5), chainID.Int64()) +} + func TestMultiClient_FetchHistoricalLogs(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From b21c1100f970839d36d837e4c44a8dd62646b7de Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 23 Jan 2025 14:11:19 -0300 Subject: [PATCH 10/42] eth/executionclient: improve comment for assertSameGenesis --- beacon/goclient/goclient.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index c1377b4e7f..955ca8d014 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -312,6 +312,11 @@ func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { } } +// assertSameGenesis checks if genesis is same. +// Clients may have different values returned by Spec call, +// so we decided that it's best to assert that GenesisForkVersion is the same. +// To add more assertions, we check the whole apiv1.Genesis (GenesisTime and GenesisValidatorsRoot) +// as they should be same too. 
func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { gc.genesisMu.Lock() defer gc.genesisMu.Unlock() From 902c99af7cad3028150f781426c4e01241b0c244 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 23 Jan 2025 14:23:09 -0300 Subject: [PATCH 11/42] handle nil genesis and chain ID responses --- beacon/goclient/goclient.go | 8 +++++++- eth/executionclient/multi_client.go | 10 ++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 955ca8d014..8f719d1727 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -283,10 +283,12 @@ func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { if err := gc.assertSameGenesis(genesis.Data); err != nil { gc.genesisMu.Lock() defer gc.genesisMu.Unlock() - gc.log.Fatal("client genesis differs", + + gc.log.Fatal("client returned unexpected genesis", zap.String("address", s.Address()), zap.Any("client_genesis", genesis.Data), zap.Any("expected_genesis", gc.genesis), + zap.Error(err), ) return // Tests may override Fatal's behavior } @@ -321,6 +323,10 @@ func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { gc.genesisMu.Lock() defer gc.genesisMu.Unlock() + if genesis == nil { + return fmt.Errorf("genesis is nil") + } + if gc.genesis == nil { gc.genesis = genesis return nil diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index dad72a79d4..57c113c8a7 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -125,10 +125,12 @@ func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { } if err := mc.assertSameChainID(chainID); err != nil { - mc.logger.Fatal("chain ID mismatch", + mc.logger.Fatal("client returned unexpected chain ID", zap.String("observed_chain_id", mc.chainID.String()), zap.String("checked_chain_id", chainID.String()), - zap.String("address", mc.nodeAddrs[clientIndex])) + 
zap.String("address", mc.nodeAddrs[clientIndex]), + zap.Error(err), + ) } mc.clientsMu[clientIndex].Lock() @@ -143,6 +145,10 @@ func (mc *MultiClient) assertSameChainID(chainID *big.Int) error { mc.chainIDMu.Lock() defer mc.chainIDMu.Unlock() + if chainID == nil { + return fmt.Errorf("chain ID is nil") + } + if mc.chainID == nil { mc.chainID = chainID return nil From e1cc2e3cc9c81946f9c4fb808176bced2e4e74ca Mon Sep 17 00:00:00 2001 From: y0sher Date: Sun, 26 Jan 2025 14:43:16 +0200 Subject: [PATCH 12/42] refactor(multi_client): replace connectedCount atomic int with bool. replace connected atomic with regular bool value --- eth/executionclient/multi_client.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 57c113c8a7..483184fac5 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -42,7 +42,6 @@ type MultiClient struct { nodeAddrs []string clientsMu []sync.Mutex // each client has own mutex clients []SingleClientProvider // nil if not connected - connectedCount atomic.Uint64 currentClientIndex atomic.Int64 } @@ -74,6 +73,8 @@ func NewMulti( opt(multiClient) } + var connected bool + var multiErr error for clientIndex := range nodeAddrs { if err := multiClient.connect(ctx, clientIndex); err != nil { @@ -85,10 +86,10 @@ func NewMulti( continue } - multiClient.connectedCount.Add(1) + connected = true } - if multiClient.connectedCount.Load() == 0 { + if !connected { return nil, fmt.Errorf("no available clients: %w", multiErr) } @@ -238,7 +239,7 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan // Healthy returns if execution client is currently healthy: responds to requests and not in the syncing state. 
func (mc *MultiClient) Healthy(ctx context.Context) error { - healthyClients := atomic.Int32{} + healthyClients := atomic.Bool{} p := pool.New().WithErrors().WithContext(ctx) for i := range mc.clients { @@ -265,13 +266,13 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { return err } - healthyClients.Add(1) + healthyClients.Store(true) return nil }) } err := p.Wait() - if healthyClients.Load() > 0 { + if healthyClients.Load() { return nil } return fmt.Errorf("no healthy clients: %w", err) @@ -338,8 +339,6 @@ func (mc *MultiClient) ChainID(_ context.Context) (*big.Int, error) { func (mc *MultiClient) Close() error { close(mc.closed) - mc.connectedCount.Store(0) - var multiErr error for i := range mc.clients { mc.clientsMu[i].Lock() From 2bdc1aced5b1afc67bd23507c83f055b6367a2b5 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 08:58:13 -0300 Subject: [PATCH 13/42] clarify comment --- eth/executionclient/multi_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 483184fac5..c82d8f8b2a 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -97,8 +97,8 @@ func NewMulti( } func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { - // The underlying client may call Fatal on unsuccessful reconnection attempt. - // Therefore, we need to override Fatal's behavior to avoid crashing. + // ExecutionClient may call Fatal on unsuccessful reconnection attempt. + // Therefore, we need to override its Fatal behavior to avoid crashing. 
logger := mc.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop)) singleClient, err := New( From 8db421f01fc194fd38313bc47859278a7301a46e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 10:36:39 -0300 Subject: [PATCH 14/42] fix double mutex lock --- eth/executionclient/multi_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index c82d8f8b2a..fb00ca23a5 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -96,6 +96,8 @@ func NewMulti( return multiClient, nil } +// connect connects to a client by clientIndex and updates mc.clients[clientIndex] without locks. +// Caller must lock mc.clientsMu[clientIndex]. func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { // ExecutionClient may call Fatal on unsuccessful reconnection attempt. // Therefore, we need to override its Fatal behavior to avoid crashing. @@ -134,9 +136,7 @@ func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { ) } - mc.clientsMu[clientIndex].Lock() mc.clients[clientIndex] = singleClient - mc.clientsMu[clientIndex].Unlock() return nil } From 0dd1027b5f9de2df4a0888961461aa90c4909359 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 10:50:52 -0300 Subject: [PATCH 15/42] fix potential nil pointer dereference --- eth/executionclient/multi_client.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index fb00ca23a5..8bc4734818 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -366,15 +366,14 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi // Iterate over the clients in round-robin fashion, // starting from the most likely healthy client (currentClientIndex). 
- var startingIndex = int(mc.currentClientIndex.Load()) + startingIndex := int(mc.currentClientIndex.Load()) var allErrs error for i := range mc.clients { - mc.clientsMu[i].Lock() - clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. - client := mc.clients[clientIndex] + mc.clientsMu[i].Lock() + client := mc.clients[clientIndex] if client == nil { if err := mc.connect(ctx, i); err != nil { mc.logger.Warn("failed to connect to client", @@ -386,7 +385,10 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi mc.clientsMu[i].Unlock() continue } + + client = mc.clients[clientIndex] } + mc.clientsMu[i].Unlock() logger := mc.logger.With( zap.String("addr", mc.nodeAddrs[clientIndex]), @@ -401,7 +403,6 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi allErrs = errors.Join(allErrs, err) mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. - mc.clientsMu[i].Unlock() continue } @@ -410,7 +411,6 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi v, err := f(client) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { mc.logger.Debug("received graceful closure from client", zap.Error(err)) - mc.clientsMu[i].Unlock() return v, err } @@ -421,13 +421,11 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi allErrs = errors.Join(allErrs, err) mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. - mc.clientsMu[i].Unlock() continue } // Update currentClientIndex to the successful client. 
mc.currentClientIndex.Store(int64(clientIndex)) - mc.clientsMu[i].Unlock() return v, nil } From 8452bf46972b4fd80754de8a26917d0f37bec4b3 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 11:47:00 -0300 Subject: [PATCH 16/42] check only genesis fork version instead of whole genesis --- beacon/goclient/goclient.go | 52 ++++++++++--------------------------- 1 file changed, 13 insertions(+), 39 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 8f719d1727..d70e7c31c1 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -6,6 +6,7 @@ import ( "math" "strings" "sync" + "sync/atomic" "time" eth2client "github.com/attestantio/go-eth2-client" @@ -114,8 +115,7 @@ type GoClient struct { clients []Client multiClient MultiClient - genesisMu sync.Mutex - genesis *apiv1.Genesis + genesisVersion atomic.Pointer[phase0.Version] syncDistanceTolerance phase0.Slot nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) @@ -280,14 +280,11 @@ func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { return } - if err := gc.assertSameGenesis(genesis.Data); err != nil { - gc.genesisMu.Lock() - defer gc.genesisMu.Unlock() - - gc.log.Fatal("client returned unexpected genesis", + if expected, err := gc.assertSameGenesisVersion(genesis.Data.GenesisForkVersion); err != nil { + gc.log.Fatal("client returned unexpected genesis fork version, make sure all clients use the same Ethereum network", zap.String("address", s.Address()), - zap.Any("client_genesis", genesis.Data), - zap.Any("expected_genesis", gc.genesis), + zap.Any("client_genesis", genesis.Data.GenesisForkVersion), + zap.Any("expected_genesis", expected), zap.Error(err), ) return // Tests may override Fatal's behavior @@ -319,40 +316,17 @@ func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks { // so we decided that it's best to assert that GenesisForkVersion is the same. 
// To add more assertions, we check the whole apiv1.Genesis (GenesisTime and GenesisValidatorsRoot) // as they should be same too. -func (gc *GoClient) assertSameGenesis(genesis *apiv1.Genesis) error { - gc.genesisMu.Lock() - defer gc.genesisMu.Unlock() - - if genesis == nil { - return fmt.Errorf("genesis is nil") - } - - if gc.genesis == nil { - gc.genesis = genesis - return nil - } - - if err := sameGenesis(gc.genesis, genesis); err != nil { - return fmt.Errorf("different genesis: %w", err) - } - - return nil -} - -func sameGenesis(a, b *apiv1.Genesis) error { - if !a.GenesisTime.Equal(b.GenesisTime) { - return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime) +func (gc *GoClient) assertSameGenesisVersion(genesisVersion phase0.Version) (phase0.Version, error) { + if gc.genesisVersion.CompareAndSwap(nil, &genesisVersion) { + return genesisVersion, nil } - if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot { - return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot) + expected := *gc.genesisVersion.Load() + if expected != genesisVersion { + return expected, fmt.Errorf("genesis fork version mismatch, expected %v, got %v", expected, genesisVersion) } - if a.GenesisForkVersion != b.GenesisForkVersion { - return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion) - } - - return nil + return expected, nil } func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) { From d5fed4145858c870b69d1a509a10515ad39f7baf Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 12:25:52 -0300 Subject: [PATCH 17/42] create getClient helper method --- eth/executionclient/multi_client.go | 77 +++++++++++++++-------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 
8bc4734818..91c9dec058 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -40,7 +40,7 @@ type MultiClient struct { closed chan struct{} nodeAddrs []string - clientsMu []sync.Mutex // each client has own mutex + clientsMu []sync.Mutex // each client has own mutex, mutex is only locked in getClient and Close (on state transition) clients []SingleClientProvider // nil if not connected currentClientIndex atomic.Int64 } @@ -96,6 +96,21 @@ func NewMulti( return multiClient, nil } +// getClient gets a client at index. +// If it's nil (which means it's not connected), it attempts to connect to it and store connected client instead of nil. +func (mc *MultiClient) getClient(ctx context.Context, clientIndex int) (SingleClientProvider, error) { + mc.clientsMu[clientIndex].Lock() + defer mc.clientsMu[clientIndex].Unlock() + + if mc.clients[clientIndex] == nil { + if err := mc.connect(ctx, clientIndex); err != nil { + return nil, fmt.Errorf("connect: %w", err) + } + } + + return mc.clients[clientIndex], nil +} + // connect connects to a client by clientIndex and updates mc.clients[clientIndex] without locks. // Caller must lock mc.clientsMu[clientIndex]. 
func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { @@ -245,21 +260,16 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { for i := range mc.clients { i := i p.Go(func(ctx context.Context) error { - mc.clientsMu[i].Lock() - defer mc.clientsMu[i].Unlock() - - if mc.clients[i] == nil { - if err := mc.connect(ctx, i); err != nil { - mc.logger.Warn("failed to connect to client", - zap.String("addr", mc.nodeAddrs[i]), - zap.Error(err)) + client, err := mc.getClient(ctx, i) + if err != nil { + mc.logger.Warn("failed to get client", + zap.String("addr", mc.nodeAddrs[i]), + zap.Error(err)) - return err - } + return err } - err := mc.clients[i].Healthy(ctx) - if err != nil { + if err := client.Healthy(ctx); err != nil { mc.logger.Warn("client is not healthy", zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) @@ -342,18 +352,16 @@ func (mc *MultiClient) Close() error { var multiErr error for i := range mc.clients { mc.clientsMu[i].Lock() - - if mc.clients[i] == nil { - mc.clientsMu[i].Unlock() - continue - } - if err := mc.clients[i].Close(); err != nil { - mc.logger.Debug("Failed to close client", zap.String("address", mc.nodeAddrs[i]), zap.Error(err)) - multiErr = errors.Join(multiErr, err) - } + client := mc.clients[i] mc.clients[i] = nil - mc.clientsMu[i].Unlock() + + if client != nil { + if err := client.Close(); err != nil { + mc.logger.Debug("Failed to close client", zap.String("address", mc.nodeAddrs[i]), zap.Error(err)) + multiErr = errors.Join(multiErr, err) + } + } } return multiErr @@ -372,23 +380,16 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. 
- mc.clientsMu[i].Lock() - client := mc.clients[clientIndex] - if client == nil { - if err := mc.connect(ctx, i); err != nil { - mc.logger.Warn("failed to connect to client", - zap.String("addr", mc.nodeAddrs[i]), - zap.Error(err)) - - allErrs = errors.Join(allErrs, err) - mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. - mc.clientsMu[i].Unlock() - continue - } + client, err := mc.getClient(ctx, clientIndex) + if err != nil { + mc.logger.Warn("failed to get client", + zap.String("addr", mc.nodeAddrs[i]), + zap.Error(err)) - client = mc.clients[clientIndex] + allErrs = errors.Join(allErrs, err) + mc.currentClientIndex.Store(int64(nextClientIndex)) // Advance. + continue } - mc.clientsMu[i].Unlock() logger := mc.logger.With( zap.String("addr", mc.nodeAddrs[clientIndex]), From 75e275370a0fa0e5b6cc6ab31a39010fc694d27a Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 12:28:28 -0300 Subject: [PATCH 18/42] improve logs --- eth/executionclient/multi_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 91c9dec058..657468982e 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -262,7 +262,7 @@ func (mc *MultiClient) Healthy(ctx context.Context) error { p.Go(func(ctx context.Context) error { client, err := mc.getClient(ctx, i) if err != nil { - mc.logger.Warn("failed to get client", + mc.logger.Warn("client unavailable", zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) @@ -382,7 +382,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi client, err := mc.getClient(ctx, clientIndex) if err != nil { - mc.logger.Warn("failed to get client", + mc.logger.Warn("client unavailable, switching to next client", zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) From 999608ba6333d8c32d4f620c99ab8ca5f386447d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 
27 Jan 2025 13:21:55 -0300 Subject: [PATCH 19/42] fix issue with log fields --- eth/executionclient/multi_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 657468982e..e754076dd2 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -411,12 +411,12 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi v, err := f(client) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { - mc.logger.Debug("received graceful closure from client", zap.Error(err)) + logger.Debug("received graceful closure from client", zap.Error(err)) return v, err } if err != nil { - mc.logger.Error("call failed, trying another client", + logger.Error("call failed, trying another client", zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), zap.Error(err)) From 66d5051f23f1f57a1a230dbd49fded5c6c1135d6 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 14:18:24 -0300 Subject: [PATCH 20/42] atomic chain ID --- eth/executionclient/multi_client.go | 29 ++++++++++-------------- eth/executionclient/multi_client_test.go | 27 ++++++++++++++-------- 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index e754076dd2..c8b133a313 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -35,8 +35,7 @@ type MultiClient struct { syncDistanceTolerance uint64 contractAddress ethcommon.Address - chainIDMu sync.Mutex - chainID *big.Int + chainID atomic.Pointer[big.Int] closed chan struct{} nodeAddrs []string @@ -142,9 +141,9 @@ func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { return fmt.Errorf("get chain ID: %w", err) } - if err := mc.assertSameChainID(chainID); err != nil { + if expected, err := mc.assertSameChainID(chainID); err != nil { 
mc.logger.Fatal("client returned unexpected chain ID", - zap.String("observed_chain_id", mc.chainID.String()), + zap.String("expected_chain_id", expected.String()), zap.String("checked_chain_id", chainID.String()), zap.String("address", mc.nodeAddrs[clientIndex]), zap.Error(err), @@ -157,25 +156,21 @@ func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { // assertSameChainID checks if client has the same chain ID. // It sets mc.chainID to the chain ID of the first healthy client encountered. -func (mc *MultiClient) assertSameChainID(chainID *big.Int) error { - mc.chainIDMu.Lock() - defer mc.chainIDMu.Unlock() - +func (mc *MultiClient) assertSameChainID(chainID *big.Int) (*big.Int, error) { if chainID == nil { - return fmt.Errorf("chain ID is nil") + return nil, fmt.Errorf("chain ID is nil") } - if mc.chainID == nil { - mc.chainID = chainID - return nil + if mc.chainID.CompareAndSwap(nil, chainID) { + return chainID, nil } - if mc.chainID.Cmp(chainID) != 0 { - return fmt.Errorf("different chain ID, expected %v, got %v", - mc.chainID.String(), chainID.String()) + expected := mc.chainID.Load() + if expected.Cmp(chainID) != 0 { + return expected, fmt.Errorf("different chain ID, expected %v, got %v", expected.String(), chainID.String()) } - return nil + return expected, nil } // FetchHistoricalLogs retrieves historical logs emitted by the contract starting from fromBlock. 
@@ -343,7 +338,7 @@ func (mc *MultiClient) Filterer() (*contract.ContractFilterer, error) { } func (mc *MultiClient) ChainID(_ context.Context) (*big.Int, error) { - return mc.chainID, nil + return mc.chainID.Load(), nil } func (mc *MultiClient) Close() error { diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index a84cf04a5a..8e85a8a482 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -100,8 +100,12 @@ func TestMultiClient_assertSameChainIDs(t *testing.T) { logger: zap.NewNop(), } - require.NoError(t, mc.assertSameChainID(big.NewInt(5))) - require.NoError(t, mc.assertSameChainID(big.NewInt(5))) + expected, err := mc.assertSameChainID(big.NewInt(5)) + require.NoError(t, err) + require.EqualValues(t, 5, expected.Uint64()) + expected, err = mc.assertSameChainID(big.NewInt(5)) + require.NoError(t, err) + require.EqualValues(t, 5, expected.Uint64()) chainID, err := mc.ChainID(context.Background()) require.NoError(t, err) @@ -118,8 +122,12 @@ func TestMultiClient_assertSameChainIDs_Error(t *testing.T) { closed: make(chan struct{}), } - require.NoError(t, mc.assertSameChainID(big.NewInt(5))) - require.Error(t, mc.assertSameChainID(big.NewInt(6))) + expected, err := mc.assertSameChainID(big.NewInt(5)) + require.NoError(t, err) + require.EqualValues(t, 5, expected.Uint64()) + expected, err = mc.assertSameChainID(big.NewInt(6)) + require.Error(t, err) + require.EqualValues(t, 5, expected.Uint64()) chainID, err := mc.ChainID(context.Background()) require.NoError(t, err) @@ -1142,10 +1150,10 @@ func TestMultiClient_Filterer_Integration(t *testing.T) { func TestMultiClient_ChainID(t *testing.T) { mc := &MultiClient{ - chainID: big.NewInt(5), - logger: zap.NewNop(), - closed: make(chan struct{}), + logger: zap.NewNop(), + closed: make(chan struct{}), } + mc.chainID.Store(big.NewInt(5)) cid, err := mc.ChainID(context.Background()) require.NoError(t, err) @@ -1154,9 +1162,8 @@ 
func TestMultiClient_ChainID(t *testing.T) { func TestMultiClient_ChainID_NotSet(t *testing.T) { mc := &MultiClient{ - chainID: nil, - logger: zap.NewNop(), - closed: make(chan struct{}), + logger: zap.NewNop(), + closed: make(chan struct{}), } cid, err := mc.ChainID(context.Background()) From 61874f7fda06e2ef4afe3504f5ac40ec32cc3e9e Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 14:19:38 -0300 Subject: [PATCH 21/42] fix comment for client mutexes --- eth/executionclient/multi_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index c8b133a313..b87668f9ea 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -39,7 +39,7 @@ type MultiClient struct { closed chan struct{} nodeAddrs []string - clientsMu []sync.Mutex // each client has own mutex, mutex is only locked in getClient and Close (on state transition) + clientsMu []sync.Mutex // clientsMu allow for lazy initialization of each client in `clients` slice in thread-safe manner (atomically) clients []SingleClientProvider // nil if not connected currentClientIndex atomic.Int64 } From 70594e85ff555e94dd01bf32547fcde0c618ccc0 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 14:20:32 -0300 Subject: [PATCH 22/42] no nil assignment in Close --- eth/executionclient/multi_client.go | 1 - 1 file changed, 1 deletion(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index b87668f9ea..8d72cca837 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -348,7 +348,6 @@ func (mc *MultiClient) Close() error { for i := range mc.clients { mc.clientsMu[i].Lock() client := mc.clients[i] - mc.clients[i] = nil mc.clientsMu[i].Unlock() if client != nil { From 3cc71cd600f68a290add09f7956cddb29197c3e1 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 16:04:56 
-0300 Subject: [PATCH 23/42] add panic hook --- eth/executionclient/multi_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 8d72cca837..8609819973 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -115,7 +115,7 @@ func (mc *MultiClient) getClient(ctx context.Context, clientIndex int) (SingleCl func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { // ExecutionClient may call Fatal on unsuccessful reconnection attempt. // Therefore, we need to override its Fatal behavior to avoid crashing. - logger := mc.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop)) + logger := mc.logger.WithOptions(zap.WithFatalHook(zapcore.WriteThenNoop), zap.WithPanicHook(zapcore.WriteThenNoop)) singleClient, err := New( ctx, From 7dd5aa0890290d6a6dcd1bb3d619056d7f5b5119 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 16:05:20 -0300 Subject: [PATCH 24/42] attempt to fix panic --- eth/executionclient/execution_client.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index f76196e6eb..94df8a9b97 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -298,7 +298,11 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error { defer ec.healthyChMu.Unlock() if err := ec.healthy(ctx); err != nil { - close(ec.healthyCh) + select { + case <-ec.healthyCh: // already closed + default: + close(ec.healthyCh) + } return fmt.Errorf("unhealthy: %w", err) } From ab9166858b897927fe9f2ac7189695df89967381 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 16:07:40 -0300 Subject: [PATCH 25/42] fix logging --- eth/executionclient/multi_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 8609819973..28a73b925c 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -135,14 +135,14 @@ func (mc *MultiClient) connect(ctx context.Context, clientIndex int) error { chainID, err := singleClient.ChainID(ctx) if err != nil { - mc.logger.Error("failed to get chain ID", + logger.Error("failed to get chain ID", zap.String("address", mc.nodeAddrs[clientIndex]), zap.Error(err)) return fmt.Errorf("get chain ID: %w", err) } if expected, err := mc.assertSameChainID(chainID); err != nil { - mc.logger.Fatal("client returned unexpected chain ID", + logger.Fatal("client returned unexpected chain ID", zap.String("expected_chain_id", expected.String()), zap.String("checked_chain_id", chainID.String()), zap.String("address", mc.nodeAddrs[clientIndex]), From 99405c92bda2d25d5e67d17c2f2e6bd7e5f08155 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 16:25:37 -0300 Subject: [PATCH 26/42] iterate clients forever in StreamLogs --- eth/executionclient/multi_client.go | 26 ++++++++++++++++-------- eth/executionclient/multi_client_test.go | 2 +- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 28a73b925c..6cde980607 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -3,6 +3,7 @@ package executionclient import ( "context" "fmt" + "math" "math/big" "sync" "sync/atomic" @@ -192,7 +193,7 @@ func (mc *MultiClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64 return nil, nil } - _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f) + _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f, false) if err != nil { return nil, nil, err } @@ -233,7 +234,7 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan return nil, nil } - _, err := 
mc.call(contextWithMethod(ctx, "StreamLogs"), f) + _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f, true) if err != nil && !errors.Is(err, ErrClosed) && !errors.Is(err, context.Canceled) { // NOTE: There are unit tests that trigger Fatal and override its behavior. // Therefore, the code must call `return` afterward. @@ -288,7 +289,7 @@ func (mc *MultiClient) BlockByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.BlockByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f) + res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f, false) if err != nil { return nil, err } @@ -301,7 +302,7 @@ func (mc *MultiClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.HeaderByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f) + res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f, false) if err != nil { return nil, err } @@ -313,7 +314,7 @@ func (mc *MultiClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filte f := func(client SingleClientProvider) (any, error) { return client.SubscribeFilterLogs(ctx, q, ch) } - res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f) + res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f, false) if err != nil { return nil, err } @@ -325,7 +326,7 @@ func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ( f := func(client SingleClientProvider) (any, error) { return client.FilterLogs(ctx, q) } - res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f) + res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f, false) if err != nil { return nil, err } @@ -361,7 +362,12 @@ func (mc *MultiClient) Close() error { return multiErr } -func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, 
error)) (any, error) { +// call calls f for all clients until it succeeds. +// If forever is false, it tries all clients only once and if no client is available then it returns an error. +// If forever is true, it iterates clients forever. +// It's used in StreamLogs because it's called once per the node lifetime, +// and it's possible that clients go up and down several times, therefore there's no limit. +func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error), forever bool) (any, error) { if len(mc.clients) == 1 { return f(mc.clients[0]) // no need for mutex because one client is always non-nil } @@ -370,7 +376,11 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi // starting from the most likely healthy client (currentClientIndex). startingIndex := int(mc.currentClientIndex.Load()) var allErrs error - for i := range mc.clients { + limit := len(mc.clients) + if forever { + limit = math.MaxInt32 + } + for i := 0; i < limit; i++ { clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. 
diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index 8e85a8a482..c0aca9852a 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -1330,7 +1330,7 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) { return client.streamLogsToChan(context.TODO(), nil, 200) } - _, err := mc.call(context.Background(), f) + _, err := mc.call(context.Background(), f, false) require.Error(t, err) require.Contains(t, err.Error(), "all clients failed") } From 768665d133ea19d21959e5c275df3b4d8bfbea86 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 17:16:30 -0300 Subject: [PATCH 27/42] set follow distance to 1 --- eth/executionclient/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/defaults.go b/eth/executionclient/defaults.go index 8f42a11e13..42cb1e2ec9 100644 --- a/eth/executionclient/defaults.go +++ b/eth/executionclient/defaults.go @@ -9,7 +9,7 @@ const ( DefaultReconnectionInitialInterval = 1 * time.Second DefaultReconnectionMaxInterval = 64 * time.Second DefaultHealthInvalidationInterval = 10 * time.Second // TODO: decide on this value, for now choosing a bit less than block interval - DefaultFollowDistance = 8 + DefaultFollowDistance = 1 // TODO ALAN: revert DefaultHistoricalLogsBatchSize = 500 defaultLogBuf = 8 * 1024 From 6af3463ee4f284d24e94a1729f2175c127b96428 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 17:31:19 -0300 Subject: [PATCH 28/42] Revert "set follow distance to 1" This reverts commit 768665d133ea19d21959e5c275df3b4d8bfbea86. 
--- eth/executionclient/defaults.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/defaults.go b/eth/executionclient/defaults.go index 42cb1e2ec9..8f42a11e13 100644 --- a/eth/executionclient/defaults.go +++ b/eth/executionclient/defaults.go @@ -9,7 +9,7 @@ const ( DefaultReconnectionInitialInterval = 1 * time.Second DefaultReconnectionMaxInterval = 64 * time.Second DefaultHealthInvalidationInterval = 10 * time.Second // TODO: decide on this value, for now choosing a bit less than block interval - DefaultFollowDistance = 1 + DefaultFollowDistance = 8 // TODO ALAN: revert DefaultHistoricalLogsBatchSize = 500 defaultLogBuf = 8 * 1024 From e9f5c7bada23e5beda3afff922fa925cbeabea60 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 17:51:14 -0300 Subject: [PATCH 29/42] delete healthy channel --- eth/executionclient/execution_client.go | 31 +------------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index 94df8a9b97..11d46f27b2 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "math/big" - "sync" "sync/atomic" "time" @@ -49,7 +48,6 @@ var _ Provider = &ExecutionClient{} var ( ErrClosed = fmt.Errorf("closed") - ErrUnhealthy = fmt.Errorf("unhealthy") ErrNotConnected = fmt.Errorf("not connected") ErrBadInput = fmt.Errorf("bad input") ErrNothingToSync = errors.New("nothing to sync") @@ -81,8 +79,6 @@ type ExecutionClient struct { client *ethclient.Client closed chan struct{} lastSyncedTime atomic.Int64 - healthyChMu sync.Mutex - healthyCh chan struct{} } // New creates a new instance of ExecutionClient. 
@@ -98,7 +94,6 @@ func New(ctx context.Context, nodeAddr string, contractAddr ethcommon.Address, o healthInvalidationInterval: DefaultHealthInvalidationInterval, logBatchSize: DefaultHistoricalLogsBatchSize, // TODO Make batch of logs adaptive depending on "websocket: read limit" closed: make(chan struct{}), - healthyCh: make(chan struct{}), } for _, opt := range opts { opt(client) @@ -246,8 +241,6 @@ func (ec *ExecutionClient) StreamLogs(ctx context.Context, fromBlock uint64) <-c return case <-ec.closed: return - case <-ec.healthyCh: - return default: lastBlock, err := ec.streamLogsToChan(ctx, logs, fromBlock) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { @@ -294,26 +287,7 @@ func (ec *ExecutionClient) Healthy(ctx context.Context) error { return nil } - ec.healthyChMu.Lock() - defer ec.healthyChMu.Unlock() - - if err := ec.healthy(ctx); err != nil { - select { - case <-ec.healthyCh: // already closed - default: - close(ec.healthyCh) - } - return fmt.Errorf("unhealthy: %w", err) - } - - // Reset the healthyCh channel if it was closed. 
- select { - case <-ec.healthyCh: - default: - ec.healthyCh = make(chan struct{}) - } - - return nil + return ec.healthy(ctx) } func (ec *ExecutionClient) healthy(ctx context.Context) error { @@ -430,9 +404,6 @@ func (ec *ExecutionClient) streamLogsToChan(ctx context.Context, logs chan<- Blo case <-ec.closed: return fromBlock, ErrClosed - case <-ec.healthyCh: - return fromBlock, ErrUnhealthy - case err := <-sub.Err(): if err == nil { return fromBlock, ErrClosed From f093561e6f5d5f176c4cb73824997140c9a7ecb5 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Mon, 27 Jan 2025 21:06:48 -0300 Subject: [PATCH 30/42] remove obsolete tests --- eth/executionclient/multi_client_test.go | 170 ----------------------- 1 file changed, 170 deletions(-) diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index c0aca9852a..00219e750c 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -522,91 +522,6 @@ func TestMultiClient_StreamLogs_Failover(t *testing.T) { require.Equal(t, uint64(202), receivedLogs[2].BlockNumber) } -func TestMultiClient_StreamLogs_AllClientsFail(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - // Setup both clients to fail - mockClient1. - EXPECT(). - streamLogsToChan(gomock.Any(), gomock.Any(), uint64(200)). - DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { - out <- BlockLogs{BlockNumber: 200} - return 200, errors.New("network error") // Triggers failover - }). - Times(1) - - mockClient1. - EXPECT(). - Healthy(gomock.Any()). - DoAndReturn(func(ctx context.Context) error { - return nil - }). - AnyTimes() - - mockClient2. - EXPECT(). - streamLogsToChan(gomock.Any(), gomock.Any(), uint64(201)). 
// Updated fromBlock to 201 - DoAndReturn(func(_ context.Context, out chan<- BlockLogs, fromBlock uint64) (uint64, error) { - out <- BlockLogs{BlockNumber: 201} - return 201, errors.New("network error") // All clients failed - }). - Times(1) - - mockClient2. - EXPECT(). - Healthy(gomock.Any()). - DoAndReturn(func(ctx context.Context) error { - return nil - }). - AnyTimes() - - hook := &fatalHook{} - - mc := &MultiClient{ - nodeAddrs: []string{"mockNode1", "mockNode2"}, - clients: []SingleClientProvider{mockClient1, mockClient2}, - clientsMu: make([]sync.Mutex, 2), - logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), - closed: make(chan struct{}), - } - - logsCh := mc.StreamLogs(ctx, 200) - - var wg sync.WaitGroup - wg.Add(2) // Expecting two logs: 200, 201 - - var receivedLogs []BlockLogs - var mu sync.Mutex - - go func() { - for blk := range logsCh { - mu.Lock() - receivedLogs = append(receivedLogs, blk) - mu.Unlock() - wg.Done() - } - }() - - wg.Wait() - - require.Len(t, receivedLogs, 2, "expected to receive two logs") - require.Equal(t, uint64(200), receivedLogs[0].BlockNumber) - require.Equal(t, uint64(201), receivedLogs[1].BlockNumber) - - _, open := <-logsCh - require.False(t, open, "logs channel should be closed after all logs are received") - - // Make sure Fatal was called due to all clients failing - require.True(t, hook.called, "expected Fatal to be called due to all clients failing") -} - func TestMultiClient_StreamLogs_SameFromBlock(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -746,41 +661,6 @@ func TestMultiClient_StreamLogs_MultipleFailoverAttempts(t *testing.T) { require.False(t, open, "logs channel should be closed after all logs are received") } -func TestMultiClient_StreamLogs_NoHealthyClients(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctx := context.Background() - nodeAddrs := []string{"mockNode1", "mockNode2"} - contractAddr := ethcommon.HexToAddress("0x1234") - fromBlock := 
uint64(100) - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - healthErr := errors.New("client1 unhealthy") - mockClient1.EXPECT().Healthy(gomock.Any()).Return(healthErr).AnyTimes() - mockClient2.EXPECT().Healthy(gomock.Any()).Return(healthErr).AnyTimes() - - hook := &fatalHook{} - - mc := &MultiClient{ - logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), - nodeAddrs: nodeAddrs, - clients: []SingleClientProvider{mockClient1, mockClient2}, - clientsMu: make([]sync.Mutex, 2), - closed: make(chan struct{}), - contractAddress: contractAddr, - } - - logsCh := mc.StreamLogs(ctx, fromBlock) - - _, open := <-logsCh - require.False(t, open, "logs channel should be closed after fatal log") - - require.True(t, hook.called, "expected Fatal to be called") -} - func TestMultiClient_Healthy(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1335,56 +1215,6 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) { require.Contains(t, err.Error(), "all clients failed") } -func TestMultiClient_ReconnectionLimit(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - mockClient1 := NewMockSingleClientProvider(ctrl) - mockClient2 := NewMockSingleClientProvider(ctrl) - - mockClient1. - EXPECT(). - Healthy(gomock.Any()). - Return(nil). - AnyTimes() - - mockClient1. - EXPECT(). - streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). - Return(uint64(200), fmt.Errorf("streaming error")). - AnyTimes() - - mockClient2. - EXPECT(). - Healthy(gomock.Any()). - Return(nil). - AnyTimes() - - mockClient2. - EXPECT(). - streamLogsToChan(gomock.Any(), gomock.Any(), gomock.Any()). - Return(uint64(200), fmt.Errorf("streaming error")). 
- Times(1) - - hook := &fatalHook{} - - mc := &MultiClient{ - nodeAddrs: []string{"mockNode1", "mockNode2"}, - clients: []SingleClientProvider{mockClient1, mockClient2}, - clientsMu: make([]sync.Mutex, 2), - logger: zap.NewNop().WithOptions(zap.WithFatalHook(hook)), - closed: make(chan struct{}), - } - - logsCh := mc.StreamLogs(ctx, 200) - - _, open := <-logsCh - require.False(t, open, "logs channel should be closed after reconnection attempts limit") -} - type fatalHook struct { mu sync.Mutex called bool From 7b3a1f5cf664ce198608d0adc27370fcf8d7bc06 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 28 Jan 2025 09:51:25 -0300 Subject: [PATCH 31/42] add successful call log --- eth/executionclient/multi_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 6cde980607..b62fed348c 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -429,6 +429,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi continue } + logger.Debug("call succeeded") // Update currentClientIndex to the successful client. 
mc.currentClientIndex.Store(int64(clientIndex)) return v, nil From 16dc3979090ac9c2acad5c47223f7f4522ff950a Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 28 Jan 2025 15:51:20 -0300 Subject: [PATCH 32/42] fix potential nil ptr dereference --- eth/executionclient/execution_client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/eth/executionclient/execution_client.go b/eth/executionclient/execution_client.go index 11d46f27b2..0453c25fb3 100644 --- a/eth/executionclient/execution_client.go +++ b/eth/executionclient/execution_client.go @@ -442,14 +442,14 @@ func (ec *ExecutionClient) connect(ctx context.Context) error { defer cancel() start := time.Now() - var err error - ec.client, err = ethclient.DialContext(ctx, ec.nodeAddr) + client, err := ethclient.DialContext(ctx, ec.nodeAddr) if err != nil { ec.logger.Error(elResponseErrMsg, zap.String("operation", "DialContext"), zap.Error(err)) return err } + ec.client = client logger.Info("connected to execution client", zap.Duration("took", time.Since(start))) return nil From 95af8011237843a58cdffec71c5fceed0bb7fde6 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 28 Jan 2025 19:11:05 -0300 Subject: [PATCH 33/42] go-eth2-client: use fork with sync distance tolerance --- beacon/goclient/goclient.go | 1 + go.mod | 4 +++- go.sum | 4 ++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index d70e7c31c1..38fd1d1363 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -234,6 +234,7 @@ func (gc *GoClient) addSingleClient(ctx context.Context, addr string) error { eth2clienthttp.WithReducedMemoryUsage(true), eth2clienthttp.WithAllowDelayedStart(true), eth2clienthttp.WithHooks(gc.singleClientHooks()), + eth2clienthttp.WithSyncDistanceTolerance(gc.syncDistanceTolerance), ) if err != nil { gc.log.Error("Consensus http client initialization failed", diff --git a/go.mod b/go.mod index 
3e58ac2202..9af922636a 100644 --- a/go.mod +++ b/go.mod @@ -68,7 +68,7 @@ require ( github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bits-and-blooms/bitset v1.13.0 // indirect - github.com/cockroachdb/errors v1.11.3 // indirect + github.com/cockroachdb/errors v1.11.3 github.com/cockroachdb/fifo v0.0.0-20240606204812-0bbfbd93a7ce // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect github.com/cockroachdb/pebble v1.1.1 // indirect @@ -250,3 +250,5 @@ require ( replace github.com/google/flatbuffers => github.com/google/flatbuffers v1.11.0 replace github.com/dgraph-io/ristretto => github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f + +replace github.com/attestantio/go-eth2-client => github.com/nkryuchkov/go-eth2-client v0.0.0-20250128220448-69c1f6bda599 diff --git a/go.sum b/go.sum index 5ead0985b9..3ab605e916 100644 --- a/go.sum +++ b/go.sum @@ -38,8 +38,6 @@ github.com/aristanetworks/glog v0.0.0-20191112221043-67e8567f59f3/go.mod h1:KASm github.com/aristanetworks/goarista v0.0.0-20200805130819-fd197cf57d96 h1:XJH0YfVFKbq782tlNThzN/Ud5qm/cx6LXOA/P6RkTxc= github.com/aristanetworks/goarista v0.0.0-20200805130819-fd197cf57d96/go.mod h1:QZe5Yh80Hp1b6JxQdpfSEEe8X7hTyTEZSosSrFf/oJE= github.com/aristanetworks/splunk-hec-go v0.3.3/go.mod h1:1VHO9r17b0K7WmOlLb9nTk/2YanvOEnLMUgsFrxBROc= -github.com/attestantio/go-eth2-client v0.21.7 h1:tdTJWiOJUCDmYSDt5C8D8+N5Hxfos0yLp+iVT7tKWMk= -github.com/attestantio/go-eth2-client v0.21.7/go.mod h1:d7ZPNrMX8jLfIgML5u7QZxFo2AukLM+5m08iMaLdqb8= github.com/bazelbuild/rules_go v0.23.2 h1:Wxu7JjqnF78cKZbsBsARLSXx/jlGaSLCnUV3mTlyHvM= github.com/bazelbuild/rules_go v0.23.2/go.mod h1:MC23Dc/wkXEyk3Wpq6lCqz0ZAYOZDw2DR5y3N1q2i7M= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= @@ -532,6 +530,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m 
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= +github.com/nkryuchkov/go-eth2-client v0.0.0-20250128220448-69c1f6bda599 h1:2+BPHc7DR3H4lYwSXP7MVUIyd/R2U14RwqedznYAvMc= +github.com/nkryuchkov/go-eth2-client v0.0.0-20250128220448-69c1f6bda599/go.mod h1:d7ZPNrMX8jLfIgML5u7QZxFo2AukLM+5m08iMaLdqb8= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/nxadm/tail v1.4.11 h1:8feyoE3OzPrcshW5/MJ4sGESc5cqmGkGCWlco4l0bqY= From 411f482d53b51e0b3362af0ac66f66c8dd4a8d65 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 28 Jan 2025 20:00:59 -0300 Subject: [PATCH 34/42] add a comment about using github.com/nkryuchkov/go-eth2-client --- go.mod | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go.mod b/go.mod index 9af922636a..f50a559299 100644 --- a/go.mod +++ b/go.mod @@ -251,4 +251,8 @@ replace github.com/google/flatbuffers => github.com/google/flatbuffers v1.11.0 replace github.com/dgraph-io/ristretto => github.com/dgraph-io/ristretto v0.1.1-0.20211108053508-297c39e6640f +// github.com/attestantio/go-eth2-client considers node as synced if is_syncing is false, +// however, it's been observed that it may take up to around 50 slots until is_syncing becomes true, +// which causes downtime. 
+// Using a fix from https://github.com/nkryuchkov/go-eth2-client/commits/fix-multi-rotation-v0.21.7/ replace github.com/attestantio/go-eth2-client => github.com/nkryuchkov/go-eth2-client v0.0.0-20250128220448-69c1f6bda599 From cbd61f54928044911e17377461b795712d055584 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Tue, 28 Jan 2025 20:17:34 -0300 Subject: [PATCH 35/42] code review comments --- eth/executionclient/multi_client.go | 35 ++++++++++++------------ eth/executionclient/multi_client_test.go | 2 +- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index b62fed348c..4600cfede1 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -3,7 +3,6 @@ package executionclient import ( "context" "fmt" - "math" "math/big" "sync" "sync/atomic" @@ -193,7 +192,7 @@ func (mc *MultiClient) FetchHistoricalLogs(ctx context.Context, fromBlock uint64 return nil, nil } - _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f, false) + _, err := mc.call(contextWithMethod(ctx, "FetchHistoricalLogs"), f, len(mc.clients)) if err != nil { return nil, nil, err } @@ -234,7 +233,7 @@ func (mc *MultiClient) StreamLogs(ctx context.Context, fromBlock uint64) <-chan return nil, nil } - _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f, true) + _, err := mc.call(contextWithMethod(ctx, "StreamLogs"), f, 0) if err != nil && !errors.Is(err, ErrClosed) && !errors.Is(err, context.Canceled) { // NOTE: There are unit tests that trigger Fatal and override its behavior. // Therefore, the code must call `return` afterward. 
@@ -289,7 +288,7 @@ func (mc *MultiClient) BlockByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.BlockByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f, false) + res, err := mc.call(contextWithMethod(ctx, "BlockByNumber"), f, len(mc.clients)) if err != nil { return nil, err } @@ -302,7 +301,7 @@ func (mc *MultiClient) HeaderByNumber(ctx context.Context, blockNumber *big.Int) f := func(client SingleClientProvider) (any, error) { return client.HeaderByNumber(ctx, blockNumber) } - res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f, false) + res, err := mc.call(contextWithMethod(ctx, "HeaderByNumber"), f, len(mc.clients)) if err != nil { return nil, err } @@ -314,7 +313,7 @@ func (mc *MultiClient) SubscribeFilterLogs(ctx context.Context, q ethereum.Filte f := func(client SingleClientProvider) (any, error) { return client.SubscribeFilterLogs(ctx, q, ch) } - res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f, false) + res, err := mc.call(contextWithMethod(ctx, "SubscribeFilterLogs"), f, len(mc.clients)) if err != nil { return nil, err } @@ -326,7 +325,7 @@ func (mc *MultiClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ( f := func(client SingleClientProvider) (any, error) { return client.FilterLogs(ctx, q) } - res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f, false) + res, err := mc.call(contextWithMethod(ctx, "FilterLogs"), f, len(mc.clients)) if err != nil { return nil, err } @@ -363,11 +362,16 @@ func (mc *MultiClient) Close() error { } // call calls f for all clients until it succeeds. -// If forever is false, it tries all clients only once and if no client is available then it returns an error. -// If forever is true, it iterates clients forever. 
-// It's used in StreamLogs because it's called once per the node lifetime, -// and it's possible that clients go up and down several times, therefore there's no limit. -func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error), forever bool) (any, error) { +// +// If there's only one client, call just calls f for it. The maxTries parameter is ignored in this case. +// If forever is not 0, it tries all clients in a round-robin logic until the limit is hit, +// and if no client is available then it returns an error. +// If maxTries is 0, it iterates clients forever. +// +// It must be called with maxTries == 0 from StreamLogs because StreamLogs is called once per the node lifetime, +// and it's possible that clients go up and down several times, therefore there should be no limit. +// It must be called with maxTries != 0 from other methods to return an error if all nodes are down. +func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvider) (any, error), maxTries int) (any, error) { if len(mc.clients) == 1 { return f(mc.clients[0]) // no need for mutex because one client is always non-nil } @@ -376,11 +380,8 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi // starting from the most likely healthy client (currentClientIndex). startingIndex := int(mc.currentClientIndex.Load()) var allErrs error - limit := len(mc.clients) - if forever { - limit = math.MaxInt32 - } - for i := 0; i < limit; i++ { + // Iterate maxTries times if maxTries != 0. Iterate forever if maxTries == 0 + for i := 0; (maxTries == 0) || (i < maxTries); i++ { clientIndex := (startingIndex + i) % len(mc.clients) nextClientIndex := (clientIndex + 1) % len(mc.clients) // For logging. 
diff --git a/eth/executionclient/multi_client_test.go b/eth/executionclient/multi_client_test.go index 00219e750c..8299d13210 100644 --- a/eth/executionclient/multi_client_test.go +++ b/eth/executionclient/multi_client_test.go @@ -1210,7 +1210,7 @@ func TestMultiClient_Call_AllClientsFail(t *testing.T) { return client.streamLogsToChan(context.TODO(), nil, 200) } - _, err := mc.call(context.Background(), f, false) + _, err := mc.call(context.Background(), f, len(mc.clients)) require.Error(t, err) require.Contains(t, err.Error(), "all clients failed") } From f8188c644ba22cb7f7e1be3662f97555f0e558e9 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 29 Jan 2025 09:35:42 -0300 Subject: [PATCH 36/42] named consensus client logger --- beacon/goclient/goclient.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 38fd1d1363..f336673829 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -158,7 +158,7 @@ func New( } client := &GoClient{ - log: logger, + log: logger.Named("consensus_client"), ctx: opt.Context, network: opt.Network, syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance), From 5b72d6a5546ad3cfe24f78ae03cae353945d69fc Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 29 Jan 2025 11:50:17 -0300 Subject: [PATCH 37/42] remove trace logs --- eth/executionclient/multi_client.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 4600cfede1..660e5d1eae 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -412,8 +412,6 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi continue } - logger.Debug("calling client") - v, err := f(client) if errors.Is(err, ErrClosed) || errors.Is(err, context.Canceled) { logger.Debug("received graceful closure from client", zap.Error(err)) @@ -430,7 +428,6 @@ 
func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi continue } - logger.Debug("call succeeded") // Update currentClientIndex to the successful client. mc.currentClientIndex.Store(int64(clientIndex)) return v, nil From 77ca3cfd1ceb1a655c2cb1b0e17a3c9db64bbe41 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 29 Jan 2025 11:50:55 -0300 Subject: [PATCH 38/42] fix a typo --- eth/executionclient/multi_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 660e5d1eae..c9fe7c837f 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -364,7 +364,7 @@ func (mc *MultiClient) Close() error { // call calls f for all clients until it succeeds. // // If there's only one client, call just calls f for it. The maxTries parameter is ignored in this case. -// If forever is not 0, it tries all clients in a round-robin logic until the limit is hit, +// If maxTries is not 0, it tries all clients in a round-robin logic until the limit is hit, // and if no client is available then it returns an error. // If maxTries is 0, it iterates clients forever. // From 0406ad3a34018f1942c14f790961ccbaed6af05d Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 29 Jan 2025 22:37:30 -0300 Subject: [PATCH 39/42] add a comment with execution client shutdown scenarios --- eth/executionclient/multi_client.go | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index c9fe7c837f..5ab4537068 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -21,6 +21,35 @@ import ( var _ Provider = &MultiClient{} +// MultiClient wraps several execution clients and uses the current available one. 
+// +// There are several scenarios of node outage: +// +// 1) SSV node uses EL1, EL2, CL1, CL2; CL1 uses only EL1, CL2 uses only EL2. EL1 becomes unavailable. +// +// The execution client MultiClient switches to EL2, the consensus multi client (another package) remains on CL1. +// CL1 remains available and responds to requests but its syncing distance increases until EL1 is up. +// The consensus multi client cannot determine that it's unhealthy until the sync distance reaches SyncDistanceTolerance, +// but when it does, it switches to CL2 and then the SSV node uses EL2 and CL2. +// This case usually causes duty misses for approximately the duration of SyncDistanceTolerance. +// +// 2) SSV node uses EL1, EL2, CL1, CL2; CL1 uses EL1 and other available ELs, CL2 uses any available EL. +// EL1 becomes unavailable. +// +// The execution MultiClient switches to EL2, the consensus multi client remains on CL1, +// which should switch its execution client from EL1 to an available one. +// Duty misses are possible for up to the SyncDistanceTolerance duration. +// +// 3) SSV node uses EL1, EL2, CL1, CL2; CL1 uses any available EL, CL2 uses any available EL. +// EL1 becomes unavailable. +// +// The execution MultiClient switches to EL2, the consensus multi client remains on CL1, +// which should remain working. This shouldn't cause significant duty misses. +// +// 4) SSV node uses EL1, EL2, CL1, CL2; CL1 uses only EL1, CL2 uses only EL2. EL1 and CL1 become unavailable. +// +// The execution MultiClient switches to EL2, the consensus multi client switches to CL2. +// This shouldn't cause significant duty misses.
type MultiClient struct { // optional logger *zap.Logger From 23d393d8adf8893a2c2fc943a3979632936e5715 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Wed, 29 Jan 2025 22:40:42 -0300 Subject: [PATCH 40/42] improve the comment for the call method --- eth/executionclient/multi_client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 5ab4537068..02bc625c7c 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -392,7 +392,7 @@ func (mc *MultiClient) Close() error { // call calls f for all clients until it succeeds. // -// If there's only one client, call just calls f for it. The maxTries parameter is ignored in this case. +// If there's only one client, call just calls f for it preserving old behavior. The maxTries parameter is ignored in this case. // If maxTries is not 0, it tries all clients in a round-robin logic until the limit is hit, // and if no client is available then it returns an error. // If maxTries is 0, it iterates clients forever. 
From c8909738eea3cf6052d45a819606af6bb1771751 Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 30 Jan 2025 13:55:00 -0300 Subject: [PATCH 41/42] log client address on submissions --- beacon/goclient/aggregator.go | 13 +++- beacon/goclient/attest.go | 15 +++-- beacon/goclient/committee_subscribe.go | 17 +++-- beacon/goclient/proposer.go | 67 +++++++++++++------ beacon/goclient/sync_committee.go | 17 +++-- .../goclient/sync_committee_contribution.go | 17 +++-- beacon/goclient/voluntary_exit.go | 11 +-- 7 files changed, 106 insertions(+), 51 deletions(-) diff --git a/beacon/goclient/aggregator.go b/beacon/goclient/aggregator.go index 27a8adc958..882628a61a 100644 --- a/beacon/goclient/aggregator.go +++ b/beacon/goclient/aggregator.go @@ -78,10 +78,21 @@ func (gc *GoClient) SubmitAggregateSelectionProof(slot phase0.Slot, committeeInd // SubmitSignedAggregateSelectionProof broadcasts a signed aggregator msg func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitAggregateAttestations"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg}) recordRequestDuration(gc.ctx, "SubmitAggregateAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) - return err + if err != nil { + logger.Error(clResponseErrMsg, zap.Error(err)) + return err + } + + logger.Debug("consensus client submitted signed aggregate attestations") + return nil } // IsAggregator returns true if the signature is from the input validator. 
The committee diff --git a/beacon/goclient/attest.go b/beacon/goclient/attest.go index 7561e89fd9..a9bf2066cd 100644 --- a/beacon/goclient/attest.go +++ b/beacon/goclient/attest.go @@ -122,16 +122,19 @@ func withCommitteeIndex(data *phase0.AttestationData, committeeIndex phase0.Comm // SubmitAttestations implements Beacon interface func (gc *GoClient) SubmitAttestations(attestations []*phase0.Attestation) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitAttestations"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitAttestations(gc.ctx, attestations) - recordRequestDuration(gc.ctx, "SubmitAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitAttestations", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitAttestations"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) return err } - return err + logger.Debug("consensus client submitted attestations") + return nil } diff --git a/beacon/goclient/committee_subscribe.go b/beacon/goclient/committee_subscribe.go index 3e10d761ea..5a6ce6c1cf 100644 --- a/beacon/goclient/committee_subscribe.go +++ b/beacon/goclient/committee_subscribe.go @@ -11,16 +11,21 @@ import ( // SubmitBeaconCommitteeSubscriptions is implementation for subscribing committee to subnet (p2p topic) func (gc *GoClient) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscription []*eth2apiv1.BeaconCommitteeSubscription) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitBeaconCommitteeSubscriptions"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitBeaconCommitteeSubscriptions(ctx, subscription) - recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", 
gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitBeaconCommitteeSubscriptions"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) + return err } - return err + + logger.Debug("consensus client submitted beacon committee subscriptions") + return nil } // SubmitSyncCommitteeSubscriptions is implementation for subscribing sync committee to subnet (p2p topic) diff --git a/beacon/goclient/proposer.go b/beacon/goclient/proposer.go index df93c6ff85..a2fc7051aa 100644 --- a/beacon/goclient/proposer.go +++ b/beacon/goclient/proposer.go @@ -194,17 +194,23 @@ func (gc *GoClient) SubmitBlindedBeaconBlock(block *api.VersionedBlindedProposal // (because it must be submitted to the same node that returned that block), // we need to submit it to client(s) directly. 
if len(gc.clients) == 1 { + clientAddress := gc.clients[0].Address() + logger := gc.log.With( + zap.String("api", "SubmitBlindedProposal"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.clients[0].SubmitBlindedProposal(gc.ctx, opts) - recordRequestDuration(gc.ctx, "SubmitBlindedProposal", gc.clients[0].Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitBlindedProposal", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitBlindedProposal"), + logger.Error(clResponseErrMsg, zap.Error(err), ) return err } + logger.Debug("consensus client submitted blinded beacon block") + return nil } @@ -226,13 +232,17 @@ func (gc *GoClient) SubmitBlindedBeaconBlock(block *api.VersionedBlindedProposal for _, client := range gc.clients { client := client p.Go(func(ctx context.Context) error { + clientAddress := client.Address() + logger := logger.With(zap.String("client_addr", clientAddress)) + if err := client.SubmitBlindedProposal(ctx, opts); err == nil { logger.Debug("consensus client returned an error while submitting blinded proposal. 
As at least one node must submit successfully, it's expected that some nodes may fail to submit.", - zap.String("client_addr", client.Address()), zap.Error(err)) return err } + logger.Debug("consensus client submitted blinded beacon block") + submissions.Add(1) return nil }) @@ -293,16 +303,21 @@ func (gc *GoClient) SubmitBeaconBlock(block *api.VersionedProposal, sig phase0.B Proposal: signedBlock, } + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitProposal"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitProposal(gc.ctx, opts) - recordRequestDuration(gc.ctx, "SubmitProposal", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitProposal", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitProposal"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) + return err } - return err + + logger.Debug("consensus client submitted beacon block") + return nil } func (gc *GoClient) SubmitValidatorRegistration(registration *api.VersionedSignedValidatorRegistration) error { @@ -317,16 +332,22 @@ func (gc *GoClient) SubmitProposalPreparation(feeRecipients map[phase0.Validator FeeRecipient: recipient, }) } + + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitProposalPreparations"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitProposalPreparations(gc.ctx, preparations) - recordRequestDuration(gc.ctx, "SubmitProposalPreparations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitProposalPreparations", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitProposalPreparations"), - zap.Error(err), - ) + 
logger.Error(clResponseErrMsg, zap.Error(err)) + return err } - return err + + logger.Debug("consensus client submitted proposal preparation") + return nil } func (gc *GoClient) updateBatchRegistrationCache(registration *api.VersionedSignedValidatorRegistration) error { @@ -413,21 +434,23 @@ func (gc *GoClient) submitBatchedRegistrations(slot phase0.Slot, registrations [ bs = len(registrations) } + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitValidatorRegistrations"), + zap.String("client_addr", clientAddress)) + // TODO: Do we need to submit them to all nodes? start := time.Now() err := gc.multiClient.SubmitValidatorRegistrations(gc.ctx, registrations[0:bs]) - recordRequestDuration(gc.ctx, "SubmitValidatorRegistrations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitValidatorRegistrations", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitValidatorRegistrations"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) return err } registrations = registrations[bs:] - gc.log.Info("submitted batched validator registrations", + logger.Info("submitted batched validator registrations", fields.Slot(slot), fields.Count(bs)) } diff --git a/beacon/goclient/sync_committee.go b/beacon/goclient/sync_committee.go index ac36b66766..a110991ce0 100644 --- a/beacon/goclient/sync_committee.go +++ b/beacon/goclient/sync_committee.go @@ -73,14 +73,19 @@ func (gc *GoClient) GetSyncMessageBlockRoot(slot phase0.Slot) (phase0.Root, spec // SubmitSyncMessages submits a signed sync committee msg func (gc *GoClient) SubmitSyncMessages(msgs []*altair.SyncCommitteeMessage) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitSyncCommitteeMessages"), + zap.String("client_addr", clientAddress)) + reqStart := time.Now() err := 
gc.multiClient.SubmitSyncCommitteeMessages(gc.ctx, msgs) - recordRequestDuration(gc.ctx, "SubmitSyncCommitteeMessages", gc.multiClient.Address(), http.MethodPost, time.Since(reqStart), err) + recordRequestDuration(gc.ctx, "SubmitSyncCommitteeMessages", clientAddress, http.MethodPost, time.Since(reqStart), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitSyncCommitteeMessages"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) + return err } - return err + + logger.Debug("consensus client submitted sync messages") + return nil } diff --git a/beacon/goclient/sync_committee_contribution.go b/beacon/goclient/sync_committee_contribution.go index a665d4ee78..0c6cc1b57e 100644 --- a/beacon/goclient/sync_committee_contribution.go +++ b/beacon/goclient/sync_committee_contribution.go @@ -125,16 +125,21 @@ func (gc *GoClient) GetSyncCommitteeContribution(slot phase0.Slot, selectionProo // SubmitSignedContributionAndProof broadcasts to the network func (gc *GoClient) SubmitSignedContributionAndProof(contribution *altair.SignedContributionAndProof) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitSyncCommitteeContributions"), + zap.String("client_addr", clientAddress)) + start := time.Now() err := gc.multiClient.SubmitSyncCommitteeContributions(gc.ctx, []*altair.SignedContributionAndProof{contribution}) - recordRequestDuration(gc.ctx, "SubmitSyncCommitteeContributions", gc.multiClient.Address(), http.MethodPost, time.Since(start), err) + recordRequestDuration(gc.ctx, "SubmitSyncCommitteeContributions", clientAddress, http.MethodPost, time.Since(start), err) if err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitSyncCommitteeContributions"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) + return err } - return err + + logger.Debug("consensus client submitted signed contribution and proof") + return nil } // 
waitForOneThirdSlotDuration waits until one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) diff --git a/beacon/goclient/voluntary_exit.go b/beacon/goclient/voluntary_exit.go index fb6d333f3f..01ce53c92d 100644 --- a/beacon/goclient/voluntary_exit.go +++ b/beacon/goclient/voluntary_exit.go @@ -6,13 +6,16 @@ import ( ) func (gc *GoClient) SubmitVoluntaryExit(voluntaryExit *phase0.SignedVoluntaryExit) error { + clientAddress := gc.multiClient.Address() + logger := gc.log.With( + zap.String("api", "SubmitVoluntaryExit"), + zap.String("client_addr", clientAddress)) + if err := gc.multiClient.SubmitVoluntaryExit(gc.ctx, voluntaryExit); err != nil { - gc.log.Error(clResponseErrMsg, - zap.String("api", "SubmitVoluntaryExit"), - zap.Error(err), - ) + logger.Error(clResponseErrMsg, zap.Error(err)) return err } + logger.Debug("consensus client submitted voluntary exit") return nil } From 4751783fa99400bda5ee5583fc7b3528b02407df Mon Sep 17 00:00:00 2001 From: Nikita Kryuchkov Date: Thu, 30 Jan 2025 13:57:09 -0300 Subject: [PATCH 42/42] improve logs --- eth/executionclient/multi_client.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eth/executionclient/multi_client.go b/eth/executionclient/multi_client.go index 02bc625c7c..f6ee4a052a 100644 --- a/eth/executionclient/multi_client.go +++ b/eth/executionclient/multi_client.go @@ -416,7 +416,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi client, err := mc.getClient(ctx, clientIndex) if err != nil { - mc.logger.Warn("client unavailable, switching to next client", + mc.logger.Warn("client unavailable, switching to the next client", zap.String("addr", mc.nodeAddrs[i]), zap.Error(err)) @@ -432,7 +432,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi // Make sure this client is healthy. This shouldn't cause too many requests because the result is cached. 
// TODO: Make sure the allowed tolerance doesn't cause issues in log streaming. if err := client.Healthy(ctx); err != nil { - logger.Warn("client is not healthy, switching to next client", + logger.Warn("client is not healthy, switching to the next client", zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), zap.Error(err)) @@ -448,7 +448,7 @@ func (mc *MultiClient) call(ctx context.Context, f func(client SingleClientProvi } if err != nil { - logger.Error("call failed, trying another client", + logger.Error("call failed, switching to the next client", zap.String("next_addr", mc.nodeAddrs[nextClientIndex]), zap.Error(err))