Skip to content

Commit

Permalink
Merge 2531003 into 4313726
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech authored Jun 17, 2023
2 parents 4313726 + 2531003 commit 2242737
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 33 deletions.
20 changes: 9 additions & 11 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -208,20 +208,17 @@ export function getBeaconBlockApi({
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
// Build a blockInput for post deneb, signedBlobs will be be used in followup PRs
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents);
const blobsSidecar = blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map(({message}) => message)
);

blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobsSidecar
blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map((sblob) => sblob.message)
)
);
} else {
signedBlock = signedBlockOrContents;
Expand All @@ -231,7 +228,8 @@ export function getBeaconBlockApi({

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot = computeTimeAtSlot(config, signedBlock.message.slot, chain.genesisTime) * 1000 - Date.now();
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
Expand All @@ -242,7 +240,7 @@ export function getBeaconBlockApi({
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlockMaybeBlobs(blockForImport) as Promise<unknown>,
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
Expand All @@ -254,7 +252,7 @@ export function getBeaconBlockApi({
}
throw e;
}),
// TODO deneb: publish signed blobs as well
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
},
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {JobItemQueue} from "../../util/queue/index.js";

export enum GossipType {
beacon_block = "beacon_block",
blob_sidecar = "blob_sidecar",
beacon_block_and_blobs_sidecar = "beacon_block_and_blobs_sidecar",
beacon_aggregate_and_proof = "beacon_aggregate_and_proof",
beacon_attestation = "beacon_attestation",
Expand Down Expand Up @@ -38,6 +39,7 @@ export interface IGossipTopic {

export type GossipTopicTypeMap = {
[GossipType.beacon_block]: {type: GossipType.beacon_block};
[GossipType.blob_sidecar]: {type: GossipType.blob_sidecar; index: number};
[GossipType.beacon_block_and_blobs_sidecar]: {type: GossipType.beacon_block_and_blobs_sidecar};
[GossipType.beacon_aggregate_and_proof]: {type: GossipType.beacon_aggregate_and_proof};
[GossipType.beacon_attestation]: {type: GossipType.beacon_attestation; subnet: number};
Expand Down Expand Up @@ -68,6 +70,7 @@ export type SSZTypeOfGossipTopic<T extends GossipTopic> = T extends {type: infer

export type GossipTypeMap = {
[GossipType.beacon_block]: allForks.SignedBeaconBlock;
[GossipType.blob_sidecar]: deneb.SignedBlobSidecar;
[GossipType.beacon_block_and_blobs_sidecar]: deneb.SignedBeaconBlockAndBlobsSidecar;
[GossipType.beacon_aggregate_and_proof]: phase0.SignedAggregateAndProof;
[GossipType.beacon_attestation]: phase0.Attestation;
Expand All @@ -83,6 +86,7 @@ export type GossipTypeMap = {

export type GossipFnByType = {
[GossipType.beacon_block]: (signedBlock: allForks.SignedBeaconBlock) => Promise<void> | void;
[GossipType.blob_sidecar]: (signedBlobSidecar: deneb.SignedBlobSidecar) => Promise<void> | void;
[GossipType.beacon_block_and_blobs_sidecar]: (
signedBeaconBlockAndBlobsSidecar: deneb.SignedBeaconBlockAndBlobsSidecar
) => Promise<void> | void;
Expand Down
12 changes: 12 additions & 0 deletions packages/beacon-node/src/network/gossip/topic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ function stringifyGossipTopicType(topic: GossipTopic): string {
case GossipType.beacon_attestation:
case GossipType.sync_committee:
return `${topic.type}_${topic.subnet}`;
case GossipType.blob_sidecar:
return `${topic.type}_${topic.index}`;
}
}

Expand All @@ -82,6 +84,8 @@ export function getGossipSSZType(topic: GossipTopic) {
case GossipType.beacon_block:
// beacon_block is updated in altair to support the updated SignedBeaconBlock type
return ssz[topic.fork].SignedBeaconBlock;
case GossipType.blob_sidecar:
return ssz.deneb.SignedBlobSidecar;
case GossipType.beacon_block_and_blobs_sidecar:
return ssz.deneb.SignedBeaconBlockAndBlobsSidecar;
case GossipType.beacon_aggregate_and_proof:
Expand Down Expand Up @@ -181,6 +185,13 @@ export function parseGossipTopic(forkDigestContext: ForkDigestContext, topicStr:
}
}

if (gossipTypeStr.startsWith(GossipType.blob_sidecar)) {
const indexStr = gossipTypeStr.slice(GossipType.blob_sidecar.length + 1); // +1 for '_' concatenating the topic name and the index
const index = parseInt(indexStr, 10);
if (Number.isNaN(index)) throw Error(`index ${indexStr} is not a number`);
return {type: GossipType.blob_sidecar, index, fork, encoding};
}

throw Error(`Unknown gossip type ${gossipTypeStr}`);
} catch (e) {
(e as Error).message = `Invalid gossip topic ${topicStr}: ${(e as Error).message}`;
Expand Down Expand Up @@ -253,6 +264,7 @@ function parseEncodingStr(encodingStr: string): GossipEncoding {
// TODO: Review which yes, and which not
export const gossipTopicIgnoreDuplicatePublishError: Record<GossipType, boolean> = {
[GossipType.beacon_block]: true,
[GossipType.blob_sidecar]: true,
[GossipType.beacon_block_and_blobs_sidecar]: true,
[GossipType.beacon_aggregate_and_proof]: true,
[GossipType.beacon_attestation]: true,
Expand Down
4 changes: 1 addition & 3 deletions packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {Connection} from "@libp2p/interface-connection";
import {Registrar} from "@libp2p/interface-registrar";
import {ConnectionManager} from "@libp2p/interface-connection-manager";
import {Slot, SlotRootHex, allForks, altair, capella, deneb, phase0} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {PeerIdStr} from "../util/peerId.js";
import {INetworkEventBus} from "./events.js";
import {INetworkCorePublic} from "./core/types.js";
Expand Down Expand Up @@ -44,9 +43,8 @@ export interface INetwork extends INetworkCorePublic {
sendBlobSidecarsByRoot(peerId: PeerIdStr, request: deneb.BlobSidecarsByRootRequest): Promise<deneb.BlobSidecar[]>;

// Gossip
publishBeaconBlockMaybeBlobs(blockInput: BlockInput): Promise<number>;
publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<number>;
publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<number>;
publishBlobSidecar(signedBlobSidecar: deneb.SignedBlobSidecar): Promise<number>;
publishBeaconAggregateAndProof(aggregateAndProof: phase0.SignedAggregateAndProof): Promise<number>;
publishBeaconAttestation(attestation: phase0.Attestation, subnet: number): Promise<number>;
publishVoluntaryExit(voluntaryExit: phase0.SignedVoluntaryExit): Promise<number>;
Expand Down
25 changes: 6 additions & 19 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import {IBeaconChain} from "../chain/index.js";
import {IBeaconDb} from "../db/interface.js";
import {PeerIdStr, peerIdToString} from "../util/peerId.js";
import {IClock} from "../util/clock.js";
import {BlockInput, BlockInputType} from "../chain/blocks/types.js";
import {NetworkOptions} from "./options.js";
import {INetwork} from "./interface.js";
import {ReqRespMethod} from "./reqresp/index.js";
Expand Down Expand Up @@ -276,31 +275,19 @@ export class Network implements INetwork {

// Gossip

async publishBeaconBlockMaybeBlobs(blockInput: BlockInput): Promise<number> {
switch (blockInput.type) {
case BlockInputType.preDeneb:
return this.publishBeaconBlock(blockInput.block);

case BlockInputType.postDeneb:
return this.publishSignedBeaconBlockAndBlobsSidecar({
beaconBlock: blockInput.block as deneb.SignedBeaconBlock,
blobsSidecar: blockInput.blobs,
});
}
}

async publishBeaconBlock(signedBlock: allForks.SignedBeaconBlock): Promise<number> {
const fork = this.config.getForkName(signedBlock.message.slot);
return this.publishGossip<GossipType.beacon_block>({type: GossipType.beacon_block, fork}, signedBlock, {
ignoreDuplicatePublishError: true,
});
}

async publishSignedBeaconBlockAndBlobsSidecar(item: deneb.SignedBeaconBlockAndBlobsSidecar): Promise<number> {
const fork = this.config.getForkName(item.beaconBlock.message.slot);
return this.publishGossip<GossipType.beacon_block_and_blobs_sidecar>(
{type: GossipType.beacon_block_and_blobs_sidecar, fork},
item,
async publishBlobSidecar(signedBlobSidecar: deneb.SignedBlobSidecar): Promise<number> {
const fork = this.config.getForkName(signedBlobSidecar.message.slot);
const index = signedBlobSidecar.message.index;
return this.publishGossip<GossipType.blob_sidecar>(
{type: GossipType.blob_sidecar, fork, index},
signedBlobSidecar,
{ignoreDuplicatePublishError: true}
);
}
Expand Down
4 changes: 4 additions & 0 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ export function getGossipHandlers(modules: ValidatorFnsModules, options: GossipH
handleValidBeaconBlock({...blockInput, serializedData}, peerIdStr, seenTimestampSec);
},

[GossipType.blob_sidecar]: async (_data, _topic, _peerIdStr, _seenTimestampSec) => {
// TODO DENEB: impl to be added on migration of blockinput
},

[GossipType.beacon_block_and_blobs_sidecar]: async ({serializedData}, topic, peerIdStr, seenTimestampSec) => {
const blockAndBlocks = sszDeserialize(topic, serializedData);
const {beaconBlock, blobsSidecar} = blockAndBlocks;
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/src/network/processor/gossipQueues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ const gossipQueueOpts: {
} = {
// validation gossip block asap
[GossipType.beacon_block]: {maxLength: 1024, type: QueueType.FIFO, dropOpts: {type: DropType.count, count: 1}},
// gossip length for blob is beacon block length * max blobs per block = 4096
[GossipType.blob_sidecar]: {
maxLength: 4096,
type: QueueType.FIFO,
dropOpts: {type: DropType.count, count: 1},
},
// TODO DENEB: What's a good queue max given that now blocks are much bigger?
[GossipType.beacon_block_and_blobs_sidecar]: {
maxLength: 32,
Expand Down
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type WorkOpts = {
*/
const executeGossipWorkOrderObj: Record<GossipType, WorkOpts> = {
[GossipType.beacon_block]: {bypassQueue: true},
[GossipType.blob_sidecar]: {bypassQueue: true},
[GossipType.beacon_block_and_blobs_sidecar]: {bypassQueue: true},
[GossipType.beacon_aggregate_and_proof]: {},
[GossipType.voluntary_exit]: {},
Expand Down
6 changes: 6 additions & 0 deletions packages/beacon-node/test/unit/network/gossip/topic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ describe("network / gossip / topic", function () {
topicStr: "/eth2/18ae4ccb/beacon_block/ssz_snappy",
},
],
[GossipType.blob_sidecar]: [
{
topic: {type: GossipType.blob_sidecar, index: 1, fork: ForkName.deneb, encoding},
topicStr: "/eth2/46acb19a/blob_sidecar_1/ssz_snappy",
},
],
[GossipType.beacon_block_and_blobs_sidecar]: [
{
topic: {type: GossipType.beacon_block_and_blobs_sidecar, fork: ForkName.deneb, encoding},
Expand Down

0 comments on commit 2242737

Please sign in to comment.