Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: feat(sync): headers stage #58

Closed
wants to merge 17 commits into from
Closed

Conversation

rkrasiuk
Copy link
Member

@rkrasiuk rkrasiuk commented Oct 13, 2022

still WIP, currently doesn't compile, because of db types

This is the basic implementation of the headers stage. The current steps are:

  1. Query for the chain tip
  2. Download headers in batches in reverse from the chain tip up to the last stored header
  3. Header validation and consistency checks are performed upon receiving them
  4. Store the headers in the db

Things left to be done:

  • fix db types
  • implement unwind
  • add timeouts on batch downloads
  • move downloader to interfaces crate
  • add docs & logging
  • add tests

@rkrasiuk rkrasiuk added C-enhancement New feature or request A-staged-sync Related to staged sync (pipelines and stages) labels Oct 13, 2022
@rkrasiuk rkrasiuk force-pushed the rkrasiuk/headers-stage branch from 6a03b56 to de1af72 Compare October 13, 2022 10:56
@rkrasiuk rkrasiuk changed the title wip: feat(sync): headers stage scaffolding wip: feat(sync): headers stage Oct 13, 2022
@rkrasiuk rkrasiuk force-pushed the rkrasiuk/headers-stage branch from de1af72 to 71dcd0c Compare October 13, 2022 12:05
@rkrasiuk rkrasiuk force-pushed the rkrasiuk/headers-stage branch from 71dcd0c to 07b9266 Compare October 13, 2022 12:19
Copy link
Member

@gakonst gakonst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest taking a more Stream-centric approach vs Akula's loops, cc @mattsse on my proposed design. @rkrasiuk please investigate if the design I suggest is doable, as it feels super readable & easy to optimize.

Comment on lines 44 to 45
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you try doing trait HeaderClient { type Error: std::error::Error + Send + Sync }, and try using <H as HeaderClient>::Error instead of box dyn? This same thing should be across all stages, would rather avoid having internal box dyn errors bc they're hard to test

Comment on lines 113 to 118
cursor_header_number.put(
hash.to_fixed_bytes().to_vec(),
header.number,
Some(WriteFlags::APPEND),
)?;
cursor_header.put((header.number, hash), header, Some(WriteFlags::APPEND))?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to write the header hash/header keys + values in order, or can we do them in any order?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'd assume they would have to be ordered because if any intermediate error occurs, we need to be able to walk the db somehow to unwind

Comment on lines 176 to 189
let mut out = Vec::<HeaderLocked>::new();
loop {
match self.download_batch(head, &forkchoice_state, &mut stream, &mut out).await {
Ok(done) => {
if done {
return Ok(out)
}
}
Err(e) if e.is_retryable() && retries > 0 => {
retries -= 1;
}
Err(e) => return Err(e),
}
}
Copy link
Member

@gakonst gakonst Oct 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we take a mapped Stream-like API approach here?

async fn execute(&self, input) -> ... {
       // get head
      let (state_tip, _) = self.next_forkchoice_state(&head.hash()).await;
      let headers = self.download(state_tip).await?;
      // handle the error

     // get the tx cursors and write
}

/// Requests new headers starting from the specified block.
async fn request_headers<T: Into<BlockId>>(&self, start: T) -> impl Stream <...> {
    // Send a P2P request to make the stream start receiving values, if we don't send
    // this out, it's possible we won't receive any headers that we care about.
    // This is going to be a no-op I suspect in the test impls
    let request = HeaderRequest { start: start.into(), limit: self.batch_size, reverse: true };
    let request_id = rand::thread_rng().gen();
    self.client.send_header_request(request_id, request).timeout(self.timeout).await;
    
    // This should return an `impl Stream<Item = (u64, Vec<Header>)>`
    // The headers in the stream are both unvalidated and unordered header
    self.client.stream_headers()
}

async fn download(&self, start: BlockId) -> Result<Vec<Header>> {
    // Get the stream
    let stream = self.request_headers(start).await?;

    // This makes the stream retryable
    let stream = RetriableStream::new(stream);
    
    // This makes the stream timeout (or whatever configurable timeout)
    // if no headers are received back for a particular request
    let stream = stream.timeout(5);
    
    // Filter the stream's output for only non-empty headers & responses
    // that match our request_Id
    let filtered_stream = stream.filter(|(id, headers)| request_id == id && !headers.is_empty());
    
    let headers = {
        // maybe can avoid collecting if we can operate on ordered stream somehow?
        // https://docs.rs/ordered-stream/0.0.1/ordered_stream/
        let mut h = filtered_stream.collect::<Vec<_>>();
        h.sort_unstable_by_key(|h| h.number);
        h
    }
    
    // TODO: Investigate if this can be done in parallel by validating sorted
    // buckets in parallel and checking the boundaries between each bucket
    self.consensus.validate_headers(&headers, start).await?;
    
    Ok(headers)
}

cc @mattsse I feel like this should be doable as a cleaner abstraction. Avoid the loops and whiles as much as possible, stay in the stream abstraction, collect & sort only when we want to run verification.

