diff --git a/Cargo.lock b/Cargo.lock index d27ae76b6abb..e40d844e56f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4120,9 +4120,14 @@ dependencies = [ name = "metered-channel" version = "0.9.9" dependencies = [ + "assert_matches", "derive_more", + "env_logger 0.9.0", "futures 0.3.17", "futures-timer 3.0.2", + "log", + "thiserror", + "tracing", ] [[package]] @@ -10574,18 +10579,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.26" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2" +checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.26" +version = "1.0.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745" +checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c" dependencies = [ "proc-macro2", "quote", @@ -10802,6 +10807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8" dependencies = [ "cfg-if 1.0.0", + "log", "pin-project-lite 0.2.4", "tracing-attributes", "tracing-core", diff --git a/node/metered-channel/Cargo.toml b/node/metered-channel/Cargo.toml index 2741d95b9a68..53695c30923c 100644 --- a/node/metered-channel/Cargo.toml +++ b/node/metered-channel/Cargo.toml @@ -9,6 +9,12 @@ description = "Channels with attached Meters" futures = "0.3.17" futures-timer = "3.0.2" derive_more = "0.99" +tracing = "0.1.28" +thiserror = "1.0.29" [dev-dependencies] futures = { version = "0.3.17", features = ["thread-pool"] } +assert_matches = "1.5" +env_logger = "0.9" +log = "0.4" +tracing = { version = "0.1.28", features = ["log"] } diff --git a/node/metered-channel/src/lib.rs b/node/metered-channel/src/lib.rs index 4fe1bcd22526..9646cbaee901 100644 --- a/node/metered-channel/src/lib.rs +++ b/node/metered-channel/src/lib.rs @@ -24,6 +24,7 @@ use std::sync::{ use derive_more::{Add, Display}; mod bounded; +pub mod oneshot; mod unbounded; pub use self::{bounded::*, unbounded::*}; diff --git a/node/metered-channel/src/oneshot.rs b/node/metered-channel/src/oneshot.rs new file mode 100644 index 000000000000..bf1a52682462 --- /dev/null +++ b/node/metered-channel/src/oneshot.rs @@ -0,0 +1,418 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Metered variant of oneshot channels to be able to extract delays caused by delayed responses. + +use std::{ + ops::Deref, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures::{ + channel::oneshot::{self, Canceled, Cancellation}, + future::{Fuse, FusedFuture}, + prelude::*, +}; +use futures_timer::Delay; + +/// Provides the reason for termination. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u8)] +pub enum Reason { + Completion = 1, + Cancellation = 2, + HardTimeout = 3, +} + +/// Obtained measurements by the `Receiver` side of the `MeteredOneshot`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Measurements { + /// Duration between first poll and polling termination. + first_poll_till_end: Duration, + /// Duration starting with creation until polling termination. + creation_till_end: Duration, + /// Reason for resolving the future. + reason: Reason, +} + +impl Measurements { + /// Obtain the duration of a finished or canceled + /// `oneshot` channel. + pub fn duration_since_first_poll(&self) -> &Duration { + &self.first_poll_till_end + } + + /// Obtain the duration of a finished or canceled + /// `oneshot` channel. + pub fn duration_since_creation(&self) -> &Duration { + &self.creation_till_end + } + + /// Obtain the reason to the channel termination. + pub fn reason(&self) -> &Reason { + &self.reason + } +} + +/// Create a new pair of `OneshotMetered{Sender,Receiver}`. +pub fn channel( + name: &'static str, + soft_timeout: Duration, + hard_timeout: Duration, +) -> (MeteredSender, MeteredReceiver) { + let (tx, rx) = oneshot::channel(); + + ( + MeteredSender { name, inner: tx }, + MeteredReceiver { + name, + inner: rx, + soft_timeout, + hard_timeout, + soft_timeout_fut: None, + hard_timeout_fut: None, + first_poll_timestamp: None, + creation_timestamp: Instant::now(), + }, + ) +} + +#[allow(missing_docs)] +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Oneshot was cancelled.")] + Canceled(#[source] Canceled, Measurements), + #[error("Oneshot did not receive a response within {}", Duration::as_secs_f64(.0))] + HardTimeout(Duration, Measurements), +} + +impl Measurable for Error { + fn measurements(&self) -> Measurements { + match self { + Self::Canceled(_, measurements) => measurements.clone(), + Self::HardTimeout(_, measurements) => measurements.clone(), + } + } +} + +/// Oneshot sender, created by [`channel`]. +#[derive(Debug)] +pub struct MeteredSender { + name: &'static str, + inner: oneshot::Sender<(Instant, T)>, +} + +impl MeteredSender { + /// Send a value. + pub fn send(self, t: T) -> Result<(), T> { + let Self { inner, name: _ } = self; + inner.send((Instant::now(), t)).map_err(|(_, t)| t) + } + + /// Poll if the thing is already cancelled. + pub fn poll_canceled(&mut self, ctx: &mut Context<'_>) -> Poll<()> { + self.inner.poll_canceled(ctx) + } + + /// Access the cancellation object. + pub fn cancellation(&mut self) -> Cancellation<'_, (Instant, T)> { + self.inner.cancellation() + } + + /// Check the cancellation state. + pub fn is_canceled(&self) -> bool { + self.inner.is_canceled() + } + + /// Verify if the `receiver` is connected to the `sender` [`Self`]. + pub fn is_connected_to(&self, receiver: &MeteredReceiver) -> bool { + self.inner.is_connected_to(&receiver.inner) + } +} + +/// Oneshot receiver, created by [`channel`]. +#[derive(Debug)] +pub struct MeteredReceiver { + name: &'static str, + inner: oneshot::Receiver<(Instant, T)>, + /// Soft timeout, on expire a warning is printed. + soft_timeout_fut: Option>, + soft_timeout: Duration, + /// Hard timeout, terminating the sender. + hard_timeout_fut: Option, + hard_timeout: Duration, + /// The first time the receiver was polled. + first_poll_timestamp: Option, + creation_timestamp: Instant, +} + +impl MeteredReceiver { + pub fn close(&mut self) { + self.inner.close() + } + + /// Attempts to receive a message outside of the context of a task. + /// + /// A return value of `None` must be considered immediately stale (out of + /// date) unless [`close`](MeteredReceiver::close) has been called first. + /// + /// Returns an error if the sender was dropped. + pub fn try_recv(&mut self) -> Result>, Error> { + match self.inner.try_recv() { + Ok(Some((when, value))) => { + let measurements = self.create_measurement(when, Reason::Completion); + Ok(Some(OutputWithMeasurements { value, measurements })) + }, + Err(e) => { + let measurements = self.create_measurement( + self.first_poll_timestamp.unwrap_or_else(|| Instant::now()), + Reason::Cancellation, + ); + Err(Error::Canceled(e, measurements)) + }, + Ok(None) => Ok(None), + } + } + + /// Helper to create a measurement. + /// + /// `start` determines the first possible time where poll can resolve with `Ready`. + fn create_measurement(&self, start: Instant, reason: Reason) -> Measurements { + let end = Instant::now(); + Measurements { + // negative values are ok, if `send` was called before we poll for the first time. + first_poll_till_end: end - start, + creation_till_end: end - self.creation_timestamp, + reason, + } + } +} + +impl FusedFuture for MeteredReceiver { + fn is_terminated(&self) -> bool { + self.inner.is_terminated() + } +} + +impl Future for MeteredReceiver { + type Output = Result, Error>; + + fn poll( + mut self: Pin<&mut Self>, + ctx: &mut Context<'_>, + ) -> Poll, Error>> { + let first_poll_timestamp = + self.first_poll_timestamp.get_or_insert_with(|| Instant::now()).clone(); + + let soft_timeout = self.soft_timeout.clone(); + let soft_timeout = self + .soft_timeout_fut + .get_or_insert_with(move || Delay::new(soft_timeout).fuse()); + + if Pin::new(soft_timeout).poll(ctx).is_ready() { + tracing::warn!("Oneshot `{name}` exceeded the soft threshold", name = &self.name); + } + + let hard_timeout = self.hard_timeout.clone(); + let hard_timeout = + self.hard_timeout_fut.get_or_insert_with(move || Delay::new(hard_timeout)); + + if Pin::new(hard_timeout).poll(ctx).is_ready() { + let measurements = self.create_measurement(first_poll_timestamp, Reason::HardTimeout); + return Poll::Ready(Err(Error::HardTimeout(self.hard_timeout.clone(), measurements))) + } + + match Pin::new(&mut self.inner).poll(ctx) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + let measurements = + self.create_measurement(first_poll_timestamp, Reason::Cancellation); + Poll::Ready(Err(Error::Canceled(e, measurements))) + }, + Poll::Ready(Ok((ref sent_at_timestamp, value))) => { + let measurements = + self.create_measurement(sent_at_timestamp.clone(), Reason::Completion); + Poll::Ready(Ok(OutputWithMeasurements:: { value, measurements })) + }, + } + } +} + +/// A dummy trait that allows implementing `measurements` for `Result<_,_>`. +pub trait Measurable { + /// Obtain a set of measurements represented by the `Measurements` type. + fn measurements(&self) -> Measurements; +} + +impl Measurable for Result, Error> { + fn measurements(&self) -> Measurements { + match self { + Err(err) => err.measurements(), + Ok(val) => val.measurements(), + } + } +} + +/// A wrapping type for the actual type `T` that is sent with the +/// oneshot yet allow to attach `Measurements` to it. +/// +/// Implements `AsRef` besides others for easier access to the inner, +/// wrapped type. +#[derive(Clone, Debug)] +pub struct OutputWithMeasurements { + value: T, + measurements: Measurements, +} + +impl Measurable for OutputWithMeasurements { + fn measurements(&self) -> Measurements { + self.measurements.clone() + } +} + +impl OutputWithMeasurements { + /// Converts the wrapper type into it's inner value. + /// + /// `trait Into` cannot be implemented due to conflicts. + pub fn into(self) -> T { + self.value + } +} + +impl AsRef for OutputWithMeasurements { + fn as_ref(&self) -> &T { + &self.value + } +} + +impl Deref for OutputWithMeasurements { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.value + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use futures::{executor::ThreadPool, task::SpawnExt}; + + use super::*; + + #[derive(Clone, PartialEq, Eq, Debug)] + struct DummyItem { + vals: [u8; 256], + } + + impl Default for DummyItem { + fn default() -> Self { + Self { vals: [0u8; 256] } + } + } + + fn test_launch(name: &'static str, gen_sender_test: S, gen_receiver_test: R) + where + S: Fn(MeteredSender) -> FS, + R: Fn(MeteredReceiver) -> FR, + FS: Future + Send + 'static, + FR: Future + Send + 'static, + { + let _ = env_logger::builder().is_test(true).filter_level(LevelFilter::Trace).try_init(); + + let pool = ThreadPool::new().unwrap(); + let (tx, rx) = channel(name, Duration::from_secs(1), Duration::from_secs(3)); + futures::executor::block_on(async move { + let handle_receiver = pool.spawn_with_handle(gen_receiver_test(rx)).unwrap(); + let handle_sender = pool.spawn_with_handle(gen_sender_test(tx)).unwrap(); + futures::future::select( + futures::future::join(handle_sender, handle_receiver), + Delay::new(Duration::from_secs(5)), + ) + .await; + }); + } + + use log::LevelFilter; + + #[test] + fn easy() { + test_launch( + "easy", + |tx| async move { + tx.send(DummyItem::default()).unwrap(); + }, + |rx| async move { + let x = rx.await.unwrap(); + let measurements = x.measurements(); + assert_eq!(x.as_ref(), &DummyItem::default()); + dbg!(measurements); + }, + ); + } + + #[test] + fn cancel_by_drop() { + test_launch( + "cancel_by_drop", + |tx| async move { + Delay::new(Duration::from_secs(2)).await; + drop(tx); + }, + |rx| async move { + let result = rx.await; + assert_matches!(result, Err(Error::Canceled(_, _))); + dbg!(result.measurements()); + }, + ); + } + + #[test] + fn starve_till_hard_timeout() { + test_launch( + "starve_till_timeout", + |tx| async move { + Delay::new(Duration::from_secs(4)).await; + let _ = tx.send(DummyItem::default()); + }, + |rx| async move { + let result = rx.await; + assert_matches!(&result, e @ &Err(Error::HardTimeout(_, _)) => { + println!("{:?}", e); + }); + dbg!(result.measurements()); + }, + ); + } + + #[test] + fn starve_till_soft_timeout_then_food() { + test_launch( + "starve_till_soft_timeout_then_food", + |tx| async move { + Delay::new(Duration::from_secs(2)).await; + let _ = tx.send(DummyItem::default()); + }, + |rx| async move { + let result = rx.await; + assert_matches!(result, Ok(_)); + dbg!(result.measurements()); + }, + ); + } +} diff --git a/node/network/gossip-support/src/lib.rs b/node/network/gossip-support/src/lib.rs index 65af6cb12756..97be38933802 100644 --- a/node/network/gossip-support/src/lib.rs +++ b/node/network/gossip-support/src/lib.rs @@ -340,12 +340,13 @@ where "Connectivity seems low, we are only connected to {}% of available validators (see debug logs for details)", connected_ratio ); } + let pretty = PrettyAuthorities(unconnected_authorities); tracing::debug!( target: LOG_TARGET, ?connected_ratio, ?absolute_connected, ?absolute_resolved, - unconnected_authorities = %PrettyAuthorities(unconnected_authorities), + unconnected_authorities = %pretty, "Connectivity Report" ); } diff --git a/node/network/gossip-support/src/tests.rs b/node/network/gossip-support/src/tests.rs index fc25b137d3c7..1220f1a5461f 100644 --- a/node/network/gossip-support/src/tests.rs +++ b/node/network/gossip-support/src/tests.rs @@ -340,9 +340,10 @@ fn test_log_output() { m.insert(bob, addrs); m }; + let pretty = PrettyAuthorities(unconnected_authorities.iter()); tracing::debug!( target: LOG_TARGET, - unconnected_authorities = %PrettyAuthorities(unconnected_authorities.iter()), + unconnected_authorities = %pretty, "Connectivity Report" ); }