From 4a5a1dbea84567ea17f06c571e2b1a61e930739b Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 27 Jan 2023 15:02:44 +0200 Subject: [PATCH] fix(download): header downloader initial state (#1064) --- bin/reth/src/node/mod.rs | 9 +- crates/net/downloaders/src/headers/linear.rs | 202 ++++++++++++------- crates/net/downloaders/src/headers/task.rs | 6 +- crates/stages/src/stages/headers.rs | 9 +- 4 files changed, 134 insertions(+), 92 deletions(-) diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index f8153b8cb671..f2ba6dbe6e44 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -161,14 +161,7 @@ impl Command { headers::linear::LinearDownloadBuilder::default() .request_limit(config.stages.headers.downloader_batch_size) .stream_batch_size(config.stages.headers.commit_threshold as usize) - // NOTE: the head and target will be set from inside the stage before the - // downloader is called - .build( - consensus.clone(), - fetch_client.clone(), - Default::default(), - Default::default(), - ), + .build(consensus.clone(), fetch_client.clone()), ); // Spawn bodies downloader diff --git a/crates/net/downloaders/src/headers/linear.rs b/crates/net/downloaders/src/headers/linear.rs index 7e5dc4b171c6..5cb518c38f3b 100644 --- a/crates/net/downloaders/src/headers/linear.rs +++ b/crates/net/downloaders/src/headers/linear.rs @@ -14,7 +14,7 @@ use reth_interfaces::{ priority::Priority, }, }; -use reth_primitives::{Header, HeadersDirection, PeerId, SealedHeader, H256}; +use reth_primitives::{BlockNumber, Header, HeadersDirection, PeerId, SealedHeader, H256}; use std::{ cmp::{Ordering, Reverse}, collections::{binary_heap::PeekMut, BinaryHeap}, @@ -50,9 +50,9 @@ pub struct LinearDownloader { /// Client used to download headers. client: Arc, /// The local head of the chain. - local_head: SealedHeader, + local_head: Option, /// Block we want to close the gap to. - sync_target: SyncTargetBlock, + sync_target: Option, /// The block number to use for requests. next_request_block_number: u64, /// Keeps track of the block we need to validate next. @@ -99,8 +99,28 @@ where /// Returns the block number the local node is at. #[inline] - fn local_block_number(&self) -> u64 { - self.local_head.number + fn local_block_number(&self) -> Option { + self.local_head.as_ref().map(|h| h.number) + } + + /// Returns the existing local head block number + /// + /// # Panics + /// + /// If the local head has not been set. + #[inline] + fn existing_local_block_number(&self) -> BlockNumber { + self.local_head.as_ref().expect("is initialized").number + } + + /// Returns the existing sync target hash. + /// + /// # Panics + /// + /// If the sync target has never been set. + #[inline] + fn existing_sync_target_hash(&self) -> H256 { + self.sync_target.as_ref().expect("is initialized").hash } /// Max requests to handle at the same time @@ -130,15 +150,19 @@ where /// /// Returns `None` if no more requests are required. fn next_request(&mut self) -> Option { - let local_head = self.local_block_number(); - if self.next_request_block_number > local_head { - let request = - calc_next_request(local_head, self.next_request_block_number, self.request_limit); - // need to shift the tracked request block number based on the number of requested - // headers so follow-up requests will use that as start. - self.next_request_block_number -= request.limit; - - return Some(request) + if let Some(local_head) = self.local_block_number() { + if self.next_request_block_number > local_head { + let request = calc_next_request( + local_head, + self.next_request_block_number, + self.request_limit, + ); + // need to shift the tracked request block number based on the number of requested + // headers so follow-up requests will use that as start. + self.next_request_block_number -= request.limit; + + return Some(request) + } } None @@ -171,6 +195,7 @@ where headers: Vec
, peer_id: PeerId, ) -> Result<(), HeadersResponseError> { + let sync_target_hash = self.existing_sync_target_hash(); let mut validated = Vec::with_capacity(headers.len()); for parent in headers { @@ -184,13 +209,13 @@ where trace!(target: "downloaders::headers", ?error ,"Failed to validate header"); return Err(HeadersResponseError { request, peer_id: Some(peer_id), error }) } - } else if parent.hash() != self.sync_target.hash { + } else if parent.hash() != sync_target_hash { return Err(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::InvalidTip { received: parent.hash(), - expected: self.sync_target.hash, + expected: sync_target_hash, }, }) } @@ -218,7 +243,9 @@ where /// are _not_ higher than the new `target_block_number`. fn on_block_number_update(&mut self, target_block_number: u64, next_block: u64) { // Update the trackers - if let Some(old_target) = self.sync_target.number.replace(target_block_number) { + if let Some(old_target) = + self.sync_target.as_mut().and_then(|t| t.number.replace(target_block_number)) + { if target_block_number > old_target { // the new target is higher than the old target we need to update the // request tracker and reset everything @@ -248,6 +275,7 @@ where &mut self, response: HeadersRequestOutcome, ) -> Result<(), HeadersResponseError> { + let sync_target_hash = self.existing_sync_target_hash(); let HeadersRequestOutcome { request, outcome } = response; match outcome { Ok(res) => { @@ -270,18 +298,18 @@ where let target = headers.remove(0).seal(); - if target.hash() != self.sync_target.hash { + if target.hash() != sync_target_hash { return Err(HeadersResponseError { request, peer_id: Some(peer_id), error: DownloadError::InvalidTip { received: target.hash(), - expected: self.sync_target.hash, + expected: sync_target_hash, }, }) } - trace!(target: "downloaders::headers", head=%self.local_head.number, hash=?target.hash(), number=%target.number, "Received sync target"); + trace!(target: "downloaders::headers", head=?self.local_block_number(), hash=?target.hash(), number=%target.number, "Received sync target"); // This is the next block we need to start issuing requests from let parent_block_number = target.number.saturating_sub(1); @@ -361,7 +389,7 @@ where self.try_validate_buffered() .map(Err::<(), HeadersResponseError>) .transpose()?; - } else if highest.number > self.local_head.number { + } else if highest.number > self.existing_local_block_number() { // can't validate yet self.buffered_responses.push(OrderedHeadersResponse { headers, @@ -430,12 +458,8 @@ where } /// Returns the request for the `sync_target` header. - fn get_sync_target_request(&self) -> HeadersRequest { - HeadersRequest { - start: self.sync_target.hash.into(), - limit: 1, - direction: HeadersDirection::Falling, - } + fn get_sync_target_request(&self, start: H256) -> HeadersRequest { + HeadersRequest { start: start.into(), limit: 1, direction: HeadersDirection::Falling } } /// Starts a request future @@ -478,26 +502,28 @@ where H: HeadersClient + 'static, { fn update_local_head(&mut self, head: SealedHeader) { - self.local_head = head; // ensure we're only yielding headers that are in range and follow the current local head. while self .queued_validated_headers .last() - .map(|last| last.number <= self.local_head.number) + .map(|last| last.number <= head.number) .unwrap_or_default() { // headers are sorted high to low self.queued_validated_headers.pop(); } + // update the local head + self.local_head = Some(head); } /// If the given target is different from the current target, we need to update the sync target fn update_sync_target(&mut self, target: SyncTarget) { + let current_tip = self.sync_target.as_ref().map(|t| t.hash); match target { SyncTarget::Tip(tip) => { - if tip != self.sync_target.hash { - trace!(target: "downloaders::headers", current=?self.sync_target.hash, new=?tip, "Update sync target"); - self.sync_target.hash = tip; + if Some(tip) != current_tip { + trace!(target: "downloaders::headers", current=?current_tip, new=?tip, "Update sync target"); + let new_sync_target = SyncTargetBlock::from_hash(tip); // if the new sync target is the next queued request we don't need to re-start // the target update @@ -507,27 +533,35 @@ where .filter(|h| h.hash() == tip) .map(|h| h.number) { - self.sync_target.number = Some(target_number); + self.sync_target = Some(new_sync_target.with_number(target_number)); return } trace!(target: "downloaders::headers", new=?target, "Request new sync target"); + self.sync_target = Some(new_sync_target); self.sync_target_request = - Some(self.request_fut(self.get_sync_target_request(), Priority::High)); + Some(self.request_fut(self.get_sync_target_request(tip), Priority::High)); } } SyncTarget::Gap(existing) => { let target = existing.parent_hash; - if target != self.sync_target.hash { + if Some(target) != current_tip { // there could be a sync target request in progress self.sync_target_request.take(); // If the target has changed, update the request pointers based on the new // targeted block number let parent_block_number = existing.number.saturating_sub(1); - trace!(target: "downloaders::headers", current=?self.sync_target.hash, new=?target, %parent_block_number, "Updated sync target"); + trace!(target: "downloaders::headers", current=?current_tip, new=?target, %parent_block_number, "Updated sync target"); - self.sync_target.hash = target; + // Update the sync target hash + self.sync_target = match self.sync_target.take() { + Some(mut sync_target) => { + sync_target.hash = target; + Some(sync_target) + } + None => Some(SyncTargetBlock::from_hash(target)), + }; self.on_block_number_update(parent_block_number, parent_block_number); } } @@ -548,6 +582,18 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); + // The downloader boundaries (local head and sync target) have to be set in order + // to start downloading data. + if this.local_head.is_none() || this.sync_target.is_none() { + tracing::trace!( + target: "downloaders::headers", + head=?this.local_block_number(), + sync_target=?this.sync_target, + "The downloader sync boundaries have not been set" + ); + return Poll::Pending + } + // If we have a new tip request we need to complete that first before we send batched // requests while let Some(mut req) = this.sync_target_request.take() { @@ -735,6 +781,19 @@ struct SyncTargetBlock { number: Option, } +impl SyncTargetBlock { + /// Create new instance from hash. + fn from_hash(hash: H256) -> Self { + Self { hash, number: None } + } + + /// Set a number on the instance. + fn with_number(mut self, number: u64) -> Self { + self.number = Some(number); + self + } +} + /// The builder for [LinearDownloader] with /// some default settings #[derive(Debug)] @@ -812,13 +871,7 @@ impl LinearDownloadBuilder { /// Build [LinearDownloader] with provided consensus /// and header client implementations - pub fn build( - self, - consensus: Arc, - client: Arc, - local_head: SealedHeader, - sync_target_block_hash: H256, - ) -> LinearDownloader + pub fn build(self, consensus: Arc, client: Arc) -> LinearDownloader where H: HeadersClient + 'static, { @@ -829,11 +882,11 @@ impl LinearDownloadBuilder { max_concurrent_requests, max_buffered_responses, } = self; - let mut downloader = LinearDownloader { + LinearDownloader { consensus, client, - local_head, - sync_target: SyncTargetBlock { hash: sync_target_block_hash, number: None }, + local_head: None, + sync_target: None, // Note: we set these to `0` first, they'll be updated once the sync target response is // handled and only used afterwards next_request_block_number: 0, @@ -849,12 +902,7 @@ impl LinearDownloadBuilder { buffered_responses: Default::default(), queued_validated_headers: Default::default(), metrics: DownloaderMetrics::new(HEADERS_DOWNLOADER_SCOPE), - }; - - downloader.sync_target_request = - Some(downloader.request_fut(downloader.get_sync_target_request(), Priority::High)); - - downloader + } } } @@ -882,6 +930,7 @@ mod tests { use super::*; use crate::headers::test_utils::child_header; + use assert_matches::assert_matches; use reth_interfaces::test_utils::{TestConsensus, TestHeadersClient}; use reth_primitives::SealedHeader; @@ -892,12 +941,11 @@ mod tests { let genesis = SealedHeader::default(); - let mut downloader = LinearDownloadBuilder::default().build( - Arc::new(TestConsensus::default()), - Arc::clone(&client), - genesis, - H256::random(), - ); + let mut downloader = LinearDownloadBuilder::default() + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); + + downloader.update_local_head(genesis); + downloader.update_sync_target(SyncTarget::Tip(H256::random())); downloader.sync_target_request.take(); @@ -909,7 +957,10 @@ mod tests { let target = SyncTarget::Gap(SealedHeader::new(Default::default(), H256::random())); downloader.update_sync_target(target); assert!(downloader.sync_target_request.is_none()); - assert!(downloader.sync_target.number.is_some()); + assert_matches!( + downloader.sync_target, + Some(target) => target.number.is_some() + ); } /// Tests that request calc works @@ -919,12 +970,10 @@ mod tests { let header = SealedHeader::default(); - let mut downloader = LinearDownloadBuilder::default().build( - Arc::new(TestConsensus::default()), - Arc::clone(&client), - header.clone(), - H256::random(), - ); + let mut downloader = LinearDownloadBuilder::default() + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); + downloader.update_local_head(header.clone()); + downloader.update_sync_target(SyncTarget::Tip(H256::random())); downloader.queued_validated_headers.push(header.clone()); let mut next = header.as_ref().clone(); @@ -961,12 +1010,11 @@ mod tests { let batch_size = 99; let start = 1000; - let mut downloader = LinearDownloadBuilder::default().request_limit(batch_size).build( - Arc::new(TestConsensus::default()), - Arc::clone(&client), - genesis, - H256::random(), - ); + let mut downloader = LinearDownloadBuilder::default() + .request_limit(batch_size) + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); + downloader.update_local_head(genesis); + downloader.update_sync_target(SyncTarget::Tip(H256::random())); downloader.next_request_block_number = start; let mut total = 0; @@ -975,7 +1023,7 @@ mod tests { total += req.limit; } assert_eq!(total, start); - assert_eq!(downloader.next_request_block_number, downloader.local_block_number()); + assert_eq!(Some(downloader.next_request_block_number), downloader.local_block_number()); } #[test] @@ -1013,7 +1061,9 @@ mod tests { let mut downloader = LinearDownloadBuilder::default() .stream_batch_size(3) .request_limit(3) - .build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); + downloader.update_local_head(p3.clone()); + downloader.update_sync_target(SyncTarget::Tip(p0.hash())); client .extend(vec![ @@ -1043,7 +1093,9 @@ mod tests { let mut downloader = LinearDownloadBuilder::default() .stream_batch_size(1) .request_limit(1) - .build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); + downloader.update_local_head(p3.clone()); + downloader.update_sync_target(SyncTarget::Tip(p0.hash())); client .extend(vec![ diff --git a/crates/net/downloaders/src/headers/task.rs b/crates/net/downloaders/src/headers/task.rs index 9ee7623411d4..1d9fd0f05896 100644 --- a/crates/net/downloaders/src/headers/task.rs +++ b/crates/net/downloaders/src/headers/task.rs @@ -48,8 +48,6 @@ impl TaskDownloader { /// let downloader = LinearDownloader::::builder().build( /// consensus, /// client, - /// Default::default(), - /// Default::default(), /// ); /// let downloader = TaskDownloader::spawn(downloader); /// # } @@ -172,9 +170,11 @@ mod tests { let downloader = LinearDownloadBuilder::default() .stream_batch_size(1) .request_limit(1) - .build(Arc::new(TestConsensus::default()), Arc::clone(&client), p3.clone(), p0.hash()); + .build(Arc::new(TestConsensus::default()), Arc::clone(&client)); let mut downloader = TaskDownloader::spawn(downloader); + downloader.update_local_head(p3.clone()); + downloader.update_sync_target(SyncTarget::Tip(p0.hash())); client .extend(vec![ diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 40a0d35bf6d8..e534a43a0951 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -443,12 +443,9 @@ mod tests { client: client.clone(), consensus: consensus.clone(), downloader_factory: Box::new(move || { - LinearDownloadBuilder::default().stream_batch_size(500).build( - consensus.clone(), - client.clone(), - Default::default(), - Default::default(), - ) + LinearDownloadBuilder::default() + .stream_batch_size(500) + .build(consensus.clone(), client.clone()) }), network_handle: TestStatusUpdater::default(), tx: TestTransaction::default(),