Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Limit stagnant checks to a certain amount of entries (#5742)
Browse files Browse the repository at this point in the history
* Limit number of elements loaded from the stagnant key

This will likely be required if we enable stagnant pruning, as currently the database has way
too many entries to be pruned in a single iteration

* Fmt run

* Slightly improve logging

* Some more debug nits

* Fmt pass
  • Loading branch information
vstakhov authored and al3mart committed Jul 14, 2022
1 parent c55d6d5 commit 9315a57
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 13 deletions.
3 changes: 2 additions & 1 deletion node/core/chain-selection/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ pub(super) trait Backend {
/// Load the stagnant list at the given timestamp.
fn load_stagnant_at(&self, timestamp: Timestamp) -> Result<Vec<Hash>, Error>;
/// Load all stagnant lists up to and including the given Unix timestamp
/// in ascending order.
/// in ascending order. Stop fetching stagnant entries upon reaching `max_elements`.
fn load_stagnant_at_up_to(
&self,
up_to: Timestamp,
max_elements: usize,
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error>;
/// Load the earliest kept block number.
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error>;
Expand Down
25 changes: 18 additions & 7 deletions node/core/chain-selection/src/db_backend/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ impl Backend for DbBackend {
fn load_stagnant_at_up_to(
&self,
up_to: crate::Timestamp,
max_elements: usize,
) -> Result<Vec<(crate::Timestamp, Vec<Hash>)>, Error> {
let stagnant_at_iter =
self.inner.iter_with_prefix(self.config.col_data, &STAGNANT_AT_PREFIX[..]);
Expand All @@ -240,7 +241,9 @@ impl Backend for DbBackend {
_ => None,
}
})
.take_while(|(at, _)| *at <= up_to.into())
.enumerate()
.take_while(|(idx, (at, _))| *at <= up_to.into() && *idx < max_elements)
.map(|(_, v)| v)
.collect::<Vec<_>>();

Ok(val)
Expand Down Expand Up @@ -528,7 +531,10 @@ mod tests {
let mut backend = DbBackend::new(db, config);

// Prove that it's cheap
assert!(backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap().is_empty());
assert!(backend
.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX)
.unwrap()
.is_empty());

backend
.write(vec![
Expand All @@ -539,7 +545,7 @@ mod tests {
.unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(Timestamp::max_value()).unwrap(),
backend.load_stagnant_at_up_to(Timestamp::max_value(), usize::MAX).unwrap(),
vec![
(2, vec![Hash::repeat_byte(1)]),
(5, vec![Hash::repeat_byte(2)]),
Expand All @@ -548,7 +554,7 @@ mod tests {
);

assert_eq!(
backend.load_stagnant_at_up_to(10).unwrap(),
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
vec![
(2, vec![Hash::repeat_byte(1)]),
(5, vec![Hash::repeat_byte(2)]),
Expand All @@ -557,21 +563,26 @@ mod tests {
);

assert_eq!(
backend.load_stagnant_at_up_to(9).unwrap(),
backend.load_stagnant_at_up_to(9, usize::MAX).unwrap(),
vec![(2, vec![Hash::repeat_byte(1)]), (5, vec![Hash::repeat_byte(2)]),]
);

assert_eq!(
backend.load_stagnant_at_up_to(9, 1).unwrap(),
vec![(2, vec![Hash::repeat_byte(1)]),]
);

backend.write(vec![BackendWriteOp::DeleteStagnantAt(2)]).unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(5).unwrap(),
backend.load_stagnant_at_up_to(5, usize::MAX).unwrap(),
vec![(5, vec![Hash::repeat_byte(2)]),]
);

backend.write(vec![BackendWriteOp::WriteStagnantAt(5, vec![])]).unwrap();

assert_eq!(
backend.load_stagnant_at_up_to(10).unwrap(),
backend.load_stagnant_at_up_to(10, usize::MAX).unwrap(),
vec![(10, vec![Hash::repeat_byte(3)]),]
);
}
Expand Down
12 changes: 9 additions & 3 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Timestamp = u64;
// If a block isn't approved in 120 seconds, nodes will abandon it
// and begin building on another chain.
const STAGNANT_TIMEOUT: Timestamp = 120;
// Maximum number of stagnant entries cleaned during one `STAGNANT_TIMEOUT` iteration
const MAX_STAGNANT_ENTRIES: usize = 1000;

#[derive(Debug, Clone)]
enum Approval {
Expand Down Expand Up @@ -435,7 +437,7 @@ where
}
}
_ = stagnant_check_stream.next().fuse() => {
detect_stagnant(backend, clock.timestamp_now())?;
detect_stagnant(backend, clock.timestamp_now(), MAX_STAGNANT_ENTRIES)?;
}
}
}
Expand Down Expand Up @@ -637,9 +639,13 @@ fn handle_approved_block(backend: &mut impl Backend, approved_block: Hash) -> Re
backend.write(ops)
}

fn detect_stagnant(backend: &mut impl Backend, now: Timestamp) -> Result<(), Error> {
fn detect_stagnant(
backend: &mut impl Backend,
now: Timestamp,
max_elements: usize,
) -> Result<(), Error> {
let ops = {
let overlay = tree::detect_stagnant(&*backend, now)?;
let overlay = tree::detect_stagnant(&*backend, now, max_elements)?;

overlay.into_write_ops()
};
Expand Down
5 changes: 4 additions & 1 deletion node/core/chain-selection/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,16 @@ impl Backend for TestBackend {
fn load_stagnant_at_up_to(
&self,
up_to: Timestamp,
max_elements: usize,
) -> Result<Vec<(Timestamp, Vec<Hash>)>, Error> {
Ok(self
.inner
.lock()
.stagnant_at
.range(..=up_to)
.map(|(t, v)| (*t, v.clone()))
.enumerate()
.take_while(|(idx, _)| *idx < max_elements)
.map(|(_, (t, v))| (*t, v.clone()))
.collect())
}
fn load_first_block_number(&self) -> Result<Option<BlockNumber>, Error> {
Expand Down
33 changes: 32 additions & 1 deletion node/core/chain-selection/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -534,12 +534,28 @@ pub(super) fn approve_block(
pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
backend: &'a B,
up_to: Timestamp,
max_elements: usize,
) -> Result<OverlayedBackend<'a, B>, Error> {
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to)?;
let stagnant_up_to = backend.load_stagnant_at_up_to(up_to, max_elements)?;
let mut backend = OverlayedBackend::new(backend);

let (min_ts, max_ts) = match stagnant_up_to.len() {
0 => (0 as Timestamp, 0 as Timestamp),
1 => (stagnant_up_to[0].0, stagnant_up_to[0].0),
n => (stagnant_up_to[0].0, stagnant_up_to[n - 1].0),
};

// As this is in ascending order, only the earliest stagnant
// blocks will involve heavy viability propagations.
gum::debug!(
target: LOG_TARGET,
?up_to,
?min_ts,
?max_ts,
"Prepared {} stagnant entries for pruning",
stagnant_up_to.len()
);

for (timestamp, maybe_stagnant) in stagnant_up_to {
backend.delete_stagnant_at(timestamp);

Expand All @@ -550,12 +566,27 @@ pub(super) fn detect_stagnant<'a, B: 'a + Backend>(
entry.viability.approval = Approval::Stagnant;
}
let is_viable = entry.viability.is_viable();
gum::trace!(
target: LOG_TARGET,
?block_hash,
?timestamp,
?was_viable,
?is_viable,
"Found existing stagnant entry"
);

if was_viable && !is_viable {
propagate_viability_update(&mut backend, entry)?;
} else {
backend.write_block_entry(entry);
}
} else {
gum::trace!(
target: LOG_TARGET,
?block_hash,
?timestamp,
"Found non-existing stagnant entry"
);
}
}
}
Expand Down

0 comments on commit 9315a57

Please sign in to comment.