Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(op): receipts import, fix chunked read of file with optional block data #10577

Merged
merged 2 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 67 additions & 15 deletions crates/net/downloaders/src/file_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::file_codec::BlockFileCodec;
use std::{collections::HashMap, io, path::Path};

use futures::Future;
use itertools::Either;
use reth_network_p2p::{
Expand All @@ -12,13 +13,16 @@ use reth_network_peers::PeerId;
use reth_primitives::{
BlockBody, BlockHash, BlockHashOrNumber, BlockNumber, Header, SealedHeader, B256,
};
use std::{collections::HashMap, io, path::Path};
use thiserror::Error;
use tokio::{fs::File, io::AsyncReadExt};
use tokio_stream::StreamExt;
use tokio_util::codec::FramedRead;
use tracing::{debug, trace, warn};

use crate::receipt_file_client::FromReceiptReader;

use super::file_codec::BlockFileCodec;

/// Default byte length of chunk to read from chain file.
///
/// Default is 1 GB.
Expand Down Expand Up @@ -85,7 +89,7 @@ impl FileClient {
let mut reader = vec![];
file.read_to_end(&mut reader).await?;

Ok(Self::from_reader(&reader[..], file_len).await?.0)
Ok(Self::from_reader(&reader[..], file_len).await?.file_client)
}

/// Get the tip hash of the chain.
Expand Down Expand Up @@ -184,7 +188,7 @@ impl FromReader for FileClient {
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
{
Expand Down Expand Up @@ -247,7 +251,11 @@ impl FromReader for FileClient {

trace!(target: "downloaders::file", blocks = headers.len(), "Initialized file client");

Ok((Self { headers, hash_to_number, bodies }, remaining_bytes))
Ok(DecodedFileChunk {
file_client: Self { headers, hash_to_number, bodies },
remaining_bytes,
highest_block: None,
})
}
}
}
Expand Down Expand Up @@ -349,6 +357,9 @@ pub struct ChunkedFileReader {
chunk: Vec<u8>,
/// Max bytes per chunk.
chunk_byte_len: u64,
/// Optionally, tracks highest decoded block number. Needed when decoding data that maps * to 1
/// with block number
highest_block: Option<u64>,
}

impl ChunkedFileReader {
Expand All @@ -375,7 +386,7 @@ impl ChunkedFileReader {
let metadata = file.metadata().await?;
let file_byte_len = metadata.len();

Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len })
Ok(Self { file, file_byte_len, chunk: vec![], chunk_byte_len, highest_block: None })
}

