Skip to content

Commit

Permalink
feat: get various sync mechanisms working with/without sharded data
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Aug 9, 2024
1 parent 156ef53 commit 47eedae
Show file tree
Hide file tree
Showing 21 changed files with 635 additions and 196 deletions.
9 changes: 8 additions & 1 deletion packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
import {toHexString} from "@chainsafe/ssz";
import {capella, ssz, altair, BeaconBlock} from "@lodestar/types";
import {ForkName, ForkLightClient, ForkSeq, INTERVALS_PER_SLOT, MAX_SEED_LOOKAHEAD, SLOTS_PER_EPOCH} from "@lodestar/params";
import {
ForkName,
ForkLightClient,
ForkSeq,
INTERVALS_PER_SLOT,
MAX_SEED_LOOKAHEAD,
SLOTS_PER_EPOCH,
} from "@lodestar/params";
import {
CachedBeaconStateAltair,
computeEpochAtSlot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ export class SeenGossipBlockInput {
}
}

function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
export function getEmptyBlockInputCacheEntry(fork: ForkName): BlockInputCacheType {
// Capture both the promise and its callbacks for blockInput and final availability
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolveBlockInput: ((block: BlockInput) => void) | null = null;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/gossip/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
phase0,
SignedBeaconBlock,
Slot,
electra
electra,
} from "@lodestar/types";
import {BeaconConfig} from "@lodestar/config";
import {Logger} from "@lodestar/utils";
Expand Down
4 changes: 3 additions & 1 deletion packages/beacon-node/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import {
capella,
deneb,
phase0,
electra
electra,
} from "@lodestar/types";
import {PeerIdStr} from "../util/peerId.js";
import {CustodyConfig} from "../util/dataColumns.js";
Expand All @@ -38,6 +38,7 @@ import {PeerAction} from "./peers/index.js";
import {NodeId} from "./subnets/interface.js";

export type WithBytes<T> = {data: T; bytes: Uint8Array};
export type WithOptionalBytes<T> = {data: T; bytes: Uint8Array | null};

