diff --git a/packages/beacon-node/src/api/impl/beacon/pool/index.ts b/packages/beacon-node/src/api/impl/beacon/pool/index.ts index 60f4539adc01..15bb1a3107db 100644 --- a/packages/beacon-node/src/api/impl/beacon/pool/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/pool/index.ts @@ -70,6 +70,9 @@ export function getBeaconPoolApi({ const insertOutcome = chain.attestationPool.add(attestation, attDataRootHex); metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome}); } + + chain.emitter.emit(routes.events.EventType.attestation, attestation); + const sentPeers = await network.publishBeaconAttestation(attestation, subnet); metrics?.onPoolSubmitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers); } catch (e) { @@ -108,6 +111,7 @@ export function getBeaconPoolApi({ async submitPoolVoluntaryExit(voluntaryExit) { await validateGossipVoluntaryExit(chain, voluntaryExit); chain.opPool.insertVoluntaryExit(voluntaryExit); + chain.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit); await network.publishVoluntaryExit(voluntaryExit); }, @@ -121,6 +125,9 @@ export function getBeaconPoolApi({ await validateBlsToExecutionChange(chain, blsToExecutionChange, true); const preCapella = chain.clock.currentEpoch < chain.config.CAPELLA_FORK_EPOCH; chain.opPool.insertBlsToExecutionChange(blsToExecutionChange, preCapella); + + chain.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange); + if (!preCapella) { await network.publishBlsToExecutionChange(blsToExecutionChange); } diff --git a/packages/beacon-node/src/chain/blocks/importBlock.ts b/packages/beacon-node/src/chain/blocks/importBlock.ts index 02789ece50cb..8da566b9d2bb 100644 --- a/packages/beacon-node/src/chain/blocks/importBlock.ts +++ b/packages/beacon-node/src/chain/blocks/importBlock.ts @@ -26,6 +26,10 @@ import {writeBlockInputToDb} from "./writeBlockInputToDb.js"; * Fork-choice allows to import attestations from current (0) or past (1) epoch. */ const FORK_CHOICE_ATT_EPOCH_LIMIT = 1; +/** + * Emit eventstream events for block contents events only for blocks that are recent enough to clock + */ +const EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS = 64; /** * Imports a fully verified block into the chain state. Produces multiple permanent side-effects. @@ -149,11 +153,6 @@ export async function importBlock( blockRootHex, block.message.slot ); - - // don't want to log the processed attestations here as there are so many attestations and it takes too much disc space, - // users may want to keep more log files instead of unnecessary processed attestations log - // see https://github.com/ChainSafe/lodestar/pull/4032 - this.emitter.emit(routes.events.EventType.attestation, attestation); } catch (e) { // a block has a lot of attestations and it may has same error, we don't want to log all of them if (e instanceof ForkChoiceError && e.type.code === ForkChoiceErrorCode.INVALID_ATTESTATION) { @@ -370,14 +369,25 @@ export async function importBlock( } } - // Send block events + // Send block events, only for recent enough blocks - for (const voluntaryExit of block.message.body.voluntaryExits) { - this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit); - } - - for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) { - this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange); + if (this.clock.currentSlot - block.message.slot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) { + // NOTE: Skip looping if there are no listeners from the API + if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) { + for (const voluntaryExit of block.message.body.voluntaryExits) { + this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit); + } + } + if (this.emitter.listenerCount(routes.events.EventType.blsToExecutionChange)) { + for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) { + this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange); + } + } + if (this.emitter.listenerCount(routes.events.EventType.attestation)) { + for (const attestation of block.message.body.attestations) { + this.emitter.emit(routes.events.EventType.attestation, attestation); + } + } } // Register stat metrics about the block after importing it diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 06050cfa0ff5..4399bda0b004 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -3,6 +3,7 @@ import {BeaconConfig} from "@lodestar/config"; import {Logger, prettyBytes} from "@lodestar/utils"; import {Root, Slot, ssz} from "@lodestar/types"; import {ForkName, ForkSeq} from "@lodestar/params"; +import {routes} from "@lodestar/api"; import {Metrics} from "../../metrics/index.js"; import {OpSource} from "../../metrics/validatorMonitor.js"; import {IBeaconChain} from "../../chain/index.js"; @@ -177,6 +178,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH const blockInput = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, serializedData); await validateBeaconBlock(blockInput, topic.fork, peerIdStr, seenTimestampSec); handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); + + // Do not emit block on eventstream API, it will be emitted after successful import }, [GossipType.blob_sidecar]: async (_data, _topic, _peerIdStr, _seenTimestampSec) => { @@ -235,6 +238,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH ); } } + + chain.emitter.emit(routes.events.EventType.attestation, signedAggregateAndProof.message.aggregate); }, [GossipType.beacon_attestation]: async ({serializedData, msgSlot}, {subnet}, _peer, seenTimestampSec) => { @@ -278,6 +283,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH logger.debug("Error adding gossip unaggregated attestation to forkchoice", {subnet}, e as Error); } } + + chain.emitter.emit(routes.events.EventType.attestation, attestation); }, [GossipType.attester_slashing]: async ({serializedData}, topic) => { @@ -318,6 +325,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH } catch (e) { logger.error("Error adding voluntaryExit to pool", {}, e as Error); } + + chain.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit); }, [GossipType.sync_committee_contribution_and_proof]: async ({serializedData}, topic) => { @@ -340,6 +349,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH } catch (e) { logger.error("Error adding to contributionAndProof pool", {}, e as Error); } + + chain.emitter.emit(routes.events.EventType.contributionAndProof, contributionAndProof); }, [GossipType.sync_committee]: async ({serializedData}, topic) => { @@ -386,6 +397,8 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH } catch (e) { logger.error("Error adding blsToExecutionChange to pool", {}, e as Error); } + + chain.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange); }, }; }