diff --git a/validator/client/validator.go b/validator/client/validator.go index 00a94f30f378..447f8e919fa2 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -701,14 +701,22 @@ func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.Duties // RolesAt slot returns the validator roles at the given slot. Returns nil if the // validator is known to not have a roles at the slot. Returns UNKNOWN if the -// validator assignments are unknown. Otherwise returns a valid ValidatorRole map. +// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map. func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) { ctx, span := trace.StartSpan(ctx, "validator.RolesAt") defer span.End() v.dutiesLock.RLock() defer v.dutiesLock.RUnlock() - rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + var ( + rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole) + + // store sync committee duties pubkeys and share indices in slices for + // potential DV processing + syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) + ) + for validator, duty := range v.duties.CurrentEpochDuties { var roles []iface.ValidatorRole @@ -723,6 +731,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie } } } + if duty.AttesterSlot == slot { roles = append(roles, iface.RoleAttester) @@ -751,15 +760,9 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie inSyncCommittee = true } } + if inSyncCommittee { - aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex) - if err != nil { - aggregator = false - log.WithError(err).Errorf("Could not check if validator %#x is an aggregator", bytesutil.Trunc(duty.PublicKey)) - } - if aggregator { - roles = append(roles, iface.RoleSyncCommitteeAggregator) - } + syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey) } if len(roles) == 0 { @@ -770,6 +773,32 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie copy(pubKey[:], duty.PublicKey) rolesAt[pubKey] = roles } + + aggregator, err := v.isSyncCommitteeAggregator( + ctx, + slot, + syncCommitteeValidators, + ) + + if err != nil { + log.WithError(err).Error("Could not check if any validator is a sync committee aggregator") + return rolesAt, nil + } + + for valIdx, isAgg := range aggregator { + if isAgg { + valPubkey, ok := syncCommitteeValidators[valIdx] + if !ok { + log. + WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(valPubkey[:]))). + Warn("Validator is marked as sync committee aggregator but cannot be found in sync committee validator list") + continue + } + + rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator) + } + } + return rolesAt, nil } @@ -827,54 +856,61 @@ func (v *validator) isAggregator( // // modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE) // return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0 -func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) { +func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) { ctx, span := trace.StartSpan(ctx, "validator.isSyncCommitteeAggregator") defer span.End() - res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ - PublicKey: pubKey[:], - Slot: slot, - }) - if err != nil { - return false, err - } + var ( + selections []iface.SyncCommitteeSelection + isAgg = make(map[primitives.ValidatorIndex]bool) + ) + + for valIdx, pubKey := range validators { + res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, ðpb.SyncSubcommitteeIndexRequest{ + PublicKey: pubKey[:], + Slot: slot, + }) - var selections []iface.SyncCommitteeSelection - for _, index := range res.Indices { - subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount - subnet := uint64(index) / subCommitteeSize - sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) if err != nil { - return false, err + return nil, errors.Wrap(err, "can't fetch sync subcommittee index") } - selections = append(selections, iface.SyncCommitteeSelection{ - SelectionProof: sig, - Slot: slot, - SubcommitteeIndex: primitives.CommitteeIndex(subnet), - ValidatorIndex: validatorIndex, - }) + for _, index := range res.Indices { + subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount + subnet := uint64(index) / subCommitteeSize + sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot) + if err != nil { + return nil, errors.Wrap(err, "can't sign selection data") + } + + selections = append(selections, iface.SyncCommitteeSelection{ + SelectionProof: sig, + Slot: slot, + SubcommitteeIndex: primitives.CommitteeIndex(subnet), + ValidatorIndex: valIdx, + }) + } } // Override selections with aggregated ones if the node is part of a Distributed Validator. if v.distributed && len(selections) > 0 { + var err error selections, err = v.validatorClient.AggregatedSyncSelections(ctx, selections) if err != nil { - return false, errors.Wrap(err, "failed to get aggregated sync selections") + return nil, errors.Wrap(err, "failed to get aggregated sync selections") } } for _, s := range selections { isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof) if err != nil { - return false, err - } - if isAggregator { - return true, nil + return nil, errors.Wrap(err, "can't detect sync committee aggregator") } + + isAgg[s.ValidatorIndex] = isAggregator } - return false, nil + return isAgg, nil } // UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 88214ff28494..98b96d02db85 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -1263,9 +1263,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{}, nil /*err*/) - aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, false, aggregator) + require.Equal(t, false, aggregator[0]) c := params.BeaconConfig().Copy() c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64 @@ -1284,9 +1286,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/) - aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, true, aggregator) + require.Equal(t, true, aggregator[0]) }) } } @@ -1310,9 +1314,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) { }, ).Return(ðpb.SyncSubcommitteeIndexResponse{}, nil /*err*/) - aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0) + aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 0: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, false, aggregator) + require.Equal(t, false, aggregator[0]) c := params.BeaconConfig().Copy() c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64 @@ -1345,9 +1351,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) { []iface.SyncCommitteeSelection{selection}, ).Return([]iface.SyncCommitteeSelection{selection}, nil) - aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 123) + aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{ + 123: bytesutil.ToBytes48(pubKey), + }) require.NoError(t, err) - require.Equal(t, true, aggregator) + require.Equal(t, true, aggregator[123]) }) } }