Skip to content

Commit

Permalink
Squashed commit of the following:
Browse files Browse the repository at this point in the history
commit 40b0630
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed Jan 4 20:50:58 2023 +0800

    Update tests

commit a6df872
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed Jan 4 12:55:07 2023 +0800

    Make enum const

commit c13f17c
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed Jan 4 12:40:17 2023 +0800

    Remove middleware ChainEvents

commit 41d11aa
Author: dapplion <35266934+dapplion@users.noreply.github.com>
Date:   Wed Jan 4 11:28:10 2023 +0800

    Remove intermediary event ChainEvent.forkChoiceReorg
  • Loading branch information
dapplion committed Jan 5, 2023
1 parent 278c2ee commit 1bbe1ec
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 279 deletions.
16 changes: 15 additions & 1 deletion packages/api/src/beacon/routes/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {RouteDef, TypeJson} from "../../utils/index.js";

// See /packages/api/src/routes/index.ts for reasoning and instructions to add new routes

export enum EventType {
export const enum EventType {
/**
* The node has finished processing, resulting in a new head. previous_duty_dependent_root is
* `get_block_root_at_slot(state, compute_start_slot_at_epoch(epoch - 1) - 1)` and
Expand Down Expand Up @@ -35,6 +35,20 @@ export enum EventType {
lightClientUpdate = "light_client_update",
}

export const eventTypes: {[K in EventType]: K} = {
[EventType.head]: EventType.head,
[EventType.block]: EventType.block,
[EventType.attestation]: EventType.attestation,
[EventType.voluntaryExit]: EventType.voluntaryExit,
[EventType.blsToExecutionChange]: EventType.blsToExecutionChange,
[EventType.finalizedCheckpoint]: EventType.finalizedCheckpoint,
[EventType.chainReorg]: EventType.chainReorg,
[EventType.contributionAndProof]: EventType.contributionAndProof,
[EventType.lightClientOptimisticUpdate]: EventType.lightClientOptimisticUpdate,
[EventType.lightClientFinalityUpdate]: EventType.lightClientFinalityUpdate,
[EventType.lightClientUpdate]: EventType.lightClientUpdate,
};

export type EventData = {
[EventType.head]: {
slot: Slot;
Expand Down
4 changes: 2 additions & 2 deletions packages/api/test/unit/beacon/oapiSpec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ describe("eventstream event data", () => {
});

const eventSerdes = routes.events.getEventSerdes();
const knownTopics = Object.values(routes.events.EventType) as string[];
const knownTopics = new Set<string>(Object.values(routes.events.eventTypes));

for (const [topic, {value}] of Object.entries(eventstreamExamples ?? {})) {
it(topic, () => {
if (!knownTopics.includes(topic)) {
if (!knownTopics.has(topic)) {
throw Error(`topic ${topic} not implemented`);
}

Expand Down
75 changes: 9 additions & 66 deletions packages/beacon-node/src/api/impl/events/index.ts
Original file line number Diff line number Diff line change
@@ -1,86 +1,29 @@
import {capella} from "@lodestar/types";
import {routes} from "@lodestar/api";
import {toHexString} from "@chainsafe/ssz";
import {ApiModules} from "../types.js";
import {ChainEvent, IChainEvents} from "../../../chain/index.js";
import {ApiError} from "../errors.js";

/**
* Mapping of internal `ChainEvents` to API spec events
*/
const chainEventMap = {
[routes.events.EventType.head]: ChainEvent.head as const,
[routes.events.EventType.block]: ChainEvent.block as const,
[routes.events.EventType.attestation]: ChainEvent.attestation as const,
[routes.events.EventType.voluntaryExit]: ChainEvent.block as const,
[routes.events.EventType.finalizedCheckpoint]: ChainEvent.finalized as const,
[routes.events.EventType.chainReorg]: ChainEvent.forkChoiceReorg as const,
[routes.events.EventType.contributionAndProof]: ChainEvent.contributionAndProof as const,
[routes.events.EventType.lightClientOptimisticUpdate]: ChainEvent.lightClientOptimisticUpdate as const,
[routes.events.EventType.lightClientFinalityUpdate]: ChainEvent.lightClientFinalityUpdate as const,
[routes.events.EventType.lightClientUpdate]: ChainEvent.lightClientUpdate as const,
[routes.events.EventType.blsToExecutionChange]: ChainEvent.block as const,
};

export function getEventsApi({chain, config}: Pick<ApiModules, "chain" | "config">): routes.events.Api {
/**
* Mapping to convert internal `ChainEvents` payload to the API spec events data
*/
const eventDataTransformers: {
[K in routes.events.EventType]: (
...args: Parameters<IChainEvents[typeof chainEventMap[K]]>
) => routes.events.EventData[K][];
} = {
[routes.events.EventType.head]: (data) => [data],
[routes.events.EventType.block]: (block, _, executionOptimistic) => [
{
block: toHexString(config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic,
},
],
[routes.events.EventType.attestation]: (attestation) => [attestation],
[routes.events.EventType.voluntaryExit]: (block) => Array.from(block.message.body.voluntaryExits),
[routes.events.EventType.finalizedCheckpoint]: (checkpoint, state) => [
{
block: toHexString(checkpoint.root),
epoch: checkpoint.epoch,
state: toHexString(state.hashTreeRoot()),
executionOptimistic: false,
},
],
[routes.events.EventType.chainReorg]: (data) => [data],
[routes.events.EventType.contributionAndProof]: (contributionAndProof) => [contributionAndProof],
[routes.events.EventType.lightClientOptimisticUpdate]: (headerUpdate) => [headerUpdate],
[routes.events.EventType.lightClientFinalityUpdate]: (headerUpdate) => [headerUpdate],
[routes.events.EventType.lightClientUpdate]: (headerUpdate) => [headerUpdate],
[routes.events.EventType.blsToExecutionChange]: (block) =>
Array.from((block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []),
};
export function getEventsApi({chain}: Pick<ApiModules, "chain" | "config">): routes.events.Api {
const validTopics = new Set(Object.values(routes.events.eventTypes));

return {
eventstream(topics, signal, onEvent) {
const onAbortFns: (() => void)[] = [];

for (const topic of topics) {
const eventDataTransformer = eventDataTransformers[topic];
const chainEvent = chainEventMap[topic];
if (eventDataTransformer === undefined || !chainEvent) {
if (!validTopics.has(topic)) {
throw new ApiError(400, `Unknown topic ${topic}`);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
const handler = (...args: any[]): void => {
const handler = (data: any): void => {
// TODO: What happens if this handler throws? Does it break the other chain.emitter listeners?
const messages = eventDataTransformer(...args);
for (const message of messages) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any
onEvent({type: topic, message: message as any});
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any
onEvent({type: topic, message: data});
};

chain.emitter.on(chainEvent, handler);
onAbortFns.push(() => chain.emitter.off(chainEvent, handler));
chain.emitter.on(topic, handler);
onAbortFns.push(() => chain.emitter.off(topic, handler));
}

signal.addEventListener(
Expand Down
59 changes: 49 additions & 10 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {altair, ssz} from "@lodestar/types";
import {altair, capella, ssz} from "@lodestar/types";
import {MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {toHexString} from "@chainsafe/ssz";
import {
Expand All @@ -7,7 +7,8 @@ import {
computeStartSlotAtEpoch,
RootCache,
} from "@lodestar/state-transition";
import {ForkChoiceError, ForkChoiceErrorCode, EpochDifference, AncestorStatus} from "@lodestar/fork-choice";
import {routes} from "@lodestar/api";
import {ForkChoiceError, ForkChoiceErrorCode, EpochDifference} from "@lodestar/fork-choice";
import {ZERO_HASH_HEX} from "../../constants/index.js";
import {toCheckpointHex} from "../stateCache/index.js";
import {isOptimisticBlock} from "../../util/forkChoice.js";
Expand Down Expand Up @@ -130,7 +131,7 @@ export async function importBlock(
// don't want to log the processed attestations here as there are so many attestations and it takes too much disc space,
// users may want to keep more log files instead of unnecessary processed attestations log
// see https://github.com/ChainSafe/lodestar/pull/4032
pendingEvents.push(ChainEvent.attestation, attestation);
pendingEvents.push(routes.events.EventType.attestation, attestation);
} catch (e) {
// a block has a lot of attestations and it may has same error, we don't want to log all of them
if (e instanceof ForkChoiceError && e.type.code === ForkChoiceErrorCode.INVALID_ATTESTATION) {
Expand Down Expand Up @@ -198,7 +199,6 @@ export async function importBlock(
const justifiedEpoch = justifiedCheckpoint.epoch;
const preJustifiedEpoch = parentBlockSummary.justifiedEpoch;
if (justifiedEpoch > preJustifiedEpoch) {
this.emitter.emit(ChainEvent.justified, justifiedCheckpoint, checkpointState);
this.logger.verbose("Checkpoint justified", toCheckpointHex(justifiedCheckpoint));
this.metrics?.previousJustifiedEpoch.set(checkpointState.previousJustifiedCheckpoint.epoch);
this.metrics?.currentJustifiedEpoch.set(justifiedCheckpoint.epoch);
Expand All @@ -207,7 +207,12 @@ export async function importBlock(
const finalizedEpoch = finalizedCheckpoint.epoch;
const preFinalizedEpoch = parentBlockSummary.finalizedEpoch;
if (finalizedEpoch > preFinalizedEpoch) {
this.emitter.emit(ChainEvent.finalized, finalizedCheckpoint, checkpointState);
this.emitter.emit(routes.events.EventType.finalizedCheckpoint, {
block: toHexString(finalizedCheckpoint.root),
epoch: finalizedCheckpoint.epoch,
state: toHexString(checkpointState.hashTreeRoot()),
executionOptimistic: false,
});
this.logger.verbose("Checkpoint finalized", toCheckpointHex(finalizedCheckpoint));
this.metrics?.finalizedEpoch.set(finalizedCheckpoint.epoch);
}
Expand All @@ -223,7 +228,7 @@ export async function importBlock(
// new head
const executionOptimistic = isOptimisticBlock(newHead);

pendingEvents.push(ChainEvent.head, {
pendingEvents.push(routes.events.EventType.head, {
block: newHead.blockRoot,
epochTransition: computeStartSlotAtEpoch(computeEpochAtSlot(newHead.slot)) === newHead.slot,
slot: newHead.slot,
Expand All @@ -233,6 +238,24 @@ export async function importBlock(
executionOptimistic,
});

const delaySec = this.clock.secFromSlot(newHead.slot);
this.logger.verbose("New chain head", {
slot: newHead.slot,
root: newHead.blockRoot,
delaySec,
});

if (this.metrics) {
this.metrics.headSlot.set(newHead.slot);
// Only track "recent" blocks. Otherwise sync can distort this metrics heavily.
// We want to track recent blocks coming from gossip, unknown block sync, and API.
if (delaySec < 64 * this.config.SECONDS_PER_SLOT) {
this.metrics.elapsedTimeTillBecomeHead.observe(delaySec);
}
}

this.onNewHead(newHead);

this.metrics?.forkChoice.changedHead.inc();

const ancestorResult = this.forkChoice.getCommonAncestorDepth(oldHead, newHead);
Expand All @@ -248,8 +271,8 @@ export async function importBlock(
newSlot: newHead.slot,
});

pendingEvents.push(ChainEvent.forkChoiceReorg, {
depth: ancestorResult.depth,
pendingEvents.push(routes.events.EventType.chainReorg, {
depth: distance,
epoch: computeEpochAtSlot(newHead.slot),
slot: newHead.slot,
newHeadBlock: newHead.blockRoot,
Expand Down Expand Up @@ -361,10 +384,26 @@ export async function importBlock(

// - Send event after everything is done

// Send block events

for (const voluntaryExit of block.message.body.voluntaryExits) {
this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
}

for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) {
this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
}

// TODO: Make forkChoice.onBlock return block summary
const blockSummary = this.forkChoice.getBlock(blockRoot);
const executionOptimistic = blockSummary != null && isOptimisticBlock(blockSummary);

this.emitter.emit(routes.events.EventType.block, {
block: toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)),
slot: block.message.slot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

// Emit all events at once after fully completing importBlock()
this.emitter.emit(ChainEvent.block, block, postState, executionOptimistic);
pendingEvents.emit();

// Register stat metrics about the block after importing it
Expand Down
20 changes: 2 additions & 18 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import {ensureDir, writeIfNotExist} from "../util/file.js";
import {CheckpointStateCache, StateContextCache} from "./stateCache/index.js";
import {BlockProcessor, ImportBlockOpts} from "./blocks/index.js";
import {IBeaconClock, LocalClock} from "./clock/index.js";
import {ChainEventEmitter, ChainEvent, HeadEventData} from "./emitter.js";
import {ChainEventEmitter, ChainEvent} from "./emitter.js";
import {IBeaconChain, ProposerPreparationData} from "./interface.js";
import {IChainOptions} from "./options.js";
import {IStateRegenerator, QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
Expand Down Expand Up @@ -285,7 +285,6 @@ export class BeaconChain implements IBeaconChain {
emitter.addListener(ChainEvent.clockEpoch, this.onClockEpoch.bind(this));
emitter.addListener(ChainEvent.forkChoiceFinalized, this.onForkChoiceFinalized.bind(this));
emitter.addListener(ChainEvent.forkChoiceJustified, this.onForkChoiceJustified.bind(this));
emitter.addListener(ChainEvent.head, this.onNewHead.bind(this));
}

async close(): Promise<void> {
Expand Down Expand Up @@ -676,24 +675,9 @@ export class BeaconChain implements IBeaconChain {
}
}

private onNewHead(head: HeadEventData): void {
const delaySec = this.clock.secFromSlot(head.slot);
this.logger.verbose("New chain head", {
headSlot: head.slot,
headRoot: head.block,
delaySec,
});
protected onNewHead(head: ProtoBlock): void {
this.syncContributionAndProofPool.prune(head.slot);
this.seenContributionAndProof.prune(head.slot);

if (this.metrics) {
this.metrics.headSlot.set(head.slot);
// Only track "recent" blocks. Otherwise sync can distort this metrics heavily.
// We want to track recent blocks coming from gossip, unknown block sync, and API.
if (delaySec < 64 * this.config.SECONDS_PER_SLOT) {
this.metrics.elapsedTimeTillBecomeHead.observe(delaySec);
}
}
}

private onForkChoiceJustified(this: BeaconChain, cp: CheckpointWithHex): void {
Expand Down
Loading

0 comments on commit 1bbe1ec

Please sign in to comment.