From fcb81f164982cd809ff66d6b7ce47843c1d11904 Mon Sep 17 00:00:00 2001 From: Georgios Konstantopoulos Date: Mon, 24 Oct 2022 04:38:17 -0700 Subject: [PATCH] headers(part 3) feat: implement Linear downloader (#119) * feat: add headers downloaders crate * feat: more scaffolding * interfaces: generalize retryable erros * feat: implement linear downloader * fix linear downloader tests & add builder * extend & reverse * feat: linear downloader generics behind arc and reversed return order (#120) * put client & consensus behind arc and return headers in rev * cleanup Co-authored-by: Roman Krasiuk --- Cargo.lock | 60 +++ Cargo.toml | 1 + crates/interfaces/src/consensus.rs | 3 +- .../interfaces/src/p2p/headers/downloader.rs | 7 +- crates/net/headers-downloaders/Cargo.toml | 22 + crates/net/headers-downloaders/src/lib.rs | 11 + crates/net/headers-downloaders/src/linear.rs | 419 ++++++++++++++++++ crates/net/rpc-types/src/eth/engine.rs | 2 +- 8 files changed, 520 insertions(+), 5 deletions(-) create mode 100644 crates/net/headers-downloaders/Cargo.toml create mode 100644 crates/net/headers-downloaders/src/lib.rs create mode 100644 crates/net/headers-downloaders/src/linear.rs diff --git a/Cargo.lock b/Cargo.lock index a2e3136246b8..494a59c41371 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -70,6 +70,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "async-lock" version = "2.5.0" @@ -730,6 +736,19 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "der" version = "0.6.0" @@ -2499,6 +2518,21 @@ dependencies = [ "thiserror", ] +[[package]] +name = "reth-headers-downloaders" +version = "0.1.0" +dependencies = [ + "assert_matches", + "async-trait", + "once_cell", + "rand", + "reth-interfaces", + "reth-primitives", + "reth-rpc-types", + "serial_test", + "tokio", +] + [[package]] name = "reth-interfaces" version = "0.1.0" @@ -3055,6 +3089,32 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "92761393ee4dc3ff8f4af487bd58f4307c9329bbedea02cac0089ad9c411e153" +dependencies = [ + "dashmap", + "futures", + "lazy_static", + "log", + "parking_lot", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b6f5d1c3087fb119617cff2966fe3808a80e5eb59a8c1601d5994d66f4346a5" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha-1" version = "0.9.8" diff --git a/Cargo.toml b/Cargo.toml index 0bb846476446..03d0071279a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "crates/net/rpc", "crates/net/rpc-api", "crates/net/rpc-types", + "crates/net/headers-downloaders", "crates/primitives", "crates/stages", "crates/transaction-pool", diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index c7f6a11e51ae..10aef492635f 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -7,7 +7,8 @@ use tokio::sync::watch::Receiver; /// Consensus is a protocol that chooses canonical chain. /// We are checking validity of block header here. #[async_trait] -pub trait Consensus { +#[auto_impl::auto_impl(&, Arc)] +pub trait Consensus: Send + Sync { /// Get a receiver for the fork choice state fn fork_choice_state(&self) -> Receiver; diff --git a/crates/interfaces/src/p2p/headers/downloader.rs b/crates/interfaces/src/p2p/headers/downloader.rs index 00e1a7049a46..680867b3cd83 100644 --- a/crates/interfaces/src/p2p/headers/downloader.rs +++ b/crates/interfaces/src/p2p/headers/downloader.rs @@ -50,15 +50,16 @@ pub enum DownloadError { } impl DownloadError { - /// Returns bool indicating whether this error is retryable or fatal + /// Returns bool indicating whether this error is retryable or fatal, in the cases + /// where the peer responds with no headers, or times out. pub fn is_retryable(&self) -> bool { - matches!(self, DownloadError::NoHeaderResponse { .. }) + matches!(self, DownloadError::NoHeaderResponse { .. } | DownloadError::Timeout { .. }) } } /// The header downloading strategy #[async_trait] -pub trait Downloader: Sync + Send + Debug { +pub trait Downloader: Sync + Send { /// The Consensus used to verify block validity when /// downloading type Consensus: Consensus; diff --git a/crates/net/headers-downloaders/Cargo.toml b/crates/net/headers-downloaders/Cargo.toml new file mode 100644 index 000000000000..3b8c51f1f778 --- /dev/null +++ b/crates/net/headers-downloaders/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "reth-headers-downloaders" +version = "0.1.0" +edition = "2021" +license = "MIT OR Apache-2.0" +repository = "https://github.com/foundry-rs/reth" +readme = "README.md" +description = "Implementations of various header downloader" + +[dependencies] +async-trait = "0.1.58" +reth-interfaces = { path = "../../interfaces" } +reth-primitives = { path = "../../primitives" } +reth-rpc-types = { path = "../rpc-types" } + +[dev-dependencies] +assert_matches = "1.5.0" +once_cell = "1.15.0" +rand = "0.8.5" +reth-interfaces = { path = "../../interfaces", features = ["test-helpers"] } +tokio = { version = "1.21.2", features = ["full"] } +serial_test = "0.9.0" diff --git a/crates/net/headers-downloaders/src/lib.rs b/crates/net/headers-downloaders/src/lib.rs new file mode 100644 index 000000000000..5e97cff8e4b4 --- /dev/null +++ b/crates/net/headers-downloaders/src/lib.rs @@ -0,0 +1,11 @@ +#![warn(missing_docs, unreachable_pub)] +#![deny(unused_must_use, rust_2018_idioms)] +#![doc(test( + no_crate_inject, + attr(deny(warnings, rust_2018_idioms), allow(dead_code, unused_variables)) +))] + +//! Implements Header Downloader algorithms + +/// A Linear downloader implementation. +pub mod linear; diff --git a/crates/net/headers-downloaders/src/linear.rs b/crates/net/headers-downloaders/src/linear.rs new file mode 100644 index 000000000000..57dfb875cef8 --- /dev/null +++ b/crates/net/headers-downloaders/src/linear.rs @@ -0,0 +1,419 @@ +use std::{borrow::Borrow, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use reth_interfaces::{ + consensus::Consensus, + p2p::headers::{ + client::{HeadersClient, HeadersStream}, + downloader::{DownloadError, Downloader}, + }, +}; +use reth_primitives::{rpc::BlockId, HeaderLocked}; +use reth_rpc_types::engine::ForkchoiceState; + +/// Download headers in batches +#[derive(Debug)] +pub struct LinearDownloader { + /// The consensus client + consensus: Arc, + /// The headers client + client: Arc, + /// The batch size per one request + pub batch_size: u64, + /// A single request timeout + pub request_timeout: Duration, + /// The number of retries for downloading + pub request_retries: usize, +} + +#[async_trait] +impl Downloader for LinearDownloader { + type Consensus = C; + type Client = H; + + fn consensus(&self) -> &Self::Consensus { + self.consensus.borrow() + } + + fn client(&self) -> &Self::Client { + self.client.borrow() + } + + /// The request timeout + fn timeout(&self) -> Duration { + self.request_timeout + } + + /// Download headers in batches with retries. + /// Returns the header collection in sorted descending + /// order from chain tip to local head + async fn download( + &self, + head: &HeaderLocked, + forkchoice: &ForkchoiceState, + ) -> Result, DownloadError> { + let mut stream = self.client().stream_headers().await; + let mut retries = self.request_retries; + + // Header order will be preserved during inserts + let mut out = vec![]; + loop { + let result = self.download_batch(&mut stream, forkchoice, head, out.last()).await; + match result { + Ok(result) => match result { + LinearDownloadResult::Batch(mut headers) => { + out.append(&mut headers); + } + LinearDownloadResult::Finished(mut headers) => { + out.append(&mut headers); + return Ok(out) + } + LinearDownloadResult::Ignore => (), + }, + Err(e) if e.is_retryable() && retries > 1 => { + retries -= 1; + } + Err(e) => return Err(e), + } + } + } +} + +/// The intermediate download result +#[derive(Debug)] +pub enum LinearDownloadResult { + /// Downloaded last batch up to tip + Finished(Vec), + /// Downloaded batch + Batch(Vec), + /// Ignore this batch + Ignore, +} + +impl LinearDownloader { + async fn download_batch( + &self, + stream: &mut HeadersStream, + forkchoice: &ForkchoiceState, + head: &HeaderLocked, + earliest: Option<&HeaderLocked>, + ) -> Result { + // Request headers starting from tip or earliest cached + let start = earliest.map_or(forkchoice.head_block_hash, |h| h.parent_hash); + let mut headers = + self.download_headers(stream, BlockId::Hash(start), self.batch_size).await?; + headers.sort_unstable_by_key(|h| h.number); + + let mut out = Vec::with_capacity(headers.len()); + // Iterate headers in reverse + for parent in headers.into_iter().rev() { + let parent = parent.lock(); + + if head.hash() == parent.hash() { + // We've reached the target + return Ok(LinearDownloadResult::Finished(out)) + } + + match out.last().or(earliest) { + Some(header) => { + match self.validate(header, &parent) { + // ignore mismatched headers + Err(DownloadError::MismatchedHeaders { .. }) => { + return Ok(LinearDownloadResult::Ignore) + } + // propagate any other error if any + Err(e) => return Err(e), + // proceed to insert if validation is successful + _ => (), + }; + } + // The buffer is empty and the first header does not match the tip, discard + // TODO: penalize the peer? + None if parent.hash() != forkchoice.head_block_hash => { + return Ok(LinearDownloadResult::Ignore) + } + _ => (), + }; + + out.push(parent); + } + + Ok(LinearDownloadResult::Batch(out)) + } +} + +/// The builder for [LinearDownloader] with +/// some default settings +#[derive(Debug)] +pub struct LinearDownloadBuilder { + /// The batch size per one request + batch_size: u64, + /// A single request timeout + request_timeout: Duration, + /// The number of retries for downloading + request_retries: usize, +} + +impl Default for LinearDownloadBuilder { + fn default() -> Self { + Self { batch_size: 100, request_timeout: Duration::from_millis(100), request_retries: 5 } + } +} + +impl LinearDownloadBuilder { + /// Initialize a new builder + pub fn new() -> Self { + Self::default() + } + + /// Set the request batch size + pub fn batch_size(mut self, size: u64) -> Self { + self.batch_size = size; + self + } + + /// Set the request timeout + pub fn timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// Set the number of retries per request + pub fn retries(mut self, retries: usize) -> Self { + self.request_retries = retries; + self + } + + /// Build [LinearDownloader] with provided consensus + /// and header client implementations + pub fn build( + self, + consensus: Arc, + client: Arc, + ) -> LinearDownloader { + LinearDownloader { + consensus, + client, + batch_size: self.batch_size, + request_timeout: self.request_timeout, + request_retries: self.request_retries, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use reth_interfaces::{ + p2p::headers::client::HeadersRequest, + test_helpers::{TestConsensus, TestHeadersClient}, + }; + use reth_primitives::{rpc::BlockId, HeaderLocked, H256}; + + use assert_matches::assert_matches; + use once_cell::sync::Lazy; + use serial_test::serial; + use tokio::sync::oneshot::{self, error::TryRecvError}; + + static CONSENSUS: Lazy> = Lazy::new(|| Arc::new(TestConsensus::default())); + static CONSENSUS_FAIL: Lazy> = Lazy::new(|| { + let mut consensus = TestConsensus::default(); + consensus.set_fail_validation(true); + Arc::new(consensus) + }); + + static CLIENT: Lazy> = + Lazy::new(|| Arc::new(TestHeadersClient::default())); + + #[tokio::test] + #[serial] + async fn download_timeout() { + let retries = 5; + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new() + .retries(retries) + .build(CONSENSUS.clone(), CLIENT.clone()); + let result = + downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut requests = vec![]; + CLIENT + .on_header_request(retries, |_id, req| { + requests.push(req); + }) + .await; + assert_eq!(requests.len(), retries); + assert_matches!(rx.await, Ok(Err(DownloadError::NoHeaderResponse { .. }))); + } + + #[tokio::test] + #[serial] + async fn download_timeout_on_invalid_messages() { + let retries = 5; + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new() + .retries(retries) + .build(CONSENSUS.clone(), CLIENT.clone()); + let result = + downloader.download(&HeaderLocked::default(), &ForkchoiceState::default()).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut num_of_reqs = 0; + let mut last_req_id: Option = None; + + CLIENT + .on_header_request(retries, |id, _req| { + num_of_reqs += 1; + last_req_id = Some(id); + CLIENT.send_header_response(id.saturating_add(id % 2), vec![]); + }) + .await; + + assert_eq!(num_of_reqs, retries); + assert_matches!( + rx.await, + Ok(Err(DownloadError::NoHeaderResponse { request_id })) if request_id == last_req_id.unwrap() + ); + } + + #[tokio::test] + #[serial] + async fn download_propagates_consensus_validation_error() { + let tip_parent = gen_random_header(1, None); + let tip = gen_random_header(2, Some(tip_parent.hash())); + let tip_hash = tip.hash(); + + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = + LinearDownloadBuilder::new().build(CONSENSUS_FAIL.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&HeaderLocked::default(), &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + let requests = CLIENT.on_header_request(1, |id, req| (id, req)).await; + let request = requests.last(); + assert_matches!( + request, + Some((_, HeadersRequest { start, .. })) + if matches!(start, BlockId::Hash(hash) if *hash == tip_hash) + ); + + let request = request.unwrap(); + CLIENT.send_header_response( + request.0, + vec![tip_parent.clone().unlock(), tip.clone().unlock()], + ); + + assert_matches!( + rx.await, + Ok(Err(DownloadError::HeaderValidation { hash, .. })) if hash == tip_parent.hash() + ); + } + + #[tokio::test] + #[serial] + async fn download_starts_with_chain_tip() { + let head = gen_random_header(1, None); + let tip = gen_random_header(2, Some(head.hash())); + + let tip_hash = tip.hash(); + let chain_head = head.clone(); + let (tx, mut rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&chain_head, &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + CLIENT + .on_header_request(1, |id, _req| { + let mut corrupted_tip = tip.clone().unlock(); + corrupted_tip.nonce = rand::random(); + CLIENT.send_header_response(id, vec![corrupted_tip, head.clone().unlock()]) + }) + .await; + assert_matches!(rx.try_recv(), Err(TryRecvError::Empty)); + + CLIENT + .on_header_request(1, |id, _req| { + CLIENT.send_header_response(id, vec![tip.clone().unlock(), head.clone().unlock()]) + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(ref val)) if val.len() == 1); + assert_eq!(*result.unwrap().unwrap().first().unwrap(), tip); + } + + #[tokio::test] + #[serial] + async fn download_returns_headers_desc() { + let (start, end) = (100, 200); + let head = gen_random_header(start, None); + let mut headers = gen_block_range(start + 1..end, head.hash()); + headers.reverse(); + + let tip_hash = headers.first().unwrap().hash(); + let chain_head = head.clone(); + let (tx, rx) = oneshot::channel(); + tokio::spawn(async move { + let downloader = LinearDownloadBuilder::new().build(CONSENSUS.clone(), CLIENT.clone()); + let forkchoice = ForkchoiceState { head_block_hash: tip_hash, ..Default::default() }; + let result = downloader.download(&chain_head, &forkchoice).await; + tx.send(result).expect("failed to forward download response"); + }); + + let mut idx = 0; + let chunk_size = 10; + // `usize::div_ceil` is unstable. ref: https://github.com/rust-lang/rust/issues/88581 + let count = (headers.len() + chunk_size - 1) / chunk_size; + CLIENT + .on_header_request(count + 1, |id, _req| { + let mut chunk = + headers.iter().skip(chunk_size * idx).take(chunk_size).cloned().peekable(); + idx += 1; + if chunk.peek().is_some() { + let headers: Vec<_> = chunk.map(|h| h.unlock()).collect(); + CLIENT.send_header_response(id, headers); + } else { + CLIENT.send_header_response(id, vec![head.clone().unlock()]) + } + }) + .await; + + let result = rx.await; + assert_matches!(result, Ok(Ok(_))); + let result = result.unwrap().unwrap(); + assert_eq!(result.len(), headers.len()); + assert_eq!(result, headers); + } + + pub(crate) fn gen_block_range(rng: std::ops::Range, head: H256) -> Vec { + let mut headers = Vec::with_capacity(rng.end.saturating_sub(rng.start) as usize); + for idx in rng { + headers.push(gen_random_header( + idx, + Some(headers.last().map(|h: &HeaderLocked| h.hash()).unwrap_or(head)), + )); + } + headers + } + + pub(crate) fn gen_random_header(number: u64, parent: Option) -> HeaderLocked { + let header = reth_primitives::Header { + number, + nonce: rand::random(), + parent_hash: parent.unwrap_or_default(), + ..Default::default() + }; + header.lock() + } +} diff --git a/crates/net/rpc-types/src/eth/engine.rs b/crates/net/rpc-types/src/eth/engine.rs index 2559f40677e6..85850f2326de 100644 --- a/crates/net/rpc-types/src/eth/engine.rs +++ b/crates/net/rpc-types/src/eth/engine.rs @@ -25,7 +25,7 @@ pub struct ExecutionPayload { } /// This structure encapsulates the fork choice state -#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ForkchoiceState { pub head_block_hash: H256,