From 9428cc91b26604f63f97893dedb6870b16a32ea4 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Sat, 6 Jul 2024 14:51:45 +0200 Subject: [PATCH 01/20] feat: tree hook for persisting blocks --- crates/engine/tree/src/tree/mod.rs | 37 ++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 8afed31043c0..84ed8f9157ce 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2,6 +2,7 @@ use crate::{ backfill::BackfillAction, chain::FromOrchestrator, engine::{DownloadRequest, EngineApiEvent, FromEngine}, + persistence::{PersistenceAction, PersistenceHandle}, }; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, @@ -242,6 +243,7 @@ pub struct EngineApiTreeHandlerImpl { state: EngineApiTreeState, incoming: Receiver>>, outgoing: UnboundedSender, + persistence: PersistenceHandle, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, _marker: PhantomData, @@ -262,6 +264,7 @@ where incoming: Receiver>>, outgoing: UnboundedSender, state: EngineApiTreeState, + persistence: PersistenceHandle, ) -> Self { Self { provider, @@ -270,6 +273,7 @@ where payload_validator, incoming, outgoing, + persistence, is_pipeline_active: false, state, _marker: PhantomData, @@ -284,6 +288,7 @@ where payload_validator: ExecutionPayloadValidator, incoming: Receiver>>, state: EngineApiTreeState, + persistence: PersistenceHandle, ) -> UnboundedSender { let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel(); let task = Self::new( @@ -294,6 +299,7 @@ where incoming, outgoing.clone(), state, + persistence, ); std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap(); outgoing @@ -342,9 +348,40 @@ where } } } + + if self.should_persist() { + self.persist_state(); + } } } + /// Returns true if the canonical chain length minus the last persisted + /// block is more than the persistence threshold. + const fn should_persist(&self) -> bool { + false + } + + fn persist_state(&self) { + let blocks_to_persist = self.get_blocks_to_persist(); + let (sender, receiver) = tokio::sync::oneshot::channel(); + let action = PersistenceAction::SaveBlocks((blocks_to_persist, sender)); + self.persistence.send_action(action).unwrap(); + + // Handle the response from the persistence task + tokio::spawn(async move { + match receiver.await { + Ok(_response) => {} + Err(e) => { + error!("Failed to persist data: {e}"); + } + } + }); + } + + const fn get_blocks_to_persist(&self) -> Vec { + Vec::new() + } + /// Return block from database or in-memory state by hash. fn block_by_hash(&self, hash: B256) -> ProviderResult> { // check database first From 90cc255b8ab94280a41aa6222dbcab6e0b8cfcb5 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Mon, 8 Jul 2024 18:30:04 +0200 Subject: [PATCH 02/20] store last persisted block --- crates/engine/tree/src/persistence.rs | 23 ++++++++++++++++++----- crates/engine/tree/src/tree/mod.rs | 23 ++++++----------------- 2 files changed, 24 insertions(+), 22 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 5b50927273ba..32124eafb8c3 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -233,12 +233,14 @@ pub enum PersistenceAction { pub struct PersistenceHandle { /// The channel used to communicate with the persistence service sender: Sender, + /// The last persisted block number. + last_persisted_block_number: u64, } impl PersistenceHandle { /// Create a new [`PersistenceHandle`] from a [`Sender`]. pub const fn new(sender: Sender) -> Self { - Self { sender } + Self { sender, last_persisted_block_number: 0 } } /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible @@ -255,12 +257,19 @@ impl PersistenceHandle { /// /// This returns the latest hash that has been saved, allowing removal of that block and any /// previous blocks from in-memory data structures. - pub async fn save_blocks(&self, blocks: Vec) -> B256 { + pub async fn save_blocks(&mut self, blocks: Vec) -> B256 { + if blocks.is_empty() { + return B256::default(); + } + let last_block_number = + blocks.last().expect("at this point blocks is not empty").block().number; let (tx, rx) = oneshot::channel(); self.sender .send(PersistenceAction::SaveBlocks((blocks, tx))) .expect("should be able to send"); - rx.await.expect("todo: err handling") + let hash = rx.await.expect("todo: err handling"); + self.last_persisted_block_number = last_block_number; + hash } /// Tells the persistence service to remove blocks above a certain block number. The removed @@ -270,7 +279,9 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) .expect("should be able to send"); - rx.await.expect("todo: err handling") + let hash = rx.await.expect("todo: err handling"); + self.last_persisted_block_number = block_num; + hash } /// Tells the persistence service to remove block data before the given hash, according to the @@ -280,6 +291,8 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::PruneBefore((block_num, tx))) .expect("should be able to send"); - rx.await.expect("todo: err handling") + let hash = rx.await.expect("todo: err handling"); + self.last_persisted_block_number = 0; + hash } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 84ed8f9157ce..5ddb1668793f 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2,7 +2,7 @@ use crate::{ backfill::BackfillAction, chain::FromOrchestrator, engine::{DownloadRequest, EngineApiEvent, FromEngine}, - persistence::{PersistenceAction, PersistenceHandle}, + persistence::PersistenceHandle, }; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, @@ -305,7 +305,7 @@ where outgoing } - fn run(mut self) { + async fn run(mut self) { loop { while let Ok(msg) = self.incoming.recv() { match msg { @@ -350,7 +350,7 @@ where } if self.should_persist() { - self.persist_state(); + self.persist_state().await; } } } @@ -361,21 +361,10 @@ where false } - fn persist_state(&self) { + async fn persist_state(&mut self) { let blocks_to_persist = self.get_blocks_to_persist(); - let (sender, receiver) = tokio::sync::oneshot::channel(); - let action = PersistenceAction::SaveBlocks((blocks_to_persist, sender)); - self.persistence.send_action(action).unwrap(); - - // Handle the response from the persistence task - tokio::spawn(async move { - match receiver.await { - Ok(_response) => {} - Err(e) => { - error!("Failed to persist data: {e}"); - } - } - }); + + self.persistence.save_blocks(blocks_to_persist).await; } const fn get_blocks_to_persist(&self) -> Vec { From aa358a4c0fc2e39e1c2c9f637a8d023b9f110d37 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 9 Jul 2024 10:58:32 +0200 Subject: [PATCH 03/20] impl should persist and max block number --- crates/engine/tree/src/persistence.rs | 10 ++++++++++ crates/engine/tree/src/tree/mod.rs | 13 +++++++++++-- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 32124eafb8c3..5d9d281b93dc 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -295,4 +295,14 @@ impl PersistenceHandle { self.last_persisted_block_number = 0; hash } + + /// Last persisted block number getter. + pub const fn last_persisted_block_number(&self) -> u64 { + self.last_persisted_block_number + } +} + +#[cfg(test)] +mod tests { + use super::*; } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 5ddb1668793f..830aefd1faf4 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -44,6 +44,9 @@ use tracing::*; mod memory_overlay; pub use memory_overlay::MemoryOverlayStateProvider; +/// Maximum number of blocks to be kept in memory without triggering persistence. +const PERSISTENCE_THRESHOLD: u64 = 512; + /// Represents an executed block stored in-memory. #[derive(Clone, Debug)] pub struct ExecutedBlock { @@ -121,6 +124,11 @@ impl TreeState { } } } + + /// Returns the maximum block number stored. + pub(crate) fn max_block_number(&self) -> BlockNumber { + *self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0 + } } /// Tracks the state of the engine api internals. @@ -357,8 +365,9 @@ where /// Returns true if the canonical chain length minus the last persisted /// block is more than the persistence threshold. - const fn should_persist(&self) -> bool { - false + fn should_persist(&self) -> bool { + self.state.tree_state.max_block_number() - self.persistence.last_persisted_block_number() > + PERSISTENCE_THRESHOLD } async fn persist_state(&mut self) { From fb288eadffdab0dc8e8e8535d4d6f0e6e6cf8c1f Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 9 Jul 2024 12:41:19 +0200 Subject: [PATCH 04/20] add persistance test --- Cargo.lock | 1 + crates/engine/tree/Cargo.toml | 2 ++ crates/engine/tree/src/persistence.rs | 35 ++++++++++++++++++++++++--- 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 57d4a7deaf62..a0d94e760920 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7071,6 +7071,7 @@ dependencies = [ "reth-errors", "reth-ethereum-consensus", "reth-evm", + "reth-exex-types", "reth-metrics", "reth-network-p2p", "reth-payload-builder", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 57adf06f14ea..38ab50877ac1 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -63,7 +63,9 @@ reth-tracing = { workspace = true, optional = true } [dev-dependencies] # reth reth-db = { workspace = true, features = ["test-utils"] } +reth-exex-types.workspace = true reth-network-p2p = { workspace = true, features = ["test-utils"] } +reth-prune.workspace = true reth-prune-types.workspace = true reth-stages = { workspace = true, features = ["test-utils"] } reth-tracing.workspace = true diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 5d9d281b93dc..d018116ffead 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -279,9 +279,9 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) .expect("should be able to send"); - let hash = rx.await.expect("todo: err handling"); + let removed_blocks = rx.await.expect("todo: err handling"); self.last_persisted_block_number = block_num; - hash + removed_blocks } /// Tells the persistence service to remove block data before the given hash, according to the @@ -304,5 +304,34 @@ impl PersistenceHandle { #[cfg(test)] mod tests { - use super::*; + use crate::persistence::Persistence; + use reth_chainspec::MAINNET; + use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; + use reth_exex_types::FinishedExExHeight; + use reth_primitives::B256; + use reth_provider::{providers::StaticFileProvider, ProviderFactory}; + use reth_prune::Pruner; + + #[tokio::test] + async fn test_save_blocks_empty() { + let db = create_test_rw_db(); + let (_static_dir, static_dir_path) = create_test_static_files_dir(); + let provider = ProviderFactory::new( + db, + MAINNET.clone(), + StaticFileProvider::read_write(static_dir_path).unwrap(), + ); + let (finished_exex_height_tx, finished_exex_height_rx) = + tokio::sync::watch::channel(FinishedExExHeight::NoExExs); + + let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx); + + let mut persistence_handle = Persistence::spawn_new(provider, pruner); + + let blocks = vec![]; + + let hash = persistence_handle.save_blocks(blocks).await; + + assert_eq!(hash, B256::default()); + } } From 10820ba8011871e5902c22b24ca4b1f5a0f1109c Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 9 Jul 2024 17:08:34 +0200 Subject: [PATCH 05/20] add persistance test, save_blocks non-empty --- crates/engine/tree/src/persistence.rs | 54 ++++++++++++++++++++++++--- crates/engine/tree/src/tree/mod.rs | 10 +++++ 2 files changed, 58 insertions(+), 6 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index d018116ffead..62ab8e2cba5f 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -304,16 +304,20 @@ impl PersistenceHandle { #[cfg(test)] mod tests { - use crate::persistence::Persistence; + use super::*; use reth_chainspec::MAINNET; use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir}; use reth_exex_types::FinishedExExHeight; - use reth_primitives::B256; - use reth_provider::{providers::StaticFileProvider, ProviderFactory}; + use reth_primitives::{ + Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256, + }; + use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory}; use reth_prune::Pruner; + use reth_trie::{updates::TrieUpdates, HashedPostState}; + use revm::db::BundleState; + use std::sync::Arc; - #[tokio::test] - async fn test_save_blocks_empty() { + fn default_persistence_handle() -> PersistenceHandle { let db = create_test_rw_db(); let (_static_dir, static_dir_path) = create_test_static_files_dir(); let provider = ProviderFactory::new( @@ -326,7 +330,12 @@ mod tests { let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx); - let mut persistence_handle = Persistence::spawn_new(provider, pruner); + Persistence::spawn_new(provider, pruner) + } + + #[tokio::test] + async fn test_save_blocks_empty() { + let mut persistence_handle = default_persistence_handle(); let blocks = vec![]; @@ -334,4 +343,37 @@ mod tests { assert_eq!(hash, B256::default()); } + + #[tokio::test] + async fn test_save_blocks_single_block() { + let mut persistence_handle = default_persistence_handle(); + + let mut block = Block::default(); + let sender = Address::random(); + let tx = TransactionSigned::default(); + block.body.push(tx); + let block_hash = block.hash_slow(); + let block_number = block.number; + let sealed = block.seal_slow(); + let sealed_with_senders = + SealedBlockWithSenders::new(sealed.clone(), vec![sender]).unwrap(); + + let executed = ExecutedBlock::new( + Arc::new(sealed), + Arc::new(sealed_with_senders.senders), + Arc::new(ExecutionOutcome::new( + BundleState::default(), + Receipts { receipt_vec: vec![] }, + block_number, + vec![Requests::default()], + )), + Arc::new(HashedPostState::default()), + Arc::new(TrieUpdates::default()), + ); + let blocks = vec![executed]; + + let actual_hash = persistence_handle.save_blocks(blocks).await; + + assert_eq!(block_hash, actual_hash); + } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 830aefd1faf4..49b9ae4d24dd 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -58,6 +58,16 @@ pub struct ExecutedBlock { } impl ExecutedBlock { + pub(crate) const fn new( + block: Arc, + senders: Arc>, + execution_output: Arc, + hashed_state: Arc, + trie: Arc, + ) -> Self { + Self { block, senders, execution_output, hashed_state, trie } + } + /// Returns a reference to the executed block. pub(crate) fn block(&self) -> &SealedBlock { &self.block From fd0c2581d017601e347c9656de2525ed8f551973 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Tue, 9 Jul 2024 17:18:14 +0200 Subject: [PATCH 06/20] get_blocks_to_persist impl --- crates/engine/tree/src/tree/mod.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 49b9ae4d24dd..08e3d842547e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -386,8 +386,16 @@ where self.persistence.save_blocks(blocks_to_persist).await; } - const fn get_blocks_to_persist(&self) -> Vec { - Vec::new() + fn get_blocks_to_persist(&self) -> Vec { + let start = self.persistence.last_persisted_block_number() + 1; + let end = start + PERSISTENCE_THRESHOLD; + + self.state + .tree_state + .blocks_by_number + .range(start..end) + .flat_map(|(_, blocks)| blocks.iter().cloned()) + .collect() } /// Return block from database or in-memory state by hash. From b59fcd88bc0441acda3dd3ad679e3eb4d06c8a20 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 10:38:27 +0200 Subject: [PATCH 07/20] last_persisted_block_number to tree --- crates/engine/tree/src/persistence.rs | 34 ++++++++++----------------- crates/engine/tree/src/tree/mod.rs | 13 +++++++--- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 62ab8e2cba5f..d558dab4835f 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -233,14 +233,12 @@ pub enum PersistenceAction { pub struct PersistenceHandle { /// The channel used to communicate with the persistence service sender: Sender, - /// The last persisted block number. - last_persisted_block_number: u64, } impl PersistenceHandle { /// Create a new [`PersistenceHandle`] from a [`Sender`]. pub const fn new(sender: Sender) -> Self { - Self { sender, last_persisted_block_number: 0 } + Self { sender } } /// Sends a specific [`PersistenceAction`] in the contained channel. The caller is responsible @@ -257,7 +255,7 @@ impl PersistenceHandle { /// /// This returns the latest hash that has been saved, allowing removal of that block and any /// previous blocks from in-memory data structures. - pub async fn save_blocks(&mut self, blocks: Vec) -> B256 { + pub async fn save_blocks(&self, blocks: Vec) -> B256 { if blocks.is_empty() { return B256::default(); } @@ -267,9 +265,7 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::SaveBlocks((blocks, tx))) .expect("should be able to send"); - let hash = rx.await.expect("todo: err handling"); - self.last_persisted_block_number = last_block_number; - hash + rx.await.expect("todo: err handling") } /// Tells the persistence service to remove blocks above a certain block number. The removed @@ -279,9 +275,7 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::RemoveBlocksAbove((block_num, tx))) .expect("should be able to send"); - let removed_blocks = rx.await.expect("todo: err handling"); - self.last_persisted_block_number = block_num; - removed_blocks + rx.await.expect("todo: err handling") } /// Tells the persistence service to remove block data before the given hash, according to the @@ -291,14 +285,7 @@ impl PersistenceHandle { self.sender .send(PersistenceAction::PruneBefore((block_num, tx))) .expect("should be able to send"); - let hash = rx.await.expect("todo: err handling"); - self.last_persisted_block_number = 0; - hash - } - - /// Last persisted block number getter. - pub const fn last_persisted_block_number(&self) -> u64 { - self.last_persisted_block_number + rx.await.expect("todo: err handling") } } @@ -315,7 +302,7 @@ mod tests { use reth_prune::Pruner; use reth_trie::{updates::TrieUpdates, HashedPostState}; use revm::db::BundleState; - use std::sync::Arc; + use std::sync::{mpsc::channel, Arc}; fn default_persistence_handle() -> PersistenceHandle { let db = create_test_rw_db(); @@ -330,12 +317,15 @@ mod tests { let pruner = Pruner::new(provider.clone(), vec![], 5, 0, 5, None, finished_exex_height_rx); - Persistence::spawn_new(provider, pruner) + let (static_file_sender, _static_file_receiver) = channel(); + let static_file_handle = StaticFileServiceHandle::new(static_file_sender); + + Persistence::spawn_new(provider, static_file_handle, pruner) } #[tokio::test] async fn test_save_blocks_empty() { - let mut persistence_handle = default_persistence_handle(); + let persistence_handle = default_persistence_handle(); let blocks = vec![]; @@ -346,7 +336,7 @@ mod tests { #[tokio::test] async fn test_save_blocks_single_block() { - let mut persistence_handle = default_persistence_handle(); + let persistence_handle = default_persistence_handle(); let mut block = Block::default(); let sender = Address::random(); diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 08e3d842547e..efba60ead0b4 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -264,6 +264,7 @@ pub struct EngineApiTreeHandlerImpl { persistence: PersistenceHandle, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, + last_persisted_block_number: u64, _marker: PhantomData, } @@ -294,6 +295,7 @@ where persistence, is_pipeline_active: false, state, + last_persisted_block_number: 0, _marker: PhantomData, } } @@ -376,18 +378,23 @@ where /// Returns true if the canonical chain length minus the last persisted /// block is more than the persistence threshold. fn should_persist(&self) -> bool { - self.state.tree_state.max_block_number() - self.persistence.last_persisted_block_number() > + self.state.tree_state.max_block_number() - self.last_persisted_block_number > PERSISTENCE_THRESHOLD } async fn persist_state(&mut self) { let blocks_to_persist = self.get_blocks_to_persist(); - self.persistence.save_blocks(blocks_to_persist).await; + let last_persisted_hash = self.persistence.save_blocks(blocks_to_persist).await; + if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { + self.last_persisted_block_number = block.number; + } else { + error!("could not find persisted block with hash {last_persisted_hash} in memory"); + } } fn get_blocks_to_persist(&self) -> Vec { - let start = self.persistence.last_persisted_block_number() + 1; + let start = self.last_persisted_block_number + 1; let end = start + PERSISTENCE_THRESHOLD; self.state From 0a89ab877433f5c6c5660c1aa827b1d01aeb9538 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 10:39:13 +0200 Subject: [PATCH 08/20] PersistenceHandle not clone --- crates/engine/tree/src/persistence.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index d558dab4835f..b4b09fd2fb9c 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -229,7 +229,7 @@ pub enum PersistenceAction { } /// A handle to the persistence service -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct PersistenceHandle { /// The channel used to communicate with the persistence service sender: Sender, From c081a047fafe2d289fa53c96d3020f1dd0ef5904 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 10:58:45 +0200 Subject: [PATCH 09/20] add logic to remove persisted blocks from memory --- crates/engine/tree/src/tree/mod.rs | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index efba60ead0b4..224d4d96ce19 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -148,7 +148,7 @@ impl TreeState { pub struct EngineApiTreeState { /// Tracks the state of the blockchain tree. tree_state: TreeState, - /// Tracks the received forkchoice state updates received by the CL. + /// Tracks the forkchoice state updates received by the CL. forkchoice_state_tracker: ForkchoiceStateTracker, /// Buffer of detached blocks. buffer: BlockBuffer, @@ -387,7 +387,10 @@ where let last_persisted_hash = self.persistence.save_blocks(blocks_to_persist).await; if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { - self.last_persisted_block_number = block.number; + let last_persisted_block_number = block.number; + self.last_persisted_block_number = last_persisted_block_number; + + self.remove_persisted_blocks_from_memory(last_persisted_block_number); } else { error!("could not find persisted block with hash {last_persisted_hash} in memory"); } @@ -405,6 +408,25 @@ where .collect() } + fn remove_persisted_blocks_from_memory(&mut self, last_persisted_block_number: BlockNumber) { + let keys_to_remove: Vec = self + .state + .tree_state + .blocks_by_number + .range(..=last_persisted_block_number) + .map(|(&k, _)| k) + .collect(); + + for key in keys_to_remove { + if let Some(blocks) = self.state.tree_state.blocks_by_number.remove(&key) { + // Remove corresponding blocks from blocks_by_hash + for block in blocks { + self.state.tree_state.blocks_by_hash.remove(&block.block().hash()); + } + } + } + } + /// Return block from database or in-memory state by hash. fn block_by_hash(&self, hash: B256) -> ProviderResult> { // check database first From 875de2f2d52e8f154a7f362ff15b3054cc7e5656 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 12:07:00 +0200 Subject: [PATCH 10/20] receive save_block output on a channel with try_recv --- crates/engine/tree/src/persistence.rs | 28 ++++++----- crates/engine/tree/src/tree/mod.rs | 69 ++++++++++++++++++++------- 2 files changed, 67 insertions(+), 30 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index b4b09fd2fb9c..a60e91f8519d 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -254,18 +254,16 @@ impl PersistenceHandle { /// assumed to be ordered by block number. /// /// This returns the latest hash that has been saved, allowing removal of that block and any - /// previous blocks from in-memory data structures. - pub async fn save_blocks(&self, blocks: Vec) -> B256 { + /// previous blocks from in-memory data structures. This value is returned in the receiver end + /// of the sender argument. + pub fn save_blocks(&self, blocks: Vec, tx: oneshot::Sender) { if blocks.is_empty() { - return B256::default(); + let _ = tx.send(B256::default()); + return; } - let last_block_number = - blocks.last().expect("at this point blocks is not empty").block().number; - let (tx, rx) = oneshot::channel(); self.sender .send(PersistenceAction::SaveBlocks((blocks, tx))) .expect("should be able to send"); - rx.await.expect("todo: err handling") } /// Tells the persistence service to remove blocks above a certain block number. The removed @@ -323,19 +321,21 @@ mod tests { Persistence::spawn_new(provider, static_file_handle, pruner) } - #[tokio::test] - async fn test_save_blocks_empty() { + #[test] + fn test_save_blocks_empty() { let persistence_handle = default_persistence_handle(); let blocks = vec![]; + let (tx, rx) = oneshot::channel(); - let hash = persistence_handle.save_blocks(blocks).await; + persistence_handle.save_blocks(blocks, tx); + let hash = rx.blocking_recv().unwrap(); assert_eq!(hash, B256::default()); } - #[tokio::test] - async fn test_save_blocks_single_block() { + #[test] + fn test_save_blocks_single_block() { let persistence_handle = default_persistence_handle(); let mut block = Block::default(); @@ -361,9 +361,11 @@ mod tests { Arc::new(TrieUpdates::default()), ); let blocks = vec![executed]; + let (tx, rx) = oneshot::channel(); - let actual_hash = persistence_handle.save_blocks(blocks).await; + persistence_handle.save_blocks(blocks, tx); + let actual_hash = rx.blocking_recv().unwrap(); assert_eq!(block_hash, actual_hash); } } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 224d4d96ce19..cf232cb5aaf3 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -4,6 +4,7 @@ use crate::{ engine::{DownloadRequest, EngineApiEvent, FromEngine}, persistence::PersistenceHandle, }; +use parking_lot::Mutex; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, }; @@ -38,7 +39,7 @@ use std::{ marker::PhantomData, sync::{mpsc::Receiver, Arc}, }; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::{mpsc::UnboundedSender, oneshot}; use tracing::*; mod memory_overlay; @@ -252,6 +253,17 @@ pub enum TreeAction { MakeCanonical(B256), } +/// The state of the persistence task. +struct PersistenceState { + /// True if there is a persistence operation in progress. + in_progress: bool, + /// Hash of the last block persisted. + last_persisted_hash: Option, + /// Receiver end of channel where the result of the persistence task will be + /// sent when done. + rx: Option>, +} + #[derive(Debug)] pub struct EngineApiTreeHandlerImpl { provider: P, @@ -264,6 +276,7 @@ pub struct EngineApiTreeHandlerImpl { persistence: PersistenceHandle, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, + /// The last persisted block number. last_persisted_block_number: u64, _marker: PhantomData, } @@ -325,7 +338,13 @@ where outgoing } - async fn run(mut self) { + fn run(mut self) { + let persistence_state = Arc::new(Mutex::new(PersistenceState { + in_progress: false, + last_persisted_hash: None, + rx: None, + })); + loop { while let Ok(msg) = self.incoming.recv() { match msg { @@ -370,7 +389,27 @@ where } if self.should_persist() { - self.persist_state().await; + let mut state = persistence_state.lock(); + if !state.in_progress { + let blocks_to_persist = self.get_blocks_to_persist(); + let (tx, rx) = oneshot::channel(); + self.persistence.save_blocks(blocks_to_persist, tx); + state.in_progress = true; + state.rx = Some(rx); + } + } + + // Check if persistence has completed + let mut state = persistence_state.lock(); + if let Some(rx) = state.rx.as_mut() { + if let Ok(last_persisted_hash) = rx.try_recv() { + state.last_persisted_hash = Some(last_persisted_hash); + state.in_progress = false; + state.rx = None; + drop(state); + + self.handle_persistence_completion(last_persisted_hash); + } } } } @@ -382,20 +421,6 @@ where PERSISTENCE_THRESHOLD } - async fn persist_state(&mut self) { - let blocks_to_persist = self.get_blocks_to_persist(); - - let last_persisted_hash = self.persistence.save_blocks(blocks_to_persist).await; - if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { - let last_persisted_block_number = block.number; - self.last_persisted_block_number = last_persisted_block_number; - - self.remove_persisted_blocks_from_memory(last_persisted_block_number); - } else { - error!("could not find persisted block with hash {last_persisted_hash} in memory"); - } - } - fn get_blocks_to_persist(&self) -> Vec { let start = self.last_persisted_block_number + 1; let end = start + PERSISTENCE_THRESHOLD; @@ -408,6 +433,16 @@ where .collect() } + fn handle_persistence_completion(&mut self, last_persisted_hash: B256) { + if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { + let last_persisted_block_number = block.number; + self.last_persisted_block_number = last_persisted_block_number; + self.remove_persisted_blocks_from_memory(last_persisted_block_number); + } else { + error!("could not find persisted block with hash {last_persisted_hash} in memory"); + } + } + fn remove_persisted_blocks_from_memory(&mut self, last_persisted_block_number: BlockNumber) { let keys_to_remove: Vec = self .state From c1fb943ae2e87f07af5b4ba445019976a90f99a5 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 13:09:30 +0200 Subject: [PATCH 11/20] tests back to to async --- crates/engine/tree/src/persistence.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index a60e91f8519d..2a08d697ab18 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -321,8 +321,8 @@ mod tests { Persistence::spawn_new(provider, static_file_handle, pruner) } - #[test] - fn test_save_blocks_empty() { + #[tokio::test] + async fn test_save_blocks_empty() { let persistence_handle = default_persistence_handle(); let blocks = vec![]; @@ -330,12 +330,12 @@ mod tests { persistence_handle.save_blocks(blocks, tx); - let hash = rx.blocking_recv().unwrap(); + let hash = rx.await.unwrap(); assert_eq!(hash, B256::default()); } - #[test] - fn test_save_blocks_single_block() { + #[tokio::test] + async fn test_save_blocks_single_block() { let persistence_handle = default_persistence_handle(); let mut block = Block::default(); @@ -365,7 +365,7 @@ mod tests { persistence_handle.save_blocks(blocks, tx); - let actual_hash = rx.blocking_recv().unwrap(); + let actual_hash = rx.await.unwrap(); assert_eq!(block_hash, actual_hash); } } From c5cca7001144a0c6602ec358c24365a5221a2c10 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 13:09:52 +0200 Subject: [PATCH 12/20] persistence_state field --- crates/engine/tree/src/tree/mod.rs | 94 +++++++++++++----------------- 1 file changed, 42 insertions(+), 52 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index cf232cb5aaf3..f2a3778f6b85 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -4,7 +4,6 @@ use crate::{ engine::{DownloadRequest, EngineApiEvent, FromEngine}, persistence::PersistenceHandle, }; -use parking_lot::Mutex; use reth_beacon_consensus::{ BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated, }; @@ -253,17 +252,6 @@ pub enum TreeAction { MakeCanonical(B256), } -/// The state of the persistence task. -struct PersistenceState { - /// True if there is a persistence operation in progress. - in_progress: bool, - /// Hash of the last block persisted. - last_persisted_hash: Option, - /// Receiver end of channel where the result of the persistence task will be - /// sent when done. - rx: Option>, -} - #[derive(Debug)] pub struct EngineApiTreeHandlerImpl { provider: P, @@ -274,10 +262,9 @@ pub struct EngineApiTreeHandlerImpl { incoming: Receiver>>, outgoing: UnboundedSender, persistence: PersistenceHandle, + persistence_state: PersistenceState, /// (tmp) The flag indicating whether the pipeline is active. is_pipeline_active: bool, - /// The last persisted block number. - last_persisted_block_number: u64, _marker: PhantomData, } @@ -306,9 +293,9 @@ where incoming, outgoing, persistence, + persistence_state: PersistenceState::default(), is_pipeline_active: false, state, - last_persisted_block_number: 0, _marker: PhantomData, } } @@ -339,12 +326,6 @@ where } fn run(mut self) { - let persistence_state = Arc::new(Mutex::new(PersistenceState { - in_progress: false, - last_persisted_hash: None, - rx: None, - })); - loop { while let Ok(msg) = self.incoming.recv() { match msg { @@ -388,27 +369,29 @@ where } } - if self.should_persist() { - let mut state = persistence_state.lock(); - if !state.in_progress { - let blocks_to_persist = self.get_blocks_to_persist(); - let (tx, rx) = oneshot::channel(); - self.persistence.save_blocks(blocks_to_persist, tx); - state.in_progress = true; - state.rx = Some(rx); - } + if self.should_persist(self.persistence_state.last_persisted_block_number) && + !self.persistence_state.in_progress + { + let blocks_to_persist = + self.get_blocks_to_persist(self.persistence_state.last_persisted_block_number); + let (tx, rx) = oneshot::channel(); + self.persistence.save_blocks(blocks_to_persist, tx); + self.persistence_state.in_progress = true; + self.persistence_state.rx = Some(rx); } // Check if persistence has completed - let mut state = persistence_state.lock(); - if let Some(rx) = state.rx.as_mut() { + if let Some(rx) = self.persistence_state.rx.as_mut() { if let Ok(last_persisted_hash) = rx.try_recv() { - state.last_persisted_hash = Some(last_persisted_hash); - state.in_progress = false; - state.rx = None; - drop(state); - - self.handle_persistence_completion(last_persisted_hash); + if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { + let last_persisted_block_number = block.number; + self.persistence_state.last_persisted_hash = Some(last_persisted_hash); + self.persistence_state.last_persisted_block_number = block.number; + self.persistence_state.in_progress = false; + self.persistence_state.rx = None; + } else { + error!("could not find persisted block with hash {last_persisted_hash} in memory"); + } } } } @@ -416,13 +399,16 @@ where /// Returns true if the canonical chain length minus the last persisted /// block is more than the persistence threshold. - fn should_persist(&self) -> bool { - self.state.tree_state.max_block_number() - self.last_persisted_block_number > + fn should_persist(&self, last_persisted_block_number: BlockNumber) -> bool { + self.state.tree_state.max_block_number() - last_persisted_block_number > PERSISTENCE_THRESHOLD } - fn get_blocks_to_persist(&self) -> Vec { - let start = self.last_persisted_block_number + 1; + fn get_blocks_to_persist( + &self, + last_persisted_block_number: BlockNumber, + ) -> Vec { + let start = last_persisted_block_number + 1; let end = start + PERSISTENCE_THRESHOLD; self.state @@ -433,16 +419,6 @@ where .collect() } - fn handle_persistence_completion(&mut self, last_persisted_hash: B256) { - if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { - let last_persisted_block_number = block.number; - self.last_persisted_block_number = last_persisted_block_number; - self.remove_persisted_blocks_from_memory(last_persisted_block_number); - } else { - error!("could not find persisted block with hash {last_persisted_hash} in memory"); - } - } - fn remove_persisted_blocks_from_memory(&mut self, last_persisted_block_number: BlockNumber) { let keys_to_remove: Vec = self .state @@ -872,3 +848,17 @@ where todo!() } } + +/// The state of the persistence task. +#[derive(Default, Debug)] +struct PersistenceState { + /// True if there is a persistence operation in progress. + in_progress: bool, + /// Hash of the last block persisted. + last_persisted_hash: Option, + /// Receiver end of channel where the result of the persistence task will be + /// sent when done. + rx: Option>, + /// The last persisted block number. + last_persisted_block_number: u64, +} From 6ef08408d95787c0ca3f084d5b1684aeb079f741 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 13:40:54 +0200 Subject: [PATCH 13/20] use last_persisted_block_number from persistence_state and call self.remove_persisted_blocks_from_memory --- crates/engine/tree/src/tree/mod.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index f2a3778f6b85..f37a3799ba98 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -369,11 +369,8 @@ where } } - if self.should_persist(self.persistence_state.last_persisted_block_number) && - !self.persistence_state.in_progress - { - let blocks_to_persist = - self.get_blocks_to_persist(self.persistence_state.last_persisted_block_number); + if self.should_persist() && !self.persistence_state.in_progress { + let blocks_to_persist = self.get_blocks_to_persist(); let (tx, rx) = oneshot::channel(); self.persistence.save_blocks(blocks_to_persist, tx); self.persistence_state.in_progress = true; @@ -389,6 +386,7 @@ where self.persistence_state.last_persisted_block_number = block.number; self.persistence_state.in_progress = false; self.persistence_state.rx = None; + self.remove_persisted_blocks_from_memory(); } else { error!("could not find persisted block with hash {last_persisted_hash} in memory"); } @@ -399,16 +397,14 @@ where /// Returns true if the canonical chain length minus the last persisted /// block is more than the persistence threshold. - fn should_persist(&self, last_persisted_block_number: BlockNumber) -> bool { - self.state.tree_state.max_block_number() - last_persisted_block_number > + fn should_persist(&self) -> bool { + self.state.tree_state.max_block_number() - + self.persistence_state.last_persisted_block_number > PERSISTENCE_THRESHOLD } - fn get_blocks_to_persist( - &self, - last_persisted_block_number: BlockNumber, - ) -> Vec { - let start = last_persisted_block_number + 1; + fn get_blocks_to_persist(&self) -> Vec { + let start = self.persistence_state.last_persisted_block_number + 1; let end = start + PERSISTENCE_THRESHOLD; self.state @@ -419,12 +415,12 @@ where .collect() } - fn remove_persisted_blocks_from_memory(&mut self, last_persisted_block_number: BlockNumber) { + fn remove_persisted_blocks_from_memory(&mut self) { let keys_to_remove: Vec = self .state .tree_state .blocks_by_number - .range(..=last_persisted_block_number) + .range(..=self.persistence_state.last_persisted_block_number) .map(|(&k, _)| k) .collect(); From e77dcc9b06407c2840b9ea49690c950db692fa1c Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 15:20:09 +0200 Subject: [PATCH 14/20] removed in_progress bool of PersistenceState --- crates/engine/tree/src/tree/mod.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index f37a3799ba98..22b1a2d635ee 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -369,7 +369,7 @@ where } } - if self.should_persist() && !self.persistence_state.in_progress { + if self.should_persist() && !self.persistence_state.in_progress() { let blocks_to_persist = self.get_blocks_to_persist(); let (tx, rx) = oneshot::channel(); self.persistence.save_blocks(blocks_to_persist, tx); @@ -848,13 +848,18 @@ where /// The state of the persistence task. #[derive(Default, Debug)] struct PersistenceState { - /// True if there is a persistence operation in progress. - in_progress: bool, - /// Hash of the last block persisted. + /// Hash of the last block persisted. A None value means no persistance task + /// has run yet. last_persisted_hash: Option, /// Receiver end of channel where the result of the persistence task will be - /// sent when done. + /// sent when done. A None value means there's no persistance task in progress. rx: Option>, /// The last persisted block number. last_persisted_block_number: u64, } + +impl PersistenceState { + fn in_progress(&self) -> bool { + self.rx.is_some() + } +} From 0abf473f1c787a0de20d19fd882b80bcb26d3424 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 15:21:39 +0200 Subject: [PATCH 15/20] set PERSISTENCE_THRESHOLD to 256 --- crates/engine/tree/src/tree/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 22b1a2d635ee..18ee159c9905 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -45,7 +45,7 @@ mod memory_overlay; pub use memory_overlay::MemoryOverlayStateProvider; /// Maximum number of blocks to be kept in memory without triggering persistence. -const PERSISTENCE_THRESHOLD: u64 = 512; +const PERSISTENCE_THRESHOLD: u64 = 256; /// Represents an executed block stored in-memory. #[derive(Clone, Debug)] From f444df2fd3f37a328b148d1785bb83c47df00c6d Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 15:36:03 +0200 Subject: [PATCH 16/20] added methods to PersistenceState --- crates/engine/tree/src/tree/mod.rs | 44 ++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 18ee159c9905..a0c4d7970bd2 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -373,22 +373,19 @@ where let blocks_to_persist = self.get_blocks_to_persist(); let (tx, rx) = oneshot::channel(); self.persistence.save_blocks(blocks_to_persist, tx); - self.persistence_state.in_progress = true; - self.persistence_state.rx = Some(rx); + self.persistence_state.start(rx); } - // Check if persistence has completed - if let Some(rx) = self.persistence_state.rx.as_mut() { - if let Ok(last_persisted_hash) = rx.try_recv() { - if let Some(block) = self.state.tree_state.block_by_hash(last_persisted_hash) { - let last_persisted_block_number = block.number; - self.persistence_state.last_persisted_hash = Some(last_persisted_hash); - self.persistence_state.last_persisted_block_number = block.number; - self.persistence_state.in_progress = false; - self.persistence_state.rx = None; + if let Some(rx) = self.persistence_state.receiver() { + // Check if persistence has completed + if let Ok(last_persisted_block_hash) = rx.try_recv() { + if let Some(block) = + self.state.tree_state.block_by_hash(last_persisted_block_hash) + { + self.persistence_state.finish(last_persisted_block_hash, block.number); self.remove_persisted_blocks_from_memory(); } else { - error!("could not find persisted block with hash {last_persisted_hash} in memory"); + error!("could not find persisted block with hash {last_persisted_block_hash} in memory"); } } } @@ -850,7 +847,7 @@ where struct PersistenceState { /// Hash of the last block persisted. A None value means no persistance task /// has run yet. - last_persisted_hash: Option, + last_persisted_block_hash: Option, /// Receiver end of channel where the result of the persistence task will be /// sent when done. A None value means there's no persistance task in progress. rx: Option>, @@ -859,7 +856,26 @@ struct PersistenceState { } impl PersistenceState { - fn in_progress(&self) -> bool { + /// Determines if there is a persistance task in progress by checking if the + /// receiver is set. + const fn in_progress(&self) -> bool { self.rx.is_some() } + + /// Sets state for a started persistance task. + fn start(&mut self, rx: oneshot::Receiver) { + self.rx = Some(rx); + } + + /// Sets state for a finished persistance task. + fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) { + self.rx = None; + self.last_persisted_block_number = last_persisted_block_number; + self.last_persisted_block_hash = Some(last_persisted_block_hash); + } + + /// Returns an usable receiver if a persistence task is in progress. + fn receiver(&mut self) -> Option<&mut oneshot::Receiver> { + self.rx.as_mut() + } } From 41d311d066a77874555b0bae1fae3ea37ee87d8a Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 15:40:12 +0200 Subject: [PATCH 17/20] lint --- crates/engine/tree/src/tree/mod.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index a0c4d7970bd2..961a92253603 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -845,29 +845,29 @@ where /// The state of the persistence task. #[derive(Default, Debug)] struct PersistenceState { - /// Hash of the last block persisted. A None value means no persistance task + /// Hash of the last block persisted. A None value means no persistence task /// has run yet. last_persisted_block_hash: Option, /// Receiver end of channel where the result of the persistence task will be - /// sent when done. A None value means there's no persistance task in progress. + /// sent when done. A None value means there's no persistence task in progress. rx: Option>, /// The last persisted block number. last_persisted_block_number: u64, } impl PersistenceState { - /// Determines if there is a persistance task in progress by checking if the + /// Determines if there is a persistence task in progress by checking if the /// receiver is set. const fn in_progress(&self) -> bool { self.rx.is_some() } - /// Sets state for a started persistance task. + /// Sets state for a started persistence task. fn start(&mut self, rx: oneshot::Receiver) { self.rx = Some(rx); } - /// Sets state for a finished persistance task. + /// Sets state for a finished persistence task. fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) { self.rx = None; self.last_persisted_block_number = last_persisted_block_number; From a90c4c3a3b49f231f834982945acb0dab4fb2db8 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 16:43:22 +0200 Subject: [PATCH 18/20] use persistence_state.in_progress before extracting receiver --- crates/engine/tree/src/tree/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 961a92253603..70e908fdcebc 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -376,7 +376,11 @@ where self.persistence_state.start(rx); } - if let Some(rx) = self.persistence_state.receiver() { + if self.persistence_state.in_progress() { + let rx = self + .persistence_state + .receiver() + .expect("if a persistence task is in progress Receiver must be Some"); // Check if persistence has completed if let Ok(last_persisted_block_hash) = rx.try_recv() { if let Some(block) = From 0ac800d538ee5d9aa9abf93f250f7f3faaee4304 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 16:52:08 +0200 Subject: [PATCH 19/20] remove receiver method from PersistenceState --- crates/engine/tree/src/tree/mod.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 70e908fdcebc..2d43edd92bdf 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -379,7 +379,8 @@ where if self.persistence_state.in_progress() { let rx = self .persistence_state - .receiver() + .rx + .as_mut() .expect("if a persistence task is in progress Receiver must be Some"); // Check if persistence has completed if let Ok(last_persisted_block_hash) = rx.try_recv() { @@ -877,9 +878,4 @@ impl PersistenceState { self.last_persisted_block_number = last_persisted_block_number; self.last_persisted_block_hash = Some(last_persisted_block_hash); } - - /// Returns an usable receiver if a persistence task is in progress. - fn receiver(&mut self) -> Option<&mut oneshot::Receiver> { - self.rx.as_mut() - } } From 77a8ffaf866966dcde10e3c4925286c6b47fe4d3 Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Thu, 11 Jul 2024 19:33:44 +0200 Subject: [PATCH 20/20] remove Option in last_persisted_block_hash field of PersistenceState --- crates/engine/tree/src/tree/mod.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 2d43edd92bdf..78e8cdbaeec7 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -850,9 +850,8 @@ where /// The state of the persistence task. #[derive(Default, Debug)] struct PersistenceState { - /// Hash of the last block persisted. A None value means no persistence task - /// has run yet. - last_persisted_block_hash: Option, + /// Hash of the last block persisted. + last_persisted_block_hash: B256, /// Receiver end of channel where the result of the persistence task will be /// sent when done. A None value means there's no persistence task in progress. rx: Option>, @@ -876,6 +875,6 @@ impl PersistenceState { fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) { self.rx = None; self.last_persisted_block_number = last_persisted_block_number; - self.last_persisted_block_hash = Some(last_persisted_block_hash); + self.last_persisted_block_hash = last_persisted_block_hash; } }