Skip to content

Commit

Permalink
validator/client: process Sync Committee roles separately
Browse files Browse the repository at this point in the history
In a DV context, to be compatible with the proposed selection endpoint, the VC must push all partial selections to it instead of just one.

Process sync committee role separately within the RolesAt method, so that partial selections can be pushed to the DV client appropriately, if configured.
  • Loading branch information
gsora committed May 14, 2024
1 parent de177f7 commit f0f12ec
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 135 deletions.
97 changes: 0 additions & 97 deletions testing/util/state_test.go

This file was deleted.

107 changes: 69 additions & 38 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,11 +682,19 @@ func (v *validator) subscribeToSubnets(ctx context.Context, res *ethpb.DutiesRes

// RolesAt slot returns the validator roles at the given slot. Returns nil if the
// validator is known to not have a roles at the slot. Returns UNKNOWN if the
// validator assignments are unknown. Otherwise returns a valid ValidatorRole map.
// validator assignments are unknown. Otherwise, returns a valid ValidatorRole map.
func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole, error) {
v.dutiesLock.RLock()
defer v.dutiesLock.RUnlock()
rolesAt := make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

var (
rolesAt = make(map[[fieldparams.BLSPubkeyLength]byte][]iface.ValidatorRole)

// store sync committee duties pubkeys and share indices in slices for
// potential DV processing
syncCommitteeValidators = make(map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte)
)

for validator, duty := range v.duties.CurrentEpochDuties {
var roles []iface.ValidatorRole

Expand All @@ -701,6 +709,7 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
}
}
}

if duty.AttesterSlot == slot {
roles = append(roles, iface.RoleAttester)

Expand All @@ -726,26 +735,40 @@ func (v *validator) RolesAt(ctx context.Context, slot primitives.Slot) (map[[fie
if duty.IsSyncCommittee {
roles = append(roles, iface.RoleSyncCommittee)
inSyncCommittee = true
}
}
if inSyncCommittee {
aggregator, err := v.isSyncCommitteeAggregator(ctx, slot, bytesutil.ToBytes48(duty.PublicKey), duty.ValidatorIndex)
if err != nil {
return nil, errors.Wrap(err, "could not check if a validator is a sync committee aggregator")
}
if aggregator {
roles = append(roles, iface.RoleSyncCommitteeAggregator)
syncCommitteeValidators[duty.ValidatorIndex] = bytesutil.ToBytes48(duty.PublicKey)
}
}

if len(roles) == 0 {
if len(roles) == 0 && !inSyncCommittee {
roles = append(roles, iface.RoleUnknown)
}

var pubKey [fieldparams.BLSPubkeyLength]byte
copy(pubKey[:], duty.PublicKey)
rolesAt[pubKey] = roles
}

aggregator, err := v.isSyncCommitteeAggregator(
ctx,
slot,
syncCommitteeValidators,
)

if err != nil {
return nil, errors.Wrap(err, "could not check if validators are a sync committee aggregator")
}

for valIdx, isAgg := range aggregator {
if isAgg {
valPubkey, ok := syncCommitteeValidators[valIdx]
if !ok {
return nil, errors.New("validator is marked as sync committee aggregator but cannot be found in sync committee validator list")
}

rolesAt[bytesutil.ToBytes48(valPubkey[:])] = append(rolesAt[bytesutil.ToBytes48(valPubkey[:])], iface.RoleSyncCommitteeAggregator)
}
}

return rolesAt, nil
}

Expand Down Expand Up @@ -794,51 +817,59 @@ func (v *validator) isAggregator(ctx context.Context, committee []primitives.Val
//
// modulo = max(1, SYNC_COMMITTEE_SIZE // SYNC_COMMITTEE_SUBNET_COUNT // TARGET_AGGREGATORS_PER_SYNC_SUBCOMMITTEE)
// return bytes_to_uint64(hash(signature)[0:8]) % modulo == 0
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, pubKey [fieldparams.BLSPubkeyLength]byte, validatorIndex primitives.ValidatorIndex) (bool, error) {
res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})
if err != nil {
return false, err
}
func (v *validator) isSyncCommitteeAggregator(ctx context.Context, slot primitives.Slot, validators map[primitives.ValidatorIndex][fieldparams.BLSPubkeyLength]byte) (map[primitives.ValidatorIndex]bool, error) {
var (
selections []iface.SyncCommitteeSelection
isAgg = make(map[primitives.ValidatorIndex]bool)
)

for valIdx, pubKey := range validators {
res, err := v.validatorClient.GetSyncSubcommitteeIndex(ctx, &ethpb.SyncSubcommitteeIndexRequest{
PublicKey: pubKey[:],
Slot: slot,
})

var selections []iface.SyncCommitteeSelection
for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return false, err
return nil, errors.Wrap(err, "can't fetch sync subcommittee index")
}

for _, index := range res.Indices {
subCommitteeSize := params.BeaconConfig().SyncCommitteeSize / params.BeaconConfig().SyncCommitteeSubnetCount
subnet := uint64(index) / subCommitteeSize
sig, err := v.signSyncSelectionData(ctx, pubKey, subnet, slot)
if err != nil {
return nil, errors.Wrap(err, "can't sign selection data")
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: valIdx,
})
}

selections = append(selections, iface.SyncCommitteeSelection{
SelectionProof: sig,
Slot: slot,
SubcommitteeIndex: primitives.CommitteeIndex(subnet),
ValidatorIndex: validatorIndex,
})
}

// Override selections with aggregated ones if the node is part of a Distributed Validator.
if v.distributed && len(selections) > 0 {
var err error
selections, err = v.validatorClient.GetAggregatedSyncSelections(ctx, selections)
if err != nil {
return false, errors.Wrap(err, "failed to get aggregated sync selections")
return nil, errors.Wrap(err, "failed to get aggregated sync selections")
}
}

for _, s := range selections {
isAggregator, err := altair.IsSyncCommitteeAggregator(s.SelectionProof)
if err != nil {
return false, err
}
if isAggregator {
return true, nil
return nil, errors.Wrap(err, "can't detect sync committee aggregator")
}

isAgg[s.ValidatorIndex] = isAggregator
}

return false, nil
return isAgg, nil
}

// UpdateDomainDataCaches by making calls for all of the possible domain data. These can change when
Expand Down

0 comments on commit f0f12ec

Please sign in to comment.