Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: regen to consume state cache reload api #6456

Merged
merged 2 commits into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion packages/beacon-node/src/chain/regen/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export enum RegenErrorCode {
TOO_MANY_BLOCK_PROCESSED = "REGEN_ERROR_TOO_MANY_BLOCK_PROCESSED",
BLOCK_NOT_IN_DB = "REGEN_ERROR_BLOCK_NOT_IN_DB",
STATE_TRANSITION_ERROR = "REGEN_ERROR_STATE_TRANSITION_ERROR",
INVALID_STATE_ROOT = "REGEN_ERROR_INVALID_STATE_ROOT",
}

export type RegenErrorType =
Expand All @@ -17,7 +18,8 @@ export type RegenErrorType =
| {code: RegenErrorCode.NO_SEED_STATE}
| {code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED; stateRoot: RootHex | Root}
| {code: RegenErrorCode.BLOCK_NOT_IN_DB; blockRoot: RootHex | Root}
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error};
| {code: RegenErrorCode.STATE_TRANSITION_ERROR; error: Error}
| {code: RegenErrorCode.INVALID_STATE_ROOT; slot: Slot; expected: RootHex; actual: RootHex};

export class RegenError extends Error {
type: RegenErrorType;
Expand Down
94 changes: 68 additions & 26 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,29 +10,34 @@ import {
stateTransition,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {sleep} from "@lodestar/utils";
import {Logger, sleep} from "@lodestar/utils";
import {SLOTS_PER_EPOCH} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {Metrics} from "../../metrics/index.js";
import {IBeaconDb} from "../../db/index.js";
import {CheckpointStateCache, StateContextCache} from "../stateCache/index.js";
import {getCheckpointFromState} from "../blocks/utils/checkpoint.js";
import {ChainEvent, ChainEventEmitter} from "../emitter.js";
import {CheckpointStateCache, BlockStateCache} from "../stateCache/types.js";
import {IStateRegeneratorInternal, RegenCaller, StateCloneOpts} from "./interface.js";
import {RegenError, RegenErrorCode} from "./errors.js";

export type RegenModules = {
db: IBeaconDb;
forkChoice: IForkChoice;
stateCache: StateContextCache;
stateCache: BlockStateCache;
checkpointStateCache: CheckpointStateCache;
config: ChainForkConfig;
emitter: ChainEventEmitter;
logger: Logger;
metrics: Metrics | null;
};

/**
* Regenerates states that have already been processed by the fork choice
* Since Feb 2024, we support reloading checkpoint state from disk via allowDiskReload flag. Due to its performance impact
* this flag is only set to true in this case:
* - getPreState: this is for block processing, it's important to reload state in unfinality time
* - updateHeadState: rarely happen, but it's important to make sure we always can regen head state
twoeths marked this conversation as resolved.
Show resolved Hide resolved
*/
export class StateRegenerator implements IStateRegeneratorInternal {
constructor(private readonly modules: RegenModules) {}
Expand All @@ -41,11 +46,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
* Get the state to run with `block`. May be:
* - If parent is in same epoch -> Exact state at `block.parentRoot`
* - If parent is in prev epoch -> State after `block.parentRoot` dialed forward through epoch transition
* - reload state if needed in this flow
*/
async getPreState(
block: allForks.BeaconBlock,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller
): Promise<CachedBeaconStateAllForks> {
const parentBlock = this.modules.forkChoice.getBlock(block.parentRoot);
if (!parentBlock) {
Expand All @@ -57,18 +63,19 @@ export class StateRegenerator implements IStateRegeneratorInternal {

const parentEpoch = computeEpochAtSlot(parentBlock.slot);
const blockEpoch = computeEpochAtSlot(block.slot);
const allowDiskReload = true;

// This may save us at least one epoch transition.
// If the requested state crosses an epoch boundary
// then we may use the checkpoint state before the block
// We may have the checkpoint state with parent root inside the checkpoint state cache
// through gossip validation.
if (parentEpoch < blockEpoch) {
return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, rCaller);
return this.getCheckpointState({root: block.parentRoot, epoch: blockEpoch}, opts, regenCaller, allowDiskReload);
}

// Otherwise, get the state normally.
return this.getState(parentBlock.stateRoot, rCaller);
return this.getState(parentBlock.stateRoot, regenCaller, allowDiskReload);
}

/**
Expand All @@ -77,20 +84,23 @@ export class StateRegenerator implements IStateRegeneratorInternal {
async getCheckpointState(
cp: phase0.Checkpoint,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
const checkpointStartSlot = computeStartSlotAtEpoch(cp.epoch);
return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, rCaller);
return this.getBlockSlotState(toHexString(cp.root), checkpointStartSlot, opts, regenCaller, allowDiskReload);
}

/**
* Get state after block `blockRoot` dialed forward to `slot`
* - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk
*/
async getBlockSlotState(
blockRoot: RootHex,
slot: Slot,
opts: StateCloneOpts,
rCaller: RegenCaller
regenCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
const block = this.modules.forkChoice.getBlockHex(blockRoot);
if (!block) {
Expand All @@ -108,26 +118,35 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}

const latestCheckpointStateCtx = this.modules.checkpointStateCache.getLatest(blockRoot, computeEpochAtSlot(slot));
const {checkpointStateCache} = this.modules;
const epoch = computeEpochAtSlot(slot);
const latestCheckpointStateCtx = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(blockRoot, epoch)
: checkpointStateCache.getLatest(blockRoot, epoch);

// If a checkpoint state exists with the given checkpoint root, it either is in requested epoch
// or needs to have empty slots processed until the requested epoch
if (latestCheckpointStateCtx) {
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, opts);
return processSlotsByCheckpoint(this.modules, latestCheckpointStateCtx, slot, regenCaller, opts);
}

// Otherwise, use the fork choice to get the stateRoot from block at the checkpoint root
// regenerate that state,
// then process empty slots until the requested epoch
const blockStateCtx = await this.getState(block.stateRoot, rCaller);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, opts);
const blockStateCtx = await this.getState(block.stateRoot, regenCaller, allowDiskReload);
return processSlotsByCheckpoint(this.modules, blockStateCtx, slot, regenCaller, opts);
}

/**
* Get state by exact root. If not in cache directly, requires finding the block that references the state from the
* forkchoice and replaying blocks to get to it.
* - allowDiskReload should be used with care, as it will cause the state to be reloaded from disk
*/
async getState(stateRoot: RootHex, _rCaller: RegenCaller): Promise<CachedBeaconStateAllForks> {
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
allowDiskReload = false
): Promise<CachedBeaconStateAllForks> {
// Trivial case, state at stateRoot is already cached
const cachedStateCtx = this.modules.stateCache.get(stateRoot);
if (cachedStateCtx) {
Expand All @@ -143,15 +162,17 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// gets reversed when replayed
const blocksToReplay = [block];
let state: CachedBeaconStateAllForks | null = null;
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.parentRoot)) {
const {checkpointStateCache} = this.modules;
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.stateCache.get(b.stateRoot);
if (state) {
break;
}
state = this.modules.checkpointStateCache.getLatest(
b.blockRoot,
computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1)
);
const epoch = computeEpochAtSlot(blocksToReplay[blocksToReplay.length - 1].slot - 1);
state = allowDiskReload
? await checkpointStateCache.getOrReloadLatest(b.blockRoot, epoch)
: checkpointStateCache.getLatest(b.blockRoot, epoch);
if (state) {
break;
}
Expand All @@ -172,6 +193,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
Expand All @@ -195,11 +218,23 @@ export class StateRegenerator implements IStateRegeneratorInternal {
verifyProposer: false,
verifySignatures: false,
},
null
this.modules.metrics
);

// TODO: Persist states, note that regen could be triggered by old states.
// Should those take a place in the cache?
const stateRoot = toHexString(state.hashTreeRoot());
if (b.stateRoot !== stateRoot) {
throw new RegenError({
slot: b.slot,
code: RegenErrorCode.INVALID_STATE_ROOT,
actual: stateRoot,
expected: b.stateRoot,
});
}

if (allowDiskReload) {
// also with allowDiskReload flag, we "reload" it to the state cache too
this.modules.stateCache.add(state);
}

// this avoids keeping our node busy processing blocks
await sleep(0);
Expand All @@ -210,13 +245,14 @@ export class StateRegenerator implements IStateRegeneratorInternal {
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});

