diff --git a/.maintain/sentry-node/docker-compose.yml b/.maintain/sentry-node/docker-compose.yml
index 2af9449853c77..a4cc8f1ebb92e 100644
--- a/.maintain/sentry-node/docker-compose.yml
+++ b/.maintain/sentry-node/docker-compose.yml
@@ -47,9 +47,9 @@ services:
- "--validator"
- "--alice"
- "--sentry-nodes"
- - "/dns/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi"
+ - "/dns/sentry-a/tcp/30333/p2p/12D3KooWSCufgHzV4fCwRijfH2k3abrpAJxTKxEvN1FDuRXA2U9x"
- "--reserved-nodes"
- - "/dns/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi"
+ - "/dns/sentry-a/tcp/30333/p2p/12D3KooWSCufgHzV4fCwRijfH2k3abrpAJxTKxEvN1FDuRXA2U9x"
# Not only bind to localhost.
- "--unsafe-ws-external"
- "--unsafe-rpc-external"
@@ -83,11 +83,11 @@ services:
- "--port"
- "30333"
- "--sentry"
- - "/dns/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR"
+ - "/dns/validator-a/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp"
- "--reserved-nodes"
- - "/dns/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR"
+ - "/dns/validator-a/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp"
- "--bootnodes"
- - "/dns/validator-b/tcp/30333/p2p/QmSVnNf9HwVMT1Y4cK1P6aoJcEZjmoTXpjKBmAABLMnZEk"
+ - "/dns/validator-b/tcp/30333/p2p/12D3KooWHdiAxVd8uMQR1hGWXccidmfCwLqcMpGwR6QcTP6QRMuD"
- "--no-telemetry"
- "--rpc-cors"
- "all"
@@ -118,9 +118,9 @@ services:
- "--validator"
- "--bob"
- "--bootnodes"
- - "/dns/validator-a/tcp/30333/p2p/QmRpheLN4JWdAnY7HGJfWFNbfkQCb6tFf4vvA6hgjMZKrR"
+ - "/dns/validator-a/tcp/30333/p2p/12D3KooWEyoppNCUx8Yx66oV9fJnriXwCcXwDDUA2kj6vnc6iDEp"
- "--bootnodes"
- - "/dns/sentry-a/tcp/30333/p2p/QmV7EhW6J6KgmNdr558RH1mPx2xGGznW7At4BhXzntRFsi"
+ - "/dns/sentry-a/tcp/30333/p2p/12D3KooWSCufgHzV4fCwRijfH2k3abrpAJxTKxEvN1FDuRXA2U9x"
- "--no-telemetry"
- "--rpc-cors"
- "all"
diff --git a/Cargo.lock b/Cargo.lock
index 191a9149e89c2..df1ab73b63c37 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1652,6 +1652,7 @@ dependencies = [
name = "frame-support-procedural"
version = "2.0.0"
dependencies = [
+ "Inflector",
"frame-support-procedural-tools",
"proc-macro2",
"quote",
@@ -7225,6 +7226,7 @@ dependencies = [
name = "sc-network-test"
version = "0.8.0"
dependencies = [
+ "async-std",
"futures 0.3.8",
"futures-timer 3.0.2",
"libp2p",
diff --git a/bin/node/runtime/src/lib.rs b/bin/node/runtime/src/lib.rs
index 2afa89f86c024..fa07cf1bd8e9e 100644
--- a/bin/node/runtime/src/lib.rs
+++ b/bin/node/runtime/src/lib.rs
@@ -1180,6 +1180,10 @@ impl_runtime_apis! {
Babe::current_epoch()
}
+ fn next_epoch() -> sp_consensus_babe::Epoch {
+ Babe::next_epoch()
+ }
+
fn generate_key_ownership_proof(
_slot_number: sp_consensus_babe::SlotNumber,
authority_id: sp_consensus_babe::AuthorityId,
diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs
index a7366d00e7cfb..64426cae6f65e 100644
--- a/client/network/src/behaviour.rs
+++ b/client/network/src/behaviour.rs
@@ -17,20 +17,22 @@
// along with this program. If not, see .
use crate::{
- config::{ProtocolId, Role}, block_requests, light_client_handler,
- peer_info, request_responses, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
+ config::{ProtocolId, Role}, light_client_handler, peer_info, request_responses,
+ discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT,
};
use bytes::Bytes;
use codec::Encode as _;
+use futures::channel::oneshot;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
use libp2p::identify::IdentifyInfo;
use libp2p::kad::record;
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters};
use log::debug;
+use prost::Message;
use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}};
use sp_runtime::{traits::{Block as BlockT, NumberFor}, Justification};
use std::{
@@ -42,7 +44,7 @@ use std::{
};
pub use crate::request_responses::{
- ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError
+ ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId,
};
/// General behaviour of the network. Combines all protocols together.
@@ -58,8 +60,6 @@ pub struct Behaviour {
discovery: DiscoveryBehaviour,
/// Generic request-reponse protocols.
request_responses: request_responses::RequestResponsesBehaviour,
- /// Block request handling.
- block_requests: block_requests::BlockRequests,
/// Light client request handling.
light_client_handler: light_client_handler::LightClientHandler,
@@ -70,6 +70,11 @@ pub struct Behaviour {
/// Role of our local node, as originally passed from the configuration.
#[behaviour(ignore)]
role: Role,
+
+ /// Protocol name used to send out block requests via
+ /// [`request_responses::RequestResponsesBehaviour`].
+ #[behaviour(ignore)]
+ block_request_protocol_name: String,
}
/// Event generated by `Behaviour`.
@@ -93,34 +98,18 @@ pub enum BehaviourOut {
result: Result,
},
- /// A request initiated using [`Behaviour::send_request`] has succeeded or failed.
- RequestFinished {
- /// Request that has succeeded.
- request_id: RequestId,
- /// Response sent by the remote or reason for failure.
- result: Result, RequestFailure>,
- },
-
- /// Started a new request with the given node.
- ///
- /// This event is for statistics purposes only. The request and response handling are entirely
- /// internal to the behaviour.
- OpaqueRequestStarted {
- peer: PeerId,
- /// Protocol name of the request.
- protocol: String,
- },
- /// Finished, successfully or not, a previously-started request.
+ /// A request has succeeded or failed.
///
- /// This event is for statistics purposes only. The request and response handling are entirely
- /// internal to the behaviour.
- OpaqueRequestFinished {
- /// Who we were requesting.
+ /// This event is generated for statistics purposes.
+ RequestFinished {
+ /// Peer that we send a request to.
peer: PeerId,
- /// Protocol name of the request.
- protocol: String,
- /// How long before the response came or the request got cancelled.
- request_duration: Duration,
+ /// Name of the protocol in question.
+ protocol: Cow<'static, str>,
+ /// Duration the request took.
+ duration: Duration,
+ /// Result of the request.
+ result: Result<(), RequestFailure>,
},
/// Opened a substream with the given node with the given notifications protocol.
@@ -180,21 +169,28 @@ impl Behaviour {
role: Role,
user_agent: String,
local_public_key: PublicKey,
- block_requests: block_requests::BlockRequests,
light_client_handler: light_client_handler::LightClientHandler,
disco_config: DiscoveryConfig,
- request_response_protocols: Vec,
+ // Block request protocol config.
+ block_request_protocol_config: request_responses::ProtocolConfig,
+ // All remaining request protocol configs.
+ mut request_response_protocols: Vec,
) -> Result {
+ // Extract protocol name and add to `request_response_protocols`.
+ let block_request_protocol_name = block_request_protocol_config.name.to_string();
+ request_response_protocols.push(block_request_protocol_config);
+
Ok(Behaviour {
substrate,
peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key),
discovery: disco_config.finish(),
request_responses:
request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?,
- block_requests,
light_client_handler,
events: VecDeque::new(),
role,
+
+ block_request_protocol_name,
})
}
@@ -236,13 +232,14 @@ impl Behaviour {
}
/// Initiates sending a request.
- ///
- /// An error is returned if we are not connected to the target peer of if the protocol doesn't
- /// match one that has been registered.
- pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec)
- -> Result
- {
- self.request_responses.send_request(target, protocol, request)
+ pub fn send_request(
+ &mut self,
+ target: &PeerId,
+ protocol: &str,
+ request: Vec,
+ pending_response: oneshot::Sender, RequestFailure>>,
+ ) {
+ self.request_responses.send_request(target, protocol, request, pending_response)
}
/// Registers a new notifications protocol.
@@ -331,28 +328,20 @@ Behaviour {
self.events.push_back(BehaviourOut::BlockImport(origin, blocks)),
CustomMessageOutcome::JustificationImport(origin, hash, nb, justification) =>
self.events.push_back(BehaviourOut::JustificationImport(origin, hash, nb, justification)),
- CustomMessageOutcome::BlockRequest { target, request } => {
- match self.block_requests.send_request(&target, request) {
- block_requests::SendRequestOutcome::Ok => {
- self.events.push_back(BehaviourOut::OpaqueRequestStarted {
- peer: target,
- protocol: self.block_requests.protocol_name().to_owned(),
- });
- },
- block_requests::SendRequestOutcome::Replaced { request_duration, .. } => {
- self.events.push_back(BehaviourOut::OpaqueRequestFinished {
- peer: target.clone(),
- protocol: self.block_requests.protocol_name().to_owned(),
- request_duration,
- });
- self.events.push_back(BehaviourOut::OpaqueRequestStarted {
- peer: target,
- protocol: self.block_requests.protocol_name().to_owned(),
- });
- }
- block_requests::SendRequestOutcome::NotConnected |
- block_requests::SendRequestOutcome::EncodeError(_) => {},
+ CustomMessageOutcome::BlockRequest { target, request, pending_response } => {
+ let mut buf = Vec::with_capacity(request.encoded_len());
+ if let Err(err) = request.encode(&mut buf) {
+ log::warn!(
+ target: "sync",
+ "Failed to encode block request {:?}: {:?}",
+ request, err
+ );
+ return
}
+
+ self.request_responses.send_request(
+ &target, &self.block_request_protocol_name, buf, pending_response,
+ );
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
@@ -401,51 +390,15 @@ impl NetworkBehaviourEventProcess {
+ request_responses::Event::RequestFinished { peer, protocol, duration, result } => {
self.events.push_back(BehaviourOut::RequestFinished {
- request_id,
- result,
+ peer, protocol, duration, result,
});
},
}
}
}
-impl NetworkBehaviourEventProcess> for Behaviour {
- fn inject_event(&mut self, event: block_requests::Event) {
- match event {
- block_requests::Event::AnsweredRequest { peer, total_handling_time } => {
- self.events.push_back(BehaviourOut::InboundRequest {
- peer,
- protocol: self.block_requests.protocol_name().to_owned().into(),
- result: Ok(total_handling_time),
- });
- },
- block_requests::Event::Response { peer, response, request_duration } => {
- self.events.push_back(BehaviourOut::OpaqueRequestFinished {
- peer: peer.clone(),
- protocol: self.block_requests.protocol_name().to_owned(),
- request_duration,
- });
- let ev = self.substrate.on_block_response(peer, response);
- self.inject_event(ev);
- }
- block_requests::Event::RequestCancelled { peer, request_duration, .. } |
- block_requests::Event::RequestTimeout { peer, request_duration, .. } => {
- // There doesn't exist any mechanism to report cancellations or timeouts yet, so
- // we process them by disconnecting the node.
- self.events.push_back(BehaviourOut::OpaqueRequestFinished {
- peer: peer.clone(),
- protocol: self.block_requests.protocol_name().to_owned(),
- request_duration,
- });
- self.substrate.on_block_request_failed(&peer);
- }
- }
- }
-}
-
impl NetworkBehaviourEventProcess
for Behaviour {
fn inject_event(&mut self, event: peer_info::PeerInfoEvent) {
diff --git a/client/network/src/block_request_handler.rs b/client/network/src/block_request_handler.rs
new file mode 100644
index 0000000000000..c88be52ecf0de
--- /dev/null
+++ b/client/network/src/block_request_handler.rs
@@ -0,0 +1,220 @@
+// Copyright 2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see .
+
+//! Helper for handling (i.e. answering) block requests from a remote peer via the
+//! [`crate::request_responses::RequestResponsesBehaviour`].
+
+use codec::{Encode, Decode};
+use crate::chain::Client;
+use crate::config::ProtocolId;
+use crate::protocol::{message::BlockAttributes};
+use crate::request_responses::{IncomingRequest, ProtocolConfig};
+use crate::schema::v1::block_request::FromBlock;
+use crate::schema::v1::{BlockResponse, Direction};
+use futures::channel::{mpsc, oneshot};
+use futures::stream::StreamExt;
+use log::debug;
+use prost::Message;
+use sp_runtime::generic::BlockId;
+use sp_runtime::traits::{Block as BlockT, Header, One, Zero};
+use std::cmp::min;
+use std::sync::{Arc};
+use std::time::Duration;
+
+const LOG_TARGET: &str = "block-request-handler";
+const MAX_BLOCKS_IN_RESPONSE: usize = 128;
+const MAX_BODY_BYTES: usize = 8 * 1024 * 1024;
+
+/// Generates a [`ProtocolConfig`] for the block request protocol, refusing incoming requests.
+pub fn generate_protocol_config(protocol_id: ProtocolId) -> ProtocolConfig {
+ ProtocolConfig {
+ name: generate_protocol_name(protocol_id).into(),
+ max_request_size: 1024 * 1024,
+ max_response_size: 16 * 1024 * 1024,
+ request_timeout: Duration::from_secs(40),
+ inbound_queue: None,
+ }
+}
+
+/// Generate the block protocol name from chain specific protocol identifier.
+fn generate_protocol_name(protocol_id: ProtocolId) -> String {
+ let mut s = String::new();
+ s.push_str("/");
+ s.push_str(protocol_id.as_ref());
+ s.push_str("/sync/2");
+ s
+}
+
+/// Handler for incoming block requests from a remote peer.
+pub struct BlockRequestHandler {
+ client: Arc>,
+ request_receiver: mpsc::Receiver,
+}
+
+impl BlockRequestHandler {
+ /// Create a new [`BlockRequestHandler`].
+ pub fn new(protocol_id: ProtocolId, client: Arc>) -> (Self, ProtocolConfig) {
+ // Rate of arrival multiplied with the waiting time in the queue equals the queue length.
+ //
+ // An average Polkadot sentry node serves less than 5 requests per second. The 95th percentile
+ // serving a request is less than 2 second. Thus one would estimate the queue length to be
+ // below 10.
+ //
+ // Choosing 20 as the queue length to give some additional buffer.
+ let (tx, request_receiver) = mpsc::channel(20);
+
+ let mut protocol_config = generate_protocol_config(protocol_id);
+ protocol_config.inbound_queue = Some(tx);
+
+ (Self { client, request_receiver }, protocol_config)
+ }
+
+ fn handle_request(
+ &self,
+ payload: Vec,
+ pending_response: oneshot::Sender>
+ ) -> Result<(), HandleRequestError> {
+ let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
+
+ let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? {
+ FromBlock::Hash(ref h) => {
+ let h = Decode::decode(&mut h.as_ref())?;
+ BlockId::::Hash(h)
+ }
+ FromBlock::Number(ref n) => {
+ let n = Decode::decode(&mut n.as_ref())?;
+ BlockId::::Number(n)
+ }
+ };
+
+ let max_blocks = if request.max_blocks == 0 {
+ MAX_BLOCKS_IN_RESPONSE
+ } else {
+ min(request.max_blocks as usize, MAX_BLOCKS_IN_RESPONSE)
+ };
+
+ let direction = Direction::from_i32(request.direction)
+ .ok_or(HandleRequestError::ParseDirection)?;
+ let attributes = BlockAttributes::from_be_u32(request.fields)?;
+ let get_header = attributes.contains(BlockAttributes::HEADER);
+ let get_body = attributes.contains(BlockAttributes::BODY);
+ let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
+
+ let mut blocks = Vec::new();
+ let mut block_id = from_block_id;
+
+ let mut total_size: usize = 0;
+ while let Some(header) = self.client.header(block_id).unwrap_or(None) {
+ let number = *header.number();
+ let hash = header.hash();
+ let parent_hash = *header.parent_hash();
+ let justification = if get_justification {
+ self.client.justification(&BlockId::Hash(hash))?
+ } else {
+ None
+ };
+ let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
+
+ let body = if get_body {
+ match self.client.block_body(&BlockId::Hash(hash))? {
+ Some(mut extrinsics) => extrinsics.iter_mut()
+ .map(|extrinsic| extrinsic.encode())
+ .collect(),
+ None => {
+ log::trace!(target: "sync", "Missing data for block request.");
+ break;
+ }
+ }
+ } else {
+ Vec::new()
+ };
+
+ let block_data = crate::schema::v1::BlockData {
+ hash: hash.encode(),
+ header: if get_header {
+ header.encode()
+ } else {
+ Vec::new()
+ },
+ body,
+ receipt: Vec::new(),
+ message_queue: Vec::new(),
+ justification: justification.unwrap_or_default(),
+ is_empty_justification,
+ };
+
+ total_size += block_data.body.len();
+ blocks.push(block_data);
+
+ if blocks.len() >= max_blocks as usize || total_size > MAX_BODY_BYTES {
+ break
+ }
+
+ match direction {
+ Direction::Ascending => {
+ block_id = BlockId::Number(number + One::one())
+ }
+ Direction::Descending => {
+ if number.is_zero() {
+ break
+ }
+ block_id = BlockId::Hash(parent_hash)
+ }
+ }
+ }
+
+ let res = BlockResponse { blocks };
+
+ let mut data = Vec::with_capacity(res.encoded_len());
+ res.encode(&mut data)?;
+
+ pending_response.send(data)
+ .map_err(|_| HandleRequestError::SendResponse)
+ }
+
+ /// Run [`BlockRequestHandler`].
+ pub async fn run(mut self) {
+ while let Some(request) = self.request_receiver.next().await {
+ let IncomingRequest { peer, payload, pending_response } = request;
+
+ match self.handle_request(payload, pending_response) {
+ Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
+ Err(e) => debug!(
+ target: LOG_TARGET,
+ "Failed to handle block request from {}: {}",
+ peer, e,
+ ),
+ }
+ }
+ }
+}
+
+#[derive(derive_more::Display, derive_more::From)]
+enum HandleRequestError {
+ #[display(fmt = "Failed to decode request: {}.", _0)]
+ DecodeProto(prost::DecodeError),
+ #[display(fmt = "Failed to encode response: {}.", _0)]
+ EncodeProto(prost::EncodeError),
+ #[display(fmt = "Failed to decode block hash: {}.", _0)]
+ DecodeScale(codec::Error),
+ #[display(fmt = "Missing `BlockRequest::from_block` field.")]
+ MissingFromField,
+ #[display(fmt = "Failed to parse BlockRequest::direction.")]
+ ParseDirection,
+ Client(sp_blockchain::Error),
+ #[display(fmt = "Failed to send response.")]
+ SendResponse,
+}
diff --git a/client/network/src/block_requests.rs b/client/network/src/block_requests.rs
deleted file mode 100644
index ff107e37ef3fd..0000000000000
--- a/client/network/src/block_requests.rs
+++ /dev/null
@@ -1,859 +0,0 @@
-// This file is part of Substrate.
-
-// Copyright (C) 2020-2021 Parity Technologies (UK) Ltd.
-// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
-
-// This program is free software: you can redistribute it and/or modify
-// it under the terms of the GNU General Public License as published by
-// the Free Software Foundation, either version 3 of the License, or
-// (at your option) any later version.
-
-// This program is distributed in the hope that it will be useful,
-// but WITHOUT ANY WARRANTY; without even the implied warranty of
-// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-// GNU General Public License for more details.
-
-// You should have received a copy of the GNU General Public License
-// along with this program. If not, see .
-
-//! `NetworkBehaviour` implementation which handles incoming block requests.
-//!
-//! Every request is coming in on a separate connection substream which gets
-//! closed after we have sent the response back. Incoming requests are encoded
-//! as protocol buffers (cf. `api.v1.proto`).
-
-#![allow(unused)]
-
-use bytes::Bytes;
-use codec::{Encode, Decode};
-use crate::{
- chain::Client,
- config::ProtocolId,
- protocol::{message::{self, BlockAttributes}},
- schema,
-};
-use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered};
-use futures_timer::Delay;
-use libp2p::{
- core::{
- ConnectedPoint,
- Multiaddr,
- PeerId,
- connection::ConnectionId,
- upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated},
- upgrade::{DeniedUpgrade, read_one, write_one}
- },
- swarm::{
- NegotiatedSubstream,
- NetworkBehaviour,
- NetworkBehaviourAction,
- NotifyHandler,
- OneShotHandler,
- OneShotHandlerConfig,
- PollParameters,
- SubstreamProtocol
- }
-};
-use prost::Message;
-use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}};
-use std::{
- cmp::min,
- collections::{HashMap, VecDeque},
- io,
- iter,
- marker::PhantomData,
- pin::Pin,
- sync::Arc,
- time::Duration,
- task::{Context, Poll}
-};
-use void::{Void, unreachable};
-use wasm_timer::Instant;
-
-// Type alias for convenience.
-pub type Error = Box;
-
-/// Event generated by the block requests behaviour.
-#[derive(Debug)]
-pub enum Event {
- /// A request came and we have successfully answered it.
- AnsweredRequest {
- /// Peer which has emitted the request.
- peer: PeerId,
- /// Time elapsed between when we received the request and when we sent back the response.
- total_handling_time: Duration,
- },
-
- /// A response to a block request has arrived.
- Response {
- peer: PeerId,
- response: message::BlockResponse,
- /// Time elapsed between the start of the request and the response.
- request_duration: Duration,
- },
-
- /// A request has been cancelled because the peer has disconnected.
- /// Disconnects can also happen as a result of violating the network protocol.
- ///
- /// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`.
- /// > For that, you must check the value returned by `send_request`.
- RequestCancelled {
- peer: PeerId,
- /// Time elapsed between the start of the request and the cancellation.
- request_duration: Duration,
- },
-
- /// A request has timed out.
- RequestTimeout {
- peer: PeerId,
- /// Time elapsed between the start of the request and the timeout.
- request_duration: Duration,
- }
-}
-
-/// Configuration options for `BlockRequests`.
-#[derive(Debug, Clone)]
-pub struct Config {
- max_block_data_response: u32,
- max_block_body_bytes: usize,
- max_request_len: usize,
- max_response_len: usize,
- inactivity_timeout: Duration,
- request_timeout: Duration,
- protocol: String,
-}
-
-impl Config {
- /// Create a fresh configuration with the following options:
- ///
- /// - max. block data in response = 128
- /// - max. request size = 1 MiB
- /// - max. response size = 16 MiB
- /// - inactivity timeout = 15s
- /// - request timeout = 40s
- pub fn new(id: &ProtocolId) -> Self {
- let mut c = Config {
- max_block_data_response: 128,
- max_block_body_bytes: 8 * 1024 * 1024,
- max_request_len: 1024 * 1024,
- max_response_len: 16 * 1024 * 1024,
- inactivity_timeout: Duration::from_secs(15),
- request_timeout: Duration::from_secs(40),
- protocol: String::new(),
- };
- c.set_protocol(id);
- c
- }
-
- /// Limit the max. number of block data in a response.
- pub fn set_max_block_data_response(&mut self, v: u32) -> &mut Self {
- self.max_block_data_response = v;
- self
- }
-
- /// Limit the max. length of incoming block request bytes.
- pub fn set_max_request_len(&mut self, v: usize) -> &mut Self {
- self.max_request_len = v;
- self
- }
-
- /// Limit the max. size of responses to our block requests.
- pub fn set_max_response_len(&mut self, v: usize) -> &mut Self {
- self.max_response_len = v;
- self
- }
-
- /// Limit the max. duration the substream may remain inactive before closing it.
- pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self {
- self.inactivity_timeout = v;
- self
- }
-
- /// Set the maximum total bytes of block bodies that are send in the response.
- /// Note that at least one block is always sent regardless of the limit.
- /// This should be lower than the value specified in `set_max_response_len`
- /// accounting for headers, justifications and encoding overhead.
- pub fn set_max_block_body_bytes(&mut self, v: usize) -> &mut Self {
- self.max_block_body_bytes = v;
- self
- }
-
- /// Set protocol to use for upgrade negotiation.
- pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self {
- let mut s = String::new();
- s.push_str("/");
- s.push_str(id.as_ref());
- s.push_str("/sync/2");
- self.protocol = s;
- self
- }
-}
-
-/// The block request handling behaviour.
-pub struct BlockRequests {
- /// This behaviour's configuration.
- config: Config,
- /// Blockchain client.
- chain: Arc>,
- /// List of all active connections and the requests we've sent.
- peers: HashMap>>,
- /// Futures sending back the block request response. Returns the `PeerId` we sent back to, and
- /// the total time the handling of this request took.
- outgoing: FuturesUnordered>,
- /// Events to return as soon as possible from `poll`.
- pending_events: VecDeque, Event>>,
-}
-
-/// Local tracking of a libp2p connection.
-#[derive(Debug)]
-struct Connection {
- id: ConnectionId,
- ongoing_request: Option>,
-}
-
-#[derive(Debug)]
-struct OngoingRequest {
- /// `Instant` when the request has been emitted. Used for diagnostic purposes.
- emitted: Instant,
- request: message::BlockRequest,
- timeout: Delay,
-}
-
-/// Outcome of calling `send_request`.
-#[derive(Debug)]
-#[must_use]
-pub enum SendRequestOutcome {
- /// Request has been emitted.
- Ok,
- /// The request has been emitted and has replaced an existing request.
- Replaced {
- /// The previously-emitted request.
- previous: message::BlockRequest,
- /// Time that had elapsed since `previous` has been emitted.
- request_duration: Duration,
- },
- /// Didn't start a request because we have no connection to this node.
- /// If `send_request` returns that, it is as if the function had never been called.
- NotConnected,
- /// Error while serializing the request.
- EncodeError(prost::EncodeError),
-}
-
-impl BlockRequests
-where
- B: Block,
-{
- pub fn new(cfg: Config, chain: Arc>) -> Self {
- BlockRequests {
- config: cfg,
- chain,
- peers: HashMap::new(),
- outgoing: FuturesUnordered::new(),
- pending_events: VecDeque::new(),
- }
- }
-
- /// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`).
- pub fn protocol_name(&self) -> &str {
- &self.config.protocol
- }
-
- /// Issue a new block request.
- ///
- /// Cancels any existing request targeting the same `PeerId`.
- ///
- /// If the response doesn't arrive in time, or if the remote answers improperly, the target
- /// will be disconnected.
- pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) -> SendRequestOutcome {
- // Determine which connection to send the request to.
- let connection = if let Some(peer) = self.peers.get_mut(target) {
- // We don't want to have multiple requests for any given node, so in priority try to
- // find a connection with an existing request, to override it.
- if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) {
- entry
- } else if let Some(entry) = peer.get_mut(0) {
- entry
- } else {
- log::error!(
- target: "sync",
- "State inconsistency: empty list of peer connections"
- );
- return SendRequestOutcome::NotConnected;
- }
- } else {
- return SendRequestOutcome::NotConnected;
- };
-
- let protobuf_rq = build_protobuf_block_request(
- req.fields,
- req.from.clone(),
- req.to.clone(),
- req.direction,
- req.max,
- );
-
- let mut buf = Vec::with_capacity(protobuf_rq.encoded_len());
- if let Err(err) = protobuf_rq.encode(&mut buf) {
- log::warn!(
- target: "sync",
- "Failed to encode block request {:?}: {:?}",
- protobuf_rq,
- err
- );
- return SendRequestOutcome::EncodeError(err);
- }
-
- let previous_request = connection.ongoing_request.take();
- connection.ongoing_request = Some(OngoingRequest {
- emitted: Instant::now(),
- request: req.clone(),
- timeout: Delay::new(self.config.request_timeout),
- });
-
- log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq);
- self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
- peer_id: target.clone(),
- handler: NotifyHandler::One(connection.id),
- event: OutboundProtocol {
- request: buf,
- original_request: req,
- max_response_size: self.config.max_response_len,
- protocol: self.config.protocol.as_bytes().to_vec().into(),
- },
- });
-
- if let Some(previous_request) = previous_request {
- log::debug!(
- target: "sync",
- "Replacing existing block request on connection {:?}",
- connection.id
- );
- SendRequestOutcome::Replaced {
- previous: previous_request.request,
- request_duration: previous_request.emitted.elapsed(),
- }
- } else {
- SendRequestOutcome::Ok
- }
- }
-
- /// Callback, invoked when a new block request has been received from remote.
- fn on_block_request
- ( &mut self
- , peer: &PeerId
- , request: &schema::v1::BlockRequest
- ) -> Result
- {
- log::trace!(
- target: "sync",
- "Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}",
- peer,
- request.from_block,
- request.to_block,
- request.max_blocks);
-
- let from_block_id =
- match request.from_block {
- Some(schema::v1::block_request::FromBlock::Hash(ref h)) => {
- let h = Decode::decode(&mut h.as_ref())?;
- BlockId::::Hash(h)
- }
- Some(schema::v1::block_request::FromBlock::Number(ref n)) => {
- let n = Decode::decode(&mut n.as_ref())?;
- BlockId::::Number(n)
- }
- None => {
- let msg = "missing `BlockRequest::from_block` field";
- return Err(io::Error::new(io::ErrorKind::Other, msg).into())
- }
- };
-
- let max_blocks =
- if request.max_blocks == 0 {
- self.config.max_block_data_response
- } else {
- min(request.max_blocks, self.config.max_block_data_response)
- };
-
- let direction =
- if request.direction == schema::v1::Direction::Ascending as i32 {
- schema::v1::Direction::Ascending
- } else if request.direction == schema::v1::Direction::Descending as i32 {
- schema::v1::Direction::Descending
- } else {
- let msg = format!("invalid `BlockRequest::direction` value: {}", request.direction);
- return Err(io::Error::new(io::ErrorKind::Other, msg).into())
- };
-
- let attributes = BlockAttributes::from_be_u32(request.fields)?;
- let get_header = attributes.contains(BlockAttributes::HEADER);
- let get_body = attributes.contains(BlockAttributes::BODY);
- let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
-
- let mut blocks = Vec::new();
- let mut block_id = from_block_id;
- let mut total_size = 0;
- while let Some(header) = self.chain.header(block_id).unwrap_or(None) {
- if blocks.len() >= max_blocks as usize
- || (blocks.len() >= 1 && total_size > self.config.max_block_body_bytes)
- {
- break
- }
-
- let number = *header.number();
- let hash = header.hash();
- let parent_hash = *header.parent_hash();
- let justification = if get_justification {
- self.chain.justification(&BlockId::Hash(hash))?
- } else {
- None
- };
- let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
-
- let body = if get_body {
- match self.chain.block_body(&BlockId::Hash(hash))? {
- Some(mut extrinsics) => extrinsics.iter_mut()
- .map(|extrinsic| extrinsic.encode())
- .collect(),
- None => {
- log::trace!(target: "sync", "Missing data for block request.");
- break;
- }
- }
- } else {
- Vec::new()
- };
-
- let block_data = schema::v1::BlockData {
- hash: hash.encode(),
- header: if get_header {
- header.encode()
- } else {
- Vec::new()
- },
- body,
- receipt: Vec::new(),
- message_queue: Vec::new(),
- justification: justification.unwrap_or_default(),
- is_empty_justification,
- };
-
- total_size += block_data.body.len();
- blocks.push(block_data);
-
- match direction {
- schema::v1::Direction::Ascending => {
- block_id = BlockId::Number(number + One::one())
- }
- schema::v1::Direction::Descending => {
- if number.is_zero() {
- break
- }
- block_id = BlockId::Hash(parent_hash)
- }
- }
- }
-
- Ok(schema::v1::BlockResponse { blocks })
- }
-}
-
-impl NetworkBehaviour for BlockRequests
-where
- B: Block
-{
- type ProtocolsHandler = OneShotHandler, OutboundProtocol, NodeEvent>;
- type OutEvent = Event;
-
- fn new_handler(&mut self) -> Self::ProtocolsHandler {
- let p = InboundProtocol {
- max_request_len: self.config.max_request_len,
- protocol: self.config.protocol.as_bytes().to_owned().into(),
- marker: PhantomData,
- };
- let mut cfg = OneShotHandlerConfig::default();
- cfg.keep_alive_timeout = self.config.inactivity_timeout;
- cfg.outbound_substream_timeout = self.config.request_timeout;
- OneShotHandler::new(SubstreamProtocol::new(p, ()), cfg)
- }
-
- fn addresses_of_peer(&mut self, _: &PeerId) -> Vec {
- Vec::new()
- }
-
- fn inject_connected(&mut self, _peer: &PeerId) {
- }
-
- fn inject_disconnected(&mut self, _peer: &PeerId) {
- }
-
- fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
- self.peers.entry(peer_id.clone())
- .or_default()
- .push(Connection {
- id: *id,
- ongoing_request: None,
- });
- }
-
- fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) {
- let mut needs_remove = false;
- if let Some(entry) = self.peers.get_mut(peer_id) {
- if let Some(pos) = entry.iter().position(|i| i.id == *id) {
- let ongoing_request = entry.remove(pos).ongoing_request;
- if let Some(ongoing_request) = ongoing_request {
- log::debug!(
- target: "sync",
- "Connection {:?} with {} closed with ongoing sync request: {:?}",
- id,
- peer_id,
- ongoing_request
- );
- let ev = Event::RequestCancelled {
- peer: peer_id.clone(),
- request_duration: ongoing_request.emitted.elapsed(),
- };
- self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
- }
- if entry.is_empty() {
- needs_remove = true;
- }
- } else {
- log::error!(
- target: "sync",
- "State inconsistency: connection id not found in list"
- );
- }
- } else {
- log::error!(
- target: "sync",
- "State inconsistency: peer_id not found in list of connections"
- );
- }
- if needs_remove {
- self.peers.remove(peer_id);
- }
- }
-
- fn inject_event(
- &mut self,
- peer: PeerId,
- connection_id: ConnectionId,
- node_event: NodeEvent
- ) {
- match node_event {
- NodeEvent::Request(request, mut stream, handling_start) => {
- match self.on_block_request(&peer, &request) {
- Ok(res) => {
- log::trace!(
- target: "sync",
- "Enqueueing block response for peer {} with {} blocks",
- peer, res.blocks.len()
- );
- let mut data = Vec::with_capacity(res.encoded_len());
- if let Err(e) = res.encode(&mut data) {
- log::debug!(
- target: "sync",
- "Error encoding block response for peer {}: {}",
- peer, e
- )
- } else {
- self.outgoing.push(async move {
- if let Err(e) = write_one(&mut stream, data).await {
- log::debug!(
- target: "sync",
- "Error writing block response: {}",
- e
- );
- }
- (peer, handling_start.elapsed())
- }.boxed());
- }
- }
- Err(e) => log::debug!(
- target: "sync",
- "Error handling block request from peer {}: {}", peer, e
- )
- }
- }
- NodeEvent::Response(original_request, response) => {
- log::trace!(
- target: "sync",
- "Received block response from peer {} with {} blocks",
- peer, response.blocks.len()
- );
- let request_duration = if let Some(connections) = self.peers.get_mut(&peer) {
- if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) {
- if let Some(ongoing_request) = &mut connection.ongoing_request {
- if ongoing_request.request == original_request {
- let request_duration = ongoing_request.emitted.elapsed();
- connection.ongoing_request = None;
- request_duration
- } else {
- // We're no longer interested in that request.
- log::debug!(
- target: "sync",
- "Received response from {} to obsolete block request {:?}",
- peer,
- original_request
- );
- return;
- }
- } else {
- // We remove from `self.peers` requests we're no longer interested in,
- // so this can legitimately happen.
- log::trace!(
- target: "sync",
- "Response discarded because it concerns an obsolete request"
- );
- return;
- }
- } else {
- log::error!(
- target: "sync",
- "State inconsistency: response on non-existing connection {:?}",
- connection_id
- );
- return;
- }
- } else {
- log::error!(
- target: "sync",
- "State inconsistency: response on non-connected peer {}",
- peer
- );
- return;
- };
-
- let blocks = response.blocks.into_iter().map(|block_data| {
- Ok(message::BlockData:: {
- hash: Decode::decode(&mut block_data.hash.as_ref())?,
- header: if !block_data.header.is_empty() {
- Some(Decode::decode(&mut block_data.header.as_ref())?)
- } else {
- None
- },
- body: if original_request.fields.contains(message::BlockAttributes::BODY) {
- Some(block_data.body.iter().map(|body| {
- Decode::decode(&mut body.as_ref())
- }).collect::, _>>()?)
- } else {
- None
- },
- receipt: if !block_data.message_queue.is_empty() {
- Some(block_data.receipt)
- } else {
- None
- },
- message_queue: if !block_data.message_queue.is_empty() {
- Some(block_data.message_queue)
- } else {
- None
- },
- justification: if !block_data.justification.is_empty() {
- Some(block_data.justification)
- } else if block_data.is_empty_justification {
- Some(Vec::new())
- } else {
- None
- },
- })
- }).collect::, codec::Error>>();
-
- match blocks {
- Ok(blocks) => {
- let id = original_request.id;
- let ev = Event::Response {
- peer,
- response: message::BlockResponse:: { id, blocks },
- request_duration,
- };
- self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev));
- }
- Err(err) => {
- log::debug!(
- target: "sync",
- "Failed to decode block response from peer {}: {}", peer, err
- );
- }
- }
- }
- }
- }
-
- fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters)
- -> Poll, Event>>
- {
- if let Some(ev) = self.pending_events.pop_front() {
- return Poll::Ready(ev);
- }
-
- // Check the request timeouts.
- for (peer, connections) in &mut self.peers {
- for connection in connections {
- let ongoing_request = match &mut connection.ongoing_request {
- Some(rq) => rq,
- None => continue,
- };
-
- if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) {
- let original_request = ongoing_request.request.clone();
- let request_duration = ongoing_request.emitted.elapsed();
- connection.ongoing_request = None;
- log::debug!(
- target: "sync",
- "Request timeout for {}: {:?}",
- peer, original_request
- );
- let ev = Event::RequestTimeout {
- peer: peer.clone(),
- request_duration,
- };
- return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
- }
- }
- }
-
- if let Poll::Ready(Some((peer, total_handling_time))) = self.outgoing.poll_next_unpin(cx) {
- let ev = Event::AnsweredRequest {
- peer,
- total_handling_time,
- };
- return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev));
- }
-
- Poll::Pending
- }
-}
-
-/// Output type of inbound and outbound substream upgrades.
-#[derive(Debug)]
-pub enum NodeEvent {
- /// Incoming request from remote, substream to use for the response, and when we started
- /// handling this request.
- Request(schema::v1::BlockRequest, T, Instant),
- /// Incoming response from remote.
- Response(message::BlockRequest, schema::v1::BlockResponse),
-}
-
-/// Substream upgrade protocol.
-///
-/// We attempt to parse an incoming protobuf encoded request (cf. `Request`)
-/// which will be handled by the `BlockRequests` behaviour, i.e. the request
-/// will become visible via `inject_node_event` which then dispatches to the
-/// relevant callback to process the message and prepare a response.
-#[derive(Debug, Clone)]
-pub struct InboundProtocol {
- /// The max. request length in bytes.
- max_request_len: usize,
- /// The protocol to use during upgrade negotiation.
- protocol: Bytes,
- /// Type of the block.
- marker: PhantomData,
-}
-
-impl UpgradeInfo for InboundProtocol {
- type Info = Bytes;
- type InfoIter = iter::Once;
-
- fn protocol_info(&self) -> Self::InfoIter {
- iter::once(self.protocol.clone())
- }
-}
-
-impl InboundUpgrade for InboundProtocol
-where
- B: Block,
- T: AsyncRead + AsyncWrite + Unpin + Send + 'static
-{
- type Output = NodeEvent;
- type Error = ReadOneError;
- type Future = BoxFuture<'static, Result>;
-
- fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future {
- // This `Instant` will be passed around until the processing of this request is done.
- let handling_start = Instant::now();
-
- let future = async move {
- let len = self.max_request_len;
- let vec = read_one(&mut s, len).await?;
- match schema::v1::BlockRequest::decode(&vec[..]) {
- Ok(r) => Ok(NodeEvent::Request(r, s, handling_start)),
- Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)))
- }
- };
- future.boxed()
- }
-}
-
-/// Substream upgrade protocol.
-///
-/// Sends a request to remote and awaits the response.
-#[derive(Debug, Clone)]
-pub struct OutboundProtocol {
- /// The serialized protobuf request.
- request: Vec,
- /// The original request. Passed back through the API when the response comes back.
- original_request: message::BlockRequest,
- /// The max. response length in bytes.
- max_response_size: usize,
- /// The protocol to use for upgrade negotiation.
- protocol: Bytes,
-}
-
-impl UpgradeInfo for OutboundProtocol {
- type Info = Bytes;
- type InfoIter = iter::Once;
-
- fn protocol_info(&self) -> Self::InfoIter {
- iter::once(self.protocol.clone())
- }
-}
-
-impl OutboundUpgrade for OutboundProtocol
-where
- B: Block,
- T: AsyncRead + AsyncWrite + Unpin + Send + 'static
-{
- type Output = NodeEvent;
- type Error = ReadOneError;
- type Future = BoxFuture<'static, Result>;
-
- fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future {
- async move {
- write_one(&mut s, &self.request).await?;
- let vec = read_one(&mut s, self.max_response_size).await?;
-
- schema::v1::BlockResponse::decode(&vec[..])
- .map(|r| NodeEvent::Response(self.original_request, r))
- .map_err(|e| {
- ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))
- })
- }.boxed()
- }
-}
-
-/// Build protobuf block request message.
-pub(crate) fn build_protobuf_block_request(
- attributes: BlockAttributes,
- from_block: message::FromBlock,
- to_block: Option,
- direction: message::Direction,
- max_blocks: Option,
-) -> schema::v1::BlockRequest {
- schema::v1::BlockRequest {
- fields: attributes.to_be_u32(),
- from_block: match from_block {
- message::FromBlock::Hash(h) =>
- Some(schema::v1::block_request::FromBlock::Hash(h.encode())),
- message::FromBlock::Number(n) =>
- Some(schema::v1::block_request::FromBlock::Number(n.encode())),
- },
- to_block: to_block.map(|h| h.encode()).unwrap_or_default(),
- direction: match direction {
- message::Direction::Ascending => schema::v1::Direction::Ascending as i32,
- message::Direction::Descending => schema::v1::Direction::Descending as i32,
- },
- max_blocks: max_blocks.unwrap_or(0),
- }
-}
diff --git a/client/network/src/config.rs b/client/network/src/config.rs
index 7c85da8bbaa1b..b7e47e973a33d 100644
--- a/client/network/src/config.rs
+++ b/client/network/src/config.rs
@@ -95,6 +95,18 @@ pub struct Params {
/// Registry for recording prometheus metrics to.
pub metrics_registry: Option,
+
+ /// Request response configuration for the block request protocol.
+ ///
+ /// [`RequestResponseConfig`] [`name`] is used to tag outgoing block requests with the correct
+ /// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming block
+ /// requests, if enabled.
+ ///
+ /// Can be constructed either via [`block_request_handler::generate_protocol_config`] allowing
+ /// outgoing but not incoming requests, or constructed via
+ /// [`block_request_handler::BlockRequestHandler::new`] allowing both outgoing and incoming
+ /// requests.
+ pub block_request_protocol_config: RequestResponseConfig,
}
/// Role of the local node.
diff --git a/client/network/src/gossip/tests.rs b/client/network/src/gossip/tests.rs
index d284616ce942c..e621adf0c09e1 100644
--- a/client/network/src/gossip/tests.rs
+++ b/client/network/src/gossip/tests.rs
@@ -16,7 +16,9 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see .
-use crate::{config, gossip::QueuedSender, Event, NetworkService, NetworkWorker};
+use crate::block_request_handler::BlockRequestHandler;
+use crate::gossip::QueuedSender;
+use crate::{config, Event, NetworkService, NetworkWorker};
use futures::prelude::*;
use sp_runtime::traits::{Block as BlockT, Header as _};
@@ -33,7 +35,7 @@ type TestNetworkService = NetworkService<
///
/// > **Note**: We return the events stream in order to not possibly lose events between the
/// > construction of the service and the moment the events stream is grabbed.
-fn build_test_full_node(config: config::NetworkConfiguration)
+fn build_test_full_node(network_config: config::NetworkConfiguration)
-> (Arc, impl Stream- )
{
let client = Arc::new(
@@ -90,19 +92,31 @@ fn build_test_full_node(config: config::NetworkConfiguration)
None,
));
+ let protocol_id = config::ProtocolId::from("/test-protocol-name");
+
+ let block_request_protocol_config = {
+ let (handler, protocol_config) = BlockRequestHandler::new(
+ protocol_id.clone(),
+ client.clone(),
+ );
+ async_std::task::spawn(handler.run().boxed());
+ protocol_config
+ };
+
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
- network_config: config,
+ network_config,
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
- protocol_id: config::ProtocolId::from("/test-protocol-name"),
+ protocol_id,
import_queue,
block_announce_validator: Box::new(
sp_consensus::block_validation::DefaultBlockAnnounceValidator,
),
metrics_registry: None,
+ block_request_protocol_config,
})
.unwrap();
diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs
index 533a69dd4d5a5..ab7625ff9fe8a 100644
--- a/client/network/src/lib.rs
+++ b/client/network/src/lib.rs
@@ -246,7 +246,6 @@
//!
mod behaviour;
-mod block_requests;
mod chain;
mod peer_info;
mod discovery;
@@ -259,6 +258,7 @@ mod service;
mod transport;
mod utils;
+pub mod block_request_handler;
pub mod config;
pub mod error;
pub mod gossip;
diff --git a/client/network/src/light_client_handler.rs b/client/network/src/light_client_handler.rs
index 83c1160a33642..3ac6e67a23278 100644
--- a/client/network/src/light_client_handler.rs
+++ b/client/network/src/light_client_handler.rs
@@ -29,7 +29,6 @@
use bytes::Bytes;
use codec::{self, Encode, Decode};
use crate::{
- block_requests::build_protobuf_block_request,
chain::Client,
config::ProtocolId,
protocol::message::{BlockAttributes, Direction, FromBlock},
@@ -1066,13 +1065,16 @@ fn retries(request: &Request) -> usize {
fn serialize_request(request: &Request) -> Result, prost::EncodeError> {
let request = match request {
Request::Body { request, .. } => {
- let rq = build_protobuf_block_request::<_, NumberFor>(
- BlockAttributes::BODY,
- FromBlock::Hash(request.header.hash()),
- None,
- Direction::Ascending,
- Some(1),
- );
+ let rq = schema::v1::BlockRequest {
+ fields: BlockAttributes::BODY.to_be_u32(),
+ from_block: Some(schema::v1::block_request::FromBlock::Hash(
+ request.header.hash().encode(),
+ )),
+ to_block: Default::default(),
+ direction: schema::v1::Direction::Ascending as i32,
+ max_blocks: 1,
+ };
+
let mut buf = Vec::with_capacity(rq.encoded_len());
rq.encode(&mut buf)?;
return Ok(buf);
diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs
index 3bbfdb2cb65f7..e3d6d5e815c3b 100644
--- a/client/network/src/protocol.rs
+++ b/client/network/src/protocol.rs
@@ -21,41 +21,43 @@ use crate::{
chain::Client,
config::{ProtocolId, TransactionPool, TransactionImportFuture, TransactionImport},
error,
+ request_responses::RequestFailure,
utils::{interval, LruHashSet},
};
use bytes::{Bytes, BytesMut};
-use futures::{prelude::*, stream::FuturesUnordered};
+use codec::{Decode, DecodeAll, Encode};
+use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
use generic_proto::{GenericProto, GenericProtoOut};
-use libp2p::{Multiaddr, PeerId};
use libp2p::core::{ConnectedPoint, connection::{ConnectionId, ListenerId}};
-use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
+use libp2p::request_response::OutboundFailure;
use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters};
+use libp2p::swarm::{ProtocolsHandler, IntoProtocolsHandler};
+use libp2p::{Multiaddr, PeerId};
+use log::{log, Level, trace, debug, warn, error};
+use message::{BlockAnnounce, Message};
+use message::generic::{Message as GenericMessage, Roles};
+use prometheus_endpoint::{
+ Registry, Gauge, Counter, GaugeVec,
+ PrometheusError, Opts, register, U64
+};
+use prost::Message as _;
use sp_consensus::{
BlockOrigin,
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
-use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
};
use sp_arithmetic::traits::SaturatedConversion;
-use message::{BlockAnnounce, Message};
-use message::generic::{Message as GenericMessage, Roles};
-use prometheus_endpoint::{
- Registry, Gauge, Counter, GaugeVec,
- PrometheusError, Opts, register, U64
-};
use sync::{ChainSync, SyncState};
use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
-use log::{log, Level, trace, debug, warn, error};
-use wasm_timer::Instant;
mod generic_proto;
@@ -65,7 +67,6 @@ pub mod sync;
pub use generic_proto::{NotificationsSink, Ready, NotifsHandlerError};
-const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100);
/// Interval at which we propagate transactions;
@@ -95,6 +96,8 @@ mod rep {
use sc_peerset::ReputationChange as Rep;
/// Reputation change when a peer doesn't respond in time to our messages.
pub const TIMEOUT: Rep = Rep::new(-(1 << 10), "Request timeout");
+ /// Reputation change when a peer refuses a request.
+ pub const REFUSED: Rep = Rep::new(-(1 << 10), "Request refused");
/// Reputation change when we are a light client and a peer is behind us.
pub const PEER_BEHIND_US_LIGHT: Rep = Rep::new(-(1 << 8), "Useless for a light peer");
/// Reputation change when a peer sends us any transaction.
@@ -110,8 +113,6 @@ mod rep {
pub const BAD_TRANSACTION: Rep = Rep::new(-(1 << 12), "Bad transaction");
/// We received a message that failed to decode.
pub const BAD_MESSAGE: Rep = Rep::new(-(1 << 12), "Bad message");
- /// We received an unexpected response.
- pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet");
/// We received an unexpected transaction packet.
pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet");
/// Peer has different genesis.
@@ -125,7 +126,6 @@ mod rep {
}
struct Metrics {
- obsolete_requests: Gauge,
peers: Gauge,
queued_blocks: Gauge,
fork_targets: Gauge,
@@ -136,10 +136,6 @@ struct Metrics {
impl Metrics {
fn register(r: &Registry) -> Result {
Ok(Metrics {
- obsolete_requests: {
- let g = Gauge::new("sync_obsolete_requests", "Number of obsolete requests")?;
- register(g, r)?
- },
peers: {
let g = Gauge::new("sync_peers", "Number of peers we sync with")?;
register(g, r)?
@@ -241,13 +237,14 @@ struct PacketStats {
}
/// Peer information
-#[derive(Debug, Clone)]
+#[derive(Debug)]
struct Peer {
info: PeerInfo,
- /// Current block request, if any.
- block_request: Option<(Instant, message::BlockRequest)>,
- /// Requests we are no longer interested in.
- obsolete_requests: HashMap,
+ /// Current block request, if any. Started by emitting [`CustomMessageOutcome::BlockRequest`].
+ block_request: Option<(
+ message::BlockRequest,
+ oneshot::Receiver, RequestFailure>>,
+ )>,
/// Holds a set of transactions known to this peer.
known_transactions: LruHashSet,
/// Holds a set of blocks known to this peer.
@@ -640,8 +637,12 @@ impl Protocol {
CustomMessageOutcome::None
}
- fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest) {
- update_peer_request::(&mut self.context_data.peers, who, request)
+ fn prepare_block_request(
+ &mut self,
+ who: PeerId,
+ request: message::BlockRequest,
+ ) -> CustomMessageOutcome {
+ prepare_block_request::(&mut self.context_data.peers, who, request)
}
/// Called by peer when it is disconnecting
@@ -674,52 +675,76 @@ impl Protocol {
/// Must contain the same `PeerId` and request that have been emitted.
pub fn on_block_response(
&mut self,
- peer: PeerId,
- response: message::BlockResponse,
+ peer_id: PeerId,
+ request: message::BlockRequest,
+ response: crate::schema::v1::BlockResponse,
) -> CustomMessageOutcome {
- let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) {
- if p.obsolete_requests.remove(&response.id).is_some() {
- trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
+ let blocks = response.blocks.into_iter().map(|block_data| {
+ Ok(message::BlockData:: {
+ hash: Decode::decode(&mut block_data.hash.as_ref())?,
+ header: if !block_data.header.is_empty() {
+ Some(Decode::decode(&mut block_data.header.as_ref())?)
+ } else {
+ None
+ },
+ body: if request.fields.contains(message::BlockAttributes::BODY) {
+ Some(block_data.body.iter().map(|body| {
+ Decode::decode(&mut body.as_ref())
+ }).collect::, _>>()?)
+ } else {
+ None
+ },
+ receipt: if !block_data.message_queue.is_empty() {
+ Some(block_data.receipt)
+ } else {
+ None
+ },
+ message_queue: if !block_data.message_queue.is_empty() {
+ Some(block_data.message_queue)
+ } else {
+ None
+ },
+ justification: if !block_data.justification.is_empty() {
+ Some(block_data.justification)
+ } else if block_data.is_empty_justification {
+ Some(Vec::new())
+ } else {
+ None
+ },
+ })
+ }).collect::, codec::Error>>();
+
+ let blocks = match blocks {
+ Ok(blocks) => blocks,
+ Err(err) => {
+ debug!(target: "sync", "Failed to decode block response from {}: {}", peer_id, err);
+ self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
return CustomMessageOutcome::None;
}
- // Clear the request. If the response is invalid peer will be disconnected anyway.
- match p.block_request.take() {
- Some((_, request)) if request.id == response.id => request,
- Some(_) => {
- trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id);
- return CustomMessageOutcome::None;
- }
- None => {
- trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
- self.behaviour.disconnect_peer(&peer);
- self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
- return CustomMessageOutcome::None;
- }
- }
- } else {
- trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer);
- self.behaviour.disconnect_peer(&peer);
- self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE);
- return CustomMessageOutcome::None;
+ };
+
+ let block_response = message::BlockResponse:: {
+ id: request.id,
+ blocks,
};
let blocks_range = || match (
- response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
- response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
+ block_response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
+ block_response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};
trace!(target: "sync", "BlockResponse {} from {} with {} blocks {}",
- response.id,
- peer,
- response.blocks.len(),
+ block_response.id,
+ peer_id,
+ block_response.blocks.len(),
blocks_range(),
);
if request.fields == message::BlockAttributes::JUSTIFICATION {
- match self.sync.on_block_justification(peer, response) {
+ match self.sync.on_block_justification(peer_id, block_response) {
Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None,
Ok(sync::OnBlockJustification::Import { peer, hash, number, justification }) =>
CustomMessageOutcome::JustificationImport(peer, hash, number, justification),
@@ -730,15 +755,11 @@ impl Protocol {
}
}
} else {
- match self.sync.on_block_data(&peer, Some(request), response) {
+ match self.sync.on_block_data(&peer_id, Some(request), block_response) {
Ok(sync::OnBlockData::Import(origin, blocks)) =>
CustomMessageOutcome::BlockImport(origin, blocks),
- Ok(sync::OnBlockData::Request(peer, mut req)) => {
- self.update_peer_request(&peer, &mut req);
- CustomMessageOutcome::BlockRequest {
- target: peer,
- request: req,
- }
+ Ok(sync::OnBlockData::Request(peer, req)) => {
+ self.prepare_block_request(peer, req)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -749,52 +770,13 @@ impl Protocol {
}
}
- /// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] if it has failed.
- pub fn on_block_request_failed(
- &mut self,
- peer: &PeerId,
- ) {
- self.peerset_handle.report_peer(peer.clone(), rep::TIMEOUT);
- self.behaviour.disconnect_peer(peer);
- }
-
/// Perform time based maintenance.
///
/// > **Note**: This method normally doesn't have to be called except for testing purposes.
pub fn tick(&mut self) {
- self.maintain_peers();
self.report_metrics()
}
- fn maintain_peers(&mut self) {
- let tick = Instant::now();
- let mut aborting = Vec::new();
- {
- for (who, peer) in self.context_data.peers.iter() {
- if peer.block_request.as_ref().map_or(false, |(t, _)| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
- log!(
- target: "sync",
- if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
- "Request timeout {}", who
- );
- aborting.push(who.clone());
- } else if peer.obsolete_requests.values().any(|t| (tick - *t).as_secs() > REQUEST_TIMEOUT_SEC) {
- log!(
- target: "sync",
- if self.important_peers.contains(who) { Level::Warn } else { Level::Trace },
- "Obsolete timeout {}", who
- );
- aborting.push(who.clone());
- }
- }
- }
-
- for p in aborting {
- self.behaviour.disconnect_peer(&p);
- self.peerset_handle.report_peer(p, rep::TIMEOUT);
- }
- }
-
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
@@ -870,7 +852,6 @@ impl Protocol {
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
- obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
@@ -881,12 +862,9 @@ impl Protocol {
if info.roles.is_full() {
match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) {
Ok(None) => (),
- Ok(Some(mut req)) => {
- self.update_peer_request(&who, &mut req);
- self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
- target: who.clone(),
- request: req,
- });
+ Ok(Some(req)) => {
+ let event = self.prepare_block_request(who.clone(), req);
+ self.pending_messages.push_back(event);
},
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1216,12 +1194,8 @@ impl Protocol {
Ok(sync::OnBlockData::Import(origin, blocks)) => {
CustomMessageOutcome::BlockImport(origin, blocks)
},
- Ok(sync::OnBlockData::Request(peer, mut req)) => {
- self.update_peer_request(&peer, &mut req);
- CustomMessageOutcome::BlockRequest {
- target: peer,
- request: req,
- }
+ Ok(sync::OnBlockData::Request(peer, req)) => {
+ self.prepare_block_request(peer, req)
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1268,12 +1242,10 @@ impl Protocol {
);
for result in results {
match result {
- Ok((id, mut req)) => {
- update_peer_request(&mut self.context_data.peers, &id, &mut req);
- self.pending_messages.push_back(CustomMessageOutcome::BlockRequest {
- target: id,
- request: req,
- });
+ Ok((id, req)) => {
+ self.pending_messages.push_back(
+ prepare_block_request(&mut self.context_data.peers, id, req)
+ );
}
Err(sync::BadPeer(id, repu)) => {
self.behaviour.disconnect_peer(&id);
@@ -1316,13 +1288,6 @@ impl Protocol {
use std::convert::TryInto;
if let Some(metrics) = &self.metrics {
- let mut obsolete_requests: u64 = 0;
- for peer in self.context_data.peers.values() {
- let n = peer.obsolete_requests.len().try_into().unwrap_or(std::u64::MAX);
- obsolete_requests = obsolete_requests.saturating_add(n);
- }
- metrics.obsolete_requests.set(obsolete_requests);
-
let n = self.context_data.peers.len().try_into().unwrap_or(std::u64::MAX);
metrics.peers.set(n);
@@ -1343,6 +1308,39 @@ impl Protocol {
}
}
+fn prepare_block_request(
+ peers: &mut HashMap>,
+ who: PeerId,
+ mut request: message::BlockRequest,
+) -> CustomMessageOutcome {
+ let (tx, rx) = oneshot::channel();
+
+ if let Some(ref mut peer) = peers.get_mut(&who) {
+ request.id = peer.next_request_id;
+ peer.next_request_id += 1;
+ peer.block_request = Some((request.clone(), rx));
+ }
+
+ let request = crate::schema::v1::BlockRequest {
+ fields: request.fields.to_be_u32(),
+ from_block: match request.from {
+ message::FromBlock::Hash(h) =>
+ Some(crate::schema::v1::block_request::FromBlock::Hash(h.encode())),
+ message::FromBlock::Number(n) =>
+ Some(crate::schema::v1::block_request::FromBlock::Number(n.encode())),
+ },
+ to_block: request.to.map(|h| h.encode()).unwrap_or_default(),
+ direction: request.direction as i32,
+ max_blocks: request.max.unwrap_or(0),
+ };
+
+ CustomMessageOutcome::BlockRequest {
+ target: who,
+ request: request,
+ pending_response: tx,
+ }
+}
+
/// Outcome of an incoming custom message.
#[derive(Debug)]
#[must_use]
@@ -1367,33 +1365,16 @@ pub enum CustomMessageOutcome {
/// Messages have been received on one or more notifications protocols.
NotificationsReceived { remote: PeerId, messages: Vec<(Cow<'static, str>, Bytes)> },
/// A new block request must be emitted.
- /// You must later call either [`Protocol::on_block_response`] or
- /// [`Protocol::on_block_request_failed`].
- /// Each peer can only have one active request. If a request already exists for this peer, it
- /// must be silently discarded.
- /// It is the responsibility of the handler to ensure that a timeout exists.
- BlockRequest { target: PeerId, request: message::BlockRequest },
+ BlockRequest {
+ target: PeerId,
+ request: crate::schema::v1::BlockRequest,
+ pending_response: oneshot::Sender, RequestFailure>>,
+ },
/// Peer has a reported a new head of chain.
PeerNewBest(PeerId, NumberFor),
None,
}
-fn update_peer_request(
- peers: &mut HashMap>,
- who: &PeerId,
- request: &mut message::BlockRequest,
-) {
- if let Some(ref mut peer) = peers.get_mut(who) {
- request.id = peer.next_request_id;
- peer.next_request_id += 1;
- if let Some((timestamp, request)) = peer.block_request.take() {
- trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who);
- peer.obsolete_requests.insert(request.id, timestamp);
- }
- peer.block_request = Some((Instant::now(), request.clone()));
- }
-}
-
impl NetworkBehaviour for Protocol {
type ProtocolsHandler = ::ProtocolsHandler;
type OutEvent = CustomMessageOutcome;
@@ -1445,6 +1426,80 @@ impl NetworkBehaviour for Protocol {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
}
+ // Check for finished outgoing requests.
+ let mut finished_block_requests = Vec::new();
+ for (id, peer) in self.context_data.peers.iter_mut() {
+ if let Peer { block_request: Some((_, pending_response)), .. } = peer {
+ match pending_response.poll_unpin(cx) {
+ Poll::Ready(Ok(Ok(resp))) => {
+ let (req, _) = peer.block_request.take().unwrap();
+
+ let protobuf_response = match crate::schema::v1::BlockResponse::decode(&resp[..]) {
+ Ok(proto) => proto,
+ Err(e) => {
+ trace!(target: "sync", "Failed to decode block request to peer {:?}: {:?}.", id, e);
+ self.peerset_handle.report_peer(id.clone(), rep::BAD_MESSAGE);
+ self.behaviour.disconnect_peer(id);
+ continue;
+ }
+ };
+
+ finished_block_requests.push((id.clone(), req, protobuf_response));
+ },
+ Poll::Ready(Ok(Err(e))) => {
+ peer.block_request.take();
+ trace!(target: "sync", "Block request to peer {:?} failed: {:?}.", id, e);
+
+ match e {
+ RequestFailure::Network(OutboundFailure::Timeout) => {
+ self.peerset_handle.report_peer(id.clone(), rep::TIMEOUT);
+ self.behaviour.disconnect_peer(id);
+ }
+ RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => {
+ self.peerset_handle.report_peer(id.clone(), rep::BAD_PROTOCOL);
+ self.behaviour.disconnect_peer(id);
+ }
+ RequestFailure::Network(OutboundFailure::DialFailure) => {
+ self.behaviour.disconnect_peer(id);
+ }
+ RequestFailure::Refused => {
+ self.peerset_handle.report_peer(id.clone(), rep::REFUSED);
+ self.behaviour.disconnect_peer(id);
+ }
+ RequestFailure::Network(OutboundFailure::ConnectionClosed)
+ | RequestFailure::NotConnected => {
+ self.behaviour.disconnect_peer(id);
+ },
+ RequestFailure::UnknownProtocol => {
+ debug_assert!(false, "Block request protocol should always be known.");
+ }
+ RequestFailure::Obsolete => {
+ debug_assert!(
+ false,
+ "Can not receive `RequestFailure::Obsolete` after dropping the \
+ response receiver.",
+ );
+ }
+ }
+ },
+ Poll::Ready(Err(oneshot::Canceled)) => {
+ peer.block_request.take();
+ trace!(
+ target: "sync",
+ "Block request to peer {:?} failed due to oneshot being canceled.",
+ id,
+ );
+ self.behaviour.disconnect_peer(id);
+ },
+ Poll::Pending => {},
+ }
+ }
+ }
+ for (id, req, protobuf_response) in finished_block_requests {
+ let ev = self.on_block_response(id, req, protobuf_response);
+ self.pending_messages.push_back(ev);
+ }
+
while let Poll::Ready(Some(())) = self.tick_timeout.poll_next_unpin(cx) {
self.tick();
}
@@ -1453,20 +1508,12 @@ impl NetworkBehaviour for Protocol {
self.propagate_transactions();
}
- for (id, mut r) in self.sync.block_requests() {
- update_peer_request(&mut self.context_data.peers, &id, &mut r);
- let event = CustomMessageOutcome::BlockRequest {
- target: id.clone(),
- request: r,
- };
+ for (id, request) in self.sync.block_requests() {
+ let event = prepare_block_request(&mut self.context_data.peers, id.clone(), request);
self.pending_messages.push_back(event);
}
- for (id, mut r) in self.sync.justification_requests() {
- update_peer_request(&mut self.context_data.peers, &id, &mut r);
- let event = CustomMessageOutcome::BlockRequest {
- target: id,
- request: r,
- };
+ for (id, request) in self.sync.justification_requests() {
+ let event = prepare_block_request(&mut self.context_data.peers, id, request);
self.pending_messages.push_back(event);
}
if let Poll::Ready(Some((tx_hash, result))) = self.pending_transactions.poll_next_unpin(cx) {
@@ -1570,7 +1617,9 @@ impl NetworkBehaviour for Protocol {
}
}
Some(Fallback::Transactions) => {
- if let Ok(m) = message::Transactions::decode(&mut message.as_ref()) {
+ if let Ok(m) = as Decode>::decode(
+ &mut message.as_ref(),
+ ) {
self.on_transactions(peer_id, m);
} else {
warn!(target: "sub-libp2p", "Failed to decode transactions list");
@@ -1594,17 +1643,25 @@ impl NetworkBehaviour for Protocol {
}
}
None => {
- debug!(target: "sub-libp2p", "Received notification from unknown protocol {:?}", protocol_name);
+ debug!(
+ target: "sub-libp2p",
+ "Received notification from unknown protocol {:?}",
+ protocol_name,
+ );
CustomMessageOutcome::None
}
}
};
- if let CustomMessageOutcome::None = outcome {
- Poll::Pending
- } else {
- Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome))
+ if !matches!(outcome, CustomMessageOutcome::::None) {
+ return Poll::Ready(NetworkBehaviourAction::GenerateEvent(outcome));
}
+
+ if let Some(message) = self.pending_messages.pop_front() {
+ return Poll::Ready(NetworkBehaviourAction::GenerateEvent(message));
+ }
+
+ Poll::Pending
}
fn inject_addr_reach_failure(
diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs
index 806c04e5f307c..fbdb1432379ed 100644
--- a/client/network/src/request_responses.rs
+++ b/client/network/src/request_responses.rs
@@ -137,11 +137,17 @@ pub enum Event {
/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
/// failed.
+ ///
+ /// This event is generated for statistics purposes.
RequestFinished {
- /// Request that has succeeded.
- request_id: RequestId,
- /// Response sent by the remote or reason for failure.
- result: Result, RequestFailure>,
+ /// Peer that we send a request to.
+ peer: PeerId,
+ /// Name of the protocol in question.
+ protocol: Cow<'static, str>,
+ /// Duration the request took.
+ duration: Duration,
+ /// Result of the request.
+ result: Result<(), RequestFailure>
},
}
@@ -155,8 +161,11 @@ pub struct RequestResponsesBehaviour {
(RequestResponse, Option>)
>,
+ /// Pending requests, passed down to a [`RequestResponse`] behaviour, awaiting a reply.
+ pending_requests: HashMap, RequestFailure>>)>,
+
/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
- /// response to send back to the remote.
+ /// start time and the response to send back to the remote.
pending_responses: stream::FuturesUnordered<
Pin> + Send>>
>,
@@ -203,6 +212,7 @@ impl RequestResponsesBehaviour {
Ok(Self {
protocols,
+ pending_requests: Default::default(),
pending_responses: Default::default(),
pending_responses_arrival_time: Default::default(),
})
@@ -212,17 +222,36 @@ impl RequestResponsesBehaviour {
///
/// An error is returned if we are not connected to the target peer or if the protocol doesn't
/// match one that has been registered.
- pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec)
- -> Result
- {
+ pub fn send_request(
+ &mut self,
+ target: &PeerId,
+ protocol: &str,
+ request: Vec,
+ pending_response: oneshot::Sender, RequestFailure>>,
+ ) {
if let Some((protocol, _)) = self.protocols.get_mut(protocol) {
if protocol.is_connected(target) {
- Ok(protocol.send_request(target, request))
+ let request_id = protocol.send_request(target, request);
+ self.pending_requests.insert(request_id, (Instant::now(), pending_response));
} else {
- Err(SendRequestError::NotConnected)
+ if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
+ log::debug!(
+ target: "sub-libp2p",
+ "Not connected to peer {:?}. At the same time local \
+ node is no longer interested in the result.",
+ target,
+ );
+ };
}
} else {
- Err(SendRequestError::UnknownProtocol)
+ if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
+ log::debug!(
+ target: "sub-libp2p",
+ "Unknown protocol {:?}. At the same time local \
+ node is no longer interested in the result.",
+ protocol,
+ );
+ };
}
}
}
@@ -440,6 +469,8 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
payload: request,
pending_response: tx,
});
+ } else {
+ debug_assert!(false, "Received message on outbound-only protocol.");
}
let protocol = protocol.clone();
@@ -463,29 +494,80 @@ impl NetworkBehaviour for RequestResponsesBehaviour {
// Received a response from a remote to one of our requests.
RequestResponseEvent::Message {
+ peer,
message: RequestResponseMessage::Response {
request_id,
response,
},
..
} => {
+ let (started, delivered) = match self.pending_requests.remove(&request_id) {
+ Some((started, pending_response)) => {
+ let delivered = pending_response.send(
+ response.map_err(|()| RequestFailure::Refused),
+ ).map_err(|_| RequestFailure::Obsolete);
+ (started, delivered)
+ }
+ None => {
+ log::warn!(
+ target: "sub-libp2p",
+ "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
+ request_id,
+ );
+ debug_assert!(false);
+ continue;
+ }
+ };
+
let out = Event::RequestFinished {
- request_id,
- result: response.map_err(|()| RequestFailure::Refused),
+ peer,
+ protocol: protocol.clone(),
+ duration: started.elapsed(),
+ result: delivered,
};
+
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
// One of our requests has failed.
RequestResponseEvent::OutboundFailure {
+ peer,
request_id,
error,
..
} => {
+ let started = match self.pending_requests.remove(&request_id) {
+ Some((started, pending_response)) => {
+ if pending_response.send(
+ Err(RequestFailure::Network(error.clone())),
+ ).is_err() {
+ log::debug!(
+ target: "sub-libp2p",
+ "Request with id {:?} failed. At the same time local \
+ node is no longer interested in the result.",
+ request_id,
+ );
+ }
+ started
+ }
+ None => {
+ log::warn!(
+ target: "sub-libp2p",
+ "Received `RequestResponseEvent::Message` with unexpected request id {:?}",
+ request_id,
+ );
+ debug_assert!(false);
+ continue;
+ }
+ };
+
let out = Event::RequestFinished {
- request_id,
+ peer,
+ protocol: protocol.clone(),
+ duration: started.elapsed(),
result: Err(RequestFailure::Network(error)),
};
+
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out));
}
@@ -529,21 +611,18 @@ pub enum RegisterError {
DuplicateProtocol(#[error(ignore)] Cow<'static, str>),
}
-/// Error when sending a request.
+/// Error in a request.
#[derive(Debug, derive_more::Display, derive_more::Error)]
-pub enum SendRequestError {
+pub enum RequestFailure {
/// We are not currently connected to the requested peer.
NotConnected,
/// Given protocol hasn't been registered.
UnknownProtocol,
-}
-
-/// Error in a request.
-#[derive(Debug, derive_more::Display, derive_more::Error)]
-pub enum RequestFailure {
/// Remote has closed the substream before answering, thereby signaling that it considers the
/// request as valid, but refused to answer it.
Refused,
+ /// The remote replied, but the local node is no longer interested in the response.
+ Obsolete,
/// Problem on the network.
#[display(fmt = "Problem on the network")]
Network(#[error(ignore)] OutboundFailure),
@@ -685,7 +764,7 @@ impl RequestResponseCodec for GenericCodec {
#[cfg(test)]
mod tests {
- use futures::channel::mpsc;
+ use futures::channel::{mpsc, oneshot};
use futures::executor::LocalPool;
use futures::prelude::*;
use futures::task::Spawn;
@@ -771,31 +850,32 @@ mod tests {
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
pool.run_until(async move {
- let mut sent_request_id = None;
+ let mut response_receiver = None;
loop {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
- let id = swarm.send_request(
+ let (sender, receiver) = oneshot::channel();
+ swarm.send_request(
&peer_id,
protocol_name,
- b"this is a request".to_vec()
- ).unwrap();
- assert!(sent_request_id.is_none());
- sent_request_id = Some(id);
+ b"this is a request".to_vec(),
+ sender,
+ );
+ assert!(response_receiver.is_none());
+ response_receiver = Some(receiver);
}
SwarmEvent::Behaviour(super::Event::RequestFinished {
- request_id,
- result,
+ result, ..
}) => {
- assert_eq!(Some(request_id), sent_request_id);
- let result = result.unwrap();
- assert_eq!(result, b"this is a response");
+ result.unwrap();
break;
}
_ => {}
}
}
+
+ assert_eq!(response_receiver.unwrap().await.unwrap().unwrap(), b"this is a response");
});
}
@@ -875,33 +955,35 @@ mod tests {
// Remove and run the remaining swarm.
let (mut swarm, _) = swarms.remove(0);
pool.run_until(async move {
- let mut sent_request_id = None;
+ let mut response_receiver = None;
loop {
match swarm.next_event().await {
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
- let id = swarm.send_request(
+ let (sender, receiver) = oneshot::channel();
+ swarm.send_request(
&peer_id,
protocol_name,
- b"this is a request".to_vec()
- ).unwrap();
- assert!(sent_request_id.is_none());
- sent_request_id = Some(id);
+ b"this is a request".to_vec(),
+ sender,
+ );
+ assert!(response_receiver.is_none());
+ response_receiver = Some(receiver);
}
SwarmEvent::Behaviour(super::Event::RequestFinished {
- request_id,
- result,
+ result, ..
}) => {
- assert_eq!(Some(request_id), sent_request_id);
- match result {
- Err(super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed)) => {},
- _ => panic!()
- }
+ assert!(result.is_err());
break;
}
_ => {}
}
}
+
+ match response_receiver.unwrap().await.unwrap().unwrap_err() {
+ super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed) => {},
+ _ => panic!()
+ }
});
}
}
diff --git a/client/network/src/service.rs b/client/network/src/service.rs
index 816f80106b8df..d8f0146e2e339 100644
--- a/client/network/src/service.rs
+++ b/client/network/src/service.rs
@@ -38,7 +38,7 @@ use crate::{
NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer,
},
on_demand_layer::AlwaysBadChecker,
- light_client_handler, block_requests,
+ light_client_handler,
protocol::{
self,
NotifsHandlerError,
@@ -94,7 +94,6 @@ use std::{
},
task::Poll,
};
-use wasm_timer::Instant;
pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};
@@ -287,10 +286,6 @@ impl NetworkWorker {
params.network_config.client_version,
params.network_config.node_name
);
- let block_requests = {
- let config = block_requests::Config::new(¶ms.protocol_id);
- block_requests::BlockRequests::new(config, params.chain.clone())
- };
let light_client_handler = {
let config = light_client_handler::Config::new(¶ms.protocol_id);
light_client_handler::LightClientHandler::new(
@@ -329,9 +324,9 @@ impl NetworkWorker {
params.role,
user_agent,
local_public,
- block_requests,
light_client_handler,
discovery_config,
+ params.block_request_protocol_config,
params.network_config.request_response_protocols,
);
@@ -430,7 +425,6 @@ impl NetworkWorker {
peers_notifications_sinks,
metrics,
boot_node_ids,
- pending_requests: HashMap::with_capacity(128),
})
}
@@ -1231,13 +1225,6 @@ pub struct NetworkWorker {
metrics: Option,
/// The `PeerId`'s of all boot nodes.
boot_node_ids: Arc>,
- /// Requests started using [`NetworkService::request`]. Includes the channel to send back the
- /// response, when the request has started, and the name of the protocol for diagnostic
- /// purposes.
- pending_requests: HashMap<
- behaviour::RequestId,
- (oneshot::Sender, RequestFailure>>, Instant, String)
- >,
/// For each peer and protocol combination, an object that allows sending notifications to
/// that peer. Shared with the [`NetworkService`].
peers_notifications_sinks: Arc), NotificationsSink>>>,
@@ -1310,29 +1297,7 @@ impl Future for NetworkWorker {
ServiceToWorkerMsg::EventStream(sender) =>
this.event_streams.push(sender),
ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => {
- // Calling `send_request` can fail immediately in some circumstances.
- // This is handled by sending back an error on the channel.
- match this.network_service.send_request(&target, &protocol, request) {
- Ok(request_id) => {
- if let Some(metrics) = this.metrics.as_ref() {
- metrics.requests_out_started_total
- .with_label_values(&[&protocol])
- .inc();
- }
- this.pending_requests.insert(
- request_id,
- (pending_response, Instant::now(), protocol.to_string())
- );
- },
- Err(behaviour::SendRequestError::NotConnected) => {
- let err = RequestFailure::Network(OutboundFailure::ConnectionClosed);
- let _ = pending_response.send(Err(err));
- },
- Err(behaviour::SendRequestError::UnknownProtocol) => {
- let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols);
- let _ = pending_response.send(Err(err));
- },
- }
+ this.network_service.send_request(&target, &protocol, request, pending_response);
},
ServiceToWorkerMsg::DisconnectPeer(who) =>
this.network_service.user_protocol_mut().disconnect_peer(&who),
@@ -1396,51 +1361,37 @@ impl Future for NetworkWorker {
}
}
},
- Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => {
- if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) {
- if let Some(metrics) = this.metrics.as_ref() {
- match &result {
- Ok(_) => {
- metrics.requests_out_success_total
- .with_label_values(&[&protocol])
- .observe(started.elapsed().as_secs_f64());
- }
- Err(err) => {
- let reason = match err {
- RequestFailure::Refused => "refused",
- RequestFailure::Network(OutboundFailure::DialFailure) =>
- "dial-failure",
- RequestFailure::Network(OutboundFailure::Timeout) =>
- "timeout",
- RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
- "connection-closed",
- RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
- "unsupported",
- };
-
- metrics.requests_out_failure_total
- .with_label_values(&[&protocol, reason])
- .inc();
- }
+ Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished {
+ protocol, duration, result, ..
+ })) => {
+ if let Some(metrics) = this.metrics.as_ref() {
+ match result {
+ Ok(_) => {
+ metrics.requests_out_success_total
+ .with_label_values(&[&protocol])
+ .observe(duration.as_secs_f64());
+ }
+ Err(err) => {
+ let reason = match err {
+ RequestFailure::NotConnected => "not-connected",
+ RequestFailure::UnknownProtocol => "unknown-protocol",
+ RequestFailure::Refused => "refused",
+ RequestFailure::Obsolete => "obsolete",
+ RequestFailure::Network(OutboundFailure::DialFailure) =>
+ "dial-failure",
+ RequestFailure::Network(OutboundFailure::Timeout) =>
+ "timeout",
+ RequestFailure::Network(OutboundFailure::ConnectionClosed) =>
+ "connection-closed",
+ RequestFailure::Network(OutboundFailure::UnsupportedProtocols) =>
+ "unsupported",
+ };
+
+ metrics.requests_out_failure_total
+ .with_label_values(&[&protocol, reason])
+ .inc();
}
}
- let _ = send_back.send(result);
- } else {
- error!("Request not in pending_requests");
- }
- },
- Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => {
- if let Some(metrics) = this.metrics.as_ref() {
- metrics.requests_out_started_total
- .with_label_values(&[&protocol])
- .inc();
- }
- },
- Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => {
- if let Some(metrics) = this.metrics.as_ref() {
- metrics.requests_out_success_total
- .with_label_values(&[&protocol])
- .observe(request_duration.as_secs_f64());
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => {
@@ -1567,11 +1518,11 @@ impl Future for NetworkWorker {
let reason = match cause {
Some(ConnectionError::IO(_)) => "transport-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
- EitherError::A(EitherError::A(EitherError::B(
- EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout",
+ EitherError::A(EitherError::B(EitherError::A(
+ PingFailure::Timeout)))))))) => "ping-timeout",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A(
- EitherError::A(EitherError::A(EitherError::A(
- NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged",
+ EitherError::A(EitherError::A(
+ NotifsHandlerError::SyncNotificationsClogged))))))) => "sync-notifications-clogged",
Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error",
Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout",
None => "actively-closed",
diff --git a/client/network/src/service/metrics.rs b/client/network/src/service/metrics.rs
index 3dd0d48888ec7..40d65ea45f111 100644
--- a/client/network/src/service/metrics.rs
+++ b/client/network/src/service/metrics.rs
@@ -78,7 +78,6 @@ pub struct Metrics {
pub requests_in_success_total: HistogramVec,
pub requests_out_failure_total: CounterVec,
pub requests_out_success_total: HistogramVec,
- pub requests_out_started_total: CounterVec,
}
impl Metrics {
@@ -230,7 +229,8 @@ impl Metrics {
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_in_success_total",
- "Total number of requests received and answered"
+ "For successful incoming requests, time between receiving the request and \
+ starting to send the response"
),
buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
@@ -248,20 +248,13 @@ impl Metrics {
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_out_success_total",
- "For successful requests, time between a request's start and finish"
+ "For successful outgoing requests, time between a request's start and finish"
),
buckets: prometheus::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
- requests_out_started_total: prometheus::register(CounterVec::new(
- Opts::new(
- "sub_libp2p_requests_out_started_total",
- "Total number of requests emitted"
- ),
- &["protocol"]
- )?, registry)?,
})
}
}
diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs
index 3372fd9f92922..2b0405d88e581 100644
--- a/client/network/src/service/tests.rs
+++ b/client/network/src/service/tests.rs
@@ -17,6 +17,7 @@
// along with this program. If not, see .
use crate::{config, Event, NetworkService, NetworkWorker};
+use crate::block_request_handler::BlockRequestHandler;
use libp2p::PeerId;
use futures::prelude::*;
@@ -91,6 +92,17 @@ fn build_test_full_node(config: config::NetworkConfiguration)
None,
));
+ let protocol_id = config::ProtocolId::from("/test-protocol-name");
+
+ let block_request_protocol_config = {
+ let (handler, protocol_config) = BlockRequestHandler::new(
+ protocol_id.clone(),
+ client.clone(),
+ );
+ async_std::task::spawn(handler.run().boxed());
+ protocol_config
+ };
+
let worker = NetworkWorker::new(config::Params {
role: config::Role::Full,
executor: None,
@@ -98,12 +110,13 @@ fn build_test_full_node(config: config::NetworkConfiguration)
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(crate::config::EmptyTransactionPool),
- protocol_id: config::ProtocolId::from("/test-protocol-name"),
+ protocol_id,
import_queue,
block_announce_validator: Box::new(
sp_consensus::block_validation::DefaultBlockAnnounceValidator,
),
metrics_registry: None,
+ block_request_protocol_config,
})
.unwrap();
diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml
index 535b3cda19c39..e9f49021bbf5a 100644
--- a/client/network/test/Cargo.toml
+++ b/client/network/test/Cargo.toml
@@ -13,6 +13,7 @@ repository = "https://github.com/paritytech/substrate/"
targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
+async-std = "1.6.5"
sc-network = { version = "0.8.0", path = "../" }
log = "0.4.8"
parking_lot = "0.11.1"
diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs
index 68e2bd1594d15..b8b230f5d0719 100644
--- a/client/network/test/src/lib.rs
+++ b/client/network/test/src/lib.rs
@@ -29,6 +29,7 @@ use std::{
use libp2p::build_multiaddr;
use log::trace;
+use sc_network::block_request_handler::{self, BlockRequestHandler};
use sp_blockchain::{
HeaderBackend, Result as ClientResult,
well_known_cache_keys::{self, Id as CacheKeyId},
@@ -49,6 +50,7 @@ use sp_consensus::block_import::{BlockImport, ImportResult};
use sp_consensus::Error as ConsensusError;
use sp_consensus::{BlockOrigin, ForkChoiceStrategy, BlockImportParams, BlockCheckParams, JustificationImport};
use futures::prelude::*;
+use futures::future::BoxFuture;
use sc_network::{NetworkWorker, NetworkService, config::ProtocolId};
use sc_network::config::{NetworkConfiguration, TransportConfig};
use libp2p::PeerId;
@@ -682,6 +684,14 @@ pub trait TestNetFactory: Sized {
network_config.allow_non_globals_in_dht = true;
network_config.notifications_protocols = config.notifications_protocols;
+ let protocol_id = ProtocolId::from("test-protocol-name");
+
+ let block_request_protocol_config = {
+ let (handler, protocol_config) = BlockRequestHandler::new(protocol_id.clone(), client.clone());
+ self.spawn_task(handler.run().boxed());
+ protocol_config
+ };
+
let network = NetworkWorker::new(sc_network::config::Params {
role: Role::Full,
executor: None,
@@ -689,11 +699,12 @@ pub trait TestNetFactory: Sized {
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
- protocol_id: ProtocolId::from("test-protocol-name"),
+ protocol_id,
import_queue,
block_announce_validator: config.block_announce_validator
.unwrap_or_else(|| Box::new(DefaultBlockAnnounceValidator)),
metrics_registry: None,
+ block_request_protocol_config,
}).unwrap();
trace!(target: "test_network", "Peer identifier: {}", network.service().local_peer_id());
@@ -757,6 +768,13 @@ pub trait TestNetFactory: Sized {
network_config.listen_addresses = vec![listen_addr.clone()];
network_config.allow_non_globals_in_dht = true;
+ let protocol_id = ProtocolId::from("test-protocol-name");
+
+ // Add block request handler.
+ let block_request_protocol_config = block_request_handler::generate_protocol_config(
+ protocol_id.clone(),
+ );
+
let network = NetworkWorker::new(sc_network::config::Params {
role: Role::Light,
executor: None,
@@ -764,10 +782,11 @@ pub trait TestNetFactory: Sized {
chain: client.clone(),
on_demand: None,
transaction_pool: Arc::new(EmptyTransactionPool),
- protocol_id: ProtocolId::from("test-protocol-name"),
+ protocol_id,
import_queue,
block_announce_validator: Box::new(DefaultBlockAnnounceValidator),
metrics_registry: None,
+ block_request_protocol_config,
}).unwrap();
self.mut_peers(|peers| {
@@ -792,6 +811,11 @@ pub trait TestNetFactory: Sized {
});
}
+ /// Used to spawn background tasks, e.g. the block request protocol handler.
+ fn spawn_task(&self, f: BoxFuture<'static, ()>) {
+ async_std::task::spawn(f);
+ }
+
/// Polls the testnet until all nodes are in sync.
///
/// Must be executed in a task context.
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index 5426169a83317..e3476e625ca55 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -43,6 +43,7 @@ use sc_keystore::LocalKeystore;
use log::{info, warn};
use sc_network::config::{Role, OnDemand};
use sc_network::NetworkService;
+use sc_network::block_request_handler::{self, BlockRequestHandler};
use sp_runtime::generic::BlockId;
use sp_runtime::traits::{
Block as BlockT, SaturatedConversion, HashFor, Zero, BlockIdTo,
@@ -908,6 +909,21 @@ pub fn build_network(
Box::new(DefaultBlockAnnounceValidator)
};
+ let block_request_protocol_config = {
+ if matches!(config.role, Role::Light) {
+ // Allow outgoing requests but deny incoming requests.
+ block_request_handler::generate_protocol_config(protocol_id.clone())
+ } else {
+ // Allow both outgoing and incoming requests.
+ let (handler, protocol_config) = BlockRequestHandler::new(
+ protocol_id.clone(),
+ client.clone(),
+ );
+ spawn_handle.spawn("block_request_handler", handler.run());
+ protocol_config
+ }
+ };
+
let network_params = sc_network::config::Params {
role: config.role.clone(),
executor: {
@@ -923,7 +939,8 @@ pub fn build_network(
import_queue: Box::new(import_queue),
protocol_id,
block_announce_validator,
- metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone())
+ metrics_registry: config.prometheus_config.as_ref().map(|config| config.registry.clone()),
+ block_request_protocol_config,
};
let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
diff --git a/frame/assets/src/benchmarking.rs b/frame/assets/src/benchmarking.rs
index db98164023d5c..63258c2f591b6 100644
--- a/frame/assets/src/benchmarking.rs
+++ b/frame/assets/src/benchmarking.rs
@@ -74,8 +74,6 @@ fn assert_last_event(generic_event: ::Event) {
}
benchmarks! {
- _ { }
-
create {
let caller: T::AccountId = whitelisted_caller();
let caller_lookup = T::Lookup::unlookup(caller.clone());
diff --git a/frame/babe/src/benchmarking.rs b/frame/babe/src/benchmarking.rs
index 4d75c36669eab..087cac2ed6cc6 100644
--- a/frame/babe/src/benchmarking.rs
+++ b/frame/babe/src/benchmarking.rs
@@ -23,8 +23,6 @@ use frame_benchmarking::benchmarks;
type Header = sp_runtime::generic::Header;
benchmarks! {
- _ { }
-
check_equivocation_proof {
let x in 0 .. 1;
diff --git a/frame/babe/src/lib.rs b/frame/babe/src/lib.rs
index d7da96a3ddd92..79b87cd5c018d 100644
--- a/frame/babe/src/lib.rs
+++ b/frame/babe/src/lib.rs
@@ -64,6 +64,8 @@ pub use equivocation::{BabeEquivocationOffence, EquivocationHandler, HandleEquiv
pub trait Config: pallet_timestamp::Config {
/// The amount of time, in slots, that each epoch should last.
+ /// NOTE: Currently it is not possible to change the epoch duration after
+ /// the chain has started. Attempting to do so will brick block production.
type EpochDuration: Get;
/// The expected average block time at which BABE should be creating
@@ -192,6 +194,9 @@ decl_storage! {
/// Next epoch randomness.
NextRandomness: schnorrkel::Randomness;
+ /// Next epoch authorities.
+ NextAuthorities: Vec<(AuthorityId, BabeAuthorityWeight)>;
+
/// Randomness under construction.
///
/// We make a tradeoff between storage accesses and list length.
@@ -233,6 +238,9 @@ decl_module! {
pub struct Module for enum Call where origin: T::Origin {
/// The number of **slots** that an epoch takes. We couple sessions to
/// epochs, i.e. we start a new session once the new epoch begins.
+ /// NOTE: Currently it is not possible to change the epoch duration
+ /// after the chain has started. Attempting to do so will brick block
+ /// production.
const EpochDuration: u64 = T::EpochDuration::get();
/// The expected average block time at which BABE should be creating
@@ -464,6 +472,9 @@ impl Module {
let randomness = Self::randomness_change_epoch(next_epoch_index);
Randomness::put(randomness);
+ // Update the next epoch authorities.
+ NextAuthorities::put(&next_authorities);
+
// After we update the current epoch, we signal the *next* epoch change
// so that nodes can track changes.
let next_randomness = NextRandomness::get();
@@ -483,7 +494,7 @@ impl Module {
// give correct results after `do_initialize` of the first block
// in the chain (as its result is based off of `GenesisSlot`).
pub fn current_epoch_start() -> SlotNumber {
- (EpochIndex::get() * T::EpochDuration::get()) + GenesisSlot::get()
+ Self::epoch_start(EpochIndex::get())
}
/// Produces information about the current epoch.
@@ -497,6 +508,36 @@ impl Module {
}
}
+ /// Produces information about the next epoch (which was already previously
+ /// announced).
+ pub fn next_epoch() -> Epoch {
+ let next_epoch_index = EpochIndex::get().checked_add(1).expect(
+ "epoch index is u64; it is always only incremented by one; \
+ if u64 is not enough we should crash for safety; qed.",
+ );
+
+ Epoch {
+ epoch_index: next_epoch_index,
+ start_slot: Self::epoch_start(next_epoch_index),
+ duration: T::EpochDuration::get(),
+ authorities: NextAuthorities::get(),
+ randomness: NextRandomness::get(),
+ }
+ }
+
+ fn epoch_start(epoch_index: u64) -> SlotNumber {
+ // (epoch_index * epoch_duration) + genesis_slot
+
+ const PROOF: &str = "slot number is u64; it should relate in some way to wall clock time; \
+ if u64 is not enough we should crash for safety; qed.";
+
+ let epoch_start = epoch_index
+ .checked_mul(T::EpochDuration::get())
+ .expect(PROOF);
+
+ epoch_start.checked_add(GenesisSlot::get()).expect(PROOF)
+ }
+
fn deposit_consensus(new: U) {
let log: DigestItem = DigestItem::Consensus(BABE_ENGINE_ID, new.encode());
>::deposit_log(log.into())
diff --git a/frame/babe/src/mock.rs b/frame/babe/src/mock.rs
index d29e467b79194..58e2af873fd91 100644
--- a/frame/babe/src/mock.rs
+++ b/frame/babe/src/mock.rs
@@ -295,7 +295,7 @@ pub fn start_era(era_index: EraIndex) {
assert_eq!(Staking::current_era(), Some(era_index));
}
-pub fn make_pre_digest(
+pub fn make_primary_pre_digest(
authority_index: sp_consensus_babe::AuthorityIndex,
slot_number: sp_consensus_babe::SlotNumber,
vrf_output: VRFOutput,
diff --git a/frame/babe/src/tests.rs b/frame/babe/src/tests.rs
index 0d0536359f61c..4bef98873444f 100644
--- a/frame/babe/src/tests.rs
+++ b/frame/babe/src/tests.rs
@@ -66,7 +66,7 @@ fn first_block_epoch_zero_start() {
let (vrf_output, vrf_proof, vrf_randomness) = make_vrf_output(genesis_slot, &pairs[0]);
let first_vrf = vrf_output;
- let pre_digest = make_pre_digest(
+ let pre_digest = make_primary_pre_digest(
0,
genesis_slot,
first_vrf.clone(),
@@ -122,7 +122,7 @@ fn author_vrf_output_for_primary() {
ext.execute_with(|| {
let genesis_slot = 10;
let (vrf_output, vrf_proof, vrf_randomness) = make_vrf_output(genesis_slot, &pairs[0]);
- let primary_pre_digest = make_pre_digest(0, genesis_slot, vrf_output, vrf_proof);
+ let primary_pre_digest = make_primary_pre_digest(0, genesis_slot, vrf_output, vrf_proof);
System::initialize(
&1,
@@ -252,6 +252,33 @@ fn can_enact_next_config() {
});
}
+#[test]
+fn can_fetch_current_and_next_epoch_data() {
+ new_test_ext(5).execute_with(|| {
+ // 1 era = 3 epochs
+ // 1 epoch = 3 slots
+ // Eras start from 0.
+ // Therefore at era 1 we should be starting epoch 3 with slot 10.
+ start_era(1);
+
+ let current_epoch = Babe::current_epoch();
+ assert_eq!(current_epoch.epoch_index, 3);
+ assert_eq!(current_epoch.start_slot, 10);
+ assert_eq!(current_epoch.authorities.len(), 5);
+
+ let next_epoch = Babe::next_epoch();
+ assert_eq!(next_epoch.epoch_index, 4);
+ assert_eq!(next_epoch.start_slot, 13);
+ assert_eq!(next_epoch.authorities.len(), 5);
+
+ // the on-chain randomness should always change across epochs
+ assert!(current_epoch.randomness != next_epoch.randomness);
+
+ // but in this case the authorities stay the same
+ assert!(current_epoch.authorities == next_epoch.authorities);
+ });
+}
+
#[test]
fn report_equivocation_current_session_works() {
let (pairs, mut ext) = new_test_ext_with_pairs(3);
diff --git a/frame/balances/src/benchmarking.rs b/frame/balances/src/benchmarking.rs
index 249934a61b4d7..53cf273d850de 100644
--- a/frame/balances/src/benchmarking.rs
+++ b/frame/balances/src/benchmarking.rs
@@ -33,8 +33,6 @@ const ED_MULTIPLIER: u32 = 10;
benchmarks! {
- _ { }
-
// Benchmark `transfer` extrinsic with the worst possible conditions:
// * Transfer will kill the sender account.
// * Transfer will create the recipient account.
diff --git a/frame/benchmarking/src/lib.rs b/frame/benchmarking/src/lib.rs
index 308e5285d3f61..6db8674a3d2de 100644
--- a/frame/benchmarking/src/lib.rs
+++ b/frame/benchmarking/src/lib.rs
@@ -88,24 +88,18 @@ pub use sp_storage::TrackedStorageKey;
/// benchmarks! {
/// where_clause { where T::A: From } // Optional line to give additional bound on `T`.
///
-/// // common parameter; just one for this example.
-/// // will be `1`, `MAX_LENGTH` or any value inbetween
-/// _ {
-/// let l in 1 .. MAX_LENGTH => initialize_l(l);
-/// }
-///
/// // first dispatchable: foo; this is a user dispatchable and operates on a `u8` vector of
-/// // size `l`, which we allow to be initialized as usual.
+/// // size `l`
/// foo {
/// let caller = account::(b"caller", 0, benchmarks_seed);
-/// let l = ...;
+/// let l in 1 .. MAX_LENGTH => initialize_l(l);
/// }: _(Origin::Signed(caller), vec![0u8; l])
///
/// // second dispatchable: bar; this is a root dispatchable and accepts a `u8` vector of size
-/// // `l`. We don't want it pre-initialized like before so we override using the `=> ()` notation.
+/// // `l`.
/// // In this case, we explicitly name the call using `bar` instead of `_`.
/// bar {
-/// let l = _ .. _ => ();
+/// let l in 1 .. MAX_LENGTH => initialize_l(l);
/// }: bar(Origin::Root, vec![0u8; l])
///
/// // third dispatchable: baz; this is a user dispatchable. It isn't dependent on length like the
@@ -176,18 +170,11 @@ pub use sp_storage::TrackedStorageKey;
#[macro_export]
macro_rules! benchmarks {
(
- $( where_clause { where $( $where_ty:ty: $where_bound:path ),* $(,)? } )?
- _ {
- $(
- let $common:ident in $common_from:tt .. $common_to:expr => $common_instancer:expr;
- )*
- }
$( $rest:tt )*
) => {
$crate::benchmarks_iter!(
{ }
- { $( $( $where_ty: $where_bound ),* )? }
- { $( { $common , $common_from , $common_to , $common_instancer } )* }
+ { }
( )
( )
$( $rest )*
@@ -199,18 +186,11 @@ macro_rules! benchmarks {
#[macro_export]
macro_rules! benchmarks_instance {
(
- $( where_clause { where $( $where_ty:ty: $where_bound:path ),* $(,)? } )?
- _ {
- $(
- let $common:ident in $common_from:tt .. $common_to:expr => $common_instancer:expr;
- )*
- }
$( $rest:tt )*
) => {
$crate::benchmarks_iter!(
{ I }
- { $( $( $where_ty: $where_bound ),* )? }
- { $( { $common , $common_from , $common_to , $common_instancer } )* }
+ { }
( )
( )
$( $rest )*
@@ -221,11 +201,27 @@ macro_rules! benchmarks_instance {
#[macro_export]
#[doc(hidden)]
macro_rules! benchmarks_iter {
+ // detect and extract where clause:
+ (
+ { $( $instance:ident )? }
+ { $( $where_clause:tt )* }
+ ( $( $names:tt )* )
+ ( $( $names_extra:tt )* )
+ where_clause { where $( $where_ty:ty: $where_bound:path ),* $(,)? }
+ $( $rest:tt )*
+ ) => {
+ $crate::benchmarks_iter! {
+ { $( $instance)? }
+ { $( $where_ty: $where_bound ),* }
+ ( $( $names )* )
+ ( $( $names_extra )* )
+ $( $rest )*
+ }
+ };
// detect and extract extra tag:
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
#[extra]
@@ -235,7 +231,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter! {
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* $name )
$name
@@ -246,7 +241,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* ) // This contains $( $( { $instance } )? $name:ident )*
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: _ ( $origin:expr $( , $arg:expr )* )
@@ -256,7 +250,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter! {
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* )
$name { $( $code )* }: $name ( $origin $( , $arg )* )
@@ -268,7 +261,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: $dispatch:ident ( $origin:expr $( , $arg:expr )* )
@@ -278,7 +270,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter! {
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* )
$name { $( $code )* }: {
@@ -296,7 +287,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: $eval:block
@@ -307,7 +297,6 @@ macro_rules! benchmarks_iter {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{ }
{ $eval }
{ $( $code )* }
@@ -324,7 +313,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter!(
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* { $( $instance )? } $name )
( $( $names_extra )* )
$( $rest )*
@@ -334,7 +322,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
) => {
@@ -354,7 +341,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: _ ( $origin:expr $( , $arg:expr )* )
@@ -363,7 +349,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter! {
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* )
$name { $( $code )* }: _ ( $origin $( , $arg )* )
@@ -375,7 +360,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: $dispatch:ident ( $origin:expr $( , $arg:expr )* )
@@ -384,7 +368,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter! {
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* )
$name { $( $code )* }: $dispatch ( $origin $( , $arg )* )
@@ -396,7 +379,6 @@ macro_rules! benchmarks_iter {
(
{ $( $instance:ident )? }
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
( $( $names:tt )* )
( $( $names_extra:tt )* )
$name:ident { $( $code:tt )* }: $eval:block
@@ -405,7 +387,6 @@ macro_rules! benchmarks_iter {
$crate::benchmarks_iter!(
{ $( $instance)? }
{ $( $where_clause )* }
- { $( $common )* }
( $( $names )* )
( $( $names_extra )* )
$name { $( $code )* }: $eval
@@ -423,7 +404,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
{ $( PRE { $( $pre_parsed:tt )* } )* }
{ $eval:block }
{
@@ -436,7 +416,6 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{
$( PRE { $( $pre_parsed )* } )*
PRE { $pre_id , $pre_ty , $pre_ex }
@@ -450,7 +429,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -463,7 +441,6 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{
$( $parsed )*
PARAM { $param , $param_from , $param_to , $param_instancer }
@@ -478,7 +455,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( { $common:ident , $common_from:tt , $common_to:expr , $common_instancer:expr } )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -491,16 +467,8 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( { $common , $common_from , $common_to , $common_instancer } )* }
{ $( $parsed )* }
{ $eval }
- {
- let $param
- in ({ $( let $common = $common_from; )* $param })
- .. ({ $( let $common = $common_to; )* $param })
- => ({ $( let $common = || -> Result<(), &'static str> { $common_instancer ; Ok(()) }; )* $param()? });
- $( $rest )*
- }
$postcode
}
};
@@ -509,7 +477,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( { $common:ident , $common_from:tt , $common_to:expr , $common_instancer:expr } )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -522,16 +489,8 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( { $common , $common_from , $common_to , $common_instancer } )* }
{ $( $parsed )* }
{ $eval }
- {
- let $param
- in ({ $( let $common = $common_from; )* $param })
- .. ({ $( let $common = $common_to; )* $param })
- => $param_instancer ;
- $( $rest )*
- }
$postcode
}
};
@@ -540,7 +499,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -553,7 +511,6 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{ $( $parsed )* }
{ $eval }
{
@@ -568,7 +525,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -581,7 +537,6 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{ $( $parsed )* }
{ $eval }
{
@@ -596,7 +551,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( $common:tt )* }
{ $( $parsed:tt )* }
{ $eval:block }
{
@@ -609,7 +563,6 @@ macro_rules! benchmark_backend {
{ $( $instance)? }
$name
{ $( $where_clause )* }
- { $( $common )* }
{ $( $parsed )* }
{ $eval }
{
@@ -624,7 +577,6 @@ macro_rules! benchmark_backend {
{ $( $instance:ident )? }
$name:ident
{ $( $where_clause:tt )* }
- { $( { $common:ident , $common_from:tt , $common_to:expr , $common_instancer:expr } )* }
{
$( PRE { $pre_id:tt , $pre_ty:ty , $pre_ex:expr } )*
$( PARAM { $param:ident , $param_from:expr , $param_to:expr , $param_instancer:expr } )*
@@ -653,9 +605,6 @@ macro_rules! benchmark_backend {
components: &[($crate::BenchmarkParameter, u32)],
verify: bool
) -> Result Result<(), &'static str>>, &'static str> {
- $(
- let $common = $common_from;
- )*
$(
// Prepare instance
let $param = components.iter()
diff --git a/frame/benchmarking/src/tests.rs b/frame/benchmarking/src/tests.rs
index 7ea6bfd9afa25..2cbf4b9d1950c 100644
--- a/frame/benchmarking/src/tests.rs
+++ b/frame/benchmarking/src/tests.rs
@@ -112,13 +112,8 @@ fn new_test_ext() -> sp_io::TestExternalities {
benchmarks!{
where_clause { where ::OtherEvent: Into<::Event> }
- _ {
- // Define a common range for `b`.
- let b in 1 .. 1000 => ();
- }
-
set_value {
- let b in ...;
+ let b in 1 .. 1000;
let caller = account::("caller", 0, 0);
}: _ (RawOrigin::Signed(caller), b.into())
verify {
@@ -126,7 +121,7 @@ benchmarks!{
}
other_name {
- let b in ...;
+ let b in 1 .. 1000;
}: dummy (RawOrigin::None, b.into())
sort_vector {
@@ -142,7 +137,7 @@ benchmarks!{
}
bad_origin {
- let b in ...;
+ let b in 1 .. 1000;
let caller = account::("caller", 0, 0);
}: dummy (RawOrigin::Signed(caller), b.into())
diff --git a/frame/bounties/src/benchmarking.rs b/frame/bounties/src/benchmarking.rs
index 0fe479bda7bda..f6fc11ad0bf06 100644
--- a/frame/bounties/src/benchmarking.rs
+++ b/frame/bounties/src/benchmarking.rs
@@ -94,8 +94,6 @@ fn assert_last_event(generic_event: ::Event) {
const MAX_BYTES: u32 = 16384;
benchmarks! {
- _ { }
-
propose_bounty {
let d in 0 .. MAX_BYTES;
diff --git a/frame/collective/src/benchmarking.rs b/frame/collective/src/benchmarking.rs
index 50fab1b3e474e..bff7dad59d891 100644
--- a/frame/collective/src/benchmarking.rs
+++ b/frame/collective/src/benchmarking.rs
@@ -42,8 +42,7 @@ fn assert_last_event, I: Instance>(generic_event: >:
}
benchmarks_instance! {
- _{ }
-
+
set_members {
let m in 1 .. T::MaxMembers::get();
let n in 1 .. T::MaxMembers::get();
diff --git a/frame/contracts/src/benchmarking/mod.rs b/frame/contracts/src/benchmarking/mod.rs
index d08f0ab5e65ec..9fa365116c7a0 100644
--- a/frame/contracts/src/benchmarking/mod.rs
+++ b/frame/contracts/src/benchmarking/mod.rs
@@ -282,9 +282,6 @@ benchmarks! {
T::AccountId: AsRef<[u8]>,
}
- _ {
- }
-
// The base weight without any actual work performed apart from the setup costs.
on_initialize {}: {
Storage::::process_deletion_queue_batch(Weight::max_value())
diff --git a/frame/democracy/src/benchmarking.rs b/frame/democracy/src/benchmarking.rs
index 7460249b6c393..c66ce20dab87c 100644
--- a/frame/democracy/src/benchmarking.rs
+++ b/frame/democracy/src/benchmarking.rs
@@ -97,8 +97,6 @@ fn account_vote(b: BalanceOf) -> AccountVote> {
}
benchmarks! {
- _ { }
-
propose {
let p = T::MaxProposals::get();
diff --git a/frame/elections-phragmen/src/benchmarking.rs b/frame/elections-phragmen/src/benchmarking.rs
index db3a8c96023a3..3ed4af2487df3 100644
--- a/frame/elections-phragmen/src/benchmarking.rs
+++ b/frame/elections-phragmen/src/benchmarking.rs
@@ -167,8 +167,6 @@ fn clean() {
}
benchmarks! {
- _ {}
-
// -- Signed ones
vote {
let v in 1 .. (MAXIMUM_VOTE as u32);
diff --git a/frame/example/src/lib.rs b/frame/example/src/lib.rs
index 382d67263d1b0..05526d2c7a29e 100644
--- a/frame/example/src/lib.rs
+++ b/frame/example/src/lib.rs
@@ -655,20 +655,15 @@ mod benchmarking {
use frame_system::RawOrigin;
benchmarks!{
- _ {
- // Define a common range for `b`.
- let b in 1 .. 1000 => ();
- }
-
// This will measure the execution time of `accumulate_dummy` for b in [1..1000] range.
accumulate_dummy {
- let b in ...;
+ let b in 1 .. 1000;
let caller = account("caller", 0, 0);
}: _ (RawOrigin::Signed(caller), b.into())
// This will measure the execution time of `set_dummy` for b in [1..1000] range.
set_dummy {
- let b in ...;
+ let b in 1 .. 1000;
}: set_dummy (RawOrigin::Root, b.into())
// This will measure the execution time of `set_dummy` for b in [1..10] range.
diff --git a/frame/grandpa/src/benchmarking.rs b/frame/grandpa/src/benchmarking.rs
index d91bd223a5706..5f08a5ea4bac0 100644
--- a/frame/grandpa/src/benchmarking.rs
+++ b/frame/grandpa/src/benchmarking.rs
@@ -25,8 +25,6 @@ use frame_system::RawOrigin;
use sp_core::H256;
benchmarks! {
- _ { }
-
check_equivocation_proof {
let x in 0 .. 1;
diff --git a/frame/identity/src/benchmarking.rs b/frame/identity/src/benchmarking.rs
index dccef494a0e88..e916bdfa50461 100644
--- a/frame/identity/src/benchmarking.rs
+++ b/frame/identity/src/benchmarking.rs
@@ -107,25 +107,6 @@ fn create_identity_info(num_fields: u32) -> IdentityInfo {
}
benchmarks! {
- // These are the common parameters along with their instancing.
- _ {
- let r in 1 .. T::MaxRegistrars::get() => add_registrars::(r)?;
- // extra parameter for the set_subs bench for previous sub accounts
- let p in 1 .. T::MaxSubAccounts::get() => ();
- let s in 1 .. T::MaxSubAccounts::get() => {
- // Give them s many sub accounts
- let caller: T::AccountId = whitelisted_caller();
- let _ = add_sub_accounts::(&caller, s)?;
- };
- let x in 1 .. T::MaxAdditionalFields::get() => {
- // Create their main identity with x additional fields
- let info = create_identity_info::(x);
- let caller: T::AccountId = whitelisted_caller();
- let caller_origin = ::Origin::from(RawOrigin::Signed(caller));
- Identity::::set_identity(caller_origin, info)?;
- };
- }
-
add_registrar {
let r in 1 .. T::MaxRegistrars::get() - 1 => add_registrars::(r)?;
ensure!(Registrars::::get().len() as u32 == r, "Registrars not set up correctly.");
@@ -135,10 +116,8 @@ benchmarks! {
}
set_identity {
- let r in ...;
- // This X doesn't affect the caller ID up front like with the others, so we don't use the
- // standard preparation.
- let x in _ .. _ => ();
+ let r in 1 .. T::MaxRegistrars::get() => add_registrars::(r)?;
+ let x in 1 .. T::MaxAdditionalFields::get();
let caller = {
// The target user
let caller: T::AccountId = whitelisted_caller();
@@ -204,9 +183,19 @@ benchmarks! {
let caller_lookup = ::unlookup(caller.clone());
let _ = T::Currency::make_free_balance_be(&caller, BalanceOf::::max_value());
- let r in ...;
- let s in ...;
- let x in ...;
+ let r in 1 .. T::MaxRegistrars::get() => add_registrars::