Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fallback lookups for pruned history #4121

Merged
merged 18 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl<DB: Database> Pruner<DB> {
{
let mut processed = 0;
let mut cursor = provider.tx_ref().cursor_write::<T>()?;

// Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
Expand Down Expand Up @@ -525,20 +526,24 @@ impl<DB: Database> Pruner<DB> {
// If there are no more blocks in this shard, we need to remove it, as empty
// shards are not allowed.
if key.as_ref().highest_block_number == u64::MAX {
// If current shard is the last shard for this sharded key, replace it
// with the previous shard.
if let Some(prev_value) = cursor
.prev()?
.filter(|(prev_key, _)| key_matches(prev_key, &key))
.map(|(_, prev_value)| prev_value)
{
// If current shard is the last shard for the sharded key that has
// previous shards, replace it with the previous shard.
cursor.delete_current()?;
// Upsert will replace the last shard for this sharded key with the
// previous value
// previous value.
cursor.upsert(key.clone(), prev_value)?;
} else {
// If there's no previous shard for this sharded key,
// just delete last shard completely.

// Jump back to the original last shard.
cursor.next()?;
// Delete shard.
cursor.delete_current()?;
}
} else {
Expand All @@ -551,7 +556,7 @@ impl<DB: Database> Pruner<DB> {
}
}

// Jump to the next address
// Jump to the next address.
cursor.seek_exact(last_key(&key))?;
}

Expand Down
79 changes: 57 additions & 22 deletions crates/storage/provider/src/providers/state/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use reth_primitives::{
};
use std::marker::PhantomData;

/// State provider for a given transition id which takes a tx reference.
/// State provider for a given block number which takes a tx reference.
///
/// Historical state provider accesses the state at the start of the provided block number.
/// It means that all changes made in the provided block number are not included.
///
/// Historical state provider reads the following tables:
/// - [tables::AccountHistory]
Expand All @@ -40,6 +43,7 @@ pub enum HistoryInfo {
NotYetWritten,
InChangeset(u64),
InPlainState,
MaybeInPlainState,
}

impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
Expand Down Expand Up @@ -71,7 +75,11 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {

// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountHistory, _>(history_key, |key| key.key == address)
self.history_info::<tables::AccountHistory, _>(
history_key,
|key| key.key == address,
self.lowest_available_blocks.account_history_block_number,
)
}

/// Lookup a storage key in the StorageHistory table
Expand All @@ -86,12 +94,19 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {

// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StorageHistory, _>(history_key, |key| {
key.address == address && key.sharded_key.key == storage_key
})
self.history_info::<tables::StorageHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_available_blocks.storage_history_block_number,
)
}

fn history_info<T, K>(&self, key: K, key_filter: impl Fn(&K) -> bool) -> Result<HistoryInfo>
fn history_info<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> Result<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
Expand All @@ -106,24 +121,36 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
// Get the rank of the first entry after our block.
let rank = chunk.rank(self.block_number as usize);

// If our block is before the first entry in the index chunk, it might be before
// the first write ever. To check, we look at the previous entry and check if the
// key is the same.
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
if rank == 0 && !cursor.prev()?.is_some_and(|(key, _)| key_filter(&key)) {
// The key is written to, but only after our block.
return Ok(HistoryInfo::NotYetWritten)
}
if rank < chunk.len() {
if rank == 0 &&
chunk.select(rank) as u64 != self.block_number &&
!cursor.prev()?.is_some_and(|(key, _)| key_filter(&key))
{
if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a changeset lookup.
Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64))
} else {
// The key is written to, but only after our block.
Ok(HistoryInfo::NotYetWritten)
}
} else if rank < chunk.len() {
// The chunk contains an entry for a write after our block, return it.
Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64))
} else {
// The chunk does not contain an entry for a write after our block. This can only
// happen if this is the last chunk and so we need to look in the plain state.
Ok(HistoryInfo::InPlainState)
}
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
Expand All @@ -146,7 +173,9 @@ impl<'a, 'b, TX: DbTx<'a>> AccountReader for HistoricalStateProviderRef<'a, 'b,
address,
})?
.info),
HistoryInfo::InPlainState => Ok(self.tx.get::<tables::PlainAccountState>(address)?),
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => {
Ok(self.tx.get::<tables::PlainAccountState>(address)?)
}
}
}
}
Expand Down Expand Up @@ -194,7 +223,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b,
})?
.value,
)),
HistoryInfo::InPlainState => Ok(self
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => Ok(self
.tx
.cursor_dup_read::<tables::PlainStorageState>()?
.seek_by_key_subkey(address, storage_key)?
Expand All @@ -219,7 +248,8 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b,
}
}

/// State provider for a given transition
/// State provider for a given block number.
/// For more detailed description, see [HistoricalStateProviderRef].
pub struct HistoricalStateProvider<'a, TX: DbTx<'a>> {
/// Database transaction
tx: TX,
Expand Down Expand Up @@ -280,9 +310,11 @@ delegate_provider_impls!(HistoricalStateProvider<'a, TX> where [TX: DbTx<'a>]);
pub struct LowestAvailableBlocks {
/// Lowest block number at which the account history is available. It may not be available if
/// [reth_primitives::PrunePart::AccountHistory] was pruned.
/// [Option::None] means all history is available.
pub account_history_block_number: Option<BlockNumber>,
/// Lowest block number at which the storage history is available. It may not be available if
/// [reth_primitives::PrunePart::StorageHistory] was pruned.
/// [Option::None] means all history is available.
pub storage_history_block_number: Option<BlockNumber>,
}

Expand Down Expand Up @@ -489,7 +521,10 @@ mod tests {

// run
assert_eq!(HistoricalStateProviderRef::new(&tx, 0).storage(ADDRESS, STORAGE), Ok(None));
assert_eq!(HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE), Ok(None));
assert_eq!(
HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 4).storage(ADDRESS, STORAGE),
Ok(Some(entry_at7.value))
Expand Down Expand Up @@ -558,10 +593,10 @@ mod tests {
storage_history_block_number: Some(2),
},
);
assert_eq!(provider.account_history_lookup(ADDRESS), Ok(HistoryInfo::NotYetWritten));
assert_eq!(provider.account_history_lookup(ADDRESS), Ok(HistoryInfo::MaybeInPlainState));
assert_eq!(
provider.storage_history_lookup(ADDRESS, STORAGE),
Ok(HistoryInfo::NotYetWritten)
Ok(HistoryInfo::MaybeInPlainState)
);

// provider block_number == lowest available block number,
Expand All @@ -574,10 +609,10 @@ mod tests {
storage_history_block_number: Some(1),
},
);
assert_eq!(provider.account_history_lookup(ADDRESS), Ok(HistoryInfo::NotYetWritten));
assert_eq!(provider.account_history_lookup(ADDRESS), Ok(HistoryInfo::MaybeInPlainState));
assert_eq!(
provider.storage_history_lookup(ADDRESS, STORAGE),
Ok(HistoryInfo::NotYetWritten)
Ok(HistoryInfo::MaybeInPlainState)
);
}
}
Loading