/**
* The architecture of the network looks like so:
Expand All @@ -56,6 +57,7 @@ export interface INetwork extends INetworkCorePublic {
events: INetworkEventBus;

getConnectedPeers(): PeerIdStr[];
getConnectedPeerCustody(peerId: PeerIdStr): number[];
getConnectedPeerCount(): number;
isSubscribedToGossipCoreTopics(): boolean;
reportPeer(peer: PeerIdStr, action: PeerAction, actionName: string): void;
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/network/metadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export enum ENRKey {
eth2 = "eth2",
attnets = "attnets",
syncnets = "syncnets",
custody_subnet_count = "custody_subnet_count",
csc = "csc",
}
export enum SubnetType {
attnets = "attnets",
Expand Down
8 changes: 8 additions & 0 deletions packages/beacon-node/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,14 @@ export class Network implements INetwork {
getConnectedPeers(): PeerIdStr[] {
return Array.from(this.connectedPeers.keys());
}
getConnectedPeerCustody(peerId: PeerIdStr): number[] {
const columns = this.connectedPeers.get(peerId);
if (columns === undefined) {
throw Error("peerId not in connectedPeers");
}

return columns;
}
getConnectedPeerCount(): number {
return this.connectedPeers.size;
}
Expand Down
29 changes: 24 additions & 5 deletions packages/beacon-node/src/network/peers/discover.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class PeerDiscovery {
private config: BeaconConfig;
private cachedENRs = new Map<PeerIdStr, CachedENR>();
private peerIdToNodeId = new Map<PeerIdStr, NodeId>();
private peerIdToMyEnr = new Map<PeerIdStr, ENR>();
private peerIdToCustodySubnetCount = new Map<PeerIdStr, number>();
private randomNodeQuery: QueryStatus = {code: QueryStatusCode.NotActive};
private peersToConnect = 0;
Expand Down Expand Up @@ -327,9 +328,12 @@ export class PeerDiscovery {

const attnets = zeroAttnets;
const syncnets = zeroSyncnets;
const custodySubnetCount = 0;
const custodySubnetCount = this.peerIdToCustodySubnetCount.get(id.toString());
if (custodySubnetCount === undefined) {
this.logger.warn("onDiscoveredPeer with unknown custodySubnetCount assuming 4", {peerId: id.toString()});
}

const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount);
const status = this.handleDiscoveredPeer(id, multiaddrs[0], attnets, syncnets, custodySubnetCount ?? 4);
this.logger.debug("Discovered peer via libp2p", {peer: prettyPrintPeerId(id), status});
this.metrics?.discovery.discoveredStatus.inc({status});
};
Expand All @@ -346,6 +350,7 @@ export class PeerDiscovery {

const nodeId = fromHexString(enr.nodeId);
this.peerIdToNodeId.set(peerId.toString(), nodeId);
this.peerIdToMyEnr.set(peerId.toString(), enr);
pruneSetToMax(this.peerIdToNodeId, MAX_CACHED_NODEIDS);

// tcp multiaddr is known to be be present, checked inside the worker
Expand All @@ -358,15 +363,18 @@ export class PeerDiscovery {
// Are this fields mandatory?
const attnetsBytes = enr.kvs.get(ENRKey.attnets); // 64 bits
const syncnetsBytes = enr.kvs.get(ENRKey.syncnets); // 4 bits
const custodySubnetCountBytes = enr.kvs.get(ENRKey.custody_subnet_count); // 64 bits
const custodySubnetCountBytes = enr.kvs.get(ENRKey.csc); // 64 bits
if (custodySubnetCountBytes === undefined) {
this.logger.warn("peer discovered with no csc assuming 4", exportENRToJSON(enr));
}

// Use faster version than ssz's implementation that leverages pre-cached.
// Some nodes don't serialize the bitfields properly, encoding the syncnets as attnets,
// which cause the ssz implementation to throw on validation. deserializeEnrSubnets() will
// never throw and treat too long or too short bitfields as zero-ed
const attnets = attnetsBytes ? deserializeEnrSubnets(attnetsBytes, ATTESTATION_SUBNET_COUNT) : zeroAttnets;
const syncnets = syncnetsBytes ? deserializeEnrSubnets(syncnetsBytes, SYNC_COMMITTEE_SUBNET_COUNT) : zeroSyncnets;
const custodySubnetCount = custodySubnetCountBytes ? ssz.UintNum64.deserialize(custodySubnetCountBytes) : 1;
const custodySubnetCount = custodySubnetCountBytes ? ssz.Uint8.deserialize(custodySubnetCountBytes) : 4;
this.peerIdToCustodySubnetCount.set(peerId.toString(), custodySubnetCount);

const status = this.handleDiscoveredPeer(peerId, multiaddrTCP, attnets, syncnets, custodySubnetCount);
Expand Down Expand Up @@ -441,7 +449,7 @@ export class PeerDiscovery {
const peerCustodySubnets = getCustodyColumnSubnets(nodeId, peerCustodySubnetCount);
const hasAllColumns = this.custodySubnets.reduce((acc, elem) => acc && peerCustodySubnets.includes(elem), true);

this.logger.debug("peerCustodySubnets", {
this.logger.warn("peerCustodySubnets", {
peerId: peer.peerId.toString(),
peerNodeId: toHexString(nodeId),
hasAllColumns,
Expand Down Expand Up @@ -569,3 +577,14 @@ function formatLibp2pDialError(e: Error): void {
e.stack = undefined;
}
}

function exportENRToJSON(enr?: ENR): Record<string, string | undefined> | undefined {
if (enr === undefined) {
return undefined;
}
return {
ip4: enr.kvs.get("ip")?.toString(),
csc: enr.kvs.get("csc")?.toString(),
nodeId: enr.nodeId,
};
}
31 changes: 27 additions & 4 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {Eth2Gossipsub} from "../gossip/gossipsub.js";
import {StatusCache} from "../statusCache.js";
import {NetworkCoreMetrics} from "../core/metrics.js";
import {LodestarDiscv5Opts} from "../discv5/types.js";
import {getCustodyColumns} from "../../util/dataColumns.js";
import {getCustodyColumnSubnets, getCustodyColumns} from "../../util/dataColumns.js";
import {PeerDiscovery, SubnetDiscvQueryMs} from "./discover.js";
import {PeersData, PeerData} from "./peersData.js";
import {getKnownClientFromAgentVersion, ClientKind} from "./client.js";
Expand Down Expand Up @@ -378,12 +378,35 @@ export class PeerManager {
}
if (getConnection(this.libp2p, peer.toString())) {
const nodeId = peerData?.nodeId ?? this.discovery?.["peerIdToNodeId"].get(peer.toString());
if (nodeId === undefined) {
this.logger.error("onStatus with nodeId=undefined", {peerId: peer.toString()});
return;
}
const custodySubnetCount =
peerData?.custodySubnetCount ?? this.discovery?.["peerIdToCustodySubnetCount"].get(peer.toString());
this.logger.warn("onStatus", {nodeId: nodeId ? toHexString(nodeId) : undefined, peerId: peer.toString()});

if (nodeId !== undefined && custodySubnetCount !== undefined) {
const dataColumns = getCustodyColumns(nodeId, custodySubnetCount);
const peerCustodySubnetCount = custodySubnetCount ?? 4;
const peerCustodySubnets = getCustodyColumnSubnets(nodeId, peerCustodySubnetCount);
const myCustodySubnets = this.discovery?.["custodySubnets"] ?? [];
const hasAllColumns = myCustodySubnets.reduce((acc, elem) => acc && peerCustodySubnets.includes(elem), true);

this.logger.warn(`onStatus ${custodySubnetCount == undefined ? "undefined custody count assuming 4" : ""}`, {
nodeId: nodeId ? toHexString(nodeId) : undefined,
peerId: peer.toString(),
custodySubnetCount,
hasAllColumns,
peerCustodySubnets: peerCustodySubnets.join(","),
myCustodySubnets: myCustodySubnets.join(","),
});

if (!hasAllColumns) {
const enr = this.discovery?.["peerIdToMyEnr"].get(peer.toString());
console.log("lowcustody count enr", enr);
return;
}

{
const dataColumns = getCustodyColumns(nodeId, peerCustodySubnetCount);
this.networkEventBus.emit(NetworkEvent.peerConnected, {peer: peer.toString(), status, dataColumns});
}
}
Expand Down
Loading

0 comments on commit 47eedae

Please sign in to comment.