diff --git a/Cargo.lock b/Cargo.lock
index d27ae76b6abb..e40d844e56f6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4120,9 +4120,14 @@ dependencies = [
name = "metered-channel"
version = "0.9.9"
dependencies = [
+ "assert_matches",
"derive_more",
+ "env_logger 0.9.0",
"futures 0.3.17",
"futures-timer 3.0.2",
+ "log",
+ "thiserror",
+ "tracing",
]
[[package]]
@@ -10574,18 +10579,18 @@ dependencies = [
[[package]]
name = "thiserror"
-version = "1.0.26"
+version = "1.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "93119e4feac1cbe6c798c34d3a53ea0026b0b1de6a120deef895137c0529bfe2"
+checksum = "602eca064b2d83369e2b2f34b09c70b605402801927c65c11071ac911d299b88"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.26"
+version = "1.0.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "060d69a0afe7796bf42e9e2ff91f5ee691fb15c53d38b4b62a9a53eb23164745"
+checksum = "bad553cc2c78e8de258400763a647e80e6d1b31ee237275d756f6836d204494c"
dependencies = [
"proc-macro2",
"quote",
@@ -10802,6 +10807,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84f96e095c0c82419687c20ddf5cb3eadb61f4e1405923c9dc8e53a1adacbda8"
dependencies = [
"cfg-if 1.0.0",
+ "log",
"pin-project-lite 0.2.4",
"tracing-attributes",
"tracing-core",
diff --git a/node/metered-channel/Cargo.toml b/node/metered-channel/Cargo.toml
index 2741d95b9a68..53695c30923c 100644
--- a/node/metered-channel/Cargo.toml
+++ b/node/metered-channel/Cargo.toml
@@ -9,6 +9,12 @@ description = "Channels with attached Meters"
futures = "0.3.17"
futures-timer = "3.0.2"
derive_more = "0.99"
+tracing = "0.1.28"
+thiserror = "1.0.29"
[dev-dependencies]
futures = { version = "0.3.17", features = ["thread-pool"] }
+assert_matches = "1.5"
+env_logger = "0.9"
+log = "0.4"
+tracing = { version = "0.1.28", features = ["log"] }
diff --git a/node/metered-channel/src/lib.rs b/node/metered-channel/src/lib.rs
index 4fe1bcd22526..9646cbaee901 100644
--- a/node/metered-channel/src/lib.rs
+++ b/node/metered-channel/src/lib.rs
@@ -24,6 +24,7 @@ use std::sync::{
use derive_more::{Add, Display};
mod bounded;
+pub mod oneshot;
mod unbounded;
pub use self::{bounded::*, unbounded::*};
diff --git a/node/metered-channel/src/oneshot.rs b/node/metered-channel/src/oneshot.rs
new file mode 100644
index 000000000000..bf1a52682462
--- /dev/null
+++ b/node/metered-channel/src/oneshot.rs
@@ -0,0 +1,418 @@
+// Copyright 2021 Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! Metered variant of oneshot channels to be able to extract delays caused by delayed responses.
+
+use std::{
+ ops::Deref,
+ pin::Pin,
+ task::{Context, Poll},
+ time::{Duration, Instant},
+};
+
+use futures::{
+ channel::oneshot::{self, Canceled, Cancellation},
+ future::{Fuse, FusedFuture},
+ prelude::*,
+};
+use futures_timer::Delay;
+
+/// Provides the reason for termination.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[repr(u8)]
+pub enum Reason {
+ Completion = 1,
+ Cancellation = 2,
+ HardTimeout = 3,
+}
+
+/// Obtained measurements by the `Receiver` side of the `MeteredOneshot`.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct Measurements {
+ /// Duration between first poll and polling termination.
+ first_poll_till_end: Duration,
+ /// Duration starting with creation until polling termination.
+ creation_till_end: Duration,
+ /// Reason for resolving the future.
+ reason: Reason,
+}
+
+impl Measurements {
+ /// Obtain the duration of a finished or canceled
+ /// `oneshot` channel.
+ pub fn duration_since_first_poll(&self) -> &Duration {
+ &self.first_poll_till_end
+ }
+
+ /// Obtain the duration of a finished or canceled
+ /// `oneshot` channel.
+ pub fn duration_since_creation(&self) -> &Duration {
+ &self.creation_till_end
+ }
+
+ /// Obtain the reason to the channel termination.
+ pub fn reason(&self) -> &Reason {
+ &self.reason
+ }
+}
+
+/// Create a new pair of `OneshotMetered{Sender,Receiver}`.
+pub fn channel(
+ name: &'static str,
+ soft_timeout: Duration,
+ hard_timeout: Duration,
+) -> (MeteredSender, MeteredReceiver) {
+ let (tx, rx) = oneshot::channel();
+
+ (
+ MeteredSender { name, inner: tx },
+ MeteredReceiver {
+ name,
+ inner: rx,
+ soft_timeout,
+ hard_timeout,
+ soft_timeout_fut: None,
+ hard_timeout_fut: None,
+ first_poll_timestamp: None,
+ creation_timestamp: Instant::now(),
+ },
+ )
+}
+
+#[allow(missing_docs)]
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+ #[error("Oneshot was cancelled.")]
+ Canceled(#[source] Canceled, Measurements),
+ #[error("Oneshot did not receive a response within {}", Duration::as_secs_f64(.0))]
+ HardTimeout(Duration, Measurements),
+}
+
+impl Measurable for Error {
+ fn measurements(&self) -> Measurements {
+ match self {
+ Self::Canceled(_, measurements) => measurements.clone(),
+ Self::HardTimeout(_, measurements) => measurements.clone(),
+ }
+ }
+}
+
+/// Oneshot sender, created by [`channel`].
+#[derive(Debug)]
+pub struct MeteredSender {
+ name: &'static str,
+ inner: oneshot::Sender<(Instant, T)>,
+}
+
+impl MeteredSender {
+ /// Send a value.
+ pub fn send(self, t: T) -> Result<(), T> {
+ let Self { inner, name: _ } = self;
+ inner.send((Instant::now(), t)).map_err(|(_, t)| t)
+ }
+
+ /// Poll if the thing is already cancelled.
+ pub fn poll_canceled(&mut self, ctx: &mut Context<'_>) -> Poll<()> {
+ self.inner.poll_canceled(ctx)
+ }
+
+ /// Access the cancellation object.
+ pub fn cancellation(&mut self) -> Cancellation<'_, (Instant, T)> {
+ self.inner.cancellation()
+ }
+
+ /// Check the cancellation state.
+ pub fn is_canceled(&self) -> bool {
+ self.inner.is_canceled()
+ }
+
+ /// Verify if the `receiver` is connected to the `sender` [`Self`].
+ pub fn is_connected_to(&self, receiver: &MeteredReceiver) -> bool {
+ self.inner.is_connected_to(&receiver.inner)
+ }
+}
+
+/// Oneshot receiver, created by [`channel`].
+#[derive(Debug)]
+pub struct MeteredReceiver {
+ name: &'static str,
+ inner: oneshot::Receiver<(Instant, T)>,
+ /// Soft timeout, on expire a warning is printed.
+ soft_timeout_fut: Option>,
+ soft_timeout: Duration,
+ /// Hard timeout, terminating the sender.
+ hard_timeout_fut: Option,
+ hard_timeout: Duration,
+ /// The first time the receiver was polled.
+ first_poll_timestamp: Option,
+ creation_timestamp: Instant,
+}
+
+impl MeteredReceiver {
+ pub fn close(&mut self) {
+ self.inner.close()
+ }
+
+ /// Attempts to receive a message outside of the context of a task.
+ ///
+ /// A return value of `None` must be considered immediately stale (out of
+ /// date) unless [`close`](MeteredReceiver::close) has been called first.
+ ///
+ /// Returns an error if the sender was dropped.
+ pub fn try_recv(&mut self) -> Result