Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validator/client: process Sync Committee roles separately #13995

Merged
merged 13 commits into from
Jul 1, 2024
Merged
108 changes: 72 additions & 36 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,14 +701,22 @@ func (v *validator) subscribeToSubnets(ctx context.Context, duties *ethpb.Duties

// RolesAt slot returns the validator roles at the given slot. Returns nil if the
// validator is known to not have a roles at the slot. Returns UNKNOWN if the
// validator assignments are unknown. Otherwise returns a valid ValidatorRole map.
// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map.
func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) {
ctx, span := trace.StartSpan(ctx, "validator.RolesAt")
defer span.End()

v.dutiesLock.RLock()
defer v.dutiesLock.RUnlock()
rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

var (
rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

// store sync committee duties pubkeys and share indices in slices for
// potential DV processing
syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte)
)

for validator, duty := range v.duties.CurrentEpochDuties {
var roles []iface.ValidatorRole

Expand All @@ -723,6 +731,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
}
}
}

if duty.AttesterSlot == slot {
roles = append(roles, iface.RoleAttester)

Expand Down Expand Up @@ -751,15 +760,9 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
inSyncCommittee = true
}
}

if inSyncCommittee {
aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex)
if err != nil {
aggregator = false
log.WithError(err).Errorf("Could not check if validator %#x is an aggregator", bytesutil.Trunc(duty.PublicKey))
}
if aggregator {
roles = append(roles, iface.RoleSyncCommitteeAggregator)
}
syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey)
}

if len(roles) == 0 {
Expand All @@ -770,6 +773,32 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
copy(pubKey[:], duty.PublicKey)
rolesAt[pubKey] = roles
}

aggregator, err := v.isSyncCommitteeAggregator(
ctx,
slot,
syncCommitteeValidators,
)

if err != nil {
log.WithError(err).Error("Could not check if any validator is a sync committee aggregator")
return rolesAt, nil
}

for valIdx, isAgg := range aggregator {
if isAgg {
valPubkey, ok := syncCommitteeValidators[valIdx]
if !ok {
log.
WithField("pubkey", fmt.Sprintf("%#x", bytesutil.Trunc(valPubkey[:]))).
Warn("Validator is marked as sync committee aggregator but cannot be found in sync committee validator list")
continue
}

rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator)
}
}

return rolesAt, nil
}

Expand Down Expand Up @@ -827,54 +856,61 @@ func (v *validator) isAggregator(
//
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) {
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) {
ctx, span := trace.StartSpan(ctx, "validator.isSyncCommitteeAggregator")
defer span.End()

res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})
if err != nil {
return false, err
}
var (
selections []iface.SyncCommitteeSelection
isAgg = make(map[primitives.ValidatorIndex]bool)
)

for valIdx, pubKey := range validators {
res, err := v.validatorClient.SyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})

var selections []iface.SyncCommitteeSelection
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return false, err
return nil, errors.Wrap(err, "can't fetch sync subcommittee index")
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: validatorIndex,
})
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return nil, errors.Wrap(err, "can't sign selection data")
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: valIdx,
})
}
}

// Override selections with aggregated ones if the node is part of a Distributed Validator.
if v.distributed && len(selections) > 0 {
var err error
selections, err = v.validatorClient.AggregatedSyncSelections(ctx, selections)
if err != nil {
return false, errors.Wrap(err, "failed to get aggregated sync selections")
return nil, errors.Wrap(err, "failed to get aggregated sync selections")
}
}

for _, s := range selections {
isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof)
if err != nil {
return false, err
}
if isAggregator {
return true, nil
return nil, errors.Wrap(err, "can't detect sync committee aggregator")
}

isAgg[s.ValidatorIndex] = isAggregator
}

return false, nil
return isAgg, nil
}

// UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when
Expand Down
24 changes: 16 additions & 8 deletions validator/client/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,9 +1263,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)

aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, false, aggregator)
require.Equal(t, false, aggregator[0])

c := params.BeaconConfig().Copy()
c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64
Expand All @@ -1284,9 +1286,11 @@ func TestIsSyncCommitteeAggregator_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{Indices: []primitives.CommitteeIndex{0}}, nil /*err*/)

aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, true, aggregator)
require.Equal(t, true, aggregator[0])
})
}
}
Expand All @@ -1310,9 +1314,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) {
},
).Return(&ethpb.SyncSubcommitteeIndexResponse{}, nil /*err*/)

aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 0)
aggregator, err := v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
0: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, false, aggregator)
require.Equal(t, false, aggregator[0])

c := params.BeaconConfig().Copy()
c.TargetAggregatorsPerSyncSubcommittee = math.MaxUint64
Expand Down Expand Up @@ -1345,9 +1351,11 @@ func TestIsSyncCommitteeAggregator_Distributed_OK(t *testing.T) {
[]iface.SyncCommitteeSelection{selection},
).Return([]iface.SyncCommitteeSelection{selection}, nil)

aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, bytesutil.ToBytes48(pubKey), 123)
aggregator, err = v.isSyncCommitteeAggregator(context.Background(), slot, map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte{
123: bytesutil.ToBytes48(pubKey),
})
require.NoError(t, err)
require.Equal(t, true, aggregator)
require.Equal(t, true, aggregator[123])
})
}
}
Expand Down
Loading