Skip to content

Commit

Permalink
Don't subscribe too early to sync committee
Browse files Browse the repository at this point in the history
  • Loading branch information
dapplion committed May 14, 2021
1 parent 92a114e commit e041e3c
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 41 deletions.
11 changes: 6 additions & 5 deletions packages/validator/src/services/attestationDuties.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ export class AttestationDutiesService {
*
* This function will perform (in the following order):
*
* 1. Poll for current-epoch duties and update the local `this.attesters` map.
* 1. Poll for current-epoch duties and update the local duties map.
* 2. As above, but for the next-epoch.
* 3. Push out any attestation subnet subscriptions to the BN.
* 4. Prune old entries from `this.attesters`.
* 4. Prune old entries from duties.
*/
private async pollBeaconAttesters(currentEpoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
const nextEpoch = currentEpoch + 1;
Expand Down Expand Up @@ -134,8 +134,9 @@ export class AttestationDutiesService {
}
}

/** For the given `indexArr`, download the duties for the given `epoch` and
store them in `this.attesters`. */
/**
* For the given `indexArr`, download the duties for the given `epoch` and store them in duties.
*/
private async pollBeaconAttestersForEpoch(epoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
// Don't fetch duties for epochs before genesis. However, should fetch epoch 0 duties at epoch -1
if (epoch < 0) {
Expand Down Expand Up @@ -199,7 +200,7 @@ export class AttestationDutiesService {
};
}

/** Run once per epoch to prune `this.attesters` map */
/** Run once per epoch to prune duties map */
private pruneOldDuties(currentEpoch: Epoch): void {
for (const attMap of this.dutiesByEpochByIndex.values()) {
for (const epoch of attMap.keys()) {
Expand Down
77 changes: 41 additions & 36 deletions packages/validator/src/services/syncCommitteeDuties.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {computeSyncPeriodAtEpoch, computeSyncPeriodAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconConfig} from "@chainsafe/lodestar-config";
import {altair, BLSSignature, Epoch, Root, Slot, SyncPeriod, ValidatorIndex} from "@chainsafe/lodestar-types";
import {altair, BLSSignature, Epoch, Root, Slot, ValidatorIndex} from "@chainsafe/lodestar-types";
import {ILogger} from "@chainsafe/lodestar-utils";
import {toHexString} from "@chainsafe/ssz";
import {IndicesService} from "./indices";
Expand All @@ -12,9 +12,11 @@ import {ValidatorStore} from "./validatorStore";
/** Only retain `HISTORICAL_DUTIES_PERIODS` duties prior to the current periods. */
const HISTORICAL_DUTIES_PERIODS = 2;
/** Epochs prior to `ALTAIR_FORK_EPOCH` to start fetching duties */
const ALTAIR_FORK_LOOKAHEAD = 1;
const ALTAIR_FORK_LOOKAHEAD_EPOCHS = 1;
/** How many epochs prior from a subscription starting, ask the node to subscribe */
const SUBSCRIPTIONS_LOOKAHEAD_EPOCHS = 2;

/** Neatly joins the server-generated `AttesterData` with the locally-generated `selectionProof`. */
/** Neatly joins SyncDuty with the locally-generated `selectionProof`. */
export type SyncDutyAndProof = {
duty: altair.SyncDuty;
/** This value is only set to not null if the proof indicates that the validator is an aggregator. */
Expand All @@ -40,7 +42,7 @@ export class SyncCommitteeDutiesService {
private readonly validatorStore: ValidatorStore,
private readonly indicesService: IndicesService
) {
// Running this task every epoch is safe since a re-org of two epochs is very unlikely
// Running this task every epoch is safe since a re-org of many epochs is very unlikely
// TODO: If the re-org event is reliable consider re-running then
clock.runEveryEpoch(this.runDutiesTasks);
}
Expand All @@ -62,57 +64,56 @@ export class SyncCommitteeDutiesService {
return duties;
}

private runDutiesTasks = async (epoch: Epoch): Promise<void> => {
private runDutiesTasks = async (currentEpoch: Epoch): Promise<void> => {
// Before altair fork (+ lookahead) no need to check duties
if (epoch < this.config.params.ALTAIR_FORK_EPOCH - ALTAIR_FORK_LOOKAHEAD) {
if (currentEpoch < this.config.params.ALTAIR_FORK_EPOCH - ALTAIR_FORK_LOOKAHEAD_EPOCHS) {
return;
}

const period = computeSyncPeriodAtEpoch(this.config, epoch);

await Promise.all([
// Run pollSyncCommittees immediately for all known local indices
this.pollSyncCommittees(period, this.indicesService.getAllLocalIndices()).catch((e) => {
if (notAborted(e)) this.logger.error("Error on poll SyncDuties", {period}, e);
this.pollSyncCommittees(currentEpoch, this.indicesService.getAllLocalIndices()).catch((e) => {
if (notAborted(e)) this.logger.error("Error on poll SyncDuties", {epoch: currentEpoch}, e);
}),

// At the same time fetch any remaining unknown validator indices, then poll duties for those newIndices only
this.indicesService
.pollValidatorIndices()
.then((newIndices) => this.pollSyncCommittees(period, newIndices))
.then((newIndices) => this.pollSyncCommittees(currentEpoch, newIndices))
.catch((e) => {
if (notAborted(e)) this.logger.error("Error on poll indices and SyncDuties", {period}, e);
if (notAborted(e)) this.logger.error("Error on poll indices and SyncDuties", {epoch: currentEpoch}, e);
}),
]);

// After both, prune
this.pruneOldDuties(period);
this.pruneOldDuties(currentEpoch);
};

/**
* Query the beacon node for SyncDuties for any known validators.
*
* This function will perform (in the following order):
*
* 1. Poll for current-period duties and update the local `this.attesters` map.
* 1. Poll for current-period duties and update the local duties map.
* 2. As above, but for the next-period.
* 3. Push out any Sync subnet subscriptions to the BN.
* 4. Prune old entries from `this.attesters`.
* 4. Prune old entries from duties.
*/
private async pollSyncCommittees(currentPeriod: SyncPeriod, indexArr: ValidatorIndex[]): Promise<void> {
private async pollSyncCommittees(currentEpoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
// No need to bother the BN if we don't have any validators.
if (indexArr.length === 0) {
return;
}

const nextPeriod = currentPeriod + 1;
for (const period of [currentPeriod, nextPeriod]) {
const nextPeriodEpoch = currentEpoch + this.config.params.EPOCHS_PER_SYNC_COMMITTEE_PERIOD;
for (const epoch of [currentEpoch, nextPeriodEpoch]) {
// Download the duties and update the duties for the current and next period.
await this.pollSyncCommitteesForEpoch(period, indexArr).catch((e) => {
if (notAborted(e)) this.logger.error("Failed to download SyncDuties", {period}, e);
await this.pollSyncCommitteesForEpoch(epoch, indexArr).catch((e) => {
if (notAborted(e)) this.logger.error("Failed to download SyncDuties", {epoch}, e);
});
}

const currentPeriod = computeSyncPeriodAtEpoch(this.config, currentEpoch);
const syncCommitteeSubscriptions: altair.SyncCommitteeSubscription[] = [];

// For this and the next period, produce any beacon committee subscriptions.
Expand All @@ -121,18 +122,22 @@ export class SyncCommitteeDutiesService {
// potentially excessive on the BN in normal cases, but it will help with fast re-subscriptions
// if the BN goes offline or we swap to a different one.
const indexSet = new Set(indexArr);
for (const period of [currentPeriod, nextPeriod]) {
for (const period of [currentPeriod, currentPeriod + 1]) {
for (const [validatorIndex, dutiesByPeriod] of this.dutiesByPeriodByIndex.entries()) {
const dutyAtEpoch = dutiesByPeriod.get(period);
if (dutyAtEpoch) {
if (indexSet.has(validatorIndex)) {
syncCommitteeSubscriptions.push({
validatorIndex,
syncCommitteeIndices: dutyAtEpoch.duty.validatorSyncCommitteeIndices,
// TODO: Change after https://github.com/ethereum/eth2.0-APIs/issues/144
untilEpoch: (period + 1) * this.config.params.EPOCHS_PER_SYNC_COMMITTEE_PERIOD,
// No need to send isAggregator here since the beacon node will assume validator always aggregates
});
const fromEpoch = period * this.config.params.EPOCHS_PER_SYNC_COMMITTEE_PERIOD;
const untilEpoch = (period + 1) * this.config.params.EPOCHS_PER_SYNC_COMMITTEE_PERIOD;
// Don't subscribe too early to save node's resources
if (currentEpoch >= fromEpoch - SUBSCRIPTIONS_LOOKAHEAD_EPOCHS) {
syncCommitteeSubscriptions.push({
validatorIndex,
syncCommitteeIndices: dutyAtEpoch.duty.validatorSyncCommitteeIndices,
untilEpoch,
// No need to send isAggregator here since the beacon node will assume validator always aggregates
});
}
}
}
}
Expand All @@ -148,24 +153,23 @@ export class SyncCommitteeDutiesService {
}

/**
* For the given `indexArr`, download the duties for the given `period` and store them in `this.attesters`.
* For the given `indexArr`, download the duties for the given `period` and store them in duties.
*/
private async pollSyncCommitteesForEpoch(period: SyncPeriod, indexArr: ValidatorIndex[]): Promise<void> {
private async pollSyncCommitteesForEpoch(epoch: Epoch, indexArr: ValidatorIndex[]): Promise<void> {
// Don't fetch duties for periods before genesis. However, should fetch period 0 duties at period -1
if (period < 0) {
if (epoch < 0) {
return;
}

// TODO: Query by period after https://github.com/ethereum/eth2.0-APIs/issues/144
const epoch = period * this.config.params.EPOCHS_PER_SYNC_COMMITTEE_PERIOD;
const syncDuties = await this.apiClient.validator.getSyncCommitteeDuties(epoch, indexArr).catch((e) => {
throw extendError(e, "Failed to obtain SyncDuties");
});
const dependentRoot = syncDuties.dependentRoot;
const relevantDuties = syncDuties.data.filter((duty) => this.indicesService.hasValidatorIndex(duty.validatorIndex));
const period = computeSyncPeriodAtEpoch(this.config, epoch);

this.logger.debug("Downloaded SyncDuties", {
period,
epoch,
dependentRoot: toHexString(dependentRoot),
count: relevantDuties.length,
});
Expand Down Expand Up @@ -201,8 +205,9 @@ export class SyncCommitteeDutiesService {
};
}

/** Run at least once per period to prune `this.attesters` map */
private pruneOldDuties(currentPeriod: SyncPeriod): void {
/** Run at least once per period to prune duties map */
private pruneOldDuties(currentEpoch: Epoch): void {
const currentPeriod = computeSyncPeriodAtEpoch(this.config, currentEpoch);
for (const attMap of this.dutiesByPeriodByIndex.values()) {
for (const period of attMap.keys()) {
if (period + HISTORICAL_DUTIES_PERIODS < currentPeriod) {
Expand Down

0 comments on commit e041e3c

Please sign in to comment.