Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tree hook for persisting blocks #9365

Merged
merged 20 commits on
Jul 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ reth-tracing = { workspace = true, optional = true }
[dev-dependencies]
# reth
reth-db = { workspace = true, features = ["test-utils"] }
reth-exex-types.workspace = true
reth-network-p2p = { workspace = true, features = ["test-utils"] }
reth-prune.workspace = true
reth-prune-types.workspace = true
reth-stages = { workspace = true, features = ["test-utils"] }
reth-tracing.workspace = true
Expand Down
96 changes: 91 additions & 5 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ pub enum PersistenceAction {
}

/// A handle to the persistence service
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PersistenceHandle {
/// The channel used to communicate with the persistence service
sender: Sender<PersistenceAction>,
Expand All @@ -254,13 +254,16 @@ impl PersistenceHandle {
/// assumed to be ordered by block number.
///
/// This returns the latest hash that has been saved, allowing removal of that block and any
/// previous blocks from in-memory data structures.
pub async fn save_blocks(&self, blocks: Vec<ExecutedBlock>) -> B256 {
let (tx, rx) = oneshot::channel();
/// previous blocks from in-memory data structures. This value is returned in the receiver end
/// of the sender argument.
pub fn save_blocks(&self, blocks: Vec<ExecutedBlock>, tx: oneshot::Sender<B256>) {
if blocks.is_empty() {
let _ = tx.send(B256::default());
return;
}
self.sender
.send(PersistenceAction::SaveBlocks((blocks, tx)))
.expect("should be able to send");
rx.await.expect("todo: err handling")
}

/// Tells the persistence service to remove blocks above a certain block number. The removed
Expand All @@ -283,3 +286,86 @@ impl PersistenceHandle {
rx.await.expect("todo: err handling")
}
}

#[cfg(test)]
mod tests {
    use super::*;
    use reth_chainspec::MAINNET;
    use reth_db::test_utils::{create_test_rw_db, create_test_static_files_dir};
    use reth_exex_types::FinishedExExHeight;
    use reth_primitives::{
        Address, Block, Receipts, Requests, SealedBlockWithSenders, TransactionSigned, B256,
    };
    use reth_provider::{providers::StaticFileProvider, ExecutionOutcome, ProviderFactory};
    use reth_prune::Pruner;
    use reth_trie::{updates::TrieUpdates, HashedPostState};
    use revm::db::BundleState;
    use std::sync::{mpsc::channel, Arc};

    /// Spins up a persistence service backed by a fresh test database and
    /// static-file directory, returning the handle used to drive it.
    fn default_persistence_handle() -> PersistenceHandle {
        let db = create_test_rw_db();
        let (_static_dir, static_dir_path) = create_test_static_files_dir();
        let provider_factory = ProviderFactory::new(
            db,
            MAINNET.clone(),
            StaticFileProvider::read_write(static_dir_path).unwrap(),
        );

        // No ExExs are installed in tests, so the pruner never has to wait on one.
        let (_finished_exex_height_tx, finished_exex_height_rx) =
            tokio::sync::watch::channel(FinishedExExHeight::NoExExs);
        let pruner = Pruner::new(
            provider_factory.clone(),
            vec![],
            5,
            0,
            5,
            None,
            finished_exex_height_rx,
        );

        let (static_file_tx, _static_file_rx) = channel();
        let static_file_handle = StaticFileServiceHandle::new(static_file_tx);

        Persistence::spawn_new(provider_factory, static_file_handle, pruner)
    }

    #[tokio::test]
    async fn test_save_blocks_empty() {
        let persistence_handle = default_persistence_handle();

        // An empty batch short-circuits: the default hash is returned without
        // going through the persistence service at all.
        let (result_tx, result_rx) = oneshot::channel();
        persistence_handle.save_blocks(Vec::new(), result_tx);

        assert_eq!(result_rx.await.unwrap(), B256::default());
    }

    #[tokio::test]
    async fn test_save_blocks_single_block() {
        let persistence_handle = default_persistence_handle();

        // Build a block containing a single default transaction and record the
        // hash we expect the service to report back.
        let signer = Address::random();
        let mut block = Block::default();
        block.body.push(TransactionSigned::default());
        let expected_hash = block.hash_slow();
        let block_number = block.number;

        let sealed = block.seal_slow();
        let sealed_with_senders =
            SealedBlockWithSenders::new(sealed.clone(), vec![signer]).unwrap();

        let executed = ExecutedBlock::new(
            Arc::new(sealed),
            Arc::new(sealed_with_senders.senders),
            Arc::new(ExecutionOutcome::new(
                BundleState::default(),
                Receipts { receipt_vec: vec![] },
                block_number,
                vec![Requests::default()],
            )),
            Arc::new(HashedPostState::default()),
            Arc::new(TrieUpdates::default()),
        );

        let (result_tx, result_rx) = oneshot::channel();
        persistence_handle.save_blocks(vec![executed], result_tx);

        assert_eq!(result_rx.await.unwrap(), expected_hash);
    }
}
127 changes: 125 additions & 2 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::{
backfill::BackfillAction,
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, FromEngine},
persistence::PersistenceHandle,
};
use reth_beacon_consensus::{
BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache, OnForkChoiceUpdated,
Expand Down Expand Up @@ -37,12 +38,15 @@ use std::{
marker::PhantomData,
sync::{mpsc::Receiver, Arc},
};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
fgimenez marked this conversation as resolved.
Show resolved Hide resolved
use tracing::*;

mod memory_overlay;
pub use memory_overlay::MemoryOverlayStateProvider;

/// Maximum number of blocks to be kept in memory without triggering persistence.
const PERSISTENCE_THRESHOLD: u64 = 256;

/// Represents an executed block stored in-memory.
#[derive(Clone, Debug)]
pub struct ExecutedBlock {
Expand All @@ -54,6 +58,16 @@ pub struct ExecutedBlock {
}

impl ExecutedBlock {
/// Creates a new [`ExecutedBlock`] from its pre-computed parts.
///
/// Bundles the sealed block together with the data produced by executing it:
/// the recovered sender addresses, the execution outcome, the hashed
/// post-state and the trie updates. All parts are `Arc`-wrapped so the block
/// can be shared cheaply between the in-memory tree and the persistence task.
pub(crate) const fn new(
block: Arc<SealedBlock>,
senders: Arc<Vec<Address>>,
execution_output: Arc<ExecutionOutcome>,
hashed_state: Arc<HashedPostState>,
trie: Arc<TrieUpdates>,
) -> Self {
Self { block, senders, execution_output, hashed_state, trie }
}

/// Returns a reference to the executed block.
pub(crate) fn block(&self) -> &SealedBlock {
&self.block
Expand Down Expand Up @@ -120,6 +134,11 @@ impl TreeState {
}
}
}

/// Returns the maximum block number stored.
pub(crate) fn max_block_number(&self) -> BlockNumber {
*self.blocks_by_number.last_key_value().unwrap_or((&BlockNumber::default(), &vec![])).0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar approach as in `get_blocks_to_persist`: this assumes `blocks_by_number` is the source of truth for which blocks get written to the db — let me know if that's OK.

}
}

/// Tracks the state of the engine api internals.
Expand All @@ -129,7 +148,7 @@ impl TreeState {
pub struct EngineApiTreeState {
/// Tracks the state of the blockchain tree.
tree_state: TreeState,
/// Tracks the received forkchoice state updates received by the CL.
/// Tracks the forkchoice state updates received by the CL.
forkchoice_state_tracker: ForkchoiceStateTracker,
/// Buffer of detached blocks.
buffer: BlockBuffer,
Expand Down Expand Up @@ -242,6 +261,8 @@ pub struct EngineApiTreeHandlerImpl<P, E, T: EngineTypes> {
state: EngineApiTreeState,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
persistence: PersistenceHandle,
persistence_state: PersistenceState,
/// (tmp) The flag indicating whether the pipeline is active.
is_pipeline_active: bool,
_marker: PhantomData<T>,
Expand All @@ -262,6 +283,7 @@ where
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
persistence: PersistenceHandle,
) -> Self {
Self {
provider,
Expand All @@ -270,6 +292,8 @@ where
payload_validator,
incoming,
outgoing,
persistence,
persistence_state: PersistenceState::default(),
is_pipeline_active: false,
state,
_marker: PhantomData,
Expand All @@ -284,6 +308,7 @@ where
payload_validator: ExecutionPayloadValidator,
incoming: Receiver<FromEngine<BeaconEngineMessage<T>>>,
state: EngineApiTreeState,
persistence: PersistenceHandle,
) -> UnboundedSender<EngineApiEvent> {
let (outgoing, rx) = tokio::sync::mpsc::unbounded_channel();
let task = Self::new(
Expand All @@ -294,6 +319,7 @@ where
incoming,
outgoing.clone(),
state,
persistence,
);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
outgoing
Expand Down Expand Up @@ -342,6 +368,71 @@ where
}
}
}

if self.should_persist() && !self.persistence_state.in_progress() {
let blocks_to_persist = self.get_blocks_to_persist();
let (tx, rx) = oneshot::channel();
self.persistence.save_blocks(blocks_to_persist, tx);
self.persistence_state.start(rx);
}

if self.persistence_state.in_progress() {
let rx = self
.persistence_state
.rx
.as_mut()
.expect("if a persistence task is in progress Receiver must be Some");
// Check if persistence has completed
if let Ok(last_persisted_block_hash) = rx.try_recv() {
if let Some(block) =
self.state.tree_state.block_by_hash(last_persisted_block_hash)
{
self.persistence_state.finish(last_persisted_block_hash, block.number);
self.remove_persisted_blocks_from_memory();
} else {
error!("could not find persisted block with hash {last_persisted_block_hash} in memory");
}
}
}
}
}

/// Returns true if the canonical chain length minus the last persisted
/// block is more than the persistence threshold.
fn should_persist(&self) -> bool {
fgimenez marked this conversation as resolved.
Show resolved Hide resolved
self.state.tree_state.max_block_number() -
self.persistence_state.last_persisted_block_number >
PERSISTENCE_THRESHOLD
}

/// Collects the in-memory blocks that should be handed to the persistence
/// task next.
///
/// Gathers every block stored under the numbers in
/// `(last persisted, last persisted + PERSISTENCE_THRESHOLD]`, in ascending
/// block-number order.
fn get_blocks_to_persist(&self) -> Vec<ExecutedBlock> {
    let first = self.persistence_state.last_persisted_block_number + 1;
    let end_exclusive = first + PERSISTENCE_THRESHOLD;

    let mut to_persist = Vec::new();
    for (_, blocks) in self.state.tree_state.blocks_by_number.range(first..end_exclusive) {
        to_persist.extend(blocks.iter().cloned());
    }
    to_persist
}

/// Drops every in-memory block whose number is at or below the last persisted
/// block number, keeping `blocks_by_number` and `blocks_by_hash` in sync.
fn remove_persisted_blocks_from_memory(&mut self) {
    let cutoff = self.persistence_state.last_persisted_block_number;

    // Collect the stale keys first so the map is not mutated while iterating.
    let stale_numbers: Vec<BlockNumber> = self
        .state
        .tree_state
        .blocks_by_number
        .range(..=cutoff)
        .map(|(&number, _)| number)
        .collect();

    for number in stale_numbers {
        if let Some(blocks) = self.state.tree_state.blocks_by_number.remove(&number) {
            // Also drop the hash-index entry for every block stored under this number.
            for block in blocks {
                self.state.tree_state.blocks_by_hash.remove(&block.block().hash());
            }
        }
    }
}

Expand Down Expand Up @@ -755,3 +846,35 @@ where
todo!()
}
}

