Skip to content

Commit

Permalink
Merge accad68 into 397327a
Browse files Browse the repository at this point in the history
  • Loading branch information
twoeths authored Nov 4, 2021
2 parents 397327a + accad68 commit 4dc1af3
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 12 deletions.
3 changes: 3 additions & 0 deletions packages/lodestar/test/utils/validationData/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import {signCached} from "../cache";
import {ClockStatic} from "../clock";
import {toSingleBit} from "../aggregationBits";
import {toHexString} from "@chainsafe/ssz";
import {config} from "@chainsafe/lodestar-config/default";
import {IBeaconConfig} from "@chainsafe/lodestar-config";

export type AttestationValidDataOpts = {
currentSlot?: Slot;
Expand Down Expand Up @@ -108,6 +110,7 @@ export function getAttestationValidData(

const chain = ({
clock,
config: config as IBeaconConfig,
forkChoice,
regen,
seenAttesters: new SeenAttesters(),
Expand Down
1 change: 1 addition & 0 deletions packages/validator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
"@chainsafe/lodestar-types": "^0.31.0",
"@chainsafe/lodestar-utils": "^0.31.0",
"@chainsafe/ssz": "^0.8.19",
"strict-event-emitter-types": "^2.0.0",
"bigint-buffer": "^1.1.5"
},
"devDependencies": {
Expand Down
28 changes: 25 additions & 3 deletions packages/validator/src/services/attestation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import {AttestationDutiesService, AttDutyAndProof} from "./attestationDuties";
import {groupAttDutiesByCommitteeIndex} from "./utils";
import {IndicesService} from "./indices";
import {toHexString} from "@chainsafe/ssz";
import {ChainHeaderTracker} from "./chainHeaderTracker";
import {ChainHeaderTracker, HeadEventData} from "./chainHeaderTracker";
import {ValidatorEvent, ValidatorEventEmitter} from "./emitter";

/**
* Service that sets up and handles validator attester duties.
Expand All @@ -22,6 +23,7 @@ export class AttestationService {
private readonly api: Api,
private readonly clock: IClock,
private readonly validatorStore: ValidatorStore,
private readonly emitter: ValidatorEventEmitter,
indicesService: IndicesService,
chainHeadTracker: ChainHeaderTracker
) {
Expand All @@ -42,8 +44,10 @@ export class AttestationService {
// Fetch info first so a potential delay is absorved by the sleep() below
const dutiesByCommitteeIndex = groupAttDutiesByCommitteeIndex(this.dutiesService.getDutiesAtSlot(slot));

// Lighthouse recommends to always wait to 1/3 of the slot, even if the block comes early
await sleep(this.clock.msToSlotFraction(slot, 1 / 3), signal);
// A validator should create and broadcast the attestation to the associated attestation subnet when either
// (a) the validator has received a valid block from the expected block proposer for the assigned slot or
// (b) one-third of the slot has transpired (SECONDS_PER_SLOT / 3 seconds after the start of slot) -- whichever comes first.
await Promise.race([sleep(this.clock.msToSlotFraction(slot, 1 / 3), signal), this.waitForBlockSlot(slot)]);

// await for all so if the Beacon node is overloaded it auto-throttles
// TODO: This approach is convervative to reduce the node's load, review
Expand All @@ -57,6 +61,24 @@ export class AttestationService {
);
};

private waitForBlockSlot(slot: Slot): Promise<void> {
let headListener: (head: HeadEventData) => void;

const onDone = (): void => {
this.emitter.off(ValidatorEvent.chainHead, headListener);
};

return new Promise((resolve) => {
headListener = (head: HeadEventData): void => {
if (head.slot >= slot) {
onDone();
resolve();
}
};
this.emitter.on(ValidatorEvent.chainHead, headListener);
});
}

private async publishAttestationsAndAggregates(
slot: Slot,
committeeIndex: CommitteeIndex,
Expand Down
23 changes: 16 additions & 7 deletions packages/validator/src/services/chainHeaderTracker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {ILogger} from "@chainsafe/lodestar-utils";
import {Slot, Root, RootHex} from "@chainsafe/lodestar-types";
import {GENESIS_SLOT} from "@chainsafe/lodestar-params";
import {fromHexString} from "@chainsafe/ssz";
import {ValidatorEvent, ValidatorEventEmitter} from "./emitter";

const {EventType} = routes.events;

Expand All @@ -23,7 +24,11 @@ export class ChainHeaderTracker {
private headBlockRoot: Root | null = null;
private readonly fns: RunEveryFn[] = [];

constructor(private readonly logger: ILogger, private readonly api: Api) {}
constructor(
private readonly logger: ILogger,
private readonly api: Api,
private readonly emitter: ValidatorEventEmitter
) {}

start(signal: AbortSignal): void {
this.api.events.eventstream([EventType.head], signal, this.onHeadUpdate);
Expand All @@ -49,15 +54,19 @@ export class ChainHeaderTracker {
this.headBlockSlot = slot;
this.headBlockRoot = fromHexString(block);

const headEventData = {
slot: this.headBlockSlot,
head: block,
previousDutyDependentRoot: previousDutyDependentRoot,
currentDutyDependentRoot: currentDutyDependentRoot,
};

for (const fn of this.fns) {
fn({
slot: this.headBlockSlot,
head: block,
previousDutyDependentRoot: previousDutyDependentRoot,
currentDutyDependentRoot: currentDutyDependentRoot,
}).catch((e) => this.logger.error("Error calling head event handler", e));
fn(headEventData).catch((e) => this.logger.error("Error calling head event handler", e));
}

this.emitter.emit(ValidatorEvent.chainHead, headEventData);

this.logger.verbose("Found new chain head", {
slot: slot,
head: block,
Expand Down
21 changes: 21 additions & 0 deletions packages/validator/src/services/emitter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import {EventEmitter} from "events";
import StrictEventEmitter from "strict-event-emitter-types";
import {HeadEventData} from "./chainHeaderTracker";

export enum ValidatorEvent {
/**
* This event signals that the node chain has a new head.
*/
chainHead = "chainHead",
}

export interface IValidatorEvents {
[ValidatorEvent.chainHead]: (head: HeadEventData) => void;
}

/**
* Emit important validator events.
*/
export class ValidatorEventEmitter extends (EventEmitter as {
new (): StrictEventEmitter<EventEmitter, IValidatorEvents>;
}) {}
7 changes: 5 additions & 2 deletions packages/validator/src/validator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {assertEqualParams, getLoggerVc, NotEqualParamsError} from "./util";
import {ChainHeaderTracker} from "./services/chainHeaderTracker";
import {MetaDataRepository} from ".";
import {toHexString} from "@chainsafe/ssz";
import {ValidatorEventEmitter} from "./services/emitter";

export type ValidatorOptions = {
slashingProtection: ISlashingProtection;
Expand Down Expand Up @@ -48,6 +49,7 @@ export class Validator {
private readonly api: Api;
private readonly secretKeys: SecretKey[];
private readonly clock: IClock;
private readonly emitter: ValidatorEventEmitter;
private readonly chainHeaderTracker: ChainHeaderTracker;
private readonly logger: ILogger;
private state: State = {status: Status.stopped};
Expand All @@ -68,10 +70,11 @@ export class Validator {
const clock = new Clock(config, logger, {genesisTime: Number(genesis.genesisTime)});
const validatorStore = new ValidatorStore(config, slashingProtection, secretKeys, genesis);
const indicesService = new IndicesService(logger, api, validatorStore);
this.chainHeaderTracker = new ChainHeaderTracker(logger, api);
this.emitter = new ValidatorEventEmitter();
this.chainHeaderTracker = new ChainHeaderTracker(logger, api, this.emitter);
const loggerVc = getLoggerVc(logger, clock);
new BlockProposingService(config, loggerVc, api, clock, validatorStore, graffiti);
new AttestationService(loggerVc, api, clock, validatorStore, indicesService, this.chainHeaderTracker);
new AttestationService(loggerVc, api, clock, validatorStore, this.emitter, indicesService, this.chainHeaderTracker);
new SyncCommitteeService(config, loggerVc, api, clock, validatorStore, this.chainHeaderTracker, indicesService);

this.config = config;
Expand Down
4 changes: 4 additions & 0 deletions packages/validator/test/unit/services/attestation.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {loggerVc, testLogger} from "../../utils/logger";
import {ClockMock} from "../../utils/clock";
import {IndicesService} from "../../../src/services/indices";
import {ChainHeaderTracker} from "../../../src/services/chainHeaderTracker";
import {ValidatorEventEmitter} from "../../../src/services/emitter";

describe("AttestationService", function () {
const sandbox = sinon.createSandbox();
Expand All @@ -23,6 +24,8 @@ describe("AttestationService", function () {
const api = getApiClientStub(sandbox);
const validatorStore = sinon.createStubInstance(ValidatorStore) as ValidatorStore &
sinon.SinonStubbedInstance<ValidatorStore>;
const emitter = sinon.createStubInstance(ValidatorEventEmitter) as ValidatorEventEmitter &
sinon.SinonStubbedInstance<ValidatorEventEmitter>;
const chainHeadTracker = sinon.createStubInstance(ChainHeaderTracker) as ChainHeaderTracker &
sinon.SinonStubbedInstance<ChainHeaderTracker>;
let pubkeys: Uint8Array[]; // Initialize pubkeys in before() so bls is already initialized
Expand All @@ -48,6 +51,7 @@ describe("AttestationService", function () {
api,
clock,
validatorStore,
emitter,
indicesService,
chainHeadTracker
);
Expand Down

0 comments on commit 4dc1af3

Please sign in to comment.