Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

hotfix(el/cl): allow multi clients to start if at least one node is up #2000

Merged
merged 44 commits into from
Jan 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
4a7ed06
do not crash if one client fails version check
y0sher Jan 22, 2025
1784907
fix help note on multiple addresses
y0sher Jan 22, 2025
6139748
don't compare genesis values
y0sher Jan 22, 2025
54f982f
remove old assertSameGenesis code
y0sher Jan 22, 2025
7a4248d
beacon/goclient: set up connection hooks, assert genesis
nkryuchkov Jan 22, 2025
1025641
beacon/goclient: remove outdated comment
nkryuchkov Jan 23, 2025
af5c455
eth/executionclient: allow starting with unhealthy client
nkryuchkov Jan 23, 2025
3b9efcf
eth/executionclient: simplify mutex usage
nkryuchkov Jan 23, 2025
c6592ad
eth/executionclient: fix tests for assertSameChainID
nkryuchkov Jan 23, 2025
b21c110
eth/executionclient: improve comment for assertSameGenesis
nkryuchkov Jan 23, 2025
902c99a
handle nil genesis and chain ID responses
nkryuchkov Jan 23, 2025
13edb6a
Merge branch 'stage' into fix/multiclient-oneclient
nkryuchkov Jan 24, 2025
e1cc2e3
refactor(multi_client): replace connectedCount atomic int with bool. …
y0sher Jan 26, 2025
2bdc1ac
clarify comment
nkryuchkov Jan 27, 2025
8db421f
fix double mutex lock
nkryuchkov Jan 27, 2025
0dd1027
fix potential nil pointer dereference
nkryuchkov Jan 27, 2025
8452bf4
check only genesis fork version instead of whole genesis
nkryuchkov Jan 27, 2025
d5fed41
create getClient helper method
nkryuchkov Jan 27, 2025
75e2753
improve logs
nkryuchkov Jan 27, 2025
999608b
fix issue with log fields
nkryuchkov Jan 27, 2025
66d5051
atomic chain ID
nkryuchkov Jan 27, 2025
61874f7
fix comment for client mutexes
nkryuchkov Jan 27, 2025
70594e8
no nil assignment in Close
nkryuchkov Jan 27, 2025
3cc71cd
add panic hook
nkryuchkov Jan 27, 2025
7dd5aa0
attempt to fix panic
nkryuchkov Jan 27, 2025
ab91668
fix logging
nkryuchkov Jan 27, 2025
99405c9
iterate clients forever in StreamLogs
nkryuchkov Jan 27, 2025
768665d
set follow distance to 1
nkryuchkov Jan 27, 2025
6af3463
Revert "set follow distance to 1"
nkryuchkov Jan 27, 2025
e9f5c7b
delete healthy channel
nkryuchkov Jan 27, 2025
f093561
remove obsolete tests
nkryuchkov Jan 28, 2025
7b3a1f5
add successful call log
nkryuchkov Jan 28, 2025
16dc397
fix potential nil ptr dereference
nkryuchkov Jan 28, 2025
95af801
go-eth2-client: use fork with sync distance tolerance
nkryuchkov Jan 28, 2025
411f482
add a comment about using github.com/nkryuchkov/go-eth2-client
nkryuchkov Jan 28, 2025
cbd61f5
code review comments
nkryuchkov Jan 28, 2025
f8188c6
named consensus client logger
nkryuchkov Jan 29, 2025
5b72d6a
remove trace logs
nkryuchkov Jan 29, 2025
77ca3cf
fix a typo
nkryuchkov Jan 29, 2025
0406ad3
add a comment with execution client shutdown scenarios
nkryuchkov Jan 30, 2025
23d393d
improve the comment for the call method
nkryuchkov Jan 30, 2025
85779ec
Merge branch 'stage' into fix/multiclient-oneclient
nkryuchkov Jan 30, 2025
c890973
log client address on submissions
nkryuchkov Jan 30, 2025
4751783
improve logs
nkryuchkov Jan 30, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion beacon/goclient/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,21 @@

