Skip to content

Commit

Permalink
chore: use UnifiedStorageWriter::commit where possible (#10019)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Aug 2, 2024
1 parent f9ad47e commit 06fbdd9
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 132 deletions.
8 changes: 4 additions & 4 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ use reth_network::{BlockDownloaderProvider, NetworkHandle};
use reth_network_api::NetworkInfo;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::StorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderFactory, StageCheckpointReader,
StateWriter, StaticFileProviderFactory, StorageReader,
writer::UnifiedStorageWriter, AccountExtReader, ChainSpecProvider, HashingWriter,
HeaderProvider, LatestStateProviderRef, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StaticFileProviderFactory, StorageReader,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::StageId;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl Command {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use reth_network_api::NetworkInfo;
use reth_network_p2p::full_block::FullBlockClient;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::StorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
};
use reth_revm::database::StateProviderDatabase;
Expand Down Expand Up @@ -155,7 +155,7 @@ impl Command {
executor.execute_and_verify_one((&sealed_block.clone().unseal(), td).into())?;
let execution_outcome = executor.finalize();

let mut storage_writer = StorageWriter::new(Some(&provider_rw), None);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

let checkpoint = Some(StageCheckpoint::new(
Expand Down
5 changes: 2 additions & 3 deletions crates/cli/commands/src/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_db_common::{
DbTool,
};
use reth_node_core::args::StageEnum;
use reth_provider::{providers::StaticFileWriter, StaticFileProviderFactory};
use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory};
use reth_stages::StageId;
use reth_static_file_types::{find_fixed_range, StaticFileSegment};

Expand Down Expand Up @@ -174,8 +174,7 @@ impl Command {

tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;

static_file_provider.commit()?;
provider_rw.commit()?;
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;

Ok(())
}
Expand Down
17 changes: 7 additions & 10 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use reth_node_metrics::{
version::VersionInfo,
};
use reth_provider::{
ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
StaticFileWriter,
writer::UnifiedStorageWriter, ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
};
use reth_stages::{
stages::{
Expand Down Expand Up @@ -272,12 +272,10 @@ impl Command {
}

if self.commit {
// For unwinding it makes more sense to commit the database first, since if
// this function is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
provider_rw.commit()?;
provider_factory.static_file_provider().commit()?;
UnifiedStorageWriter::commit_unwind(
provider_rw,
provider_factory.static_file_provider(),
)?;
provider_rw = provider_factory.provider_rw()?;
}
}
Expand All @@ -300,8 +298,7 @@ impl Command {
provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
}
if self.commit {
provider_factory.static_file_provider().commit()?;
provider_rw.commit()?;
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
provider_rw = provider_factory.provider_rw()?;
}

Expand Down
10 changes: 5 additions & 5 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_primitives::{SealedBlock, B256};
use reth_provider::{writer::StorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory};
use reth_prune::{Pruner, PrunerOutput};
use std::sync::{
mpsc::{Receiver, SendError, Sender},
Expand Down Expand Up @@ -62,10 +62,10 @@ where
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
let sf_provider = self.provider.static_file_provider();

StorageWriter::from(&provider_rw, &sf_provider)
UnifiedStorageWriter::from(&provider_rw, &sf_provider)
.remove_blocks_above(new_tip_num)
.expect("todo: handle errors");
StorageWriter::commit_unwind(provider_rw, sf_provider)
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)
.expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand All @@ -80,10 +80,10 @@ where
let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
let static_file_provider = self.provider.static_file_provider();

StorageWriter::from(&provider_rw, &static_file_provider)
UnifiedStorageWriter::from(&provider_rw, &static_file_provider)
.save_blocks(&blocks)
.expect("todo: handle errors");
StorageWriter::commit(provider_rw, static_file_provider)
UnifiedStorageWriter::commit(provider_rw, static_file_provider)
.expect("todo: handle errors");

// we ignore the error because the caller may or may not care about the result
Expand Down
9 changes: 4 additions & 5 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use reth_node_core::version::SHORT_VERSION;
use reth_optimism_primitives::bedrock_import::is_dup_tx;
use reth_primitives::Receipts;
use reth_provider::{
writer::StorageWriter, DatabaseProviderFactory, OriginalValuesKnown, ProviderFactory,
writer::UnifiedStorageWriter, DatabaseProviderFactory, OriginalValuesKnown, ProviderFactory,
StageCheckpointReader, StateWriter, StaticFileProviderFactory, StaticFileWriter, StatsReader,
};
use reth_stages::StageId;
Expand Down Expand Up @@ -222,7 +222,7 @@ where
}

// We're reusing receipt writing code internal to
// `StorageWriter::append_receipts_from_blocks`, so we just use a default empty
// `UnifiedStorageWriter::append_receipts_from_blocks`, so we just use a default empty
// `BundleState`.
let execution_outcome =
ExecutionOutcome::new(Default::default(), receipts, first_block, Default::default());
Expand All @@ -231,14 +231,13 @@ where
static_file_provider.get_writer(first_block, StaticFileSegment::Receipts)?;

// finally, write the receipts
let mut storage_writer = StorageWriter::new(Some(&provider), Some(static_file_producer));
let mut storage_writer = UnifiedStorageWriter::from(&provider, static_file_producer);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;
}

provider.commit()?;
// as static files works in file ranges, internally it will be committing when creating the
// next file range already, so we only need to call explicitly at the end.
static_file_provider.commit()?;
UnifiedStorageWriter::commit(provider, static_file_provider)?;

Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
}
Expand Down
28 changes: 11 additions & 17 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use futures_util::Future;
use reth_db_api::database::Database;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::StaticFileWriter, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
writer::UnifiedStorageWriter, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
Expand Down Expand Up @@ -342,12 +342,10 @@ where
))?;
}