Comment on lines 155 to 157
let mut state_rcv = self.consensus.forkchoice_state();
loop {
state_rcv.changed().await;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible that this deadlocks things somehow if we wait?

Should we instead have a self.consensus.tip() method which immediately returns H256 instead of having a receiver? Main tradeoff would be that maybe we receive an old tip, and we need to run an extra run of the loop?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense. should we simply early exit on tip that was already processed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think so? @rakita

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am confused a little bit here, is forkchoice in essence forkid? In forkid EIP there is (passed_block_hash, next_fork_number) https://eips.ethereum.org/EIPS/eip-2364.

What I am thinking about the flow of calls that can potentially deadlock is something in sense of:
HeaderStage -> calls fork_choice -> Consensus
Consensus -> pushed new_block -> HeaderStage.
We need to know where mutexes are

Copy link
Member

@gakonst gakonst Oct 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rakita it gives you the current tip and the last finalized block. ForkId is used in eth p2p which we already have in master

@gakonst gakonst mentioned this pull request Oct 14, 2022
23 tasks
Copy link
Collaborator

@mattsse mattsse left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps this is a stupid question but can headers be downloaded concurrently?

@@ -11,6 +11,7 @@ description = "Commonly used types in reth."
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false }
bytes = "1.2"
serde = "1.0"
ethereum-types = { version = "0.13.1", default-features = false }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's missing from primitives that makes this necessary?

Comment on lines +49 to +50
// For uint to hash conversion
pub use ethereum_types::BigEndianHash;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, will add this to primitives

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks ❤️‍🔥

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rkrasiuk this is done^

@rkrasiuk
Copy link
Member Author

@mattsse currently, we download them in batches by hash, when we receive the batch - we change the cursor to the earliest header hash within that batch. I've been thinking about this yesterday as well, we could optimistically request the headers by block number, unless there is smth preventing us from doing that cc @gakonst

@mattsse
Copy link
Collaborator

mattsse commented Oct 17, 2022

If I understand that correctly that it really depends on how we request them,
if we can request headers from concurrent connections, I think we should be able to request block ranges concurrently.

If the range is not full, then we need to send follow-up request though, right?

Worth thinking about and outlining how and where requests are sent to.

Copy link
Member

@gakonst gakonst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice progress!

Comment on lines 75 to 76
// Unwrap the latest stream message which will be either
// the msg with headers or timeout error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we sure we want only the next stream message? could it be that there's >1 messages and we need to process them in a loop? or do we just leave that for the next time the stage will be executed in the loop? @onbjerg

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given we only send 1 request for a range of headers and we perform a check above on line 71 to match the response with the request we send I think it's ok if this is the direction we are going, I don't think we're going to end up in a place where we have more than 1 message waiting for us here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, rethinking this: We might want to request a large range of blocks (let's say 100K just as an example), but in order to not use a lot of memory at a time it might make sense for the downloader to send smaller batches (e.g. 1K blocks or something like that). I guess we should accomodate that? In that case, we would get more than 1 message per request.

Comment on lines 86 to 89
// Iterate the headers in reverse
out.reserve_exact(headers.len());
let mut headers_rev = headers.into_iter().rev();
while let Some(parent) = headers_rev.next() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add some docs explaining this? i also don't really like the loop appending to out, can we instead make this return the Headers and do out.extend_from_slice(&self.download_batch(..) or something

Comment on lines 23 to 28
/// Strategy for downloading the headers
pub downloader: Arc<dyn Downloader>,
/// Consensus client implementation
pub consensus: Arc<dyn Consensus>,
/// Downloader client implementation
pub client: Arc<dyn HeadersClient>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these can be generics instead of dyns?


let mut out = Vec::<HeaderLocked>::new();
loop {
match self.download_batch(head, tip, &mut stream, &mut out).await {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ideally this should be let headers = stream.download(head, tip) or something similar (pass the consensus by ref to Stream::download

Comment on lines +49 to +50
// For uint to hash conversion
pub use ethereum_types::BigEndianHash;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rkrasiuk this is done^

crates/stages/Cargo.toml Show resolved Hide resolved
crates/stages/src/stages/headers/downloader.rs Outdated Show resolved Hide resolved
crates/stages/src/stages/headers/stage.rs Show resolved Hide resolved
crates/stages/src/stages/headers/linear.rs Outdated Show resolved Hide resolved
@@ -0,0 +1,466 @@
use super::downloader::{DownloadError, Downloader};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mattsse PTAL this feels a bit convoluted but I could be wrong

gakonst and others added 4 commits October 19, 2022 18:37
* feat(interfaces): auto impl for ref/arc/box

* feat(downloader): make consensus part of the downloader and a generic

* impl generic for linear dl

* impl generic for parallel dl

* test(headers): make it work with generics

* chore: rm dead code

Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
@rkrasiuk
Copy link
Member Author

closing in favor of #126

@rkrasiuk rkrasiuk closed this Oct 24, 2022
@gakonst gakonst deleted the rkrasiuk/headers-stage branch October 24, 2022 13:59
clabby added a commit to clabby/reth that referenced this pull request Aug 13, 2023
♻️ remove optimism config, replace with boolean flag
yutianwu pushed a commit to yutianwu/reth that referenced this pull request Jul 24, 2024
…net (paradigmxyz#58)

* chore: fix system account issue and hertz storage patch issue on testnet

* fix CI issues

* fix review comments

* fix CI issues
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-staged-sync Related to staged sync (pipelines and stages) C-enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants