From f0f12ecff3118a88ec686cd5081660a690ac2b33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 14 May 2024 11:02:27 +0200 Subject: [PATCH 1/7] validator/client: process Sync Committee roles separately In a DV context, to be compatible with the proposed selection endpoint, the VC must push all partial selections to it instead of just one. Process sync committee role separately within the RolesAt method, so that partial selections can be pushed to the DV client appropriately, if configured. --- testing/util/state_test.go | 97 ------------------------------ validator/client/validator.go | 107 ++++++++++++++++++++++------------ 2 files changed, 69 insertions(+), 135 deletions(-) delete mode 100644 testing/util/state_test.go diff --git a/testing/util/state_test.go b/testing/util/state_test.go deleted file mode 100644 index 57362f7072c6..000000000000 --- a/testing/util/state_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package util - -import ( - "context" - "testing" - - ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v5/testing/assert" - "github.com/prysmaticlabs/prysm/v5/testing/require" -) - -func TestNewBeaconState(t *testing.T) { - st, err := NewBeaconState() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconState{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconStateAltair(t *testing.T) { - st, err := NewBeaconStateAltair() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconStateAltair{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconStateBellatrix(t *testing.T) { - st, err := NewBeaconStateBellatrix() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconStateBellatrix{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconStateCapella(t *testing.T) { - st, err := NewBeaconStateCapella() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconStateCapella{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconStateDeneb(t *testing.T) { - st, err := NewBeaconStateDeneb() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconStateDeneb{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconStateElectra(t *testing.T) { - st, err := NewBeaconStateElectra() - require.NoError(t, err) - b, err := st.MarshalSSZ() - require.NoError(t, err) - got := ðpb.BeaconStateElectra{} - require.NoError(t, got.UnmarshalSSZ(b)) - assert.DeepEqual(t, st.ToProtoUnsafe(), got) -} - -func TestNewBeaconState_HashTreeRoot(t *testing.T) { - st, err := NewBeaconState() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) - st, err = NewBeaconStateAltair() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) - st, err = NewBeaconStateBellatrix() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) - st, err = NewBeaconStateCapella() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) - st, err = NewBeaconStateDeneb() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) - st, err = NewBeaconStateElectra() - require.NoError(t, err) - _, err = st.HashTreeRoot(context.Background()) - require.NoError(t, err) -} diff --git a/validator/client/validator.go b/validator/client/validator.go index 83a0c431c155..c6032bec1cdc 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -682,11 +682,19 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes // RolesAt slot returns the validator roles at the given slot. Returns nil if the // validator is known to not have a roles at the slot. Returns UNKNOWN if the -// validator assignments are unknown. Otherwise returns a valid ValidatorRole map. +// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map. func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) { v.dutiesLock.RLock() defer v.dutiesLock.RUnlock() - rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + var ( + rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + // store sync committee duties pubkeys and share indices in slices for + // potential DV processing + syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) + ) + for validator, duty := range v.duties.CurrentEpochDuties { var roles []iface.ValidatorRole @@ -701,6 +709,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie } } } + if duty.AttesterSlot == slot { roles = append(roles, iface.RoleAttester) @@ -726,19 +735,11 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if duty.IsSyncCommittee { roles = append(roles, iface.RoleSyncCommittee) inSyncCommittee = true - } - } - if inSyncCommittee { - aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) - if err != nil { - return nil, errors.Wrap(err, "could not check if a validator is a sync committee aggregator") - } - if aggregator { - roles = append(roles, iface.RoleSyncCommitteeAggregator) + syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey) } } - if len(roles) == 0 { + if len(roles) == 0 && !inSyncCommittee { roles = append(roles, iface.RoleUnknown) } @@ -746,6 +747,28 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie copy(pubKey[:], duty.PublicKey) rolesAt[pubKey] = roles } + + aggregator, err := v.isSyncCommitteeAggregator( + ctx, + slot, + syncCommitteeValidators, + ) + + if err != nil { + return nil, errors.Wrap(err, "could not check if validators are a sync committee aggregator") + } + + for valIdx, isAgg := range aggregator { + if isAgg { + valPubkey, ok := syncCommitteeValidators[valIdx] + if !ok { + return nil, errors.New("validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + } + + rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator) + } + } + return rolesAt, nil } @@ -794,51 +817,59 @@ func (v *validator) isAggregator(ctx context.Context, committee []primitives.Val // // modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE) // return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0 -func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) { - res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ - PublicKey: pubKey[:], - Slot: slot, - }) - if err != nil { - return false, err - } +func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) { + var ( + selections []iface.SyncCommitteeSelection + isAgg = make(map[primitives.ValidatorIndex]bool) + ) + + for valIdx, pubKey := range validators { + res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ + PublicKey: pubKey[:], + Slot: slot, + }) - var selections []iface.SyncCommitteeSelection - for _, index := range res.Indices { - subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount - subnet := uint64(index) / subCommitteeSize - sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) if err != nil { - return false, err + return nil, errors.Wrap(err, "can't fetch sync subcommittee index") + } + + for _, index := range res.Indices { + subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount + subnet := uint64(index) / subCommitteeSize + sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) + if err != nil { + return nil, errors.Wrap(err, "can't sign selection data") + } + + selections = append(selections, iface.SyncCommitteeSelection{ + SelectionProof: sig, + Slot: slot, + SubcommitteeIndex: primitives.CommitteeIndex(subnet), + ValidatorIndex: valIdx, + }) } - selections = append(selections, iface.SyncCommitteeSelection{ - SelectionProof: sig, - Slot: slot, - SubcommitteeIndex: primitives.CommitteeIndex(subnet), - ValidatorIndex: validatorIndex, - }) } // Override selections with aggregated ones if the node is part of a Distributed Validator. if v.distributed && len(selections) > 0 { + var err error selections, err = v.validatorClient.GetAggregatedSyncSelections(ctx, selections) if err != nil { - return false, errors.Wrap(err, "failed to get aggregated sync selections") + return nil, errors.Wrap(err, "failed to get aggregated sync selections") } } for _, s := range selections { isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof) if err != nil { - return false, err - } - if isAggregator { - return true, nil + return nil, errors.Wrap(err, "can't detect sync committee aggregator") } + + isAgg[s.ValidatorIndex] = isAggregator } - return false, nil + return isAgg, nil } // UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when From cfa960130073c20ba50f840b49ab099bd03f6e8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 14 May 2024 12:10:20 +0200 Subject: [PATCH 2/7] testing/util: re-add erroneously deleted state_test.go --- testing/util/state_test.go | 97 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 testing/util/state_test.go diff --git a/testing/util/state_test.go b/testing/util/state_test.go new file mode 100644 index 000000000000..57362f7072c6 --- /dev/null +++ b/testing/util/state_test.go @@ -0,0 +1,97 @@ +package util + +import ( + "context" + "testing" + + ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" + "github.com/prysmaticlabs/prysm/v5/testing/assert" + "github.com/prysmaticlabs/prysm/v5/testing/require" +) + +func TestNewBeaconState(t *testing.T) { + st, err := NewBeaconState() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconState{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconStateAltair(t *testing.T) { + st, err := NewBeaconStateAltair() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconStateAltair{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconStateBellatrix(t *testing.T) { + st, err := NewBeaconStateBellatrix() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconStateBellatrix{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconStateCapella(t *testing.T) { + st, err := NewBeaconStateCapella() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconStateCapella{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconStateDeneb(t *testing.T) { + st, err := NewBeaconStateDeneb() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconStateDeneb{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconStateElectra(t *testing.T) { + st, err := NewBeaconStateElectra() + require.NoError(t, err) + b, err := st.MarshalSSZ() + require.NoError(t, err) + got := ðpb.BeaconStateElectra{} + require.NoError(t, got.UnmarshalSSZ(b)) + assert.DeepEqual(t, st.ToProtoUnsafe(), got) +} + +func TestNewBeaconState_HashTreeRoot(t *testing.T) { + st, err := NewBeaconState() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) + st, err = NewBeaconStateAltair() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) + st, err = NewBeaconStateBellatrix() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) + st, err = NewBeaconStateCapella() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) + st, err = NewBeaconStateDeneb() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) + st, err = NewBeaconStateElectra() + require.NoError(t, err) + _, err = st.HashTreeRoot(context.Background()) + require.NoError(t, err) +} From e88c68a3c07586bdb1a107f6a9a83f53a634b094 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 14 May 2024 12:20:56 +0200 Subject: [PATCH 3/7] validator/client: fix tests --- validator/client/validator.go | 1 - validator/client/validator_test.go | 24 ++++++++++++++++-------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index c6032bec1cdc..64897b2820c0 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -848,7 +848,6 @@ func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitiv ValidatorIndex: valIdx, }) } - } // Override selections with aggregated ones if the node is part of a Distributed Validator. diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 3bdb48003d7d..9f7414e162de 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -1258,9 +1258,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{}, nil /*err*/) - aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, false, aggregator) + require.Equal(t, false, aggregator[0]) c := params.BeaconConfig().Copy() c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64 @@ -1279,9 +1281,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/) - aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, true, aggregator) + require.Equal(t, true, aggregator[0]) }) } } @@ -1305,9 +1309,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{}, nil /*err*/) - aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, false, aggregator) + require.Equal(t, false, aggregator[0]) c := params.BeaconConfig().Copy() c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64 @@ -1340,9 +1346,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) { []iface.SyncCommitteeSelection{selection}, ).Return([]iface.SyncCommitteeSelection{selection}, nil) - aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 123) + aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 123: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, true, aggregator) + require.Equal(t, true, aggregator[123]) }) } } From f7cded9a9e6c40911c19146c1dc7e70da3cd06b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Tue, 14 May 2024 12:52:09 +0200 Subject: [PATCH 4/7] validator/client: always process sync committee validator Process sync committee duty at slot boundary as well. --- validator/client/validator.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index 64897b2820c0..487ac191327a 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -735,11 +735,14 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if duty.IsSyncCommittee { roles = append(roles, iface.RoleSyncCommittee) inSyncCommittee = true - syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey) } } - if len(roles) == 0 && !inSyncCommittee { + if inSyncCommittee { + syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey) + } + + if len(roles) == 0 { roles = append(roles, iface.RoleUnknown) } From a4308fd2b8cf0f544b8914071d7f05495493ba82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Wed, 29 May 2024 14:28:25 +0200 Subject: [PATCH 5/7] don't fail if validator is marked as sync committee but it is not in the list ignore the duty and continue --- validator/client/validator.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index 6957fc18ac1b..e39f1b61b017 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -764,7 +764,10 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if isAgg { valPubkey, ok := syncCommitteeValidators[valIdx] if !ok { - return nil, errors.New("validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + log. + WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(valPubkey[:]))). + Warn("validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + continue } rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator) From c74ca105bc56b904963e044b07a61070a29eb716 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Wed, 29 May 2024 16:39:16 +0200 Subject: [PATCH 6/7] address code review comments --- validator/client/validator.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index d54eb513c737..0a7fb33fa115 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -760,7 +760,8 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie ) if err != nil { - return nil, errors.Wrap(err, "could not check if validators are a sync committee aggregator") + log.WithError(err).Error("Could not check if any validator is a sync committee aggregator") + return rolesAt, nil } for valIdx, isAgg := range aggregator { @@ -769,7 +770,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie if !ok { log. WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(valPubkey[:]))). - Warn("validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + Warn("Validator is marked as sync committee aggregator but cannot be found in sync committee validator list") continue } From 8c7b4706886f78ae1ad9ab26b7f056ab44af4583 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gianguido=20Sor=C3=A0?= Date: Mon, 1 Jul 2024 09:56:17 +0200 Subject: [PATCH 7/7] fix build --- validator/client/validator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/validator/client/validator.go b/validator/client/validator.go index 7a7f82ef1d2c..447f8e919fa2 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -866,7 +866,7 @@ func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitiv ) for valIdx, pubKey := range validators { - res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ + res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ PublicKey: pubKey[:], Slot: slot, })