wip: feat(sync): headers stage #58
crates/stages/src/stages/headers.rs
Outdated
#[error(transparent)]
Internal(Box<dyn std::error::Error + Send + Sync>),
Could you try doing `trait HeaderClient { type Error: std::error::Error + Send + Sync }`, and try using `<H as HeaderClient>::Error` instead of box dyn? This same thing should be across all stages, would rather avoid having internal box dyn errors bc they're hard to test
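A minimal sketch of the associated-error-type suggestion, using only std. The `request_headers` method, `StageError`, `Timeout`, `TimingOutClient` and `fetch` are hypothetical names for illustration and are not taken from the PR; the point is only that a concrete error type lets a test compare errors with `assert_eq!`, which a `Box<dyn Error>` does not allow directly.

```rust
use std::fmt;

/// Hypothetical client trait: each implementation names its own error type.
pub trait HeaderClient {
    type Error: std::error::Error + Send + Sync + 'static;

    /// Illustrative method only, not the signature used in the PR.
    fn request_headers(&self, start: u64, limit: u64) -> Result<Vec<u64>, Self::Error>;
}

/// Stage error that is generic over the client's error instead of boxing it.
#[derive(Debug, PartialEq)]
pub enum StageError<E> {
    Client(E),
    EmptyResponse,
}

impl<E: fmt::Display> fmt::Display for StageError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StageError::Client(e) => write!(f, "client error: {e}"),
            StageError::EmptyResponse => write!(f, "empty response"),
        }
    }
}

impl<E: std::error::Error + 'static> std::error::Error for StageError<E> {}

/// Test double with a concrete, comparable error.
#[derive(Debug, PartialEq)]
pub struct Timeout;

impl fmt::Display for Timeout {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "request timed out")
    }
}

impl std::error::Error for Timeout {}

pub struct TimingOutClient;

impl HeaderClient for TimingOutClient {
    type Error = Timeout;

    fn request_headers(&self, _start: u64, _limit: u64) -> Result<Vec<u64>, Timeout> {
        Err(Timeout)
    }
}

/// The error type is spelled with the qualified path from the review comment.
pub fn fetch<H: HeaderClient>(
    client: &H,
    start: u64,
) -> Result<Vec<u64>, StageError<<H as HeaderClient>::Error>> {
    let headers = client.request_headers(start, 10).map_err(StageError::Client)?;
    if headers.is_empty() {
        return Err(StageError::EmptyResponse);
    }
    Ok(headers)
}
```

With this shape a test can write `assert_eq!(fetch(&TimingOutClient, 0), Err(StageError::Client(Timeout)))` without downcasting.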
crates/stages/src/stages/headers.rs
Outdated
cursor_header_number.put(
    hash.to_fixed_bytes().to_vec(),
    header.number,
    Some(WriteFlags::APPEND),
)?;
cursor_header.put((header.number, hash), header, Some(WriteFlags::APPEND))?;
do we need to write the header hash/header keys + values in order, or can we do them in any order?
i'd assume they would have to be ordered because if any intermediate error occurs, we need to be able to walk the db somehow to unwind
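To make the ordering argument concrete, here is a toy in-memory model, assuming two tables shaped like the ones written above. It uses `BTreeMap` rather than the real database cursors, and `HeaderTables`, `append`, `unwind_to` and `tip` are hypothetical names. Because headers are only ever appended in ascending block order, unwinding is a walk back from the highest key.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Header {
    pub number: u64,
    pub hash: [u8; 32],
}

/// In-memory stand-in for the two tables touched by the stage.
#[derive(Default)]
pub struct HeaderTables {
    /// (block number, hash) -> header
    headers: BTreeMap<(u64, [u8; 32]), Header>,
    /// hash -> block number
    numbers: BTreeMap<[u8; 32], u64>,
}

impl HeaderTables {
    /// Highest block number currently stored.
    pub fn tip(&self) -> Option<u64> {
        self.headers.keys().next_back().map(|(number, _)| *number)
    }

    /// Append-only insert: rejects a header that does not extend the table.
    pub fn append(&mut self, header: Header) -> Result<(), String> {
        if let Some(last) = self.tip() {
            if header.number <= last {
                return Err(format!("header {} is not past tip {}", header.number, last));
            }
        }
        self.numbers.insert(header.hash, header.number);
        self.headers.insert((header.number, header.hash), header);
        Ok(())
    }

    /// Removes everything above `target`, walking back from the tip.
    pub fn unwind_to(&mut self, target: u64) {
        loop {
            let last = self.headers.keys().next_back().copied();
            match last {
                Some((number, hash)) if number > target => {
                    self.headers.remove(&(number, hash));
                    self.numbers.remove(&hash);
                }
                _ => break,
            }
        }
    }
}
```

If a write fails halfway through a batch, calling `unwind_to` with the last known-good block number restores both tables to a consistent state.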
crates/stages/src/stages/headers.rs
Outdated
let mut out = Vec::<HeaderLocked>::new();
loop {
    match self.download_batch(head, &forkchoice_state, &mut stream, &mut out).await {
        Ok(done) => {
            if done {
                return Ok(out)
            }
        }
        Err(e) if e.is_retryable() && retries > 0 => {
            retries -= 1;
        }
        Err(e) => return Err(e),
    }
}
Could we take a mapped Stream-like API approach here?
async fn execute(&self, input) -> ... {
    // get head
    let (state_tip, _) = self.next_forkchoice_state(&head.hash()).await;
    let headers = self.download(state_tip).await?;
    // handle the error
    // get the tx cursors and write
}

/// Requests new headers starting from the specified block.
async fn request_headers<T: Into<BlockId>>(&self, start: T) -> impl Stream <...> {
    // Send a P2P request to make the stream start receiving values, if we don't send
    // this out, it's possible we won't receive any headers that we care about.
    // This is going to be a no-op I suspect in the test impls
    let request = HeaderRequest { start: start.into(), limit: self.batch_size, reverse: true };
    let request_id = rand::thread_rng().gen();
    self.client.send_header_request(request_id, request).timeout(self.timeout).await;
    // This should return an `impl Stream<Item = (u64, Vec<Header>)>`
    // The headers in the stream are both unvalidated and unordered
    self.client.stream_headers()
}

async fn download(&self, start: BlockId) -> Result<Vec<Header>> {
    // Get the stream
    let stream = self.request_headers(start).await?;
    // This makes the stream retryable
    let stream = RetriableStream::new(stream);
    // This makes the stream timeout (or whatever configurable timeout)
    // if no headers are received back for a particular request
    let stream = stream.timeout(5);
    // Filter the stream's output for only non-empty headers & responses
    // that match our request_id
    let filtered_stream = stream.filter(|(id, headers)| request_id == id && !headers.is_empty());
    let headers = {
        // maybe can avoid collecting if we can operate on ordered stream somehow?
        // https://docs.rs/ordered-stream/0.0.1/ordered_stream/
        let mut h = filtered_stream.collect::<Vec<_>>().await;
        h.sort_unstable_by_key(|h| h.number);
        h
    };
    // TODO: Investigate if this can be done in parallel by validating sorted
    // buckets in parallel and checking the boundaries between each bucket
    self.consensus.validate_headers(&headers, start).await?;
    Ok(headers)
}
cc @mattsse I feel like this should be doable as a cleaner abstraction. Avoid the `loop`s and `while`s as much as possible, stay in the stream abstraction, collect & sort only when we want to run verification.
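The filter, collect, sort, then validate pipeline described above can be tried out with a synchronous `Iterator` as a stand-in for the async `Stream`. This is a sketch under stated assumptions: `Header` is simplified to `u64` hashes, and `collect_sorted` and `validate_chain` are hypothetical helpers, not the consensus API from the PR.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Header {
    pub number: u64,
    pub hash: u64,
    pub parent_hash: u64,
}

/// (request id, headers), mirroring the stream item type in the sketch above.
pub type Response = (u64, Vec<Header>);

/// Keeps only non-empty responses for `request_id`, flattens them and sorts by number.
pub fn collect_sorted(
    responses: impl IntoIterator<Item = Response>,
    request_id: u64,
) -> Vec<Header> {
    let mut headers: Vec<Header> = responses
        .into_iter()
        .filter(|(id, batch)| *id == request_id && !batch.is_empty())
        .flat_map(|(_, batch)| batch)
        .collect();
    headers.sort_unstable_by_key(|h| h.number);
    headers
}

/// Checks that every header attaches to the one before it.
pub fn validate_chain(headers: &[Header]) -> Result<(), String> {
    for pair in headers.windows(2) {
        let (parent, child) = (&pair[0], &pair[1]);
        if child.number != parent.number + 1 || child.parent_hash != parent.hash {
            return Err(format!(
                "header {} does not attach to header {}",
                child.number, parent.number
            ));
        }
    }
    Ok(())
}
```

Since `validate_chain` only looks at adjacent pairs, sorted buckets could in principle be validated independently with one extra check at each bucket boundary, which is the parallelisation the TODO hints at.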
crates/stages/src/stages/headers.rs
Outdated
let mut state_rcv = self.consensus.forkchoice_state();
loop {
    state_rcv.changed().await;
Is it possible that this deadlocks things somehow if we wait?
Should we instead have a `self.consensus.tip()` method which immediately returns `H256` instead of having a receiver? Main tradeoff would be that maybe we receive an old tip, and we need to run an extra run of the loop?
that makes sense. should we simply early exit on tip that was already processed?
Yeah I think so? @rakita
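A sketch of the non-blocking variant with the early exit, assuming a `tip()` method that returns immediately. `H256` is modelled as a plain byte array here, and `HeaderStage::execute`, `Outcome` and `FixedTip` are hypothetical and much simpler than the real stage.

```rust
/// Stand-in for the 32-byte hash type.
pub type H256 = [u8; 32];

pub trait Consensus {
    /// Returns the current tip without waiting for a change notification.
    fn tip(&self) -> H256;
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// The tip was already processed, nothing to do.
    Skipped,
    /// Headers up to this tip were processed.
    Synced(H256),
}

pub struct HeaderStage<C> {
    consensus: C,
    last_processed_tip: Option<H256>,
}

impl<C: Consensus> HeaderStage<C> {
    pub fn new(consensus: C) -> Self {
        Self { consensus, last_processed_tip: None }
    }

    pub fn execute(&mut self) -> Outcome {
        let tip = self.consensus.tip();
        // Early exit: a stale tip costs one cheap extra run instead of a blocked task.
        if self.last_processed_tip == Some(tip) {
            return Outcome::Skipped;
        }
        // Downloading and writing headers would happen here.
        self.last_processed_tip = Some(tip);
        Outcome::Synced(tip)
    }
}

/// Test consensus that always reports the same tip.
pub struct FixedTip(pub H256);

impl Consensus for FixedTip {
    fn tip(&self) -> H256 {
        self.0
    }
}
```

The stage never awaits on the consensus side in this version, so the wait in the loop above goes away at the cost of occasionally re-running with an old tip.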
I am a little confused here: is forkchoice in essence forkid? In the forkid EIP there is (passed_block_hash, next_fork_number) https://eips.ethereum.org/EIPS/eip-2364.
The flow of calls that I think could potentially deadlock is something like:
HeaderStage -> calls fork_choice -> Consensus
Consensus -> pushes new_block -> HeaderStage.
We need to know where the mutexes are.
@rakita it gives you the current tip and the last finalized block. ForkId is used in eth p2p which we already have in master
perhaps this is a stupid question but can headers be downloaded concurrently?
@@ -11,6 +11,7 @@ description = "Commonly used types in reth."
ethers-core = { git = "https://github.com/gakonst/ethers-rs", default-features = false }
bytes = "1.2"
serde = "1.0"
ethereum-types = { version = "0.13.1", default-features = false }
what's missing from `primitives` that makes this necessary?
// For uint to hash conversion
pub use ethereum_types::BigEndianHash;
I see, will add this to primitives
thanks ❤️🔥
@rkrasiuk this is done^
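For context on the uint to hash conversion discussed in this thread: the idea is to left-pad the big-endian bytes of the integer to 32 bytes. The sketch below shows that with std only, using `u128` in place of a 256-bit integer; `hash_from_uint` and `uint_from_hash` are hypothetical helpers and not the `ethereum_types` API.

```rust
/// Stand-in for the 32-byte hash type.
pub type H256 = [u8; 32];

/// Left-pads the big-endian bytes of `value` to 32 bytes.
pub fn hash_from_uint(value: u128) -> H256 {
    let mut out = [0u8; 32];
    out[16..].copy_from_slice(&value.to_be_bytes());
    out
}

/// Inverse conversion; returns `None` if the hash does not fit in a `u128`.
pub fn uint_from_hash(hash: &H256) -> Option<u128> {
    if hash[..16].iter().any(|byte| *byte != 0) {
        return None;
    }
    let mut bytes = [0u8; 16];
    bytes.copy_from_slice(&hash[16..]);
    Some(u128::from_be_bytes(bytes))
}
```

The least significant byte ends up last, so `hash_from_uint(1)` is 31 zero bytes followed by `0x01`.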
@mattsse currently we download them in batches by hash; when we receive a batch, we move the cursor to the earliest header hash within that batch. I was thinking about this yesterday as well: we could optimistically request the headers by block number, unless there is something preventing us from doing that. cc @gakonst
If I understand correctly, it really depends on how we request them. If the range is not full, then we need to send a follow-up request though, right? Worth thinking about and outlining how and where requests are sent.
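A rough sketch of the follow-up request idea when requesting by block number, assuming a peer may return fewer headers than asked for. Headers are reduced to their block numbers, and `headers_by_number`, `download_range` and `CappedPeer` are hypothetical names rather than the networking API.

```rust
pub trait HeadersClient {
    /// Returns up to `limit` consecutive block numbers starting at `start`.
    /// A peer is allowed to return fewer than requested.
    fn headers_by_number(&mut self, start: u64, limit: u64) -> Vec<u64>;
}

/// Requests `count` headers from `start`, sending follow-ups for the missing tail.
pub fn download_range<C: HeadersClient>(
    client: &mut C,
    start: u64,
    count: u64,
    max_attempts: usize,
) -> Result<Vec<u64>, String> {
    let mut out = Vec::new();
    let mut attempts = 0;
    while (out.len() as u64) < count {
        if attempts == max_attempts {
            return Err(format!("range incomplete after {max_attempts} requests"));
        }
        attempts += 1;
        let next = start + out.len() as u64;
        let remaining = count - out.len() as u64;
        let batch = client.headers_by_number(next, remaining);
        // Never accept more than was asked for.
        out.extend(batch.into_iter().take(remaining as usize));
    }
    Ok(out)
}

/// Test peer that caps every response and counts the requests it sees.
pub struct CappedPeer {
    pub max_per_response: u64,
    pub requests: usize,
}

impl HeadersClient for CappedPeer {
    fn headers_by_number(&mut self, start: u64, limit: u64) -> Vec<u64> {
        self.requests += 1;
        (start..start + limit.min(self.max_per_response)).collect()
    }
}
```

With a peer capped at 4 headers per response, a request for 10 headers takes three round trips, which is the kind of accounting worth outlining before deciding where requests are sent.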
nice progress!
// Unwrap the latest stream message which will be either
// the msg with headers or timeout error
are we sure we want only the next stream message? could it be that there's >1 messages and we need to process them in a loop? or do we just leave that for the next time the stage will be executed in the loop? @onbjerg
Given we only send 1 request for a range of headers, and we perform a check above on line 71 to match the response with the request we sent, I think it's ok if this is the direction we are going. I don't think we're going to end up in a place where we have more than 1 message waiting for us here.
Actually, rethinking this: We might want to request a large range of blocks (let's say 100K just as an example), but in order to not use a lot of memory at a time it might make sense for the downloader to send smaller batches (e.g. 1K blocks or something like that). I guess we should accommodate that? In that case, we would get more than 1 message per request.
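As a small illustration of the batching idea, a large requested range can be split into fixed-size sub-ranges, each of which would arrive as its own message. `batches` is a hypothetical helper, and the sizes in the usage note are examples only.

```rust
use std::ops::RangeInclusive;

/// Splits the inclusive block range `start..=end` into chunks of at most `batch_size`.
pub fn batches(start: u64, end: u64, batch_size: u64) -> Vec<RangeInclusive<u64>> {
    assert!(batch_size > 0, "batch size must be non-zero");
    let mut out = Vec::new();
    let mut lo = start;
    while lo <= end {
        let hi = end.min(lo.saturating_add(batch_size - 1));
        out.push(lo..=hi);
        if hi == u64::MAX {
            // Avoid overflow when the range ends at the maximum block number.
            break;
        }
        lo = hi + 1;
    }
    out
}
```

For example, `batches(1, 100_000, 1_000)` yields 100 sub-ranges, so the stage would have to keep reading messages until all of them have been seen rather than unwrapping only the next one.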
// Iterate the headers in reverse
out.reserve_exact(headers.len());
let mut headers_rev = headers.into_iter().rev();
while let Some(parent) = headers_rev.next() {
can we add some docs explaining this? i also don't really like the loop appending to `out`, can we instead make this return the Headers and do `out.extend_from_slice(&self.download_batch(..))` or something
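One possible shape for this, as a sketch: the batch function takes the earliest header downloaded so far, walks the new batch from highest to lowest block, checks that each header is the parent of the previous one, and returns the linked headers instead of pushing into a shared `out`. `Header` is simplified and `link_batch` is a hypothetical name; the real validation rules live in the consensus implementation.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Header {
    pub number: u64,
    pub hash: u64,
    pub parent_hash: u64,
}

/// Links `batch` (ascending by block number) below `child`.
///
/// The batch is iterated in reverse because headers are downloaded from the tip
/// backwards: each header must be the parent of the one accepted just before it.
/// Returns the accepted headers, newest first.
pub fn link_batch(batch: Vec<Header>, mut child: Header) -> Result<Vec<Header>, String> {
    let mut linked = Vec::with_capacity(batch.len());
    for parent in batch.into_iter().rev() {
        if child.parent_hash != parent.hash || child.number != parent.number + 1 {
            return Err(format!(
                "header {} is not the parent of header {}",
                parent.number, child.number
            ));
        }
        child = parent.clone();
        linked.push(parent);
    }
    Ok(linked)
}
```

The caller then owns the accumulation, for example `out.extend(link_batch(batch, child)?)`, and the batch function stays free of side effects, which also makes it easier to test in isolation.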
/// Strategy for downloading the headers
pub downloader: Arc<dyn Downloader>,
/// Consensus client implementation
pub consensus: Arc<dyn Consensus>,
/// Downloader client implementation
pub client: Arc<dyn HeadersClient>,
I think these can be generics instead of dyns?
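A sketch of the generic version, together with blanket impls for `Arc<T>` so that shared or type-erased handles still work where needed. The trait methods (`tip`, `name`), `describe` and the test types are hypothetical placeholders; only the overall shape is the point.

```rust
use std::sync::Arc;

pub trait Consensus {
    fn tip(&self) -> u64;
}

pub trait HeadersClient {
    fn name(&self) -> &'static str;
}

// Blanket impls: an `Arc<T>` (including `Arc<dyn Trait>`) can be used wherever
// the trait is expected, so callers are not forced to give up shared ownership.
impl<T: Consensus + ?Sized> Consensus for Arc<T> {
    fn tip(&self) -> u64 {
        (**self).tip()
    }
}

impl<T: HeadersClient + ?Sized> HeadersClient for Arc<T> {
    fn name(&self) -> &'static str {
        (**self).name()
    }
}

/// Stage parameterised over its collaborators instead of holding `Arc<dyn ...>`.
pub struct HeaderStage<C, H> {
    pub consensus: C,
    pub client: H,
}

impl<C: Consensus, H: HeadersClient> HeaderStage<C, H> {
    pub fn describe(&self) -> String {
        format!("{} @ {}", self.client.name(), self.consensus.tip())
    }
}

pub struct TestConsensus(pub u64);

impl Consensus for TestConsensus {
    fn tip(&self) -> u64 {
        self.0
    }
}

pub struct TestClient;

impl HeadersClient for TestClient {
    fn name(&self) -> &'static str {
        "test-client"
    }
}
```

Tests can pass `TestConsensus` and `TestClient` by value, while production code can still hand in an `Arc<dyn Consensus>` thanks to the blanket impls.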
let mut out = Vec::<HeaderLocked>::new();
loop {
    match self.download_batch(head, tip, &mut stream, &mut out).await {
ideally this should be `let headers = stream.download(head, tip)` or something similar (pass the consensus by ref to `Stream::download`)
@@ -0,0 +1,466 @@
use super::downloader::{DownloadError, Downloader};
@mattsse PTAL this feels a bit convoluted but I could be wrong
* feat(interfaces): auto impl for ref/arc/box
* feat(downloader): make consensus part of the downloader and a generic
* impl generic for linear dl
* impl generic for parallel dl
* test(headers): make it work with generics
* chore: rm dead code
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
closing in favor of #126
still WIP, currently doesn't compile, because of db types
This is the basic implementation of the headers stage. The current steps are:
Things left to be done: