Skip to content

Commit

Permalink
backfill sync from an anchor checkpoint state (#3384)
Browse files Browse the repository at this point in the history
* Rebased mpetrunic's PR #2637 with fixes on current master

* fixing the remove peer error

* refactoring to solve sync stuck issues on not anchored kind of errors

* read from db, validate wsCheckpoint

* backfill sequences in db to skip redoing previous backfill work

* syncrange improvs

* feedback cleanup, modular refac of sync function and metrics update

* cleanup

* Graphana Dashboard

* renaming sequences to ranges

* rebase cleanup

* shortneing comment

* using initialize from's return as the anchorState

* Fix metrics

* Add Aborted enum value in lodestar_backfill_sync_status

* Only use JSDoc comment notation for JSDocs

* Simplify nullish values to be only null

* WIP

* refactoring the backfill sync, with parent-child linkage verfication of last previous unverified finalized or wscheckpoint block

* cleanup and simplification of checkpoint/prev finalized checks

* initializing backfillwritten to avoid previous overwriting with a ahead value

* prev finalized or wscheckpoint lookup fix

* missing initializtion

* better assignment of prev fin or ws checkpoint

* don't verify sig on genesis block

* making the extractPreviousFinOrWsCheckpoint lighter

* simplfication of extractPreviousFinOrWsCheckpoint

* improving messaging

* metric for prev fin or ws block slot validation

* dashboard entry for prev fin or ws checkpoint slot for validation

* dashed line for prev fin or ws slot for better clairty

* comments cleanup and always backfill

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
  • Loading branch information
g11tech and dapplion authored Jan 6, 2022
1 parent 0e94cb1 commit 0e47cd3
Show file tree
Hide file tree
Showing 32 changed files with 2,571 additions and 19 deletions.
396 changes: 396 additions & 0 deletions docker/grafana/provisioning/dashboards/lodestar.json

Large diffs are not rendered by default.

10 changes: 9 additions & 1 deletion packages/cli/src/cmds/beacon/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ export async function beaconHandler(args: IBeaconArgs & IGlobalArgs): Promise<vo

// BeaconNode setup
try {
const anchorState = await initBeaconState(options, args, config, db, logger, abortController.signal);
const {anchorState, wsCheckpoint} = await initBeaconState(
options,
args,
config,
db,
logger,
abortController.signal
);
const beaconConfig = createIBeaconConfig(config, anchorState.genesisValidatorsRoot);
const node = await BeaconNode.init({
opts: options,
Expand All @@ -81,6 +88,7 @@ export async function beaconHandler(args: IBeaconArgs & IGlobalArgs): Promise<vo
logger,
libp2p: await createNodeJsLibp2p(peerId, options.network, {peerStoreDir: beaconPaths.peerStoreDir}),
anchorState,
wsCheckpoint,
metricsRegistries,
});

Expand Down
20 changes: 13 additions & 7 deletions packages/cli/src/cmds/beacon/initBeaconState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ async function initAndVerifyWeakSubjectivityState(
store: TreeBacked<allForks.BeaconState>,
wsState: TreeBacked<allForks.BeaconState>,
wsCheckpoint: Checkpoint
): Promise<TreeBacked<allForks.BeaconState>> {
): Promise<{anchorState: TreeBacked<allForks.BeaconState>; wsCheckpoint: Checkpoint}> {
// Check if the store's state and wsState are compatible
if (
store.genesisTime !== wsState.genesisTime ||
Expand All @@ -64,7 +64,10 @@ async function initAndVerifyWeakSubjectivityState(
throw new Error("Fetched weak subjectivity checkpoint not within weak subjectivity period.");
}

return await initStateFromAnchorState(config, db, logger, anchorState);
anchorState = await initStateFromAnchorState(config, db, logger, anchorState);

// Return the latest anchorState but still return original wsCheckpoint to validate in backfill
return {anchorState, wsCheckpoint};
}

/**
Expand All @@ -83,7 +86,7 @@ export async function initBeaconState(
db: IBeaconDb,
logger: ILogger,
signal: AbortSignal
): Promise<TreeBacked<allForks.BeaconState>> {
): Promise<{anchorState: TreeBacked<allForks.BeaconState>; wsCheckpoint?: Checkpoint}> {
// fetch the latest state stored in the db
// this will be used in all cases, if it exists, either used during verification of a weak subjectivity state, or used directly as the anchor state
const lastDbState = await db.stateArchive.lastValue();
Expand Down Expand Up @@ -132,16 +135,19 @@ export async function initBeaconState(
} else if (lastDbState) {
// start the chain from the latest stored state in the db
const config = createIBeaconConfig(chainForkConfig, lastDbState.genesisValidatorsRoot);
return await initStateFromAnchorState(config, db, logger, lastDbState);
const anchorState = await initStateFromAnchorState(config, db, logger, lastDbState);
return {anchorState};
} else {
const genesisStateFile = args.genesisStateFile || getGenesisFileUrl(args.network || defaultNetwork);
if (genesisStateFile && !args.forceGenesis) {
const stateBytes = await downloadOrLoadFile(genesisStateFile);
const anchorState = getStateTypeFromBytes(chainForkConfig, stateBytes).createTreeBackedFromBytes(stateBytes);
let anchorState = getStateTypeFromBytes(chainForkConfig, stateBytes).createTreeBackedFromBytes(stateBytes);
const config = createIBeaconConfig(chainForkConfig, anchorState.genesisValidatorsRoot);
return await initStateFromAnchorState(config, db, logger, anchorState);
anchorState = await initStateFromAnchorState(config, db, logger, anchorState);
return {anchorState};
} else {
return await initStateFromEth1({config: chainForkConfig, db, logger, opts: options.eth1, signal});
const anchorState = await initStateFromEth1({config: chainForkConfig, db, logger, opts: options.eth1, signal});
return {anchorState};
}
}
}
2 changes: 2 additions & 0 deletions packages/db/src/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ export enum Bucket {
lightClient_bestPartialLightClientUpdate = 54, // SyncPeriod -> PartialLightClientUpdate

validator_metaData = 41,

backfilled_ranges = 42, // Backfilled From to To, inclusive of both From, To
}

export enum Key {
Expand Down
15 changes: 14 additions & 1 deletion packages/lodestar/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/

import {ILogger} from "@chainsafe/lodestar-utils";
import {Slot} from "@chainsafe/lodestar-types";
import {computeEpochAtSlot} from "@chainsafe/lodestar-beacon-state-transition";
import {IBeaconDb} from "../../db";
import {CheckpointStateCache} from "../stateCache";
Expand Down Expand Up @@ -42,10 +43,22 @@ export class StatesArchiver {
* epoch - 1024*2 epoch - 1024 epoch - 32 epoch
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
async maybeArchiveState(finalized: CheckpointWithHex, anchorSlot: Slot): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);

// Mark the sequence in backfill db from finalized slot till anchor slot as filled
const finalizedState = this.checkpointStateCache.get(finalized);
if (!finalizedState) {
throw Error("No state in cache for finalized checkpoint state epoch #" + finalized.epoch);
}
await this.db.backfilledRanges.put(finalizedState.slot, anchorSlot);

// Clear previously marked sequence till anchorSlot, without touching backfill sync
// process sequence which are at <=anchorSlot i.e. clear >anchorSlot and < currentSlot
const filteredSeqs = await this.db.backfilledRanges.keys({gt: anchorSlot, lt: finalizedState.slot});
await this.db.backfilledRanges.batchDelete(filteredSeqs);

if (finalized.epoch - lastStoredEpoch > PERSIST_TEMP_STATE_EVERY_EPOCHS) {
await this.archiveState(finalized);

Expand Down
2 changes: 1 addition & 1 deletion packages/lodestar/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ export class Archiver {
await archiveBlocks(this.db, this.chain.forkChoice, this.chain.lightClientServer, this.logger, finalized);

// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiver.maybeArchiveState(finalized);
await this.statesArchiver.maybeArchiveState(finalized, this.chain.anchorSlot);

await Promise.all([
this.chain.checkpointStateCache.pruneFinalized(finalizedEpoch),
Expand Down
2 changes: 2 additions & 0 deletions packages/lodestar/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class BeaconChain implements IBeaconChain {
readonly executionEngine: IExecutionEngine;
// Expose config for convenience in modularized functions
readonly config: IBeaconConfig;
readonly anchorSlot: Slot;

bls: IBlsVerifier;
forkChoice: IForkChoice;
Expand Down Expand Up @@ -109,6 +110,7 @@ export class BeaconChain implements IBeaconChain {
this.logger = logger;
this.metrics = metrics;
this.genesisTime = anchorState.genesisTime;
this.anchorSlot = anchorState.slot;
this.genesisValidatorsRoot = anchorState.genesisValidatorsRoot.valueOf() as Uint8Array;
this.eth1 = eth1;
this.executionEngine = executionEngine;
Expand Down
3 changes: 3 additions & 0 deletions packages/lodestar/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export interface IBeaconChain {
// Expose config for convenience in modularized functions
readonly config: IBeaconConfig;

/** The initial slot that the chain is started with */
readonly anchorSlot: Slot;

bls: IBlsVerifier;
forkChoice: IForkChoice;
clock: IBeaconClock;
Expand Down
1 change: 0 additions & 1 deletion packages/lodestar/src/constants/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ export type RpcResponseStatusError = Exclude<RespStatus, RespStatus.SUCCESS>;
export const GOSSIP_MAX_SIZE = 2 ** 20;
/** The maximum allowed size of uncompressed req/resp chunked responses. */
export const MAX_CHUNK_SIZE = 2 ** 20;

/** The maximum time to wait for first byte of request response (time-to-first-byte). */
export const TTFB_TIMEOUT = 5 * 1000; // 5 sec
/** The maximum time for complete response transfer. */
Expand Down
5 changes: 5 additions & 0 deletions packages/lodestar/src/db/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
CheckpointHeaderRepository,
SyncCommitteeRepository,
SyncCommitteeWitnessRepository,
BackfilledRanges,
} from "./repositories";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single";

Expand All @@ -44,6 +45,8 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
syncCommittee: SyncCommitteeRepository;
syncCommitteeWitness: SyncCommitteeWitnessRepository;

backfilledRanges: BackfilledRanges;

constructor(opts: IDatabaseApiOptions) {
super(opts);
this.metrics = opts.metrics;
Expand All @@ -65,6 +68,8 @@ export class BeaconDb extends DatabaseService implements IBeaconDb {
this.checkpointHeader = new CheckpointHeaderRepository(this.config, this.db, this.metrics);
this.syncCommittee = new SyncCommitteeRepository(this.config, this.db, this.metrics);
this.syncCommitteeWitness = new SyncCommitteeWitnessRepository(this.config, this.db, this.metrics);

this.backfilledRanges = new BackfilledRanges(this.config, this.db, this.metrics);
}

async stop(): Promise<void> {
Expand Down
3 changes: 3 additions & 0 deletions packages/lodestar/src/db/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
CheckpointHeaderRepository,
SyncCommitteeRepository,
SyncCommitteeWitnessRepository,
BackfilledRanges,
} from "./repositories";
import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single";

Expand Down Expand Up @@ -58,6 +59,8 @@ export interface IBeaconDb {
syncCommittee: SyncCommitteeRepository;
syncCommitteeWitness: SyncCommitteeWitnessRepository;

backfilledRanges: BackfilledRanges;

/**
* Start the connection to the db instance and open the db store.
*/
Expand Down
28 changes: 28 additions & 0 deletions packages/lodestar/src/db/repositories/backfilledRanges.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import {IChainForkConfig} from "@chainsafe/lodestar-config";
import {Slot, ssz} from "@chainsafe/lodestar-types";
import {IDatabaseController, Bucket, IDbMetrics, Repository} from "@chainsafe/lodestar-db";
import {bytesToInt} from "@chainsafe/lodestar-utils";

/**
* Slot to slot ranges that ensure that block range is fully backfilled
*
* If node starts backfilling at slots 1000, and backfills to 800, there will be an entry
* 1000 -> 800
*
* When the node is backfilling if it starts at 1200 and backfills to 1000, it will find this sequence and,
* jump directly to 800 and delete the key 1000.
*/
export class BackfilledRanges extends Repository<Slot, Slot> {
constructor(config: IChainForkConfig, db: IDatabaseController<Uint8Array, Uint8Array>, metrics?: IDbMetrics) {
super(config, db, Bucket.backfilled_ranges, ssz.Slot, metrics);
}

decodeKey(data: Buffer): number {
return bytesToInt((super.decodeKey(data) as unknown) as Uint8Array, "be");
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
getId(value: Slot): number {
throw new Error("Cannot get the db key from slot");
}
}
1 change: 1 addition & 0 deletions packages/lodestar/src/db/repositories/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ export {BestPartialLightClientUpdateRepository} from "./lightclientBestPartialUp
export {CheckpointHeaderRepository} from "./lightclientCheckpointHeader";
export {SyncCommitteeRepository} from "./lightclientSyncCommittee";
export {SyncCommitteeWitnessRepository} from "./lightclientSyncCommitteeWitness";
export {BackfilledRanges} from "./backfilledRanges";
24 changes: 24 additions & 0 deletions packages/lodestar/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,30 @@ export function createLodestarMetrics(
}),
},

backfillSync: {
backfilledTillSlot: register.gauge({
name: "lodestar_backfill_till_slot",
help: "Current lowest backfilled slot",
}),
prevFinOrWsSlot: register.gauge({
name: "lodestar_backfill_prev_fin_or_ws_slot",
help: "Slot of previous finalized or wsCheckpoint block to be validated",
}),
totalBlocks: register.gauge<"method">({
name: "lodestar_backfill_sync_blocks_total",
help: "Total amount of backfilled blocks",
labelNames: ["method"],
}),
errors: register.gauge({
name: "lodestar_backfill_sync_errors_total",
help: "Total number of errors while backfilling",
}),
status: register.gauge({
name: "lodestar_backfill_sync_status",
help: "Current backfill syncing status: [Aborted, Pending, Syncing, Completed]",
}),
},

// Validator monitoring

validatorMonitor: {
Expand Down
3 changes: 2 additions & 1 deletion packages/lodestar/src/network/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import PeerId from "peer-id";
import {INetworkEventBus} from "./events";
import {Eth2Gossipsub} from "./gossip";
import {MetadataController} from "./metadata";
import {IPeerRpcScoreStore, IPeerMetadataStore} from "./peers";
import {IPeerRpcScoreStore, IPeerMetadataStore, PeerAction} from "./peers";
import {IReqResp} from "./reqresp";
import {IAttnetsService, ISubnetsService, CommitteeSubscription} from "./subnets";

Expand Down Expand Up @@ -39,6 +39,7 @@ export interface INetwork {
/** Subscribe, search peers, join long-lived syncnets */
prepareSyncCommitteeSubnets(subscriptions: CommitteeSubscription[]): void;
reStatusPeers(peers: PeerId[]): void;
reportPeer(peer: PeerId, action: PeerAction, actionName?: string): void;

// Gossip handler
subscribeGossipCoreTopics(): void;
Expand Down
6 changes: 5 additions & 1 deletion packages/lodestar/src/network/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {MetadataController} from "./metadata";
import {getActiveForks, getCurrentAndNextFork, FORK_EPOCH_LOOKAHEAD} from "./forks";
import {IPeerMetadataStore, Libp2pPeerMetadataStore} from "./peers/metastore";
import {PeerManager} from "./peers/peerManager";
import {IPeerRpcScoreStore, PeerRpcScoreStore} from "./peers";
import {IPeerRpcScoreStore, PeerAction, PeerRpcScoreStore} from "./peers";
import {INetworkEventBus, NetworkEventBus} from "./events";
import {AttnetsService, SyncnetsService, CommitteeSubscription} from "./subnets";

Expand Down Expand Up @@ -205,6 +205,10 @@ export class Network implements INetwork {
this.peerManager.reStatusPeers(peers);
}

reportPeer(peer: PeerId, action: PeerAction, actionName?: string): void {
this.peerRpcScores.applyAction(peer, action, actionName);
}

/**
* Subscribe to all gossip events. Safe to call multiple times
*/
Expand Down
Loading

0 comments on commit 0e47cd3

Please sign in to comment.