diff --git a/Cargo.lock b/Cargo.lock index b3f73c167135..c12a766d326f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,9 +119,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.4" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] @@ -1899,9 +1899,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "2.2.2" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" +checksum = "5fb04eee5d9d907f29e80ee6b0e78f7e2c82342c63e3580d8c4f69d9d5aad963" dependencies = [ "pkcs8", "signature", @@ -2505,9 +2505,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", "miniz_oxide", @@ -3008,9 +3008,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" -version = "1.0.3" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "human_bytes" @@ -3105,7 +3105,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows 0.48.0", + "windows", ] [[package]] @@ -3925,9 +3925,9 @@ dependencies = [ [[package]] name = "metrics-process" -version = "1.0.12" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c93f6ad342d3f7bc14724147e2dbc6eb6fdbe5a832ace16ea23b73618e8cc17" +checksum = "006271a8019ad7a9a28cfac2cc40e3ee104d54be763c4a0901e228a63f49d706" dependencies = [ "libproc", "mach2", @@ -3935,7 +3935,7 @@ dependencies = [ "once_cell", "procfs", "rlimit", - "windows 0.51.1", + "windows", ] [[package]] @@ -4135,9 +4135,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.4.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" dependencies = [ "num-traits", ] @@ -5098,7 +5098,7 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick 1.0.3", "memchr", "regex-automata 0.3.6", "regex-syntax 0.7.4", @@ -5119,7 +5119,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick 1.0.3", "memchr", "regex-syntax 0.7.4", ] @@ -6308,9 +6308,9 @@ dependencies = [ [[package]] name = "rlimit" -version = "0.10.1" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3560f70f30a0f16d11d01ed078a07740fe6b489667abc7c7b029155d9f21c3d8" +checksum = "f8a29d87a652dc4d43c586328706bb5cdff211f3f39a530f240b53f7221dab8e" dependencies = [ "libc", ] @@ -8320,25 +8320,6 @@ dependencies = [ "windows-targets 0.48.5", ] -[[package]] -name = "windows" -version = "0.51.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca229916c5ee38c2f2bc1e9d8f04df975b4bd93f9955dc69fabb5d91270045c9" -dependencies = [ - "windows-core", - "windows-targets 0.48.5", -] - -[[package]] -name = "windows-core" -version = "0.51.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1f8cf84f35d2db49a46868f947758c7a1138116f7fac3bc844f43ade1292e64" -dependencies = [ - "windows-targets 0.48.5", -] - [[package]] name = "windows-sys" version = "0.45.0" diff --git a/crates/consensus/beacon/src/engine/prune.rs b/crates/consensus/beacon/src/engine/prune.rs index 257170a376fe..4b2b4852dccf 100644 --- a/crates/consensus/beacon/src/engine/prune.rs +++ b/crates/consensus/beacon/src/engine/prune.rs @@ -3,7 +3,7 @@ use futures::FutureExt; use reth_db::database::Database; use reth_primitives::BlockNumber; -use reth_prune::{Pruner, PrunerError, PrunerWithResult}; +use reth_prune::{Pruner, PrunerResult, PrunerWithResult}; use reth_tasks::TaskSpawner; use std::task::{ready, Context, Poll}; use tokio::sync::oneshot; @@ -116,7 +116,7 @@ pub(crate) enum EnginePruneEvent { /// If this is returned, the pruner is idle. Finished { /// Final result of the pruner run. - result: Result<(), PrunerError>, + result: PrunerResult, }, /// Pruner task was dropped after it was started, unable to receive it because channel /// closed. This would indicate a panicked pruner task diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 6b33d9973de8..ae0c0bb0d474 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -239,7 +239,7 @@ where // deposit in receiving account and update storage let (prev_to, storage): &mut (Account, BTreeMap) = state.get_mut(&to).unwrap(); - let old_entries = new_entries + let mut old_entries: Vec<_> = new_entries .into_iter() .filter_map(|entry| { let old = if entry.value != U256::ZERO { @@ -254,9 +254,12 @@ where Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry }) }) .collect(); + old_entries.sort_by_key(|entry| entry.key); changeset.push((to, *prev_to, old_entries)); + changeset.sort_by_key(|(address, _, _)| *address); + prev_to.balance = prev_to.balance.wrapping_add(transfer); changesets.push(changeset); diff --git a/crates/primitives/src/prune/checkpoint.rs b/crates/primitives/src/prune/checkpoint.rs index 52e1cabd76cb..8096d2067af0 100644 --- a/crates/primitives/src/prune/checkpoint.rs +++ b/crates/primitives/src/prune/checkpoint.rs @@ -1,4 +1,4 @@ -use crate::{prune::PruneMode, BlockNumber}; +use crate::{prune::PruneMode, BlockNumber, TxNumber}; use reth_codecs::{main_codec, Compact}; /// Saves the pruning progress of a stage. @@ -7,7 +7,10 @@ use reth_codecs::{main_codec, Compact}; #[cfg_attr(test, derive(Default))] pub struct PruneCheckpoint { /// Highest pruned block number. - pub block_number: BlockNumber, + /// If it's [None], the pruning for block `0` is not finished yet. + pub block_number: Option, + /// Highest pruned transaction number, if applicable. + pub tx_number: Option, /// Prune mode. pub prune_mode: PruneMode, } diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index 9f8cab504fc4..48bdacdb9e89 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -49,10 +49,14 @@ impl ReceiptsLogPruneConfig { // the BTreeMap (block = 0), otherwise it will be excluded. // Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all // other receipts. + // + // Reminder, that we increment because the [`BlockNumber`] key of the new map should be + // viewed as `PruneMode::Before(block)` let block = (pruned_block + 1).max( mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)? .map(|(block, _)| block) - .unwrap_or_default(), + .unwrap_or_default() + + 1, ); map.entry(block).or_insert_with(Vec::new).push(address) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index eebd6bab0bd8..4332534d39f8 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -20,30 +20,40 @@ use reth_provider::{ TransactionsProvider, }; use std::{ops::RangeInclusive, sync::Arc, time::Instant}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, error, instrument, trace}; -/// Result of [Pruner::run] execution -pub type PrunerResult = Result<(), PrunerError>; +/// Result of [Pruner::run] execution. +/// +/// Returns `true` if pruning has been completed up to the target block, +/// and `false` if there's more data to prune in further runs. +pub type PrunerResult = Result; -/// The pipeline type itself with the result of [Pruner::run] +/// The pruner type itself with the result of [Pruner::run] pub type PrunerWithResult = (Pruner, PrunerResult); pub struct BatchSizes { + /// Maximum number of receipts to prune in one run. receipts: usize, + /// Maximum number of transaction lookup entries to prune in one run. transaction_lookup: usize, + /// Maximum number of transaction senders to prune in one run. transaction_senders: usize, + /// Maximum number of account history entries to prune in one run. + /// Measured in the number of [tables::AccountChangeSet] rows. account_history: usize, + /// Maximum number of storage history entries to prune in one run. + /// Measured in the number of [tables::StorageChangeSet] rows. storage_history: usize, } impl Default for BatchSizes { fn default() -> Self { Self { - receipts: 10000, - transaction_lookup: 10000, - transaction_senders: 10000, - account_history: 10000, - storage_history: 10000, + receipts: 1000, + transaction_lookup: 1000, + transaction_senders: 1000, + account_history: 1000, + storage_history: 1000, } } } @@ -59,6 +69,7 @@ pub struct Pruner { /// when the pruning needs to be initiated. last_pruned_block_number: Option, modes: PruneModes, + /// Maximum entries to prune per one run, per prune part. batch_sizes: BatchSizes, } @@ -83,77 +94,156 @@ impl Pruner { /// Run the pruner pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { - trace!( - target: "pruner", - %tip_block_number, - "Pruner started" - ); + if tip_block_number == 0 { + self.last_pruned_block_number = Some(tip_block_number); + + trace!(target: "pruner", %tip_block_number, "Nothing to prune yet"); + return Ok(true) + } + + trace!(target: "pruner", %tip_block_number, "Pruner started"); let start = Instant::now(); let provider = self.provider_factory.provider_rw()?; + let mut done = true; + if let Some((to_block, prune_mode)) = self.modes.prune_target_block_receipts(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::Receipts, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); - self.prune_receipts(&provider, to_block, prune_mode)?; + let part_done = self.prune_receipts(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::Receipts) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", prune_part = ?PrunePart::Receipts, "No target block to prune"); } if !self.modes.receipts_log_filter.is_empty() { let part_start = Instant::now(); - self.prune_receipts_by_logs(&provider, tip_block_number)?; + let part_done = self.prune_receipts_by_logs(&provider, tip_block_number)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::ContractLogs) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", prune_part = ?PrunePart::ContractLogs, "No filter to prune"); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_transaction_lookup(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::TransactionLookup, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); - self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + let part_done = self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::TransactionLookup) .duration_seconds .record(part_start.elapsed()) + } else { + trace!( + target: "pruner", + prune_part = ?PrunePart::TransactionLookup, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_sender_recovery(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::SenderRecovery, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); - self.prune_transaction_senders(&provider, to_block, prune_mode)?; + let part_done = self.prune_transaction_senders(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::SenderRecovery) .duration_seconds .record(part_start.elapsed()) + } else { + trace!( + target: "pruner", + prune_part = ?PrunePart::SenderRecovery, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_account_history(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::AccountHistory, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); - self.prune_account_history(&provider, to_block, prune_mode)?; + let part_done = self.prune_account_history(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::AccountHistory) .duration_seconds .record(part_start.elapsed()) + } else { + trace!( + target: "pruner", + prune_part = ?PrunePart::AccountHistory, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_storage_history(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::StorageHistory, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); - self.prune_storage_history(&provider, to_block, prune_mode)?; + let part_done = self.prune_storage_history(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::StorageHistory) .duration_seconds .record(part_start.elapsed()) + } else { + trace!( + target: "pruner", + prune_part = ?PrunePart::StorageHistory, + "No target block to prune" + ); } provider.commit()?; @@ -162,13 +252,8 @@ impl Pruner { let elapsed = start.elapsed(); self.metrics.duration_seconds.record(elapsed); - trace!( - target: "pruner", - %tip_block_number, - ?elapsed, - "Pruner finished" - ); - Ok(()) + trace!(target: "pruner", %tip_block_number, ?elapsed, "Pruner finished"); + Ok(done) } /// Returns `true` if the pruning is needed at the provided tip block number. @@ -192,6 +277,36 @@ impl Pruner { } } + /// Get next inclusive block range to prune according to the checkpoint, `to_block` block + /// number and `limit`. + /// + /// To get the range start (`from_block`): + /// 1. If checkpoint exists, use next block. + /// 2. If checkpoint doesn't exist, use block 0. + /// + /// To get the range end: use block `to_block`. + fn get_next_block_range_from_checkpoint( + &self, + provider: &DatabaseProviderRW<'_, DB>, + prune_part: PrunePart, + to_block: BlockNumber, + ) -> reth_interfaces::Result>> { + let from_block = provider + .get_prune_checkpoint(prune_part)? + .and_then(|checkpoint| checkpoint.block_number) + // Checkpoint exists, prune from the next block after the highest pruned one + .map(|block_number| block_number + 1) + // No checkpoint exists, prune from genesis + .unwrap_or(0); + + let range = from_block..=to_block; + if range.is_empty() { + return Ok(None) + } + + Ok(Some(range)) + } + /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block /// number. /// @@ -206,30 +321,34 @@ impl Pruner { prune_part: PrunePart, to_block: BlockNumber, ) -> reth_interfaces::Result>> { - let from_block_number = provider + let from_tx_number = provider .get_prune_checkpoint(prune_part)? - // Checkpoint exists, prune from the next block after the highest pruned one - .map(|checkpoint| checkpoint.block_number + 1) + // Checkpoint exists, prune from the next transaction after the highest pruned one + .and_then(|checkpoint| match checkpoint.tx_number { + Some(tx_number) => Some(tx_number + 1), + _ => { + error!(target: "pruner", %prune_part, ?checkpoint, "Expected transaction number in prune checkpoint, found None"); + None + }, + }) // No checkpoint exists, prune from genesis .unwrap_or(0); - // Get first transaction - let from_tx_num = - provider.block_body_indices(from_block_number)?.map(|body| body.first_tx_num); - // If no block body index is found, the DB is either corrupted or we've already pruned up to - // the latest block, so there's no thing to prune now. - let Some(from_tx_num) = from_tx_num else { return Ok(None) }; - - let to_tx_num = match provider.block_body_indices(to_block)? { + let to_tx_number = match provider.block_body_indices(to_block)? { Some(body) => body, None => return Ok(None), } .last_tx_num(); - Ok(Some(from_tx_num..=to_tx_num)) + let range = from_tx_number..=to_tx_number; + if range.is_empty() { + return Ok(None) + } + + Ok(Some(range)) } - /// Prune receipts up to the provided block, inclusive. + /// Prune receipts up to the provided block, inclusive, respecting the batch size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_receipts( &self, @@ -237,7 +356,7 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::Receipts, to_block, @@ -245,42 +364,44 @@ impl Pruner { Some(range) => range, None => { trace!(target: "pruner", "No receipts to prune"); - return Ok(()) + return Ok(true) } }; - let total = range.clone().count(); + let tx_range_end = *tx_range.end(); - provider.prune_table_with_iterator_in_batches::( - range, + let mut last_pruned_transaction = tx_range_end; + let (deleted, done) = provider.prune_table_with_range::( + tx_range, self.batch_sizes.receipts, - |rows| { - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * rows as f64 / total as f64), - "Pruned receipts" - ); - }, |_| false, + |row| last_pruned_transaction = row.0, )?; + trace!(target: "pruner", %deleted, %done, "Pruned receipts"); + + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more receipts to prune, set the checkpoint block number to previous, + // so we could finish pruning its receipts on the next run. + .checked_sub(if done { 0 } else { 1 }); + + let prune_checkpoint = PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }; - provider.save_prune_checkpoint( - PrunePart::Receipts, - PruneCheckpoint { block_number: to_block, prune_mode }, - )?; + provider.save_prune_checkpoint(PrunePart::Receipts, prune_checkpoint)?; // `PrunePart::Receipts` overrides `PrunePart::ContractLogs`, so we can preemptively // limit their pruning start point. - provider.save_prune_checkpoint( - PrunePart::ContractLogs, - PruneCheckpoint { block_number: to_block, prune_mode }, - )?; + provider.save_prune_checkpoint(PrunePart::ContractLogs, prune_checkpoint)?; - Ok(()) + Ok(done) } - /// Prune receipts up to the provided block by filtering logs. Works as in inclusion list, and - /// removes every receipt not belonging to it. + /// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion + /// list, and removes every receipt not belonging to it. Respects the batch size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_receipts_by_logs( &self, @@ -298,14 +419,25 @@ impl Pruner { .map(|(bn, _)| bn) .unwrap_or_default(); - // Figure out what receipts have already been pruned, so we can have an accurate - // `address_filter` - let pruned = provider + // Get status checkpoint from latest run + let mut last_pruned_block = provider .get_prune_checkpoint(PrunePart::ContractLogs)? - .map(|checkpoint| checkpoint.block_number); + .and_then(|checkpoint| checkpoint.block_number); + let initial_last_pruned_block = last_pruned_block; + + let mut from_tx_number = match initial_last_pruned_block { + Some(block) => provider + .block_body_indices(block)? + .map(|block| block.last_tx_num() + 1) + .unwrap_or(0), + None => 0, + }; + + // Figure out what receipts have already been pruned, so we can have an accurate + // `address_filter` let address_filter = - self.modes.receipts_log_filter.group_by_block(tip_block_number, pruned)?; + self.modes.receipts_log_filter.group_by_block(tip_block_number, last_pruned_block)?; // Splits all transactions in different block ranges. Each block range will have its own // filter address list and will check it while going through the table @@ -334,9 +466,13 @@ impl Pruner { while let Some((start_block, addresses)) = blocks_iter.next() { filtered_addresses.extend_from_slice(addresses); - // This will clear all receipts before the first appearance of a contract log + // This will clear all receipts before the first appearance of a contract log or since + // the block after the last pruned one. if block_ranges.is_empty() { - block_ranges.push((0, *start_block - 1, 0)); + let init = last_pruned_block.map(|b| b + 1).unwrap_or_default(); + if init < *start_block { + block_ranges.push((init, *start_block - 1, 0)); + } } let end_block = @@ -347,86 +483,107 @@ impl Pruner { block_ranges.push((*start_block, end_block, filtered_addresses.len())); } + trace!( + target: "pruner", + ?block_ranges, + ?filtered_addresses, + "Calculated block ranges and filtered addresses", + ); + + let mut limit = self.batch_sizes.receipts; + let mut done = true; + let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { - let range = match self.get_next_tx_num_range_from_checkpoint( - provider, - PrunePart::ContractLogs, - end_block, - )? { - Some(range) => range, + let block_range = start_block..=end_block; + + // Calculate the transaction range from this block range + let tx_range_end = match provider.block_body_indices(end_block)? { + Some(body) => body.last_tx_num(), None => { trace!( - target: "pruner", - block_range = format!("{start_block}..={end_block}"), - "No receipts to prune." + target: "pruner", + ?block_range, + "No receipts to prune." ); continue } }; - - let total = range.clone().count(); - let mut processed = 0; - - provider.prune_table_with_iterator_in_batches::( - range, - self.batch_sizes.receipts, - |rows| { - processed += rows; - trace!( - target: "pruner", - %rows, - block_range = format!("{start_block}..={end_block}"), - progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), - "Pruned receipts" - ); - }, - |receipt| { - num_addresses > 0 && + let tx_range = from_tx_number..=tx_range_end; + + // Delete receipts, except the ones in the inclusion list + let mut last_skipped_transaction = 0; + let deleted; + (deleted, done) = provider.prune_table_with_range::( + tx_range, + limit, + |(tx_num, receipt)| { + let skip = num_addresses > 0 && receipt.logs.iter().any(|log| { filtered_addresses[..num_addresses].contains(&&log.address) - }) + }); + + if skip { + last_skipped_transaction = *tx_num; + } + skip }, + |row| last_pruned_transaction = Some(row.0), )?; + trace!(target: "pruner", %deleted, %done, ?block_range, "Pruned receipts"); + + limit = limit.saturating_sub(deleted); + + // For accurate checkpoints we need to know that we have checked every transaction. + // Example: we reached the end of the range, and the last receipt is supposed to skip + // its deletion. + last_pruned_transaction = + Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); + last_pruned_block = Some( + provider + .transaction_block(last_pruned_transaction.expect("qed"))? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more receipts to prune, set the checkpoint block number to + // previous, so we could finish pruning its receipts on the + // next run. + .saturating_sub(if done { 0 } else { 1 }), + ); - // If this is the last block range, avoid writing an unused checkpoint - if end_block != to_block { - // This allows us to query for the transactions in the next block range with - // [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate - // checkpoint, which should be adjusted in the end. - provider.save_prune_checkpoint( - PrunePart::ContractLogs, - PruneCheckpoint { - block_number: end_block, - prune_mode: PruneMode::Before(end_block + 1), - }, - )?; + if limit == 0 { + done &= end_block == to_block; + break } + + from_tx_number = last_pruned_transaction.expect("qed") + 1; } // If there are contracts using `PruneMode::Distance(_)` there will be receipts before - // `to_block` that become eligible to be pruned in future runs. Therefore, our - // checkpoint is not actually `to_block`, but the `lowest_block_with_distance` from any - // contract. This ensures that in future pruner runs we can - // prune all these receipts between the previous `lowest_block_with_distance` and the new - // one using `get_next_tx_num_range_from_checkpoint`. - let checkpoint_block = self + // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is + // not actually `to_block`, but the `lowest_block_with_distance` from any contract. + // This ensures that in future pruner runs we can prune all these receipts between the + // previous `lowest_block_with_distance` and the new one using + // `get_next_tx_num_range_from_checkpoint`. + // + // Only applies if we were able to prune everything intended for this run, otherwise the + // checkpoing is the `last_pruned_block`. + let prune_mode_block = self .modes .receipts_log_filter - .lowest_block_with_distance(tip_block_number, pruned)? + .lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? .unwrap_or(to_block); provider.save_prune_checkpoint( PrunePart::ContractLogs, PruneCheckpoint { - block_number: checkpoint_block - 1, - prune_mode: PruneMode::Before(checkpoint_block), + block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))), + tx_number: last_pruned_transaction, + prune_mode: PruneMode::Before(prune_mode_block), }, )?; - - Ok(()) + Ok(done) } - /// Prune transaction lookup entries up to the provided block, inclusive. + /// Prune transaction lookup entries up to the provided block, inclusive, respecting the batch + /// size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_transaction_lookup( &self, @@ -434,7 +591,7 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let (start, end) = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::TransactionLookup, to_block, @@ -442,52 +599,54 @@ impl Pruner { Some(range) => range, None => { trace!(target: "pruner", "No transaction lookup entries to prune"); - return Ok(()) - } - }; - let last_tx_num = *range.end(); - let total = range.clone().count(); - let mut processed = 0; - - for i in range.step_by(self.batch_sizes.transaction_lookup) { - // The `min` ensures that the transaction range doesn't exceed the last transaction - // number. `last_tx_num + 1` is used to include the last transaction in the range. - let tx_range = i..(i + self.batch_sizes.transaction_lookup as u64).min(last_tx_num + 1); - - // Retrieve transactions in the range and calculate their hashes in parallel - let mut hashes = provider - .transactions_by_tx_range(tx_range.clone())? - .into_par_iter() - .map(|transaction| transaction.hash()) - .collect::>(); - - // Number of transactions retrieved from the database should match the tx range count - let tx_count = tx_range.clone().count(); - if hashes.len() != tx_count { - return Err(PrunerError::InconsistentData( - "Unexpected number of transaction hashes retrieved by transaction number range", - )) + return Ok(true) } + } + .into_inner(); + let tx_range = start..=(end.min(start + self.batch_sizes.transaction_lookup as u64 - 1)); + let tx_range_end = *tx_range.end(); + + // Retrieve transactions in the range and calculate their hashes in parallel + let hashes = provider + .transactions_by_tx_range(tx_range.clone())? + .into_par_iter() + .map(|transaction| transaction.hash()) + .collect::>(); + + // Number of transactions retrieved from the database should match the tx range count + let tx_count = tx_range.clone().count(); + if hashes.len() != tx_count { + return Err(PrunerError::InconsistentData( + "Unexpected number of transaction hashes retrieved by transaction number range", + )) + } - // Pre-sort hashes to prune them in order - hashes.sort_unstable(); + let mut last_pruned_transaction = tx_range_end; + let (deleted, done) = provider.prune_table_with_iterator::( + hashes, + self.batch_sizes.transaction_lookup, + |row| last_pruned_transaction = row.1, + )?; + trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup"); - let rows = provider.prune_table_with_iterator::(hashes)?; - processed += rows; - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), - "Pruned transaction lookup" - ); - } + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction lookup entries to prune, set the checkpoint block number + // to previous, so we could finish pruning its transaction lookup entries on the next + // run. + .checked_sub(if done { 0 } else { 1 }); provider.save_prune_checkpoint( PrunePart::TransactionLookup, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune transaction senders up to the provided block, inclusive. @@ -498,7 +657,7 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::SenderRecovery, to_block, @@ -506,30 +665,37 @@ impl Pruner { Some(range) => range, None => { trace!(target: "pruner", "No transaction senders to prune"); - return Ok(()) + return Ok(true) } }; - let total = range.clone().count(); + let tx_range_end = *tx_range.end(); - provider.prune_table_with_range_in_batches::( - range, + let mut last_pruned_transaction = tx_range_end; + let (deleted, done) = provider.prune_table_with_range::( + tx_range, self.batch_sizes.transaction_senders, - |rows, _| { - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * rows as f64 / total as f64), - "Pruned transaction senders" - ); - }, + |_| false, + |row| last_pruned_transaction = row.0, )?; + trace!(target: "pruner", %deleted, %done, "Pruned transaction senders"); + + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction senders to prune, set the checkpoint block number to + // previous, so we could finish pruning its transaction senders on the next run. + .checked_sub(if done { 0 } else { 1 }); provider.save_prune_checkpoint( PrunePart::SenderRecovery, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune account history up to the provided block, inclusive. @@ -540,48 +706,52 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let from_block = provider - .get_prune_checkpoint(PrunePart::AccountHistory)? - .map(|checkpoint| checkpoint.block_number + 1) - .unwrap_or_default(); - let range = from_block..=to_block; - let total = range.clone().count(); + let range = match self.get_next_block_range_from_checkpoint( + provider, + PrunePart::AccountHistory, + to_block, + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No account history to prune"); + return Ok(true) + } + }; + let range_end = *range.end(); - provider.prune_table_with_range_in_batches::( + let mut last_changeset_pruned_block = None; + let (rows, done) = provider.prune_table_with_range::( range, self.batch_sizes.account_history, - |keys, rows| { - trace!( - target: "pruner", - %keys, - %rows, - progress = format!("{:.1}%", 100.0 * keys as f64 / total as f64), - "Pruned account history (changesets)" - ); - }, + |_| false, + |row| last_changeset_pruned_block = Some(row.0), )?; + trace!(target: "pruner", %rows, %done, "Pruned account history (changesets)"); + + let last_changeset_pruned_block = last_changeset_pruned_block + // If there's more account account changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its account changesets on the next run. + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); - self.prune_history_indices::( + let (processed, deleted) = self.prune_history_indices::( provider, - to_block, + last_changeset_pruned_block, |a, b| a.key == b.key, |key| ShardedKey::last(key.key), - self.batch_sizes.account_history, - |rows| { - trace!( - target: "pruner", - rows, - "Pruned account history (indices)" - ); - }, )?; + trace!(target: "pruner", %processed, %deleted, %done, "Pruned account history (history)" ); provider.save_prune_checkpoint( PrunePart::AccountHistory, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune storage history up to the provided block, inclusive. @@ -592,64 +762,70 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let from_block = provider - .get_prune_checkpoint(PrunePart::StorageHistory)? - .map(|checkpoint| checkpoint.block_number + 1) - .unwrap_or_default(); - let block_range = from_block..=to_block; - let range = BlockNumberAddress::range(block_range); + let range = match self.get_next_block_range_from_checkpoint( + provider, + PrunePart::StorageHistory, + to_block, + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No storage history to prune"); + return Ok(true) + } + }; + let range_end = *range.end(); - provider.prune_table_with_range_in_batches::( - range, + let mut last_changeset_pruned_block = None; + let (rows, done) = provider.prune_table_with_range::( + BlockNumberAddress::range(range), self.batch_sizes.storage_history, - |keys, rows| { - trace!( - target: "pruner", - %keys, - %rows, - "Pruned storage history (changesets)" - ); - }, + |_| false, + |row| last_changeset_pruned_block = Some(row.0.block_number()), )?; + trace!(target: "pruner", %rows, %done, "Pruned storage history (changesets)"); + + let last_changeset_pruned_block = last_changeset_pruned_block + // If there's more account storage changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its storage changesets on the next run. + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); - self.prune_history_indices::( + let (processed, deleted) = self.prune_history_indices::( provider, - to_block, + last_changeset_pruned_block, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), - self.batch_sizes.storage_history, - |rows| { - trace!( - target: "pruner", - rows, - "Pruned storage history (indices)" - ); - }, )?; + trace!(target: "pruner", %processed, %deleted, %done, "Pruned storage history (history)" ); provider.save_prune_checkpoint( PrunePart::StorageHistory, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune history indices up to the provided block, inclusive. + /// + /// Returns total number of processed (walked) and deleted entities. fn prune_history_indices( &self, provider: &DatabaseProviderRW<'_, DB>, to_block: BlockNumber, key_matches: impl Fn(&T::Key, &T::Key) -> bool, last_key: impl Fn(&T::Key) -> T::Key, - batch_size: usize, - batch_callback: impl Fn(usize), - ) -> PrunerResult + ) -> Result<(usize, usize), PrunerError> where T: Table, T::Key: AsRef>, { let mut processed = 0; + let mut deleted = 0; let mut cursor = provider.tx_ref().cursor_write::()?; // Prune history table: @@ -665,6 +841,7 @@ impl Pruner { // completely. if key.as_ref().highest_block_number <= to_block { cursor.delete_current()?; + deleted += 1; if key.as_ref().highest_block_number == to_block { // Shard contains only block numbers up to the target one, so we can skip to // the last shard for this key. It is guaranteed that further shards for this @@ -694,6 +871,7 @@ impl Pruner { // has previous shards, replace it with the previous shard. Some((prev_key, prev_value)) if key_matches(&prev_key, &key) => { cursor.delete_current()?; + deleted += 1; // Upsert will replace the last shard for this sharded key with // the previous value. cursor.upsert(key.clone(), prev_value)?; @@ -708,6 +886,7 @@ impl Pruner { } // Delete shard. cursor.delete_current()?; + deleted += 1; } } } @@ -715,6 +894,7 @@ impl Pruner { // just delete it. else { cursor.delete_current()?; + deleted += 1; } } else { cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?; @@ -728,17 +908,9 @@ impl Pruner { } processed += 1; - - if processed % batch_size == 0 { - batch_callback(batch_size); - } } - if processed % batch_size != 0 { - batch_callback(processed % batch_size); - } - - Ok(()) + Ok((processed, deleted)) } } @@ -746,17 +918,26 @@ impl Pruner { mod tests { use crate::{pruner::BatchSizes, Pruner}; use assert_matches::assert_matches; - use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList}; + use itertools::{ + FoldWhile::{Continue, Done}, + Itertools, + }; + use reth_db::{ + cursor::DbCursorRO, tables, test_utils::create_test_rw_db, transaction::DbTx, + BlockNumberList, + }; use reth_interfaces::test_utils::{ generators, generators::{ - random_block_range, random_changeset_range, random_eoa_account_range, random_receipt, + random_block_range, random_changeset_range, random_eoa_account, + random_eoa_account_range, random_log, random_receipt, }, }; use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET, + BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, ReceiptsLogPruneConfig, + TxNumber, H256, MAINNET, }; - use reth_provider::PruneCheckpointReader; + use reth_provider::{PruneCheckpointReader, TransactionsProvider}; use reth_stages::test_utils::TestTransaction; use std::{collections::BTreeMap, ops::AddAssign}; @@ -819,24 +1000,60 @@ mod tests { }, ); + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(PrunePart::Receipts) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let last_pruned_tx_number = blocks + .iter() + .map(|block| block.body.len()) + .sum::() + .min(next_tx_number_to_prune as usize + pruner.batch_sizes.receipts - 1); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); - assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(())); + let result = pruner.prune_receipts(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -879,27 +1096,59 @@ mod tests { }, ); + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(PrunePart::TransactionLookup) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let last_pruned_tx_number = + blocks.iter().map(|block| block.body.len()).sum::().min( + next_tx_number_to_prune as usize + pruner.batch_sizes.transaction_lookup - 1, + ); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); - assert_matches!( - pruner.prune_transaction_lookup(&provider, to_block, prune_mode), - Ok(()) - ); + let result = pruner.prune_transaction_lookup(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -945,27 +1194,59 @@ mod tests { }, ); + let next_tx_number_to_prune = tx + .inner() + .get_prune_checkpoint(PrunePart::SenderRecovery) + .unwrap() + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) + .unwrap_or_default(); + + let last_pruned_tx_number = + blocks.iter().map(|block| block.body.len()).sum::().min( + next_tx_number_to_prune as usize + pruner.batch_sizes.transaction_senders - 1, + ); + + let last_pruned_block_number = blocks + .iter() + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count > last_pruned_tx_number { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); - assert_matches!( - pruner.prune_transaction_senders(&provider, to_block, prune_mode), - Ok(()) - ); + let result = pruner.prune_transaction_senders(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -974,8 +1255,7 @@ mod tests { let tx = TestTransaction::default(); let mut rng = generators::rng(); - let block_num = 7000; - let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1); + let blocks = random_block_range(&mut rng, 0..=7000, H256::zero(), 0..1); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); let accounts = @@ -1007,7 +1287,7 @@ mod tests { let original_shards = tx.table::().unwrap(); - let test_prune = |to_block: BlockNumber| { + let test_prune = |to_block: BlockNumber, run: usize, expect_done: bool| { let prune_mode = PruneMode::Before(to_block); let pruner = Pruner::new( tx.inner_raw(), @@ -1016,29 +1296,68 @@ mod tests { PruneModes { account_history: Some(prune_mode), ..Default::default() }, BatchSizes { // Less than total amount of blocks to prune to test the batching logic - account_history: 10, + account_history: 2000, ..Default::default() }, ); let provider = tx.inner_rw(); - assert_matches!(pruner.prune_account_history(&provider, to_block, prune_mode), Ok(())); + let result = pruner.prune_account_history(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + assert_eq!(done, expect_done); provider.commit().expect("commit"); + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.into_iter().map(move |change| (block_number, change)) + }) + .collect::>(); + + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _))| { + *i < pruner.batch_sizes.account_history * run && + *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) + .unwrap_or_default(); + + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _)| if done { *block_number } else { block_number.saturating_sub(1) } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = + pruned_changesets.fold(BTreeMap::new(), |mut acc, (block_number, change)| { + acc.entry(block_number).or_insert_with(Vec::new).push(change); + acc + }); + assert_eq!( tx.table::().unwrap().len(), - changesets[to_block as usize + 1..].iter().flatten().count() + pruned_changesets.values().flatten().count() ); let actual_shards = tx.table::().unwrap(); let expected_shards = original_shards .iter() - .filter(|(key, _)| key.highest_block_number > to_block) + .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= to_block as usize) + .skip_while(|block| *block <= last_pruned_block_number as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -1048,15 +1367,17 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) ); }; - // Prune first time: no previous checkpoint is present - test_prune(3000); - // Prune second time: previous checkpoint is present, should continue pruning from where - // ended last time - test_prune(4500); + test_prune(1700, 1, false); + test_prune(1700, 2, true); + test_prune(2000, 3, true); } #[test] @@ -1064,8 +1385,7 @@ mod tests { let tx = TestTransaction::default(); let mut rng = generators::rng(); - let block_num = 7000; - let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1); + let blocks = random_block_range(&mut rng, 0..=7000, H256::zero(), 0..1); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); let accounts = @@ -1097,7 +1417,7 @@ mod tests { let original_shards = tx.table::().unwrap(); - let test_prune = |to_block: BlockNumber| { + let test_prune = |to_block: BlockNumber, run: usize, expect_done: bool| { let prune_mode = PruneMode::Before(to_block); let pruner = Pruner::new( tx.inner_raw(), @@ -1106,33 +1426,72 @@ mod tests { PruneModes { storage_history: Some(prune_mode), ..Default::default() }, BatchSizes { // Less than total amount of blocks to prune to test the batching logic - storage_history: 10, + storage_history: 2000, ..Default::default() }, ); let provider = tx.inner_rw(); - assert_matches!(pruner.prune_storage_history(&provider, to_block, prune_mode), Ok(())); + let result = pruner.prune_storage_history(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + assert_eq!(done, expect_done); provider.commit().expect("commit"); + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.into_iter().flat_map(move |(address, _, entries)| { + entries.into_iter().map(move |entry| (block_number, address, entry)) + }) + }) + .collect::>(); + + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _, _))| { + *i < pruner.batch_sizes.storage_history * run && + *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) + .unwrap_or_default(); + + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _, _)| if done { *block_number } else { block_number.saturating_sub(1) } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = pruned_changesets.fold( + BTreeMap::new(), + |mut acc, (block_number, address, entry)| { + acc.entry((block_number, address)).or_insert_with(Vec::new).push(entry); + acc + }, + ); + assert_eq!( tx.table::().unwrap().len(), - changesets[to_block as usize + 1..] - .iter() - .flatten() - .flat_map(|(_, _, entries)| entries) - .count() + pruned_changesets.values().flatten().count() ); let actual_shards = tx.table::().unwrap(); let expected_shards = original_shards .iter() - .filter(|(key, _)| key.sharded_key.highest_block_number > to_block) + .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= to_block as usize) + .skip_while(|block| *block <= last_pruned_block_number as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -1142,14 +1501,116 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) ); }; - // Prune first time: no previous checkpoint is present - test_prune(3000); - // Prune second time: previous checkpoint is present, should continue pruning from where - // ended last time - test_prune(4500); + test_prune(2300, 1, false); + test_prune(2300, 2, true); + test_prune(3000, 3, true); + } + + #[test] + fn prune_receipts_by_logs() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let tip = 300; + let blocks = random_block_range(&mut rng, 0..=tip, H256::zero(), 1..5); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut receipts = Vec::new(); + + let (deposit_contract_addr, _) = random_eoa_account(&mut rng); + for block in &blocks { + assert!(!block.body.is_empty()); + for (txi, transaction) in block.body.iter().enumerate() { + let mut receipt = random_receipt(&mut rng, transaction, Some(1)); + receipt.logs.push(random_log( + &mut rng, + if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None }, + Some(1), + )); + receipts.push((receipts.len() as u64, receipt)); + } + } + tx.insert_receipts(receipts).expect("insert receipts"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let run_prune = || { + let provider = tx.inner_rw(); + + let prune_before_block: usize = 20; + let prune_mode = PruneMode::Before(prune_before_block as u64); + let receipts_log_filter = + ReceiptsLogPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, + PruneModes { + receipts_log_filter: receipts_log_filter.clone(), + ..Default::default() + }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + receipts: 10, + ..Default::default() + }, + ); + + let result = pruner.prune_receipts_by_logs(&provider, tip); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + provider.commit().expect("commit"); + + let (pruned_block, pruned_tx) = tx + .inner() + .get_prune_checkpoint(PrunePart::ContractLogs) + .unwrap() + .and_then(|checkpoint| { + Some((checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap())) + }) + .unwrap_or_default(); + + // All receipts are in the end of the block + let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() - + ((pruned_tx + 1) - unprunable) as usize + ); + + return done + }; + + while !run_prune() {} + + let provider = tx.inner(); + let mut cursor = provider.tx_ref().cursor_read::().unwrap(); + let walker = cursor.walk(None).unwrap(); + for receipt in walker { + let (tx_num, receipt) = receipt.unwrap(); + + // Either we only find our contract, or the receipt is part of the unprunable receipts + // set by tip - 128 + assert!( + receipt.logs.iter().any(|l| l.address == deposit_contract_addr) || + provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128, + ); + } } } diff --git a/crates/revm/src/executor.rs b/crates/revm/src/executor.rs index 990e674bb7c5..6ba25f4192aa 100644 --- a/crates/revm/src/executor.rs +++ b/crates/revm/src/executor.rs @@ -251,6 +251,13 @@ where // append gas used cumulative_gas_used += result.gas_used(); + tracing::trace!( + target: "revm::executor", + hash = ?transaction.hash, + gas_used = result.gas_used(), + "transaction executed" + ); + // Push transaction changeset and calculate header bloom filter for receipt. post_state.add_receipt( block.number, diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 9f72c69d6e3b..002f0b6708a9 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -212,11 +212,7 @@ fn stage_checkpoint( ) -> Result { let pruned_entries = provider .get_prune_checkpoint(PrunePart::SenderRecovery)? - .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) - .transpose()? - .flatten() - // +1 is needed because TxNumber is 0-indexed - .map(|body| body.last_tx_num() + 1) + .and_then(|checkpoint| checkpoint.tx_number) .unwrap_or_default(); Ok(EntitiesCheckpoint { // If `TxSenders` table was pruned, we will have a number of entries in it not matching @@ -409,7 +405,13 @@ mod tests { .save_prune_checkpoint( PrunePart::SenderRecovery, PruneCheckpoint { - block_number: max_pruned_block as BlockNumber, + block_number: Some(max_pruned_block), + tx_number: Some( + blocks[..=max_pruned_block as usize] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + ), prune_mode: PruneMode::Full, }, ) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 211266d45d81..65f5772b74ee 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -13,7 +13,7 @@ use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, PrunePart, TransactionSignedNoHash, TxNumber, H256, }; -use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader}; +use reth_provider::{DatabaseProviderRW, PruneCheckpointReader}; use tokio::sync::mpsc; use tracing::*; @@ -186,11 +186,7 @@ fn stage_checkpoint( ) -> Result { let pruned_entries = provider .get_prune_checkpoint(PrunePart::TransactionLookup)? - .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) - .transpose()? - .flatten() - // +1 is needed because TxNumber is 0-indexed - .map(|body| body.last_tx_num() + 1) + .and_then(|checkpoint| checkpoint.tx_number) .unwrap_or_default(); Ok(EntitiesCheckpoint { // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching @@ -365,7 +361,13 @@ mod tests { .save_prune_checkpoint( PrunePart::TransactionLookup, PruneCheckpoint { - block_number: max_pruned_block as BlockNumber, + block_number: Some(max_pruned_block), + tx_number: Some( + blocks[..=max_pruned_block as usize] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + ), prune_mode: PruneMode::Full, }, ) diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 506cc59ebab3..b45417909b95 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -111,14 +111,18 @@ impl ProviderFactory { // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune part. - if let Some(prune_checkpoint) = account_history_prune_checkpoint { + if let Some(prune_checkpoint_block_number) = + account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number) + { state_provider = state_provider.with_lowest_available_account_history_block_number( - prune_checkpoint.block_number + 1, + prune_checkpoint_block_number + 1, ); } - if let Some(prune_checkpoint) = storage_history_prune_checkpoint { + if let Some(prune_checkpoint_block_number) = + storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number) + { state_provider = state_provider.with_lowest_available_storage_history_block_number( - prune_checkpoint.block_number + 1, + prune_checkpoint_block_number + 1, ); } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index b013ee697737..85c497cc4432 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -17,7 +17,7 @@ use reth_db::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, }, - table::Table, + table::{Table, TableRow}, tables, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, @@ -624,85 +624,61 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { } /// Prune the table for the specified pre-sorted key iterator. - /// Returns number of rows pruned. - pub fn prune_table_with_iterator( - &self, - keys: impl IntoIterator, - ) -> std::result::Result { - self.prune_table_with_iterator_in_batches::(keys, usize::MAX, |_| {}, |_| false) - } - - /// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after - /// every `batch_size` pruned rows with number of total rows pruned. - /// - /// `skip_filter` can be used to skip pruning certain elements. /// /// Returns number of rows pruned. - pub fn prune_table_with_iterator_in_batches( + pub fn prune_table_with_iterator( &self, keys: impl IntoIterator, - batch_size: usize, - mut batch_callback: impl FnMut(usize), - skip_filter: impl Fn(&T::Value) -> bool, - ) -> std::result::Result { + limit: usize, + mut delete_callback: impl FnMut(TableRow), + ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; let mut deleted = 0; - for key in keys { - if let Some((_, value)) = cursor.seek_exact(key)? { - if !skip_filter(&value) { - cursor.delete_current()?; - deleted += 1; - } + let mut keys = keys.into_iter(); + for key in &mut keys { + let row = cursor.seek_exact(key.clone())?; + if let Some(row) = row { + cursor.delete_current()?; + deleted += 1; + delete_callback(row); } - if deleted % batch_size == 0 { - batch_callback(deleted); + if deleted == limit { + break } } - if deleted % batch_size != 0 { - batch_callback(deleted); - } - - Ok(deleted) + Ok((deleted, keys.next().is_none())) } - /// Prune the table for the specified key range, calling `chunk_callback` after every - /// `batch_size` pruned rows with number of total unique keys and total rows pruned. For dupsort - /// tables, these numbers will be different as one key can correspond to multiple rows. + /// Prune the table for the specified key range. /// - /// Returns number of rows pruned. - pub fn prune_table_with_range_in_batches( + /// Returns number of total unique keys and total rows pruned pruned. + pub fn prune_table_with_range( &self, - keys: impl RangeBounds, - batch_size: usize, - mut batch_callback: impl FnMut(usize, usize), - ) -> std::result::Result<(), DatabaseError> { + keys: impl RangeBounds + Clone + Debug, + limit: usize, + mut skip_filter: impl FnMut(&TableRow) -> bool, + mut delete_callback: impl FnMut(TableRow), + ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; - let mut walker = cursor.walk_range(keys)?; - let mut deleted_keys = 0; - let mut deleted_rows = 0; - let mut previous_key = None; - - while let Some((key, _)) = walker.next().transpose()? { - walker.delete_current()?; - deleted_rows += 1; - if previous_key.as_ref().map(|previous_key| previous_key != &key).unwrap_or(true) { - deleted_keys += 1; - previous_key = Some(key); - } + let mut walker = cursor.walk_range(keys.clone())?; + let mut deleted = 0; - if deleted_rows % batch_size == 0 { - batch_callback(deleted_keys, deleted_rows); + while let Some(row) = walker.next().transpose()? { + if !skip_filter(&row) { + walker.delete_current()?; + deleted += 1; + delete_callback(row); } - } - if deleted_rows % batch_size != 0 { - batch_callback(deleted_keys, deleted_rows); + if deleted == limit { + break + } } - Ok(()) + Ok((deleted, walker.next().transpose()?.is_none())) } /// Load shard and remove it. If list is empty, last shard was full or