Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Implement prune only stagnant check mode (#5761)
Browse files Browse the repository at this point in the history
* Limit number of elements loaded from the stagnant key

This will likely be required if we enable stagnant pruning, as the database currently has way
too many entries to be pruned in a single iteration.

* Fmt run

* Slightly improve logging

* Some more debug nits

* Fmt pass

* Add stagnant pruning delay

* Enable stagnant check worker

* Implement stagnant pruning without stagnant checks

* Update node/core/chain-selection/src/tree.rs

Co-authored-by: Andronik <write@reusable.software>

* Apply suggestions from code review

Co-authored-by: Andronik <write@reusable.software>

Co-authored-by: Andronik <write@reusable.software>
  • Loading branch information
vstakhov and ordian authored Jul 9, 2022
1 parent d7d9ab5 commit 3b2b067
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 7 deletions.
62 changes: 57 additions & 5 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Timestamp = u64;
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
// Delay pruning of the stagnant keys in prune-only mode by 25 hours to avoid interfering with
// finality. In prune-only mode this value is subtracted from the current timestamp to obtain
// the cut-off up to which stagnant entries are deleted.
const STAGNANT_PRUNE_DELAY: Timestamp = 25 * 60 * 60;
// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration
const MAX_STAGNANT_ENTRIES: usize = 1000;

Expand Down Expand Up @@ -297,13 +299,28 @@ impl StagnantCheckInterval {
}
}

/// Mode of the stagnant check operations: check and prune, or prune only.
///
/// This is a plain fieldless tag, so besides `Debug` and `Clone` it also derives `Copy`,
/// `PartialEq` and `Eq`; callers can pass it by value and compare modes directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StagnantCheckMode {
    /// On every stagnant-check tick run the full stagnant detection (`detect_stagnant`).
    CheckAndPrune,
    /// On every stagnant-check tick only delete stagnant entries older than
    /// `STAGNANT_PRUNE_DELAY` (`prune_only_stagnant`), without any other checks.
    PruneOnly,
}

impl Default for StagnantCheckMode {
    /// The default mode is [`StagnantCheckMode::PruneOnly`].
    fn default() -> Self {
        Self::PruneOnly
    }
}

/// Configuration for the chain selection subsystem.
#[derive(Debug, Clone)]
pub struct Config {
/// The column in the database that the storage should use.
pub col_data: u32,
/// How often to check for stagnant blocks.
pub stagnant_check_interval: StagnantCheckInterval,
/// Mode of stagnant checks: what is done on each stagnant-check tick, either the full
/// check-and-prune or pruning only (see [`StagnantCheckMode`]).
pub stagnant_check_mode: StagnantCheckMode,
}

/// The chain selection subsystem.
Expand Down Expand Up @@ -340,9 +357,15 @@ impl<Context> ChainSelectionSubsystem {
);

SpawnedSubsystem {
future: run(ctx, backend, self.config.stagnant_check_interval, Box::new(SystemClock))
.map(Ok)
.boxed(),
future: run(
ctx,
backend,
self.config.stagnant_check_interval,
self.config.stagnant_check_mode,
Box::new(SystemClock),
)
.map(Ok)
.boxed(),
name: "chain-selection-subsystem",
}
}
Expand All @@ -353,12 +376,20 @@ async fn run<Context, B>(
mut ctx: Context,
mut backend: B,
stagnant_check_interval: StagnantCheckInterval,
stagnant_check_mode: StagnantCheckMode,
clock: Box<dyn Clock + Send + Sync>,
) where
B: Backend,
{
loop {
let res = run_until_error(&mut ctx, &mut backend, &stagnant_check_interval, &*clock).await;
let res = run_until_error(
&mut ctx,
&mut backend,
&stagnant_check_interval,
&stagnant_check_mode,
&*clock,
)
.await;
match res {
Err(e) => {
e.trace();
Expand All @@ -383,6 +414,7 @@ async fn run_until_error<Context, B>(
ctx: &mut Context,
backend: &mut B,
stagnant_check_interval: &StagnantCheckInterval,
stagnant_check_mode: &StagnantCheckMode,
clock: &(dyn Clock + Sync),
) -> Result<(), Error>
where
Expand Down Expand Up @@ -437,7 +469,13 @@ where
}
}
_ = stagnant_check_stream.next().fuse() => {
detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?;
match stagnant_check_mode {
StagnantCheckMode::CheckAndPrune => detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES),
StagnantCheckMode::PruneOnly => {
let now_timestamp = clock.timestamp_now();
prune_only_stagnant(backend, now_timestamp - STAGNANT_PRUNE_DELAY, MAX_STAGNANT_ENTRIES)
},
}?;
}
}
}
Expand Down Expand Up @@ -653,6 +691,20 @@ fn detect_stagnant(
backend.write(ops)
}