/// Tracks the progress of the background task that persists executed blocks.
///
/// `Default` yields the initial state: nothing persisted yet (zero hash and
/// block number) and no task in flight (`rx` is `None`).
#[derive(Default, Debug)]
struct PersistenceState {
    /// Hash of the last block persisted.
    last_persisted_block_hash: B256,
    /// Receiver end of the channel where the result of the persistence task
    /// will be sent when done. A `None` value means there's no persistence
    /// task in progress.
    rx: Option<oneshot::Receiver<B256>>,
    /// The last persisted block number.
    last_persisted_block_number: u64,
}

impl PersistenceState {
    /// Returns `true` while a persistence task is running; a stored receiver
    /// is what marks a task as in flight.
    const fn in_progress(&self) -> bool {
        self.rx.is_some()
    }

    /// Records that a persistence task has started by storing the receiver on
    /// which its result will arrive.
    fn start(&mut self, rx: oneshot::Receiver<B256>) {
        self.rx = Some(rx);
    }

    /// Records that the persistence task has completed, clearing the receiver
    /// and remembering the hash and number of the last block written.
    fn finish(&mut self, hash: B256, number: u64) {
        self.rx = None;
        self.last_persisted_block_hash = hash;
        self.last_persisted_block_number = number;
    }
}
Loading