From f43984eee667699400fc1eeb4f6365fe80c1644d Mon Sep 17 00:00:00 2001 From: harkamal Date: Mon, 6 Nov 2023 00:58:36 +0530 Subject: [PATCH] feat: optimistically verify blocks even before all blobs available cleanup pr and add metrics to track simplify improvements and type fixes increase bucket precision time fixes improve metrics collection improve metrics collection some comments improv fix the missing writing blobs for blobspromise rebase fixes rebase fixes remove artifact apply feedback add more meta info to error separate out the blockinput cache and attach to chain rename the cache apply more feedback add unittest for seengossipblockinput add comments --- .../src/api/impl/beacon/blocks/index.ts | 21 +-- .../beacon-node/src/chain/blocks/index.ts | 15 +- .../beacon-node/src/chain/blocks/types.ts | 155 ++++------------ .../src/chain/blocks/verifyBlock.ts | 52 +++++- .../blocks/verifyBlocksDataAvailability.ts | 126 +++++++++++++ .../blocks/verifyBlocksExecutionPayloads.ts | 7 +- .../chain/blocks/verifyBlocksSanityChecks.ts | 46 +---- .../chain/blocks/verifyBlocksSignatures.ts | 17 +- .../blocks/verifyBlocksStateTransitionOnly.ts | 9 +- .../src/chain/blocks/writeBlockInputToDb.ts | 11 +- packages/beacon-node/src/chain/chain.ts | 2 + .../src/chain/errors/blockError.ts | 5 +- packages/beacon-node/src/chain/interface.ts | 2 + .../beacon-node/src/chain/seenCache/index.ts | 1 + .../chain/seenCache/seenGossipBlockInput.ts | 170 ++++++++++++++++++ .../src/metrics/metrics/lodestar.ts | 27 ++- .../src/network/processor/gossipHandlers.ts | 43 +++-- .../seenCache/seenGossipBlockInput.test.ts | 158 ++++++++++++++++ 18 files changed, 635 insertions(+), 232 deletions(-) create mode 100644 packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts create mode 100644 packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts create mode 100644 packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts diff --git a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts index c54e040ceb0..ef24724a34b 100644 --- a/packages/beacon-node/src/api/impl/beacon/blocks/index.ts +++ b/packages/beacon-node/src/api/impl/beacon/blocks/index.ts @@ -4,7 +4,6 @@ import { computeTimeAtSlot, parseSignedBlindedBlockOrContents, reconstructFullBlockOrContents, - DataAvailableStatus, } from "@lodestar/state-transition"; import {SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params"; import {sleep, toHex} from "@lodestar/utils"; @@ -121,19 +120,13 @@ export function getBeaconBlockApi({ } try { - await verifyBlocksInEpoch.call( - chain as BeaconChain, - parentBlock, - [blockForImport], - [DataAvailableStatus.available], - { - ...opts, - verifyOnly: true, - skipVerifyBlockSignatures: true, - skipVerifyExecutionPayload: true, - seenTimestampSec, - } - ); + await verifyBlocksInEpoch.call(chain as BeaconChain, parentBlock, [blockForImport], { + ...opts, + verifyOnly: true, + skipVerifyBlockSignatures: true, + skipVerifyExecutionPayload: true, + seenTimestampSec, + }); } catch (error) { chain.logger.error("Consensus checks failed while publishing the block", valLogMeta, error as Error); chain.persistInvalidSszValue( diff --git a/packages/beacon-node/src/chain/blocks/index.ts b/packages/beacon-node/src/chain/blocks/index.ts index 569fd077102..8f4c7fa5f0f 100644 --- a/packages/beacon-node/src/chain/blocks/index.ts +++ b/packages/beacon-node/src/chain/blocks/index.ts @@ -58,11 +58,7 @@ export async function processBlocks( } try { - const {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock} = verifyBlocksSanityChecks( - this, - blocks, - opts - ); + const {relevantBlocks, parentSlots, parentBlock} = verifyBlocksSanityChecks(this, blocks, opts); // No relevant blocks, skip verifyBlocksInEpoch() if (relevantBlocks.length === 0 || parentBlock === null) { @@ -72,13 +68,8 @@ export async function processBlocks( // Fully verify a block to be imported immediately after. Does not produce any side-effects besides adding intermediate // states in the state cache through regen. - const {postStates, proposerBalanceDeltas, segmentExecStatus} = await verifyBlocksInEpoch.call( - this, - parentBlock, - relevantBlocks, - dataAvailabilityStatuses, - opts - ); + const {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus} = + await verifyBlocksInEpoch.call(this, parentBlock, relevantBlocks, opts); // If segmentExecStatus has lvhForkchoice then, the entire segment should be invalid // and we need to further propagate diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index 5f1ac883357..aff5a64c992 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -1,14 +1,13 @@ -import {toHexString} from "@chainsafe/ssz"; import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition"; import {MaybeValidExecutionStatus} from "@lodestar/fork-choice"; -import {allForks, deneb, Slot, RootHex} from "@lodestar/types"; +import {allForks, deneb, Slot} from "@lodestar/types"; import {ForkSeq, MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS} from "@lodestar/params"; import {ChainForkConfig} from "@lodestar/config"; -import {pruneSetToMax} from "@lodestar/utils"; export enum BlockInputType { preDeneb = "preDeneb", postDeneb = "postDeneb", + blobsPromise = "blobsPromise", } /** Enum to represent where blocks come from */ @@ -19,9 +18,18 @@ export enum BlockSource { byRoot = "req_resp_by_root", } +export enum GossipedInputType { + block = "block", + blob = "blob", +} + +export type BlobsCache = Map; +export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}; + export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & ( | {type: BlockInputType.preDeneb} - | {type: BlockInputType.postDeneb; blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]} + | ({type: BlockInputType.postDeneb} & BlockInputBlobs) + | {type: BlockInputType.blobsPromise; blobsCache: BlobsCache; availabilityPromise: Promise} ); export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean { @@ -32,125 +40,7 @@ export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clo ); } -export enum GossipedInputType { - block = "block", - blob = "blob", -} -type GossipedBlockInput = - | {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null} - | {type: GossipedInputType.blob; signedBlob: deneb.SignedBlobSidecar; blobBytes: Uint8Array | null}; -type BlockInputCacheType = { - block?: allForks.SignedBeaconBlock; - blockBytes?: Uint8Array | null; - blobs: Map; - blobsBytes: Map; -}; - -const MAX_GOSSIPINPUT_CACHE = 5; -// ssz.deneb.BlobSidecars.elementType.fixedSize; -const BLOBSIDECAR_FIXED_SIZE = 131256; - export const getBlockInput = { - blockInputCache: new Map(), - - getGossipBlockInput( - config: ChainForkConfig, - gossipedInput: GossipedBlockInput - ): - | {blockInput: BlockInput; blockInputMeta: {pending: null; haveBlobs: number; expectedBlobs: number}} - | {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} - | {blockInput: null; blockInputMeta: {pending: GossipedInputType.blob; haveBlobs: number; expectedBlobs: number}} { - let blockHex; - let blockCache; - - if (gossipedInput.type === GossipedInputType.block) { - const {signedBlock, blockBytes} = gossipedInput; - - blockHex = toHexString( - config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message) - ); - blockCache = this.blockInputCache.get(blockHex) ?? { - blobs: new Map(), - blobsBytes: new Map(), - }; - - blockCache.block = signedBlock; - blockCache.blockBytes = blockBytes; - } else { - const {signedBlob, blobBytes} = gossipedInput; - blockHex = toHexString(signedBlob.message.blockRoot); - blockCache = this.blockInputCache.get(blockHex); - - // If a new entry is going to be inserted, prune out old ones - if (blockCache === undefined) { - pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE); - blockCache = {blobs: new Map(), blobsBytes: new Map()}; - } - - // TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions - blockCache.blobs.set(signedBlob.message.index, signedBlob.message); - // easily splice out the unsigned message as blob is a fixed length type - blockCache.blobsBytes.set(signedBlob.message.index, blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null); - } - - this.blockInputCache.set(blockHex, blockCache); - const {block: signedBlock, blockBytes} = blockCache; - - if (signedBlock !== undefined) { - // block is available, check if all blobs have shown up - const {slot, body} = signedBlock.message; - const {blobKzgCommitments} = body as deneb.BeaconBlockBody; - const blockInfo = `blockHex=${blockHex}, slot=${slot}`; - - if (blobKzgCommitments.length < blockCache.blobs.size) { - throw Error( - `Received more blobs=${blockCache.blobs.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}` - ); - } - if (blobKzgCommitments.length === blockCache.blobs.size) { - const blobSidecars = []; - const blobsBytes = []; - - for (let index = 0; index < blobKzgCommitments.length; index++) { - const blobSidecar = blockCache.blobs.get(index); - if (blobSidecar === undefined) { - throw Error(`Missing blobSidecar at index=${index} for ${blockInfo}`); - } - blobSidecars.push(blobSidecar); - blobsBytes.push(blockCache.blobsBytes.get(index) ?? null); - } - - return { - // TODO freetheblobs: collate and add serialized data for the postDeneb blockinput - blockInput: getBlockInput.postDeneb( - config, - signedBlock, - BlockSource.gossip, - blobSidecars, - blockBytes ?? null, - blobsBytes - ), - blockInputMeta: {pending: null, haveBlobs: blockCache.blobs.size, expectedBlobs: blobKzgCommitments.length}, - }; - } else { - return { - blockInput: null, - blockInputMeta: { - pending: GossipedInputType.blob, - haveBlobs: blockCache.blobs.size, - expectedBlobs: blobKzgCommitments.length, - }, - }; - } - } else { - // will need to wait for the block to showup - return { - blockInput: null, - blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blockCache.blobs.size, expectedBlobs: null}, - }; - } - }, - preDeneb( config: ChainForkConfig, block: allForks.SignedBeaconBlock, @@ -188,6 +78,27 @@ export const getBlockInput = { blobsBytes, }; }, + + blobsPromise( + config: ChainForkConfig, + block: allForks.SignedBeaconBlock, + source: BlockSource, + blobsCache: BlobsCache, + blockBytes: Uint8Array | null, + availabilityPromise: Promise + ): BlockInput { + if (config.getForkSeq(block.message.slot) < ForkSeq.deneb) { + throw Error(`Pre Deneb block slot ${block.message.slot}`); + } + return { + type: BlockInputType.blobsPromise, + block, + source, + blobsCache, + blockBytes, + availabilityPromise, + }; + }, }; export enum AttestationImportOpt { diff --git a/packages/beacon-node/src/chain/blocks/verifyBlock.ts b/packages/beacon-node/src/chain/blocks/verifyBlock.ts index 72db1d801b4..94a42a39a6a 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlock.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlock.ts @@ -5,7 +5,7 @@ import { isStateValidatorsNodesPopulated, DataAvailableStatus, } from "@lodestar/state-transition"; -import {bellatrix} from "@lodestar/types"; +import {bellatrix, deneb} from "@lodestar/types"; import {ForkName} from "@lodestar/params"; import {ProtoBlock, ExecutionStatus} from "@lodestar/fork-choice"; import {ChainForkConfig} from "@lodestar/config"; @@ -14,13 +14,14 @@ import {BlockError, BlockErrorCode} from "../errors/index.js"; import {BlockProcessOpts} from "../options.js"; import {RegenCaller} from "../regen/index.js"; import type {BeaconChain} from "../chain.js"; -import {BlockInput, ImportBlockOpts} from "./types.js"; +import {BlockInput, ImportBlockOpts, BlockInputType} from "./types.js"; import {POS_PANDA_MERGE_TRANSITION_BANNER} from "./utils/pandaMergeTransitionBanner.js"; import {CAPELLA_OWL_BANNER} from "./utils/ownBanner.js"; import {DENEB_BLOWFISH_BANNER} from "./utils/blowfishBanner.js"; import {verifyBlocksStateTransitionOnly} from "./verifyBlocksStateTransitionOnly.js"; import {verifyBlocksSignatures} from "./verifyBlocksSignatures.js"; import {verifyBlocksExecutionPayload, SegmentExecStatus} from "./verifyBlocksExecutionPayloads.js"; +import {verifyBlocksDataAvailability} from "./verifyBlocksDataAvailability.js"; import {writeBlockInputToDb} from "./writeBlockInputToDb.js"; /** @@ -38,12 +39,12 @@ export async function verifyBlocksInEpoch( this: BeaconChain, parentBlock: ProtoBlock, blocksInput: BlockInput[], - dataAvailabilityStatuses: DataAvailableStatus[], opts: BlockProcessOpts & ImportBlockOpts ): Promise<{ postStates: CachedBeaconStateAllForks[]; proposerBalanceDeltas: number[]; segmentExecStatus: SegmentExecStatus; + dataAvailabilityStatuses: DataAvailableStatus[]; }> { const blocks = blocksInput.map(({block}) => block); if (blocks.length === 0) { @@ -88,7 +89,12 @@ export async function verifyBlocksInEpoch( try { // batch all I/O operations to reduce overhead - const [segmentExecStatus, {postStates, proposerBalanceDeltas}] = await Promise.all([ + const [ + segmentExecStatus, + {dataAvailabilityStatuses, availableTime}, + {postStates, proposerBalanceDeltas, verifyStateTime}, + {verifySignaturesTime}, + ] = await Promise.all([ // Execution payloads opts.skipVerifyExecutionPayload !== true ? verifyBlocksExecutionPayload(this, parentBlock, blocks, preState0, abortController.signal, opts) @@ -98,12 +104,16 @@ export async function verifyBlocksInEpoch( mergeBlockFound: null, } as SegmentExecStatus), + // data availability for the blobs + verifyBlocksDataAvailability(this, blocksInput, opts), + // Run state transition only // TODO: Ensure it yields to allow flushing to workers and engine API verifyBlocksStateTransitionOnly( preState0, blocksInput, - dataAvailabilityStatuses, + // hack availability for state transition eval as availability is separately determined + blocks.map(() => DataAvailableStatus.available), this.logger, this.metrics, abortController.signal, @@ -113,7 +123,7 @@ export async function verifyBlocksInEpoch( // All signatures at once opts.skipVerifyBlockSignatures !== true ? verifyBlocksSignatures(this.bls, this.logger, this.metrics, preState0, blocks, opts) - : Promise.resolve(), + : Promise.resolve({verifySignaturesTime: Date.now()}), // ideally we want to only persist blocks after verifying them however the reality is there are // rarely invalid blocks we'll batch all I/O operation here to reduce the overhead if there's @@ -151,7 +161,35 @@ export async function verifyBlocksInEpoch( } } - return {postStates, proposerBalanceDeltas, segmentExecStatus}; + if (segmentExecStatus.execAborted === null) { + const {executionStatuses, executionTime} = segmentExecStatus; + if ( + blocksInput.length === 1 && + // gossip blocks have seenTimestampSec + opts.seenTimestampSec !== undefined && + blocksInput[0].type !== BlockInputType.preDeneb && + executionStatuses[0] === ExecutionStatus.Valid + ) { + // Find the max time when the block was actually verified + const fullyVerifiedTime = Math.max(executionTime, verifyStateTime, verifySignaturesTime); + const recvTofullyVerifedTime = fullyVerifiedTime / 1000 - opts.seenTimestampSec; + this.metrics?.gossipBlock.receivedToFullyVerifiedTime.observe(recvTofullyVerifedTime); + + const verifiedToBlobsAvailabiltyTime = Math.max(availableTime - fullyVerifiedTime, 0) / 1000; + const numBlobs = (blocksInput[0].block as deneb.SignedBeaconBlock).message.body.blobKzgCommitments.length; + + this.metrics?.gossipBlock.verifiedToBlobsAvailabiltyTime.observe({numBlobs}, verifiedToBlobsAvailabiltyTime); + this.logger.verbose("Verified blockInput fully with blobs availability", { + slot: blocksInput[0].block.message.slot, + recvTofullyVerifedTime, + verifiedToBlobsAvailabiltyTime, + type: blocksInput[0].type, + numBlobs, + }); + } + } + + return {postStates, dataAvailabilityStatuses, proposerBalanceDeltas, segmentExecStatus}; } finally { abortController.abort(); } diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts new file mode 100644 index 00000000000..9c45469d56d --- /dev/null +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksDataAvailability.ts @@ -0,0 +1,126 @@ +import {computeTimeAtSlot, DataAvailableStatus} from "@lodestar/state-transition"; +import {ChainForkConfig} from "@lodestar/config"; +import {deneb, UintNum64} from "@lodestar/types"; +import {Logger} from "@lodestar/utils"; +import {BlockError, BlockErrorCode} from "../errors/index.js"; +import {validateBlobSidecars} from "../validation/blobSidecar.js"; +import {Metrics} from "../../metrics/metrics.js"; +import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation} from "./types.js"; + +// proposer boost is not available post 3 sec so try pulling using unknown block hash +// post 3 sec after throwing the availability error +const BLOB_AVAILABILITY_TIMEOUT = 3_000; + +/** + * Verifies some early cheap sanity checks on the block before running the full state transition. + * + * - Parent is known to the fork-choice + * - Check skipped slots limit + * - check_block_relevancy() + * - Block not in the future + * - Not genesis block + * - Block's slot is < Infinity + * - Not finalized slot + * - Not already known + */ +export async function verifyBlocksDataAvailability( + chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger; metrics: Metrics | null}, + blocks: BlockInput[], + opts: ImportBlockOpts +): Promise<{dataAvailabilityStatuses: DataAvailableStatus[]; availableTime: number}> { + if (blocks.length === 0) { + throw Error("Empty partiallyVerifiedBlocks"); + } + + const dataAvailabilityStatuses: DataAvailableStatus[] = []; + const seenTime = opts.seenTimestampSec !== undefined ? opts.seenTimestampSec * 1000 : Date.now(); + + for (const blockInput of blocks) { + // Validate status of only not yet finalized blocks, we don't need yet to propogate the status + // as it is not used upstream anywhere + const dataAvailabilityStatus = await maybeValidateBlobs(chain, blockInput, opts); + dataAvailabilityStatuses.push(dataAvailabilityStatus); + } + + const availableTime = blocks[blocks.length - 1].type === BlockInputType.blobsPromise ? Date.now() : seenTime; + if (blocks.length === 1 && opts.seenTimestampSec !== undefined && blocks[0].type !== BlockInputType.preDeneb) { + const recvToAvailableTime = availableTime / 1000 - opts.seenTimestampSec; + const numBlobs = (blocks[0].block as deneb.SignedBeaconBlock).message.body.blobKzgCommitments.length; + + chain.metrics?.gossipBlock.receivedToBlobsAvailabilityTime.observe({numBlobs}, recvToAvailableTime); + chain.logger.verbose("Verified blobs availability", { + slot: blocks[0].block.message.slot, + recvToAvailableTime, + type: blocks[0].type, + }); + } + + return {dataAvailabilityStatuses, availableTime}; +} + +async function maybeValidateBlobs( + chain: {config: ChainForkConfig; genesisTime: UintNum64}, + blockInput: BlockInput, + opts: ImportBlockOpts +): Promise { + switch (blockInput.type) { + case BlockInputType.preDeneb: + return DataAvailableStatus.preDeneb; + + case BlockInputType.postDeneb: + if (opts.validBlobSidecars === BlobSidecarValidation.Full) { + return DataAvailableStatus.available; + } + + // eslint-disable-next-line no-fallthrough + case BlockInputType.blobsPromise: { + // run full validation + const {block} = blockInput; + const blockSlot = block.message.slot; + + const blobsData = + blockInput.type === BlockInputType.postDeneb + ? blockInput + : await raceWithCutoff(chain, blockInput, blockInput.availabilityPromise); + const {blobs} = blobsData; + + const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body; + const beaconBlockRoot = chain.config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message); + + // if the blob siddecars have been individually verified then we can skip kzg proof check + // but other checks to match blobs with block data still need to be performed + const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual; + validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck}); + + return DataAvailableStatus.available; + } + } +} + +/** + * Wait for blobs to become available with a cutoff time. If fails then throw DATA_UNAVAILABLE error + * which may try unknownblock/blobs fill (by root). + */ +async function raceWithCutoff( + chain: {config: ChainForkConfig; genesisTime: UintNum64}, + blockInput: BlockInput, + availabilityPromise: Promise +): Promise { + const {block} = blockInput; + const blockSlot = block.message.slot; + + const cutoffTime = Math.max( + computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + BLOB_AVAILABILITY_TIMEOUT - Date.now(), + 0 + ); + const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTime)); + + try { + await Promise.race([availabilityPromise, cutoffTimeout]); + } catch (e) { + // throw unavailable so that the unknownblock/blobs can be triggered to pull the block + throw new BlockError(block, {code: BlockErrorCode.DATA_UNAVAILABLE}); + } + // we can only be here if availabilityPromise has resolved else an error will be thrown + return availabilityPromise; +} diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksExecutionPayloads.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksExecutionPayloads.ts index 7f4edd14c61..01d04e9a876 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksExecutionPayloads.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksExecutionPayloads.ts @@ -45,6 +45,7 @@ export type SegmentExecStatus = | { execAborted: null; executionStatuses: MaybeValidExecutionStatus[]; + executionTime: number; mergeBlockFound: bellatrix.BeaconBlock | null; } | {execAborted: ExecAbortType; invalidSegmentLVH?: LVHInvalidResponse; mergeBlockFound: null}; @@ -243,8 +244,9 @@ export async function verifyBlocksExecutionPayload( } } - if (blocks.length === 1 && opts.seenTimestampSec !== undefined) { - const recvToVerifiedExecPayload = Date.now() / 1000 - opts.seenTimestampSec; + const executionTime = Date.now(); + if (blocks.length === 1 && opts.seenTimestampSec !== undefined && executionStatuses[0] === ExecutionStatus.Valid) { + const recvToVerifiedExecPayload = executionTime / 1000 - opts.seenTimestampSec; chain.metrics?.gossipBlock.receivedToExecutionPayloadVerification.observe(recvToVerifiedExecPayload); chain.logger.verbose("Verified execution payload", { slot: blocks[0].message.slot, @@ -255,6 +257,7 @@ export async function verifyBlocksExecutionPayload( return { execAborted: null, executionStatuses, + executionTime, mergeBlockFound, }; } diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts index 9fb7d04f1ed..e62355a4889 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksSanityChecks.ts @@ -1,12 +1,11 @@ -import {computeStartSlotAtEpoch, DataAvailableStatus} from "@lodestar/state-transition"; +import {computeStartSlotAtEpoch} from "@lodestar/state-transition"; import {ChainForkConfig} from "@lodestar/config"; import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice"; -import {Slot, deneb} from "@lodestar/types"; +import {Slot} from "@lodestar/types"; import {toHexString} from "@lodestar/utils"; import {IClock} from "../../util/clock.js"; import {BlockError, BlockErrorCode} from "../errors/index.js"; -import {validateBlobSidecars} from "../validation/blobSidecar.js"; -import {BlockInput, BlockInputType, ImportBlockOpts, BlobSidecarValidation} from "./types.js"; +import {BlockInput, ImportBlockOpts} from "./types.js"; /** * Verifies some early cheap sanity checks on the block before running the full state transition. @@ -26,7 +25,6 @@ export function verifyBlocksSanityChecks( opts: ImportBlockOpts ): { relevantBlocks: BlockInput[]; - dataAvailabilityStatuses: DataAvailableStatus[]; parentSlots: Slot[]; parentBlock: ProtoBlock | null; } { @@ -35,7 +33,6 @@ export function verifyBlocksSanityChecks( } const relevantBlocks: BlockInput[] = []; - const dataAvailabilityStatuses: DataAvailableStatus[] = []; const parentSlots: Slot[] = []; let parentBlock: ProtoBlock | null = null; @@ -64,10 +61,6 @@ export function verifyBlocksSanityChecks( } } - // Validate status of only not yet finalized blocks, we don't need yet to propogate the status - // as it is not used upstream anywhere - const dataAvailabilityStatus = maybeValidateBlobs(chain.config, blockInput, opts); - let parentBlockSlot: Slot; if (relevantBlocks.length > 0) { @@ -105,7 +98,6 @@ export function verifyBlocksSanityChecks( // Block is relevant relevantBlocks.push(blockInput); - dataAvailabilityStatuses.push(dataAvailabilityStatus); parentSlots.push(parentBlockSlot); } @@ -115,35 +107,5 @@ export function verifyBlocksSanityChecks( throw Error(`Internal error, parentBlock should not be null for relevantBlocks=${relevantBlocks.length}`); } - return {relevantBlocks, dataAvailabilityStatuses, parentSlots, parentBlock}; -} - -function maybeValidateBlobs( - config: ChainForkConfig, - blockInput: BlockInput, - opts: ImportBlockOpts -): DataAvailableStatus { - switch (blockInput.type) { - case BlockInputType.postDeneb: { - if (opts.validBlobSidecars === BlobSidecarValidation.Full) { - return DataAvailableStatus.available; - } - - // run full validation - const {block, blobs} = blockInput; - const blockSlot = block.message.slot; - const {blobKzgCommitments} = (block as deneb.SignedBeaconBlock).message.body; - const beaconBlockRoot = config.getForkTypes(blockSlot).BeaconBlock.hashTreeRoot(block.message); - - // if the blob siddecars have been individually verified then we can skip kzg proof check - // but other checks to match blobs with block data still need to be performed - const skipProofsCheck = opts.validBlobSidecars === BlobSidecarValidation.Individual; - validateBlobSidecars(blockSlot, beaconBlockRoot, blobKzgCommitments, blobs, {skipProofsCheck}); - - return DataAvailableStatus.available; - } - - case BlockInputType.preDeneb: - return DataAvailableStatus.preDeneb; - } + return {relevantBlocks, parentSlots, parentBlock}; } diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksSignatures.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksSignatures.ts index fbbef969b69..14ad46a35c1 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksSignatures.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksSignatures.ts @@ -20,7 +20,7 @@ export async function verifyBlocksSignatures( preState0: CachedBeaconStateAllForks, blocks: allForks.SignedBeaconBlock[], opts: ImportBlockOpts -): Promise { +): Promise<{verifySignaturesTime: number}> { const isValidPromises: Promise[] = []; // Verifies signatures after running state transition, so all SyncCommittee signed roots are known at this point. @@ -46,17 +46,20 @@ export async function verifyBlocksSignatures( } } - if (blocks.length === 1 && opts.seenTimestampSec !== undefined) { - const recvToSigVer = Date.now() / 1000 - opts.seenTimestampSec; - metrics?.gossipBlock.receivedToSignaturesVerification.observe(recvToSigVer); - logger.verbose("Verified block signatures", {slot: blocks[0].message.slot, recvToSigVer}); - } - // `rejectFirstInvalidResolveAllValid()` returns on isValid result with its index const res = await rejectFirstInvalidResolveAllValid(isValidPromises); if (!res.allValid) { throw new BlockError(blocks[res.index], {code: BlockErrorCode.INVALID_SIGNATURE, state: preState0}); } + + const verifySignaturesTime = Date.now(); + if (blocks.length === 1 && opts.seenTimestampSec !== undefined) { + const recvToSigVer = verifySignaturesTime / 1000 - opts.seenTimestampSec; + metrics?.gossipBlock.receivedToSignaturesVerification.observe(recvToSigVer); + logger.verbose("Verified block signatures", {slot: blocks[0].message.slot, recvToSigVer}); + } + + return {verifySignaturesTime}; } type AllValidRes = {allValid: true} | {allValid: false; index: number}; diff --git a/packages/beacon-node/src/chain/blocks/verifyBlocksStateTransitionOnly.ts b/packages/beacon-node/src/chain/blocks/verifyBlocksStateTransitionOnly.ts index 709ad0c02b2..786d426d70e 100644 --- a/packages/beacon-node/src/chain/blocks/verifyBlocksStateTransitionOnly.ts +++ b/packages/beacon-node/src/chain/blocks/verifyBlocksStateTransitionOnly.ts @@ -27,7 +27,7 @@ export async function verifyBlocksStateTransitionOnly( metrics: Metrics | null, signal: AbortSignal, opts: BlockProcessOpts & ImportBlockOpts -): Promise<{postStates: CachedBeaconStateAllForks[]; proposerBalanceDeltas: number[]}> { +): Promise<{postStates: CachedBeaconStateAllForks[]; proposerBalanceDeltas: number[]; verifyStateTime: number}> { const postStates: CachedBeaconStateAllForks[] = []; const proposerBalanceDeltas: number[] = []; @@ -90,12 +90,13 @@ export async function verifyBlocksStateTransitionOnly( } } + const verifyStateTime = Date.now(); if (blocks.length === 1 && opts.seenTimestampSec !== undefined) { const slot = blocks[0].block.message.slot; - const recvToTransition = Date.now() / 1000 - opts.seenTimestampSec; + const recvToTransition = verifyStateTime / 1000 - opts.seenTimestampSec; metrics?.gossipBlock.receivedToStateTransition.observe(recvToTransition); - logger.verbose("Transitioned gossip block", {slot, recvToTransition}); + logger.verbose("Verified block state transition", {slot, recvToTransition}); } - return {postStates, proposerBalanceDeltas}; + return {postStates, proposerBalanceDeltas, verifyStateTime}; } diff --git a/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts b/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts index 0603ed7e7f7..0b94d32b84e 100644 --- a/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts +++ b/packages/beacon-node/src/chain/blocks/writeBlockInputToDb.ts @@ -13,7 +13,7 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockI const fnPromises: Promise[] = []; for (const blockInput of blocksInput) { - const {block, blockBytes, type} = blockInput; + const {block, blockBytes} = blockInput; const blockRoot = this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message); const blockRootHex = toHex(blockRoot); if (blockBytes) { @@ -29,8 +29,13 @@ export async function writeBlockInputToDb(this: BeaconChain, blocksInput: BlockI root: blockRootHex, }); - if (type === BlockInputType.postDeneb) { - const {blobs: blobSidecars} = blockInput; + if (blockInput.type === BlockInputType.postDeneb || blockInput.type === BlockInputType.blobsPromise) { + const blobSidecars = + blockInput.type == BlockInputType.postDeneb + ? blockInput.blobs + : // At this point of import blobs are available and can be safely awaited + (await blockInput.availabilityPromise).blobs; + // NOTE: Old blobs are pruned on archive fnPromises.push(this.db.blobSidecars.add({blockRoot, slot: block.message.slot, blobSidecars})); this.logger.debug("Persisted blobSidecars to hot DB", { diff --git a/packages/beacon-node/src/chain/chain.ts b/packages/beacon-node/src/chain/chain.ts index 45cda3d94bc..e093b424082 100644 --- a/packages/beacon-node/src/chain/chain.ts +++ b/packages/beacon-node/src/chain/chain.ts @@ -79,6 +79,7 @@ import {BlockInput} from "./blocks/types.js"; import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js"; import {ShufflingCache} from "./shufflingCache.js"; import {StateContextCache} from "./stateCache/stateContextCache.js"; +import {SeenGossipBlockInput} from "./seenCache/index.js"; import {CheckpointStateCache} from "./stateCache/stateContextCheckpointsCache.js"; /** @@ -125,6 +126,7 @@ export class BeaconChain implements IBeaconChain { readonly seenSyncCommitteeMessages = new SeenSyncCommitteeMessages(); readonly seenContributionAndProof: SeenContributionAndProof; readonly seenAttestationDatas: SeenAttestationDatas; + readonly seenGossipBlockInput = new SeenGossipBlockInput(); // Seen cache for liveness checks readonly seenBlockAttesters = new SeenBlockAttesters(); diff --git a/packages/beacon-node/src/chain/errors/blockError.ts b/packages/beacon-node/src/chain/errors/blockError.ts index ee06927a4fc..6ab15275934 100644 --- a/packages/beacon-node/src/chain/errors/blockError.ts +++ b/packages/beacon-node/src/chain/errors/blockError.ts @@ -63,6 +63,8 @@ export enum BlockErrorCode { /** The attestation head block is too far behind the attestation slot, causing many skip slots. This is deemed a DoS risk */ TOO_MANY_SKIPPED_SLOTS = "TOO_MANY_SKIPPED_SLOTS", + /** The blobs are unavailable */ + DATA_UNAVAILABLE = "BLOCK_ERROR_DATA_UNAVAILABLE", } type ExecutionErrorStatus = Exclude< @@ -103,7 +105,8 @@ export type BlockErrorType = | {code: BlockErrorCode.TOO_MUCH_GAS_USED; gasUsed: number; gasLimit: number} | {code: BlockErrorCode.SAME_PARENT_HASH; blockHash: RootHex} | {code: BlockErrorCode.TRANSACTIONS_TOO_BIG; size: number; max: number} - | {code: BlockErrorCode.EXECUTION_ENGINE_ERROR; execStatus: ExecutionErrorStatus; errorMessage: string}; + | {code: BlockErrorCode.EXECUTION_ENGINE_ERROR; execStatus: ExecutionErrorStatus; errorMessage: string} + | {code: BlockErrorCode.DATA_UNAVAILABLE}; export class BlockGossipError extends GossipActionError {} diff --git a/packages/beacon-node/src/chain/interface.ts b/packages/beacon-node/src/chain/interface.ts index 62355e334f6..a209ec36276 100644 --- a/packages/beacon-node/src/chain/interface.ts +++ b/packages/beacon-node/src/chain/interface.ts @@ -49,6 +49,7 @@ import {CheckpointBalancesCache} from "./balancesCache.js"; import {IChainOptions} from "./options.js"; import {AssembledBlockType, BlockAttributes, BlockType} from "./produceBlock/produceBlockBody.js"; import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js"; +import {SeenGossipBlockInput} from "./seenCache/index.js"; import {ShufflingCache} from "./shufflingCache.js"; export {BlockType, type AssembledBlockType}; @@ -102,6 +103,7 @@ export interface IBeaconChain { readonly seenSyncCommitteeMessages: SeenSyncCommitteeMessages; readonly seenContributionAndProof: SeenContributionAndProof; readonly seenAttestationDatas: SeenAttestationDatas; + readonly seenGossipBlockInput: SeenGossipBlockInput; // Seen cache for liveness checks readonly seenBlockAttesters: SeenBlockAttesters; diff --git a/packages/beacon-node/src/chain/seenCache/index.ts b/packages/beacon-node/src/chain/seenCache/index.ts index f354a37f93e..250e6581c31 100644 --- a/packages/beacon-node/src/chain/seenCache/index.ts +++ b/packages/beacon-node/src/chain/seenCache/index.ts @@ -2,3 +2,4 @@ export {SeenAggregators, SeenAttesters} from "./seenAttesters.js"; export {SeenBlockProposers} from "./seenBlockProposers.js"; export {SeenSyncCommitteeMessages} from "./seenCommittee.js"; export {SeenContributionAndProof} from "./seenCommitteeContribution.js"; +export {SeenGossipBlockInput} from "./seenGossipBlockInput.js"; diff --git a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts new file mode 100644 index 00000000000..1257feb844b --- /dev/null +++ b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts @@ -0,0 +1,170 @@ +import {toHexString} from "@chainsafe/ssz"; +import {deneb, RootHex, ssz, allForks} from "@lodestar/types"; +import {ChainForkConfig} from "@lodestar/config"; +import {pruneSetToMax} from "@lodestar/utils"; + +import { + BlockInput, + getBlockInput, + BlockSource, + BlockInputBlobs, + BlobsCache, + GossipedInputType, +} from "../blocks/types.js"; + +export type GossipedBlockInput = + | {type: GossipedInputType.block; signedBlock: allForks.SignedBeaconBlock; blockBytes: Uint8Array | null} + | {type: GossipedInputType.blob; signedBlob: deneb.SignedBlobSidecar; blobBytes: Uint8Array | null}; + +export type BlockInputCacheType = { + block?: allForks.SignedBeaconBlock; + blockBytes?: Uint8Array | null; + blobsCache: BlobsCache; + // promise and its callback cached for delayed resolution + availabilityPromise: Promise; + resolveAvailability: (blobs: BlockInputBlobs) => void; +}; + +const MAX_GOSSIPINPUT_CACHE = 5; +// ssz.deneb.BlobSidecars.elementType.fixedSize, 131256 is size for mainnet preset; +const BLOBSIDECAR_FIXED_SIZE = ssz.deneb.BlobSidecars.elementType.fixedSize ?? 131256; + +/** + * SeenGossipBlockInput tracks and caches the live blobs and blocks on the network to solve data availability + * for the blockInput. If no block has been seen yet for some already seen blobs, it responds will null, but + * on the first block or the consequent blobs it responds with blobs promise till all blobs become available. + * + * One can start processing block on blobs promise blockInput response and can await on the promise before + * fully importing the block. The blobs promise is gets resolved as soon as all blobs corresponding to that + * block are seen by SeenGossipBlockInput + */ +export class SeenGossipBlockInput { + private blockInputCache = new Map(); + + prune(): void { + pruneSetToMax(this.blockInputCache, MAX_GOSSIPINPUT_CACHE); + } + + getGossipBlockInput( + config: ChainForkConfig, + gossipedInput: GossipedBlockInput + ): + | { + blockInput: BlockInput; + blockInputMeta: {pending: GossipedInputType.blob | null; haveBlobs: number; expectedBlobs: number}; + } + | {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} { + let blockHex; + let blockCache; + + if (gossipedInput.type === GossipedInputType.block) { + const {signedBlock, blockBytes} = gossipedInput; + + blockHex = toHexString( + config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message) + ); + blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(); + + blockCache.block = signedBlock; + blockCache.blockBytes = blockBytes; + } else { + const {signedBlob, blobBytes} = gossipedInput; + blockHex = toHexString(signedBlob.message.blockRoot); + blockCache = this.blockInputCache.get(blockHex) ?? getEmptyBlockInputCacheEntry(); + + // TODO: freetheblobs check if its the same blob or a duplicate and throw/take actions + blockCache.blobsCache.set(signedBlob.message.index, { + blobSidecar: signedBlob.message, + // easily splice out the unsigned message as blob is a fixed length type + blobBytes: blobBytes?.slice(0, BLOBSIDECAR_FIXED_SIZE) ?? null, + }); + } + + if (!this.blockInputCache.has(blockHex)) { + this.blockInputCache.set(blockHex, blockCache); + } + const {block: signedBlock, blockBytes, blobsCache, availabilityPromise, resolveAvailability} = blockCache; + + if (signedBlock !== undefined) { + // block is available, check if all blobs have shown up + const {slot, body} = signedBlock.message; + const {blobKzgCommitments} = body as deneb.BeaconBlockBody; + const blockInfo = `blockHex=${blockHex}, slot=${slot}`; + + if (blobKzgCommitments.length < blobsCache.size) { + throw Error( + `Received more blobs=${blobsCache.size} than commitments=${blobKzgCommitments.length} for ${blockInfo}` + ); + } + + if (blobKzgCommitments.length === blobsCache.size) { + const allBlobs = getBlockInputBlobs(blobsCache); + resolveAvailability(allBlobs); + const {blobs, blobsBytes} = allBlobs; + return { + blockInput: getBlockInput.postDeneb( + config, + signedBlock, + BlockSource.gossip, + blobs, + blockBytes ?? null, + blobsBytes + ), + blockInputMeta: {pending: null, haveBlobs: blobs.length, expectedBlobs: blobKzgCommitments.length}, + }; + } else { + return { + blockInput: getBlockInput.blobsPromise( + config, + signedBlock, + BlockSource.gossip, + blobsCache, + blockBytes ?? null, + availabilityPromise + ), + blockInputMeta: { + pending: GossipedInputType.blob, + haveBlobs: blobsCache.size, + expectedBlobs: blobKzgCommitments.length, + }, + }; + } + } else { + // will need to wait for the block to showup + return { + blockInput: null, + blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null}, + }; + } + } +} + +function getEmptyBlockInputCacheEntry(): BlockInputCacheType { + // Capture both the promise and its callbacks. + // It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; + const availabilityPromise = new Promise((resolveCB) => { + resolveAvailability = resolveCB; + }); + if (resolveAvailability === null) { + throw Error("Promise Constructor was not executed immediately"); + } + const blobsCache = new Map(); + return {availabilityPromise, resolveAvailability, blobsCache}; +} + +function getBlockInputBlobs(blobsCache: BlobsCache): BlockInputBlobs { + const blobs = []; + const blobsBytes = []; + + for (let index = 0; index < blobsCache.size; index++) { + const blobCache = blobsCache.get(index); + if (blobCache === undefined) { + throw Error(`Missing blobSidecar at index=${index}`); + } + const {blobSidecar, blobBytes} = blobCache; + blobs.push(blobSidecar); + blobsBytes.push(blobBytes); + } + return {blobs, blobsBytes}; +} diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 8a22fe8f0a9..7d878b8e391 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -649,27 +649,44 @@ export function createLodestarMetrics( receivedToGossipValidate: register.histogram({ name: "lodestar_gossip_block_received_to_gossip_validate", help: "Time elapsed between block received and block validated", - buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4], + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], }), receivedToStateTransition: register.histogram({ name: "lodestar_gossip_block_received_to_state_transition", help: "Time elapsed between block received and block state transition", - buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4], + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], }), receivedToSignaturesVerification: register.histogram({ name: "lodestar_gossip_block_received_to_signatures_verification", help: "Time elapsed between block received and block signatures verification", - buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4], + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], }), receivedToExecutionPayloadVerification: register.histogram({ name: "lodestar_gossip_block_received_to_execution_payload_verification", help: "Time elapsed between block received and execution payload verification", - buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4], + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], + }), + receivedToBlobsAvailabilityTime: register.histogram<"numBlobs">({ + name: "lodestar_gossip_block_received_to_blobs_availability_time", + help: "Time elapsed between block received and blobs became available", + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], + labelNames: ["numBlobs"], + }), + receivedToFullyVerifiedTime: register.histogram({ + name: "lodestar_gossip_block_received_to_fully_verified_time", + help: "Time elapsed between block received and fully verified state, signatures and payload", + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], + }), + verifiedToBlobsAvailabiltyTime: register.histogram<"numBlobs">({ + name: "lodestar_gossip_block_verified_to_blobs_availability_time", + help: "Time elapsed between block verified and blobs became available", + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], + labelNames: ["numBlobs"], }), receivedToBlockImport: register.histogram({ name: "lodestar_gossip_block_received_to_block_import", help: "Time elapsed between block received and block import", - buckets: [0.05, 0.1, 0.2, 0.5, 1, 1.5, 2, 4], + buckets: [0.05, 0.1, 0.3, 0.5, 0.7, 1, 1.3, 1.6, 2, 2.5, 3, 3.5, 4], }), processBlockErrors: register.gauge<"error">({ name: "lodestar_gossip_block_process_block_errors", diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index 2e9ab3bb5a1..627533b54e1 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -116,7 +116,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler fork: ForkName, peerIdStr: string, seenTimestampSec: number - ): Promise { + ): Promise { const slot = signedBlock.message.slot; const forkTypes = config.getForkTypes(slot); const blockHex = prettyBytes(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message)); @@ -126,13 +126,21 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler let blockInput; let blockInputMeta; if (config.getForkSeq(signedBlock.message.slot) >= ForkSeq.deneb) { - const blockInputRes = getBlockInput.getGossipBlockInput(config, { + const blockInputRes = chain.seenGossipBlockInput.getGossipBlockInput(config, { type: GossipedInputType.block, signedBlock, blockBytes, }); + blockInput = blockInputRes.blockInput; blockInputMeta = blockInputRes.blockInputMeta; + + // blockInput can't be returned null, improve by enforcing via return types + if (blockInput === null) { + throw Error( + `Invalid null blockInput returned by getGossipBlockInput for type=${GossipedInputType.block} blockHex=${blockHex} slot=${slot}` + ); + } } else { blockInput = getBlockInput.preDeneb(config, signedBlock, BlockSource.gossip, blockBytes); blockInputMeta = {}; @@ -181,7 +189,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler const delaySec = chain.clock.secFromSlot(slot, seenTimestampSec); const recvToVal = Date.now() / 1000 - seenTimestampSec; - const {blockInput, blockInputMeta} = getBlockInput.getGossipBlockInput(config, { + const {blockInput, blockInputMeta} = chain.seenGossipBlockInput.getGossipBlockInput(config, { type: GossipedInputType.blob, signedBlob, blobBytes, @@ -250,10 +258,20 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler // Returns the delay between the start of `block.slot` and `current time` const delaySec = chain.clock.secFromSlot(signedBlock.message.slot); metrics?.gossipBlock.elapsedTimeTillProcessed.observe(delaySec); + chain.seenGossipBlockInput.prune(); }) .catch((e) => { if (e instanceof BlockError) { switch (e.type.code) { + case BlockErrorCode.DATA_UNAVAILABLE: { + // TODO: create a newevent unknownBlobs and only pull blobs + const slot = signedBlock.message.slot; + const forkTypes = config.getForkTypes(slot); + const rootHex = toHexString(forkTypes.BeaconBlock.hashTreeRoot(signedBlock.message)); + + events.emit(NetworkEvent.unknownBlock, {rootHex, peer: peerIdStr}); + break; + } // ALREADY_KNOWN should not happen with ignoreIfKnown=true above // PARENT_UNKNOWN should not happen, we handled this in validateBeaconBlock() function above case BlockErrorCode.ALREADY_KNOWN: @@ -268,6 +286,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler } metrics?.gossipBlock.processBlockErrors.inc({error: e instanceof BlockError ? e.type.code : "NOT_BLOCK_ERROR"}); logger.error("Error receiving block", {slot: signedBlock.message.slot, peer: peerIdStr}, e as Error); + chain.seenGossipBlockInput.prune(); }); } @@ -288,15 +307,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler peerIdStr, seenTimestampSec ); - if (blockInput !== null) { - handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); - } else { - // TODO DENEB: - // - // If block + blobs not fully received in the slot within some deadline, we should trigger block/blob - // pull using req/resp by root pre-emptively even though it will be trigged on seeing any block/blob - // gossip on next slot via missing parent checks - } + handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); }, [GossipType.blob_sidecar]: async ({ @@ -312,7 +323,13 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler } const blockInput = await validateBeaconBlob(signedBlob, serializedData, topic.index, peerIdStr, seenTimestampSec); if (blockInput !== null) { - handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); + // TODO DENEB: + // + // With blobsPromise the block import would have been attempted with the receipt of the block gossip + // and should have resolved the availability promise, however we could track if the block processing + // was halted and requeue it + // + // handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); } else { // TODO DENEB: // diff --git a/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts new file mode 100644 index 00000000000..9f1632102f7 --- /dev/null +++ b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts @@ -0,0 +1,158 @@ +import {describe, it, expect} from "vitest"; +import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lodestar/config"; +import {ssz} from "@lodestar/types"; + +import {SeenGossipBlockInput} from "../../../../src/chain/seenCache/seenGossipBlockInput.js"; +import {BlockInputType, GossipedInputType} from "../../../../src/chain/blocks/types.js"; + +/* eslint-disable @typescript-eslint/naming-convention */ +describe("SeenGossipBlockInput", () => { + const chainConfig = createChainForkConfig({ + ...defaultChainConfig, + ALTAIR_FORK_EPOCH: 0, + BELLATRIX_FORK_EPOCH: 0, + DENEB_FORK_EPOCH: 0, + }); + const genesisValidatorsRoot = Buffer.alloc(32, 0xaa); + const config = createBeaconConfig(chainConfig, genesisValidatorsRoot); + const seenGossipBlockInput = new SeenGossipBlockInput(); + + // array of numBlobs, events where events are array of + // [block|blob11|blob2, pd | bp | null | error string reflecting the expected result] + const testCases: [string, number, [string, string | null][]][] = [ + ["no blobs", 0, [["block", "pd"]]], + [ + "1 blob, block first", + 1, + [ + ["block", "bp"], + ["blob0", "pd"], + ], + ], + [ + "1 blob, blob first", + 1, + [ + ["blob0", null], + ["block", "pd"], + ], + ], + [ + "6 blobs, block first", + 6, + [ + ["block", "bp"], + ["blob1", "bp"], + ["blob0", "bp"], + ["blob5", "bp"], + ["blob4", "bp"], + ["blob2", "bp"], + ["blob3", "pd"], + ], + ], + [ + "4 blobs, block in mid", + 4, + [ + ["blob1", null], + ["blob3", null], + ["block", "bp"], + ["blob0", "bp"], + ["blob2", "pd"], + ], + ], + [ + "3 blobs, block in end", + 3, + [ + ["blob1", null], + ["blob0", null], + ["blob2", null], + ["block", "pd"], + ], + ], + ]; + // lets start from a random slot to build cases + let slot = 7456; + for (const testCase of testCases) { + const [testName, numBlobs, events] = testCase; + + it(`${testName}`, () => { + const signedBlock = ssz.deneb.SignedBeaconBlock.defaultValue(); + // assign slot and increment for the next block so as to keep these block testcases distinguished + // in the cache + signedBlock.message.slot = slot++; + signedBlock.message.body.blobKzgCommitments = Array.from({length: numBlobs}, () => + ssz.deneb.KZGCommitment.defaultValue() + ); + + const blockRoot = ssz.deneb.BeaconBlock.hashTreeRoot(signedBlock.message); + const signedBlobSidecars = Array.from({length: numBlobs}, (_val, index) => { + const message = {...ssz.deneb.BlobSidecar.defaultValue(), index, blockRoot, slot}; + return {message, signature: ssz.BLSSignature.defaultValue()}; + }); + + for (const testEvent of events) { + const [inputEvent, expectedRes] = testEvent; + const eventType = inputEvent.includes("block") ? GossipedInputType.block : GossipedInputType.blob; + const expectedResponseType = parseResponseType(expectedRes); + + try { + if (eventType === GossipedInputType.block) { + const blockInputRes = seenGossipBlockInput.getGossipBlockInput(config, { + type: GossipedInputType.block, + signedBlock, + blockBytes: null, + }); + + if (expectedResponseType instanceof Error) { + expect.fail(`expected to fail with error: ${expectedResponseType.message}`); + } else if (expectedResponseType === null) { + expect(blockInputRes).equal.toBeNull; + } else { + expect(blockInputRes.blockInput?.type).to.be.equal(expectedResponseType); + } + } else { + const index = parseInt(inputEvent.split("blob")[1] ?? "0"); + const signedBlob = signedBlobSidecars[index]; + expect(signedBlob).not.equal(undefined); + const blockInputRes = seenGossipBlockInput.getGossipBlockInput(config, { + type: GossipedInputType.blob, + signedBlob, + blobBytes: null, + }); + + if (expectedResponseType instanceof Error) { + expect.fail(`expected to fail with error: ${expectedResponseType.message}`); + } else if (expectedResponseType === null) { + expect(blockInputRes).toBeNull; + } else { + expect(blockInputRes.blockInput?.type).to.equal(expectedResponseType); + } + } + } catch (e) { + if (!(e as Error).message.includes("expected to fail with error")) { + if (!(expectedResponseType instanceof Error)) { + expect.fail( + `expected not to fail with respose=${expectedResponseType} but errored: ${(e as Error).message}` + ); + } + } + } + } + }); + } +}); + +function parseResponseType(expectedRes: string | Error | null): BlockInputType | null | Error { + switch (expectedRes) { + case null: + return null; + case "pd": + return BlockInputType.postDeneb; + case "bp": + return BlockInputType.blobsPromise; + default: + return Error(expectedRes); + } +}