// For unwinding it makes more sense to commit the database first, since if
// this function is interrupted before the static files commit, we can just
// truncate the static files according to the
// checkpoints on the next start-up.
provider_rw.commit()?;
self.provider_factory.static_file_provider().commit()?;
UnifiedStorageWriter::commit_unwind(
provider_rw,
self.provider_factory.static_file_provider(),
)?;

stage.post_unwind_commit()?;

Expand Down Expand Up @@ -455,14 +453,10 @@ where
result: out.clone(),
});

// For execution it makes more sense to commit the static files first, since if
// this function is interrupted before the database commit, we can just truncate
// the static files according to the checkpoints on the next
// start-up.
self.provider_factory.static_file_provider().commit()?;
provider_rw.commit()?;

stage.post_execute_commit()?;
UnifiedStorageWriter::commit(
provider_rw,
self.provider_factory.static_file_provider(),
)?;

if done {
let block_number = checkpoint.block_number;
Expand Down Expand Up @@ -520,8 +514,8 @@ fn on_stage_error<DB: Database>(
StageId::MerkleExecute,
prev_checkpoint.unwrap_or_default(),
)?;
factory.static_file_provider().commit()?;
provider_rw.commit()?;

UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?;

// We unwind because of a validation error. If the unwind itself
// fails, we bail entirely,
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_primitives::{BlockNumber, Header, StaticFileSegment};
use reth_primitives_traits::format_gas_throughput;
use reth_provider::{
providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter},
writer::StorageWriter,
writer::UnifiedStorageWriter,
BlockReader, DatabaseProviderRW, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, StateWriter, StatsReader, TransactionVariant,
};
Expand Down Expand Up @@ -361,7 +361,7 @@ where
let time = Instant::now();

// write output
let mut writer = StorageWriter::new(Some(provider), static_file_producer);
let mut writer = UnifiedStorageWriter::new(provider, static_file_producer);
writer.write_to_storage(state, OriginalValuesKnown::Yes)?;

let db_write_duration = time.elapsed();
Expand Down
9 changes: 5 additions & 4 deletions crates/storage/db-common/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use reth_primitives::{
use reth_provider::{
errors::provider::ProviderResult,
providers::{StaticFileProvider, StaticFileWriter},
writer::StorageWriter,
writer::UnifiedStorageWriter,
BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DatabaseProviderRW,
ExecutionOutcome, HashingWriter, HistoryWriter, OriginalValuesKnown, ProviderError,
ProviderFactory, RevertsInit, StageCheckpointWriter, StateWriter, StaticFileProviderFactory,
Expand Down Expand Up @@ -131,8 +131,9 @@ pub fn init_genesis<DB: Database>(factory: ProviderFactory<DB>) -> Result<B256,
let segment = StaticFileSegment::Transactions;
static_file_provider.latest_writer(segment)?.increment_block(0)?;

provider_rw.commit()?;
static_file_provider.commit()?;
// `commit_unwind`` will first commit the DB and then the static file provider, which is
// necessary on `init_genesis`.
UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;

Ok(hash)
}
Expand Down Expand Up @@ -210,7 +211,7 @@ pub fn insert_state<'a, 'b, DB: Database>(
Vec::new(),
);

let mut storage_writer = StorageWriter::new(Some(provider), None);
let mut storage_writer = UnifiedStorageWriter::from_database(provider);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::Yes)?;

trace!(target: "reth::cli", "Inserted state");
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/errors/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ pub enum ProviderError {
StorageLockError(#[from] crate::lockfile::StorageLockError),
/// Storage writer error.
#[error(transparent)]
StorageWriterError(#[from] crate::writer::StorageWriterError),
UnifiedStorageWriterError(#[from] crate::writer::UnifiedStorageWriterError),
}

impl From<reth_fs_util::FsPathError> for ProviderError {
Expand Down
7 changes: 2 additions & 5 deletions crates/storage/errors/src/writer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::db::DatabaseError;
use reth_primitives::StaticFileSegment;

/// `StorageWriter` related errors
/// `UnifiedStorageWriter` related errors
#[derive(Clone, Debug, thiserror_no_std::Error, PartialEq, Eq)]
pub enum StorageWriterError {
/// Database writer is missing
#[error("Database writer is missing")]
MissingDatabaseWriter,
pub enum UnifiedStorageWriterError {
/// Static file writer is missing
#[error("Static file writer is missing")]
MissingStaticFileWriter,
Expand Down
6 changes: 3 additions & 3 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::StorageWriter,
writer::UnifiedStorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, BundleStateInit, EvmEnvProvider, FinalizedBlockReader,
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
Expand Down Expand Up @@ -3570,7 +3570,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
Ok(block_indices)
}

/// TODO(joshie): this fn should be moved to `StorageWriter` eventually
/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
fn append_blocks_with_state(
&self,
blocks: Vec<SealedBlockWithSenders>,
Expand Down Expand Up @@ -3600,7 +3600,7 @@ impl<TX: DbTxMut + DbTx> BlockWriter for DatabaseProvider<TX> {
// Must be written after blocks because of the receipt lookup.
// TODO: should _these_ be moved to storagewriter? seems like storagewriter should be
// _above_ db provider
let mut storage_writer = StorageWriter::new(Some(self), None);
let mut storage_writer = UnifiedStorageWriter::from_database(self);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
durations_recorder.record_relative(metrics::Action::InsertState);

Expand Down
Loading

0 comments on commit 06fbdd9

Please sign in to comment.