From cf670f267a43ae384aff54f670a749b34e6a035b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 17 Aug 2023 13:13:09 +0100 Subject: [PATCH 01/20] feat(pruner): respect batch size per run --- Cargo.lock | 170 +++---- crates/prune/src/pruner.rs | 457 +++++++++++------- .../src/providers/database/provider.rs | 17 +- 3 files changed, 351 insertions(+), 293 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c22e923952df..dfe06094db21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -119,9 +119,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.0.4" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6748e8def348ed4d14996fa801f4122cd763fff530258cdc03f64b25f89d3a5a" +checksum = "86b8f9420f797f2d9e935edf629310eb938a0d839f984e25327f3c7eed22300c" dependencies = [ "memchr", ] @@ -230,9 +230,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.74" +version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c6f84b74db2535ebae81eede2f39b947dcbf01d093ae5f791e5dd414a1bf289" +checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854" [[package]] name = "aquamarine" @@ -723,7 +723,7 @@ dependencies = [ [[package]] name = "boa_ast" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "bitflags 2.4.0", "boa_interner", @@ -736,7 +736,7 @@ dependencies = [ [[package]] name = "boa_engine" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "bitflags 2.4.0", "boa_ast", @@ -755,7 +755,7 @@ dependencies = [ "num-bigint", "num-integer", "num-traits", - "num_enum 0.7.0", + "num_enum", "once_cell", "pollster", "rand 0.8.5", @@ -774,7 +774,7 @@ dependencies = [ [[package]] name = "boa_gc" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "boa_macros", "boa_profiler", @@ -785,7 +785,7 @@ dependencies = [ [[package]] name = "boa_icu_provider" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "icu_collections", "icu_normalizer", @@ -798,7 +798,7 @@ dependencies = [ [[package]] name = "boa_interner" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "boa_gc", "boa_macros", @@ -813,7 +813,7 @@ dependencies = [ [[package]] name = "boa_macros" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "proc-macro2 1.0.66", "quote 1.0.32", @@ -824,7 +824,7 @@ dependencies = [ [[package]] name = "boa_parser" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" dependencies = [ "bitflags 2.4.0", "boa_ast", @@ -844,7 +844,7 @@ dependencies = [ [[package]] name = "boa_profiler" version = "0.17.0" -source = "git+https://github.com/boa-dev/boa#06bb71aa50e7163f6b38767190ba30177086685f" +source = "git+https://github.com/boa-dev/boa#a3b46545a2a09f9ac81fd83ac6b180934c728f61" [[package]] name = "boyer-moore-magiclen" @@ -1899,9 +1899,9 @@ dependencies = [ [[package]] name = "ed25519" -version = "2.2.2" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60f6d271ca33075c88028be6f04d502853d63a5ece419d269c15315d4fc1cf1d" +checksum = "5fb04eee5d9d907f29e80ee6b0e78f7e2c82342c63e3580d8c4f69d9d5aad963" dependencies = [ "pkcs8", "signature", @@ -2265,7 +2265,7 @@ dependencies = [ "generic-array", "hex", "k256", - "num_enum 0.6.1", + "num_enum", "once_cell", "open-fastrlp", "rand 0.8.5", @@ -2505,9 +2505,9 @@ dependencies = [ [[package]] name = "flate2" -version = "1.0.27" +version = "1.0.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6c98ee8095e9d1dcbf2fcc6d95acccb90d1c81db1e44725c6a984b1dbdfb010" +checksum = "3b9429470923de8e8cbd4d2dc513535400b4b3fef0319fb5c4e1f520a7bef743" dependencies = [ "crc32fast", "miniz_oxide", @@ -3008,9 +3008,9 @@ checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" [[package]] name = "httpdate" -version = "1.0.3" +version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" +checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" [[package]] name = "human_bytes" @@ -3105,7 +3105,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows 0.48.0", + "windows", ] [[package]] @@ -3957,9 +3957,9 @@ dependencies = [ [[package]] name = "metrics-process" -version = "1.0.12" +version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1c93f6ad342d3f7bc14724147e2dbc6eb6fdbe5a832ace16ea23b73618e8cc17" +checksum = "006271a8019ad7a9a28cfac2cc40e3ee104d54be763c4a0901e228a63f49d706" dependencies = [ "libproc", "mach2", @@ -3967,7 +3967,7 @@ dependencies = [ "once_cell", "procfs", "rlimit", - "windows 0.51.0", + "windows", ] [[package]] @@ -4169,9 +4169,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.4.4" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ba157ca0885411de85d6ca030ba7e2a83a28636056c7c699b07c8b6f7383214" +checksum = "02e0d21255c828d6f128a1e41534206671e8c3ea0c62f32291e808dc82cff17d" dependencies = [ "num-traits", ] @@ -4245,16 +4245,7 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a015b430d3c108a207fd776d2e2196aaf8b1cf8cf93253e3a097ff3085076a1" dependencies = [ - "num_enum_derive 0.6.1", -] - -[[package]] -name = "num_enum" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70bf6736f74634d299d00086f02986875b3c2d924781a6a2cb6c201e73da0ceb" -dependencies = [ - "num_enum_derive 0.7.0", + "num_enum_derive", ] [[package]] @@ -4269,18 +4260,6 @@ dependencies = [ "syn 2.0.28", ] -[[package]] -name = "num_enum_derive" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56ea360eafe1022f7cc56cd7b869ed57330fb2453d0c7831d99b74c65d2f5597" -dependencies = [ - "proc-macro-crate", - "proc-macro2 1.0.66", - "quote 1.0.32", - "syn 2.0.28", -] - [[package]] name = "num_threads" version = "0.1.6" @@ -4469,7 +4448,7 @@ dependencies = [ "libc", "redox_syscall 0.3.5", "smallvec 1.11.0", - "windows-targets 0.48.2", + "windows-targets 0.48.1", ] [[package]] @@ -5141,7 +5120,7 @@ version = "1.9.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81bc1d4caf89fac26a70747fe603c130093b53c773888797a6329091246d651a" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick 1.0.3", "memchr", "regex-automata 0.3.6", "regex-syntax 0.7.4", @@ -5162,7 +5141,7 @@ version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fed1ceff11a1dddaee50c9dc8e4938bd106e9d89ae372f192311e7da498e3b69" dependencies = [ - "aho-corasick 1.0.4", + "aho-corasick 1.0.3", "memchr", "regex-syntax 0.7.4", ] @@ -6334,9 +6313,9 @@ dependencies = [ [[package]] name = "rlimit" -version = "0.10.1" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3560f70f30a0f16d11d01ed078a07740fe6b489667abc7c7b029155d9f21c3d8" +checksum = "f8a29d87a652dc4d43c586328706bb5cdff211f3f39a530f240b53f7221dab8e" dependencies = [ "libc", ] @@ -6754,9 +6733,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.105" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693151e1ac27563d6dbcec9dee9fbd5da8539b20fa14ad3752b2e6d363ace360" +checksum = "076066c5f1078eac5b722a31827a8832fe108bed65dfa75e233c89f8206e976c" dependencies = [ "itoa", "ryu", @@ -7433,18 +7412,18 @@ checksum = "aac81b6fd6beb5884b0cf3321b8117e6e5d47ecb6fc89f414cfdcca8b2fe2dd8" [[package]] name = "thiserror" -version = "1.0.46" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9207952ae1a003f42d3d5e892dac3c6ba42aa6ac0c79a6a91a2b5cb4253e75c" +checksum = "611040a08a0439f8248d1990b111c95baa9c704c805fa1f62104b39655fd7f90" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.46" +version = "1.0.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1728216d3244de4f14f14f8c15c79be1a7c67867d28d69b719690e2a19fb445" +checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2 1.0.66", "quote 1.0.32", @@ -8349,26 +8328,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" dependencies = [ - "windows-targets 0.48.2", -] - -[[package]] -name = "windows" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9763fb813068e9f4ab70a92a0c6ad61ff6b342f693b1ed0e5387c854386e670" -dependencies = [ - "windows-core", - "windows-targets 0.48.2", -] - -[[package]] -name = "windows-core" -version = "0.51.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b81650771e76355778637954dc9d7eb8d991cd89ad64ba26f21eeb3c22d8d836" -dependencies = [ - "windows-targets 0.48.2", + "windows-targets 0.48.1", ] [[package]] @@ -8386,7 +8346,7 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets 0.48.2", + "windows-targets 0.48.1", ] [[package]] @@ -8406,17 +8366,17 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.48.2" +version = "0.48.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1eeca1c172a285ee6c2c84c341ccea837e7c01b12fbb2d0fe3c9e550ce49ec8" +checksum = "05d4b17490f70499f20b9e791dcf6a299785ce8af4d709018206dc5b4953e95f" dependencies = [ - "windows_aarch64_gnullvm 0.48.2", - "windows_aarch64_msvc 0.48.2", - "windows_i686_gnu 0.48.2", - "windows_i686_msvc 0.48.2", - "windows_x86_64_gnu 0.48.2", - "windows_x86_64_gnullvm 0.48.2", - "windows_x86_64_msvc 0.48.2", + "windows_aarch64_gnullvm 0.48.0", + "windows_aarch64_msvc 0.48.0", + "windows_i686_gnu 0.48.0", + "windows_i686_msvc 0.48.0", + "windows_x86_64_gnu 0.48.0", + "windows_x86_64_gnullvm 0.48.0", + "windows_x86_64_msvc 0.48.0", ] [[package]] @@ -8427,9 +8387,9 @@ checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" [[package]] name = "windows_aarch64_gnullvm" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b10d0c968ba7f6166195e13d593af609ec2e3d24f916f081690695cf5eaffb2f" +checksum = "91ae572e1b79dba883e0d315474df7305d12f569b400fcf90581b06062f7e1bc" [[package]] name = "windows_aarch64_msvc" @@ -8439,9 +8399,9 @@ checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" [[package]] name = "windows_aarch64_msvc" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "571d8d4e62f26d4932099a9efe89660e8bd5087775a2ab5cdd8b747b811f1058" +checksum = "b2ef27e0d7bdfcfc7b868b317c1d32c641a6fe4629c171b8928c7b08d98d7cf3" [[package]] name = "windows_i686_gnu" @@ -8451,9 +8411,9 @@ checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" [[package]] name = "windows_i686_gnu" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2229ad223e178db5fbbc8bd8d3835e51e566b8474bfca58d2e6150c48bb723cd" +checksum = "622a1962a7db830d6fd0a69683c80a18fda201879f0f447f065a3b7467daa241" [[package]] name = "windows_i686_msvc" @@ -8463,9 +8423,9 @@ checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" [[package]] name = "windows_i686_msvc" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "600956e2d840c194eedfc5d18f8242bc2e17c7775b6684488af3a9fff6fe3287" +checksum = "4542c6e364ce21bf45d69fdd2a8e455fa38d316158cfd43b3ac1c5b1b19f8e00" [[package]] name = "windows_x86_64_gnu" @@ -8475,9 +8435,9 @@ checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" [[package]] name = "windows_x86_64_gnu" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea99ff3f8b49fb7a8e0d305e5aec485bd068c2ba691b6e277d29eaeac945868a" +checksum = "ca2b8a661f7628cbd23440e50b05d705db3686f894fc9580820623656af974b1" [[package]] name = "windows_x86_64_gnullvm" @@ -8487,9 +8447,9 @@ checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" [[package]] name = "windows_x86_64_gnullvm" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f1a05a1ece9a7a0d5a7ccf30ba2c33e3a61a30e042ffd247567d1de1d94120d" +checksum = "7896dbc1f41e08872e9d5e8f8baa8fdd2677f29468c4e156210174edc7f7b953" [[package]] name = "windows_x86_64_msvc" @@ -8499,15 +8459,15 @@ checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" [[package]] name = "windows_x86_64_msvc" -version = "0.48.2" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d419259aba16b663966e29e6d7c6ecfa0bb8425818bb96f6f1f3c3eb71a6e7b9" +checksum = "1a515f5799fe4961cb532f983ce2b23082366b898e52ffbce459c86f67c8378a" [[package]] name = "winnow" -version = "0.5.11" +version = "0.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e461589e194280efaa97236b73623445efa195aa633fd7004f39805707a9d53" +checksum = "5504cc7644f4b593cbc05c4a55bf9bd4e94b867c3c0bd440934174d50482427d" dependencies = [ "memchr", ] diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e39cd5b4c29d..1fa10ca3c576 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -8,7 +8,7 @@ use reth_db::{ models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey}, table::Table, tables, - transaction::DbTxMut, + transaction::{DbTx, DbTxMut}, BlockNumberList, }; use reth_primitives::{ @@ -28,21 +28,26 @@ pub type PrunerResult = Result<(), PrunerError>; pub type PrunerWithResult = (Pruner, PrunerResult); pub struct BatchSizes { + // Measured in transactions receipts: usize, + // Measured in transactions transaction_lookup: usize, + // Measured in transactions transaction_senders: usize, + // Measured in blocks account_history: usize, + // Measured in blocks storage_history: usize, } impl Default for BatchSizes { fn default() -> Self { Self { - receipts: 10000, - transaction_lookup: 10000, - transaction_senders: 10000, - account_history: 10000, - storage_history: 10000, + receipts: 100, + transaction_lookup: 100, + transaction_senders: 100, + account_history: 100, + storage_history: 100, } } } @@ -182,6 +187,37 @@ impl Pruner { } } + /// Get next inclusive block range to prune according to the checkpoint, `to_block` block + /// number and `limit`. + /// + /// To get the range start (`from_block`): + /// 1. If checkpoint exists, use next block. + /// 2. If checkpoint doesn't exist, use block 0. + /// + /// To get the range end: use block `min(to_block, from_block + limit - 1)`. + fn get_next_block_range_from_checkpoint( + &self, + provider: &DatabaseProviderRW<'_, DB>, + prune_part: PrunePart, + to_block: BlockNumber, + limit: usize, + ) -> reth_interfaces::Result, bool)>> { + let from_block = provider + .get_prune_checkpoint(prune_part)? + // Checkpoint exists, prune from the next block after the highest pruned one + .map(|checkpoint| checkpoint.block_number + 1) + // No checkpoint exists, prune from genesis + .unwrap_or(0); + + let range = from_block..=to_block.min(from_block + limit as u64 - 1); + if range.is_empty() { + return Ok(None) + } + + let is_final_range = *range.end() == to_block; + Ok(Some((range, is_final_range))) + } + /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block /// number. /// @@ -195,7 +231,10 @@ impl Pruner { provider: &DatabaseProviderRW<'_, DB>, prune_part: PrunePart, to_block: BlockNumber, - ) -> reth_interfaces::Result>> { + limit: usize, + ) -> reth_interfaces::Result< + Option<(RangeInclusive, RangeInclusive, bool)>, + > { let from_block_number = provider .get_prune_checkpoint(prune_part)? // Checkpoint exists, prune from the next block after the highest pruned one @@ -210,13 +249,36 @@ impl Pruner { // the latest block, so there's no thing to prune now. let Some(from_tx_num) = from_tx_num else { return Ok(None) }; - let to_tx_num = match provider.block_body_indices(to_block)? { - Some(body) => body, - None => return Ok(None), + let mut to_block_number = 0; + let mut tx_count = 0; + for entry in provider + .tx_ref() + .cursor_read::()? + .walk_range(from_block_number..=to_block)? + { + let (block_number, block_body) = entry?; + to_block_number = block_number; + tx_count += block_body.tx_count; + + if tx_count >= limit as u64 { + break + } + } + + let to_tx_num = from_tx_num.saturating_add(tx_count).saturating_sub(1); + + let block_range = from_block_number..=to_block_number; + if block_range.is_empty() { + return Ok(None) + } + + let tx_range = from_tx_num..=to_tx_num; + if tx_range.is_empty() { + return Ok(None) } - .last_tx_num(); - Ok(Some(from_tx_num..=to_tx_num)) + let is_final_range = *block_range.end() == to_block; + Ok(Some((block_range, tx_range, is_final_range))) } /// Prune receipts up to the provided block, inclusive. @@ -227,10 +289,11 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::Receipts, to_block, + self.batch_sizes.receipts, )? { Some(range) => range, None => { @@ -238,24 +301,13 @@ impl Pruner { return Ok(()) } }; - let total = range.clone().count(); - provider.prune_table_with_iterator_in_batches::( - range, - self.batch_sizes.receipts, - |rows| { - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * rows as f64 / total as f64), - "Pruned receipts" - ); - }, - )?; + let rows = provider.prune_table_with_iterator::(tx_range)?; + trace!(target: "pruner", %rows, %done, "Pruned receipts"); provider.save_prune_checkpoint( PrunePart::Receipts, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { block_number: *block_range.end(), prune_mode }, )?; Ok(()) @@ -269,10 +321,11 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::TransactionLookup, to_block, + self.batch_sizes.transaction_lookup, )? { Some(range) => range, None => { @@ -280,46 +333,31 @@ impl Pruner { return Ok(()) } }; - let last_tx_num = *range.end(); - let total = range.clone().count(); - let mut processed = 0; - - for i in range.step_by(self.batch_sizes.transaction_lookup) { - // The `min` ensures that the transaction range doesn't exceed the last transaction - // number. `last_tx_num + 1` is used to include the last transaction in the range. - let tx_range = i..(i + self.batch_sizes.transaction_lookup as u64).min(last_tx_num + 1); - // Retrieve transactions in the range and calculate their hashes in parallel - let mut hashes = provider - .transactions_by_tx_range(tx_range.clone())? - .into_par_iter() - .map(|transaction| transaction.hash()) - .collect::>(); + // Retrieve transactions in the range and calculate their hashes in parallel + let mut hashes = provider + .transactions_by_tx_range(tx_range.clone())? + .into_par_iter() + .map(|transaction| transaction.hash()) + .collect::>(); + + // Number of transactions retrieved from the database should match the tx range count + let tx_count = tx_range.count(); + if hashes.len() != tx_count { + return Err(PrunerError::InconsistentData( + "Unexpected number of transaction hashes retrieved by transaction number range", + )) + } - // Number of transactions retrieved from the database should match the tx range count - let tx_count = tx_range.clone().count(); - if hashes.len() != tx_count { - return Err(PrunerError::InconsistentData( - "Unexpected number of transaction hashes retrieved by transaction number range", - )) - } + // Pre-sort hashes to prune them in order + hashes.sort_unstable(); - // Pre-sort hashes to prune them in order - hashes.sort_unstable(); - - let rows = provider.prune_table_with_iterator::(hashes)?; - processed += rows; - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * processed as f64 / total as f64), - "Pruned transaction lookup" - ); - } + let rows = provider.prune_table_with_iterator::(hashes)?; + trace!(target: "pruner", %rows, %done, "Pruned transaction lookup"); provider.save_prune_checkpoint( PrunePart::TransactionLookup, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { block_number: *block_range.end(), prune_mode }, )?; Ok(()) @@ -333,10 +371,11 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let range = match self.get_next_tx_num_range_from_checkpoint( + let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::SenderRecovery, to_block, + self.batch_sizes.transaction_senders, )? { Some(range) => range, None => { @@ -344,24 +383,13 @@ impl Pruner { return Ok(()) } }; - let total = range.clone().count(); - provider.prune_table_with_range_in_batches::( - range, - self.batch_sizes.transaction_senders, - |rows, _| { - trace!( - target: "pruner", - %rows, - progress = format!("{:.1}%", 100.0 * rows as f64 / total as f64), - "Pruned transaction senders" - ); - }, - )?; + let (_, rows) = provider.prune_table_with_range::(tx_range)?; + trace!(target: "pruner", %rows, %done, "Pruned transaction senders"); provider.save_prune_checkpoint( PrunePart::SenderRecovery, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { block_number: *block_range.end(), prune_mode }, )?; Ok(()) @@ -375,45 +403,34 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let from_block = provider - .get_prune_checkpoint(PrunePart::AccountHistory)? - .map(|checkpoint| checkpoint.block_number + 1) - .unwrap_or_default(); - let range = from_block..=to_block; - let total = range.clone().count(); - - provider.prune_table_with_range_in_batches::( - range, + let (range, done) = match self.get_next_block_range_from_checkpoint( + provider, + PrunePart::AccountHistory, + to_block, self.batch_sizes.account_history, - |keys, rows| { - trace!( - target: "pruner", - %keys, - %rows, - progress = format!("{:.1}%", 100.0 * keys as f64 / total as f64), - "Pruned account history (changesets)" - ); - }, - )?; + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No acount history to prune"); + return Ok(()) + } + }; + let range_end = *range.end(); + + let (keys, rows) = provider.prune_table_with_range::(range)?; + trace!(target: "pruner", %keys, %rows, %done, "Pruned account history (changesets)"); - self.prune_history_indices::( + let rows = self.prune_history_indices::( provider, - to_block, + range_end, |a, b| a.key == b.key, |key| ShardedKey::last(key.key), - self.batch_sizes.account_history, - |rows| { - trace!( - target: "pruner", - rows, - "Pruned account history (indices)" - ); - }, )?; + trace!(target: "pruner", %rows, %done, "Pruned account history (history)" ); provider.save_prune_checkpoint( PrunePart::AccountHistory, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { block_number: range_end, prune_mode }, )?; Ok(()) @@ -427,44 +444,35 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let from_block = provider - .get_prune_checkpoint(PrunePart::StorageHistory)? - .map(|checkpoint| checkpoint.block_number + 1) - .unwrap_or_default(); - let block_range = from_block..=to_block; - let range = BlockNumberAddress::range(block_range); - - provider.prune_table_with_range_in_batches::( - range, + let (range, done) = match self.get_next_block_range_from_checkpoint( + provider, + PrunePart::StorageHistory, + to_block, self.batch_sizes.storage_history, - |keys, rows| { - trace!( - target: "pruner", - %keys, - %rows, - "Pruned storage history (changesets)" - ); - }, - )?; + )? { + Some(range) => range, + None => { + trace!(target: "pruner", "No storage history to prune"); + return Ok(()) + } + }; + let range_end = *range.end(); + + let (keys, rows) = provider + .prune_table_with_range::(BlockNumberAddress::range(range))?; + trace!(target: "pruner", %keys, %rows, "Pruned storage history (changesets)"); self.prune_history_indices::( provider, - to_block, + range_end, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), - self.batch_sizes.storage_history, - |rows| { - trace!( - target: "pruner", - rows, - "Pruned storage history (indices)" - ); - }, )?; + trace!(target: "pruner", %rows, %done, "Pruned storage history (history)" ); provider.save_prune_checkpoint( PrunePart::StorageHistory, - PruneCheckpoint { block_number: to_block, prune_mode }, + PruneCheckpoint { block_number: range_end, prune_mode }, )?; Ok(()) @@ -477,9 +485,7 @@ impl Pruner { to_block: BlockNumber, key_matches: impl Fn(&T::Key, &T::Key) -> bool, last_key: impl Fn(&T::Key) -> T::Key, - batch_size: usize, - batch_callback: impl Fn(usize), - ) -> PrunerResult + ) -> Result where T: Table, T::Key: AsRef>, @@ -557,17 +563,9 @@ impl Pruner { } processed += 1; - - if processed % batch_size == 0 { - batch_callback(batch_size); - } } - if processed % batch_size != 0 { - batch_callback(processed % batch_size); - } - - Ok(()) + Ok(processed) } } @@ -575,6 +573,10 @@ impl Pruner { mod tests { use crate::{pruner::BatchSizes, Pruner}; use assert_matches::assert_matches; + use itertools::{ + FoldWhile::{Continue, Done}, + Itertools, + }; use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList}; use reth_interfaces::test_utils::{ generators, @@ -648,24 +650,47 @@ mod tests { }, ); + let previous_checkpoint_block_number = tx + .inner() + .get_prune_checkpoint(PrunePart::Receipts) + .unwrap() + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + + let actual_to_block = blocks + .iter() + .skip(previous_checkpoint_block_number as usize) + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count >= pruner.batch_sizes.receipts { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(())); provider.commit().expect("commit"); assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks[actual_to_block as usize + 1..] + .iter() + .map(|block| block.body.len()) + .sum::(), ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -708,6 +733,28 @@ mod tests { }, ); + let previous_checkpoint_block_number = tx + .inner() + .get_prune_checkpoint(PrunePart::TransactionLookup) + .unwrap() + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + + let actual_to_block = blocks + .iter() + .skip(previous_checkpoint_block_number as usize) + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count >= pruner.batch_sizes.transaction_lookup { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); assert_matches!( pruner.prune_transaction_lookup(&provider, to_block, prune_mode), @@ -717,18 +764,19 @@ mod tests { assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks[actual_to_block as usize + 1..] + .iter() + .map(|block| block.body.len()) + .sum::() ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -774,6 +822,28 @@ mod tests { }, ); + let previous_checkpoint_block_number = tx + .inner() + .get_prune_checkpoint(PrunePart::SenderRecovery) + .unwrap() + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + + let actual_to_block = blocks + .iter() + .skip(previous_checkpoint_block_number as usize) + .fold_while((0, 0), |(_, mut tx_count), block| { + tx_count += block.body.len(); + + if tx_count >= pruner.batch_sizes.transaction_senders { + Done((block.number, tx_count)) + } else { + Continue((block.number, tx_count)) + } + }) + .into_inner() + .0; + let provider = tx.inner_rw(); assert_matches!( pruner.prune_transaction_senders(&provider, to_block, prune_mode), @@ -783,18 +853,19 @@ mod tests { assert_eq!( tx.table::().unwrap().len(), - blocks[to_block as usize + 1..].iter().map(|block| block.body.len()).sum::() + blocks[actual_to_block as usize + 1..] + .iter() + .map(|block| block.body.len()) + .sum::() ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) ); }; - // Pruning first time ever, no previous checkpoint is present - test_prune(10); - // Prune second time, previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(15); + test_prune(15); test_prune(20); } @@ -803,8 +874,7 @@ mod tests { let tx = TestTransaction::default(); let mut rng = generators::rng(); - let block_num = 7000; - let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1); + let blocks = random_block_range(&mut rng, 0..=7000, H256::zero(), 0..1); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); let accounts = @@ -845,29 +915,40 @@ mod tests { PruneModes { account_history: Some(prune_mode), ..Default::default() }, BatchSizes { // Less than total amount of blocks to prune to test the batching logic - account_history: 10, + account_history: 2000, ..Default::default() }, ); + let previous_checkpoint_block_number = tx + .inner() + .get_prune_checkpoint(PrunePart::AccountHistory) + .unwrap() + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + + let actual_to_block = to_block.min( + previous_checkpoint_block_number + pruner.batch_sizes.account_history as u64 - 1, + ); + let provider = tx.inner_rw(); assert_matches!(pruner.prune_account_history(&provider, to_block, prune_mode), Ok(())); provider.commit().expect("commit"); assert_eq!( tx.table::().unwrap().len(), - changesets[to_block as usize + 1..].iter().flatten().count() + changesets[actual_to_block as usize + 1..].iter().flatten().count() ); let actual_shards = tx.table::().unwrap(); let expected_shards = original_shards .iter() - .filter(|(key, _)| key.highest_block_number > to_block) + .filter(|(key, _)| key.highest_block_number > actual_to_block) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= to_block as usize) + .skip_while(|block| *block <= actual_to_block as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -877,14 +958,12 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) ); }; - // Prune first time: no previous checkpoint is present test_prune(3000); - // Prune second time: previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(3000); test_prune(4500); } @@ -893,8 +972,7 @@ mod tests { let tx = TestTransaction::default(); let mut rng = generators::rng(); - let block_num = 7000; - let blocks = random_block_range(&mut rng, 0..=block_num, H256::zero(), 0..1); + let blocks = random_block_range(&mut rng, 0..=7000, H256::zero(), 0..1); tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); let accounts = @@ -935,18 +1013,29 @@ mod tests { PruneModes { storage_history: Some(prune_mode), ..Default::default() }, BatchSizes { // Less than total amount of blocks to prune to test the batching logic - storage_history: 10, + storage_history: 3000, ..Default::default() }, ); + let previous_checkpoint_block_number = tx + .inner() + .get_prune_checkpoint(PrunePart::StorageHistory) + .unwrap() + .map(|checkpoint| checkpoint.block_number + 1) + .unwrap_or_default(); + + let actual_to_block = to_block.min( + previous_checkpoint_block_number + pruner.batch_sizes.storage_history as u64 - 1, + ); + let provider = tx.inner_rw(); assert_matches!(pruner.prune_storage_history(&provider, to_block, prune_mode), Ok(())); provider.commit().expect("commit"); assert_eq!( tx.table::().unwrap().len(), - changesets[to_block as usize + 1..] + changesets[actual_to_block as usize + 1..] .iter() .flatten() .flat_map(|(_, _, entries)| entries) @@ -957,11 +1046,11 @@ mod tests { let expected_shards = original_shards .iter() - .filter(|(key, _)| key.sharded_key.highest_block_number > to_block) + .filter(|(key, _)| key.sharded_key.highest_block_number > actual_to_block) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= to_block as usize) + .skip_while(|block| *block <= actual_to_block as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -971,14 +1060,12 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(), - Some(PruneCheckpoint { block_number: to_block, prune_mode }) + Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) ); }; - // Prune first time: no previous checkpoint is present test_prune(3000); - // Prune second time: previous checkpoint is present, should continue pruning from where - // ended last time + test_prune(3000); test_prune(4500); } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 1d80bf4662f8..9627cb081895 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -624,6 +624,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { } /// Prune the table for the specified pre-sorted key iterator. + /// /// Returns number of rows pruned. pub fn prune_table_with_iterator( &self, @@ -663,17 +664,27 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { Ok(deleted) } + /// Prune the table for the specified key range. + /// + /// Returns number of total unique keys and total rows pruned pruned. + pub fn prune_table_with_range( + &self, + keys: impl RangeBounds, + ) -> std::result::Result<(usize, usize), DatabaseError> { + self.prune_table_with_range_in_batches::(keys, usize::MAX, |_, _| {}) + } + /// Prune the table for the specified key range, calling `chunk_callback` after every /// `batch_size` pruned rows with number of total unique keys and total rows pruned. For dupsort /// tables, these numbers will be different as one key can correspond to multiple rows. /// - /// Returns number of rows pruned. + /// Returns number of total unique keys and total rows pruned pruned. pub fn prune_table_with_range_in_batches( &self, keys: impl RangeBounds, batch_size: usize, mut batch_callback: impl FnMut(usize, usize), - ) -> std::result::Result<(), DatabaseError> { + ) -> std::result::Result<(usize, usize), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; let mut walker = cursor.walk_range(keys)?; let mut deleted_keys = 0; @@ -697,7 +708,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { batch_callback(deleted_keys, deleted_rows); } - Ok(()) + Ok((deleted_keys, deleted_rows)) } /// Load shard and remove it. If list is empty, last shard was full or From 689d2677191f4fae3c50101ee12d3af50d61665b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 14:26:26 +0100 Subject: [PATCH 02/20] new batch logic --- .../interfaces/src/test_utils/generators.rs | 5 +- crates/primitives/src/prune/checkpoint.rs | 8 +- crates/prune/src/pruner.rs | 489 +++++++++++------- crates/stages/src/stages/sender_recovery.rs | 6 +- crates/stages/src/stages/tx_lookup.rs | 8 +- .../provider/src/providers/database/mod.rs | 12 +- .../src/providers/database/provider.rs | 83 +-- 7 files changed, 352 insertions(+), 259 deletions(-) diff --git a/crates/interfaces/src/test_utils/generators.rs b/crates/interfaces/src/test_utils/generators.rs index 6b33d9973de8..ae0c0bb0d474 100644 --- a/crates/interfaces/src/test_utils/generators.rs +++ b/crates/interfaces/src/test_utils/generators.rs @@ -239,7 +239,7 @@ where // deposit in receiving account and update storage let (prev_to, storage): &mut (Account, BTreeMap) = state.get_mut(&to).unwrap(); - let old_entries = new_entries + let mut old_entries: Vec<_> = new_entries .into_iter() .filter_map(|entry| { let old = if entry.value != U256::ZERO { @@ -254,9 +254,12 @@ where Some(StorageEntry { value: old.unwrap_or(U256::from(0)), ..entry }) }) .collect(); + old_entries.sort_by_key(|entry| entry.key); changeset.push((to, *prev_to, old_entries)); + changeset.sort_by_key(|(address, _, _)| *address); + prev_to.balance = prev_to.balance.wrapping_add(transfer); changesets.push(changeset); diff --git a/crates/primitives/src/prune/checkpoint.rs b/crates/primitives/src/prune/checkpoint.rs index 52e1cabd76cb..b1800cb4181f 100644 --- a/crates/primitives/src/prune/checkpoint.rs +++ b/crates/primitives/src/prune/checkpoint.rs @@ -1,4 +1,4 @@ -use crate::{prune::PruneMode, BlockNumber}; +use crate::{prune::PruneMode, BlockNumber, TxNumber}; use reth_codecs::{main_codec, Compact}; /// Saves the pruning progress of a stage. @@ -6,8 +6,10 @@ use reth_codecs::{main_codec, Compact}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] #[cfg_attr(test, derive(Default))] pub struct PruneCheckpoint { - /// Highest pruned block number. - pub block_number: BlockNumber, + /// Highest pruned block number, if any. + pub block_number: Option, + /// Highest pruned transaction number, if applicable + pub tx_number: Option, /// Prune mode. pub prune_mode: PruneMode, } diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 1fa10ca3c576..78d987051bf2 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -8,7 +8,7 @@ use reth_db::{ models::{storage_sharded_key::StorageShardedKey, BlockNumberAddress, ShardedKey}, table::Table, tables, - transaction::{DbTx, DbTxMut}, + transaction::DbTxMut, BlockNumberList, }; use reth_primitives::{ @@ -19,10 +19,10 @@ use reth_provider::{ TransactionsProvider, }; use std::{ops::RangeInclusive, sync::Arc, time::Instant}; -use tracing::{debug, instrument, trace}; +use tracing::{debug, error, instrument, trace}; /// Result of [Pruner::run] execution -pub type PrunerResult = Result<(), PrunerError>; +pub type PrunerResult = Result; /// The pipeline type itself with the result of [Pruner::run] pub type PrunerWithResult = (Pruner, PrunerResult); @@ -96,11 +96,13 @@ impl Pruner { let provider = self.provider_factory.provider_rw()?; + let mut done = true; + if let Some((to_block, prune_mode)) = self.modes.prune_target_block_receipts(tip_block_number)? { let part_start = Instant::now(); - self.prune_receipts(&provider, to_block, prune_mode)?; + done = done && self.prune_receipts(&provider, to_block, prune_mode)?; self.metrics .get_prune_part_metrics(PrunePart::Receipts) .duration_seconds @@ -111,7 +113,7 @@ impl Pruner { self.modes.prune_target_block_transaction_lookup(tip_block_number)? { let part_start = Instant::now(); - self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + done = done && self.prune_transaction_lookup(&provider, to_block, prune_mode)?; self.metrics .get_prune_part_metrics(PrunePart::TransactionLookup) .duration_seconds @@ -122,7 +124,7 @@ impl Pruner { self.modes.prune_target_block_sender_recovery(tip_block_number)? { let part_start = Instant::now(); - self.prune_transaction_senders(&provider, to_block, prune_mode)?; + done = done && self.prune_transaction_senders(&provider, to_block, prune_mode)?; self.metrics .get_prune_part_metrics(PrunePart::SenderRecovery) .duration_seconds @@ -133,7 +135,7 @@ impl Pruner { self.modes.prune_target_block_account_history(tip_block_number)? { let part_start = Instant::now(); - self.prune_account_history(&provider, to_block, prune_mode)?; + done = done && self.prune_account_history(&provider, to_block, prune_mode)?; self.metrics .get_prune_part_metrics(PrunePart::AccountHistory) .duration_seconds @@ -144,7 +146,7 @@ impl Pruner { self.modes.prune_target_block_storage_history(tip_block_number)? { let part_start = Instant::now(); - self.prune_storage_history(&provider, to_block, prune_mode)?; + done = done && self.prune_storage_history(&provider, to_block, prune_mode)?; self.metrics .get_prune_part_metrics(PrunePart::StorageHistory) .duration_seconds @@ -163,7 +165,7 @@ impl Pruner { ?elapsed, "Pruner finished" ); - Ok(()) + Ok(done) } /// Returns `true` if the pruning is needed at the provided tip block number. @@ -200,22 +202,21 @@ impl Pruner { provider: &DatabaseProviderRW<'_, DB>, prune_part: PrunePart, to_block: BlockNumber, - limit: usize, - ) -> reth_interfaces::Result, bool)>> { + ) -> reth_interfaces::Result>> { let from_block = provider .get_prune_checkpoint(prune_part)? + .and_then(|checkpoint| checkpoint.block_number) // Checkpoint exists, prune from the next block after the highest pruned one - .map(|checkpoint| checkpoint.block_number + 1) + .map(|block_number| block_number + 1) // No checkpoint exists, prune from genesis .unwrap_or(0); - let range = from_block..=to_block.min(from_block + limit as u64 - 1); + let range = from_block..=to_block; if range.is_empty() { return Ok(None) } - let is_final_range = *range.end() == to_block; - Ok(Some((range, is_final_range))) + Ok(Some(range)) } /// Get next inclusive tx number range to prune according to the checkpoint and `to_block` block @@ -231,57 +232,30 @@ impl Pruner { provider: &DatabaseProviderRW<'_, DB>, prune_part: PrunePart, to_block: BlockNumber, - limit: usize, - ) -> reth_interfaces::Result< - Option<(RangeInclusive, RangeInclusive, bool)>, - > { - let from_block_number = provider + ) -> reth_interfaces::Result>> { + let from_tx_number = provider .get_prune_checkpoint(prune_part)? - // Checkpoint exists, prune from the next block after the highest pruned one - .map(|checkpoint| checkpoint.block_number + 1) + // Checkpoint exists, prune from the next transaction after the highest pruned one + .and_then(|checkpoint| match checkpoint.tx_number { + Some(tx_number) => Some(tx_number + 1), + _ => { + error!(target: "pruner", %prune_part, ?checkpoint, "Expected transaction number in prune checkpoint, found None"); + None + }, + }) // No checkpoint exists, prune from genesis .unwrap_or(0); - // Get first transaction - let from_tx_num = - provider.block_body_indices(from_block_number)?.map(|body| body.first_tx_num); - // If no block body index is found, the DB is either corrupted or we've already pruned up to - // the latest block, so there's no thing to prune now. - let Some(from_tx_num) = from_tx_num else { return Ok(None) }; - - let mut to_block_number = 0; - let mut tx_count = 0; - for entry in provider - .tx_ref() - .cursor_read::()? - .walk_range(from_block_number..=to_block)? - { - let (block_number, block_body) = entry?; - to_block_number = block_number; - tx_count += block_body.tx_count; - - if tx_count >= limit as u64 { - break - } - } - - let to_tx_num = from_tx_num.saturating_add(tx_count).saturating_sub(1); - - let block_range = from_block_number..=to_block_number; - if block_range.is_empty() { - return Ok(None) - } - - let tx_range = from_tx_num..=to_tx_num; - if tx_range.is_empty() { - return Ok(None) + let to_tx_number = match provider.block_body_indices(to_block)? { + Some(body) => body, + None => return Ok(None), } + .last_tx_num(); - let is_final_range = *block_range.end() == to_block; - Ok(Some((block_range, tx_range, is_final_range))) + Ok(Some(from_tx_number..=to_tx_number)) } - /// Prune receipts up to the provided block, inclusive. + /// Prune receipts up to the provided block, inclusive, respecting the batch size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_receipts( &self, @@ -289,28 +263,42 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( + let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::Receipts, to_block, - self.batch_sizes.receipts, )? { Some(range) => range, None => { trace!(target: "pruner", "No receipts to prune"); - return Ok(()) + return Ok(true) } }; + let tx_range_end = *tx_range.end(); - let rows = provider.prune_table_with_iterator::(tx_range)?; + let mut last_pruned_transaction = tx_range_end; + let (rows, done) = provider.prune_table_with_range::( + tx_range, + self.batch_sizes.receipts, + |row| last_pruned_transaction = row.0, + )?; trace!(target: "pruner", %rows, %done, "Pruned receipts"); + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + .checked_sub(if done { 0 } else { 1 }); + provider.save_prune_checkpoint( PrunePart::Receipts, - PruneCheckpoint { block_number: *block_range.end(), prune_mode }, + PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune transaction lookup entries up to the provided block, inclusive. @@ -321,46 +309,59 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( + let (start, end) = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::TransactionLookup, to_block, - self.batch_sizes.transaction_lookup, )? { Some(range) => range, None => { trace!(target: "pruner", "No transaction lookup entries to prune"); - return Ok(()) + return Ok(true) } - }; + } + .into_inner(); + let tx_range = start..=(end.min(start + self.batch_sizes.transaction_lookup as u64 - 1)); + let tx_range_end = *tx_range.end(); // Retrieve transactions in the range and calculate their hashes in parallel - let mut hashes = provider + let hashes = provider .transactions_by_tx_range(tx_range.clone())? .into_par_iter() .map(|transaction| transaction.hash()) .collect::>(); // Number of transactions retrieved from the database should match the tx range count - let tx_count = tx_range.count(); + let tx_count = tx_range.clone().count(); if hashes.len() != tx_count { return Err(PrunerError::InconsistentData( "Unexpected number of transaction hashes retrieved by transaction number range", )) } - // Pre-sort hashes to prune them in order - hashes.sort_unstable(); - - let rows = provider.prune_table_with_iterator::(hashes)?; + let mut last_pruned_transaction = tx_range_end; + let (rows, done) = provider.prune_table_with_iterator::( + hashes, + self.batch_sizes.transaction_lookup, + |row| last_pruned_transaction = row.1, + )?; trace!(target: "pruner", %rows, %done, "Pruned transaction lookup"); + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + .checked_sub(if done { 0 } else { 1 }); + provider.save_prune_checkpoint( PrunePart::TransactionLookup, - PruneCheckpoint { block_number: *block_range.end(), prune_mode }, + PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune transaction senders up to the provided block, inclusive. @@ -371,28 +372,42 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let (block_range, tx_range, done) = match self.get_next_tx_num_range_from_checkpoint( + let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::SenderRecovery, to_block, - self.batch_sizes.transaction_senders, )? { Some(range) => range, None => { trace!(target: "pruner", "No transaction senders to prune"); - return Ok(()) + return Ok(true) } }; + let tx_range_end = *tx_range.end(); - let (_, rows) = provider.prune_table_with_range::(tx_range)?; + let mut last_pruned_transaction = tx_range_end; + let (rows, done) = provider.prune_table_with_range::( + tx_range, + self.batch_sizes.transaction_senders, + |row| last_pruned_transaction = row.0, + )?; trace!(target: "pruner", %rows, %done, "Pruned transaction senders"); + let last_pruned_block = provider + .transaction_block(last_pruned_transaction)? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + .checked_sub(if done { 0 } else { 1 }); + provider.save_prune_checkpoint( PrunePart::SenderRecovery, - PruneCheckpoint { block_number: *block_range.end(), prune_mode }, + PruneCheckpoint { + block_number: last_pruned_block, + tx_number: Some(last_pruned_transaction), + prune_mode, + }, )?; - Ok(()) + Ok(done) } /// Prune account history up to the provided block, inclusive. @@ -403,26 +418,34 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let (range, done) = match self.get_next_block_range_from_checkpoint( + let range = match self.get_next_block_range_from_checkpoint( provider, PrunePart::AccountHistory, to_block, - self.batch_sizes.account_history, )? { Some(range) => range, None => { trace!(target: "pruner", "No acount history to prune"); - return Ok(()) + return Ok(true) } }; let range_end = *range.end(); - let (keys, rows) = provider.prune_table_with_range::(range)?; - trace!(target: "pruner", %keys, %rows, %done, "Pruned account history (changesets)"); + let mut last_pruned_block_number = None; + let (rows, done) = provider.prune_table_with_range::( + range, + self.batch_sizes.account_history, + |row| last_pruned_block_number = Some(row.0), + )?; + trace!(target: "pruner", %rows, %done, "Pruned account history (changesets)"); + + let last_pruned_block = last_pruned_block_number + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); let rows = self.prune_history_indices::( provider, - range_end, + last_pruned_block, |a, b| a.key == b.key, |key| ShardedKey::last(key.key), )?; @@ -430,10 +453,10 @@ impl Pruner { provider.save_prune_checkpoint( PrunePart::AccountHistory, - PruneCheckpoint { block_number: range_end, prune_mode }, + PruneCheckpoint { block_number: Some(last_pruned_block), tx_number: None, prune_mode }, )?; - Ok(()) + Ok(done) } /// Prune storage history up to the provided block, inclusive. @@ -444,27 +467,34 @@ impl Pruner { to_block: BlockNumber, prune_mode: PruneMode, ) -> PrunerResult { - let (range, done) = match self.get_next_block_range_from_checkpoint( + let range = match self.get_next_block_range_from_checkpoint( provider, PrunePart::StorageHistory, to_block, - self.batch_sizes.storage_history, )? { Some(range) => range, None => { trace!(target: "pruner", "No storage history to prune"); - return Ok(()) + return Ok(true) } }; let range_end = *range.end(); - let (keys, rows) = provider - .prune_table_with_range::(BlockNumberAddress::range(range))?; - trace!(target: "pruner", %keys, %rows, "Pruned storage history (changesets)"); + let mut last_pruned_block_number = None; + let (rows, done) = provider.prune_table_with_range::( + BlockNumberAddress::range(range), + self.batch_sizes.storage_history, + |row| last_pruned_block_number = Some(row.0.block_number()), + )?; + trace!(target: "pruner", %rows, %done, "Pruned storage history (changesets)"); + + let last_pruned_block = last_pruned_block_number + .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) + .unwrap_or(range_end); self.prune_history_indices::( provider, - range_end, + last_pruned_block, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), )?; @@ -472,10 +502,10 @@ impl Pruner { provider.save_prune_checkpoint( PrunePart::StorageHistory, - PruneCheckpoint { block_number: range_end, prune_mode }, + PruneCheckpoint { block_number: Some(last_pruned_block), tx_number: None, prune_mode }, )?; - Ok(()) + Ok(done) } /// Prune history indices up to the provided block, inclusive. @@ -585,7 +615,7 @@ mod tests { }, }; use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, H256, MAINNET, + BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, H256, MAINNET, }; use reth_provider::PruneCheckpointReader; use reth_stages::test_utils::TestTransaction; @@ -650,20 +680,26 @@ mod tests { }, ); - let previous_checkpoint_block_number = tx + let next_tx_number_to_prune = tx .inner() .get_prune_checkpoint(PrunePart::Receipts) .unwrap() - .map(|checkpoint| checkpoint.block_number + 1) + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) .unwrap_or_default(); - let actual_to_block = blocks + let last_pruned_tx_number = blocks + .iter() + .map(|block| block.body.len()) + .sum::() + .min(next_tx_number_to_prune as usize + pruner.batch_sizes.receipts - 1); + + let last_pruned_block_number = blocks .iter() - .skip(previous_checkpoint_block_number as usize) .fold_while((0, 0), |(_, mut tx_count), block| { tx_count += block.body.len(); - if tx_count >= pruner.batch_sizes.receipts { + if tx_count > last_pruned_tx_number { Done((block.number, tx_count)) } else { Continue((block.number, tx_count)) @@ -673,19 +709,26 @@ mod tests { .0; let provider = tx.inner_rw(); - assert_matches!(pruner.prune_receipts(&provider, to_block, prune_mode), Ok(())); + let result = pruner.prune_receipts(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[actual_to_block as usize + 1..] - .iter() - .map(|block| block.body.len()) - .sum::(), + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::Receipts).unwrap(), - Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; @@ -733,20 +776,25 @@ mod tests { }, ); - let previous_checkpoint_block_number = tx + let next_tx_number_to_prune = tx .inner() .get_prune_checkpoint(PrunePart::TransactionLookup) .unwrap() - .map(|checkpoint| checkpoint.block_number + 1) + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) .unwrap_or_default(); - let actual_to_block = blocks + let last_pruned_tx_number = + blocks.iter().map(|block| block.body.len()).sum::().min( + next_tx_number_to_prune as usize + pruner.batch_sizes.transaction_lookup - 1, + ); + + let last_pruned_block_number = blocks .iter() - .skip(previous_checkpoint_block_number as usize) .fold_while((0, 0), |(_, mut tx_count), block| { tx_count += block.body.len(); - if tx_count >= pruner.batch_sizes.transaction_lookup { + if tx_count > last_pruned_tx_number { Done((block.number, tx_count)) } else { Continue((block.number, tx_count)) @@ -756,22 +804,26 @@ mod tests { .0; let provider = tx.inner_rw(); - assert_matches!( - pruner.prune_transaction_lookup(&provider, to_block, prune_mode), - Ok(()) - ); + let result = pruner.prune_transaction_lookup(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[actual_to_block as usize + 1..] - .iter() - .map(|block| block.body.len()) - .sum::() + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::TransactionLookup).unwrap(), - Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; @@ -822,20 +874,25 @@ mod tests { }, ); - let previous_checkpoint_block_number = tx + let next_tx_number_to_prune = tx .inner() .get_prune_checkpoint(PrunePart::SenderRecovery) .unwrap() - .map(|checkpoint| checkpoint.block_number + 1) + .and_then(|checkpoint| checkpoint.tx_number) + .map(|tx_number| tx_number + 1) .unwrap_or_default(); - let actual_to_block = blocks + let last_pruned_tx_number = + blocks.iter().map(|block| block.body.len()).sum::().min( + next_tx_number_to_prune as usize + pruner.batch_sizes.transaction_senders - 1, + ); + + let last_pruned_block_number = blocks .iter() - .skip(previous_checkpoint_block_number as usize) .fold_while((0, 0), |(_, mut tx_count), block| { tx_count += block.body.len(); - if tx_count >= pruner.batch_sizes.transaction_senders { + if tx_count > last_pruned_tx_number { Done((block.number, tx_count)) } else { Continue((block.number, tx_count)) @@ -845,22 +902,26 @@ mod tests { .0; let provider = tx.inner_rw(); - assert_matches!( - pruner.prune_transaction_senders(&provider, to_block, prune_mode), - Ok(()) - ); + let result = pruner.prune_transaction_senders(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); provider.commit().expect("commit"); + let last_pruned_block_number = + last_pruned_block_number.checked_sub(if done { 0 } else { 1 }); + assert_eq!( tx.table::().unwrap().len(), - blocks[actual_to_block as usize + 1..] - .iter() - .map(|block| block.body.len()) - .sum::() + blocks.iter().map(|block| block.body.len()).sum::() - + (last_pruned_tx_number + 1) ); assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::SenderRecovery).unwrap(), - Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: last_pruned_block_number, + tx_number: Some(last_pruned_tx_number as TxNumber), + prune_mode + }) ); }; @@ -906,7 +967,7 @@ mod tests { let original_shards = tx.table::().unwrap(); - let test_prune = |to_block: BlockNumber| { + let test_prune = |to_block: BlockNumber, run: usize, expect_done: bool| { let prune_mode = PruneMode::Before(to_block); let pruner = Pruner::new( tx.inner_raw(), @@ -920,35 +981,63 @@ mod tests { }, ); - let previous_checkpoint_block_number = tx - .inner() - .get_prune_checkpoint(PrunePart::AccountHistory) - .unwrap() - .map(|checkpoint| checkpoint.block_number + 1) + let provider = tx.inner_rw(); + let result = pruner.prune_account_history(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + assert_eq!(done, expect_done); + provider.commit().expect("commit"); + + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.into_iter().map(move |change| (block_number, change)) + }) + .collect::>(); + + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _))| { + *i < pruner.batch_sizes.account_history * run && + *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) .unwrap_or_default(); - let actual_to_block = to_block.min( - previous_checkpoint_block_number + pruner.batch_sizes.account_history as u64 - 1, - ); + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); - let provider = tx.inner_rw(); - assert_matches!(pruner.prune_account_history(&provider, to_block, prune_mode), Ok(())); - provider.commit().expect("commit"); + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _)| if done { *block_number } else { block_number.saturating_sub(1) } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = + pruned_changesets.fold(BTreeMap::new(), |mut acc, (block_number, change)| { + acc.entry(block_number).or_insert_with(Vec::new).push(change); + acc + }); assert_eq!( tx.table::().unwrap().len(), - changesets[actual_to_block as usize + 1..].iter().flatten().count() + pruned_changesets.values().flatten().count() ); let actual_shards = tx.table::().unwrap(); let expected_shards = original_shards .iter() - .filter(|(key, _)| key.highest_block_number > actual_to_block) + .filter(|(key, _)| key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= actual_to_block as usize) + .skip_while(|block| *block <= last_pruned_block_number as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -958,13 +1047,17 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::AccountHistory).unwrap(), - Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) ); }; - test_prune(3000); - test_prune(3000); - test_prune(4500); + test_prune(1700, 1, false); + test_prune(1700, 2, true); + test_prune(2000, 3, true); } #[test] @@ -1004,7 +1097,7 @@ mod tests { let original_shards = tx.table::().unwrap(); - let test_prune = |to_block: BlockNumber| { + let test_prune = |to_block: BlockNumber, run: usize, expect_done: bool| { let prune_mode = PruneMode::Before(to_block); let pruner = Pruner::new( tx.inner_raw(), @@ -1013,44 +1106,72 @@ mod tests { PruneModes { storage_history: Some(prune_mode), ..Default::default() }, BatchSizes { // Less than total amount of blocks to prune to test the batching logic - storage_history: 3000, + storage_history: 2000, ..Default::default() }, ); - let previous_checkpoint_block_number = tx - .inner() - .get_prune_checkpoint(PrunePart::StorageHistory) - .unwrap() - .map(|checkpoint| checkpoint.block_number + 1) + let provider = tx.inner_rw(); + let result = pruner.prune_storage_history(&provider, to_block, prune_mode); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + assert_eq!(done, expect_done); + provider.commit().expect("commit"); + + let changesets = changesets + .iter() + .enumerate() + .flat_map(|(block_number, changeset)| { + changeset.into_iter().flat_map(move |(address, _, entries)| { + entries.into_iter().map(move |entry| (block_number, address, entry)) + }) + }) + .collect::>(); + + let pruned = changesets + .iter() + .enumerate() + .skip_while(|(i, (block_number, _, _))| { + *i < pruner.batch_sizes.storage_history * run && + *block_number <= to_block as usize + }) + .next() + .map(|(i, _)| i) .unwrap_or_default(); - let actual_to_block = to_block.min( - previous_checkpoint_block_number + pruner.batch_sizes.storage_history as u64 - 1, + let mut pruned_changesets = changesets + .iter() + // Skip what we've pruned so far, subtracting one to get last pruned block number + // further down + .skip(pruned.saturating_sub(1)); + + let last_pruned_block_number = pruned_changesets + .next() + .map(|(block_number, _, _)| if done { *block_number } else { block_number.saturating_sub(1) } as BlockNumber) + .unwrap_or(to_block); + + let pruned_changesets = pruned_changesets.fold( + BTreeMap::new(), + |mut acc, (block_number, address, entry)| { + acc.entry((block_number, address)).or_insert_with(Vec::new).push(entry); + acc + }, ); - let provider = tx.inner_rw(); - assert_matches!(pruner.prune_storage_history(&provider, to_block, prune_mode), Ok(())); - provider.commit().expect("commit"); - assert_eq!( tx.table::().unwrap().len(), - changesets[actual_to_block as usize + 1..] - .iter() - .flatten() - .flat_map(|(_, _, entries)| entries) - .count() + pruned_changesets.values().flatten().count() ); let actual_shards = tx.table::().unwrap(); let expected_shards = original_shards .iter() - .filter(|(key, _)| key.sharded_key.highest_block_number > actual_to_block) + .filter(|(key, _)| key.sharded_key.highest_block_number > last_pruned_block_number) .map(|(key, blocks)| { let new_blocks = blocks .iter(0) - .skip_while(|block| *block <= actual_to_block as usize) + .skip_while(|block| *block <= last_pruned_block_number as usize) .collect::>(); (key.clone(), BlockNumberList::new_pre_sorted(new_blocks)) }) @@ -1060,12 +1181,16 @@ mod tests { assert_eq!( tx.inner().get_prune_checkpoint(PrunePart::StorageHistory).unwrap(), - Some(PruneCheckpoint { block_number: actual_to_block, prune_mode }) + Some(PruneCheckpoint { + block_number: Some(last_pruned_block_number), + tx_number: None, + prune_mode + }) ); }; - test_prune(3000); - test_prune(3000); - test_prune(4500); + test_prune(2300, 1, false); + test_prune(2300, 2, true); + test_prune(3000, 3, true); } } diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 9f72c69d6e3b..4a5ef3c1991a 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -212,11 +212,7 @@ fn stage_checkpoint( ) -> Result { let pruned_entries = provider .get_prune_checkpoint(PrunePart::SenderRecovery)? - .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) - .transpose()? - .flatten() - // +1 is needed because TxNumber is 0-indexed - .map(|body| body.last_tx_num() + 1) + .and_then(|checkpoint| checkpoint.tx_number) .unwrap_or_default(); Ok(EntitiesCheckpoint { // If `TxSenders` table was pruned, we will have a number of entries in it not matching diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 211266d45d81..b1e01eb061c4 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -13,7 +13,7 @@ use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, PrunePart, TransactionSignedNoHash, TxNumber, H256, }; -use reth_provider::{BlockReader, DatabaseProviderRW, PruneCheckpointReader}; +use reth_provider::{DatabaseProviderRW, PruneCheckpointReader}; use tokio::sync::mpsc; use tracing::*; @@ -186,11 +186,7 @@ fn stage_checkpoint( ) -> Result { let pruned_entries = provider .get_prune_checkpoint(PrunePart::TransactionLookup)? - .map(|checkpoint| provider.block_body_indices(checkpoint.block_number)) - .transpose()? - .flatten() - // +1 is needed because TxNumber is 0-indexed - .map(|body| body.last_tx_num() + 1) + .and_then(|checkpoint| checkpoint.tx_number) .unwrap_or_default(); Ok(EntitiesCheckpoint { // If `TxHashNumber` table was pruned, we will have a number of entries in it not matching diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 506cc59ebab3..b45417909b95 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -111,14 +111,18 @@ impl ProviderFactory { // If we pruned account or storage history, we can't return state on every historical block. // Instead, we should cap it at the latest prune checkpoint for corresponding prune part. - if let Some(prune_checkpoint) = account_history_prune_checkpoint { + if let Some(prune_checkpoint_block_number) = + account_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number) + { state_provider = state_provider.with_lowest_available_account_history_block_number( - prune_checkpoint.block_number + 1, + prune_checkpoint_block_number + 1, ); } - if let Some(prune_checkpoint) = storage_history_prune_checkpoint { + if let Some(prune_checkpoint_block_number) = + storage_history_prune_checkpoint.and_then(|checkpoint| checkpoint.block_number) + { state_provider = state_provider.with_lowest_available_storage_history_block_number( - prune_checkpoint.block_number + 1, + prune_checkpoint_block_number + 1, ); } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 9627cb081895..5624e9cf3150 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -17,7 +17,7 @@ use reth_db::{ sharded_key, storage_sharded_key::StorageShardedKey, AccountBeforeTx, BlockNumberAddress, ShardedKey, StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals, }, - table::Table, + table::{Table, TableRow}, tables, transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, @@ -629,39 +629,27 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { pub fn prune_table_with_iterator( &self, keys: impl IntoIterator, - ) -> std::result::Result { - self.prune_table_with_iterator_in_batches::(keys, usize::MAX, |_| {}) - } - - /// Prune the table for the specified pre-sorted key iterator, calling `chunk_callback` after - /// every `batch_size` pruned rows with number of total rows pruned. - /// - /// Returns number of rows pruned. - pub fn prune_table_with_iterator_in_batches( - &self, - keys: impl IntoIterator, - batch_size: usize, - mut batch_callback: impl FnMut(usize), - ) -> std::result::Result { + limit: usize, + mut delete_callback: impl FnMut(TableRow), + ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; let mut deleted = 0; - for key in keys { - if cursor.seek_exact(key)?.is_some() { + let mut keys = keys.into_iter(); + while let Some(key) = keys.next() { + let row = cursor.seek_exact(key.clone())?; + if let Some(row) = row { cursor.delete_current()?; + deleted += 1; + delete_callback(row); } - deleted += 1; - if deleted % batch_size == 0 { - batch_callback(deleted); + if deleted == limit { + break } } - if deleted % batch_size != 0 { - batch_callback(deleted); - } - - Ok(deleted) + Ok((deleted, keys.next().is_none())) } /// Prune the table for the specified key range. @@ -669,46 +657,25 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { /// Returns number of total unique keys and total rows pruned pruned. pub fn prune_table_with_range( &self, - keys: impl RangeBounds, - ) -> std::result::Result<(usize, usize), DatabaseError> { - self.prune_table_with_range_in_batches::(keys, usize::MAX, |_, _| {}) - } - - /// Prune the table for the specified key range, calling `chunk_callback` after every - /// `batch_size` pruned rows with number of total unique keys and total rows pruned. For dupsort - /// tables, these numbers will be different as one key can correspond to multiple rows. - /// - /// Returns number of total unique keys and total rows pruned pruned. - pub fn prune_table_with_range_in_batches( - &self, - keys: impl RangeBounds, - batch_size: usize, - mut batch_callback: impl FnMut(usize, usize), - ) -> std::result::Result<(usize, usize), DatabaseError> { + keys: impl RangeBounds + Clone + Debug, + limit: usize, + mut delete_callback: impl FnMut(TableRow), + ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; - let mut walker = cursor.walk_range(keys)?; - let mut deleted_keys = 0; - let mut deleted_rows = 0; - let mut previous_key = None; + let mut walker = cursor.walk_range(keys.clone())?; + let mut deleted = 0; - while let Some((key, _)) = walker.next().transpose()? { + while let Some(row) = walker.next().transpose()? { walker.delete_current()?; - deleted_rows += 1; - if previous_key.as_ref().map(|previous_key| previous_key != &key).unwrap_or(true) { - deleted_keys += 1; - previous_key = Some(key); - } + deleted += 1; + delete_callback(row); - if deleted_rows % batch_size == 0 { - batch_callback(deleted_keys, deleted_rows); + if deleted == limit { + break } } - if deleted_rows % batch_size != 0 { - batch_callback(deleted_keys, deleted_rows); - } - - Ok((deleted_keys, deleted_rows)) + Ok((deleted, walker.next().transpose()?.is_none())) } /// Load shard and remove it. If list is empty, last shard was full or From d9115be20c613b4ca68e97d2024d5f49ade9044e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 14:35:20 +0100 Subject: [PATCH 03/20] fix engine prune event --- crates/consensus/beacon/src/engine/prune.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/consensus/beacon/src/engine/prune.rs b/crates/consensus/beacon/src/engine/prune.rs index 257170a376fe..b6e8e3fb7acb 100644 --- a/crates/consensus/beacon/src/engine/prune.rs +++ b/crates/consensus/beacon/src/engine/prune.rs @@ -116,7 +116,7 @@ pub(crate) enum EnginePruneEvent { /// If this is returned, the pruner is idle. Finished { /// Final result of the pruner run. - result: Result<(), PrunerError>, + result: Result, }, /// Pruner task was dropped after it was started, unable to receive it because channel /// closed. This would indicate a panicked pruner task From bbf5c0cb5359c2d920fdfabe6c10a8bf0f754395 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 15:45:26 +0100 Subject: [PATCH 04/20] more trace logs --- crates/prune/src/pruner.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 78d987051bf2..7e9ea8ac2c4a 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -107,6 +107,8 @@ impl Pruner { .get_prune_part_metrics(PrunePart::Receipts) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", "No receipts to prune"); } if let Some((to_block, prune_mode)) = @@ -118,6 +120,8 @@ impl Pruner { .get_prune_part_metrics(PrunePart::TransactionLookup) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", "No transaction lookup entries to prune"); } if let Some((to_block, prune_mode)) = @@ -129,6 +133,8 @@ impl Pruner { .get_prune_part_metrics(PrunePart::SenderRecovery) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", "No transaction senders to prune"); } if let Some((to_block, prune_mode)) = @@ -140,6 +146,8 @@ impl Pruner { .get_prune_part_metrics(PrunePart::AccountHistory) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", "No account history entries to prune"); } if let Some((to_block, prune_mode)) = @@ -151,6 +159,8 @@ impl Pruner { .get_prune_part_metrics(PrunePart::StorageHistory) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", "No storage history entries to prune"); } provider.commit()?; From adffcf630542fe4f8c5b4041ea7187db409c9e6a Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 15:52:59 +0100 Subject: [PATCH 05/20] even more logs --- crates/prune/src/pruner.rs | 70 +++++++++++++++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 5 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 7e9ea8ac2c4a..0200d9481aa4 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -101,6 +101,14 @@ impl Pruner { if let Some((to_block, prune_mode)) = self.modes.prune_target_block_receipts(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::Receipts, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); done = done && self.prune_receipts(&provider, to_block, prune_mode)?; self.metrics @@ -108,12 +116,24 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!(target: "pruner", "No receipts to prune"); + trace!( + target: "pruner", + prune_part = ?PrunePart::Receipts, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_transaction_lookup(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::TransactionLookup, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); done = done && self.prune_transaction_lookup(&provider, to_block, prune_mode)?; self.metrics @@ -121,12 +141,24 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!(target: "pruner", "No transaction lookup entries to prune"); + trace!( + target: "pruner", + prune_part = ?PrunePart::TransactionLookup, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_sender_recovery(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::SenderRecovery, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); done = done && self.prune_transaction_senders(&provider, to_block, prune_mode)?; self.metrics @@ -134,12 +166,24 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!(target: "pruner", "No transaction senders to prune"); + trace!( + target: "pruner", + prune_part = ?PrunePart::SenderRecovery, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_account_history(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::AccountHistory, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); done = done && self.prune_account_history(&provider, to_block, prune_mode)?; self.metrics @@ -147,12 +191,24 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!(target: "pruner", "No account history entries to prune"); + trace!( + target: "pruner", + prune_part = ?PrunePart::AccountHistory, + "No target block to prune" + ); } if let Some((to_block, prune_mode)) = self.modes.prune_target_block_storage_history(tip_block_number)? { + trace!( + target: "pruner", + prune_part = ?PrunePart::StorageHistory, + %to_block, + ?prune_mode, + "Got target block to prune" + ); + let part_start = Instant::now(); done = done && self.prune_storage_history(&provider, to_block, prune_mode)?; self.metrics @@ -160,7 +216,11 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!(target: "pruner", "No storage history entries to prune"); + trace!( + target: "pruner", + prune_part = ?PrunePart::StorageHistory, + "No target block to prune" + ); } provider.commit()?; From 8fa47575fa1021360b6d52c8d21577fe5d8ef4b3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 16:13:06 +0100 Subject: [PATCH 06/20] fix short-circuit --- crates/prune/src/pruner.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 0200d9481aa4..92d661f51aba 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -110,7 +110,8 @@ impl Pruner { ); let part_start = Instant::now(); - done = done && self.prune_receipts(&provider, to_block, prune_mode)?; + let part_done = self.prune_receipts(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::Receipts) .duration_seconds @@ -135,7 +136,8 @@ impl Pruner { ); let part_start = Instant::now(); - done = done && self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + let part_done = self.prune_transaction_lookup(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::TransactionLookup) .duration_seconds @@ -160,7 +162,8 @@ impl Pruner { ); let part_start = Instant::now(); - done = done && self.prune_transaction_senders(&provider, to_block, prune_mode)?; + let part_done = self.prune_transaction_senders(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::SenderRecovery) .duration_seconds @@ -185,7 +188,8 @@ impl Pruner { ); let part_start = Instant::now(); - done = done && self.prune_account_history(&provider, to_block, prune_mode)?; + let part_done = self.prune_account_history(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::AccountHistory) .duration_seconds @@ -210,7 +214,8 @@ impl Pruner { ); let part_start = Instant::now(); - done = done && self.prune_storage_history(&provider, to_block, prune_mode)?; + let part_done = self.prune_storage_history(&provider, to_block, prune_mode)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::StorageHistory) .duration_seconds @@ -495,7 +500,7 @@ impl Pruner { )? { Some(range) => range, None => { - trace!(target: "pruner", "No acount history to prune"); + trace!(target: "pruner", "No account history to prune"); return Ok(true) } }; From 80a53efbf82230ea4ff10266b253194744554863 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 16:20:34 +0100 Subject: [PATCH 07/20] improve log fields --- crates/prune/src/pruner.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 92d661f51aba..ea27fde4e22e 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -352,12 +352,12 @@ impl Pruner { let tx_range_end = *tx_range.end(); let mut last_pruned_transaction = tx_range_end; - let (rows, done) = provider.prune_table_with_range::( + let (deleted, done) = provider.prune_table_with_range::( tx_range, self.batch_sizes.receipts, |row| last_pruned_transaction = row.0, )?; - trace!(target: "pruner", %rows, %done, "Pruned receipts"); + trace!(target: "pruner", %deleted, %done, "Pruned receipts"); let last_pruned_block = provider .transaction_block(last_pruned_transaction)? @@ -415,12 +415,12 @@ impl Pruner { } let mut last_pruned_transaction = tx_range_end; - let (rows, done) = provider.prune_table_with_iterator::( + let (deleted, done) = provider.prune_table_with_iterator::( hashes, self.batch_sizes.transaction_lookup, |row| last_pruned_transaction = row.1, )?; - trace!(target: "pruner", %rows, %done, "Pruned transaction lookup"); + trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup"); let last_pruned_block = provider .transaction_block(last_pruned_transaction)? @@ -461,12 +461,12 @@ impl Pruner { let tx_range_end = *tx_range.end(); let mut last_pruned_transaction = tx_range_end; - let (rows, done) = provider.prune_table_with_range::( + let (deleted, done) = provider.prune_table_with_range::( tx_range, self.batch_sizes.transaction_senders, |row| last_pruned_transaction = row.0, )?; - trace!(target: "pruner", %rows, %done, "Pruned transaction senders"); + trace!(target: "pruner", %deleted, %done, "Pruned transaction senders"); let last_pruned_block = provider .transaction_block(last_pruned_transaction)? @@ -518,13 +518,13 @@ impl Pruner { .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) .unwrap_or(range_end); - let rows = self.prune_history_indices::( + let (processed, deleted) = self.prune_history_indices::( provider, last_pruned_block, |a, b| a.key == b.key, |key| ShardedKey::last(key.key), )?; - trace!(target: "pruner", %rows, %done, "Pruned account history (history)" ); + trace!(target: "pruner", %processed, %deleted, %done, "Pruned account history (history)" ); provider.save_prune_checkpoint( PrunePart::AccountHistory, @@ -567,13 +567,13 @@ impl Pruner { .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) .unwrap_or(range_end); - self.prune_history_indices::( + let (processed, deleted) = self.prune_history_indices::( provider, last_pruned_block, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), )?; - trace!(target: "pruner", %rows, %done, "Pruned storage history (history)" ); + trace!(target: "pruner", %processed, %deleted, %done, "Pruned storage history (history)" ); provider.save_prune_checkpoint( PrunePart::StorageHistory, @@ -590,12 +590,13 @@ impl Pruner { to_block: BlockNumber, key_matches: impl Fn(&T::Key, &T::Key) -> bool, last_key: impl Fn(&T::Key) -> T::Key, - ) -> Result + ) -> Result<(usize, usize), PrunerError> where T: Table, T::Key: AsRef>, { let mut processed = 0; + let mut deleted = 0; let mut cursor = provider.tx_ref().cursor_write::()?; // Prune history table: @@ -611,6 +612,7 @@ impl Pruner { // If shard consists only of block numbers less than the target one, delete shard // completely. cursor.delete_current()?; + deleted += 1; if key.as_ref().highest_block_number == to_block { // Shard contains only block numbers up to the target one, so we can skip to the // next sharded key. It is guaranteed that further shards for this sharded key @@ -641,6 +643,7 @@ impl Pruner { // If current shard is the last shard for the sharded key that has // previous shards, replace it with the previous shard. cursor.delete_current()?; + deleted += 1; // Upsert will replace the last shard for this sharded key with the // previous value. cursor.upsert(key.clone(), prev_value)?; @@ -652,11 +655,13 @@ impl Pruner { cursor.next()?; // Delete shard. cursor.delete_current()?; + deleted += 1; } } else { // If current shard is not the last shard for this sharded key, // just delete it. cursor.delete_current()?; + deleted += 1; } } else { cursor.upsert(key.clone(), BlockNumberList::new_pre_sorted(new_blocks))?; @@ -670,7 +675,7 @@ impl Pruner { processed += 1; } - Ok(processed) + Ok((processed, deleted)) } } From 1d149e0a1cd7040499b2e0d6fac80f99c54b8d2c Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 16:29:15 +0100 Subject: [PATCH 08/20] increase batch sizes to 1000 --- crates/prune/src/pruner.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index ea27fde4e22e..eb4fad65c61e 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -43,11 +43,11 @@ pub struct BatchSizes { impl Default for BatchSizes { fn default() -> Self { Self { - receipts: 100, - transaction_lookup: 100, - transaction_senders: 100, - account_history: 100, - storage_history: 100, + receipts: 1000, + transaction_lookup: 1000, + transaction_senders: 1000, + account_history: 1000, + storage_history: 1000, } } } From 54a67f6158703870f6b50554510430d454f6b345 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 18 Aug 2023 17:19:36 +0100 Subject: [PATCH 09/20] add comments --- crates/consensus/beacon/src/engine/prune.rs | 4 +-- crates/primitives/src/prune/checkpoint.rs | 5 +-- crates/prune/src/pruner.rs | 36 ++++++++++++++----- .../src/providers/database/provider.rs | 2 +- 4 files changed, 33 insertions(+), 14 deletions(-) diff --git a/crates/consensus/beacon/src/engine/prune.rs b/crates/consensus/beacon/src/engine/prune.rs index b6e8e3fb7acb..4b2b4852dccf 100644 --- a/crates/consensus/beacon/src/engine/prune.rs +++ b/crates/consensus/beacon/src/engine/prune.rs @@ -3,7 +3,7 @@ use futures::FutureExt; use reth_db::database::Database; use reth_primitives::BlockNumber; -use reth_prune::{Pruner, PrunerError, PrunerWithResult}; +use reth_prune::{Pruner, PrunerResult, PrunerWithResult}; use reth_tasks::TaskSpawner; use std::task::{ready, Context, Poll}; use tokio::sync::oneshot; @@ -116,7 +116,7 @@ pub(crate) enum EnginePruneEvent { /// If this is returned, the pruner is idle. Finished { /// Final result of the pruner run. - result: Result, + result: PrunerResult, }, /// Pruner task was dropped after it was started, unable to receive it because channel /// closed. This would indicate a panicked pruner task diff --git a/crates/primitives/src/prune/checkpoint.rs b/crates/primitives/src/prune/checkpoint.rs index b1800cb4181f..8096d2067af0 100644 --- a/crates/primitives/src/prune/checkpoint.rs +++ b/crates/primitives/src/prune/checkpoint.rs @@ -6,9 +6,10 @@ use reth_codecs::{main_codec, Compact}; #[derive(Debug, PartialEq, Eq, Clone, Copy)] #[cfg_attr(test, derive(Default))] pub struct PruneCheckpoint { - /// Highest pruned block number, if any. + /// Highest pruned block number. + /// If it's [None], the pruning for block `0` is not finished yet. pub block_number: Option, - /// Highest pruned transaction number, if applicable + /// Highest pruned transaction number, if applicable. pub tx_number: Option, /// Prune mode. pub prune_mode: PruneMode, diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index eb4fad65c61e..48ef0d964d13 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -21,22 +21,27 @@ use reth_provider::{ use std::{ops::RangeInclusive, sync::Arc, time::Instant}; use tracing::{debug, error, instrument, trace}; -/// Result of [Pruner::run] execution +/// Result of [Pruner::run] execution. +/// +/// Returns `true` if pruning has been completed up to the target block, +/// and `false` if there's more data to prune in further runs. pub type PrunerResult = Result; -/// The pipeline type itself with the result of [Pruner::run] +/// The pruner type itself with the result of [Pruner::run] pub type PrunerWithResult = (Pruner, PrunerResult); pub struct BatchSizes { - // Measured in transactions + /// Maximum number of receipts to prune in one run. receipts: usize, - // Measured in transactions + /// Maximum number of transaction lookup entries to prune in one run. transaction_lookup: usize, - // Measured in transactions + /// Maximum number of transaction senders to prune in one run. transaction_senders: usize, - // Measured in blocks + /// Maximum number of account history entries to prune in one run. + /// Measured in the number of [tables::AccountChangeSet] rows. account_history: usize, - // Measured in blocks + /// Maximum number of storage history entries to prune in one run. + /// Measured in the number of [tables::StorageChangeSet] rows. storage_history: usize, } @@ -63,6 +68,7 @@ pub struct Pruner { /// when the pruning needs to be initiated. last_pruned_block_number: Option, modes: PruneModes, + /// Maximum entries to prune per one run, per prune part. batch_sizes: BatchSizes, } @@ -271,7 +277,7 @@ impl Pruner { /// 1. If checkpoint exists, use next block. /// 2. If checkpoint doesn't exist, use block 0. /// - /// To get the range end: use block `min(to_block, from_block + limit - 1)`. + /// To get the range end: use block `to_block`. fn get_next_block_range_from_checkpoint( &self, provider: &DatabaseProviderRW<'_, DB>, @@ -362,6 +368,8 @@ impl Pruner { let last_pruned_block = provider .transaction_block(last_pruned_transaction)? .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more receipts to prune, set the checkpoint block number to previous, + // so we could finish pruning its receipts on the next run. .checked_sub(if done { 0 } else { 1 }); provider.save_prune_checkpoint( @@ -376,7 +384,8 @@ impl Pruner { Ok(done) } - /// Prune transaction lookup entries up to the provided block, inclusive. + /// Prune transaction lookup entries up to the provided block, inclusive, respecting the batch + /// size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_transaction_lookup( &self, @@ -425,6 +434,9 @@ impl Pruner { let last_pruned_block = provider .transaction_block(last_pruned_transaction)? .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction lookup entries to prune, set the checkpoint block number + // to previous, so we could finish pruning its transaction lookup entries on the next + // run. .checked_sub(if done { 0 } else { 1 }); provider.save_prune_checkpoint( @@ -471,6 +483,8 @@ impl Pruner { let last_pruned_block = provider .transaction_block(last_pruned_transaction)? .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more transaction senders to prune, set the checkpoint block number to + // previous, so we could finish pruning its transaction senders on the next run. .checked_sub(if done { 0 } else { 1 }); provider.save_prune_checkpoint( @@ -515,6 +529,8 @@ impl Pruner { trace!(target: "pruner", %rows, %done, "Pruned account history (changesets)"); let last_pruned_block = last_pruned_block_number + // If there's more account account changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its account changesets on the next run. .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) .unwrap_or(range_end); @@ -564,6 +580,8 @@ impl Pruner { trace!(target: "pruner", %rows, %done, "Pruned storage history (changesets)"); let last_pruned_block = last_pruned_block_number + // If there's more account storage changesets to prune, set the checkpoint block number + // to previous, so we could finish pruning its storage changesets on the next run. .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) .unwrap_or(range_end); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 5624e9cf3150..a18703681fde 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -636,7 +636,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let mut deleted = 0; let mut keys = keys.into_iter(); - while let Some(key) = keys.next() { + for key in &mut keys { let row = cursor.seek_exact(key.clone())?; if let Some(row) = row { cursor.delete_current()?; From 8451bed155851cce0330d81a17092d536fe19497 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 21 Aug 2023 13:07:50 +0100 Subject: [PATCH 10/20] fix stage checkpoint tests --- crates/stages/src/stages/sender_recovery.rs | 7 ++++++- crates/stages/src/stages/tx_lookup.rs | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 0f0f7f0dbfd5..002f0b6708a9 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -406,7 +406,12 @@ mod tests { PrunePart::SenderRecovery, PruneCheckpoint { block_number: Some(max_pruned_block), - tx_number: None, + tx_number: Some( + blocks[..=max_pruned_block as usize] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + ), prune_mode: PruneMode::Full, }, ) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index d6ee347c1aa5..65f5772b74ee 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -362,7 +362,12 @@ mod tests { PrunePart::TransactionLookup, PruneCheckpoint { block_number: Some(max_pruned_block), - tx_number: None, + tx_number: Some( + blocks[..=max_pruned_block as usize] + .iter() + .map(|block| block.body.len() as u64) + .sum::(), + ), prune_mode: PruneMode::Full, }, ) From c10eeb53c400acf5e688a79cf9d5d29aabd01b2b Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Mon, 21 Aug 2023 18:04:54 +0100 Subject: [PATCH 11/20] fixes of receipts by logs pruner checkpoint --- crates/prune/src/pruner.rs | 37 ++++++++++++++++++++++++++++--------- crates/revm/src/executor.rs | 7 +++++++ 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 7902f48b3bb4..e25b3f7c589e 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -397,8 +397,8 @@ impl Pruner { Ok(done) } - /// Prune receipts up to the provided block by filtering logs. Works as in inclusion list, and - /// removes every receipt not belonging to it. + /// Prune receipts up to the provided block, inclusive, by filtering logs. Works as in inclusion + /// list, and removes every receipt not belonging to it. Respects the batch size. #[instrument(level = "trace", skip(self, provider), target = "pruner")] fn prune_receipts_by_logs( &self, @@ -533,23 +533,42 @@ impl Pruner { } // If there are contracts using `PruneMode::Distance(_)` there will be receipts before - // `to_block` that become eligible to be pruned in future runs. Therefore, our - // checkpoint is not actually `to_block`, but the `lowest_block_with_distance` from any - // contract. This ensures that in future pruner runs we can - // prune all these receipts between the previous `lowest_block_with_distance` and the new - // one using `get_next_tx_num_range_from_checkpoint`. + // `to_block` that become eligible to be pruned in future runs. Therefore, our checkpoint is + // not actually `to_block`, but the `lowest_block_with_distance` from any contract. + // This ensures that in future pruner runs we can prune all these receipts between the + // previous `lowest_block_with_distance` and the new one using + // `get_next_tx_num_range_from_checkpoint`. let prune_mode_block = self .modes .contract_logs_filter .lowest_block_with_distance(tip_block_number, pruned_block)? .unwrap_or(to_block); - let checkpoint_block = last_pruned_block.map(|block| block.min(prune_mode_block - 1)); + + let (checkpoint_block, checkpoint_transaction) = if done { + let Some(prune_mode_block) = prune_mode_block.checked_sub(1) else { return Ok(done) }; + ( + Some(prune_mode_block), + provider + .block_body_indices(prune_mode_block)? + .ok_or(PrunerError::InconsistentData( + "Block body indices for prune mode block not found", + ))? + .last_tx_num(), + ) + } else { + ( + last_pruned_block, + last_pruned_transaction.ok_or(PrunerError::InconsistentData( + "Last pruned transaction is not present", + ))?, + ) + }; provider.save_prune_checkpoint( PrunePart::ContractLogs, PruneCheckpoint { block_number: checkpoint_block, - tx_number: last_pruned_transaction, + tx_number: Some(checkpoint_transaction), prune_mode: PruneMode::Before(prune_mode_block), }, )?; diff --git a/crates/revm/src/executor.rs b/crates/revm/src/executor.rs index 990e674bb7c5..6ba25f4192aa 100644 --- a/crates/revm/src/executor.rs +++ b/crates/revm/src/executor.rs @@ -251,6 +251,13 @@ where // append gas used cumulative_gas_used += result.gas_used(); + tracing::trace!( + target: "revm::executor", + hash = ?transaction.hash, + gas_used = result.gas_used(), + "transaction executed" + ); + // Push transaction changeset and calculate header bloom filter for receipt. post_state.add_receipt( block.number, From df14e1e0f788f7f4b9ed970f5b0e6bb4872b14c8 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 10:43:37 +0100 Subject: [PATCH 12/20] return early if receipts by logs block range is empty --- crates/prune/src/pruner.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e25b3f7c589e..c77f7b0262c0 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -471,6 +471,11 @@ impl Pruner { let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { let block_range = start_block..=end_block; + if block_range.is_empty() { + trace!(target: "pruner", "No receipts by logs to prune"); + return Ok(true) + } + let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::ContractLogs, From 0498b1888102c7ce97aac88c9a205c2c5d4ee47e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 10:54:55 +0100 Subject: [PATCH 13/20] Revert "return early if receipts by logs block range is empty" This reverts commit df14e1e0f788f7f4b9ed970f5b0e6bb4872b14c8. --- crates/prune/src/pruner.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index c77f7b0262c0..e25b3f7c589e 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -471,11 +471,6 @@ impl Pruner { let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { let block_range = start_block..=end_block; - if block_range.is_empty() { - trace!(target: "pruner", "No receipts by logs to prune"); - return Ok(true) - } - let tx_range = match self.get_next_tx_num_range_from_checkpoint( provider, PrunePart::ContractLogs, From a092d725cc36368ab247d143ecffe67b2954e399 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 10:55:03 +0100 Subject: [PATCH 14/20] check empty tx range --- crates/prune/src/pruner.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e25b3f7c589e..2a88df63d656 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -343,7 +343,12 @@ impl Pruner { } .last_tx_num(); - Ok(Some(from_tx_number..=to_tx_number)) + let range = from_tx_number..=to_tx_number; + if range.is_empty() { + return Ok(None) + } + + Ok(Some(range)) } /// Prune receipts up to the provided block, inclusive, respecting the batch size. From bedb5a450b8489c5de282b29309b99855a5498aa Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 11:08:54 +0100 Subject: [PATCH 15/20] check tip block number --- crates/prune/src/pruner.rs | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 2a88df63d656..da9242efcfbc 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -94,11 +94,14 @@ impl Pruner { /// Run the pruner pub fn run(&mut self, tip_block_number: BlockNumber) -> PrunerResult { - trace!( - target: "pruner", - %tip_block_number, - "Pruner started" - ); + if tip_block_number == 0 { + self.last_pruned_block_number = Some(tip_block_number); + + trace!(target: "pruner", %tip_block_number, "Nothing to prune yet"); + return Ok(true) + } + + trace!(target: "pruner", %tip_block_number, "Pruner started"); let start = Instant::now(); let provider = self.provider_factory.provider_rw()?; @@ -124,11 +127,7 @@ impl Pruner { .duration_seconds .record(part_start.elapsed()) } else { - trace!( - target: "pruner", - prune_part = ?PrunePart::Receipts, - "No target block to prune" - ); + trace!(target: "pruner", prune_part = ?PrunePart::Receipts, "No target block to prune"); } if !self.modes.contract_logs_filter.is_empty() { @@ -250,12 +249,7 @@ impl Pruner { let elapsed = start.elapsed(); self.metrics.duration_seconds.record(elapsed); - trace!( - target: "pruner", - %tip_block_number, - ?elapsed, - "Pruner finished" - ); + trace!(target: "pruner", %tip_block_number, ?elapsed, "Pruner finished"); Ok(done) } From 08f71e2fb0af6a5ebc81dca7238ef0626043d5b0 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 13:41:12 +0100 Subject: [PATCH 16/20] add another log for contracts by logs pruning --- crates/prune/src/pruner.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index bd30c4f941ad..0da7df36dbc0 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -132,11 +132,14 @@ impl Pruner { if !self.modes.contract_logs_filter.is_empty() { let part_start = Instant::now(); - self.prune_receipts_by_logs(&provider, tip_block_number)?; + let part_done = self.prune_receipts_by_logs(&provider, tip_block_number)?; + done = done && part_done; self.metrics .get_prune_part_metrics(PrunePart::ContractLogs) .duration_seconds .record(part_start.elapsed()) + } else { + trace!(target: "pruner", prune_part = ?PrunePart::ContractLogs, "No filter to prune"); } if let Some((to_block, prune_mode)) = @@ -464,6 +467,13 @@ impl Pruner { block_ranges.push((*start_block, end_block, filtered_addresses.len())); } + trace!( + target: "pruner", + ?block_ranges, + ?filtered_addresses, + "Calculated block ranges and filtered addresses", + ); + let mut limit = self.batch_sizes.receipts; let mut done = true; let mut last_pruned_block = None; From b941ed4e6aa20f948dda409d9e1f4b47e4ca1c8f Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Tue, 22 Aug 2023 13:58:26 +0100 Subject: [PATCH 17/20] prune receipts by logs with range instead of iterator --- crates/prune/src/pruner.rs | 7 +++++-- .../src/providers/database/provider.rs | 18 +++++++++--------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 0da7df36dbc0..94180a3f56c4 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -373,6 +373,7 @@ impl Pruner { let (deleted, done) = provider.prune_table_with_range::( tx_range, self.batch_sizes.receipts, + |_| false, |row| last_pruned_transaction = row.0, )?; trace!(target: "pruner", %deleted, %done, "Pruned receipts"); @@ -499,7 +500,7 @@ impl Pruner { last_pruned_transaction = Some(tx_range_end); let deleted; - (deleted, done) = provider.prune_table_with_iterator::( + (deleted, done) = provider.prune_table_with_range::( tx_range, limit, |receipt| { @@ -628,7 +629,6 @@ impl Pruner { let (deleted, done) = provider.prune_table_with_iterator::( hashes, self.batch_sizes.transaction_lookup, - |_| false, |row| last_pruned_transaction = row.1, )?; trace!(target: "pruner", %deleted, %done, "Pruned transaction lookup"); @@ -678,6 +678,7 @@ impl Pruner { let (deleted, done) = provider.prune_table_with_range::( tx_range, self.batch_sizes.transaction_senders, + |_| false, |row| last_pruned_transaction = row.0, )?; trace!(target: "pruner", %deleted, %done, "Pruned transaction senders"); @@ -726,6 +727,7 @@ impl Pruner { let (rows, done) = provider.prune_table_with_range::( range, self.batch_sizes.account_history, + |_| false, |row| last_pruned_block_number = Some(row.0), )?; trace!(target: "pruner", %rows, %done, "Pruned account history (changesets)"); @@ -777,6 +779,7 @@ impl Pruner { let (rows, done) = provider.prune_table_with_range::( BlockNumberAddress::range(range), self.batch_sizes.storage_history, + |_| false, |row| last_pruned_block_number = Some(row.0.block_number()), )?; trace!(target: "pruner", %rows, %done, "Pruned storage history (changesets)"); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 537910f85855..a79c91f32ed8 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -630,7 +630,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, keys: impl IntoIterator, limit: usize, - skip_filter: impl Fn(&T::Value) -> bool, mut delete_callback: impl FnMut(TableRow), ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; @@ -640,11 +639,9 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { for key in &mut keys { let row = cursor.seek_exact(key.clone())?; if let Some(row) = row { - if !skip_filter(&row.1) { - cursor.delete_current()?; - deleted += 1; - delete_callback(row); - } + cursor.delete_current()?; + deleted += 1; + delete_callback(row); } if deleted == limit { @@ -662,6 +659,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, keys: impl RangeBounds + Clone + Debug, limit: usize, + skip_filter: impl Fn(&T::Value) -> bool, mut delete_callback: impl FnMut(TableRow), ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; @@ -669,9 +667,11 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let mut deleted = 0; while let Some(row) = walker.next().transpose()? { - walker.delete_current()?; - deleted += 1; - delete_callback(row); + if !skip_filter(&row.1) { + walker.delete_current()?; + deleted += 1; + delete_callback(row); + } if deleted == limit { break From d4e760adbc3d3a712ee8f9a41da7ad9676227523 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 22 Aug 2023 19:54:19 +0000 Subject: [PATCH 18/20] fix checkpoints for contract log filtering on live sync --- crates/primitives/src/prune/mod.rs | 6 +- crates/prune/src/pruner.rs | 235 +++++++++++++----- .../src/providers/database/provider.rs | 4 +- 3 files changed, 175 insertions(+), 70 deletions(-) diff --git a/crates/primitives/src/prune/mod.rs b/crates/primitives/src/prune/mod.rs index 5359c3d9c72f..0836e6933f2e 100644 --- a/crates/primitives/src/prune/mod.rs +++ b/crates/primitives/src/prune/mod.rs @@ -49,10 +49,14 @@ impl ContractLogsPruneConfig { // the BTreeMap (block = 0), otherwise it will be excluded. // Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all // other receipts. + // + // Reminder, that we increment because the [`BlockNumber`] key of the new map should be + // viewed as `PruneMode::Before(block)` let block = (pruned_block + 1).max( mode.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)? .map(|(block, _)| block) - .unwrap_or_default(), + .unwrap_or_default() + + 1, ); map.entry(block).or_insert_with(Vec::new).push(address) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 94180a3f56c4..4f82c301dbb5 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -419,14 +419,25 @@ impl Pruner { .map(|(bn, _)| bn) .unwrap_or_default(); - // Figure out what receipts have already been pruned, so we can have an accurate - // `address_filter` - let pruned_block = provider + // Get status checkpoint from latest run + let mut last_pruned_block = provider .get_prune_checkpoint(PrunePart::ContractLogs)? .and_then(|checkpoint| checkpoint.block_number); + let initial_last_pruned_block = last_pruned_block; + + let mut from_tx_number = match initial_last_pruned_block { + Some(block) => provider + .block_body_indices(block)? + .map(|block| block.last_tx_num() + 1) + .unwrap_or(0), + None => 0, + }; + + // Figure out what receipts have already been pruned, so we can have an accurate + // `address_filter` let address_filter = - self.modes.contract_logs_filter.group_by_block(tip_block_number, pruned_block)?; + self.modes.contract_logs_filter.group_by_block(tip_block_number, last_pruned_block)?; // Splits all transactions in different block ranges. Each block range will have its own // filter address list and will check it while going through the table @@ -455,9 +466,13 @@ impl Pruner { while let Some((start_block, addresses)) = blocks_iter.next() { filtered_addresses.extend_from_slice(addresses); - // This will clear all receipts before the first appearance of a contract log + // This will clear all receipts before the first appearance of a contract log or since + // the block after the last pruned one. if block_ranges.is_empty() { - block_ranges.push((0, *start_block - 1, 0)); + let init = last_pruned_block.and_then(|b| Some(b + 1)).unwrap_or_default(); + if init <= *start_block - 1 { + block_ranges.push((init, *start_block - 1, 0)); + } } let end_block = @@ -477,16 +492,13 @@ impl Pruner { let mut limit = self.batch_sizes.receipts; let mut done = true; - let mut last_pruned_block = None; let mut last_pruned_transaction = None; for (start_block, end_block, num_addresses) in block_ranges { let block_range = start_block..=end_block; - let tx_range = match self.get_next_tx_num_range_from_checkpoint( - provider, - PrunePart::ContractLogs, - end_block, - )? { - Some(range) => range, + + // Calculate the transaction range from this block range + let tx_range_end = match provider.block_body_indices(end_block)? { + Some(body) => body.last_tx_num(), None => { trace!( target: "pruner", @@ -496,18 +508,24 @@ impl Pruner { continue } }; - let tx_range_end = *tx_range.end(); + let tx_range = from_tx_number..=tx_range_end; - last_pruned_transaction = Some(tx_range_end); + // Delete receipts, except the ones in the inclusion list + let mut last_skipped_transaction = 0; let deleted; (deleted, done) = provider.prune_table_with_range::( tx_range, limit, - |receipt| { - num_addresses > 0 && + |(tx_num, receipt)| { + let skip = num_addresses > 0 && receipt.logs.iter().any(|log| { filtered_addresses[..num_addresses].contains(&&log.address) - }) + }); + + if skip { + last_skipped_transaction = *tx_num; + } + skip }, |row| last_pruned_transaction = Some(row.0), )?; @@ -515,31 +533,27 @@ impl Pruner { limit = limit.saturating_sub(deleted); - last_pruned_block = provider - .transaction_block(last_pruned_transaction.unwrap())? - .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? - // If there's more receipts to prune, set the checkpoint block number to previous, - // so we could finish pruning its receipts on the next run. - .checked_sub(if done { 0 } else { 1 }); - - // If this is the last block range, avoid writing an unused checkpoint - if last_pruned_block != Some(to_block) { - // This allows us to query for the transactions in the next block range with - // [`get_next_tx_num_range_from_checkpoint`]. It's just a temporary intermediate - // checkpoint, which should be adjusted in the end. - provider.save_prune_checkpoint( - PrunePart::ContractLogs, - PruneCheckpoint { - block_number: last_pruned_block, - tx_number: last_pruned_transaction, - prune_mode: PruneMode::Before(end_block + 1), - }, - )?; - } + // For accurate checkpoints we need to know that we have checked every transaction. + // Example: we reached the end of the range, and the last receipt is supposed to skip + // its deletion. + last_pruned_transaction = + Some(last_pruned_transaction.unwrap_or_default().max(last_skipped_transaction)); + last_pruned_block = Some( + provider + .transaction_block(last_pruned_transaction.expect("qed"))? + .ok_or(PrunerError::InconsistentData("Block for transaction is not found"))? + // If there's more receipts to prune, set the checkpoint block number to + // previous, so we could finish pruning its receipts on the + // next run. + .saturating_sub(if done { 0 } else { 1 }), + ); if limit == 0 { + done &= end_block == to_block; break } + + from_tx_number = last_pruned_transaction.expect("qed") + 1; } // If there are contracts using `PruneMode::Distance(_)` there will be receipts before @@ -548,41 +562,23 @@ impl Pruner { // This ensures that in future pruner runs we can prune all these receipts between the // previous `lowest_block_with_distance` and the new one using // `get_next_tx_num_range_from_checkpoint`. + // + // Only applies if we were able to prune everything intended for this run, otherwise the + // checkpoing is the `last_pruned_block`. let prune_mode_block = self .modes .contract_logs_filter - .lowest_block_with_distance(tip_block_number, pruned_block)? + .lowest_block_with_distance(tip_block_number, initial_last_pruned_block)? .unwrap_or(to_block); - let (checkpoint_block, checkpoint_transaction) = if done { - let Some(prune_mode_block) = prune_mode_block.checked_sub(1) else { return Ok(done) }; - ( - Some(prune_mode_block), - provider - .block_body_indices(prune_mode_block)? - .ok_or(PrunerError::InconsistentData( - "Block body indices for prune mode block not found", - ))? - .last_tx_num(), - ) - } else { - ( - last_pruned_block, - last_pruned_transaction.ok_or(PrunerError::InconsistentData( - "Last pruned transaction is not present", - ))?, - ) - }; - provider.save_prune_checkpoint( PrunePart::ContractLogs, PruneCheckpoint { - block_number: checkpoint_block, - tx_number: Some(checkpoint_transaction), + block_number: Some(prune_mode_block.min(last_pruned_block.unwrap_or(u64::MAX))), + tx_number: last_pruned_transaction, prune_mode: PruneMode::Before(prune_mode_block), }, )?; - Ok(done) } @@ -916,17 +912,22 @@ mod tests { FoldWhile::{Continue, Done}, Itertools, }; - use reth_db::{tables, test_utils::create_test_rw_db, BlockNumberList}; + use reth_db::{ + cursor::DbCursorRO, tables, test_utils::create_test_rw_db, transaction::DbTx, + BlockNumberList, + }; use reth_interfaces::test_utils::{ generators, generators::{ - random_block_range, random_changeset_range, random_eoa_account_range, random_receipt, + random_block_range, random_changeset_range, random_eoa_account, + random_eoa_account_range, random_log, random_receipt, }, }; use reth_primitives::{ - BlockNumber, PruneCheckpoint, PruneMode, PruneModes, PrunePart, TxNumber, H256, MAINNET, + BlockNumber, ContractLogsPruneConfig, PruneCheckpoint, PruneMode, PruneModes, PrunePart, + TxNumber, H256, MAINNET, }; - use reth_provider::PruneCheckpointReader; + use reth_provider::{PruneCheckpointReader, TransactionsProvider}; use reth_stages::test_utils::TestTransaction; use std::{collections::BTreeMap, ops::AddAssign}; @@ -1502,4 +1503,104 @@ mod tests { test_prune(2300, 2, true); test_prune(3000, 3, true); } + + #[test] + fn prune_receipts_by_logs() { + let tx = TestTransaction::default(); + let mut rng = generators::rng(); + + let tip = 300; + let blocks = random_block_range(&mut rng, 0..=tip, H256::zero(), 1..5); + tx.insert_blocks(blocks.iter(), None).expect("insert blocks"); + + let mut receipts = Vec::new(); + + let (deposit_contract_addr, _) = random_eoa_account(&mut rng); + for block in &blocks { + assert!(!block.body.is_empty()); + for (txi, transaction) in block.body.iter().enumerate() { + let mut receipt = random_receipt(&mut rng, transaction, Some(1)); + receipt.logs.push(random_log( + &mut rng, + if txi == (block.body.len() - 1) { Some(deposit_contract_addr) } else { None }, + Some(1), + )); + receipts.push((receipts.len() as u64, receipt)); + } + } + tx.insert_receipts(receipts).expect("insert receipts"); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() + ); + assert_eq!( + tx.table::().unwrap().len(), + tx.table::().unwrap().len() + ); + + let run_prune = || { + let provider = tx.inner_rw(); + + let prune_before_block: usize = 20; + let prune_mode = PruneMode::Before(prune_before_block as u64); + let contract_logs_filter = + ContractLogsPruneConfig(BTreeMap::from([(deposit_contract_addr, prune_mode)])); + let pruner = Pruner::new( + tx.inner_raw(), + MAINNET.clone(), + 5, + PruneModes { + contract_logs_filter: contract_logs_filter.clone(), + ..Default::default() + }, + BatchSizes { + // Less than total amount of blocks to prune to test the batching logic + receipts: 10, + ..Default::default() + }, + ); + + let result = pruner.prune_receipts_by_logs(&provider, tip); + assert_matches!(result, Ok(_)); + let done = result.unwrap(); + provider.commit().expect("commit"); + + let (pruned_block, pruned_tx) = tx + .inner() + .get_prune_checkpoint(PrunePart::ContractLogs) + .unwrap() + .and_then(|checkpoint| { + Some((checkpoint.block_number.unwrap(), checkpoint.tx_number.unwrap())) + }) + .unwrap_or_default(); + + // All receipts are in the end of the block + let unprunable = pruned_block.saturating_sub(prune_before_block as u64 - 1); + + assert_eq!( + tx.table::().unwrap().len(), + blocks.iter().map(|block| block.body.len()).sum::() - + ((pruned_tx + 1) - unprunable) as usize + ); + + return done + }; + + while !run_prune() {} + + let provider = tx.inner(); + let mut cursor = provider.tx_ref().cursor_read::().unwrap(); + let walker = cursor.walk(None).unwrap(); + for receipt in walker { + let (tx_num, receipt) = receipt.unwrap(); + + // Either we only find our contract, or the receipt is part of the unprunable receipts + // set by tip - 128 + assert!( + receipt.logs.iter().any(|l| l.address == deposit_contract_addr) || + provider.transaction_block(tx_num).unwrap().unwrap() > tip - 128, + ); + } + } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index a79c91f32ed8..85c497cc4432 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -659,7 +659,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { &self, keys: impl RangeBounds + Clone + Debug, limit: usize, - skip_filter: impl Fn(&T::Value) -> bool, + mut skip_filter: impl FnMut(&TableRow) -> bool, mut delete_callback: impl FnMut(TableRow), ) -> std::result::Result<(usize, bool), DatabaseError> { let mut cursor = self.tx.cursor_write::()?; @@ -667,7 +667,7 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> DatabaseProvider<'this, TX> { let mut deleted = 0; while let Some(row) = walker.next().transpose()? { - if !skip_filter(&row.1) { + if !skip_filter(&row) { walker.delete_current()?; deleted += 1; delete_callback(row); From a042bf68cb9a92e7e7714b8b0ebbb8ace07e91e3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 23 Aug 2023 11:59:36 +0100 Subject: [PATCH 19/20] fix lint --- crates/prune/src/pruner.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index 4f82c301dbb5..e0930dd67c20 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -469,8 +469,8 @@ impl Pruner { // This will clear all receipts before the first appearance of a contract log or since // the block after the last pruned one. if block_ranges.is_empty() { - let init = last_pruned_block.and_then(|b| Some(b + 1)).unwrap_or_default(); - if init <= *start_block - 1 { + let init = last_pruned_block.map(|b| b + 1).unwrap_or_default(); + if init < *start_block { block_ranges.push((init, *start_block - 1, 0)); } } From 7d43486f062bec2ea2f60581792783c13540b69e Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Wed, 23 Aug 2023 12:04:13 +0100 Subject: [PATCH 20/20] fixes after review --- crates/prune/src/pruner.rs | 30 ++++++++++++++++++++---------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index e0930dd67c20..5df06e995888 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -719,16 +719,16 @@ impl Pruner { }; let range_end = *range.end(); - let mut last_pruned_block_number = None; + let mut last_changeset_pruned_block = None; let (rows, done) = provider.prune_table_with_range::( range, self.batch_sizes.account_history, |_| false, - |row| last_pruned_block_number = Some(row.0), + |row| last_changeset_pruned_block = Some(row.0), )?; trace!(target: "pruner", %rows, %done, "Pruned account history (changesets)"); - let last_pruned_block = last_pruned_block_number + let last_changeset_pruned_block = last_changeset_pruned_block // If there's more account account changesets to prune, set the checkpoint block number // to previous, so we could finish pruning its account changesets on the next run. .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) @@ -736,7 +736,7 @@ impl Pruner { let (processed, deleted) = self.prune_history_indices::( provider, - last_pruned_block, + last_changeset_pruned_block, |a, b| a.key == b.key, |key| ShardedKey::last(key.key), )?; @@ -744,7 +744,11 @@ impl Pruner { provider.save_prune_checkpoint( PrunePart::AccountHistory, - PruneCheckpoint { block_number: Some(last_pruned_block), tx_number: None, prune_mode }, + PruneCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + prune_mode, + }, )?; Ok(done) @@ -771,16 +775,16 @@ impl Pruner { }; let range_end = *range.end(); - let mut last_pruned_block_number = None; + let mut last_changeset_pruned_block = None; let (rows, done) = provider.prune_table_with_range::( BlockNumberAddress::range(range), self.batch_sizes.storage_history, |_| false, - |row| last_pruned_block_number = Some(row.0.block_number()), + |row| last_changeset_pruned_block = Some(row.0.block_number()), )?; trace!(target: "pruner", %rows, %done, "Pruned storage history (changesets)"); - let last_pruned_block = last_pruned_block_number + let last_changeset_pruned_block = last_changeset_pruned_block // If there's more account storage changesets to prune, set the checkpoint block number // to previous, so we could finish pruning its storage changesets on the next run. .map(|block_number| if done { block_number } else { block_number.saturating_sub(1) }) @@ -788,7 +792,7 @@ impl Pruner { let (processed, deleted) = self.prune_history_indices::( provider, - last_pruned_block, + last_changeset_pruned_block, |a, b| a.address == b.address && a.sharded_key.key == b.sharded_key.key, |key| StorageShardedKey::last(key.address, key.sharded_key.key), )?; @@ -796,13 +800,19 @@ impl Pruner { provider.save_prune_checkpoint( PrunePart::StorageHistory, - PruneCheckpoint { block_number: Some(last_pruned_block), tx_number: None, prune_mode }, + PruneCheckpoint { + block_number: Some(last_changeset_pruned_block), + tx_number: None, + prune_mode, + }, )?; Ok(done) } /// Prune history indices up to the provided block, inclusive. + /// + /// Returns total number of processed (walked) and deleted entities. fn prune_history_indices( &self, provider: &DatabaseProviderRW<'_, DB>,