// SubmitSignedAggregateSelectionProof broadcasts a signed aggregator msg
func (gc *GoClient) SubmitSignedAggregateSelectionProof(msg *phase0.SignedAggregateAndProof) error {
clientAddress := gc.multiClient.Address()
logger := gc.log.With(
zap.String("api", "SubmitAggregateAttestations"),
zap.String("client_addr", clientAddress))

Check warning on line 85 in beacon/goclient/aggregator.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/aggregator.go#L81-L85

Added lines #L81 - L85 were not covered by tests
start := time.Now()
err := gc.multiClient.SubmitAggregateAttestations(gc.ctx, []*phase0.SignedAggregateAndProof{msg})
recordRequestDuration(gc.ctx, "SubmitAggregateAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err)
return err
if err != nil {
logger.Error(clResponseErrMsg, zap.Error(err))
return err
}

Check warning on line 92 in beacon/goclient/aggregator.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/aggregator.go#L89-L92

Added lines #L89 - L92 were not covered by tests

logger.Debug("consensus client submitted signed aggregate attestations")
return nil

Check warning on line 95 in beacon/goclient/aggregator.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/aggregator.go#L94-L95

Added lines #L94 - L95 were not covered by tests
}

// IsAggregator returns true if the signature is from the input validator. The committee
Expand Down
15 changes: 9 additions & 6 deletions beacon/goclient/attest.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,16 +122,19 @@

// SubmitAttestations implements Beacon interface
func (gc *GoClient) SubmitAttestations(attestations []*phase0.Attestation) error {
clientAddress := gc.multiClient.Address()
logger := gc.log.With(
zap.String("api", "SubmitAttestations"),
zap.String("client_addr", clientAddress))

Check warning on line 129 in beacon/goclient/attest.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/attest.go#L125-L129

Added lines #L125 - L129 were not covered by tests
start := time.Now()
err := gc.multiClient.SubmitAttestations(gc.ctx, attestations)
recordRequestDuration(gc.ctx, "SubmitAttestations", gc.multiClient.Address(), http.MethodPost, time.Since(start), err)
recordRequestDuration(gc.ctx, "SubmitAttestations", clientAddress, http.MethodPost, time.Since(start), err)

Check warning on line 132 in beacon/goclient/attest.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/attest.go#L132

Added line #L132 was not covered by tests
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("api", "SubmitAttestations"),
zap.Error(err),
)
logger.Error(clResponseErrMsg, zap.Error(err))

Check warning on line 134 in beacon/goclient/attest.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/attest.go#L134

Added line #L134 was not covered by tests
return err
}

return err
logger.Debug("consensus client submitted attestations")
return nil

Check warning on line 139 in beacon/goclient/attest.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/attest.go#L138-L139

Added lines #L138 - L139 were not covered by tests
}
17 changes: 11 additions & 6 deletions beacon/goclient/committee_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@

// SubmitBeaconCommitteeSubscriptions is implementation for subscribing committee to subnet (p2p topic)
func (gc *GoClient) SubmitBeaconCommitteeSubscriptions(ctx context.Context, subscription []*eth2apiv1.BeaconCommitteeSubscription) error {
clientAddress := gc.multiClient.Address()
logger := gc.log.With(
zap.String("api", "SubmitBeaconCommitteeSubscriptions"),
zap.String("client_addr", clientAddress))

Check warning on line 18 in beacon/goclient/committee_subscribe.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/committee_subscribe.go#L14-L18

Added lines #L14 - L18 were not covered by tests
start := time.Now()
err := gc.multiClient.SubmitBeaconCommitteeSubscriptions(ctx, subscription)
recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", gc.multiClient.Address(), http.MethodPost, time.Since(start), err)
recordRequestDuration(gc.ctx, "SubmitBeaconCommitteeSubscriptions", clientAddress, http.MethodPost, time.Since(start), err)

Check warning on line 21 in beacon/goclient/committee_subscribe.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/committee_subscribe.go#L21

Added line #L21 was not covered by tests
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("api", "SubmitBeaconCommitteeSubscriptions"),
zap.Error(err),
)
logger.Error(clResponseErrMsg, zap.Error(err))
return err

Check warning on line 24 in beacon/goclient/committee_subscribe.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/committee_subscribe.go#L23-L24

Added lines #L23 - L24 were not covered by tests
}
return err

logger.Debug("consensus client submitted beacon committee subscriptions")
return nil

Check warning on line 28 in beacon/goclient/committee_subscribe.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/committee_subscribe.go#L27-L28

Added lines #L27 - L28 were not covered by tests
}

