Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bench builds #5635

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use criterion::{
BenchmarkGroup, Criterion,
};
use pprof::criterion::{Output, PProfProfiler};
use reth_db::DatabaseEnv;
use reth_db::{test_utils::TempDatabase, DatabaseEnv};
use reth_interfaces::test_utils::TestConsensus;
use reth_primitives::{stage::StageCheckpoint, MAINNET};
use reth_provider::ProviderFactory;
use reth_primitives::stage::StageCheckpoint;
use reth_stages::{
stages::{MerkleStage, SenderRecoveryStage, TotalDifficultyStage, TransactionLookupStage},
test_utils::TestStageDB,
Expand Down Expand Up @@ -122,22 +121,21 @@ fn measure_stage_with_path<F, S>(
stage_range: StageRange,
label: String,
) where
S: Clone + Stage<DatabaseEnv>,
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let tx = TestStageDB::new(&path);
let db = TestStageDB::new(&path);
let (input, _) = stage_range;

group.bench_function(label, move |b| {
b.to_async(FuturesExecutor).iter_with_setup(
|| {
// criterion setup does not support async, so we have to use our own runtime
setup(stage.clone(), &tx, stage_range)
setup(stage.clone(), &db, stage_range)
},
|_| async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(tx.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();
stage
.execute_ready(input)
.await
Expand All @@ -156,7 +154,7 @@ fn measure_stage<F, S>(
block_interval: std::ops::Range<u64>,
label: String,
) where
S: Clone + Stage<DatabaseEnv>,
S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>,
F: Fn(S, &TestStageDB, StageRange),
{
let path = setup::txs_testdata(block_interval.end);
Expand Down
5 changes: 3 additions & 2 deletions crates/stages/benches/setup/account_hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ fn find_stage_range(db: &Path) -> StageRange {
let mut stage_range = None;
TestStageDB::new(db)
.factory
.db_ref()
.view(|tx| {
let mut cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
let from = cursor.first()?.unwrap().0;
Expand Down Expand Up @@ -62,8 +63,8 @@ fn generate_testdata_db(num_blocks: u64) -> (PathBuf, StageRange) {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Account Hashing testdata not found, generating to {:?}", path.display());
let tx = TestStageDB::new(&path);
let provider = tx.provider_rw();
let db = TestStageDB::new(&path);
let provider = db.factory.provider_rw().unwrap();
let _accounts = AccountHashingStage::seed(&provider, opts);
provider.commit().expect("failed to commit");
}
Expand Down
39 changes: 19 additions & 20 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use itertools::concat;
use reth_db::{
cursor::DbCursorRO,
tables,
test_utils::TempDatabase,
transaction::{DbTx, DbTxMut},
DatabaseEnv,
};
Expand All @@ -12,8 +13,7 @@ use reth_interfaces::test_utils::{
random_eoa_account_range,
},
};
use reth_primitives::{Account, Address, SealedBlock, B256, MAINNET};
use reth_provider::ProviderFactory;
use reth_primitives::{Account, Address, SealedBlock, B256, U256};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::TestStageDB,
Expand All @@ -23,6 +23,7 @@ use reth_trie::StateRoot;
use std::{
collections::BTreeMap,
path::{Path, PathBuf},
sync::Arc,
};

mod constants;
Expand All @@ -32,7 +33,7 @@ pub use account_hashing::*;

pub(crate) type StageRange = (ExecInput, UnwindInput);

pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
pub(crate) fn stage_unwind<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
stage: S,
db: &TestStageDB,
range: StageRange,
Expand All @@ -41,16 +42,15 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(

tokio::runtime::Runtime::new().unwrap().block_on(async {
let mut stage = stage.clone();
let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();

// Clear previous run
stage
.unwind(&provider, unwind)
.map_err(|e| {
format!(
"{e}\nMake sure your test database at `{}` isn't too old and incompatible with newer stage changes.",
db.path.as_ref().unwrap().display()
db.factory.db_ref().path().display()
)
})
.unwrap();
Expand All @@ -59,16 +59,15 @@ pub(crate) fn stage_unwind<S: Clone + Stage<DatabaseEnv>>(
});
}

pub(crate) fn unwind_hashes<S: Clone + Stage<DatabaseEnv>>(
pub(crate) fn unwind_hashes<S: Clone + Stage<Arc<TempDatabase<DatabaseEnv>>>>(
stage: S,
db: &TestStageDB,
range: StageRange,
) {
let (input, unwind) = range;

let mut stage = stage.clone();
let factory = ProviderFactory::new(db.factory.db(), MAINNET.clone());
let provider = factory.provider_rw().unwrap();
let provider = db.factory.provider_rw().unwrap();

StorageHashingStage::default().unwind(&provider, unwind).unwrap();
AccountHashingStage::default().unwind(&provider, unwind).unwrap();
Expand Down Expand Up @@ -105,7 +104,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
// create the dirs
std::fs::create_dir_all(&path).unwrap();
println!("Transactions testdata not found, generating to {:?}", path.display());
let tx = TestStageDB::new(&path);
let db = TestStageDB::new(&path);

let accounts: BTreeMap<Address, Account> = concat([
random_eoa_account_range(&mut rng, 0..n_eoa),
Expand All @@ -124,11 +123,11 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
key_range.clone(),
);

tx.insert_accounts_and_storages(start_state.clone()).unwrap();
db.insert_accounts_and_storages(start_state.clone()).unwrap();

// make first block after genesis have valid state root
let (root, updates) =
StateRoot::new(tx.provider_rw().tx_ref()).root_with_updates().unwrap();
StateRoot::new(db.factory.provider_rw().unwrap().tx_ref()).root_with_updates().unwrap();
let second_block = blocks.get_mut(1).unwrap();
let cloned_second = second_block.clone();
let mut updated_header = cloned_second.header.unseal();
Expand All @@ -137,8 +136,8 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {

let offset = transitions.len() as u64;

tx.insert_changesets(transitions, None).unwrap();
tx.commit(|tx| updates.flush(tx)).unwrap();
db.insert_changesets(transitions, None).unwrap();
db.commit(|tx| Ok(updates.flush(tx)?)).unwrap();

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand All @@ -148,13 +147,13 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
key_range,
);

tx.insert_changesets(transitions, Some(offset)).unwrap();
db.insert_changesets(transitions, Some(offset)).unwrap();

tx.insert_accounts_and_storages(final_state).unwrap();
db.insert_accounts_and_storages(final_state).unwrap();

// make last block have valid state root
let root = {
let tx_mut = tx.provider_rw();
let tx_mut = db.factory.provider_rw().unwrap();
let root = StateRoot::new(tx_mut.tx_ref()).root().unwrap();
tx_mut.commit().unwrap();
root
Expand All @@ -166,12 +165,12 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> PathBuf {
updated_header.state_root = root;
*last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last };

tx.insert_blocks(blocks.iter(), None).unwrap();
db.insert_blocks(blocks.iter(), None).unwrap();

// initialize TD
tx.commit(|tx| {
db.commit(|tx| {
let (head, _) = tx.cursor_read::<tables::Headers>()?.first()?.unwrap_or_default();
tx.put::<tables::HeaderTD>(head, reth_primitives::U256::from(0).into())
Ok(tx.put::<tables::HeaderTD>(head, U256::from(0).into())?)
})
.unwrap();
}
Expand Down
9 changes: 7 additions & 2 deletions crates/storage/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,17 @@ pub mod test_utils {
}

impl<DB> TempDatabase<DB> {
/// returns the ref of inner db
/// Returns the reference to inner db.
pub fn db(&self) -> &DB {
self.db.as_ref().unwrap()
}

/// returns the inner db
/// Returns the path to the database.
pub fn path(&self) -> &Path {
&self.path
}

/// Convert temp database into inner.
pub fn into_inner_db(mut self) -> DB {
self.db.take().unwrap() // take out db to avoid clean path in drop fn
}
Expand Down
101 changes: 52 additions & 49 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use reth_primitives::{
use revm::primitives::{BlockEnv, CfgEnv};
use std::{
ops::{RangeBounds, RangeInclusive},
path::PathBuf,
path::{Path, PathBuf},
sync::Arc,
};
use tokio::sync::watch;
Expand All @@ -46,42 +46,39 @@ pub struct ProviderFactory<DB> {
snapshot_provider: Option<Arc<SnapshotProvider>>,
}

impl<DB: Database> ProviderFactory<DB> {
/// Returns a provider with a created `DbTx` inside, which allows fetching data from the
/// database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone());

if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}

Ok(provider)
}

/// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
/// data from the database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to
/// open.
pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<DB>> {
let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone());

if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
impl<DB: Clone> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
chain_spec: Arc::clone(&self.chain_spec),
snapshot_provider: self.snapshot_provider.clone(),
}

Ok(DatabaseProviderRW(provider))
}
}

impl<DB: Database> ProviderFactory<DB> {}

impl<DB> ProviderFactory<DB> {
/// create new database provider
/// Create new database provider factory.
pub fn new(db: DB, chain_spec: Arc<ChainSpec>) -> Self {
Self { db, chain_spec, snapshot_provider: None }
}

/// database provider comes with a shared snapshot provider
/// Create new database provider by passing a path. [`ProviderFactory`] will own the database
/// instance.
pub fn new_with_database_path<P: AsRef<Path>>(
path: P,
chain_spec: Arc<ChainSpec>,
log_level: Option<LogLevel>,
) -> RethResult<ProviderFactory<DatabaseEnv>> {
Ok(ProviderFactory::<DatabaseEnv> {
db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?,
chain_spec,
snapshot_provider: None,
})
}

/// Database provider that comes with a shared snapshot provider.
pub fn with_snapshots(
mut self,
snapshots_path: PathBuf,
Expand All @@ -93,35 +90,41 @@ impl<DB> ProviderFactory<DB> {
));
self
}

/// Returns reference to the underlying database.
pub fn db_ref(&self) -> &DB {
&self.db
}
}

impl<DB: Database> ProviderFactory<DB> {
/// create new database provider by passing a path. [`ProviderFactory`] will own the database
/// instance.
pub fn new_with_database_path<P: AsRef<std::path::Path>>(
path: P,
chain_spec: Arc<ChainSpec>,
log_level: Option<LogLevel>,
) -> RethResult<ProviderFactory<DatabaseEnv>> {
Ok(ProviderFactory::<DatabaseEnv> {
db: init_db(path, log_level).map_err(|e| RethError::Custom(e.to_string()))?,
chain_spec,
snapshot_provider: None,
})
/// Returns a provider with a created `DbTx` inside, which allows fetching data from the
/// database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open.
pub fn provider(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone());

if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}

Ok(provider)
}
}

impl<DB: Clone> Clone for ProviderFactory<DB> {
fn clone(&self) -> Self {
Self {
db: self.db.clone(),
chain_spec: Arc::clone(&self.chain_spec),
snapshot_provider: self.snapshot_provider.clone(),
/// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating
/// data from the database using different types of providers. Example: [`HeaderProvider`]
/// [`BlockHashReader`]. This may fail if the inner read/write database transaction fails to
/// open.
pub fn provider_rw(&self) -> ProviderResult<DatabaseProviderRW<DB>> {
let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone());

if let Some(snapshot_provider) = &self.snapshot_provider {
provider = provider.with_snapshot_provider(snapshot_provider.clone());
}

Ok(DatabaseProviderRW(provider))
}
}

impl<DB: Database> ProviderFactory<DB> {
/// Storage provider for latest block
pub fn latest(&self) -> ProviderResult<StateProviderBox> {
trace!(target: "providers::db", "Returning latest state provider");
Expand Down
Loading