Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: refactor generation of snapshots from the cli #5464

Merged
merged 14 commits into from
Nov 24, 2023
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/reth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ humantime = "2.1.0"
const-str = "0.5.6"
boyer-moore-magiclen = "0.2.16"
itertools.workspace = true
rayon.workspace = true

[target.'cfg(not(windows))'.dependencies]
jemallocator = { version = "0.5.0", optional = true }
Expand Down
48 changes: 9 additions & 39 deletions bin/reth/src/db/snapshots/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,22 @@ use super::{
Command,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, snapshot::HeaderMask};
use reth_db::{open_db_read_only, snapshot::HeaderMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction},
BlockHash, ChainSpec, Header, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, HeaderProvider, ProviderError,
ProviderFactory, TransactionsProviderExt,
providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory,
TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};
use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
/// Builds a headers snapshot for the configured block range, then renames the
/// default output file so its name encodes the chosen configuration
/// (filters / compression).
pub(crate) fn generate_headers_snapshot<DB: Database>(
    &self,
    provider: &DatabaseProviderRO<'_, DB>,
    compression: Compression,
    inclusion_filter: InclusionFilter,
    phf: PerfectHashingFunction,
) -> eyre::Result<()> {
    let block_range = self.block_range();

    // Attach the inclusion filter / PHF only when the user asked for them.
    let filters = match self.with_filters {
        true => Filters::WithFilters(inclusion_filter, phf),
        false => Filters::WithoutFilters,
    };

    segments::Headers::new(compression, filters).snapshot::<DB>(
        provider,
        PathBuf::default(),
        block_range.clone(),
    )?;

    // Default name doesn't have any configuration
    let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
    let default_name = SnapshotSegment::Headers.filename(&block_range, &tx_range);
    let configured_name = SnapshotSegment::Headers.filename_with_configuration(
        filters,
        compression,
        &block_range,
        &tx_range,
    );
    reth_primitives::fs::rename(default_name, configured_name)?;

    Ok(())
}

pub(crate) fn bench_headers_snapshot(
&self,
db_path: &Path,
Expand All @@ -62,14 +28,18 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone());
let provider = factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();

let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let block_range = self.block_range();

let mut row_indexes = block_range.clone().collect::<Vec<_>>();
let mut rng = rand::thread_rng();