// SubmitSyncCommitteeSubscriptions is implementation for subscribing sync committee to subnet (p2p topic)
Expand Down
215 changes: 120 additions & 95 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"math"
"strings"
"sync"
"sync/atomic"
"time"

eth2client "github.com/attestantio/go-eth2-client"
Expand Down Expand Up @@ -114,6 +115,8 @@
clients []Client
multiClient MultiClient

genesisVersion atomic.Pointer[phase0.Version]

syncDistanceTolerance phase0.Slot
nodeSyncingFn func(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error)

Expand Down Expand Up @@ -154,82 +157,41 @@
longTimeout = DefaultLongTimeout
}

client := &GoClient{
log: logger.Named("consensus_client"),
ctx: opt.Context,
network: opt.Network,
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance),
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
attestationDataCache: ttlcache.New(
// we only fetch attestation data during the slot of the relevant duty (and never later),
// hence caching it for 2 slots is sufficient
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()),
),
commonTimeout: commonTimeout,
longTimeout: longTimeout,
}

beaconAddrList := strings.Split(opt.BeaconNodeAddr, ";") // TODO: Decide what symbol to use as a separator. Bootnodes are currently separated by ";". Deployment bot currently uses ",".
if len(beaconAddrList) == 0 {
return nil, fmt.Errorf("no beacon node address provided")
}

var consensusClients []Client
var consensusClientsAsServices []eth2client.Service
for _, beaconAddr := range beaconAddrList {
httpClient, err := setupHTTPClient(opt.Context, logger, beaconAddr, commonTimeout)
if err != nil {
if err := client.addSingleClient(opt.Context, beaconAddr); err != nil {
return nil, err
}

nodeVersionResp, err := httpClient.NodeVersion(opt.Context, &api.NodeVersionOpts{})
if err != nil {
logger.Error(clResponseErrMsg,
zap.String("api", "NodeVersion"),
zap.Error(err),
)
return nil, fmt.Errorf("failed to get node version: %w", err)
}
if nodeVersionResp == nil {
logger.Error(clNilResponseErrMsg,
zap.String("api", "NodeVersion"),
)
return nil, fmt.Errorf("node version response is nil")
}

logger.Info("consensus client connected",
fields.Name(httpClient.Name()),
fields.Address(httpClient.Address()),
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))),
zap.String("version", nodeVersionResp.Data),
)

consensusClients = append(consensusClients, httpClient)
consensusClientsAsServices = append(consensusClientsAsServices, httpClient)
}

err := assertSameGenesis(opt.Context, consensusClients...)
if err != nil {
return nil, fmt.Errorf("assert same spec: %w", err)
}

multiClient, err := eth2clientmulti.New(
opt.Context,
eth2clientmulti.WithClients(consensusClientsAsServices),
eth2clientmulti.WithLogLevel(zerolog.DebugLevel),
eth2clientmulti.WithTimeout(commonTimeout),
)
err := client.initMultiClient(opt.Context)
if err != nil {
logger.Error("Consensus multi client initialization failed",
zap.String("address", opt.BeaconNodeAddr),
zap.Error(err),
)

return nil, fmt.Errorf("create multi client: %w", err)
}
consensusClient := multiClient.(*eth2clientmulti.Service)

client := &GoClient{
log: logger,
ctx: opt.Context,
network: opt.Network,
clients: consensusClients,
multiClient: consensusClient,
syncDistanceTolerance: phase0.Slot(opt.SyncDistanceTolerance),
operatorDataStore: operatorDataStore,
registrationCache: map[phase0.BLSPubKey]*api.VersionedSignedValidatorRegistration{},
attestationDataCache: ttlcache.New(
// we only fetch attestation data during the slot of the relevant duty (and never later),
// hence caching it for 2 slots is sufficient
ttlcache.WithTTL[phase0.Slot, *phase0.AttestationData](2 * opt.Network.SlotDurationSec()),
),
commonTimeout: commonTimeout,
longTimeout: longTimeout,
return nil, err
}

client.nodeSyncingFn = client.nodeSyncing
Expand All @@ -241,68 +203,131 @@
return client, nil
}

func setupHTTPClient(ctx context.Context, logger *zap.Logger, addr string, commonTimeout time.Duration) (*eth2clienthttp.Service, error) {
// initMultiClient wraps every client collected in gc.clients into a single
// multi client and stores it in gc.multiClient.
//
// It returns an error if the multi client cannot be created or if the created
// service is not a *eth2clientmulti.Service.
func (gc *GoClient) initMultiClient(ctx context.Context) error {
	// The final length is known up front, so allocate the backing array once.
	services := make([]eth2client.Service, 0, len(gc.clients))
	for _, client := range gc.clients {
		services = append(services, client)
	}

	multiClient, err := eth2clientmulti.New(
		ctx,
		eth2clientmulti.WithClients(services),
		eth2clientmulti.WithLogLevel(zerolog.DebugLevel),
		eth2clientmulti.WithTimeout(gc.commonTimeout),
	)
	if err != nil {
		return fmt.Errorf("create multi client: %w", err)
	}

	// Use a checked assertion so that an unexpected implementation is reported
	// as an error instead of panicking.
	service, ok := multiClient.(*eth2clientmulti.Service)
	if !ok {
		return fmt.Errorf("create multi client: unexpected service type %T", multiClient)
	}

	gc.multiClient = service
	return nil
}

func (gc *GoClient) addSingleClient(ctx context.Context, addr string) error {
httpClient, err := eth2clienthttp.New(
ctx,
// WithAddress supplies the address of the beacon node, in host:port format.
eth2clienthttp.WithAddress(addr),
// LogLevel supplies the level of logging to carry out.
eth2clienthttp.WithLogLevel(zerolog.DebugLevel),
eth2clienthttp.WithTimeout(commonTimeout),
eth2clienthttp.WithTimeout(gc.commonTimeout),
eth2clienthttp.WithReducedMemoryUsage(true),
eth2clienthttp.WithAllowDelayedStart(true),
eth2clienthttp.WithHooks(gc.singleClientHooks()),
eth2clienthttp.WithSyncDistanceTolerance(gc.syncDistanceTolerance),
)
if err != nil {
logger.Error("Consensus http client initialization failed",
gc.log.Error("Consensus http client initialization failed",

Check warning on line 240 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L240

Added line #L240 was not covered by tests
zap.String("address", addr),
zap.Error(err),
)

return nil, fmt.Errorf("create http client: %w", err)
return fmt.Errorf("create http client: %w", err)

Check warning on line 245 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L245

Added line #L245 was not covered by tests
Comment on lines -256 to +245
Copy link
Contributor

@iurii-ssv iurii-ssv Jan 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really specific to this PR but we often (and here as well) do 2 "duplicate" things

  • log error
  • return that same error (that's always gonna be logged eventually by the caller resulting in roughly duplicate log-line)

Maybe it would be simpler to just return the error (formatted with fmt.Errorf to provide the necessary context) in places like this. I'm bringing it up so we can get on the same page about whether we want to keep an eye on things like this or not.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main difference, and the issue, is that when logging with zap and adding fields, it's easy to search them by the label value. Otherwise we'd need to squeeze everything into fmt.Errorf; it would still be searchable, but not by label.

Copy link
Contributor

@nkryuchkov nkryuchkov Jan 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that we could think more about our logging approach, but I think this package currently uses logging in a way similar to other packages. If we decide to improve logging (e.g. use custom error types with fields), we need to do it project-wide

}

return httpClient.(*eth2clienthttp.Service), nil
}

// assertSameGenesis should receive a non-empty list
func assertSameGenesis(ctx context.Context, services ...Client) error {
firstGenesis, err := services[0].Genesis(ctx, &api.GenesisOpts{})
if err != nil {
return fmt.Errorf("get first genesis: %w", err)
}

for _, service := range services[1:] {
srvGenesis, err := service.Genesis(ctx, &api.GenesisOpts{})
if err != nil {
return fmt.Errorf("get service genesis: %w", err)
}

if err := sameGenesis(firstGenesis.Data, srvGenesis.Data); err != nil {
return fmt.Errorf("different genesis: %w", err)
}
}
gc.clients = append(gc.clients, httpClient.(*eth2clienthttp.Service))

return nil
}

func sameGenesis(a, b *apiv1.Genesis) error {
if a == nil || b == nil { // Input parameters should never be nil, so the check may fail if both are nil
return fmt.Errorf("genesis is nil")
}
func (gc *GoClient) singleClientHooks() *eth2clienthttp.Hooks {
return &eth2clienthttp.Hooks{
OnActive: func(ctx context.Context, s *eth2clienthttp.Service) {
// If err is nil, nodeVersionResp is never nil.
iurii-ssv marked this conversation as resolved.
Show resolved Hide resolved
nodeVersionResp, err := s.NodeVersion(ctx, &api.NodeVersionOpts{})
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("address", s.Address()),
zap.String("api", "NodeVersion"),
zap.Error(err),
)
return
}

Check warning on line 265 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L259-L265

Added lines #L259 - L265 were not covered by tests

gc.log.Info("consensus client connected",
fields.Name(s.Name()),
fields.Address(s.Address()),
zap.String("client", string(ParseNodeClient(nodeVersionResp.Data))),
zap.String("version", nodeVersionResp.Data),
)

if !a.GenesisTime.Equal(b.GenesisTime) {
return fmt.Errorf("genesis time mismatch, got %v and %v", a.GenesisTime, b.GenesisTime)
genesis, err := s.Genesis(ctx, &api.GenesisOpts{})
if err != nil {
gc.log.Error(clResponseErrMsg,
zap.String("address", s.Address()),
zap.String("api", "Genesis"),
zap.Error(err),
)
return
}

Check warning on line 282 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L276-L282

Added lines #L276 - L282 were not covered by tests

if expected, err := gc.assertSameGenesisVersion(genesis.Data.GenesisForkVersion); err != nil {
gc.log.Fatal("client returned unexpected genesis fork version, make sure all clients use the same Ethereum network",
zap.String("address", s.Address()),
zap.Any("client_genesis", genesis.Data.GenesisForkVersion),
zap.Any("expected_genesis", expected),
zap.Error(err),
)
return // Tests may override Fatal's behavior
}

Check warning on line 292 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L285-L292

Added lines #L285 - L292 were not covered by tests
},
OnInactive: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Warn("consensus client disconnected",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},

Check warning on line 299 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L294-L299

Added lines #L294 - L299 were not covered by tests
OnSynced: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Info("consensus client synced",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},
OnDesynced: func(ctx context.Context, s *eth2clienthttp.Service) {
gc.log.Warn("consensus client desynced",
fields.Name(s.Name()),
fields.Address(s.Address()),
)
},

Check warning on line 311 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L306-L311

Added lines #L306 - L311 were not covered by tests
}
}

if a.GenesisValidatorsRoot != b.GenesisValidatorsRoot {
return fmt.Errorf("genesis validators root mismatch, got %v and %v", a.GenesisValidatorsRoot, b.GenesisValidatorsRoot)
// assertSameGenesisVersion checks that the given genesis fork version matches the one seen first.
// Clients may have different values returned by Spec call,
// so we decided that it's best to assert that GenesisForkVersion is the same.
// The first version passed in is stored as the expected value;
// every later version is compared against it, and a mismatch is returned as an error.
func (gc *GoClient) assertSameGenesisVersion(genesisVersion phase0.Version) (phase0.Version, error) {
if gc.genesisVersion.CompareAndSwap(nil, &genesisVersion) {
return genesisVersion, nil
}

if a.GenesisForkVersion != b.GenesisForkVersion {
return fmt.Errorf("genesis fork version mismatch, got %v and %v", a.GenesisForkVersion, b.GenesisForkVersion)
expected := *gc.genesisVersion.Load()
if expected != genesisVersion {
return expected, fmt.Errorf("genesis fork version mismatch, expected %v, got %v", expected, genesisVersion)

Check warning on line 327 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L325-L327

Added lines #L325 - L327 were not covered by tests
}

return nil
return expected, nil

Check warning on line 330 in beacon/goclient/goclient.go

View check run for this annotation

Codecov / codecov/patch

beacon/goclient/goclient.go#L330

Added line #L330 was not covered by tests
}

func (gc *GoClient) nodeSyncing(ctx context.Context, opts *api.NodeSyncingOpts) (*api.Response[*apiv1.SyncState], error) {
Expand Down
2 changes: 1 addition & 1 deletion beacon/goclient/goclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestTimeouts(t *testing.T) {
return nil
})
_, err := mockClient(ctx, undialableServer.URL, commonTimeout, longTimeout)
require.ErrorContains(t, err, "context deadline exceeded")
require.ErrorContains(t, err, "client is not active")
}

// Too slow to respond to the Validators request.
Expand Down
Loading