/// Calculates the number of bytes to read from the chain file. Returns a tuple of the chunk
Expand All @@ -392,11 +403,9 @@ impl ChunkedFileReader {
}
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
/// Reads bytes from file and buffers as next chunk to decode. Returns byte length of next
/// chunk to read.
async fn read_next_chunk(&mut self) -> Result<Option<u64>, io::Error> {
if self.file_byte_len == 0 && self.chunk.is_empty() {
dbg!(self.chunk.is_empty());
// eof
Expand Down Expand Up @@ -431,12 +440,42 @@ impl ChunkedFileReader {
"new bytes were read from file"
);

Ok(Some(next_chunk_byte_len as u64))
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_chunk<T>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReader,
{
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

// make new file client from chunk
let (file_client, bytes) =
T::from_reader(&self.chunk[..], next_chunk_byte_len as u64).await?;
let DecodedFileChunk { file_client, remaining_bytes, .. } =
T::from_reader(&self.chunk[..], next_chunk_byte_len).await?;

// save left over bytes
self.chunk = bytes;
self.chunk = remaining_bytes;

Ok(Some(file_client))
}

/// Read next chunk from file. Returns [`FileClient`] containing decoded chunk.
pub async fn next_receipts_chunk<T, D>(&mut self) -> Result<Option<T>, T::Error>
where
T: FromReceiptReader<D>,
{
let Some(next_chunk_byte_len) = self.read_next_chunk().await? else { return Ok(None) };

// make new file client from chunk
let DecodedFileChunk { file_client, remaining_bytes, highest_block } =
T::from_receipt_reader(&self.chunk[..], next_chunk_byte_len, self.highest_block)
.await?;

// save left over bytes
self.chunk = remaining_bytes;
// update highest block
self.highest_block = highest_block;

Ok(Some(file_client))
}
Expand All @@ -446,16 +485,29 @@ impl ChunkedFileReader {
pub trait FromReader {
/// Error returned by file client type.
type Error: From<io::Error>;

/// Returns a file client
fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}

/// Output from decoding a file chunk with [`FromReader::from_reader`].
#[derive(Debug)]
pub struct DecodedFileChunk<T> {
/// File client, i.e. the decoded part of chunk.
pub file_client: T,
/// Remaining bytes that have not been decoded, e.g. a partial block or a partial receipt.
pub remaining_bytes: Vec<u8>,
/// Highest block of decoded chunk. This is needed when decoding data that maps * to 1 with
/// block number, like receipts.
pub highest_block: Option<u64>,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 2 additions & 0 deletions crates/net/downloaders/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,5 @@ pub mod file_codec;

#[cfg(any(test, feature = "test-utils"))]
pub mod test_utils;

pub use file_client::{DecodedFileChunk, FileClientError};
115 changes: 57 additions & 58 deletions crates/net/downloaders/src/receipt_file_client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::marker::PhantomData;
use std::{fmt, io, marker::PhantomData};

use futures::Future;
use reth_primitives::{Receipt, Receipts};
Expand All @@ -7,7 +7,7 @@ use tokio_stream::StreamExt;
use tokio_util::codec::{Decoder, FramedRead};
use tracing::trace;

use crate::file_client::{FileClientError, FromReader};
use crate::{DecodedFileChunk, FileClientError};

/// File client for reading RLP encoded receipts from file. Receipts in file must be in sequential
/// order w.r.t. block number.
Expand All @@ -26,67 +26,48 @@ pub struct ReceiptFileClient<D> {
/// Constructs a file client from a reader and decoder.
pub trait FromReceiptReader<D> {
/// Error returned by file client type.
type Error: From<std::io::Error>;
type Error: From<io::Error>;

/// Returns a decoder instance
fn decoder() -> D;

/// Returns a file client
fn from_receipt_reader<B>(
reader: B,
decoder: D,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
prev_chunk_highest_block: Option<u64>,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
Self: Sized,
B: AsyncReadExt + Unpin;
}

impl<D> FromReader for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ Default,
{
type Error = D::Error;

fn from_reader<B>(
reader: B,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
where
B: AsyncReadExt + Unpin,
{
Self::from_receipt_reader(reader, Self::decoder(), num_bytes)
}
}

impl<D> FromReceiptReader<D> for ReceiptFileClient<D>
where
D: Decoder<Item = Option<ReceiptWithBlockNumber>, Error = FileClientError>
+ std::fmt::Debug
+ fmt::Debug
+ Default,
{
type Error = D::Error;

fn decoder() -> D {
Default::default()
D::default()
}

/// Initialize the [`ReceiptFileClient`] from bytes that have been read from file. Caution! If
/// first block has no transactions, it's assumed to be the genesis block.
fn from_receipt_reader<B>(
reader: B,
decoder: D,
num_bytes: u64,
) -> impl Future<Output = Result<(Self, Vec<u8>), Self::Error>>
prev_chunk_highest_block: Option<u64>,
) -> impl Future<Output = Result<DecodedFileChunk<Self>, Self::Error>>
where
B: AsyncReadExt + Unpin,
{
let mut receipts = Receipts::default();

// use with_capacity to make sure the internal buffer contains the entire chunk
let mut stream = FramedRead::with_capacity(reader, decoder, num_bytes as usize);
let mut stream = FramedRead::with_capacity(reader, Self::decoder(), num_bytes as usize);

trace!(target: "downloaders::file",
target_num_bytes=num_bytes,
Expand Down Expand Up @@ -152,10 +133,16 @@ where
block_number = num + receipts.len() as u64;
}
None => {
// this is the first block and it's empty, assume it's the genesis
// block
first_block = Some(0);
block_number = 0;
// this is the first block and it's empty
if let Some(highest_block) = prev_chunk_highest_block {
// this is a chunked read and this is not the first chunk
block_number = highest_block + 1;
} else {
// this is not a chunked read or this is the first chunk. assume
// it's the genesis block
block_number = 0;
}
first_block = Some(block_number);
}
}

Expand Down Expand Up @@ -196,15 +183,16 @@ where
"Initialized receipt file client"
);

Ok((
Self {
Ok(DecodedFileChunk {
file_client: Self {
receipts,
first_block: first_block.unwrap_or_default(),
total_receipts,
_marker: Default::default(),
},
remaining_bytes,
))
highest_block: Some(block_number),
})
}
}
}
Expand All @@ -220,17 +208,16 @@ pub struct ReceiptWithBlockNumber {

#[cfg(test)]
mod test {
use crate::{
file_client::{FileClientError, FromReader},
receipt_file_client::{ReceiptFileClient, ReceiptWithBlockNumber},
};
use alloy_rlp::{Decodable, RlpDecodable};
use reth_primitives::{
hex, Address, Buf, Bytes, BytesMut, Log, LogData, Receipt, TxType, B256,
};
use reth_tracing::init_test_tracing;
use tokio_util::codec::Decoder;

use super::{FromReceiptReader, ReceiptFileClient, ReceiptWithBlockNumber};
use crate::{DecodedFileChunk, FileClientError};

#[derive(Debug, PartialEq, Eq, RlpDecodable)]
struct MockReceipt {
tx_type: u8,
Expand Down Expand Up @@ -578,12 +565,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];

let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();

// 2 non-empty receipt objects
assert_eq!(2, total_receipts);
Expand All @@ -610,12 +601,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];

let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();

// 2 non-empty receipt objects
assert_eq!(2, total_receipts);
Expand Down Expand Up @@ -643,12 +638,16 @@ mod test {
let encoded_byte_len = encoded_receipts.len() as u64;
let reader = &mut &encoded_receipts[..];

let (
ReceiptFileClient { receipts, first_block, total_receipts, _marker },
_remaining_bytes,
) = ReceiptFileClient::<MockReceiptFileCodec>::from_reader(reader, encoded_byte_len)
.await
.unwrap();
let DecodedFileChunk {
file_client: ReceiptFileClient { receipts, first_block, total_receipts, .. },
..
} = ReceiptFileClient::<MockReceiptFileCodec>::from_receipt_reader(
reader,
encoded_byte_len,
None,
)
.await
.unwrap();

// 4 non-empty receipt objects
assert_eq!(4, total_receipts);
Expand Down
Loading
Loading