Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
never mode for StagnantCheckInterval
Browse files Browse the repository at this point in the history
  • Loading branch information
rphmeier committed Jul 21, 2021
1 parent 4aebe3e commit 4c7e011
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 15 deletions.
36 changes: 23 additions & 13 deletions node/core/chain-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use polkadot_node_subsystem::{
use kvdb::KeyValueDB;
use parity_scale_codec::Error as CodecError;
use futures::channel::oneshot;
use futures::future::Either;
use futures::prelude::*;

use std::time::{UNIX_EPOCH, Duration,SystemTime};
Expand Down Expand Up @@ -244,7 +245,7 @@ impl Clock for SystemClock {

/// The interval, in seconds to check for stagnant blocks.
#[derive(Debug, Clone)]
pub struct StagnantCheckInterval(Duration);
pub struct StagnantCheckInterval(Option<Duration>);

impl Default for StagnantCheckInterval {
fn default() -> Self {
Expand All @@ -255,28 +256,37 @@ impl Default for StagnantCheckInterval {
// between 2 validators is D + 5s.
const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5);

StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL)
StagnantCheckInterval(Some(DEFAULT_STAGNANT_CHECK_INTERVAL))
}
}

impl StagnantCheckInterval {
/// Create a new stagnant-check interval wrapping the given duration.
pub fn new(interval: Duration) -> Self {
StagnantCheckInterval(interval)
StagnantCheckInterval(Some(interval))
}

fn timeout_stream(&self) -> impl Stream<Item = ()> {
let interval = self.0;
let mut delay = futures_timer::Delay::new(interval);
/// Create a `StagnantCheckInterval` which never triggers.
pub fn never() -> Self {
StagnantCheckInterval(None)
}

futures::stream::poll_fn(move |cx| {
let poll = delay.poll_unpin(cx);
if poll.is_ready() {
delay.reset(interval)
}
fn timeout_stream(&self) -> impl Stream<Item = ()> {
match self.0 {
Some(interval) => Either::Left({
let mut delay = futures_timer::Delay::new(interval);

futures::stream::poll_fn(move |cx| {
let poll = delay.poll_unpin(cx);
if poll.is_ready() {
delay.reset(interval)
}

poll.map(Some)
})
poll.map(Some)
})
}),
None => Either::Right(futures::stream::pending()),
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use {
polkadot_node_core_av_store::Error as AvailabilityError,
polkadot_node_core_approval_voting::Config as ApprovalVotingConfig,
polkadot_node_core_candidate_validation::Config as CandidateValidationConfig,
polkadot_node_core_chain_selection::Config as ChainSelectionConfig,
polkadot_node_core_chain_selection::{
self as chain_selection_subsystem,
Config as ChainSelectionConfig,
},
polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig,
polkadot_overseer::BlockInfo,
sp_trie::PrefixedMemoryDB,
Expand Down Expand Up @@ -663,7 +666,7 @@ pub fn new_full<RuntimeApi, Executor, OverseerGenerator>(

let chain_selection_config = ChainSelectionConfig {
col_data: crate::parachains_db::REAL_COLUMNS.col_chain_selection_data,
stagnant_check_interval: Default::default(),
stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(),
};

let dispute_coordinator_config = DisputeCoordinatorConfig {
Expand Down

0 comments on commit 4c7e011

Please sign in to comment.