From 4c7e011fea9d2083c39fd45af5c6a8f6d4b32627 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 21 Jul 2021 17:01:24 -0500 Subject: [PATCH] never mode for StagnantCheckInterval --- node/core/chain-selection/src/lib.rs | 36 ++++++++++++++++++---------- node/service/src/lib.rs | 7 ++++-- 2 files changed, 28 insertions(+), 15 deletions(-) diff --git a/node/core/chain-selection/src/lib.rs b/node/core/chain-selection/src/lib.rs index 9862f60d7de8..91e766f53df2 100644 --- a/node/core/chain-selection/src/lib.rs +++ b/node/core/chain-selection/src/lib.rs @@ -28,6 +28,7 @@ use polkadot_node_subsystem::{ use kvdb::KeyValueDB; use parity_scale_codec::Error as CodecError; use futures::channel::oneshot; +use futures::future::Either; use futures::prelude::*; use std::time::{UNIX_EPOCH, Duration,SystemTime}; @@ -244,7 +245,7 @@ impl Clock for SystemClock { /// The interval, in seconds to check for stagnant blocks. #[derive(Debug, Clone)] -pub struct StagnantCheckInterval(Duration); +pub struct StagnantCheckInterval(Option); impl Default for StagnantCheckInterval { fn default() -> Self { @@ -255,28 +256,37 @@ impl Default for StagnantCheckInterval { // between 2 validators is D + 5s. const DEFAULT_STAGNANT_CHECK_INTERVAL: Duration = Duration::from_secs(5); - StagnantCheckInterval(DEFAULT_STAGNANT_CHECK_INTERVAL) + StagnantCheckInterval(Some(DEFAULT_STAGNANT_CHECK_INTERVAL)) } } impl StagnantCheckInterval { /// Create a new stagnant-check interval wrapping the given duration. pub fn new(interval: Duration) -> Self { - StagnantCheckInterval(interval) + StagnantCheckInterval(Some(interval)) } - fn timeout_stream(&self) -> impl Stream { - let interval = self.0; - let mut delay = futures_timer::Delay::new(interval); + /// Create a `StagnantCheckInterval` which never triggers. + pub fn never() -> Self { + StagnantCheckInterval(None) + } - futures::stream::poll_fn(move |cx| { - let poll = delay.poll_unpin(cx); - if poll.is_ready() { - delay.reset(interval) - } + fn timeout_stream(&self) -> impl Stream { + match self.0 { + Some(interval) => Either::Left({ + let mut delay = futures_timer::Delay::new(interval); + + futures::stream::poll_fn(move |cx| { + let poll = delay.poll_unpin(cx); + if poll.is_ready() { + delay.reset(interval) + } - poll.map(Some) - }) + poll.map(Some) + }) + }), + None => Either::Right(futures::stream::pending()), + } } } diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 5c92d00ca7d7..4ab09e6cb239 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -42,7 +42,10 @@ use { polkadot_node_core_av_store::Error as AvailabilityError, polkadot_node_core_approval_voting::Config as ApprovalVotingConfig, polkadot_node_core_candidate_validation::Config as CandidateValidationConfig, - polkadot_node_core_chain_selection::Config as ChainSelectionConfig, + polkadot_node_core_chain_selection::{ + self as chain_selection_subsystem, + Config as ChainSelectionConfig, + }, polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig, polkadot_overseer::BlockInfo, sp_trie::PrefixedMemoryDB, @@ -663,7 +666,7 @@ pub fn new_full( let chain_selection_config = ChainSelectionConfig { col_data: crate::parachains_db::REAL_COLUMNS.col_chain_selection_data, - stagnant_check_interval: Default::default(), + stagnant_check_interval: chain_selection_subsystem::StagnantCheckInterval::never(), }; let dispute_coordinator_config = DisputeCoordinatorConfig {