return state;
}

private findFirstStateBlock(stateRoot: RootHex): ProtoBlock {
for (const block of this.modules.forkChoice.forwarditerateAncestorBlocks()) {
if (block !== undefined) {
if (block.stateRoot === stateRoot) {
twoeths marked this conversation as resolved.
Show resolved Hide resolved
return block;
}
}
Expand All @@ -237,9 +273,10 @@ async function processSlotsByCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, opts);
let postState = await processSlotsToNearestCheckpoint(modules, preState, slot, regenCaller, opts);
if (postState.slot < slot) {
postState = processSlots(postState, slot, opts, modules.metrics);
}
Expand All @@ -257,6 +294,7 @@ async function processSlotsToNearestCheckpoint(
modules: {checkpointStateCache: CheckpointStateCache; metrics: Metrics | null; emitter: ChainEventEmitter},
preState: CachedBeaconStateAllForks,
slot: Slot,
regenCaller: RegenCaller,
opts: StateCloneOpts
): Promise<CachedBeaconStateAllForks> {
const preSlot = preState.slot;
Expand All @@ -272,12 +310,16 @@ async function processSlotsToNearestCheckpoint(
) {
// processSlots calls .clone() before mutating
postState = processSlots(postState, nextEpochSlot, opts, metrics);
modules.metrics?.epochTransitionByCaller.inc({caller: regenCaller});

// Cache state to preserve epoch transition work
// this is usually added when we prepare for next slot or validate gossip block
// then when we process the 1st block of epoch, we don't have to do state transition again
// This adds Previous Root Checkpoint State to the checkpoint state cache
// This may becomes the "official" checkpoint state if the 1st block of epoch is skipped
const checkpointState = postState;
const cp = getCheckpointFromState(checkpointState);
checkpointStateCache.add(cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState);
emitter.emit(ChainEvent.checkpoint, cp, checkpointState.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add the costly clone() here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in fact I will add more clone() when returning an item from cache to avoid this mistake, see #6178 (comment)


// this avoids keeping our node busy processing blocks
await sleep(0);
Expand Down
5 changes: 5 additions & 0 deletions packages/beacon-node/src/metrics/metrics/lodestar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ export function createLodestarMetrics(

// Beacon state transition metrics

epochTransitionByCaller: register.gauge<{caller: RegenCaller}>({
name: "lodestar_epoch_transition_by_caller_total",
help: "Total count of epoch transition by caller",
labelNames: ["caller"],
}),
epochTransitionTime: register.histogram({
name: "lodestar_stfn_epoch_transition_seconds",
help: "Time to process a single epoch transition in seconds",
Expand Down
Loading