Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: gracefully handle missing persisted_trie_updates #13942

Merged
merged 7 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 66 additions & 24 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
/// Updates the pending block with the given block.
///
/// Note: This assumes that the parent block of the pending block is canonical.
pub fn set_pending_block(&self, pending: ExecutedBlock<N>) {
pub fn set_pending_block(&self, pending: ExecutedBlockWithTrieUpdates<N>) {
// fetch the state of the pending block's parent block
let parent = self.state_by_hash(pending.recovered_block().parent_hash());
let pending = BlockState::with_parent(pending, parent);
Expand All @@ -254,9 +254,10 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
///
/// This removes all reorged blocks and appends the new blocks to the tracked chain and connects
/// them to their parent blocks.
fn update_blocks<I>(&self, new_blocks: I, reorged: I)
fn update_blocks<I, R>(&self, new_blocks: I, reorged: R)
where
I: IntoIterator<Item = ExecutedBlock<N>>,
I: IntoIterator<Item = ExecutedBlockWithTrieUpdates<N>>,
R: IntoIterator<Item = ExecutedBlock<N>>,
{
{
// acquire locks, starting with the numbers lock
Expand Down Expand Up @@ -601,20 +602,23 @@ impl<N: NodePrimitives> CanonicalInMemoryState<N> {
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct BlockState<N: NodePrimitives = EthPrimitives> {
/// The executed block that determines the state after this block has been executed.
block: ExecutedBlock<N>,
block: ExecutedBlockWithTrieUpdates<N>,
/// The block's parent block if it exists.
parent: Option<Arc<BlockState<N>>>,
}

#[allow(dead_code)]
impl<N: NodePrimitives> BlockState<N> {
/// [`BlockState`] constructor.
pub const fn new(block: ExecutedBlock<N>) -> Self {
pub const fn new(block: ExecutedBlockWithTrieUpdates<N>) -> Self {
Self { block, parent: None }
}

/// [`BlockState`] constructor with parent.
pub const fn with_parent(block: ExecutedBlock<N>, parent: Option<Arc<Self>>) -> Self {
pub const fn with_parent(
block: ExecutedBlockWithTrieUpdates<N>,
parent: Option<Arc<Self>>,
) -> Self {
Self { block, parent }
}

Expand All @@ -628,12 +632,12 @@ impl<N: NodePrimitives> BlockState<N> {
}

/// Returns the executed block that determines the state.
pub fn block(&self) -> ExecutedBlock<N> {
pub fn block(&self) -> ExecutedBlockWithTrieUpdates<N> {
self.block.clone()
}

/// Returns a reference to the executed block that determines the state.
pub const fn block_ref(&self) -> &ExecutedBlock<N> {
pub const fn block_ref(&self) -> &ExecutedBlockWithTrieUpdates<N> {
&self.block
}

Expand Down Expand Up @@ -787,29 +791,27 @@ impl<N: NodePrimitives> BlockState<N> {
}

/// Represents an executed block stored in-memory.
#[derive(Clone, Debug, PartialEq, Eq, Default)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ExecutedBlock<N: NodePrimitives = EthPrimitives> {
/// Recovered Block
pub recovered_block: Arc<RecoveredBlock<N::Block>>,
/// Block's execution outcome.
pub execution_output: Arc<ExecutionOutcome<N::Receipt>>,
/// Block's hashed state.
pub hashed_state: Arc<HashedPostState>,
/// Trie updates that result of applying the block.
pub trie: Arc<TrieUpdates>,
}

impl<N: NodePrimitives> ExecutedBlock<N> {
/// [`ExecutedBlock`] constructor.
pub const fn new(
recovered_block: Arc<RecoveredBlock<N::Block>>,
execution_output: Arc<ExecutionOutcome<N::Receipt>>,
hashed_state: Arc<HashedPostState>,
trie: Arc<TrieUpdates>,
) -> Self {
Self { recovered_block, execution_output, hashed_state, trie }
impl<N: NodePrimitives> Default for ExecutedBlock<N> {
fn default() -> Self {
Self {
recovered_block: Default::default(),
execution_output: Default::default(),
hashed_state: Default::default(),
}
}
}

impl<N: NodePrimitives> ExecutedBlock<N> {
/// Returns a reference to an inner [`SealedBlock`]
#[inline]
pub fn sealed_block(&self) -> &SealedBlock<N::Block> {
Expand All @@ -833,6 +835,42 @@ impl<N: NodePrimitives> ExecutedBlock<N> {
pub fn hashed_state(&self) -> &HashedPostState {
&self.hashed_state
}
}

/// An [`ExecutedBlock`] with its [`TrieUpdates`].
///
/// We store it as separate type because [`TrieUpdates`] are only available for blocks stored in
/// memory and can't be obtained for canonical persisted blocks.
#[derive(
Clone,
Debug,
PartialEq,
Eq,
Default,
derive_more::Deref,
derive_more::DerefMut,
derive_more::Into,
)]
pub struct ExecutedBlockWithTrieUpdates<N: NodePrimitives = EthPrimitives> {
/// Inner [`ExecutedBlock`].
#[deref]
#[deref_mut]
#[into]
pub block: ExecutedBlock<N>,
/// Trie updates that result of applying the block.
pub trie: Arc<TrieUpdates>,
}

impl<N: NodePrimitives> ExecutedBlockWithTrieUpdates<N> {
/// [`ExecutedBlock`] constructor.
pub const fn new(
recovered_block: Arc<RecoveredBlock<N::Block>>,
execution_output: Arc<ExecutionOutcome<N::Receipt>>,
hashed_state: Arc<HashedPostState>,
trie: Arc<TrieUpdates>,
) -> Self {
Self { block: ExecutedBlock { recovered_block, execution_output, hashed_state }, trie }
}

/// Returns a reference to the trie updates for the block
#[inline]
Expand All @@ -847,14 +885,18 @@ pub enum NewCanonicalChain<N: NodePrimitives = EthPrimitives> {
/// A simple append to the current canonical head
Commit {
/// all blocks that lead back to the canonical head
new: Vec<ExecutedBlock<N>>,
new: Vec<ExecutedBlockWithTrieUpdates<N>>,
},
/// A reorged chain consists of two chains that trace back to a shared ancestor block at which
/// point they diverge.
Reorg {
/// All blocks of the _new_ chain
new: Vec<ExecutedBlock<N>>,
new: Vec<ExecutedBlockWithTrieUpdates<N>>,
/// All blocks of the _old_ chain
///
/// These are not [`ExecutedBlockWithTrieUpdates`] because we don't always have the trie
/// updates for the old canonical chain. For example, in case of node being restarted right
/// before the reorg [`TrieUpdates`] can't be fetched from database.
old: Vec<ExecutedBlock<N>>,
},
}
Expand Down Expand Up @@ -1257,7 +1299,7 @@ mod tests {
block1.recovered_block().hash()
);

let chain = NewCanonicalChain::Reorg { new: vec![block2.clone()], old: vec![block1] };
let chain = NewCanonicalChain::Reorg { new: vec![block2.clone()], old: vec![block1.block] };
state.update_chain(chain);
assert_eq!(
state.head_state().unwrap().block_ref().recovered_block().hash(),
Expand Down Expand Up @@ -1540,7 +1582,7 @@ mod tests {
// Test reorg notification
let chain_reorg = NewCanonicalChain::Reorg {
new: vec![block1a.clone(), block2a.clone()],
old: vec![block1.clone(), block2.clone()],
old: vec![block1.block.clone(), block2.block.clone()],
};

assert_eq!(
Expand Down
9 changes: 6 additions & 3 deletions crates/chain-state/src/memory_overlay.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::ExecutedBlock;
use super::ExecutedBlockWithTrieUpdates;
use alloy_consensus::BlockHeader;
use alloy_primitives::{
keccak256, map::B256HashMap, Address, BlockNumber, Bytes, StorageKey, StorageValue, B256,
Expand All @@ -23,7 +23,7 @@ pub struct MemoryOverlayStateProviderRef<'a, N: NodePrimitives = reth_primitives
/// Historical state provider for state lookups that are not found in in-memory blocks.
pub(crate) historical: Box<dyn StateProvider + 'a>,
/// The collection of executed parent blocks. Expected order is newest to oldest.
pub(crate) in_memory: Vec<ExecutedBlock<N>>,
pub(crate) in_memory: Vec<ExecutedBlockWithTrieUpdates<N>>,
/// Lazy-loaded in-memory trie data.
pub(crate) trie_state: OnceLock<MemoryOverlayTrieState>,
}
Expand All @@ -40,7 +40,10 @@ impl<'a, N: NodePrimitives> MemoryOverlayStateProviderRef<'a, N> {
/// - `in_memory` - the collection of executed ancestor blocks in reverse.
/// - `historical` - a historical state provider for the latest ancestor block stored in the
/// database.
pub fn new(historical: Box<dyn StateProvider + 'a>, in_memory: Vec<ExecutedBlock<N>>) -> Self {
pub fn new(
historical: Box<dyn StateProvider + 'a>,
in_memory: Vec<ExecutedBlockWithTrieUpdates<N>>,
) -> Self {
Self { historical, in_memory, trie_state: OnceLock::new() }
}

Expand Down
18 changes: 9 additions & 9 deletions crates/chain-state/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use core::marker::PhantomData;

use crate::{
in_memory::ExecutedBlock, CanonStateNotification, CanonStateNotifications,
in_memory::ExecutedBlockWithTrieUpdates, CanonStateNotification, CanonStateNotifications,
CanonStateSubscriptions,
};
use alloy_consensus::{
Expand Down Expand Up @@ -204,17 +204,17 @@ impl<N: NodePrimitives> TestBlockBuilder<N> {
fork
}

/// Gets an [`ExecutedBlock`] with [`BlockNumber`], [`Receipts`] and parent hash.
/// Gets an [`ExecutedBlockWithTrieUpdates`] with [`BlockNumber`], [`Receipts`] and parent hash.
fn get_executed_block(
&mut self,
block_number: BlockNumber,
receipts: Receipts,
parent_hash: B256,
) -> ExecutedBlock {
) -> ExecutedBlockWithTrieUpdates {
let block_with_senders = self.generate_random_block(block_number, parent_hash);

let (block, senders) = block_with_senders.split_sealed();
ExecutedBlock::new(
ExecutedBlockWithTrieUpdates::new(
Arc::new(RecoveredBlock::new_sealed(block, senders)),
Arc::new(ExecutionOutcome::new(
BundleState::default(),
Expand All @@ -227,30 +227,30 @@ impl<N: NodePrimitives> TestBlockBuilder<N> {
)
}

/// Generates an [`ExecutedBlock`] that includes the given [`Receipts`].
/// Generates an [`ExecutedBlockWithTrieUpdates`] that includes the given [`Receipts`].
pub fn get_executed_block_with_receipts(
&mut self,
receipts: Receipts,
parent_hash: B256,
) -> ExecutedBlock {
) -> ExecutedBlockWithTrieUpdates {
let number = rand::thread_rng().gen::<u64>();
self.get_executed_block(number, receipts, parent_hash)
}

/// Generates an [`ExecutedBlock`] with the given [`BlockNumber`].
/// Generates an [`ExecutedBlockWithTrieUpdates`] with the given [`BlockNumber`].
pub fn get_executed_block_with_number(
&mut self,
block_number: BlockNumber,
parent_hash: B256,
) -> ExecutedBlock {
) -> ExecutedBlockWithTrieUpdates {
self.get_executed_block(block_number, Receipts { receipt_vec: vec![vec![]] }, parent_hash)
}

/// Generates a range of executed blocks with ascending block numbers.
pub fn get_executed_blocks(
&mut self,
range: Range<u64>,
) -> impl Iterator<Item = ExecutedBlock> + '_ {
) -> impl Iterator<Item = ExecutedBlockWithTrieUpdates> + '_ {
let mut parent_hash = B256::default();
range.map(move |number| {
let current_parent_hash = parent_hash;
Expand Down
4 changes: 2 additions & 2 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use alloy_primitives::B256;
use futures::{Stream, StreamExt};
use reth_chain_state::ExecutedBlock;
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_engine_primitives::{BeaconConsensusEngineEvent, BeaconEngineMessage, EngineTypes};
use reth_primitives::{NodePrimitives, RecoveredBlock};
use reth_primitives_traits::Block;
Expand Down Expand Up @@ -245,7 +245,7 @@ pub enum EngineApiRequest<T: EngineTypes, N: NodePrimitives> {
/// A request received from the consensus engine.
Beacon(BeaconEngineMessage<T>),
/// Request to insert an already executed block, e.g. via payload building.
InsertExecutedBlock(ExecutedBlock<N>),
InsertExecutedBlock(ExecutedBlockWithTrieUpdates<N>),
}

impl<T: EngineTypes, N: NodePrimitives> Display for EngineApiRequest<T, N> {
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::metrics::PersistenceMetrics;
use alloy_consensus::BlockHeader;
use alloy_eips::BlockNumHash;
use reth_chain_state::ExecutedBlock;
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_errors::ProviderError;
use reth_primitives::{EthPrimitives, NodePrimitives};
use reth_provider::{
Expand Down Expand Up @@ -140,7 +140,7 @@ where

fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock<N::Primitives>>,
blocks: Vec<ExecutedBlockWithTrieUpdates<N::Primitives>>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", first=?blocks.first().map(|b| b.recovered_block.num_hash()), last=?blocks.last().map(|b| b.recovered_block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
Expand Down Expand Up @@ -181,7 +181,7 @@ pub enum PersistenceAction<N: NodePrimitives = EthPrimitives> {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock<N>>, oneshot::Sender<Option<BlockNumHash>>),
SaveBlocks(Vec<ExecutedBlockWithTrieUpdates<N>>, oneshot::Sender<Option<BlockNumHash>>),

/// Removes block data above the given block number from the database.
///
Expand Down Expand Up @@ -258,7 +258,7 @@ impl<T: NodePrimitives> PersistenceHandle<T> {
/// If there are no blocks to persist, then `None` is sent in the sender.
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock<T>>,
blocks: Vec<ExecutedBlockWithTrieUpdates<T>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction<T>>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
Expand Down
Loading
Loading