diff --git a/Cargo.lock b/Cargo.lock index 80c874264c23..494a59c41371 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-lock" version = "2.5.0" @@ -730,6 +736,19 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.6.0" @@ -2499,6 +2518,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "reth-headers-downloaders" +version = "0.1.0" +dependencies = [ + "assert_matches", + "async-trait", + "once_cell", + "rand", + "reth-interfaces", + "reth-primitives", + "reth-rpc-types", + "serial_test", + "tokio", +] + [[package]] name = "reth-interfaces" version = "0.1.0" @@ -2507,15 +2541,18 @@ dependencies = [ "auto_impl", "bytes", "eyre", + "futures", "heapless", "parity-scale-codec", "postcard", + "rand", "reth-primitives", "reth-rpc-types", "serde", "test-fuzz", "thiserror", "tokio", + "tokio-stream", ] [[package]] @@ -3052,6 +3089,32 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha-1" version = "0.9.8" @@ -3455,6 +3518,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 0bb846476446..03d0071279a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", + "crates/net/headers-downloaders", "crates/primitives", "crates/stages", "crates/transaction-pool", diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index 31c9b868836c..5360495fb835 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -21,10 +21,15 @@ serde = { version = "1.0.*", default-features = false } postcard = { version = "1.0.2", features = ["alloc"] } heapless = "0.7.16" parity-scale-codec = { version = "3.2.1", features = ["bytes"] } +futures = "0.3.25" +tokio-stream = "0.1.11" +rand = "0.8.5" [dev-dependencies] test-fuzz = "3.0.4" tokio = { version = "1.21.2", features = ["full"] } +tokio-stream = { version = "0.1.11", features = ["sync"] } [features] bench = [] +test-utils = ["tokio-stream/sync"] diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index d67d074af6e0..10aef492635f 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -7,14 +7,13 @@ use tokio::sync::watch::Receiver; /// Consensus is a protocol that chooses canonical chain. /// We are checking validity of block header here. #[async_trait] -pub trait Consensus { +#[auto_impl::auto_impl(&, Arc)] +pub trait Consensus: Send + Sync { /// Get a receiver for the fork choice state fn fork_choice_state(&self) -> Receiver; /// Validate if header is correct and follows consensus specification - fn validate_header(&self, _header: &Header) -> Result<(), Error> { - Ok(()) - } + fn validate_header(&self, header: &Header, parent: &Header) -> Result<(), Error>; } /// Consensus errors (TODO) diff --git a/crates/interfaces/src/lib.rs b/crates/interfaces/src/lib.rs index 91d9521bc095..d21fbc074fb5 100644 --- a/crates/interfaces/src/lib.rs +++ b/crates/interfaces/src/lib.rs @@ -18,7 +18,14 @@ pub mod db; /// Traits that provide chain access. pub mod provider; +/// P2P traits. +pub mod p2p; + /// Possible errors when interacting with the chain. mod error; pub use error::{Error, Result}; + +#[cfg(any(test, feature = "test-utils"))] +/// Common test helpers for mocking out Consensus, Downloaders and Header Clients. +pub mod test_utils; diff --git a/crates/interfaces/src/p2p/headers/client.rs b/crates/interfaces/src/p2p/headers/client.rs new file mode 100644 index 000000000000..341600603588 --- /dev/null +++ b/crates/interfaces/src/p2p/headers/client.rs @@ -0,0 +1,57 @@ +use crate::p2p::MessageStream; + +use reth_primitives::{rpc::BlockId, Header, H256, H512}; + +use async_trait::async_trait; +use std::{collections::HashSet, fmt::Debug}; + +/// Each peer returns a list of headers and the request id corresponding +/// to these headers. This allows clients to make multiple requests in parallel +/// and multiplex the responses accordingly. +pub type HeadersStream = MessageStream; + +/// The item contained in each [`MessageStream`] when used to fetch [`Header`]s via +/// [`HeadersClient`]. +#[derive(Clone, Debug)] +pub struct HeadersResponse { + /// The request id associated with this response. + pub id: u64, + /// The headers the peer replied with. + pub headers: Vec
, +} + +impl From<(u64, Vec
)> for HeadersResponse { + fn from((id, headers): (u64, Vec
)) -> Self { + HeadersResponse { id, headers } + } +} + +/// The header request struct to be sent to connected peers, which +/// will proceed to ask them to stream the requested headers to us. +#[derive(Clone, Debug)] +pub struct HeadersRequest { + /// The starting block + pub start: BlockId, + /// The response max size + pub limit: u64, + /// Flag indicating whether the blocks should + /// arrive in reverse + pub reverse: bool, +} + +/// The block headers downloader client +#[async_trait] +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait HeadersClient: Send + Sync + Debug { + /// Update the node's Status message. + /// + /// The updated Status message will be used during any new eth/65 handshakes. + async fn update_status(&self, height: u64, hash: H256, td: H256); + + /// Sends the header request to the p2p network. + // TODO: What does this return? + async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet; + + /// Stream the header response messages + async fn stream_headers(&self) -> HeadersStream; +} diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs new file mode 100644 index 000000000000..680867b3cd83 --- /dev/null +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -0,0 +1,131 @@ +use super::client::{HeadersClient, HeadersRequest, HeadersStream}; +use crate::consensus::Consensus; + +use async_trait::async_trait; +use reth_primitives::{ + rpc::{BlockId, BlockNumber}, + Header, HeaderLocked, H256, +}; +use reth_rpc_types::engine::ForkchoiceState; +use std::{fmt::Debug, time::Duration}; +use thiserror::Error; +use tokio_stream::StreamExt; + +/// The downloader error type +#[derive(Error, Debug, Clone)] +pub enum DownloadError { + /// Header validation failed + #[error("Failed to validate header {hash}. Details: {details}.")] + HeaderValidation { + /// Hash of header failing validation + hash: H256, + /// The details of validation failure + details: String, + }, + /// No headers reponse received + #[error("Failed to get headers for request {request_id}.")] + NoHeaderResponse { + /// The last request ID + request_id: u64, + }, + /// Timed out while waiting for request id response. + #[error("Timed out while getting headers for request {request_id}.")] + Timeout { + /// The request id that timed out + request_id: u64, + }, + /// Error when checking that the current [`Header`] has the parent's hash as the parent_hash + /// field, and that they have sequential block numbers. + #[error("Headers did not match, current number: {header_number} / current hash: {header_hash}, parent number: {parent_number} / parent_hash: {parent_hash}")] + MismatchedHeaders { + /// The header number being evaluated + header_number: BlockNumber, + /// The header hash being evaluated + header_hash: H256, + /// The parent number being evaluated + parent_number: BlockNumber, + /// The parent hash being evaluated + parent_hash: H256, + }, +} + +impl DownloadError { + /// Returns bool indicating whether this error is retryable or fatal, in the cases + /// where the peer responds with no headers, or times out. + pub fn is_retryable(&self) -> bool { + matches!(self, DownloadError::NoHeaderResponse { .. } | DownloadError::Timeout { .. }) + } +} + +/// The header downloading strategy +#[async_trait] +pub trait Downloader: Sync + Send { + /// The Consensus used to verify block validity when + /// downloading + type Consensus: Consensus; + + /// The Client used to download the headers + type Client: HeadersClient; + + /// The request timeout duration + fn timeout(&self) -> Duration; + + /// The consensus engine + fn consensus(&self) -> &Self::Consensus; + + /// The headers client + fn client(&self) -> &Self::Client; + + /// Download the headers + async fn download( + &self, + head: &HeaderLocked, + forkchoice: &ForkchoiceState, + ) -> Result, DownloadError>; + + /// Perform a header request and returns the headers. + // TODO: Isn't this effectively blocking per request per downloader? + // Might be fine, given we can spawn multiple downloaders? + // TODO: Rethink this function, I don't really like the `stream: &mut HeadersStream` + // in the signature. Why can we not call `self.client.stream_headers()`? Gives lifetime error. + async fn download_headers( + &self, + stream: &mut HeadersStream, + start: BlockId, + limit: u64, + ) -> Result, DownloadError> { + let request_id = rand::random(); + let request = HeadersRequest { start, limit, reverse: true }; + let _ = self.client().send_header_request(request_id, request).await; + + // Filter stream by request id and non empty headers content + let stream = stream + .filter(|resp| request_id == resp.id && !resp.headers.is_empty()) + .timeout(self.timeout()); + + // Pop the first item. + match Box::pin(stream).try_next().await { + Ok(Some(item)) => Ok(item.headers), + _ => return Err(DownloadError::NoHeaderResponse { request_id }), + } + } + + /// Validate whether the header is valid in relation to it's parent + /// + /// Returns Ok(false) if the + fn validate(&self, header: &HeaderLocked, parent: &HeaderLocked) -> Result<(), DownloadError> { + if !(parent.hash() == header.parent_hash && parent.number + 1 == header.number) { + return Err(DownloadError::MismatchedHeaders { + header_number: header.number.into(), + parent_number: parent.number.into(), + header_hash: header.hash(), + parent_hash: parent.hash(), + }) + } + + self.consensus().validate_header(header, parent).map_err(|e| { + DownloadError::HeaderValidation { hash: parent.hash(), details: e.to_string() } + })?; + Ok(()) + } +} diff --git a/crates/interfaces/src/p2p/headers/mod.rs b/crates/interfaces/src/p2p/headers/mod.rs new file mode 100644 index 000000000000..915b28ff08b5 --- /dev/null +++ b/crates/interfaces/src/p2p/headers/mod.rs @@ -0,0 +1,11 @@ +/// Trait definition for [`HeadersClient`] +/// +/// [`HeadersClient`]: client::HeadersClient +pub mod client; + +/// A downloader that receives and verifies block headers, is generic +/// over the Consensus and the HeadersClient being used. +/// +/// [`Consensus`]: crate::consensus::Consensus +/// [`HeadersClient`]: client::HeadersClient +pub mod downloader; diff --git a/crates/interfaces/src/p2p/mod.rs b/crates/interfaces/src/p2p/mod.rs new file mode 100644 index 000000000000..a62037e8bd05 --- /dev/null +++ b/crates/interfaces/src/p2p/mod.rs @@ -0,0 +1,13 @@ +/// Traits for implementing P2P Header Clients. Also includes implementations +/// of a Linear and a Parallel downloader generic over the [`Consensus`] and +/// [`HeadersClient`]. +/// +/// [`Consensus`]: crate::consensus::Consensus +/// [`HeadersClient`]: crate::p2p::headers::HeadersClient +pub mod headers; + +use futures::Stream; +use std::pin::Pin; + +/// The stream of responses from the connected peers, generic over the response type. +pub type MessageStream = Pin + Send>>; diff --git a/crates/interfaces/src/test_utils.rs b/crates/interfaces/src/test_utils.rs new file mode 100644 index 000000000000..2703f878b7c1 --- /dev/null +++ b/crates/interfaces/src/test_utils.rs @@ -0,0 +1,165 @@ +use crate::{ + consensus::{self, Consensus}, + p2p::headers::{ + client::{HeadersClient, HeadersRequest, HeadersResponse, HeadersStream}, + downloader::{DownloadError, Downloader}, + }, +}; +use std::{collections::HashSet, sync::Arc, time::Duration}; + +use reth_primitives::{Header, HeaderLocked, H256, H512}; +use reth_rpc_types::engine::ForkchoiceState; + +use tokio::sync::{broadcast, mpsc, watch}; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; + +#[derive(Debug)] +/// A test downloader which just returns the values that have been pushed to it. +pub struct TestDownloader { + result: Result, DownloadError>, +} + +impl TestDownloader { + /// Instantiates the downloader with the mock responses + pub fn new(result: Result, DownloadError>) -> Self { + Self { result } + } +} + +#[async_trait::async_trait] +impl Downloader for TestDownloader { + type Consensus = TestConsensus; + type Client = TestHeadersClient; + + fn timeout(&self) -> Duration { + Duration::from_millis(1000) + } + + fn consensus(&self) -> &Self::Consensus { + unimplemented!() + } + + fn client(&self) -> &Self::Client { + unimplemented!() + } + + async fn download( + &self, + _: &HeaderLocked, + _: &ForkchoiceState, + ) -> Result, DownloadError> { + self.result.clone() + } +} + +#[derive(Debug)] +/// A test client for fetching headers +pub struct TestHeadersClient { + req_tx: mpsc::Sender<(u64, HeadersRequest)>, + req_rx: Arc>>, + res_tx: broadcast::Sender, + res_rx: broadcast::Receiver, +} + +impl Default for TestHeadersClient { + /// Construct a new test header downloader. + fn default() -> Self { + let (req_tx, req_rx) = mpsc::channel(1); + let (res_tx, res_rx) = broadcast::channel(1); + Self { req_tx, req_rx: Arc::new(tokio::sync::Mutex::new(req_rx)), res_tx, res_rx } + } +} + +impl TestHeadersClient { + /// Helper for interacting with the environment on each request, allowing the client + /// to also reply to messages. + pub async fn on_header_request(&self, mut count: usize, mut f: F) -> Vec + where + F: FnMut(u64, HeadersRequest) -> T, + { + let mut rx = self.req_rx.lock().await; + let mut results = vec![]; + while let Some((id, req)) = rx.recv().await { + results.push(f(id, req)); + count -= 1; + if count == 0 { + break + } + } + results + } + + /// Helper for pushing responses to the client + pub fn send_header_response(&self, id: u64, headers: Vec
) { + self.res_tx.send((id, headers).into()).expect("failed to send header response"); + } +} + +#[async_trait::async_trait] +impl HeadersClient for TestHeadersClient { + // noop + async fn update_status(&self, _height: u64, _hash: H256, _td: H256) {} + + async fn send_header_request(&self, id: u64, request: HeadersRequest) -> HashSet { + self.req_tx.send((id, request)).await.expect("failed to send request"); + HashSet::default() + } + + async fn stream_headers(&self) -> HeadersStream { + Box::pin(BroadcastStream::new(self.res_rx.resubscribe()).filter_map(|e| e.ok())) + } +} + +/// Consensus client impl for testing +#[derive(Debug)] +pub struct TestConsensus { + /// Watcher over the forkchoice state + channel: (watch::Sender, watch::Receiver), + /// Flag whether the header validation should purposefully fail + fail_validation: bool, +} + +impl Default for TestConsensus { + fn default() -> Self { + Self { + channel: watch::channel(ForkchoiceState { + head_block_hash: H256::zero(), + finalized_block_hash: H256::zero(), + safe_block_hash: H256::zero(), + }), + fail_validation: false, + } + } +} + +impl TestConsensus { + /// Update the forkchoice state + pub fn update_tip(&mut self, tip: H256) { + let state = ForkchoiceState { + head_block_hash: tip, + finalized_block_hash: H256::zero(), + safe_block_hash: H256::zero(), + }; + self.channel.0.send(state).expect("updating forkchoice state failed"); + } + + /// Update the validation flag + pub fn set_fail_validation(&mut self, val: bool) { + self.fail_validation = val; + } +} + +#[async_trait::async_trait] +impl Consensus for TestConsensus { + fn fork_choice_state(&self) -> watch::Receiver { + self.channel.1.clone() + } + + fn validate_header(&self, _header: &Header, _parent: &Header) -> Result<(), consensus::Error> { + if self.fail_validation { + Err(consensus::Error::ConsensusError) + } else { + Ok(()) + } + } +} diff --git a/crates/net/headers-downloaders/Cargo.toml b/crates/net/headers-downloaders/Cargo.toml new file mode 100644 index 000000000000..b781442e7b07 --- /dev/null +++ b/crates/net/headers-downloaders/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reth-headers-downloaders" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = "Implementations of various header downloader" + +[dependencies] +async-trait = "0.1.58" +reth-interfaces = { path = "../../interfaces" } +reth-primitives = { path = "../../primitives" } +reth-rpc-types = { path = "../rpc-types" } + +[dev-dependencies] +assert_matches = "1.5.0" +once_cell = "1.15.0" +rand = "0.8.5" +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } +tokio = { version = "1.21.2", features = ["full"] } +serial_test = "0.9.0" diff --git a/crates/net/headers-downloaders/src/lib.rs b/crates/net/headers-downloaders/src/lib.rs new file mode 100644 index 000000000000..5e97cff8e4b4 --- /dev/null +++ b/crates/net/headers-downloaders/src/lib.rs @@ -0,0 +1,11 @@ +#![warn(missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Implements Header Downloader algorithms + +/// A Linear downloader implementation. +pub mod linear; diff --git a/crates/net/headers-downloaders/src/linear.rs b/crates/net/headers-downloaders/src/linear.rs new file mode 100644 index 000000000000..589ad0d35682 --- /dev/null +++ b/crates/net/headers-downloaders/src/linear.rs @@ -0,0 +1,419 @@ +use std::{borrow::Borrow, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use reth_interfaces::{ + consensus::Consensus, + p2p::headers::{ + client::{HeadersClient, HeadersStream}, + downloader::{DownloadError, Downloader}, + }, +}; +use reth_primitives::{rpc::BlockId, HeaderLocked}; +use reth_rpc_types::engine::ForkchoiceState; + +/// Download headers in batches +#[derive(Debug)] +pub struct LinearDownloader { + /// The consensus client + consensus: Arc, + /// The headers client + client: Arc, + /// The batch size per one request + pub batch_size: u64, + /// A single request timeout + pub request_timeout: Duration, + /// The number of retries for downloading + pub request_retries: usize, +} + +#[async_trait] +impl Downloader for LinearDownloader { + type Consensus = C; + type Client = H; + + fn consensus(&self) -> &Self::Consensus { + self.consensus.borrow() + } + + fn client(&self) -> &Self::Client { + self.client.borrow() + } + + /// The request timeout + fn timeout(&self) -> Duration { + self.request_timeout + } + + /// Download headers in batches with retries. + /// Returns the header collection in sorted descending + /// order from chain tip to local head + async fn download( + &self, + head: &HeaderLocked, + forkchoice: &ForkchoiceState, + ) -> Result, DownloadError> { + let mut stream = self.client().stream_headers().await; + let mut retries = self.request_retries; + + // Header order will be preserved during inserts + let mut out = vec![]; + loop { + let result = self.download_batch(&mut stream, forkchoice, head, out.last()).await; + match result { + Ok(result) => match result { + LinearDownloadResult::Batch(mut headers) => { + out.append(&mut headers); + } + LinearDownloadResult::Finished(mut headers) => { + out.append(&mut headers); + return Ok(out) + } + LinearDownloadResult::Ignore => (), + }, + Err(e) if e.is_retryable() && retries > 1 => { + retries -= 1; + } + Err(e) => return Err(e), + } + } + } +} + +/// The intermediate download result +#[derive(Debug)] +pub enum LinearDownloadResult { + /// Downloaded last batch up to tip + Finished(Vec), + /// Downloaded batch + Batch(Vec), + /// Ignore this batch + Ignore, +} + +impl LinearDownloader { + async fn download_batch( + &self, + stream: &mut HeadersStream, + forkchoice: &ForkchoiceState, + head: &HeaderLocked, + earliest: Option<&HeaderLocked>, + ) -> Result { + // Request headers starting from tip or earliest cached + let start = earliest.map_or(forkchoice.head_block_hash, |h| h.parent_hash); + let mut headers = + self.download_headers(stream, BlockId::Hash(start), self.batch_size).await?; + headers.sort_unstable_by_key(|h| h.number); + + let mut out = Vec::with_capacity(headers.len()); + // Iterate headers in reverse + for parent in headers.into_iter().rev() { + let parent = parent.lock(); + + if head.hash() == parent.hash() { + // We've reached the target + return Ok(LinearDownloadResult::Finished(out)) + } + + match out.last().or(earliest) { + Some(header) => { + match self.validate(header, &parent) { + // ignore mismatched headers + Err(DownloadError::MismatchedHeaders { .. }) => { + return Ok(LinearDownloadResult::Ignore) + } + // propagate any other error if any + Err(e) => return Err(e), + // proceed to insert if validation is successful + _ => (), + }; + } + // The buffer is empty and the first header does not match the tip, discard + // TODO: penalize the peer? + None if parent.hash() != forkchoice.head_block_hash => { + return Ok(LinearDownloadResult::Ignore) + } + _ => (), + }; + + out.push(parent); + } + + Ok(LinearDownloadResult::Batch(out)) + } +} + +/// The builder for [LinearDownloader] with +/// some default settings +#[derive(Debug)] +pub struct LinearDownloadBuilder { + /// The batch size per one request + batch_size: u64, + /// A single request timeout + request_timeout: Duration, + /// The number of retries for downloading + request_retries: usize, +} + +impl Default for LinearDownloadBuilder { + fn default() -> Self { + Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 } + } +} + +impl LinearDownloadBuilder { + /// Initialize a new builder + pub fn new() -> Self { + Self::default() + } + + /// Set the request batch size + pub fn batch_size(mut self, size: u64) -> Self { + self.batch_size = size; + self + } + + /// Set the request timeout + pub fn timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// Set the number of retries per request + pub fn retries(mut self, retries: usize) -> Self { + self.request_retries = retries; + self + } + + /// Build [LinearDownloader] with provided consensus + /// and header client implementations + pub fn build( + self, + consensus: Arc, + client: Arc, + ) -> LinearDownloader { + LinearDownloader { + consensus, + client, + batch_size: self.batch_size, + request_timeout: self.request_timeout, + request_retries: self.request_retries, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_interfaces::{ + p2p::headers::client::HeadersRequest, + test_utils::{TestConsensus, TestHeadersClient}, + }; + use reth_primitives::{rpc::BlockId, HeaderLocked, H256}; + + use assert_matches::assert_matches; + use once_cell::sync::Lazy; + use serial_test::serial; + use tokio::sync::oneshot::{self, error::TryRecvError}; + + static CONSENSUS: Lazy> = Lazy::new(|| Arc::new(TestConsensus::default())); + static CONSENSUS_FAIL: Lazy> = Lazy::new(|| { + let mut consensus = TestConsensus::default(); + consensus.set_fail_validation(true); + Arc::new(consensus) + }); + + static CLIENT: Lazy> = + Lazy::new(|| Arc::new(TestHeadersClient::default())); + + #[tokio::test] + #[serial] + async fn download_timeout() { + let retries = 5; + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new() + .retries(retries) + .build(CONSENSUS.clone(), CLIENT.clone()); + let result = + downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut requests = vec![]; + CLIENT + .on_header_request(retries, |_id, req| { + requests.push(req); + }) + .await; + assert_eq!(requests.len(), retries); + assert_matches!(rx.await, Ok(Err(DownloadError::NoHeaderResponse { .. }))); + } + + #[tokio::test] + #[serial] + async fn download_timeout_on_invalid_messages() { + let retries = 5; + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new() + .retries(retries) + .build(CONSENSUS.clone(), CLIENT.clone()); + let result = + downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut num_of_reqs = 0; + let mut last_req_id: Option = None; + + CLIENT + .on_header_request(retries, |id, _req| { + num_of_reqs += 1; + last_req_id = Some(id); + CLIENT.send_header_response(id.saturating_add(id % 2), vec![]); + }) + .await; + + assert_eq!(num_of_reqs, retries); + assert_matches!( + rx.await, + Ok(Err(DownloadError::NoHeaderResponse { request_id })) if request_id == last_req_id.unwrap() + ); + } + + #[tokio::test] + #[serial] + async fn download_propagates_consensus_validation_error() { + let tip_parent = gen_random_header(1, None); + let tip = gen_random_header(2, Some(tip_parent.hash())); + let tip_hash = tip.hash(); + + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = + LinearDownloadBuilder::new().build(CONSENSUS_FAIL.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&HeaderLocked::default(), &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + let requests = CLIENT.on_header_request(1, |id, req| (id, req)).await; + let request = requests.last(); + assert_matches!( + request, + Some((_, HeadersRequest { start, .. })) + if matches!(start, BlockId::Hash(hash) if *hash == tip_hash) + ); + + let request = request.unwrap(); + CLIENT.send_header_response( + request.0, + vec![tip_parent.clone().unlock(), tip.clone().unlock()], + ); + + assert_matches!( + rx.await, + Ok(Err(DownloadError::HeaderValidation { hash, .. })) if hash == tip_parent.hash() + ); + } + + #[tokio::test] + #[serial] + async fn download_starts_with_chain_tip() { + let head = gen_random_header(1, None); + let tip = gen_random_header(2, Some(head.hash())); + + let tip_hash = tip.hash(); + let chain_head = head.clone(); + let (tx, mut rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&chain_head, &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + CLIENT + .on_header_request(1, |id, _req| { + let mut corrupted_tip = tip.clone().unlock(); + corrupted_tip.nonce = rand::random(); + CLIENT.send_header_response(id, vec![corrupted_tip, head.clone().unlock()]) + }) + .await; + assert_matches!(rx.try_recv(), Err(TryRecvError::Empty)); + + CLIENT + .on_header_request(1, |id, _req| { + CLIENT.send_header_response(id, vec![tip.clone().unlock(), head.clone().unlock()]) + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(ref val)) if val.len() == 1); + assert_eq!(*result.unwrap().unwrap().first().unwrap(), tip); + } + + #[tokio::test] + #[serial] + async fn download_returns_headers_desc() { + let (start, end) = (100, 200); + let head = gen_random_header(start, None); + let mut headers = gen_block_range(start + 1..end, head.hash()); + headers.reverse(); + + let tip_hash = headers.first().unwrap().hash(); + let chain_head = head.clone(); + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&chain_head, &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut idx = 0; + let chunk_size = 10; + // `usize::div_ceil` is unstable. ref: https://github.com/rust-lang/rust/issues/88581 + let count = (headers.len() + chunk_size - 1) / chunk_size; + CLIENT + .on_header_request(count + 1, |id, _req| { + let mut chunk = + headers.iter().skip(chunk_size * idx).take(chunk_size).cloned().peekable(); + idx += 1; + if chunk.peek().is_some() { + let headers: Vec<_> = chunk.map(|h| h.unlock()).collect(); + CLIENT.send_header_response(id, headers); + } else { + CLIENT.send_header_response(id, vec![head.clone().unlock()]) + } + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(_))); + let result = result.unwrap().unwrap(); + assert_eq!(result.len(), headers.len()); + assert_eq!(result, headers); + } + + pub(crate) fn gen_block_range(rng: std::ops::Range, head: H256) -> Vec { + let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); + for idx in rng { + headers.push(gen_random_header( + idx, + Some(headers.last().map(|h: &HeaderLocked| h.hash()).unwrap_or(head)), + )); + } + headers + } + + pub(crate) fn gen_random_header(number: u64, parent: Option) -> HeaderLocked { + let header = reth_primitives::Header { + number, + nonce: rand::random(), + parent_hash: parent.unwrap_or_default(), + ..Default::default() + }; + header.lock() + } +} diff --git a/crates/net/rpc-types/src/eth/engine.rs b/crates/net/rpc-types/src/eth/engine.rs index 2559f40677e6..85850f2326de 100644 --- a/crates/net/rpc-types/src/eth/engine.rs +++ b/crates/net/rpc-types/src/eth/engine.rs @@ -25,7 +25,7 @@ pub struct ExecutionPayload { } /// This structure encapsulates the fork choice state -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ForkchoiceState { pub head_block_hash: H256, diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index cf96cc5d52eb..8ed4a185d78f 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -198,6 +198,13 @@ impl Deref for HeaderLocked { } impl HeaderLocked { + /// Construct a new locked header. + /// Applicable when hash is known from + /// the database provided it's not corrupted. + pub fn new(header: Header, hash: H256) -> Self { + Self { header, hash } + } + /// Extract raw header that can be modified. pub fn unlock(self) -> Header { self.header