diff --git a/Cargo.lock b/Cargo.lock index a9f68173a8209..a1af391c5cac7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1847,6 +1847,7 @@ version = "0.1.0" dependencies = [ "async-trait", "auto_impl", + "futures", "reth-primitives", "reth-rpc-types", "thiserror", @@ -1914,7 +1915,10 @@ name = "reth-stages" version = "0.1.0" dependencies = [ "async-trait", + "futures", + "rand", "reth-db", + "reth-interfaces", "reth-primitives", "tempfile", "thiserror", diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index e6d88f7da8972..42da856618b4f 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -13,3 +13,4 @@ async-trait = "0.1.57" thiserror = "1.0.37" auto_impl = "1.0" tokio = { version = "1.21.2", features = ["sync"] } +futures = "0.3" diff --git a/crates/interfaces/src/consensus.rs b/crates/interfaces/src/consensus.rs index d67d074af6e06..e6e3e5c47ff9a 100644 --- a/crates/interfaces/src/consensus.rs +++ b/crates/interfaces/src/consensus.rs @@ -7,12 +7,12 @@ use tokio::sync::watch::Receiver; /// Consensus is a protocol that chooses canonical chain. /// We are checking validity of block header here. #[async_trait] -pub trait Consensus { +pub trait Consensus: Sync + Send { /// Get a receiver for the fork choice state - fn fork_choice_state(&self) -> Receiver; + fn forkchoice_state(&self) -> Receiver; /// Validate if header is correct and follows consensus specification - fn validate_header(&self, _header: &Header) -> Result<(), Error> { + fn validate_header(&self, _header: &Header, _parent: &Header) -> Result<(), Error> { Ok(()) } } diff --git a/crates/interfaces/src/lib.rs b/crates/interfaces/src/lib.rs index c7624b35de943..508551a40fb1a 100644 --- a/crates/interfaces/src/lib.rs +++ b/crates/interfaces/src/lib.rs @@ -12,3 +12,6 @@ pub mod executor; /// Consensus traits. pub mod consensus; + +/// Stage sync related traits +pub mod stages; diff --git a/crates/interfaces/src/stages.rs b/crates/interfaces/src/stages.rs new file mode 100644 index 0000000000000..9856dcf8ac00e --- /dev/null +++ b/crates/interfaces/src/stages.rs @@ -0,0 +1,32 @@ +use async_trait::async_trait; +use futures::Stream; +use reth_primitives::{rpc::BlockId, Header, H256, H512}; +use std::{collections::HashSet, pin::Pin}; + +/// The stream of messages +pub type MessageStream = Pin + Send>>; + +/// The header request struct +#[derive(Debug)] +pub struct HeaderRequest { + /// The starting block + pub start: BlockId, + /// The response max size + pub limit: u64, + /// Flag indicating whether the blocks should + /// arrive in reverse + pub reverse: bool, +} + +/// The block headers downloader client +#[async_trait] +pub trait HeadersClient: Send + Sync { + /// Update the current node status + async fn update_status(&mut self, height: u64, hash: H256, td: H256); + + /// Send the header request + async fn send_header_request(&self, id: u64, request: HeaderRequest) -> HashSet; + + /// Stream the header response messages + async fn stream_headers(&self) -> MessageStream<(u64, Vec
)>; +} diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 5649e7b8e9971..9f6b3bf604f76 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -98,6 +98,13 @@ impl Deref for HeaderLocked { } impl HeaderLocked { + /// Construct a new locked header. + /// Applicable when hash is known from + /// the database provided it's not corrupted. + pub fn new(header: Header, hash: H256) -> Self { + Self { header, hash } + } + /// Extract raw header that can be modified. pub fn unlock(self) -> Header { self.header diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 23872f5a6b337..bde35d252c937 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -9,12 +9,17 @@ description = "Staged syncing primitives used in reth." [dependencies] reth-primitives = { path = "../primitives" } +reth-interfaces = { path = "../interfaces" } reth-db = { path = "../db" } -async-trait = "0.1.57" thiserror = "1.0.37" tracing = "0.1.36" tracing-futures = "0.2.5" tokio = { version = "1.21.2", features = ["sync"] } +rand = "0.8" # TODO: + +# async/futures +async-trait = "0.1.57" +futures = "0.3" [dev-dependencies] tokio = { version = "*", features = ["rt", "sync", "macros"] } diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 7e4ce6525d761..3e57d12923672 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -12,6 +12,7 @@ mod error; mod id; mod pipeline; mod stage; +mod stages; mod util; pub use error::*; diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs new file mode 100644 index 0000000000000..40e532dba0a04 --- /dev/null +++ b/crates/stages/src/stages/headers.rs @@ -0,0 +1,270 @@ +use async_trait::async_trait; + +use crate::{ExecInput, ExecOutput, Stage, StageError, StageId, UnwindInput, UnwindOutput}; +use futures::StreamExt; +use rand::Rng; +use reth_db::{ + kv::{tables, tx::Tx}, + mdbx::{self, WriteFlags}, +}; +use reth_interfaces::{ + consensus::Consensus, + stages::{HeaderRequest, HeadersClient, MessageStream}, +}; +use reth_primitives::{rpc::BlockId, BlockNumber, Header, HeaderLocked, H256}; +use std::{sync::Arc, time::Duration}; +use thiserror::Error; +use tracing::*; + +const HEADERS: StageId = StageId("HEADERS"); + +// TODO: docs +// TODO: add tracing +pub struct HeaderStage { + pub consensus: Arc, + pub client: Arc, + pub batch_size: u64, + pub request_retries: usize, + pub request_timeout: usize, +} + +#[derive(Error, Debug)] +pub enum DownloadError { + /// Header validation failed + #[error("Failed to validate header {hash} for block {number}. Details: {details}.")] + HeaderValidation { hash: H256, number: BlockNumber, details: String }, + /// No headers reponse received + #[error("Failed to get headers for request {request_id}.")] + NoHeaderResponse { request_id: u64 }, + /// The stage encountered an internal error. + #[error(transparent)] + Internal(Box), +} + +impl DownloadError { + fn is_retryable(&self) -> bool { + matches!(self, DownloadError::NoHeaderResponse { .. }) + } +} + +#[async_trait] +impl<'db, E> Stage<'db, E> for HeaderStage +where + E: mdbx::EnvironmentKind, +{ + fn id(&self) -> StageId { + HEADERS + } + + /// Execute the stage. + async fn execute<'tx>( + &mut self, + tx: &mut Tx<'tx, mdbx::RW, E>, + input: ExecInput, + ) -> Result { + let last_block_num = + input.previous_stage.as_ref().map(|(_, block)| *block).unwrap_or_default(); + // TODO: check if in case of panic the node head needs to be updated + self.update_head(tx, last_block_num).await?; + + let mut stage_progress = last_block_num; + + // download the headers + // TODO: check if some upper block constraint is necessary + let last_hash: H256 = tx.get::(last_block_num)?.unwrap(); // TODO: + let last_header: Header = tx.get::((last_block_num, last_hash))?.unwrap(); // TODO: + let head = HeaderLocked::new(last_header, last_hash); + + let forkchoice_state = self.next_forkchoice_state(&head.hash()).await; + + let headers = match self.download(&head, forkchoice_state).await { + Ok(res) => res, + Err(e) => match e { + DownloadError::NoHeaderResponse { request_id } => { + warn!("no response for request {request_id}"); + return Ok(ExecOutput { stage_progress, reached_tip: false, done: false }) + } + DownloadError::HeaderValidation { hash, number, details } => { + warn!("validation error for header {hash}: {details}"); + return Err(StageError::Validation { block: number }) + } + DownloadError::Internal(e) => return Err(StageError::Internal(e)), + }, + }; + + let mut cursor_header_number = tx.cursor::()?; + let mut cursor_header = tx.cursor::()?; + let mut cursor_canonical = tx.cursor::()?; + let mut cursor_td = tx.cursor::()?; + let mut td = cursor_td.last()?.map(|((_, _), v)| v).unwrap(); // TODO: + + for header in headers { + if header.number == 0 { + continue + } + + let hash = header.hash(); + td += header.difficulty; + + cursor_header_number.put( + hash.to_fixed_bytes().to_vec(), + header.number, + Some(WriteFlags::APPEND), + )?; + cursor_header.put((header.number, hash), header, Some(WriteFlags::APPEND))?; + cursor_canonical.put(header.number, hash, Some(WriteFlags::APPEND))?; + cursor_td.put((header.number, hash), td, Some(WriteFlags::APPEND))?; + + stage_progress = header.number; + } + + Ok(ExecOutput { stage_progress, reached_tip: true, done: true }) + } + + /// Unwind the stage. + async fn unwind<'tx>( + &mut self, + tx: &mut Tx<'tx, mdbx::RW, E>, + input: UnwindInput, + ) -> Result> { + if let Some(bad_block) = input.bad_block { + todo!() + } + + todo!() + } +} + +impl HeaderStage { + async fn update_head<'tx, E: mdbx::EnvironmentKind>( + &self, + tx: &'tx mut Tx<'tx, mdbx::RW, E>, + height: BlockNumber, + ) -> Result<(), StageError> { + let hash = tx.get::(height)?.unwrap(); + let td: Vec = tx.get::((height, hash))?.unwrap(); + self.client.update_status(height, hash, H256::from_slice(&td)); + Ok(()) + } + + async fn next_forkchoice_state(&self, head: &H256) -> (H256, H256) { + let mut state_rcv = self.consensus.forkchoice_state(); + loop { + state_rcv.changed().await; + let forkchoice = state_rcv.borrow(); + if !forkchoice.head_block_hash.is_zero() && forkchoice.head_block_hash != *head { + return (forkchoice.head_block_hash, forkchoice.finalized_block_hash) + } + } + } + + /// Download headers in batches with retries. + /// Returns the header collection in sorted ascending order + async fn download( + &self, + head: &HeaderLocked, + forkchoice_state: (H256, H256), + ) -> Result, DownloadError> { + let mut stream = self.client.stream_headers().await; + // the header order will be preserved during inserts + let mut retries = self.request_retries; + + let mut out = Vec::::new(); + loop { + match self.download_batch(head, &forkchoice_state, &mut stream, &mut out).await { + Ok(done) => { + if done { + return Ok(out) + } + } + Err(e) if e.is_retryable() && retries > 0 => { + retries -= 1; + } + Err(e) => return Err(e), + } + } + } + + /// Request and process the batch of headers + async fn download_batch( + &self, + head: &HeaderLocked, + (state_tip, state_finalized): &(H256, H256), + stream: &mut MessageStream<(u64, Vec
)>, + out: &mut Vec, + ) -> Result { + let request_id = rand::thread_rng().gen(); + let start = BlockId::Hash(out.first().map_or(state_tip.clone(), |h| h.parent_hash)); + let request = HeaderRequest { start, limit: self.batch_size, reverse: true }; + // TODO: timeout + let _ = self.client.send_header_request(request_id, request).await; + + let mut batch = self.receive_headers(stream, request_id).await?; + + out.reserve_exact(batch.len()); + batch.sort_unstable_by_key(|h| h.number); // TODO: revise: par_sort? + + let mut batch_iter = batch.into_iter().rev(); + while let Some(parent) = batch_iter.next() { + let parent = parent.lock(); + + if head.hash() == parent.hash() { + // we are done + return Ok(true) + } + + if let Some(tail_header) = out.first() { + if !(parent.hash() == tail_header.parent_hash && + parent.number + 1 == tail_header.number) + { + // cannot attach to the current buffer + // discard this batch + return Ok(false) + } + + self.consensus.validate_header(&tail_header, &parent).map_err(|e| { + DownloadError::HeaderValidation { + hash: parent.hash(), + details: e.to_string(), + number: parent.number, + } + })?; + } else if parent.hash() != *state_tip { + // the buffer is empty and the first header + // does not match the one we requested + // discard this batch + return Ok(false) + } + + out.insert(0, parent); + } + + Ok(false) + } + + /// Process header message stream and return the request by id. + /// The messages with empty headers are ignored. + async fn receive_headers( + &self, + stream: &mut MessageStream<(u64, Vec
)>, + request_id: u64, + ) -> Result, DownloadError> { + let timeout = tokio::time::sleep(Duration::from_secs(5)); + tokio::pin!(timeout); + let result = loop { + tokio::select! { + msg = stream.next() => { + match msg { + Some((id, headers)) if request_id == id && !headers.is_empty() => break Some(headers), + _ => (), + } + } + _ = &mut timeout => { + break None; + } + } + }; + + result.ok_or(DownloadError::NoHeaderResponse { request_id }) + } +} diff --git a/crates/stages/src/stages/mod.rs b/crates/stages/src/stages/mod.rs new file mode 100644 index 0000000000000..0b97f4357b4aa --- /dev/null +++ b/crates/stages/src/stages/mod.rs @@ -0,0 +1 @@ +mod headers;