Expand Down
151 changes: 121 additions & 30 deletions bin/reth/src/db/snapshots/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
use clap::Parser;
use itertools::Itertools;
use reth_db::{open_db_read_only, DatabaseEnv};
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use reth_db::{database::Database, open_db_read_only, DatabaseEnv};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::NippyJar;
use reth_primitives::{
snapshot::{Compression, InclusionFilter, PerfectHashingFunction},
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::ProviderFactory;
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use reth_provider::{BlockNumReader, ProviderFactory, TransactionsProviderExt};
use reth_snapshot::{segments as snap_segments, segments::Segment};
use std::{
ops::RangeInclusive,
path::{Path, PathBuf},
sync::Arc,
time::Instant,
};

mod bench;
mod headers;
Expand All @@ -28,6 +36,14 @@ pub struct Command {
#[arg(long, short, default_value = "500000")]
block_interval: u64,

/// Number of snapshots built in parallel.
#[arg(long, short, default_value = "1")]
joshieDo marked this conversation as resolved.
Show resolved Hide resolved
parallel: u64,

/// Flag to skip snapshot creation and print snapshot files stats.
#[arg(long, default_value = "false")]
only_stats: bool,

/// Flag to enable database-to-snapshot benchmarking.
#[arg(long, default_value = "false")]
bench: bool,
Expand All @@ -41,7 +57,7 @@ pub struct Command {
compression: Vec<Compression>,

/// Flag to enable inclusion list filters and PHFs.
#[arg(long, default_value = "true")]
#[arg(long, default_value = "false")]
with_filters: bool,

/// Specifies the perfect hashing function to use.
Expand All @@ -65,39 +81,36 @@ impl Command {

{
let db = open_db_read_only(db_path, None)?;
let factory = ProviderFactory::new(db, chain.clone());
let provider = factory.provider()?;
let factory = Arc::new(ProviderFactory::new(db, chain.clone()));

if !self.only_bench {
for ((mode, compression), phf) in all_combinations.clone() {
let filters = if self.with_filters {
Filters::WithFilters(InclusionFilter::Cuckoo, *phf)
} else {
Filters::WithoutFilters
};

match mode {
SnapshotSegment::Headers => self.generate_headers_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Headers::new(*compression, filters),
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Transactions::new(*compression, filters),
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Receipts::new(*compression, filters),
)?,
SnapshotSegment::Transactions => self
.generate_transactions_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
SnapshotSegment::Receipts => self
.generate_receipts_snapshot::<DatabaseEnv>(
&provider,
*compression,
InclusionFilter::Cuckoo,
*phf,
)?,
}
}
}
}

if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
Expand Down Expand Up @@ -130,8 +143,86 @@ impl Command {
Ok(())
}

/// Gives out the inclusive block range for the snapshot requested by the user.
fn block_range(&self) -> RangeInclusive<BlockNumber> {
self.from..=(self.from + self.block_interval - 1)
/// Generates successive inclusive block ranges up to the tip starting at `self.from`.
///
/// Each range covers at most `self.block_interval` blocks; the final range is
/// clamped so it never extends past `tip`. Returns an empty vector when
/// `self.from > tip`.
fn block_ranges(&self, tip: BlockNumber) -> Vec<RangeInclusive<BlockNumber>> {
    // `block_interval` comes straight from the CLI: a value of 0 would
    // underflow `interval - 1` below and could never make progress, so clamp
    // it to at least one block per range.
    let interval = self.block_interval.max(1);

    let mut from = self.from;
    let mut ranges = Vec::new();

    while from <= tip {
        let end_range = std::cmp::min(from + interval - 1, tip);
        ranges.push(from..=end_range);
        from = end_range + 1;
    }

    ranges
}

/// Generates snapshots from `self.from` with a `self.block_interval`. Generates them in
/// parallel if specified.
///
/// When `self.only_stats` is set, snapshot creation is skipped and only the
/// expected file names are collected so their stats can be printed.
fn generate_snapshot<DB: Database>(
    &self,
    factory: Arc<ProviderFactory<DB>>,
    segment: impl Segment + Send + Sync,
) -> eyre::Result<()> {
    let dir = PathBuf::default();
    let ranges = self.block_ranges(factory.last_block_number()?);

    // `parallel` is user-supplied and `slice::chunks` panics on a chunk size
    // of zero, so clamp it to at least 1 (i.e. sequential).
    let parallel = (self.parallel as usize).max(1);

    let mut created_snapshots = vec![];

    // Filter/PHF is memory intensive, so we have to limit the parallelism.
    for block_ranges in ranges.chunks(parallel) {
        let created_files = block_ranges
            .into_par_iter()
            .map(|block_range| {
                // Each parallel worker gets its own read-only provider.
                let provider = factory.provider()?;

                if !self.only_stats {
                    segment.snapshot::<DB>(&provider, &dir, block_range.clone())?;
                }

                let tx_range =
                    provider.transaction_range_by_block_range(block_range.clone())?;

                // Return the canonical filename so `stats` can locate the file.
                Ok(segment.segment().filename(block_range, &tx_range))
            })
            .collect::<Result<Vec<_>, eyre::Report>>()?;

        created_snapshots.extend(created_files);
    }

    self.stats(created_snapshots)
}

/// Prints detailed statistics for each snapshot, including loading time.
///
/// Each snapshot is loaded from its path and its filters size, offset index
/// size, offset list size, and load duration are reported to stdout.
fn stats(&self, snapshots: Vec<impl AsRef<Path>>) -> eyre::Result<()> {
    // Bytes-to-megabytes divisor shared by all size reports below.
    const MB: f64 = 1024.0 * 1024.0;

    for snapshot in snapshots.iter() {
        let path = snapshot.as_ref();

        // Measure how long the snapshot takes to load from disk.
        let load_started = Instant::now();
        let jar = NippyJar::<SegmentHeader>::load(path)?;
        let elapsed = load_started.elapsed();

        println!("Snapshot: {:?}", path.file_name());
        println!(" Filters Size: {:>7.2} MB", jar.filter_size() as f64 / MB);
        println!(" Offset Index Size: {:>7.2} MB", jar.offsets_index_size() as f64 / MB);
        println!(" Offset List Size: {:>7.2} MB", jar.offsets_size() as f64 / MB);
        println!(
            " Loading Time: {:>7.2} ms | {:>7.2} µs",
            elapsed.as_millis() as f64,
            elapsed.as_micros() as f64
        );
    }
    Ok(())
}
}
48 changes: 10 additions & 38 deletions bin/reth/src/db/snapshots/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,55 +3,23 @@ use super::{
Command, Compression, PerfectHashingFunction,
};
use rand::{seq::SliceRandom, Rng};
use reth_db::{database::Database, open_db_read_only, snapshot::ReceiptMask};
use reth_db::{open_db_read_only, snapshot::ReceiptMask};
use reth_interfaces::db::LogLevel;
use reth_primitives::{
snapshot::{Filters, InclusionFilter},
ChainSpec, Receipt, SnapshotSegment,
};
use reth_provider::{
providers::SnapshotProvider, DatabaseProviderRO, ProviderError, ProviderFactory,
ReceiptProvider, TransactionsProvider, TransactionsProviderExt,
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider,
TransactionsProvider, TransactionsProviderExt,
};
use reth_snapshot::{segments, segments::Segment};

use std::{
path::{Path, PathBuf},
sync::Arc,
};

impl Command {
/// Builds a receipts snapshot for the configured block range, then renames the
/// default output file so its name encodes the chosen configuration
/// (filters / compression).
pub(crate) fn generate_receipts_snapshot<DB: Database>(
    &self,
    provider: &DatabaseProviderRO<'_, DB>,
    compression: Compression,
    inclusion_filter: InclusionFilter,
    phf: PerfectHashingFunction,
) -> eyre::Result<()> {
    let block_range = self.block_range();

    // Attach the inclusion filter / PHF only when the user asked for them.
    let filters = match self.with_filters {
        true => Filters::WithFilters(inclusion_filter, phf),
        false => Filters::WithoutFilters,
    };

    segments::Receipts::new(compression, filters).snapshot::<DB>(
        provider,
        PathBuf::default(),
        block_range.clone(),
    )?;

    // Default name doesn't have any configuration
    let tx_range = provider.transaction_range_by_block_range(block_range.clone())?;
    let default_name = SnapshotSegment::Receipts.filename(&block_range, &tx_range);
    let configured_name = SnapshotSegment::Receipts.filename_with_configuration(
        filters,
        compression,
        &block_range,
        &tx_range,
    );
    reth_primitives::fs::rename(default_name, configured_name)?;

    Ok(())
}

pub(crate) fn bench_receipts_snapshot(
&self,
db_path: &Path,
Expand All @@ -61,14 +29,18 @@ impl Command {
inclusion_filter: InclusionFilter,
phf: PerfectHashingFunction,
) -> eyre::Result<()> {
let factory = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone());
let provider = factory.provider()?;
let tip = provider.last_block_number()?;
let block_range =
self.block_ranges(tip).first().expect("has been generated before").clone();

let filters = if self.with_filters {
Filters::WithFilters(inclusion_filter, phf)
} else {
Filters::WithoutFilters
};

let block_range = self.block_range();

let mut rng = rand::thread_rng();

let tx_range = ProviderFactory::new(open_db_read_only(db_path, log_level)?, chain.clone())
Expand Down
Loading
Loading