Skip to content

Commit

Permalink
feat: improve download observability (paradigmxyz#10039)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez authored and martinezjorge committed Aug 7, 2024
1 parent 6bbf50a commit 503e1c0
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 3 deletions.
2 changes: 1 addition & 1 deletion crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub enum HandlerEvent<T> {
BackfillAction(BackfillAction),
/// Other event emitted by the handler
Event(T),
// Fatal error
/// Fatal error
FatalError,
}

Expand Down
47 changes: 45 additions & 2 deletions crates/engine/tree/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_network_p2p::{
use reth_primitives::{SealedBlock, SealedBlockWithSenders, B256};
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet},
collections::{binary_heap::PeekMut, BinaryHeap, HashSet, VecDeque},
sync::Arc,
task::{Context, Poll},
};
Expand Down Expand Up @@ -40,6 +40,8 @@ pub enum DownloadAction {
pub enum DownloadOutcome {
/// Downloaded blocks.
Blocks(Vec<SealedBlockWithSenders>),
/// New download started.
NewDownloadStarted { remaining_blocks: u64, target: B256 },
}

/// Basic [`BlockDownloader`].
Expand All @@ -58,6 +60,8 @@ where
set_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlockWithSenders>>,
/// Engine download metrics.
metrics: BlockDownloaderMetrics,
/// Pending events to be emitted.
pending_events: VecDeque<DownloadOutcome>,
}

impl<Client> BasicBlockDownloader<Client>
Expand All @@ -72,6 +76,7 @@ where
inflight_block_range_requests: Vec::new(),
set_buffered_blocks: BinaryHeap::new(),
metrics: BlockDownloaderMetrics::default(),
pending_events: Default::default(),
}
}

Expand Down Expand Up @@ -111,6 +116,10 @@ where
);

let request = self.full_block_client.get_full_block_range(hash, count);
self.push_pending_event(DownloadOutcome::NewDownloadStarted {
remaining_blocks: request.count(),
target: request.start_hash(),
});
self.inflight_block_range_requests.push(request);
}
}
Expand All @@ -123,6 +132,11 @@ where
if self.is_inflight_request(hash) {
return false
}
self.push_pending_event(DownloadOutcome::NewDownloadStarted {
remaining_blocks: 1,
target: hash,
});

trace!(
target: "consensus::engine::sync",
?hash,
Expand All @@ -147,6 +161,16 @@ where
self.metrics.active_block_downloads.set(self.inflight_full_block_requests.len() as f64);
// TODO: full block range metrics
}

/// Adds a pending event to the FIFO queue.
fn push_pending_event(&mut self, pending_event: DownloadOutcome) {
self.pending_events.push_back(pending_event);
}

/// Removes a pending event from the FIFO queue.
fn pop_pending_event(&mut self) -> Option<DownloadOutcome> {
self.pending_events.pop_front()
}
}

impl<Client> BlockDownloader for BasicBlockDownloader<Client>
Expand All @@ -163,6 +187,10 @@ where

/// Advances the download process.
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome> {
if let Some(pending_event) = self.pop_pending_event() {
return Poll::Ready(pending_event);
}

// advance all full block requests
for idx in (0..self.inflight_full_block_requests.len()).rev() {
let mut request = self.inflight_full_block_requests.swap_remove(idx);
Expand Down Expand Up @@ -332,6 +360,13 @@ mod tests {
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;

assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, .. } => {
assert_eq!(remaining_blocks, TOTAL_BLOCKS as u64);
});

let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;

assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);
Expand Down Expand Up @@ -359,9 +394,17 @@ mod tests {
assert_eq!(block_downloader.inflight_full_block_requests.len(), TOTAL_BLOCKS);

// poll downloader
for i in 0..TOTAL_BLOCKS {
let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;

assert_matches!(next_ready, DownloadOutcome::NewDownloadStarted { remaining_blocks, target } => {
assert_eq!(remaining_blocks, 1);
});
}

let sync_future = poll_fn(|cx| block_downloader.poll(cx));
let next_ready = sync_future.await;

assert_matches!(next_ready, DownloadOutcome::Blocks(blocks) => {
// ensure all blocks were obtained
assert_eq!(blocks.len(), TOTAL_BLOCKS);
Expand Down

0 comments on commit 503e1c0

Please sign in to comment.