// Prune stagnant entries up to the `up_to` timestamp (at most `max_elements` of them)
// without performing any other stagnant checks, then persist the result.
//
// The pruning itself is computed by `tree::prune_only_stagnant` against a shared borrow
// of the backend; the resulting overlay is turned into write operations which are then
// written back to the backend.
fn prune_only_stagnant(
    backend: &mut impl Backend,
    up_to: Timestamp,
    max_elements: usize,
) -> Result<(), Error> {
    let pending_ops =
        tree::prune_only_stagnant(&*backend, up_to, max_elements)?.into_write_ops();

    backend.write(pending_ops)
}

// Load the leaves from the backend. If there are no leaves, then return
// the finalized block.
async fn load_leaves(
Expand Down
1 change: 1 addition & 0 deletions node/core/chain-selection/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ fn test_harness<T: Future<Output = VirtualOverseer>>(
context,
backend.clone(),
StagnantCheckInterval::new(TEST_STAGNANT_INTERVAL),
StagnantCheckMode::CheckAndPrune,
Box::new(clock.clone()),
);

Expand Down
35 changes: 34 additions & 1 deletion node/core/chain-selection/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -552,7 +552,7 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
?up_to,
?min_ts,
?max_ts,
"Prepared {} stagnant entries for pruning",
"Prepared {} stagnant entries for checking/pruning",
stagnant_up_to.len()
);

Expand Down Expand Up @@ -594,6 +594,39 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
Ok(backend)
}

/// Delete stagnant entries recorded up to `up_to` without running any other checks.
///
/// Loads at most `max_elements` entries via `load_stagnant_at_up_to`, logs how many were
/// loaded together with the timestamps of the first and last loaded entries (both `0` when
/// nothing was loaded), and marks every loaded timestamp for deletion in a fresh overlay.
///
/// This function is intended just to clean leftover entries when the real stagnant checks
/// are disabled. It returns the overlay holding the pending deletions; turning it into
/// write operations is left to the caller.
pub(super) fn prune_only_stagnant<'a, B: 'a + Backend>(
    backend: &'a B,
    up_to: Timestamp,
    max_elements: usize,
) -> Result<OverlayedBackend<'a, B>, Error> {
    let entries = backend.load_stagnant_at_up_to(up_to, max_elements)?;
    let mut overlay = OverlayedBackend::new(backend);

    // Timestamps of the first and last loaded entries; used for logging only.
    let min_ts: Timestamp = entries.first().map_or(0, |entry| entry.0);
    let max_ts: Timestamp = entries.last().map_or(0, |entry| entry.0);

    gum::debug!(
        target: LOG_TARGET,
        ?up_to,
        ?min_ts,
        ?max_ts,
        "Prepared {} stagnant entries for pruning",
        entries.len()
    );

    entries.into_iter().for_each(|(timestamp, _)| {
        overlay.delete_stagnant_at(timestamp);
    });

    Ok(overlay)
}

/// Revert the tree to the block relative to `hash`.
///
/// This accepts a fresh backend and returns an overlay on top of it representing
Expand Down
4 changes: 3 additions & 1 deletion node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,8 @@ where

let chain_selection_config = ChainSelectionConfig {
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
stagnant_check_interval: Default::default(),
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
};

let dispute_coordinator_config = DisputeCoordinatorConfig {
Expand Down Expand Up @@ -1477,6 +1478,7 @@ fn revert_chain_selection(db: Arc<dyn Database>, hash: Hash) -> sp_blockchain::R
let config = chain_selection_subsystem::Config {
col_data: parachains_db::REAL_COLUMNS.col_chain_selection_data,
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
};

let chain_selection = chain_selection_subsystem::ChainSelectionSubsystem::new(config, db);
Expand Down

0 comments on commit 3b2b067

Please sign in to comment.