Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: execute downloaded blocks in batches #10155

Merged
merged 3 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion crates/engine/tree/src/tree/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ const DEFAULT_PERSISTENCE_THRESHOLD: u64 = 256;
const DEFAULT_BLOCK_BUFFER_LIMIT: u32 = 256;
const DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH: u32 = 256;

const DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE: usize = 4;

/// The configuration of the engine tree.
#[derive(Debug)]
pub struct TreeConfig {
Expand All @@ -14,6 +16,8 @@ pub struct TreeConfig {
block_buffer_limit: u32,
/// Number of invalid headers to keep in cache.
max_invalid_header_cache_length: u32,
/// Maximum number of blocks to execute sequentially in a batch.
max_execute_block_batch_size: usize,
}

impl Default for TreeConfig {
Expand All @@ -22,6 +26,7 @@ impl Default for TreeConfig {
persistence_threshold: DEFAULT_PERSISTENCE_THRESHOLD,
block_buffer_limit: DEFAULT_BLOCK_BUFFER_LIMIT,
max_invalid_header_cache_length: DEFAULT_MAX_INVALID_HEADER_CACHE_LENGTH,
max_execute_block_batch_size: DEFAULT_MAX_EXECUTE_BLOCK_BATCH_SIZE,
}
}
}
Expand All @@ -32,8 +37,14 @@ impl TreeConfig {
persistence_threshold: u64,
block_buffer_limit: u32,
max_invalid_header_cache_length: u32,
max_execute_block_batch_size: usize,
) -> Self {
Self { persistence_threshold, block_buffer_limit, max_invalid_header_cache_length }
Self {
persistence_threshold,
block_buffer_limit,
max_invalid_header_cache_length,
max_execute_block_batch_size,
}
}

/// Return the persistence threshold.
Expand All @@ -51,6 +62,11 @@ impl TreeConfig {
self.max_invalid_header_cache_length
}

/// Return the maximum execute block batch size.
pub const fn max_execute_block_batch_size(&self) -> usize {
self.max_execute_block_batch_size
}

/// Setter for persistence threshold.
pub const fn with_persistence_threshold(mut self, persistence_threshold: u64) -> Self {
self.persistence_threshold = persistence_threshold;
Expand All @@ -71,4 +87,13 @@ impl TreeConfig {
self.max_invalid_header_cache_length = max_invalid_header_cache_length;
self
}

/// Setter for maximum execute block batch size.
pub const fn with_max_execute_block_batch_size(
mut self,
max_execute_block_batch_size: usize,
) -> Self {
self.max_execute_block_batch_size = max_execute_block_batch_size;
self
}
}
59 changes: 55 additions & 4 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,19 +518,34 @@ where
}

/// Invoked when previously requested blocks were downloaded.
fn on_downloaded(&mut self, blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
///
/// If the block count exceeds the configured batch size we're allowed to execute at once, this
/// will execute the first batch and send the remaining blocks back through the channel so that
/// don't block request processing for a long time.
fn on_downloaded(&mut self, mut blocks: Vec<SealedBlockWithSenders>) -> Option<TreeEvent> {
if blocks.is_empty() {
// nothing to execute
return None
}

trace!(target: "engine", block_count = %blocks.len(), "received downloaded blocks");
// TODO(mattsse): on process a certain number of blocks sequentially
for block in blocks {
let batch = self.config.max_execute_block_batch_size().min(blocks.len());
for block in blocks.drain(..batch) {
if let Some(event) = self.on_downloaded_block(block) {
let needs_backfill = event.is_backfill_action();
self.on_tree_event(event);
if needs_backfill {
// can exit early if backfill is needed
break
return None
}
}
}

// if we still have blocks to execute, send them as a followup request
if !blocks.is_empty() {
let _ = self.incoming_tx.send(FromEngine::DownloadedBlocks(blocks));
}

None
}

Expand Down Expand Up @@ -2234,6 +2249,41 @@ mod tests {
}
}

#[test]
fn test_tree_persist_block_batch() {
let tree_config = TreeConfig::default();
let chain_spec = MAINNET.clone();
let mut test_block_builder =
TestBlockBuilder::default().with_chain_spec((*chain_spec).clone());

// we need more than tree_config.persistence_threshold() +1 blocks to
// trigger the persistence task.
let blocks: Vec<_> = test_block_builder
.get_executed_blocks(1..tree_config.persistence_threshold() + 2)
.collect();
let mut test_harness = TestHarness::new(chain_spec).with_blocks(blocks);

let mut blocks = vec![];
for idx in 0..tree_config.max_execute_block_batch_size() * 2 {
blocks.push(test_block_builder.generate_random_block(idx as u64, B256::random()));
}

test_harness.to_tree_tx.send(FromEngine::DownloadedBlocks(blocks)).unwrap();

// process the message
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
test_harness.tree.on_engine_message(msg);

// we now should receive the other batch
let msg = test_harness.tree.try_recv_engine_message().unwrap().unwrap();
match msg {
FromEngine::DownloadedBlocks(blocks) => {
assert_eq!(blocks.len(), tree_config.max_execute_block_batch_size());
}
_ => panic!("unexpected message: {:#?}", msg),
}
}

#[tokio::test]
async fn test_tree_persist_blocks() {
let tree_config = TreeConfig::default();
Expand Down Expand Up @@ -2721,6 +2771,7 @@ mod tests {

let chain_spec = MAINNET.clone();
let mut test_harness = TestHarness::new(chain_spec.clone());
test_harness.tree.config = test_harness.tree.config.with_max_execute_block_batch_size(100);

// create base chain and setup test harness with it
let base_chain: Vec<_> = test_harness.block_builder.get_executed_blocks(0..1).collect();
Expand Down
Loading