diff --git a/.maintain/gitlab/check_polkadot_companion_build.sh b/.maintain/gitlab/check_polkadot_companion_build.sh
index 73a5a36ff8af9..4b6e45c267efe 100755
--- a/.maintain/gitlab/check_polkadot_companion_build.sh
+++ b/.maintain/gitlab/check_polkadot_companion_build.sh
@@ -92,3 +92,6 @@ cd polkadot
# Test Polkadot pr or master branch with this Substrate commit.
cargo update -p sp-io
time cargo test --all --release --verbose --features=real-overseer
+
+cd parachain/test-parachains/adder/collator/
+time cargo test --release --verbose --locked --features=real-overseer
diff --git a/client/authority-discovery/src/interval.rs b/client/authority-discovery/src/interval.rs
new file mode 100644
index 0000000000000..b3aa5b1c0f678
--- /dev/null
+++ b/client/authority-discovery/src/interval.rs
@@ -0,0 +1,62 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+use futures::stream::Stream;
+use futures::future::FutureExt;
+use futures::ready;
+use futures_timer::Delay;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use std::time::Duration;
+
+/// Exponentially increasing interval
+///
+/// Doubles interval duration on each tick until the configured maximum is reached.
+pub struct ExpIncInterval {
+ max: Duration,
+ next: Duration,
+ delay: Delay,
+}
+
+impl ExpIncInterval {
+ /// Create a new [`ExpIncInterval`].
+ pub fn new(start: Duration, max: Duration) -> Self {
+ let delay = Delay::new(start);
+ Self {
+ max,
+ next: start * 2,
+ delay,
+ }
+ }
+
+ /// Fast forward the exponentially increasing interval to the configured maximum.
+ pub fn set_to_max(&mut self) {
+ self.next = self.max;
+ self.delay = Delay::new(self.next);
+ }
+}
+
+impl Stream for ExpIncInterval {
+ type Item = ();
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
+ ready!(self.delay.poll_unpin(cx));
+ self.delay = Delay::new(self.next);
+ self.next = std::cmp::min(self.max, self.next * 2);
+
+ Poll::Ready(Some(()))
+ }
+}
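A quick sketch of the tick cadence this stream produces. Since the `interval` module is private to the crate, the example assumes it runs as an in-crate unit test; the concrete durations and the timing assertion are illustrative, not part of this change.

```rust
use super::ExpIncInterval; // in-crate, e.g. a child `tests` module of interval.rs
use futures::StreamExt;
use std::time::{Duration, Instant};

#[test]
fn exp_inc_interval_doubles_until_max() {
    // Delays between ticks: 100ms, 200ms, 400ms, then 400ms forever,
    // i.e. ticks at roughly t=100ms, t=300ms, t=700ms, t=1100ms, ...
    let mut interval = ExpIncInterval::new(
        Duration::from_millis(100),
        Duration::from_millis(400),
    );

    let start = Instant::now();
    futures::executor::block_on(async {
        for _ in 0..3 {
            interval.next().await;
        }
    });

    // Three ticks cannot complete before 100 + 200 + 400 = 700ms.
    assert!(start.elapsed() >= Duration::from_millis(700));
}
```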
diff --git a/client/authority-discovery/src/lib.rs b/client/authority-discovery/src/lib.rs
index 4ee57f31e04a5..41aa01e56bde2 100644
--- a/client/authority-discovery/src/lib.rs
+++ b/client/authority-discovery/src/lib.rs
@@ -38,50 +38,41 @@ use sp_runtime::traits::Block as BlockT;
use sp_api::ProvideRuntimeApi;
mod error;
+mod interval;
mod service;
+mod worker;
+
#[cfg(test)]
mod tests;
-mod worker;
/// Configuration of [`Worker`].
pub struct WorkerConfig {
- /// The interval in which the node will publish its own address on the DHT.
+ /// The maximum interval in which the node will publish its own address on the DHT.
///
- /// By default this is set to 12 hours.
- pub publish_interval: Duration,
- /// The interval in which the node will query the DHT for new entries.
+ /// By default this is set to 1 hour.
+ pub max_publish_interval: Duration,
+ /// The maximum interval in which the node will query the DHT for new entries.
///
/// By default this is set to 10 minutes.
- pub query_interval: Duration,
- /// The time the node will wait before triggering the first DHT query or publish.
- ///
- /// By default this is set to 30 seconds.
- ///
- /// This default is based on the rough boostrap time required by libp2p Kademlia.
- pub query_start_delay: Duration,
- /// The interval in which the worker will instruct the peerset to connect to a random subset
- /// of discovered validators.
- ///
- /// By default this is set to 10 minutes.
- pub priority_group_set_interval: Duration,
- /// The time the worker will wait after each query interval tick to pass a subset of
- /// the cached authority addresses down to the peerset.
- ///
- /// Be aware that the actual delay will be computed by [`Self::query_start_delay`] +
- /// [`Self::priority_group_set_start_delay`]
- ///
- /// By default this is set to 5 minutes.
- pub priority_group_set_offset: Duration,
+ pub max_query_interval: Duration,
}
impl Default for WorkerConfig {
fn default() -> Self {
Self {
- publish_interval: Duration::from_secs(12 * 60 * 60),
- query_interval: Duration::from_secs(10 * 60),
- query_start_delay: Duration::from_secs(30),
- priority_group_set_interval: Duration::from_secs(10 * 60),
- priority_group_set_offset: Duration::from_secs(5 * 60),
+ // Kademlia's default time-to-live for Dht records is 36h, republishing records every
+ // 24h through libp2p-kad. Given that a node could restart at any point in time, one can
+ // not depend on the republishing process, thus publishing own external addresses should
+ // happen on an interval < 36h.
+ max_publish_interval: Duration::from_secs(1 * 60 * 60),
+ // External addresses of remote authorities can change at any given point in time. The
+ // interval on which to trigger new queries for the current and next authorities is a trade
+ // off between efficiency and performance.
+ //
+ // Querying 700 [`AuthorityId`]s takes ~8m on the Kusama DHT (16th Nov 2020) when
+ // comparing `authority_discovery_authority_addresses_requested_total` and
+ // `authority_discovery_dht_event_received`.
+ max_query_interval: Duration::from_secs(10 * 60),
}
}
}
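With this change, only the two interval caps remain configurable from the outside. A minimal sketch of how downstream code might tighten the publish cap while keeping the query default (assuming the usual `sc-authority-discovery` crate name; the 2s ramp-up start is fixed inside the worker):

```rust
use sc_authority_discovery::WorkerConfig;
use std::time::Duration;

fn main() {
    // Start from the defaults (1h publish cap, 10m query cap) and only
    // override the publish cap via struct update syntax.
    let config = WorkerConfig {
        max_publish_interval: Duration::from_secs(30 * 60),
        ..WorkerConfig::default()
    };
    assert_eq!(config.max_query_interval, Duration::from_secs(10 * 60));
}
```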
diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs
index 42ae3a5213f0f..c8e7a9f7aee35 100644
--- a/client/authority-discovery/src/worker.rs
+++ b/client/authority-discovery/src/worker.rs
@@ -14,17 +14,16 @@
// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
-use crate::{error::{Error, Result}, ServicetoWorkerMsg};
+use crate::{error::{Error, Result}, interval::ExpIncInterval, ServicetoWorkerMsg};
use std::collections::{HashMap, HashSet};
use std::convert::TryInto;
use std::marker::PhantomData;
use std::sync::Arc;
-use std::time::{Duration, Instant};
+use std::time::Duration;
use futures::channel::mpsc;
use futures::{FutureExt, Stream, StreamExt, stream::Fuse};
-use futures_timer::Delay;
use addr_cache::AddrCache;
use async_trait::async_trait;
@@ -54,8 +53,6 @@ mod schema { include!(concat!(env!("OUT_DIR"), "/authority_discovery.rs")); }
#[cfg(test)]
pub mod tests;
-type Interval = Box<dyn Stream<Item = ()> + Unpin + Send + Sync>;
-
const LOG_TARGET: &'static str = "sub-authority-discovery";
/// Name of the Substrate peerset priority group for authorities discovered through the authority
@@ -113,12 +110,12 @@ pub struct Worker<Client, Network, Block, DhtEventStream> {
dht_event_rx: DhtEventStream,
/// Interval to be proactive, publishing own addresses.
- publish_interval: Interval,
+ publish_interval: ExpIncInterval,
/// Interval at which to request addresses of authorities, refilling the pending lookups queue.
- query_interval: Interval,
+ query_interval: ExpIncInterval,
/// Interval on which to set the peerset priority group to a new random
/// set of addresses.
- priority_group_set_interval: Interval,
+ priority_group_set_interval: ExpIncInterval,
/// Queue of throttled lookups pending to be passed to the network.
pending_lookups: Vec<AuthorityId>,
@@ -153,31 +150,26 @@ where
prometheus_registry: Option<prometheus_endpoint::Registry>,
config: crate::WorkerConfig,
) -> Self {
- // Kademlia's default time-to-live for Dht records is 36h, republishing
- // records every 24h through libp2p-kad.
- // Given that a node could restart at any point in time, one can not depend on the
- // republishing process, thus publishing own external addresses should happen on an interval
- // < 36h.
- let publish_interval = interval_at(
- Instant::now() + config.query_start_delay,
- config.publish_interval,
+ // When a node starts up, publishing and querying might fail for various reasons, for
+ // example because the node is not yet fully bootstrapped on the DHT. Thus one should
+ // retry sooner rather than later. On the other hand, a long-running node is likely well
+ // connected, so timely retries are not needed. For this reason, use an exponentially
+ // increasing interval for `publish_interval`, `query_interval` and
+ // `priority_group_set_interval` instead of a constant interval.
+ let publish_interval = ExpIncInterval::new(
+ Duration::from_secs(2),
+ config.max_publish_interval,
);
-
- // External addresses of remote authorities can change at any given point in time. The
- // interval on which to trigger new queries for the current authorities is a trade off
- // between efficiency and performance.
- let query_interval_start = Instant::now() + config.query_start_delay;
- let query_interval_duration = config.query_interval;
- let query_interval = interval_at(query_interval_start, query_interval_duration);
-
- // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
- // comparing `authority_discovery_authority_addresses_requested_total` and
- // `authority_discovery_dht_event_received`. With that in mind set the peerset priority
- // group on the same interval as the [`query_interval`] above,
- // just delayed by 5 minutes by default.
- let priority_group_set_interval = interval_at(
- query_interval_start + config.priority_group_set_offset,
- config.priority_group_set_interval,
+ let query_interval = ExpIncInterval::new(
+ Duration::from_secs(2),
+ config.max_query_interval,
+ );
+ let priority_group_set_interval = ExpIncInterval::new(
+ Duration::from_secs(2),
+ // Trade-off between node connection churn and connectivity. Using half of
+ // [`crate::WorkerConfig::max_query_interval`] to update priority group once at the
+ // beginning and once in the middle of each query interval.
+ config.max_query_interval / 2,
);
let addr_cache = AddrCache::new();
@@ -413,7 +405,7 @@ where
}
if log_enabled!(log::Level::Debug) {
- let hashes = v.iter().map(|(hash, _value)| hash.clone());
+ let hashes: Vec<_> = v.iter().map(|(hash, _value)| hash.clone()).collect();
debug!(
target: LOG_TARGET,
"Value for hash '{:?}' found on Dht.", hashes,
@@ -449,6 +441,11 @@ where
}
},
DhtEvent::ValuePut(hash) => {
+ // Fast forward the exponentially increasing interval to the configured maximum. In
+ // case this was the first successful address publishing there is no need for a
+ // timely retry.
+ self.publish_interval.set_to_max();
+
if let Some(metrics) = &self.metrics {
metrics.dht_event_received.with_label_values(&["value_put"]).inc();
}
@@ -661,16 +658,6 @@ fn hash_authority_id(id: &[u8]) -> libp2p::kad::record::Key {
libp2p::kad::record::Key::new(&libp2p::multihash::Sha2_256::digest(id))
}
-fn interval_at(start: Instant, duration: Duration) -> Interval {
- let stream = futures::stream::unfold(start, move |next| {
- let time_until_next = next.saturating_duration_since(Instant::now());
-
- Delay::new(time_until_next).map(move |_| Some(((), next + duration)))
- });
-
- Box::new(stream)
-}
-
/// Prometheus metrics for a [`Worker`].
#[derive(Clone)]
pub(crate) struct Metrics {
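The fast-forward on `DhtEvent::ValuePut` means the eager 2s retry cadence only lasts until the first confirmed publish. A self-contained sketch of that interaction (in-crate, like the test above; the success signal is simulated here rather than coming from a real DHT, and the durations are illustrative):

```rust
use crate::interval::ExpIncInterval;
use futures::StreamExt;
use std::time::{Duration, Instant};

#[test]
fn set_to_max_skips_remaining_ramp_up() {
    let mut publish_interval = ExpIncInterval::new(
        Duration::from_millis(10),  // eager retries while unconfirmed
        Duration::from_millis(500), // steady-state republish cadence
    );

    futures::executor::block_on(async {
        // First tick after ~10ms: the publish attempt goes out.
        publish_interval.next().await;

        // Simulate the first successful `DhtEvent::ValuePut`: no timely
        // retry is needed anymore, jump straight to the 500ms cadence.
        publish_interval.set_to_max();

        let confirmed_at = Instant::now();
        publish_interval.next().await;
        assert!(confirmed_at.elapsed() >= Duration::from_millis(500));
    });
}
```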
diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs
index 12adb8f232514..fee861dfeb0c7 100644
--- a/client/authority-discovery/src/worker/tests.rs
+++ b/client/authority-discovery/src/worker/tests.rs
@@ -37,66 +37,6 @@ use substrate_test_runtime_client::runtime::Block;
use super::*;
-#[test]
-fn interval_at_with_start_now() {
- let start = Instant::now();
-
- let mut interval = interval_at(
- std::time::Instant::now(),
- std::time::Duration::from_secs(10),
- );
-
- futures::executor::block_on(async {
- interval.next().await;
- });
-
- assert!(
- Instant::now().saturating_duration_since(start) < Duration::from_secs(1),
- "Expected low resolution instant interval to fire within less than a second.",
- );
-}
-
-#[test]
-fn interval_at_is_queuing_ticks() {
- let start = Instant::now();
-
- let interval = interval_at(start, std::time::Duration::from_millis(100));
-
- // Let's wait for 200ms, thus 3 elements should be queued up (1st at 0ms, 2nd at 100ms, 3rd
- // at 200ms).
- std::thread::sleep(Duration::from_millis(200));
-
- futures::executor::block_on(async {
- interval.take(3).collect::<Vec<()>>().await;
- });
-
- // Make sure we did not wait for more than 300 ms, which would imply that `at_interval` is
- // not queuing ticks.
- assert!(
- Instant::now().saturating_duration_since(start) < Duration::from_millis(300),
- "Expect interval to /queue/ events when not polled for a while.",
- );
-}
-
-#[test]
-fn interval_at_with_initial_delay() {
- let start = Instant::now();
-
- let mut interval = interval_at(
- std::time::Instant::now() + Duration::from_millis(100),
- std::time::Duration::from_secs(10),
- );
-
- futures::executor::block_on(async {
- interval.next().await;
- });
-
- assert!(
- Instant::now().saturating_duration_since(start) > Duration::from_millis(100),
- "Expected interval with initial delay not to fire right away.",
- );
-}
-
#[derive(Clone)]
pub(crate) struct TestApi {
pub(crate) authorities: Vec<AuthorityId>,
diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs
index 60d35dbdf1ae0..717aec01f754a 100644
--- a/client/network/src/discovery.rs
+++ b/client/network/src/discovery.rs
@@ -693,7 +693,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
Err(e) => {
- warn!(target: "sub-libp2p",
+ debug!(target: "sub-libp2p",
"Libp2p => Failed to get record: {:?}", e);
DiscoveryOut::ValueNotFound(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
@@ -704,7 +704,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
let ev = match res {
Ok(ok) => DiscoveryOut::ValuePut(ok.key, stats.duration().unwrap_or_else(Default::default)),
Err(e) => {
- warn!(target: "sub-libp2p",
+ debug!(target: "sub-libp2p",
"Libp2p => Failed to put record: {:?}", e);
DiscoveryOut::ValuePutFailed(e.into_key(), stats.duration().unwrap_or_else(Default::default))
}
@@ -716,7 +716,7 @@ impl NetworkBehaviour for DiscoveryBehaviour {
Ok(ok) => debug!(target: "sub-libp2p",
"Libp2p => Record republished: {:?}",
ok.key),
- Err(e) => warn!(target: "sub-libp2p",
+ Err(e) => debug!(target: "sub-libp2p",
"Libp2p => Republishing of record {:?} failed with: {:?}",
e.key(), e)
}