From 3b2b067d949c02ece20c76f1590066f2cd87b2b8 Mon Sep 17 00:00:00 2001 From: Vsevolod Stakhov Date: Sat, 9 Jul 2022 09:36:33 +0100 Subject: [PATCH] Implement prune only stagnant check mode (#5761) * Limit number of elements loaded from the stagnant key This will likely be required if we enable stagnant prunning as currently database has way too many entries to be prunned in a single iteration * Fmt run * Slightly improve logging * Some more debug nits * Fmt pass * Add stagnant prunning delay * Enable stagnant check worker * Implement stagnant pruning without stagnant checks * Update node/core/chain-selection/src/tree.rs Co-authored-by: Andronik * Apply suggestions from code review Co-authored-by: Andronik Co-authored-by: Andronik --- node/core/chain-selection/src/lib.rs | 62 +++++++++++++++++++++++--- node/core/chain-selection/src/tests.rs | 1 + node/core/chain-selection/src/tree.rs | 35 ++++++++++++++- node/service/src/lib.rs | 4 +- 4 files changed, 95 insertions(+), 7 deletions(-) diff --git a/node/core/chain-selection/src/lib.rs b/node/core/chain-selection/src/lib.rs index be6509e54a29..eb5ceac9b768 100644 --- a/node/core/chain-selection/src/lib.rs +++ b/node/core/chain-selection/src/lib.rs @@ -50,6 +50,8 @@ type Timestamp = u64; // If a block isn't approved in 120 seconds, nodes will abandon it // and begin building on another chain. const STAGNANT_TIMEOUT: Timestamp = 120; +// Delay prunning of the stagnant keys in prune only mode by 25 hours to avoid interception with the finality +const STAGNANT_PRUNE_DELAY: Timestamp = 25 * 60 * 60; // Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration const MAX_STAGNANT_ENTRIES: usize = 1000; @@ -297,6 +299,19 @@ impl StagnantCheckInterval { } } +/// Mode of the stagnant check operations: check and prune or prune only +#[derive(Debug, Clone)] +pub enum StagnantCheckMode { + CheckAndPrune, + PruneOnly, +} + +impl Default for StagnantCheckMode { + fn default() -> Self { + StagnantCheckMode::PruneOnly + } +} + /// Configuration for the chain selection subsystem. #[derive(Debug, Clone)] pub struct Config { @@ -304,6 +319,8 @@ pub struct Config { pub col_data: u32, /// How often to check for stagnant blocks. pub stagnant_check_interval: StagnantCheckInterval, + /// Mode of stagnant checks + pub stagnant_check_mode: StagnantCheckMode, } /// The chain selection subsystem. @@ -340,9 +357,15 @@ impl ChainSelectionSubsystem { ); SpawnedSubsystem { - future: run(ctx, backend, self.config.stagnant_check_interval, Box::new(SystemClock)) - .map(Ok) - .boxed(), + future: run( + ctx, + backend, + self.config.stagnant_check_interval, + self.config.stagnant_check_mode, + Box::new(SystemClock), + ) + .map(Ok) + .boxed(), name: "chain-selection-subsystem", } } @@ -353,12 +376,20 @@ async fn run( mut ctx: Context, mut backend: B, stagnant_check_interval: StagnantCheckInterval, + stagnant_check_mode: StagnantCheckMode, clock: Box, ) where B: Backend, { loop { - let res = run_until_error(&mut ctx, &mut backend, &stagnant_check_interval, &*clock).await; + let res = run_until_error( + &mut ctx, + &mut backend, + &stagnant_check_interval, + &stagnant_check_mode, + &*clock, + ) + .await; match res { Err(e) => { e.trace(); @@ -383,6 +414,7 @@ async fn run_until_error( ctx: &mut Context, backend: &mut B, stagnant_check_interval: &StagnantCheckInterval, + stagnant_check_mode: &StagnantCheckMode, clock: &(dyn Clock + Sync), ) -> Result<(), Error> where @@ -437,7 +469,13 @@ where } } _ = stagnant_check_stream.next().fuse() => { - detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?; + match stagnant_check_mode { + StagnantCheckMode::CheckAndPrune => detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES), + StagnantCheckMode::PruneOnly => { + let now_timestamp = clock.timestamp_now(); + prune_only_stagnant(backend, now_timestamp - STAGNANT_PRUNE_DELAY, MAX_STAGNANT_ENTRIES) + }, + }?; } } } @@ -653,6 +691,20 @@ fn detect_stagnant( backend.write(ops) } +fn prune_only_stagnant( + backend: &mut impl Backend, + up_to: Timestamp, + max_elements: usize, +) -> Result<(), Error> { + let ops = { + let overlay = tree::prune_only_stagnant(&*backend, up_to, max_elements)?; + + overlay.into_write_ops() + }; + + backend.write(ops) +} + // Load the leaves from the backend. If there are no leaves, then return // the finalized block. async fn load_leaves( diff --git a/node/core/chain-selection/src/tests.rs b/node/core/chain-selection/src/tests.rs index 20c4700dff57..404b854d894b 100644 --- a/node/core/chain-selection/src/tests.rs +++ b/node/core/chain-selection/src/tests.rs @@ -244,6 +244,7 @@ fn test_harness>( context, backend.clone(), StagnantCheckInterval::new(TEST_STAGNANT_INTERVAL), + StagnantCheckMode::CheckAndPrune, Box::new(clock.clone()), ); diff --git a/node/core/chain-selection/src/tree.rs b/node/core/chain-selection/src/tree.rs index 5edb6748934d..aafd75de5f97 100644 --- a/node/core/chain-selection/src/tree.rs +++ b/node/core/chain-selection/src/tree.rs @@ -552,7 +552,7 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>( ?up_to, ?min_ts, ?max_ts, - "Prepared {} stagnant entries for pruning", + "Prepared {} stagnant entries for checking/pruning", stagnant_up_to.len() ); @@ -594,6 +594,39 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>( Ok(backend) } +/// Prune stagnant entries at some timestamp without other checks +/// This function is intended just to clean leftover entries when the real +/// stagnant checks are disabled +pub(super) fn prune_only_stagnant<'a, B: 'a + Backend>( + backend: &'a B, + up_to: Timestamp, + max_elements: usize, +) -> Result, Error> { + let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?; + let mut backend = OverlayedBackend::new(backend); + + let (min_ts, max_ts) = match stagnant_up_to.len() { + 0 => (0 as Timestamp, 0 as Timestamp), + 1 => (stagnant_up_to[0].0, stagnant_up_to[0].0), + n => (stagnant_up_to[0].0, stagnant_up_to[n - 1].0), + }; + + gum::debug!( + target: LOG_TARGET, + ?up_to, + ?min_ts, + ?max_ts, + "Prepared {} stagnant entries for pruning", + stagnant_up_to.len() + ); + + for (timestamp, _) in stagnant_up_to { + backend.delete_stagnant_at(timestamp); + } + + Ok(backend) +} + /// Revert the tree to the block relative to `hash`. /// /// This accepts a fresh backend and returns an overlay on top of it representing diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 95b613d998f0..0f131280b51d 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -930,7 +930,8 @@ where let chain_selection_config = ChainSelectionConfig { col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data, - stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(), + stagnant_check_interval: Default::default(), + stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly, }; let dispute_coordinator_config = DisputeCoordinatorConfig { @@ -1477,6 +1478,7 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R let config = chain_selection_subsystem::Config { col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data, stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(), + stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly, }; let chain_selection = chain_selection_subsystem::ChainSelectionSubsystem::new(config, db);