diff --git a/Cargo.lock b/Cargo.lock index 0c0134787b296..b6007f7d98807 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-lock" version = "2.5.0" @@ -2503,9 +2509,14 @@ dependencies = [ name = "reth-headers-downloaders" version = "0.1.0" dependencies = [ + "assert_matches", "async-trait", + "once_cell", + "rand", "reth-interfaces", + "reth-primitives", "reth-rpc-types", + "tokio", ] [[package]] diff --git a/crates/net/headers-downloaders/Cargo.toml b/crates/net/headers-downloaders/Cargo.toml index 5ac5afa2e3788..b380bce065078 100644 --- a/crates/net/headers-downloaders/Cargo.toml +++ b/crates/net/headers-downloaders/Cargo.toml @@ -10,7 +10,12 @@ description = "Implementations of various header downloader" [dependencies] async-trait = "0.1.58" reth-interfaces = { path = "../../interfaces" } +reth-primitives = { path = "../../primitives" } reth-rpc-types = { path = "../rpc-types" } [dev-dependencies] +assert_matches = "1.5.0" +once_cell = "1.15.0" +rand = "0.8.5" reth-interfaces = { path = "../../interfaces", features = ["test-helpers"] } +tokio = { version = "1.21.2", features = ["full"] } diff --git a/crates/net/headers-downloaders/src/linear.rs b/crates/net/headers-downloaders/src/linear.rs index e69de29bb2d1d..3b704ad195ae7 100644 --- a/crates/net/headers-downloaders/src/linear.rs +++ b/crates/net/headers-downloaders/src/linear.rs @@ -0,0 +1,357 @@ +use std::time::Duration; + +use async_trait::async_trait; +use reth_interfaces::{ + consensus::Consensus, + p2p::headers::{ + client::{HeadersClient, HeadersStream}, + downloader::{DownloadError, Downloader}, + }, +}; +use reth_primitives::{rpc::BlockId, HeaderLocked}; +use reth_rpc_types::engine::ForkchoiceState; + +/// Download headers in batches +#[derive(Debug)] +pub struct LinearDownloader<'a, C, H> { + /// The consensus client + consensus: &'a C, + /// The headers client + client: &'a H, + /// The batch size per one request + pub batch_size: u64, + /// A single request timeout + pub request_timeout: Duration, + /// The number of retries for downloading + pub request_retries: usize, +} + +#[async_trait] +impl<'a, C: Consensus, H: HeadersClient> Downloader for LinearDownloader<'a, C, H> { + type Consensus = C; + type Client = H; + + fn consensus(&self) -> &Self::Consensus { + self.consensus + } + + fn client(&self) -> &Self::Client { + self.client + } + + /// The request timeout + fn timeout(&self) -> Duration { + self.request_timeout + } + + /// Download headers in batches with retries. + /// Returns the header collection in sorted ascending order + async fn download( + &self, + head: &HeaderLocked, + forkchoice: &ForkchoiceState, + ) -> Result, DownloadError> { + let mut stream = self.client().stream_headers().await; + let mut retries = self.request_retries; + + // Header order will be preserved during inserts + let mut out = vec![]; + loop { + let result = self.download_batch(&mut stream, forkchoice, head, out.get(0)).await; + match result { + Ok(result) => match result { + LinearDownloadResult::Batch(mut headers) => { + // TODO: fix + headers.extend_from_slice(&out); + out = headers; + } + LinearDownloadResult::Finished(mut headers) => { + // TODO: fix + headers.extend_from_slice(&out); + out = headers; + return Ok(out) + } + LinearDownloadResult::Ignore => (), + }, + Err(e) if e.is_retryable() && retries > 1 => { + retries -= 1; + } + Err(e) => return Err(e), + } + } + } +} + +/// The intermediate download result +#[derive(Debug)] +pub enum LinearDownloadResult { + /// Downloaded last batch up to tip + Finished(Vec), + /// Downloaded batch + Batch(Vec), + /// Ignore this batch + Ignore, +} + +impl<'a, C: Consensus, H: HeadersClient> LinearDownloader<'a, C, H> { + async fn download_batch( + &'a self, + stream: &'a mut HeadersStream, + forkchoice: &'a ForkchoiceState, + head: &'a HeaderLocked, + earliest: Option<&HeaderLocked>, + ) -> Result { + // Request headers starting from tip or earliest cached + let start = earliest.map_or(forkchoice.head_block_hash, |h| h.parent_hash); + let mut headers = + self.download_headers(stream, BlockId::Hash(start), self.batch_size).await?; + headers.sort_unstable_by_key(|h| h.number); + + let mut out = Vec::with_capacity(headers.len()); + // Iterate headers in reverse + for parent in headers.into_iter().rev() { + let parent = parent.lock(); + + if head.hash() == parent.hash() { + // We've reached the target + return Ok(LinearDownloadResult::Finished(out)) + } + + match out.first().or(earliest) { + Some(header) if self.validate(header, &parent).is_ok() => { + return Ok(LinearDownloadResult::Ignore) + } + // The buffer is empty and the first header does not match the tip, discard + // TODO: penalize the peer? + None if parent.hash() != forkchoice.head_block_hash => { + return Ok(LinearDownloadResult::Ignore) + } + _ => (), + }; + + out.insert(0, parent); + } + + Ok(LinearDownloadResult::Batch(out)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_interfaces::{ + p2p::headers::client::HeadersRequest, + test_helpers::{TestConsensus, TestHeadersClient}, + }; + use reth_primitives::{rpc::BlockId, HeaderLocked, H256}; + use test_runner::LinearTestRunner; + + use assert_matches::assert_matches; + use once_cell::sync::Lazy; + use tokio::sync::oneshot::error::TryRecvError; + + static CONSENSUS: Lazy = Lazy::new(|| TestConsensus::default()); + static CONSENSUS_FAIL: Lazy = Lazy::new(|| { + let mut consensus = TestConsensus::default(); + consensus.set_fail_validation(true); + consensus + }); + + static CLIENT: Lazy = Lazy::new(|| TestHeadersClient::default()); + + #[tokio::test] + async fn download_timeout() { + let runner = LinearTestRunner::new(); + let retries = runner.retries; + let rx = runner.run(&*CONSENSUS, &*CLIENT, HeaderLocked::default(), H256::zero()); + + let mut requests = vec![]; + CLIENT + .on_header_request(retries, |_id, req| { + requests.push(req); + }) + .await; + assert_eq!(requests.len(), retries); + assert_matches!(rx.await, Ok(Err(DownloadError::NoHeaderResponse { .. }))); + } + + #[tokio::test] + async fn download_timeout_on_invalid_messages() { + let runner = LinearTestRunner::new(); + let retries = runner.retries; + let rx = runner.run(&*CONSENSUS, &*CLIENT, HeaderLocked::default(), H256::zero()); + + let mut num_of_reqs = 0; + let mut last_req_id: Option = None; + + CLIENT + .on_header_request(retries, |id, _req| { + num_of_reqs += 1; + last_req_id = Some(id); + CLIENT.send_header_response(id.saturating_add(id % 2), vec![]); + }) + .await; + + assert_eq!(num_of_reqs, retries); + assert_matches!( + rx.await, + Ok(Err(DownloadError::NoHeaderResponse { request_id })) if request_id == last_req_id.unwrap()); + } + + #[tokio::test] + async fn download_propagates_consensus_validation_error() { + let tip_parent = gen_random_header(1, None); + let tip = gen_random_header(2, Some(tip_parent.hash())); + + let rx = LinearTestRunner::new().run( + &*CONSENSUS_FAIL, + &*CLIENT, + HeaderLocked::default(), + tip.hash(), + ); + + let requests = CLIENT.on_header_request(1, |id, req| (id, req)).await; + let request = requests.last(); + assert_matches!( + request, + Some((_, HeadersRequest { start, .. })) + if matches!(start, BlockId::Hash(hash) if *hash == tip.hash()) + ); + + let request = request.unwrap(); + CLIENT.send_header_response( + request.0, + vec![tip_parent.clone().unlock(), tip.clone().unlock()], + ); + + assert_matches!( + rx.await, + Ok(Err(DownloadError::HeaderValidation { hash, .. })) if hash == tip_parent.hash() + ); + } + + #[tokio::test] + async fn download_starts_with_chain_tip() { + let head = gen_random_header(1, None); + let tip = gen_random_header(2, Some(head.hash())); + + let mut rx = LinearTestRunner::new().run(&*CONSENSUS, &*CLIENT, head.clone(), tip.hash()); + + CLIENT + .on_header_request(1, |id, _req| { + let mut corrupted_tip = tip.clone().unlock(); + corrupted_tip.nonce = rand::random(); + CLIENT.send_header_response(id, vec![corrupted_tip, head.clone().unlock()]) + }) + .await; + assert_matches!(rx.try_recv(), Err(TryRecvError::Empty)); + + CLIENT + .on_header_request(1, |id, _req| { + CLIENT.send_header_response(id, vec![tip.clone().unlock(), head.clone().unlock()]) + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(ref val)) if val.len() == 1); + assert_eq!(*result.unwrap().unwrap().first().unwrap(), tip); + } + + #[tokio::test] + async fn download_returns_headers_asc() { + let (start, end) = (100, 200); + let head = gen_random_header(start, None); + let headers = gen_block_range(start + 1..end, head.hash()); + let tip = headers.last().unwrap(); + + let rx = LinearTestRunner::new().run(&*CONSENSUS, &*CLIENT, head.clone(), tip.hash()); + + let mut idx = 0; + let chunk_size = 10; + let chunk_iter = headers.clone().into_iter().rev(); + // `usize::div_ceil` is unstable. ref: https://github.com/rust-lang/rust/issues/88581 + let count = (headers.len() + chunk_size - 1) / chunk_size; + CLIENT + .on_header_request(count + 1, |id, _req| { + let mut chunk = + chunk_iter.clone().skip(chunk_size * idx).take(chunk_size).peekable(); + idx += 1; + if chunk.peek().is_some() { + let headers: Vec<_> = chunk.map(|h| h.unlock()).collect(); + CLIENT.send_header_response(id, headers); + } else { + CLIENT.send_header_response(id, vec![head.clone().unlock()]) + } + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(_))); + let result = result.unwrap().unwrap(); + assert_eq!(result, headers); + } + + mod test_runner { + use super::*; + use reth_interfaces::{consensus::Consensus, p2p::headers::client::HeadersClient}; + use reth_rpc_types::engine::ForkchoiceState; + use tokio::sync::oneshot; + + type DownloadResult = Result, DownloadError>; + + pub(crate) struct LinearTestRunner { + pub(crate) retries: usize, + test_ch: (oneshot::Sender, oneshot::Receiver), + } + + impl LinearTestRunner { + pub(crate) fn new() -> Self { + Self { test_ch: oneshot::channel(), retries: 5 } + } + + pub(crate) fn run( + self, + consensus: &'static C, + client: &'static H, + head: HeaderLocked, + tip: H256, + ) -> oneshot::Receiver { + let (tx, rx) = self.test_ch; + let downloader = LinearDownloader { + consensus, + client, + request_retries: self.retries, + batch_size: 100, + request_timeout: Duration::from_millis(100), + }; + tokio::spawn(async move { + let forkchoice = ForkchoiceState { head_block_hash: tip, ..Default::default() }; + let result = downloader.download(&head, &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + rx + } + } + } + + pub(crate) fn gen_block_range(rng: std::ops::Range, head: H256) -> Vec { + let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); + for idx in rng { + headers.push(gen_random_header( + idx, + Some(headers.last().map(|h: &HeaderLocked| h.hash()).unwrap_or(head)), + )); + } + headers + } + + pub(crate) fn gen_random_header(number: u64, parent: Option) -> HeaderLocked { + let header = reth_primitives::Header { + number, + nonce: rand::random(), + parent_hash: parent.unwrap_or_default(), + ..Default::default() + }; + header.lock() + } +} diff --git a/crates/net/rpc-types/src/eth/engine.rs b/crates/net/rpc-types/src/eth/engine.rs index 2559f40677e68..85850f2326ded 100644 --- a/crates/net/rpc-types/src/eth/engine.rs +++ b/crates/net/rpc-types/src/eth/engine.rs @@ -25,7 +25,7 @@ pub struct ExecutionPayload { } /// This structure encapsulates the fork choice state -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ForkchoiceState { pub head_block_hash: H256,