diff --git a/Cargo.lock b/Cargo.lock index 57d4a7deaf62..a0d94e760920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7071,6 +7071,7 @@ dependencies = [ "reth-errors", "reth-ethereum-consensus", "reth-evm", + "reth-exex-types", "reth-metrics", "reth-network-p2p", "reth-payload-builder", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 57adf06f14ea..38ab50877ac1 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -63,7 +63,9 @@ reth-tracing = { workspace = true, optional = true } [dev-dependencies] # reth reth-db = { workspace = true, features = ["test-utils"] } +reth-exex-types.workspace = true reth-network-p2p = { workspace = true, features = ["test-utils"] } +reth-prune.workspace = true reth-prune-types.workspace = true reth-stages = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 5b50927273ba..2a08d697ab18 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -229,7 +229,7 @@ pub enum PersistenceAction { } /// A handle to the persistence service -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct PersistenceHandle { /// The channel used to communicate with the persistence service sender: Sender, @@ -254,13 +254,16 @@ impl PersistenceHandle { /// assumed to be ordered by block number. /// /// This returns the latest hash that has been saved, allowing removal of that block and any - /// previous blocks from in-memory data structures. - pub async fn save_blocks(&self, blocks: Vec) -> B256 { - let (tx, rx) = oneshot::channel(); + /// previous blocks from in-memory data structures. This value is returned in the receiver end + /// of the sender argument. + pub fn save_blocks(&self, blocks: Vec, tx: oneshot::Sender) { + if blocks.is_empty() { + let _ = tx.send(B256::default()); + return; + } self.sender .send(PersistenceAction::SaveBlocks((blocks, tx))) .expect("should be able to send"); - rx.await.expect("todo: err handling") } /// Tells the persistence service to remove blocks above a certain block number. The removed @@ -283,3 +286,86 @@ impl PersistenceHandle { rx.await.expect("todo: err handling") } } + +#[cfg(test)] +mod tests { + use super::*; + use reth_chainspec::MAINNET; + use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; + use reth_exex_types::FinishedExExHeight; + use reth_primitives::{ + Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256, + }; + use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory}; + use reth_prune::Pruner; + use reth_trie::{updates::TrieUpdates, HashedPostState}; + use revm::db::BundleState; + use std::sync::{mpsc::channel, Arc}; + + fn default_persistence_handle() -> PersistenceHandle { + let db = create_test_rw_db(); + let (_static_dir, static_dir_path) = create_test_static_files_dir(); + let provider = ProviderFactory::new( + db, + MAINNET.clone(), + StaticFileProvider::read_write(static_dir_path).unwrap(), + ); + let (finished_exex_height_tx, finished_exex_height_rx) = + tokio::sync::watch::channel(FinishedExExHeight::NoExExs); + + let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx); + + let (static_file_sender, _static_file_receiver) = channel(); + let static_file_handle = StaticFileServiceHandle::new(static_file_sender); + + Persistence::spawn_new(provider, static_file_handle, pruner) + } + + #[tokio::test] + async fn test_save_blocks_empty() { + let persistence_handle = default_persistence_handle(); + + let blocks = vec![]; + let (tx, rx) = oneshot::channel(); + + persistence_handle.save_blocks(blocks, tx); + + let hash = rx.await.unwrap(); + assert_eq!(hash, B256::default()); + } + + #[tokio::test] + async fn test_save_blocks_single_block() { + let persistence_handle = default_persistence_handle(); + + let mut block = Block::default(); + let sender = Address::random(); + let tx = TransactionSigned::default(); + block.body.push(tx); + let block_hash = block.hash_slow(); + let block_number = block.number; + let sealed = block.seal_slow(); + let sealed_with_senders = + SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap(); + + let executed = ExecutedBlock::new( + Arc::new(sealed), + Arc::new(sealed_with_senders.senders), + Arc::new(ExecutionOutcome::new( + BundleState::default(), + Receipts { receipt_vec: vec![] }, + block_number, + vec![Requests::default()], + )), + Arc::new(HashedPostState::default()), + Arc::new(TrieUpdates::default()), + ); + let blocks = vec![executed]; + let (tx, rx) = oneshot::channel(); + + persistence_handle.save_blocks(blocks, tx); + + let actual_hash = rx.await.unwrap(); + assert_eq!(block_hash, actual_hash); + } +} diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8afed31043c0..78e8cdbaeec7 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2,6 +2,7 @@ use crate::{ backfill::BackfillAction, chain::FromOrchestrator, engine::{DownloadRequest, EngineApiEvent, FromEngine}, + persistence::PersistenceHandle, }; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, @@ -37,12 +38,15 @@ use std::{ marker::PhantomData, sync::{mpsc::Receiver, Arc}, }; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::*; mod memory_overlay; pub use memory_overlay::MemoryOverlayStateProvider; +/// Maximum number of blocks to be kept in memory without triggering persistence. +const PERSISTENCE_THRESHOLD: u64 = 256; + /// Represents an executed block stored in-memory. #[derive(Clone, Debug)] pub struct ExecutedBlock { @@ -54,6 +58,16 @@ pub struct ExecutedBlock { } impl ExecutedBlock { + pub(crate) const fn new( + block: Arc, + senders: Arc>, + execution_output: Arc, + hashed_state: Arc, + trie: Arc, + ) -> Self { + Self { block, senders, execution_output, hashed_state, trie } + } + /// Returns a reference to the executed block. pub(crate) fn block(&self) -> &SealedBlock { &self.block @@ -120,6 +134,11 @@ impl TreeState { } } } + + /// Returns the maximum block number stored. + pub(crate) fn max_block_number(&self) -> BlockNumber { + *self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0 + } } /// Tracks the state of the engine api internals. @@ -129,7 +148,7 @@ impl TreeState { pub struct EngineApiTreeState { /// Tracks the state of the blockchain tree. tree_state: TreeState, - /// Tracks the received forkchoice state updates received by the CL. + /// Tracks the forkchoice state updates received by the CL. forkchoice_state_tracker: ForkchoiceStateTracker, /// Buffer of detached blocks. buffer: BlockBuffer, @@ -242,6 +261,8 @@ pub struct EngineApiTreeHandlerImpl { state: EngineApiTreeState, incoming: Receiver>>, outgoing: UnboundedSender, + persistence: PersistenceHandle, + persistence_state: PersistenceState, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, _marker: PhantomData, @@ -262,6 +283,7 @@ where incoming: Receiver>>, outgoing: UnboundedSender, state: EngineApiTreeState, + persistence: PersistenceHandle, ) -> Self { Self { provider, @@ -270,6 +292,8 @@ where payload_validator, incoming, outgoing, + persistence, + persistence_state: PersistenceState::default(), is_pipeline_active: false, state, _marker: PhantomData, @@ -284,6 +308,7 @@ where payload_validator: ExecutionPayloadValidator, incoming: Receiver>>, state: EngineApiTreeState, + persistence: PersistenceHandle, ) -> UnboundedSender { let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel(); let task = Self::new( @@ -294,6 +319,7 @@ where incoming, outgoing.clone(), state, + persistence, ); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); outgoing @@ -342,6 +368,71 @@ where } } } + + if self.should_persist() && !self.persistence_state.in_progress() { + let blocks_to_persist = self.get_blocks_to_persist(); + let (tx, rx) = oneshot::channel(); + self.persistence.save_blocks(blocks_to_persist, tx); + self.persistence_state.start(rx); + } + + if self.persistence_state.in_progress() { + let rx = self + .persistence_state + .rx + .as_mut() + .expect("if a persistence task is in progress Receiver must be Some"); + // Check if persistence has completed + if let Ok(last_persisted_block_hash) = rx.try_recv() { + if let Some(block) = + self.state.tree_state.block_by_hash(last_persisted_block_hash) + { + self.persistence_state.finish(last_persisted_block_hash, block.number); + self.remove_persisted_blocks_from_memory(); + } else { + error!("could not find persisted block with hash {last_persisted_block_hash} in memory"); + } + } + } + } + } + + /// Returns true if the canonical chain length minus the last persisted + /// block is more than the persistence threshold. + fn should_persist(&self) -> bool { + self.state.tree_state.max_block_number() - + self.persistence_state.last_persisted_block_number > + PERSISTENCE_THRESHOLD + } + + fn get_blocks_to_persist(&self) -> Vec { + let start = self.persistence_state.last_persisted_block_number + 1; + let end = start + PERSISTENCE_THRESHOLD; + + self.state + .tree_state + .blocks_by_number + .range(start..end) + .flat_map(|(_, blocks)| blocks.iter().cloned()) + .collect() + } + + fn remove_persisted_blocks_from_memory(&mut self) { + let keys_to_remove: Vec = self + .state + .tree_state + .blocks_by_number + .range(..=self.persistence_state.last_persisted_block_number) + .map(|(&k, _)| k) + .collect(); + + for key in keys_to_remove { + if let Some(blocks) = self.state.tree_state.blocks_by_number.remove(&key) { + // Remove corresponding blocks from blocks_by_hash + for block in blocks { + self.state.tree_state.blocks_by_hash.remove(&block.block().hash()); + } + } } } @@ -755,3 +846,35 @@ where todo!() } } + +/// The state of the persistence task. +#[derive(Default, Debug)] +struct PersistenceState { + /// Hash of the last block persisted. + last_persisted_block_hash: B256, + /// Receiver end of channel where the result of the persistence task will be + /// sent when done. A None value means there's no persistence task in progress. + rx: Option>, + /// The last persisted block number. + last_persisted_block_number: u64, +} + +impl PersistenceState { + /// Determines if there is a persistence task in progress by checking if the + /// receiver is set. + const fn in_progress(&self) -> bool { + self.rx.is_some() + } + + /// Sets state for a started persistence task. + fn start(&mut self, rx: oneshot::Receiver) { + self.rx = Some(rx); + } + + /// Sets state for a finished persistence task. + fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) { + self.rx = None; + self.last_persisted_block_number = last_persisted_block_number; + self.last_persisted_block_hash = last_persisted_block_hash; + } +}