Skip to content

Commit

Permalink
feat: rm HashedStateChanges, introduce StorageWriter::write_hashed_st…
Browse files Browse the repository at this point in the history
…ate (#9561)
  • Loading branch information
Rjected authored Jul 16, 2024
1 parent fb6ea8b commit 539b4e4
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 136 deletions.
10 changes: 6 additions & 4 deletions crates/engine/tree/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use reth_db::database::Database;
use reth_errors::ProviderResult;
use reth_primitives::B256;
use reth_provider::{
bundle_state::HashedStateChanges, BlockExecutionWriter, BlockNumReader, BlockWriter,
HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, HistoryWriter,
OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{Pruner, PrunerOutput};
use reth_stages_types::{StageCheckpoint, StageId};
Expand Down Expand Up @@ -82,14 +82,16 @@ impl<DB: Database> DatabaseService<DB> {
// Write state and changesets to the database.
// Must be written after blocks because of the receipt lookup.
let execution_outcome = block.execution_outcome().clone();
// TODO: use single storage writer in task when sf / db tasks are combined
execution_outcome.write_to_storage(&provider_rw, None, OriginalValuesKnown::No)?;

// insert hashes and intermediate merkle nodes
{
let trie_updates = block.trie_updates().clone();
let hashed_state = block.hashed_state();
HashedStateChanges(&hashed_state.clone().into_sorted())
.write_to_db(&provider_rw)?;
// TODO: use single storage writer in task when sf / db tasks are combined
let storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_hashed_state(&hashed_state.clone().into_sorted())?;
trie_updates.write_to_database(provider_rw.tx_ref())?;
}

Expand Down
123 changes: 0 additions & 123 deletions crates/storage/provider/src/bundle_state/hashed_state_changes.rs

This file was deleted.

2 changes: 0 additions & 2 deletions crates/storage/provider/src/bundle_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@
//! This module contains all the logic related to bundle state.

mod execution_outcome;
mod hashed_state_changes;
mod state_changes;
mod state_reverts;

pub use execution_outcome::{AccountRevertInit, BundleStateInit, OriginalValuesKnown, RevertsInit};
pub use hashed_state_changes::HashedStateChanges;
pub use state_changes::StateChanges;
pub use state_reverts::{StateReverts, StorageRevertsIter};
6 changes: 4 additions & 2 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{
bundle_state::{BundleStateInit, HashedStateChanges, RevertsInit},
bundle_state::{BundleStateInit, RevertsInit},
providers::{database::metrics, static_file::StaticFileWriter, StaticFileProvider},
to_range,
traits::{
AccountExtReader, BlockSource, ChangeSetReader, ReceiptProvider, StageCheckpointWriter,
},
writer::StorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, EvmEnvProvider, FinalizedBlockReader, FinalizedBlockWriter,
HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider, HistoricalStateProvider,
Expand Down Expand Up @@ -3307,7 +3308,8 @@ impl<DB: Database> BlockWriter for DatabaseProviderRW<DB> {

// insert hashes and intermediate merkle nodes
{
HashedStateChanges(&hashed_state).write_to_db(self)?;
let storage_writer = StorageWriter::new(Some(self), None);
storage_writer.write_hashed_state(&hashed_state)?;
trie_updates.write_to_database(&self.tx)?;
}
durations_recorder.record_relative(metrics::Action::InsertHashes);
Expand Down
110 changes: 108 additions & 2 deletions crates/storage/provider/src/writer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
use crate::{providers::StaticFileProviderRWRefMut, DatabaseProviderRW};
use itertools::Itertools;
use reth_db::{
cursor::DbCursorRO,
cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO, DbDupCursorRW},
tables,
transaction::{DbTx, DbTxMut},
Database,
};
use reth_errors::{ProviderError, ProviderResult};
use reth_primitives::BlockNumber;
use reth_primitives::{BlockNumber, StorageEntry, U256};
use reth_storage_api::ReceiptWriter;
use reth_storage_errors::writer::StorageWriterError;
use reth_trie::HashedPostStateSorted;
use static_file::StaticFileWriter;

mod database;
Expand Down Expand Up @@ -93,6 +95,49 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
Ok(())
}

/// Writes the hashed state changes to the database
pub fn write_hashed_state(&self, hashed_state: &HashedPostStateSorted) -> ProviderResult<()> {
self.ensure_database_writer()?;

// Write hashed account updates.
let mut hashed_accounts_cursor =
self.database_writer().tx_ref().cursor_write::<tables::HashedAccounts>()?;
for (hashed_address, account) in hashed_state.accounts().accounts_sorted() {
if let Some(account) = account {
hashed_accounts_cursor.upsert(hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
}

// Write hashed storage changes.
let sorted_storages = hashed_state.account_storages().iter().sorted_by_key(|(key, _)| *key);
let mut hashed_storage_cursor =
self.database_writer().tx_ref().cursor_dup_write::<tables::HashedStorages>()?;
for (hashed_address, storage) in sorted_storages {
if storage.is_wiped() && hashed_storage_cursor.seek_exact(*hashed_address)?.is_some() {
hashed_storage_cursor.delete_current_duplicates()?;
}

for (hashed_slot, value) in storage.storage_slots_sorted() {
let entry = StorageEntry { key: hashed_slot, value };
if let Some(db_entry) =
hashed_storage_cursor.seek_by_key_subkey(*hashed_address, entry.key)?
{
if db_entry.key == entry.key {
hashed_storage_cursor.delete_current()?;
}
}

if entry.value != U256::ZERO {
hashed_storage_cursor.upsert(*hashed_address, entry)?;
}
}
}

Ok(())
}

/// Appends receipts block by block.
///
/// ATTENTION: If called from [`StorageWriter`] without a static file producer, it will always
Expand Down Expand Up @@ -155,3 +200,64 @@ impl<'a, 'b, DB: Database> StorageWriter<'a, 'b, DB> {
Ok(())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::create_test_provider_factory;
use reth_db_api::transaction::DbTx;
use reth_primitives::{keccak256, Account, Address, B256};
use reth_trie::{HashedPostState, HashedStorage};

#[test]
fn wiped_entries_are_removed() {
let provider_factory = create_test_provider_factory();

let addresses = (0..10).map(|_| Address::random()).collect::<Vec<_>>();
let destroyed_address = *addresses.first().unwrap();
let destroyed_address_hashed = keccak256(destroyed_address);
let slot = B256::with_last_byte(1);
let hashed_slot = keccak256(slot);
{
let provider_rw = provider_factory.provider_rw().unwrap();
let mut accounts_cursor =
provider_rw.tx_ref().cursor_write::<tables::HashedAccounts>().unwrap();
let mut storage_cursor =
provider_rw.tx_ref().cursor_write::<tables::HashedStorages>().unwrap();

for address in addresses {
let hashed_address = keccak256(address);
accounts_cursor
.insert(hashed_address, Account { nonce: 1, ..Default::default() })
.unwrap();
storage_cursor
.insert(hashed_address, StorageEntry { key: hashed_slot, value: U256::from(1) })
.unwrap();
}
provider_rw.commit().unwrap();
}

let mut hashed_state = HashedPostState::default();
hashed_state.accounts.insert(destroyed_address_hashed, None);
hashed_state.storages.insert(destroyed_address_hashed, HashedStorage::new(true));

let provider_rw = provider_factory.provider_rw().unwrap();
let storage_writer = StorageWriter::new(Some(&provider_rw), None);
assert_eq!(storage_writer.write_hashed_state(&hashed_state.into_sorted()), Ok(()));
provider_rw.commit().unwrap();

let provider = provider_factory.provider().unwrap();
assert_eq!(
provider.tx_ref().get::<tables::HashedAccounts>(destroyed_address_hashed),
Ok(None)
);
assert_eq!(
provider
.tx_ref()
.cursor_read::<tables::HashedStorages>()
.unwrap()
.seek_by_key_subkey(destroyed_address_hashed, hashed_slot),
Ok(None)
);
}
}
6 changes: 3 additions & 3 deletions crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ use proptest_arbitrary_interop::arb;
use rayon::ThreadPoolBuilder;
use reth_primitives::{Account, B256, U256};
use reth_provider::{
bundle_state::HashedStateChanges, providers::ConsistentDbView,
test_utils::create_test_provider_factory,
providers::ConsistentDbView, test_utils::create_test_provider_factory, writer::StorageWriter,
};
use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
Expand All @@ -27,7 +26,8 @@ pub fn calculate_state_root(c: &mut Criterion) {
let provider_factory = create_test_provider_factory();
{
let provider_rw = provider_factory.provider_rw().unwrap();
HashedStateChanges(&db_state.into_sorted()).write_to_db(&provider_rw).unwrap();
let storage_writer = StorageWriter::new(Some(&provider_rw), None);
storage_writer.write_hashed_state(&db_state.into_sorted()).unwrap();
let (_, updates) =
StateRoot::from_tx(provider_rw.tx_ref()).root_with_updates().unwrap();
updates.write_to_database(provider_rw.tx_ref()).unwrap();
Expand Down

0 comments on commit 539b4e4

Please sign in to comment.