From 6a2e46e946b836a51b2233d00a2d66b86be06cfb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 16 Nov 2020 16:21:22 +0100 Subject: [PATCH 1/3] client/authority-discovery: Publish and query on exponential interval When a node starts up publishing and querying might fail due to various reasons, for example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather sooner than later. On the other hand, a long running node is likely well connected and thus timely retries are not needed. For this reasoning use an exponentially increasing interval for `publish_interval`, `query_interval` and `priority_group_set_interval` instead of a constant interval. --- client/authority-discovery/src/interval.rs | 46 ++++++++++++ client/authority-discovery/src/lib.rs | 51 ++++++------- client/authority-discovery/src/worker.rs | 73 ++++++++----------- .../authority-discovery/src/worker/tests.rs | 60 --------------- client/network/src/discovery.rs | 6 +- 5 files changed, 100 insertions(+), 136 deletions(-) create mode 100644 client/authority-discovery/src/interval.rs diff --git a/client/authority-discovery/src/interval.rs b/client/authority-discovery/src/interval.rs new file mode 100644 index 0000000000000..8ab4f7da1f3a1 --- /dev/null +++ b/client/authority-discovery/src/interval.rs @@ -0,0 +1,46 @@ +use futures::stream::Stream; +use futures::future::FutureExt; +use futures::ready; +use futures_timer::Delay; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Duration; + +/// Exponentially increasing interval +/// +/// Doubles interval duration on each tick until the configured maximum is reached. +pub struct ExpIncInterval { + max: Duration, + next: Duration, + delay: Delay, +} + +impl ExpIncInterval { + /// Create a new [`ExpIncInterval`]. + pub fn new(start: Duration, max: Duration) -> Self { + let delay = Delay::new(start); + Self { + max, + next: start * 2, + delay, + } + } + + /// Fast forward the exponentially increasing interval to the configured maximum. + pub fn set_to_max(&mut self) { + self.next = self.max; + self.delay = Delay::new(self.next); + } +} + +impl Stream for ExpIncInterval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + ready!(self.delay.poll_unpin(cx)); + self.delay = Delay::new(self.next); + self.next = std::cmp::min(self.max, self.next * 2); + + Poll::Ready(Some(())) + } +} diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs index 4ee57f31e04a5..41aa01e56bde2 100644 --- a/client/authority-discovery/src/lib.rs +++ b/client/authority-discovery/src/lib.rs @@ -38,50 +38,41 @@ use sp_runtime::traits::Block as BlockT; use sp_api::ProvideRuntimeApi; mod error; +mod interval; mod service; +mod worker; + #[cfg(test)] mod tests; -mod worker; /// Configuration of [`Worker`]. pub struct WorkerConfig { - /// The interval in which the node will publish its own address on the DHT. + /// The maximum interval in which the node will publish its own address on the DHT. /// - /// By default this is set to 12 hours. - pub publish_interval: Duration, - /// The interval in which the node will query the DHT for new entries. + /// By default this is set to 1 hour. + pub max_publish_interval: Duration, + /// The maximum interval in which the node will query the DHT for new entries. /// /// By default this is set to 10 minutes. - pub query_interval: Duration, - /// The time the node will wait before triggering the first DHT query or publish. - /// - /// By default this is set to 30 seconds. - /// - /// This default is based on the rough boostrap time required by libp2p Kademlia. - pub query_start_delay: Duration, - /// The interval in which the worker will instruct the peerset to connect to a random subset - /// of discovered validators. - /// - /// By default this is set to 10 minutes. - pub priority_group_set_interval: Duration, - /// The time the worker will wait after each query interval tick to pass a subset of - /// the cached authority addresses down to the peerset. - /// - /// Be aware that the actual delay will be computed by [`Self::query_start_delay`] + - /// [`Self::priority_group_set_start_delay`] - /// - /// By default this is set to 5 minutes. - pub priority_group_set_offset: Duration, + pub max_query_interval: Duration, } impl Default for WorkerConfig { fn default() -> Self { Self { - publish_interval: Duration::from_secs(12 * 60 * 60), - query_interval: Duration::from_secs(10 * 60), - query_start_delay: Duration::from_secs(30), - priority_group_set_interval: Duration::from_secs(10 * 60), - priority_group_set_offset: Duration::from_secs(5 * 60), + // Kademlia's default time-to-live for Dht records is 36h, republishing records every + // 24h through libp2p-kad. Given that a node could restart at any point in time, one can + // not depend on the republishing process, thus publishing own external addresses should + // happen on an interval < 36h. + max_publish_interval: Duration::from_secs(1 * 60 * 60), + // External addresses of remote authorities can change at any given point in time. The + // interval on which to trigger new queries for the current and next authorities is a trade + // off between efficiency and performance. + // + // Querying 700 [`AuthorityId`]s takes ~8m on the Kusama DHT (16th Nov 2020) when + // comparing `authority_discovery_authority_addresses_requested_total` and + // `authority_discovery_dht_event_received`. + max_query_interval: Duration::from_secs(10 * 60), } } } diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index 42ae3a5213f0f..c8e7a9f7aee35 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -14,17 +14,16 @@ // You should have received a copy of the GNU General Public License // along with Substrate. If not, see . -use crate::{error::{Error, Result}, ServicetoWorkerMsg}; +use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg}; use std::collections::{HashMap, HashSet}; use std::convert::TryInto; use std::marker::PhantomData; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; use futures::channel::mpsc; use futures::{FutureExt, Stream, StreamExt, stream::Fuse}; -use futures_timer::Delay; use addr_cache::AddrCache; use async_trait::async_trait; @@ -54,8 +53,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); } #[cfg(test)] pub mod tests; -type Interval = Box + Unpin + Send + Sync>; - const LOG_TARGET: &'static str = "sub-authority-discovery"; /// Name of the Substrate peerset priority group for authorities discovered through the authority @@ -113,12 +110,12 @@ pub struct Worker { dht_event_rx: DhtEventStream, /// Interval to be proactive, publishing own addresses. - publish_interval: Interval, + publish_interval: ExpIncInterval, /// Interval at which to request addresses of authorities, refilling the pending lookups queue. - query_interval: Interval, + query_interval: ExpIncInterval, /// Interval on which to set the peerset priority group to a new random /// set of addresses. - priority_group_set_interval: Interval, + priority_group_set_interval: ExpIncInterval, /// Queue of throttled lookups pending to be passed to the network. pending_lookups: Vec, @@ -153,31 +150,26 @@ where prometheus_registry: Option, config: crate::WorkerConfig, ) -> Self { - // Kademlia's default time-to-live for Dht records is 36h, republishing - // records every 24h through libp2p-kad. - // Given that a node could restart at any point in time, one can not depend on the - // republishing process, thus publishing own external addresses should happen on an interval - // < 36h. - let publish_interval = interval_at( - Instant::now() + config.query_start_delay, - config.publish_interval, + // When a node starts up publishing and querying might fail due to various reasons, for + // example due to being not yet fully bootstrapped on the DHT. Thus one should retry rather + // sooner than later. On the other hand, a long running node is likely well connected and + // thus timely retries are not needed. For this reasoning use an exponentially increasing + // interval for `publish_interval`, `query_interval` and `priority_group_set_interval` + // instead of a constant interval. + let publish_interval = ExpIncInterval::new( + Duration::from_secs(2), + config.max_publish_interval, ); - - // External addresses of remote authorities can change at any given point in time. The - // interval on which to trigger new queries for the current authorities is a trade off - // between efficiency and performance. - let query_interval_start = Instant::now() + config.query_start_delay; - let query_interval_duration = config.query_interval; - let query_interval = interval_at(query_interval_start, query_interval_duration); - - // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when - // comparing `authority_discovery_authority_addresses_requested_total` and - // `authority_discovery_dht_event_received`. With that in mind set the peerset priority - // group on the same interval as the [`query_interval`] above, - // just delayed by 5 minutes by default. - let priority_group_set_interval = interval_at( - query_interval_start + config.priority_group_set_offset, - config.priority_group_set_interval, + let query_interval = ExpIncInterval::new( + Duration::from_secs(2), + config.max_query_interval, + ); + let priority_group_set_interval = ExpIncInterval::new( + Duration::from_secs(2), + // Trade-off between node connection churn and connectivity. Using half of + // [`crate::WorkerConfig::max_query_interval`] to update priority group once at the + // beginning and once in the middle of each query interval. + config.max_query_interval / 2, ); let addr_cache = AddrCache::new(); @@ -413,7 +405,7 @@ where } if log_enabled!(log::Level::Debug) { - let hashes = v.iter().map(|(hash, _value)| hash.clone()); + let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect(); debug!( target: LOG_TARGET, "Value for hash '{:?}' found on Dht.", hashes, @@ -449,6 +441,11 @@ where } }, DhtEvent::ValuePut(hash) => { + // Fast forward the exponentially increasing interval to the configured maximum. In + // case this was the first successful address publishing there is no need for a + // timely retry. + self.publish_interval.set_to_max(); + if let Some(metrics) = &self.metrics { metrics.dht_event_received.with_label_values(&["value_put"]).inc(); } @@ -661,16 +658,6 @@ fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key { libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id)) } -fn interval_at(start: Instant, duration: Duration) -> Interval { - let stream = futures::stream::unfold(start, move |next| { - let time_until_next = next.saturating_duration_since(Instant::now()); - - Delay::new(time_until_next).map(move |_| Some(((), next + duration))) - }); - - Box::new(stream) -} - /// Prometheus metrics for a [`Worker`]. #[derive(Clone)] pub(crate) struct Metrics { diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index 12adb8f232514..fee861dfeb0c7 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -37,66 +37,6 @@ use substrate_test_runtime_client::runtime::Block; use super::*; -#[test] -fn interval_at_with_start_now() { - let start = Instant::now(); - - let mut interval = interval_at( - std::time::Instant::now(), - std::time::Duration::from_secs(10), - ); - - futures::executor::block_on(async { - interval.next().await; - }); - - assert!( - Instant::now().saturating_duration_since(start) < Duration::from_secs(1), - "Expected low resolution instant interval to fire within less than a second.", - ); -} - -#[test] -fn interval_at_is_queuing_ticks() { - let start = Instant::now(); - - let interval = interval_at(start, std::time::Duration::from_millis(100)); - - // Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms, 3rd - // at 200ms). - std::thread::sleep(Duration::from_millis(200)); - - futures::executor::block_on(async { - interval.take(3).collect::>().await; - }); - - // Make sure we did not wait for more than 300 ms, which would imply that `at_interval` is - // not queuing ticks. - assert!( - Instant::now().saturating_duration_since(start) < Duration::from_millis(300), - "Expect interval to /queue/ events when not polled for a while.", - ); -} - -#[test] -fn interval_at_with_initial_delay() { - let start = Instant::now(); - - let mut interval = interval_at( - std::time::Instant::now() + Duration::from_millis(100), - std::time::Duration::from_secs(10), - ); - - futures::executor::block_on(async { - interval.next().await; - }); - - assert!( - Instant::now().saturating_duration_since(start) > Duration::from_millis(100), - "Expected interval with initial delay not to fire right away.", - ); -} - #[derive(Clone)] pub(crate) struct TestApi { pub(crate) authorities: Vec, diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index f9bda6aabf5f8..552e91f4ee499 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -693,7 +693,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default)) } Err(e) => { - warn!(target: "sub-libp2p", + debug!(target: "sub-libp2p", "Libp2p => Failed to get record: {:?}", e); DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default)) } @@ -704,7 +704,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { let ev = match res { Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)), Err(e) => { - warn!(target: "sub-libp2p", + debug!(target: "sub-libp2p", "Libp2p => Failed to put record: {:?}", e); DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default)) } @@ -716,7 +716,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { Ok(ok) => debug!(target: "sub-libp2p", "Libp2p => Record republished: {:?}", ok.key), - Err(e) => warn!(target: "sub-libp2p", + Err(e) => debug!(target: "sub-libp2p", "Libp2p => Republishing of record {:?} failed with: {:?}", e.key(), e) } From 934381c8b2444d2277ca6f1bd5fab0351f52dceb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 18 Nov 2020 15:59:01 +0100 Subject: [PATCH 2/3] client/authority-discovery/src/interval.rs: Add license header --- client/authority-discovery/src/interval.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/client/authority-discovery/src/interval.rs b/client/authority-discovery/src/interval.rs index 8ab4f7da1f3a1..b3aa5b1c0f678 100644 --- a/client/authority-discovery/src/interval.rs +++ b/client/authority-discovery/src/interval.rs @@ -1,3 +1,19 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + use futures::stream::Stream; use futures::future::FutureExt; use futures::ready; From 114aec1e0643b8434666b08d8bf5e530c55c188e Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 19 Nov 2020 14:38:18 +0100 Subject: [PATCH 3/3] .maintain/gitlab: Ensure adder collator tests are run on CI --- .maintain/gitlab/check_polkadot_companion_build.sh | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.maintain/gitlab/check_polkadot_companion_build.sh b/.maintain/gitlab/check_polkadot_companion_build.sh index 73a5a36ff8af9..4b6e45c267efe 100755 --- a/.maintain/gitlab/check_polkadot_companion_build.sh +++ b/.maintain/gitlab/check_polkadot_companion_build.sh @@ -92,3 +92,6 @@ cd polkadot # Test Polkadot pr or master branch with this Substrate commit. cargo update -p sp-io time cargo test --all --release --verbose --features=real-overseer + +cd parachain/test-parachains/adder/collator/ +time cargo test --release --verbose --locked --features=real-overseer