diff --git a/Cargo.lock b/Cargo.lock index 7a5ef8e09ae7b..cef5985957cf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9000,6 +9000,8 @@ dependencies = [ "sc-block-builder", "sc-client-api", "sc-consensus", + "sc-network-common", + "sc-network-sync", "sc-peerset", "sc-utils", "serde", @@ -9023,6 +9025,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "sc-network-common" +version = "0.10.0-dev" +dependencies = [ + "futures 0.3.21", + "libp2p", + "parity-scale-codec", + "prost-build", + "sc-peerset", + "smallvec 1.8.0", +] + [[package]] name = "sc-network-gossip" version = "0.10.0-dev" @@ -9042,6 +9056,39 @@ dependencies = [ "tracing", ] +[[package]] +name = "sc-network-sync" +version = "0.10.0-dev" +dependencies = [ + "bitflags", + "either", + "fork-tree", + "futures 0.3.21", + "libp2p", + "log 0.4.16", + "lru", + "parity-scale-codec", + "prost", + "prost-build", + "quickcheck", + "sc-block-builder", + "sc-client-api", + "sc-consensus", + "sc-network-common", + "sc-peerset", + "smallvec 1.8.0", + "sp-arithmetic", + "sp-blockchain", + "sp-consensus", + "sp-core", + "sp-finality-grandpa", + "sp-runtime", + "sp-test-primitives", + "sp-tracing", + "substrate-test-runtime-client", + "thiserror", +] + [[package]] name = "sc-network-test" version = "0.8.0" @@ -9058,6 +9105,7 @@ dependencies = [ "sc-client-api", "sc-consensus", "sc-network", + "sc-network-common", "sc-service", "sp-blockchain", "sp-consensus", @@ -9246,6 +9294,7 @@ dependencies = [ "sc-informant", "sc-keystore", "sc-network", + "sc-network-common", "sc-offchain", "sc-rpc", "sc-rpc-server", diff --git a/Cargo.toml b/Cargo.toml index f91e6226ccfd0..39ccceeb3a030 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,9 @@ members = [ "client/informant", "client/keystore", "client/network", + "client/network/common", "client/network-gossip", + "client/network/sync", "client/network/test", "client/offchain", "client/peerset", diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 93b389d4361c1..d44f95657ac2f 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -53,6 +53,8 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm sc-consensus = { version = "0.10.0-dev", path = "../consensus/common" } sp-core = { version = "6.0.0", path = "../../primitives/core" } sp-runtime = { version = "6.0.0", path = "../../primitives/runtime" } +sc-network-common = { version = "0.10.0-dev", path = "./common" } +sc-network-sync = { version = "0.10.0-dev", path = "./sync" } sc-utils = { version = "4.0.0-dev", path = "../utils" } sp-finality-grandpa = { version = "4.0.0-dev", path = "../../primitives/finality-grandpa" } thiserror = "1.0" @@ -66,7 +68,6 @@ libp2p = "0.44.0" [dev-dependencies] assert_matches = "1.3" -libp2p = { version = "0.44.0", default-features = false } quickcheck = "1.0.3" rand = "0.7.2" sp-test-primitives = { version = "2.0.0", path = "../../primitives/test-primitives" } diff --git a/client/network/build.rs b/client/network/build.rs index 6e5b83d4e58ae..f551f61dab3d4 100644 --- a/client/network/build.rs +++ b/client/network/build.rs @@ -1,5 +1,4 @@ -const PROTOS: &[&str] = - &["src/schema/api.v1.proto", "src/schema/light.v1.proto", "src/schema/bitswap.v1.2.0.proto"]; +const PROTOS: &[&str] = &["src/schema/light.v1.proto", "src/schema/bitswap.v1.2.0.proto"]; fn main() { prost_build::compile_protos(PROTOS, &["src/schema"]).unwrap(); diff --git a/client/network/common/Cargo.toml b/client/network/common/Cargo.toml new file mode 100644 index 0000000000000..5e3150ee9bc82 --- /dev/null +++ b/client/network/common/Cargo.toml @@ -0,0 +1,26 @@ +[package] +description = "Substrate network common" +name = "sc-network-common" +version = "0.10.0-dev" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +authors = ["Parity Technologies "] +edition = "2021" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +documentation = "https://docs.rs/sc-network-sync" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +prost-build = "0.9" + +[dependencies] +codec = { package = "parity-scale-codec", version = "3.0.0", features = [ + "derive", +] } +futures = "0.3.21" +libp2p = "0.44.0" +sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +smallvec = "1.8.0" diff --git a/client/network/common/src/config.rs b/client/network/common/src/config.rs new file mode 100644 index 0000000000000..92f8df5cd380f --- /dev/null +++ b/client/network/common/src/config.rs @@ -0,0 +1,44 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Configuration of the networking layer. + +use std::{fmt, str}; + +/// Name of a protocol, transmitted on the wire. Should be unique for each chain. Always UTF-8. +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); + +impl<'a> From<&'a str> for ProtocolId { + fn from(bytes: &'a str) -> ProtocolId { + Self(bytes.as_bytes().into()) + } +} + +impl AsRef for ProtocolId { + fn as_ref(&self) -> &str { + str::from_utf8(&self.0[..]) + .expect("the only way to build a ProtocolId is through a UTF-8 String; qed") + } +} + +impl fmt::Debug for ProtocolId { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + fmt::Debug::fmt(self.as_ref(), f) + } +} diff --git a/client/network/common/src/lib.rs b/client/network/common/src/lib.rs new file mode 100644 index 0000000000000..81769e23debbb --- /dev/null +++ b/client/network/common/src/lib.rs @@ -0,0 +1,23 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Common data structures of the networking layer. + +pub mod config; +pub mod message; +pub mod request_responses; diff --git a/client/network/common/src/message.rs b/client/network/common/src/message.rs new file mode 100644 index 0000000000000..930fe5ca52847 --- /dev/null +++ b/client/network/common/src/message.rs @@ -0,0 +1,23 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Network packet message types. These get serialized and put into the lower level protocol +//! payload. + +/// A unique ID of a request. +pub type RequestId = u64; diff --git a/client/network/common/src/request_responses.rs b/client/network/common/src/request_responses.rs new file mode 100644 index 0000000000000..71570e6beb864 --- /dev/null +++ b/client/network/common/src/request_responses.rs @@ -0,0 +1,114 @@ +// This file is part of Substrate. + +// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Collection of generic data structures for request-response protocols. + +use futures::channel::{mpsc, oneshot}; +use libp2p::PeerId; +use sc_peerset::ReputationChange; +use std::{borrow::Cow, time::Duration}; + +/// Configuration for a single request-response protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + /// Name of the protocol on the wire. Should be something like `/foo/bar`. + pub name: Cow<'static, str>, + + /// Maximum allowed size, in bytes, of a request. + /// + /// Any request larger than this value will be declined as a way to avoid allocating too + /// much memory for it. + pub max_request_size: u64, + + /// Maximum allowed size, in bytes, of a response. + /// + /// Any response larger than this value will be declined as a way to avoid allocating too + /// much memory for it. + pub max_response_size: u64, + + /// Duration after which emitted requests are considered timed out. + /// + /// If you expect the response to come back quickly, you should set this to a smaller duration. + pub request_timeout: Duration, + + /// Channel on which the networking service will send incoming requests. + /// + /// Every time a peer sends a request to the local node using this protocol, the networking + /// service will push an element on this channel. The receiving side of this channel then has + /// to pull this element, process the request, and send back the response to send back to the + /// peer. + /// + /// The size of the channel has to be carefully chosen. If the channel is full, the networking + /// service will discard the incoming request send back an error to the peer. Consequently, + /// the channel being full is an indicator that the node is overloaded. + /// + /// You can typically set the size of the channel to `T / d`, where `T` is the + /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to + /// build a response. + /// + /// Can be `None` if the local node does not support answering incoming requests. + /// If this is `None`, then the local node will not advertise support for this protocol towards + /// other peers. If this is `Some` but the channel is closed, then the local node will + /// advertise support for this protocol, but any incoming request will lead to an error being + /// sent back. + pub inbound_queue: Option>, +} + +/// A single request received by a peer on a request-response protocol. +#[derive(Debug)] +pub struct IncomingRequest { + /// Who sent the request. + pub peer: PeerId, + + /// Request sent by the remote. Will always be smaller than + /// [`ProtocolConfig::max_request_size`]. + pub payload: Vec, + + /// Channel to send back the response. + /// + /// There are two ways to indicate that handling the request failed: + /// + /// 1. Drop `pending_response` and thus not changing the reputation of the peer. + /// + /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for + /// the given peer. + pub pending_response: oneshot::Sender, +} + +/// Response for an incoming request to be send by a request protocol handler. +#[derive(Debug)] +pub struct OutgoingResponse { + /// The payload of the response. + /// + /// `Err(())` if none is available e.g. due an error while handling the request. + pub result: Result, ()>, + + /// Reputation changes accrued while handling the request. To be applied to the reputation of + /// the peer sending the request. + pub reputation_changes: Vec, + + /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the + /// peer. + /// + /// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of + /// > outgoing data for each TCP socket, and it is not possible for a user + /// > application to inspect this buffer. This channel here is not actually notified + /// > when the response has been fully sent out, but rather when it has fully been + /// > written to the buffer managed by the operating system. + pub sent_feedback: Option>, +} diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index b0bf3d6a1c135..091dd116e4c9c 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -18,7 +18,6 @@ use crate::{ bitswap::Bitswap, - config::ProtocolId, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, peer_info, protocol::{message::Roles, CustomMessageOutcome, NotificationsSink, Protocol}, @@ -42,6 +41,7 @@ use log::debug; use prost::Message; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::import_queue::{IncomingBlock, Origin}; +use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig}; use sc_peerset::PeersetHandle; use sp_blockchain::{HeaderBackend, HeaderMetadata}; use sp_consensus::BlockOrigin; @@ -220,13 +220,13 @@ where user_agent: String, local_public_key: PublicKey, disco_config: DiscoveryConfig, - block_request_protocol_config: request_responses::ProtocolConfig, - state_request_protocol_config: request_responses::ProtocolConfig, - warp_sync_protocol_config: Option, + block_request_protocol_config: ProtocolConfig, + state_request_protocol_config: ProtocolConfig, + warp_sync_protocol_config: Option, bitswap: Option>, - light_client_request_protocol_config: request_responses::ProtocolConfig, + light_client_request_protocol_config: ProtocolConfig, // All remaining request protocol configs. - mut request_response_protocols: Vec, + mut request_response_protocols: Vec, peerset: PeersetHandle, ) -> Result { // Extract protocol name and add to `request_response_protocols`. diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 2b448ed14eab0..cfb06331b55a1 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -21,12 +21,14 @@ //! The [`Params`] struct is the struct that must be passed in order to initialize the networking. //! See the documentation of [`Params`]. -pub use crate::{ +pub use sc_network_common::{ + config::ProtocolId, request_responses::{ IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, }, - warp_request_handler::WarpSyncProvider, }; +pub use sc_network_sync::warp_request_handler::WarpSyncProvider; + pub use libp2p::{build_multiaddr, core::PublicKey, identity}; // Note: this re-export shouldn't be part of the public API of the crate and will be removed in @@ -111,10 +113,10 @@ where /// protocol name. In addition all of [`RequestResponseConfig`] is used to handle incoming /// block requests, if enabled. /// - /// Can be constructed either via [`crate::block_request_handler::generate_protocol_config`] - /// allowing outgoing but not incoming requests, or constructed via - /// [`crate::block_request_handler::BlockRequestHandler::new`] allowing both outgoing and - /// incoming requests. + /// Can be constructed either via + /// [`sc_network_sync::block_request_handler::generate_protocol_config`] allowing outgoing but + /// not incoming requests, or constructed via [`sc_network_sync::block_request_handler:: + /// BlockRequestHandler::new`] allowing both outgoing and incoming requests. pub block_request_protocol_config: RequestResponseConfig, /// Request response configuration for the light client request protocol. @@ -129,8 +131,8 @@ where /// Request response configuration for the state request protocol. /// /// Can be constructed either via - /// [`crate::block_request_handler::generate_protocol_config`] allowing outgoing but not - /// incoming requests, or constructed via + /// [`sc_network_sync::block_request_handler::generate_protocol_config`] allowing outgoing but + /// not incoming requests, or constructed via /// [`crate::state_request_handler::StateRequestHandler::new`] allowing /// both outgoing and incoming requests. pub state_request_protocol_config: RequestResponseConfig, @@ -232,29 +234,6 @@ impl TransactionPool for EmptyTransaction } } -/// Name of a protocol, transmitted on the wire. Should be unique for each chain. Always UTF-8. -#[derive(Clone, PartialEq, Eq, Hash)] -pub struct ProtocolId(smallvec::SmallVec<[u8; 6]>); - -impl<'a> From<&'a str> for ProtocolId { - fn from(bytes: &'a str) -> ProtocolId { - Self(bytes.as_bytes().into()) - } -} - -impl AsRef for ProtocolId { - fn as_ref(&self) -> &str { - str::from_utf8(&self.0[..]) - .expect("the only way to build a ProtocolId is through a UTF-8 String; qed") - } -} - -impl fmt::Debug for ProtocolId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - fmt::Debug::fmt(self.as_ref(), f) - } -} - /// Parses a string address and splits it into Multiaddress and PeerId, if /// valid. /// diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index ae65d4f23cec7..2bae2fb807f7e 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -46,7 +46,7 @@ //! active mechanism that asks nodes for the addresses they are listening on. Whenever we learn //! of a node's address, you must call `add_self_reported_address`. -use crate::{config::ProtocolId, utils::LruHashSet}; +use crate::utils::LruHashSet; use futures::prelude::*; use futures_timer::Delay; use ip_network::IpNetwork; @@ -72,6 +72,7 @@ use libp2p::{ }, }; use log::{debug, error, info, trace, warn}; +use sc_network_common::config::ProtocolId; use sp_core::hexdisplay::HexDisplay; use std::{ cmp, @@ -1001,7 +1002,6 @@ impl MdnsWrapper { #[cfg(test)] mod tests { use super::{protocol_name_from_protocol_id, DiscoveryConfig, DiscoveryOut}; - use crate::config::ProtocolId; use futures::prelude::*; use libp2p::{ core::{ @@ -1013,6 +1013,7 @@ mod tests { swarm::{Swarm, SwarmEvent}, yamux, Multiaddr, PeerId, }; + use sc_network_common::config::ProtocolId; use std::{collections::HashSet, task::Poll}; #[test] diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index 973e0b15b7509..3957aab22cca9 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -255,22 +255,25 @@ mod transport; mod utils; pub mod bitswap; -pub mod block_request_handler; pub mod config; pub mod error; pub mod light_client_requests; pub mod network_state; -pub mod state_request_handler; pub mod transactions; -pub mod warp_request_handler; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use protocol::{ event::{DhtEvent, Event, ObservedRole}, - sync::{StateDownloadProgress, SyncState, WarpSyncPhase, WarpSyncProgress}, PeerInfo, }; +pub use sc_network_sync::{ + block_request_handler, + state::StateDownloadProgress, + state_request_handler, + warp::{WarpSyncPhase, WarpSyncProgress}, + warp_request_handler, SyncState, +}; pub use service::{ DecodingError, IfDisconnected, KademliaKey, Keypair, NetworkService, NetworkWorker, NotificationSender, NotificationSenderReady, OutboundFailure, PublicKey, RequestFailure, @@ -325,7 +328,7 @@ pub struct NetworkStatus { /// The total number of bytes sent. pub total_bytes_outbound: u64, /// State sync in progress. - pub state_sync: Option, + pub state_sync: Option, /// Warp sync in progress. - pub warp_sync: Option>, + pub warp_sync: Option>, } diff --git a/client/network/src/light_client_requests.rs b/client/network/src/light_client_requests.rs index d36158b2a373a..9eccef41e833d 100644 --- a/client/network/src/light_client_requests.rs +++ b/client/network/src/light_client_requests.rs @@ -21,7 +21,7 @@ /// For incoming light client requests. pub mod handler; -use crate::{config::ProtocolId, request_responses::ProtocolConfig}; +use sc_network_common::{config::ProtocolId, request_responses::ProtocolConfig}; use std::time::Duration; diff --git a/client/network/src/light_client_requests/handler.rs b/client/network/src/light_client_requests/handler.rs index cb9bd960767ff..bf65cba5f82e5 100644 --- a/client/network/src/light_client_requests/handler.rs +++ b/client/network/src/light_client_requests/handler.rs @@ -22,16 +22,16 @@ //! `crate::request_responses::RequestResponsesBehaviour` with //! [`LightClientRequestHandler`](handler::LightClientRequestHandler). -use crate::{ - config::ProtocolId, - request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, - schema, PeerId, -}; +use crate::{schema, PeerId}; use codec::{self, Decode, Encode}; use futures::{channel::mpsc, prelude::*}; use log::{debug, trace}; use prost::Message; use sc_client_api::{ProofProvider, StorageProof}; +use sc_network_common::{ + config::ProtocolId, + request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, +}; use sc_peerset::ReputationChange; use sp_core::{ hexdisplay::HexDisplay, @@ -55,7 +55,7 @@ where B: Block, Client: ProofProvider + Send + Sync + 'static, { - /// Create a new [`crate::block_request_handler::BlockRequestHandler`]. + /// Create a new [`sc_network_sync::block_request_handler::BlockRequestHandler`]. pub fn new(protocol_id: &ProtocolId, client: Arc) -> (Self, ProtocolConfig) { // For now due to lack of data on light client request handling in production systems, this // value is chosen to match the block request limit. diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 7214e60172aaa..5db8f102d037b 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -17,12 +17,10 @@ // along with this program. If not, see . use crate::{ - config::{self, ProtocolId, WarpSyncProvider}, - error, + config, error, request_responses::RequestFailure, - schema::v1::StateResponse, utils::{interval, LruHashSet}, - warp_request_handler::EncodedProof, + warp_request_handler::{EncodedProof, WarpSyncProvider}, }; use bytes::Bytes; @@ -43,13 +41,23 @@ use libp2p::{ use log::{debug, error, info, log, trace, warn, Level}; use message::{ generic::{Message as GenericMessage, Roles}, - BlockAnnounce, Message, + Message, }; use notifications::{Notifications, NotificationsOut}; use prometheus_endpoint::{register, Gauge, GaugeVec, Opts, PrometheusError, Registry, U64}; use prost::Message as _; use sc_client_api::{BlockBackend, HeaderBackend, ProofProvider}; use sc_consensus::import_queue::{BlockImportError, BlockImportStatus, IncomingBlock, Origin}; +use sc_network_common::config::ProtocolId; +use sc_network_sync::{ + message::{ + BlockAnnounce, BlockAttributes, BlockData, BlockRequest, BlockResponse, BlockState, + FromBlock, + }, + schema::v1::StateResponse, + BadPeer, ChainSync, OnBlockData, OnBlockJustification, OnStateData, + PollBlockAnnounceValidation, Status as SyncStatus, +}; use sp_arithmetic::traits::SaturatedConversion; use sp_consensus::{block_validation::BlockAnnounceValidator, BlockOrigin}; use sp_runtime::{ @@ -67,13 +75,11 @@ use std::{ task::Poll, time, }; -use sync::{ChainSync, Status as SyncStatus}; mod notifications; pub mod event; pub mod message; -pub mod sync; pub use notifications::{NotificationsSink, NotifsHandlerError, Ready}; use sp_blockchain::HeaderMetadata; @@ -202,7 +208,7 @@ pub struct Protocol { #[derive(Debug)] enum PeerRequest { - Block(message::BlockRequest), + Block(BlockRequest), State, WarpProof, } @@ -240,15 +246,15 @@ pub struct ProtocolConfig { } impl ProtocolConfig { - fn sync_mode(&self) -> sync::SyncMode { + fn sync_mode(&self) -> sc_network_sync::SyncMode { if self.roles.is_light() { - sync::SyncMode::Light + sc_network_sync::SyncMode::Light } else { match self.sync_mode { - config::SyncMode::Full => sync::SyncMode::Full, + config::SyncMode::Full => sc_network_sync::SyncMode::Full, config::SyncMode::Fast { skip_proofs, storage_chain_mode } => - sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, - config::SyncMode::Warp => sync::SyncMode::Warp, + sc_network_sync::SyncMode::LightState { skip_proofs, storage_chain_mode }, + config::SyncMode::Warp => sc_network_sync::SyncMode::Warp, } } } @@ -563,7 +569,7 @@ where fn prepare_block_request( &mut self, who: PeerId, - request: message::BlockRequest, + request: BlockRequest, ) -> CustomMessageOutcome { prepare_block_request::(&mut self.peers, who, request) } @@ -579,9 +585,7 @@ where } if let Some(_peer_data) = self.peers.remove(&peer) { - if let Some(sync::OnBlockData::Import(origin, blocks)) = - self.sync.peer_disconnected(&peer) - { + if let Some(OnBlockData::Import(origin, blocks)) = self.sync.peer_disconnected(&peer) { self.pending_messages .push_back(CustomMessageOutcome::BlockImport(origin, blocks)); } @@ -601,21 +605,21 @@ where pub fn on_block_response( &mut self, peer_id: PeerId, - request: message::BlockRequest, - response: crate::schema::v1::BlockResponse, + request: BlockRequest, + response: sc_network_sync::schema::v1::BlockResponse, ) -> CustomMessageOutcome { let blocks = response .blocks .into_iter() .map(|block_data| { - Ok(message::BlockData:: { + Ok(BlockData:: { hash: Decode::decode(&mut block_data.hash.as_ref())?, header: if !block_data.header.is_empty() { Some(Decode::decode(&mut block_data.header.as_ref())?) } else { None }, - body: if request.fields.contains(message::BlockAttributes::BODY) { + body: if request.fields.contains(BlockAttributes::BODY) { Some( block_data .body @@ -626,8 +630,7 @@ where } else { None }, - indexed_body: if request.fields.contains(message::BlockAttributes::INDEXED_BODY) - { + indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) { Some(block_data.indexed_body) } else { None @@ -667,7 +670,7 @@ where }, }; - let block_response = message::BlockResponse:: { id: request.id, blocks }; + let block_response = BlockResponse:: { id: request.id, blocks }; let blocks_range = || match ( block_response @@ -687,12 +690,12 @@ where blocks_range(), ); - if request.fields == message::BlockAttributes::JUSTIFICATION { + if request.fields == BlockAttributes::JUSTIFICATION { match self.sync.on_block_justification(peer_id, block_response) { - Ok(sync::OnBlockJustification::Nothing) => CustomMessageOutcome::None, - Ok(sync::OnBlockJustification::Import { peer, hash, number, justifications }) => + Ok(OnBlockJustification::Nothing) => CustomMessageOutcome::None, + Ok(OnBlockJustification::Import { peer, hash, number, justifications }) => CustomMessageOutcome::JustificationImport(peer, hash, number, justifications), - Err(sync::BadPeer(id, repu)) => { + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); CustomMessageOutcome::None @@ -700,10 +703,10 @@ where } } else { match self.sync.on_block_data(&peer_id, Some(request), block_response) { - Ok(sync::OnBlockData::Import(origin, blocks)) => + Ok(OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(sync::OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), - Err(sync::BadPeer(id, repu)) => { + Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); CustomMessageOutcome::None @@ -720,10 +723,10 @@ where response: StateResponse, ) -> CustomMessageOutcome { match self.sync.on_state_data(&peer_id, response) { - Ok(sync::OnStateData::Import(origin, block)) => + Ok(OnStateData::Import(origin, block)) => CustomMessageOutcome::BlockImport(origin, vec![block]), - Ok(sync::OnStateData::Continue) => CustomMessageOutcome::None, - Err(sync::BadPeer(id, repu)) => { + Ok(OnStateData::Continue) => CustomMessageOutcome::None, + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); CustomMessageOutcome::None @@ -740,7 +743,7 @@ where ) -> CustomMessageOutcome { match self.sync.on_warp_sync_data(&peer_id, response) { Ok(()) => CustomMessageOutcome::None, - Err(sync::BadPeer(id, repu)) => { + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); CustomMessageOutcome::None @@ -849,7 +852,7 @@ where let req = if peer.info.roles.is_full() { match self.sync.new_peer(who, peer.info.best_hash, peer.info.best_number) { Ok(req) => req, - Err(sync::BadPeer(id, repu)) => { + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); return Err(()) @@ -906,13 +909,9 @@ where let inserted = peer.known_blocks.insert(hash); if inserted { trace!(target: "sync", "Announcing block {:?} to {}", hash, who); - let message = message::BlockAnnounce { + let message = BlockAnnounce { header: header.clone(), - state: if is_best { - Some(message::BlockState::Best) - } else { - Some(message::BlockState::Normal) - }, + state: if is_best { Some(BlockState::Best) } else { Some(BlockState::Normal) }, data: Some(data.clone()), }; @@ -949,9 +948,9 @@ where peer.known_blocks.insert(hash); - let is_best = match announce.state.unwrap_or(message::BlockState::Best) { - message::BlockState::Best => true, - message::BlockState::Normal => false, + let is_best = match announce.state.unwrap_or(BlockState::Best) { + BlockState::Best => true, + BlockState::Normal => false, }; if peer.info.roles.is_full() { @@ -962,11 +961,11 @@ where /// Process the result of the block announce validation. fn process_block_announce_validation_result( &mut self, - validation_result: sync::PollBlockAnnounceValidation, + validation_result: PollBlockAnnounceValidation, ) -> CustomMessageOutcome { let (header, is_best, who) = match validation_result { - sync::PollBlockAnnounceValidation::Skip => return CustomMessageOutcome::None, - sync::PollBlockAnnounceValidation::Nothing { is_best, who, announce } => { + PollBlockAnnounceValidation::Skip => return CustomMessageOutcome::None, + PollBlockAnnounceValidation::Nothing { is_best, who, announce } => { self.update_peer_info(&who); if let Some(data) = announce.data { @@ -987,7 +986,7 @@ where return CustomMessageOutcome::None } }, - sync::PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => { + PollBlockAnnounceValidation::ImportHeader { announce, is_best, who } => { self.update_peer_info(&who); if let Some(data) = announce.data { @@ -998,7 +997,7 @@ where (announce.header, is_best, who) }, - sync::PollBlockAnnounceValidation::Failure { who, disconnect } => { + PollBlockAnnounceValidation::Failure { who, disconnect } => { if disconnect { self.behaviour.disconnect_peer(&who, HARDCODED_PEERSETS_SYNC); } @@ -1015,9 +1014,9 @@ where let blocks_to_import = self.sync.on_block_data( &who, None, - message::generic::BlockResponse { + BlockResponse:: { id: 0, - blocks: vec![message::generic::BlockData { + blocks: vec![BlockData:: { hash: header.hash(), header: Some(header), body: None, @@ -1035,10 +1034,10 @@ where } match blocks_to_import { - Ok(sync::OnBlockData::Import(origin, blocks)) => + Ok(OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(sync::OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), - Err(sync::BadPeer(id, repu)) => { + Ok(OnBlockData::Request(peer, req)) => self.prepare_block_request(peer, req), + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu); CustomMessageOutcome::None @@ -1096,7 +1095,7 @@ where req, )); }, - Err(sync::BadPeer(id, repu)) => { + Err(BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id, HARDCODED_PEERSETS_SYNC); self.peerset_handle.report_peer(id, repu) }, @@ -1263,7 +1262,7 @@ where fn prepare_block_request( peers: &mut HashMap>, who: PeerId, - request: message::BlockRequest, + request: BlockRequest, ) -> CustomMessageOutcome { let (tx, rx) = oneshot::channel(); @@ -1271,13 +1270,13 @@ fn prepare_block_request( peer.request = Some((PeerRequest::Block(request.clone()), rx)); } - let request = crate::schema::v1::BlockRequest { + let request = sc_network_sync::schema::v1::BlockRequest { fields: request.fields.to_be_u32(), from_block: match request.from { - message::FromBlock::Hash(h) => - Some(crate::schema::v1::block_request::FromBlock::Hash(h.encode())), - message::FromBlock::Number(n) => - Some(crate::schema::v1::block_request::FromBlock::Number(n.encode())), + FromBlock::Hash(h) => + Some(sc_network_sync::schema::v1::block_request::FromBlock::Hash(h.encode())), + FromBlock::Number(n) => + Some(sc_network_sync::schema::v1::block_request::FromBlock::Number(n.encode())), }, to_block: request.to.map(|h| h.encode()).unwrap_or_default(), direction: request.direction as i32, @@ -1291,7 +1290,7 @@ fn prepare_block_request( fn prepare_state_request( peers: &mut HashMap>, who: PeerId, - request: crate::schema::v1::StateRequest, + request: sc_network_sync::schema::v1::StateRequest, ) -> CustomMessageOutcome { let (tx, rx) = oneshot::channel(); @@ -1348,13 +1347,13 @@ pub enum CustomMessageOutcome { /// A new block request must be emitted. BlockRequest { target: PeerId, - request: crate::schema::v1::BlockRequest, + request: sc_network_sync::schema::v1::BlockRequest, pending_response: oneshot::Sender, RequestFailure>>, }, /// A new storage request must be emitted. StateRequest { target: PeerId, - request: crate::schema::v1::StateRequest, + request: sc_network_sync::schema::v1::StateRequest, pending_response: oneshot::Sender, RequestFailure>>, }, /// A new warp sync request must be emitted. @@ -1458,7 +1457,9 @@ where match req { PeerRequest::Block(req) => { let protobuf_response = - match crate::schema::v1::BlockResponse::decode(&resp[..]) { + match sc_network_sync::schema::v1::BlockResponse::decode( + &resp[..], + ) { Ok(proto) => proto, Err(e) => { debug!( @@ -1478,7 +1479,9 @@ where }, PeerRequest::State => { let protobuf_response = - match crate::schema::v1::StateResponse::decode(&resp[..]) { + match sc_network_sync::schema::v1::StateResponse::decode( + &resp[..], + ) { Ok(proto) => proto, Err(e) => { debug!( @@ -1764,7 +1767,7 @@ where }, NotificationsOut::Notification { peer_id, set_id, message } => match set_id { HARDCODED_PEERSETS_SYNC if self.peers.contains_key(&peer_id) => { - if let Ok(announce) = message::BlockAnnounce::decode(&mut message.as_ref()) { + if let Ok(announce) = BlockAnnounce::decode(&mut message.as_ref()) { self.push_block_announce_validation(peer_id, announce); // Make sure that the newly added block announce validation future was diff --git a/client/network/src/protocol/message.rs b/client/network/src/protocol/message.rs index f173fff8503a5..a57740ec2746b 100644 --- a/client/network/src/protocol/message.rs +++ b/client/network/src/protocol/message.rs @@ -20,19 +20,13 @@ //! payload. pub use self::generic::{ - BlockAnnounce, FromBlock, RemoteCallRequest, RemoteChangesRequest, RemoteChangesResponse, - RemoteHeaderRequest, RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, Roles, + RemoteCallRequest, RemoteChangesRequest, RemoteChangesResponse, RemoteHeaderRequest, + RemoteHeaderResponse, RemoteReadChildRequest, RemoteReadRequest, Roles, }; -use bitflags::bitflags; -use codec::{Decode, Encode, Error, Input, Output}; +use codec::{Decode, Encode}; use sc_client_api::StorageProof; -use sp_runtime::{ - traits::{Block as BlockT, Header as HeaderT}, - ConsensusEngineId, -}; - -/// A unique ID of a request. -pub type RequestId = u64; +use sc_network_common::message::RequestId; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; /// Type alias for using the message type using block type parameters. pub type Message = generic::Message< @@ -42,86 +36,9 @@ pub type Message = generic::Message< ::Extrinsic, >; -/// Type alias for using the block request type using block type parameters. -pub type BlockRequest = - generic::BlockRequest<::Hash, <::Header as HeaderT>::Number>; - -/// Type alias for using the BlockData type using block type parameters. -pub type BlockData = - generic::BlockData<::Header, ::Hash, ::Extrinsic>; - -/// Type alias for using the BlockResponse type using block type parameters. -pub type BlockResponse = - generic::BlockResponse<::Header, ::Hash, ::Extrinsic>; - /// A set of transactions. pub type Transactions = Vec; -// Bits of block data and associated artifacts to request. -bitflags! { - /// Node roles bitmask. - pub struct BlockAttributes: u8 { - /// Include block header. - const HEADER = 0b00000001; - /// Include block body. - const BODY = 0b00000010; - /// Include block receipt. - const RECEIPT = 0b00000100; - /// Include block message queue. - const MESSAGE_QUEUE = 0b00001000; - /// Include a justification for the block. - const JUSTIFICATION = 0b00010000; - /// Include indexed transactions for a block. - const INDEXED_BODY = 0b00100000; - } -} - -impl BlockAttributes { - /// Encodes attributes as big endian u32, compatible with SCALE-encoding (i.e the - /// significant byte has zero index). - pub fn to_be_u32(&self) -> u32 { - u32::from_be_bytes([self.bits(), 0, 0, 0]) - } - - /// Decodes attributes, encoded with the `encode_to_be_u32()` call. - pub fn from_be_u32(encoded: u32) -> Result { - Self::from_bits(encoded.to_be_bytes()[0]) - .ok_or_else(|| Error::from("Invalid BlockAttributes")) - } -} - -impl Encode for BlockAttributes { - fn encode_to(&self, dest: &mut T) { - dest.push_byte(self.bits()) - } -} - -impl codec::EncodeLike for BlockAttributes {} - -impl Decode for BlockAttributes { - fn decode(input: &mut I) -> Result { - Self::from_bits(input.read_byte()?).ok_or_else(|| Error::from("Invalid bytes")) - } -} - -#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)] -/// Block enumeration direction. -pub enum Direction { - /// Enumerate in ascending order (from child to parent). - Ascending = 0, - /// Enumerate in descending order (from parent to canonical child). - Descending = 1, -} - -/// Block state in the chain. -#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)] -pub enum BlockState { - /// Block is not part of the best chain. - Normal, - /// Latest best block. - Best, -} - /// Remote call response. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub struct RemoteCallResponse { @@ -140,35 +57,18 @@ pub struct RemoteReadResponse { pub proof: StorageProof, } -/// Announcement summary used for debug logging. -#[derive(Debug)] -pub struct AnnouncementSummary { - pub block_hash: H::Hash, - pub number: H::Number, - pub parent_hash: H::Hash, - pub state: Option, -} - -impl generic::BlockAnnounce { - pub fn summary(&self) -> AnnouncementSummary { - AnnouncementSummary { - block_hash: self.header.hash(), - number: *self.header.number(), - parent_hash: *self.header.parent_hash(), - state: self.state, - } - } -} - /// Generic types. pub mod generic { - use super::{ - BlockAttributes, BlockState, ConsensusEngineId, Direction, RemoteCallResponse, - RemoteReadResponse, RequestId, StorageProof, Transactions, - }; + use super::{RemoteCallResponse, RemoteReadResponse, Transactions}; use bitflags::bitflags; use codec::{Decode, Encode, Input, Output}; - use sp_runtime::{EncodedJustification, Justifications}; + use sc_client_api::StorageProof; + use sc_network_common::message::RequestId; + use sc_network_sync::message::{ + generic::{BlockRequest, BlockResponse}, + BlockAnnounce, + }; + use sp_runtime::ConsensusEngineId; bitflags! { /// Bitmask of the roles that a node fulfills. @@ -212,7 +112,7 @@ pub mod generic { } impl codec::Encode for Roles { - fn encode_to(&self, dest: &mut T) { + fn encode_to(&self, dest: &mut T) { dest.push_byte(self.bits()) } } @@ -220,7 +120,7 @@ pub mod generic { impl codec::EncodeLike for Roles {} impl codec::Decode for Roles { - fn decode(input: &mut I) -> Result { + fn decode(input: &mut I) -> Result { Self::from_bits(input.read_byte()?).ok_or_else(|| codec::Error::from("Invalid bytes")) } } @@ -234,36 +134,6 @@ pub mod generic { pub data: Vec, } - /// Block data sent in the response. - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] - pub struct BlockData { - /// Block header hash. - pub hash: Hash, - /// Block header if requested. - pub header: Option
, - /// Block body if requested. - pub body: Option>, - /// Block body indexed transactions if requested. - pub indexed_body: Option>>, - /// Block receipt if requested. - pub receipt: Option>, - /// Block message queue if requested. - pub message_queue: Option>, - /// Justification if requested. - pub justification: Option, - /// Justifications if requested. - pub justifications: Option, - } - - /// Identifies starting point of a block sequence. - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] - pub enum FromBlock { - /// Start with given hash. - Hash(Hash), - /// Start with given block number. - Number(Number), - } - /// A network message. #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] pub enum Message { @@ -380,68 +250,6 @@ pub mod generic { } } - /// Request block data from a peer. - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] - pub struct BlockRequest { - /// Unique request id. - pub id: RequestId, - /// Bits of block data to request. - pub fields: BlockAttributes, - /// Start from this block. - pub from: FromBlock, - /// End at this block. An implementation defined maximum is used when unspecified. - pub to: Option, - /// Sequence direction. - pub direction: Direction, - /// Maximum number of blocks to return. An implementation defined maximum is used when - /// unspecified. - pub max: Option, - } - - /// Response to `BlockRequest` - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] - pub struct BlockResponse { - /// Id of a request this response was made for. - pub id: RequestId, - /// Block data for the requested sequence. - pub blocks: Vec>, - } - - /// Announce a new complete relay chain block on the network. - #[derive(Debug, PartialEq, Eq, Clone)] - pub struct BlockAnnounce { - /// New block header. - pub header: H, - /// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common. - pub state: Option, - /// Data associated with this block announcement, e.g. a candidate message. - pub data: Option>, - } - - // Custom Encode/Decode impl to maintain backwards compatibility with v3. - // This assumes that the packet contains nothing but the announcement message. - // TODO: Get rid of it once protocol v4 is common. - impl Encode for BlockAnnounce { - fn encode_to(&self, dest: &mut T) { - self.header.encode_to(dest); - if let Some(state) = &self.state { - state.encode_to(dest); - } - if let Some(data) = &self.data { - data.encode_to(dest) - } - } - } - - impl Decode for BlockAnnounce { - fn decode(input: &mut I) -> Result { - let header = H::decode(input)?; - let state = BlockState::decode(input).ok(); - let data = Vec::decode(input).ok(); - Ok(Self { header, state, data }) - } - } - #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] /// Remote call request. pub struct RemoteCallRequest { diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 04d6ccb54339f..6c7b0f3fcdfc8 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -53,6 +53,7 @@ use libp2p::{ NetworkBehaviourAction, PollParameters, }, }; +use sc_network_common::request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}; use std::{ borrow::Cow, collections::{hash_map::Entry, HashMap}, @@ -65,96 +66,6 @@ use std::{ pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; use sc_peerset::{PeersetHandle, BANNED_THRESHOLD}; -/// Configuration for a single request-response protocol. -#[derive(Debug, Clone)] -pub struct ProtocolConfig { - /// Name of the protocol on the wire. Should be something like `/foo/bar`. - pub name: Cow<'static, str>, - - /// Maximum allowed size, in bytes, of a request. - /// - /// Any request larger than this value will be declined as a way to avoid allocating too - /// much memory for it. - pub max_request_size: u64, - - /// Maximum allowed size, in bytes, of a response. - /// - /// Any response larger than this value will be declined as a way to avoid allocating too - /// much memory for it. - pub max_response_size: u64, - - /// Duration after which emitted requests are considered timed out. - /// - /// If you expect the response to come back quickly, you should set this to a smaller duration. - pub request_timeout: Duration, - - /// Channel on which the networking service will send incoming requests. - /// - /// Every time a peer sends a request to the local node using this protocol, the networking - /// service will push an element on this channel. The receiving side of this channel then has - /// to pull this element, process the request, and send back the response to send back to the - /// peer. - /// - /// The size of the channel has to be carefully chosen. If the channel is full, the networking - /// service will discard the incoming request send back an error to the peer. Consequently, - /// the channel being full is an indicator that the node is overloaded. - /// - /// You can typically set the size of the channel to `T / d`, where `T` is the - /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to - /// build a response. - /// - /// Can be `None` if the local node does not support answering incoming requests. - /// If this is `None`, then the local node will not advertise support for this protocol towards - /// other peers. If this is `Some` but the channel is closed, then the local node will - /// advertise support for this protocol, but any incoming request will lead to an error being - /// sent back. - pub inbound_queue: Option>, -} - -/// A single request received by a peer on a request-response protocol. -#[derive(Debug)] -pub struct IncomingRequest { - /// Who sent the request. - pub peer: PeerId, - - /// Request sent by the remote. Will always be smaller than - /// [`ProtocolConfig::max_request_size`]. - pub payload: Vec, - - /// Channel to send back the response. - /// - /// There are two ways to indicate that handling the request failed: - /// - /// 1. Drop `pending_response` and thus not changing the reputation of the peer. - /// - /// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for - /// the given peer. - pub pending_response: oneshot::Sender, -} - -/// Response for an incoming request to be send by a request protocol handler. -#[derive(Debug)] -pub struct OutgoingResponse { - /// The payload of the response. - /// - /// `Err(())` if none is available e.g. due an error while handling the request. - pub result: Result, ()>, - - /// Reputation changes accrued while handling the request. To be applied to the reputation of - /// the peer sending the request. - pub reputation_changes: Vec, - - /// If provided, the `oneshot::Sender` will be notified when the request has been sent to the - /// peer. - /// - /// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of - /// > outgoing data for each TCP socket, and it is not possible for a user - /// > application to inspect this buffer. This channel here is not actually notified - /// > when the response has been fully sent out, but rather when it has fully been - /// > written to the buffer managed by the operating system. - pub sent_feedback: Option>, -} - /// Event generated by the [`RequestResponsesBehaviour`]. #[derive(Debug)] pub enum Event { diff --git a/client/network/src/schema.rs b/client/network/src/schema.rs index 032db5f1733c5..80301a59c29ef 100644 --- a/client/network/src/schema.rs +++ b/client/network/src/schema.rs @@ -19,7 +19,6 @@ //! Include sources generated from protobuf definitions. pub mod v1 { - include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); pub mod light { include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs")); } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 91517e14915bc..d2600e3295bf0 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -37,11 +37,8 @@ use crate::{ NetworkState, NotConnectedPeer as NetworkStateNotConnectedPeer, Peer as NetworkStatePeer, }, protocol::{ - self, - event::Event, - message::generic::Roles, - sync::{Status as SyncStatus, SyncState}, - NotificationsSink, NotifsHandlerError, PeerInfo, Protocol, Ready, + self, event::Event, message::generic::Roles, NotificationsSink, NotifsHandlerError, + PeerInfo, Protocol, Ready, }, transactions, transport, DhtEvent, ExHashT, NetworkStateInfo, NetworkStatus, ReputationChange, }; @@ -63,6 +60,7 @@ use metrics::{Histogram, HistogramVec, MetricSources, Metrics}; use parking_lot::Mutex; use sc_client_api::{BlockBackend, ProofProvider}; use sc_consensus::{BlockImportError, BlockImportStatus, ImportQueue, Link}; +use sc_network_sync::{Status as SyncStatus, SyncState}; use sc_peerset::PeersetHandle; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_blockchain::{HeaderBackend, HeaderMetadata}; diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs index 03d647eade173..36205f32d33c6 100644 --- a/client/network/src/service/tests.rs +++ b/client/network/src/service/tests.rs @@ -17,13 +17,14 @@ // along with this program. If not, see . use crate::{ - block_request_handler::BlockRequestHandler, config, - light_client_requests::handler::LightClientRequestHandler, + config, light_client_requests::handler::LightClientRequestHandler, state_request_handler::StateRequestHandler, Event, NetworkService, NetworkWorker, }; use futures::prelude::*; use libp2p::PeerId; +use sc_network_common::config::ProtocolId; +use sc_network_sync::block_request_handler::BlockRequestHandler; use sp_runtime::traits::{Block as BlockT, Header as _}; use std::{borrow::Cow, sync::Arc, time::Duration}; use substrate_test_runtime_client::{TestClientBuilder, TestClientBuilderExt as _}; @@ -87,7 +88,7 @@ fn build_test_full_node( None, )); - let protocol_id = config::ProtocolId::from("/test-protocol-name"); + let protocol_id = ProtocolId::from("/test-protocol-name"); let block_request_protocol_config = { let (handler, protocol_config) = BlockRequestHandler::new(&protocol_id, client.clone(), 50); diff --git a/client/network/src/transactions.rs b/client/network/src/transactions.rs index 64e208d718889..1f54f05d7446f 100644 --- a/client/network/src/transactions.rs +++ b/client/network/src/transactions.rs @@ -27,7 +27,7 @@ //! `Future` that processes transactions. use crate::{ - config::{self, ProtocolId, TransactionImport, TransactionImportFuture, TransactionPool}, + config::{self, TransactionImport, TransactionImportFuture, TransactionPool}, error, protocol::message, service::NetworkService, @@ -40,6 +40,7 @@ use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; use libp2p::{multiaddr, PeerId}; use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; +use sc_network_common::config::ProtocolId; use sp_runtime::traits::Block as BlockT; use std::{ borrow::Cow, diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml new file mode 100644 index 0000000000000..c171194d5b24d --- /dev/null +++ b/client/network/sync/Cargo.toml @@ -0,0 +1,49 @@ +[package] +description = "Substrate sync network protocol" +name = "sc-network-sync" +version = "0.10.0-dev" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +authors = ["Parity Technologies "] +edition = "2021" +homepage = "https://substrate.io" +repository = "https://github.com/paritytech/substrate/" +documentation = "https://docs.rs/sc-network-sync" +readme = "README.md" + +[package.metadata.docs.rs] +targets = ["x86_64-unknown-linux-gnu"] + +[build-dependencies] +prost-build = "0.9" + +[dependencies] +bitflags = "1.3.2" +codec = { package = "parity-scale-codec", version = "3.0.0", features = [ + "derive", +] } +either = "1.5.3" +fork-tree = { version = "3.0.0", path = "../../../utils/fork-tree" } +futures = "0.3.21" +libp2p = "0.44.0" +log = "0.4.16" +lru = "0.7.5" +prost = "0.9" +sc-client-api = { version = "4.0.0-dev", path = "../../api" } +sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } +sc-network-common = { version = "0.10.0-dev", path = "../common" } +sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +smallvec = "1.8.0" +sp-arithmetic = { version = "5.0.0", path = "../../../primitives/arithmetic" } +sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } +sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } +sp-core = { version = "6.0.0", path = "../../../primitives/core" } +sp-finality-grandpa = { version = "4.0.0-dev", path = "../../../primitives/finality-grandpa" } +sp-runtime = { version = "6.0.0", path = "../../../primitives/runtime" } +thiserror = "1.0" + +[dev-dependencies] +quickcheck = "1.0.3" +sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" } +sp-test-primitives = { version = "2.0.0", path = "../../../primitives/test-primitives" } +sp-tracing = { version = "5.0.0", path = "../../../primitives/tracing" } +substrate-test-runtime-client = { version = "2.0.0", path = "../../../test-utils/runtime/client" } diff --git a/client/network/sync/build.rs b/client/network/sync/build.rs new file mode 100644 index 0000000000000..55794919cdb42 --- /dev/null +++ b/client/network/sync/build.rs @@ -0,0 +1,5 @@ +const PROTOS: &[&str] = &["src/schema/api.v1.proto"]; + +fn main() { + prost_build::compile_protos(PROTOS, &["src/schema"]).unwrap(); +} diff --git a/client/network/src/block_request_handler.rs b/client/network/sync/src/block_request_handler.rs similarity index 99% rename from client/network/src/block_request_handler.rs rename to client/network/sync/src/block_request_handler.rs index 7458f421af397..b9ffd24cee4c7 100644 --- a/client/network/src/block_request_handler.rs +++ b/client/network/sync/src/block_request_handler.rs @@ -18,21 +18,23 @@ //! `crate::request_responses::RequestResponsesBehaviour`. use crate::{ - config::ProtocolId, - protocol::message::BlockAttributes, - request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, + message::BlockAttributes, schema::v1::{block_request::FromBlock, BlockResponse, Direction}, - PeerId, ReputationChange, }; use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, }; +use libp2p::PeerId; use log::debug; use lru::LruCache; use prost::Message; use sc_client_api::BlockBackend; +use sc_network_common::{ + config::ProtocolId, + request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, +}; use sp_blockchain::HeaderBackend; use sp_runtime::{ generic::BlockId, @@ -51,7 +53,7 @@ const MAX_BODY_BYTES: usize = 8 * 1024 * 1024; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; mod rep { - use super::ReputationChange as Rep; + use sc_peerset::ReputationChange as Rep; /// Reputation change when a peer sent us the same request multiple times. pub const SAME_REQUEST: Rep = Rep::new_fatal("Same block request multiple times"); diff --git a/client/network/src/protocol/sync/blocks.rs b/client/network/sync/src/blocks.rs similarity index 99% rename from client/network/src/protocol/sync/blocks.rs rename to client/network/sync/src/blocks.rs index 43b70d17d8add..b897184e7a44c 100644 --- a/client/network/src/protocol/sync/blocks.rs +++ b/client/network/sync/src/blocks.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::protocol::message; +use crate::message; use libp2p::PeerId; use log::trace; use sp_runtime::traits::{Block as BlockT, NumberFor, One}; @@ -217,7 +217,8 @@ impl BlockCollection { #[cfg(test)] mod test { use super::{BlockCollection, BlockData, BlockRangeState}; - use crate::{protocol::message, PeerId}; + use crate::message; + use libp2p::PeerId; use sp_core::H256; use sp_runtime::testing::{Block as RawBlock, ExtrinsicWrapper}; diff --git a/client/network/src/protocol/sync/extra_requests.rs b/client/network/sync/src/extra_requests.rs similarity index 98% rename from client/network/src/protocol/sync/extra_requests.rs rename to client/network/sync/src/extra_requests.rs index 43122631d3c22..c684d8e72783e 100644 --- a/client/network/src/protocol/sync/extra_requests.rs +++ b/client/network/sync/src/extra_requests.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::protocol::sync::{PeerSync, PeerSyncState}; +use crate::{PeerSync, PeerSyncState}; use fork_tree::ForkTree; use libp2p::PeerId; use log::{debug, trace, warn}; @@ -31,7 +31,7 @@ use std::{ const EXTRA_RETRY_WAIT: Duration = Duration::from_secs(10); /// Pending extra data request for the given block (hash and number). -pub(crate) type ExtraRequest = (::Hash, NumberFor); +type ExtraRequest = (::Hash, NumberFor); /// Manages pending block extra data (e.g. justification) requests. /// @@ -57,11 +57,11 @@ pub(crate) struct ExtraRequests { } #[derive(Debug)] -pub(crate) struct Metrics { - pub(crate) pending_requests: u32, - pub(crate) active_requests: u32, - pub(crate) importing_requests: u32, - pub(crate) failed_requests: u32, +pub struct Metrics { + pub pending_requests: u32, + pub active_requests: u32, + pub importing_requests: u32, + pub failed_requests: u32, _priv: (), } @@ -352,7 +352,7 @@ impl<'a, B: BlockT> Matcher<'a, B> { #[cfg(test)] mod tests { use super::*; - use crate::protocol::sync::PeerSync; + use crate::PeerSync; use quickcheck::{Arbitrary, Gen, QuickCheck}; use sp_blockchain::Error as ClientError; use sp_test_primitives::{Block, BlockNumber, Hash}; diff --git a/client/network/src/protocol/sync.rs b/client/network/sync/src/lib.rs similarity index 98% rename from client/network/src/protocol/sync.rs rename to client/network/sync/src/lib.rs index fb89e12a44fe8..bc0ed46c3f068 100644 --- a/client/network/src/protocol/sync.rs +++ b/client/network/sync/src/lib.rs @@ -28,11 +28,25 @@ //! the network, or whenever a block has been successfully verified, call the appropriate method in //! order to update it. +pub mod block_request_handler; +pub mod blocks; +pub mod message; +pub mod schema; +pub mod state; +pub mod state_request_handler; +pub mod warp; +pub mod warp_request_handler; + use crate::{ - protocol::message::{self, BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse}, + blocks::BlockCollection, + message::{BlockAnnounce, BlockAttributes, BlockRequest, BlockResponse}, schema::v1::{StateRequest, StateResponse}, + state::{StateDownloadProgress, StateSync}, + warp::{ + EncodedProof, WarpProofImportResult, WarpProofRequest, WarpSync, WarpSyncPhase, + WarpSyncProgress, WarpSyncProvider, + }, }; -use blocks::BlockCollection; use codec::Encode; use either::Either; use extra_requests::ExtraRequests; @@ -55,8 +69,6 @@ use sp_runtime::{ }, EncodedJustification, Justifications, }; -pub use state::StateDownloadProgress; -use state::StateSync; use std::{ collections::{hash_map::Entry, HashMap, HashSet}, fmt, @@ -64,13 +76,8 @@ use std::{ pin::Pin, sync::Arc, }; -use warp::{WarpProofRequest, WarpSync, WarpSyncProvider}; -pub use warp::{WarpSyncPhase, WarpSyncProgress}; -mod blocks; mod extra_requests; -mod state; -mod warp; /// Maximum blocks to request in a single packet. const MAX_BLOCKS_TO_REQUEST: usize = 64; @@ -332,18 +339,6 @@ pub enum SyncState { Downloading, } -impl fmt::Display for WarpSyncPhase { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - Self::AwaitingPeers => write!(f, "Waiting for peers"), - Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), - Self::DownloadingState => write!(f, "Downloading state"), - Self::ImportingState => write!(f, "Importing state"), - Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n), - } - } -} - /// Syncing status and statistics. #[derive(Clone)] pub struct Status { @@ -1357,7 +1352,7 @@ where pub fn on_warp_sync_data( &mut self, who: &PeerId, - response: warp::EncodedProof, + response: EncodedProof, ) -> Result<(), BadPeer> { if let Some(peer) = self.peers.get_mut(who) { if let PeerSyncState::DownloadingWarpProof = peer.state { @@ -1379,8 +1374,8 @@ where }; match import_result { - warp::WarpProofImportResult::Success => Ok(()), - warp::WarpProofImportResult::BadResponse => { + WarpProofImportResult::Success => Ok(()), + WarpProofImportResult::BadResponse => { debug!(target: "sync", "Bad proof data received from {}", who); Err(BadPeer(*who, rep::BAD_BLOCK)) }, @@ -2173,7 +2168,7 @@ where } /// Return some key metrics. - pub(crate) fn metrics(&self) -> Metrics { + pub fn metrics(&self) -> Metrics { Metrics { queued_blocks: self.queue_blocks.len().try_into().unwrap_or(std::u32::MAX), fork_targets: self.fork_targets.len().try_into().unwrap_or(std::u32::MAX), @@ -2220,10 +2215,10 @@ fn legacy_justification_mapping( } #[derive(Debug)] -pub(crate) struct Metrics { - pub(crate) queued_blocks: u32, - pub(crate) fork_targets: u32, - pub(crate) justifications: extra_requests::Metrics, +pub struct Metrics { + pub queued_blocks: u32, + pub fork_targets: u32, + pub justifications: extra_requests::Metrics, _priv: (), } @@ -2575,9 +2570,10 @@ fn validate_blocks( #[cfg(test)] mod test { use super::{ - message::{BlockData, BlockState, FromBlock}, + message::{BlockState, FromBlock}, *, }; + use crate::message::BlockData; use futures::{executor::block_on, future::poll_fn}; use sc_block_builder::BlockBuilderProvider; use sp_blockchain::HeaderBackend; diff --git a/client/network/sync/src/message.rs b/client/network/sync/src/message.rs new file mode 100644 index 0000000000000..996ee5231cf2e --- /dev/null +++ b/client/network/sync/src/message.rs @@ -0,0 +1,222 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Network packet message types. These get serialized and put into the lower level protocol +//! payload. + +use bitflags::bitflags; +use codec::{Decode, Encode, Error, Input, Output}; +pub use generic::{BlockAnnounce, FromBlock}; +use sp_runtime::traits::{Block as BlockT, Header as HeaderT}; + +/// Type alias for using the block request type using block type parameters. +pub type BlockRequest = + generic::BlockRequest<::Hash, <::Header as HeaderT>::Number>; + +/// Type alias for using the BlockData type using block type parameters. +pub type BlockData = + generic::BlockData<::Header, ::Hash, ::Extrinsic>; + +/// Type alias for using the BlockResponse type using block type parameters. +pub type BlockResponse = + generic::BlockResponse<::Header, ::Hash, ::Extrinsic>; + +// Bits of block data and associated artifacts to request. +bitflags! { + /// Node roles bitmask. + pub struct BlockAttributes: u8 { + /// Include block header. + const HEADER = 0b00000001; + /// Include block body. + const BODY = 0b00000010; + /// Include block receipt. + const RECEIPT = 0b00000100; + /// Include block message queue. + const MESSAGE_QUEUE = 0b00001000; + /// Include a justification for the block. + const JUSTIFICATION = 0b00010000; + /// Include indexed transactions for a block. + const INDEXED_BODY = 0b00100000; + } +} + +impl BlockAttributes { + /// Encodes attributes as big endian u32, compatible with SCALE-encoding (i.e the + /// significant byte has zero index). + pub fn to_be_u32(&self) -> u32 { + u32::from_be_bytes([self.bits(), 0, 0, 0]) + } + + /// Decodes attributes, encoded with the `encode_to_be_u32()` call. + pub fn from_be_u32(encoded: u32) -> Result { + Self::from_bits(encoded.to_be_bytes()[0]) + .ok_or_else(|| Error::from("Invalid BlockAttributes")) + } +} + +impl Encode for BlockAttributes { + fn encode_to(&self, dest: &mut T) { + dest.push_byte(self.bits()) + } +} + +impl codec::EncodeLike for BlockAttributes {} + +impl Decode for BlockAttributes { + fn decode(input: &mut I) -> Result { + Self::from_bits(input.read_byte()?).ok_or_else(|| Error::from("Invalid bytes")) + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)] +/// Block enumeration direction. +pub enum Direction { + /// Enumerate in ascending order (from child to parent). + Ascending = 0, + /// Enumerate in descending order (from parent to canonical child). + Descending = 1, +} + +/// Block state in the chain. +#[derive(Debug, PartialEq, Eq, Clone, Copy, Encode, Decode)] +pub enum BlockState { + /// Block is not part of the best chain. + Normal, + /// Latest best block. + Best, +} + +/// Announcement summary used for debug logging. +#[derive(Debug)] +pub struct AnnouncementSummary { + pub block_hash: H::Hash, + pub number: H::Number, + pub parent_hash: H::Hash, + pub state: Option, +} + +impl BlockAnnounce { + pub fn summary(&self) -> AnnouncementSummary { + AnnouncementSummary { + block_hash: self.header.hash(), + number: *self.header.number(), + parent_hash: *self.header.parent_hash(), + state: self.state, + } + } +} + +/// Generic types. +pub mod generic { + use super::{BlockAttributes, BlockState, Direction}; + use codec::{Decode, Encode, Input, Output}; + use sc_network_common::message::RequestId; + use sp_runtime::{EncodedJustification, Justifications}; + + /// Block data sent in the response. + #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + pub struct BlockData { + /// Block header hash. + pub hash: Hash, + /// Block header if requested. + pub header: Option
, + /// Block body if requested. + pub body: Option>, + /// Block body indexed transactions if requested. + pub indexed_body: Option>>, + /// Block receipt if requested. + pub receipt: Option>, + /// Block message queue if requested. + pub message_queue: Option>, + /// Justification if requested. + pub justification: Option, + /// Justifications if requested. + pub justifications: Option, + } + + /// Request block data from a peer. + #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + pub struct BlockRequest { + /// Unique request id. + pub id: RequestId, + /// Bits of block data to request. + pub fields: BlockAttributes, + /// Start from this block. + pub from: FromBlock, + /// End at this block. An implementation defined maximum is used when unspecified. + pub to: Option, + /// Sequence direction. + pub direction: Direction, + /// Maximum number of blocks to return. An implementation defined maximum is used when + /// unspecified. + pub max: Option, + } + + /// Identifies starting point of a block sequence. + #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + pub enum FromBlock { + /// Start with given hash. + Hash(Hash), + /// Start with given block number. + Number(Number), + } + + /// Response to `BlockRequest` + #[derive(Debug, PartialEq, Eq, Clone, Encode, Decode)] + pub struct BlockResponse { + /// Id of a request this response was made for. + pub id: RequestId, + /// Block data for the requested sequence. + pub blocks: Vec>, + } + + /// Announce a new complete relay chain block on the network. + #[derive(Debug, PartialEq, Eq, Clone)] + pub struct BlockAnnounce { + /// New block header. + pub header: H, + /// Block state. TODO: Remove `Option` and custom encoding when v4 becomes common. + pub state: Option, + /// Data associated with this block announcement, e.g. a candidate message. + pub data: Option>, + } + + // Custom Encode/Decode impl to maintain backwards compatibility with v3. + // This assumes that the packet contains nothing but the announcement message. + // TODO: Get rid of it once protocol v4 is common. + impl Encode for BlockAnnounce { + fn encode_to(&self, dest: &mut T) { + self.header.encode_to(dest); + if let Some(state) = &self.state { + state.encode_to(dest); + } + if let Some(data) = &self.data { + data.encode_to(dest) + } + } + } + + impl Decode for BlockAnnounce { + fn decode(input: &mut I) -> Result { + let header = H::decode(input)?; + let state = BlockState::decode(input).ok(); + let data = Vec::decode(input).ok(); + Ok(Self { header, state, data }) + } + } +} diff --git a/client/network/sync/src/schema.rs b/client/network/sync/src/schema.rs new file mode 100644 index 0000000000000..aa3eb84621d8f --- /dev/null +++ b/client/network/sync/src/schema.rs @@ -0,0 +1,23 @@ +// This file is part of Substrate. + +// Copyright (C) 2017-2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Include sources generated from protobuf definitions. + +pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); +} diff --git a/client/network/src/schema/api.v1.proto b/client/network/sync/src/schema/api.v1.proto similarity index 100% rename from client/network/src/schema/api.v1.proto rename to client/network/sync/src/schema/api.v1.proto diff --git a/client/network/src/protocol/sync/state.rs b/client/network/sync/src/state.rs similarity index 99% rename from client/network/src/protocol/sync/state.rs rename to client/network/sync/src/state.rs index 6208b2bcdd18a..4041c28af0eba 100644 --- a/client/network/src/protocol/sync/state.rs +++ b/client/network/sync/src/state.rs @@ -16,6 +16,8 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +//! State sync support. + use crate::schema::v1::{StateEntry, StateRequest, StateResponse}; use codec::{Decode, Encode}; use log::debug; @@ -26,8 +28,6 @@ use sp_core::storage::well_known_keys; use sp_runtime::traits::{Block as BlockT, Header, NumberFor}; use std::{collections::HashMap, sync::Arc}; -/// State sync support. - /// State sync state machine. Accumulates partial state data until it /// is ready to be imported. pub struct StateSync { diff --git a/client/network/src/state_request_handler.rs b/client/network/sync/src/state_request_handler.rs similarity index 97% rename from client/network/src/state_request_handler.rs rename to client/network/sync/src/state_request_handler.rs index 7ac1a17e8e759..8e0bae14046da 100644 --- a/client/network/src/state_request_handler.rs +++ b/client/network/sync/src/state_request_handler.rs @@ -17,21 +17,21 @@ //! Helper for handling (i.e. answering) state requests from a remote peer via the //! `crate::request_responses::RequestResponsesBehaviour`. -use crate::{ - config::ProtocolId, - request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, - schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse}, - PeerId, ReputationChange, -}; +use crate::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse}; use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, }; +use libp2p::PeerId; use log::{debug, trace}; use lru::LruCache; use prost::Message; use sc_client_api::ProofProvider; +use sc_network_common::{ + config::ProtocolId, + request_responses::{IncomingRequest, OutgoingResponse, ProtocolConfig}, +}; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; use std::{ hash::{Hash, Hasher}, @@ -44,7 +44,7 @@ const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual reponse may be bigg const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2; mod rep { - use super::ReputationChange as Rep; + use sc_peerset::ReputationChange as Rep; /// Reputation change when a peer sent us the same request multiple times. pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times"); diff --git a/client/network/src/protocol/sync/warp.rs b/client/network/sync/src/warp.rs similarity index 90% rename from client/network/src/protocol/sync/warp.rs rename to client/network/sync/src/warp.rs index 6845d6d1dc862..d3d9d7d244153 100644 --- a/client/network/src/protocol/sync/warp.rs +++ b/client/network/sync/src/warp.rs @@ -16,17 +16,20 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -///! Warp sync support. -use super::state::{ImportResult, StateSync}; -use crate::schema::v1::{StateRequest, StateResponse}; +//! Warp sync support. + pub use crate::warp_request_handler::{ EncodedProof, Request as WarpProofRequest, VerificationResult, WarpSyncProvider, }; +use crate::{ + schema::v1::{StateRequest, StateResponse}, + state::{ImportResult, StateSync}, +}; use sc_client_api::ProofProvider; use sp_blockchain::HeaderBackend; use sp_finality_grandpa::{AuthorityList, SetId}; use sp_runtime::traits::{Block as BlockT, NumberFor, Zero}; -use std::sync::Arc; +use std::{fmt, sync::Arc}; enum Phase { WarpProof { set_id: SetId, authorities: AuthorityList, last_hash: B::Hash }, @@ -48,6 +51,18 @@ pub enum WarpSyncPhase { DownloadingBlocks(NumberFor), } +impl fmt::Display for WarpSyncPhase { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::AwaitingPeers => write!(f, "Waiting for peers"), + Self::DownloadingWarpProofs => write!(f, "Downloading finality proofs"), + Self::DownloadingState => write!(f, "Downloading state"), + Self::ImportingState => write!(f, "Importing state"), + Self::DownloadingBlocks(n) => write!(f, "Downloading block history (#{})", n), + } + } +} + /// Reported warp sync progress. #[derive(Clone, Eq, PartialEq, Debug)] pub struct WarpSyncProgress { diff --git a/client/network/src/warp_request_handler.rs b/client/network/sync/src/warp_request_handler.rs similarity index 97% rename from client/network/src/warp_request_handler.rs rename to client/network/sync/src/warp_request_handler.rs index d5bee5833a12a..4f66e0a6daf17 100644 --- a/client/network/src/warp_request_handler.rs +++ b/client/network/sync/src/warp_request_handler.rs @@ -16,13 +16,18 @@ //! Helper for handling (i.e. answering) grandpa warp sync requests from a remote peer. -use crate::config::{IncomingRequest, OutgoingResponse, ProtocolId, RequestResponseConfig}; use codec::{Decode, Encode}; use futures::{ channel::{mpsc, oneshot}, stream::StreamExt, }; use log::debug; +use sc_network_common::{ + config::ProtocolId, + request_responses::{ + IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, + }, +}; use sp_runtime::traits::Block as BlockT; use std::{sync::Arc, time::Duration}; diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index 39297dd3ea295..bf3317d0759ee 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] async-std = "1.11.0" +sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network = { version = "0.10.0-dev", path = "../" } log = "0.4.16" parking_lot = "0.12.0" diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index 20f1a3014867b..9e23a4cc678e9 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -51,12 +51,13 @@ use sc_network::{ block_request_handler::BlockRequestHandler, config::{ MultiaddrWithPeerId, NetworkConfiguration, NonDefaultSetConfig, NonReservedPeerMode, - ProtocolConfig, ProtocolId, Role, SyncMode, TransportConfig, + ProtocolConfig, Role, SyncMode, TransportConfig, }, light_client_requests::handler::LightClientRequestHandler, state_request_handler::StateRequestHandler, warp_request_handler, Multiaddr, NetworkService, NetworkWorker, }; +pub use sc_network_common::config::ProtocolId; use sc_service::client::Client; use sp_blockchain::{ well_known_cache_keys::{self, Id as CacheKeyId}, diff --git a/client/service/Cargo.toml b/client/service/Cargo.toml index 575305a597798..7138d9d384eeb 100644 --- a/client/service/Cargo.toml +++ b/client/service/Cargo.toml @@ -51,6 +51,7 @@ sp-consensus = { version = "0.10.0-dev", path = "../../primitives/consensus/comm sc-consensus = { version = "0.10.0-dev", path = "../../client/consensus/common" } sp-inherents = { version = "4.0.0-dev", path = "../../primitives/inherents" } sp-storage = { version = "6.0.0", path = "../../primitives/storage" } +sc-network-common = { version = "0.10.0-dev", path = "../network/common" } sc-network = { version = "0.10.0-dev", path = "../network" } sc-chain-spec = { version = "4.0.0-dev", path = "../chain-spec" } sc-client-api = { version = "4.0.0-dev", path = "../api" } diff --git a/client/service/src/config.rs b/client/service/src/config.rs index 7c7bb480aa5c3..56980ad14425f 100644 --- a/client/service/src/config.rs +++ b/client/service/src/config.rs @@ -23,12 +23,17 @@ pub use sc_client_db::{Database, DatabaseSource, KeepBlocks, PruningMode}; pub use sc_executor::WasmExecutionMethod; pub use sc_network::{ config::{ - IncomingRequest, MultiaddrWithPeerId, NetworkConfiguration, NodeKeyConfig, - NonDefaultSetConfig, OutgoingResponse, RequestResponseConfig, Role, SetConfig, - TransportConfig, + MultiaddrWithPeerId, NetworkConfiguration, NodeKeyConfig, NonDefaultSetConfig, Role, + SetConfig, TransportConfig, }, Multiaddr, }; +pub use sc_network_common::{ + config::ProtocolId, + request_responses::{ + IncomingRequest, OutgoingResponse, ProtocolConfig as RequestResponseConfig, + }, +}; use prometheus_endpoint::Registry; use sc_chain_spec::ChainSpec; @@ -208,7 +213,7 @@ impl Configuration { } /// Returns the network protocol id from the chain spec, or the default. - pub fn protocol_id(&self) -> sc_network::config::ProtocolId { + pub fn protocol_id(&self) -> ProtocolId { let protocol_id_full = match self.chain_spec.protocol_id() { Some(pid) => pid, None => { @@ -220,7 +225,7 @@ impl Configuration { crate::DEFAULT_PROTOCOL_ID }, }; - sc_network::config::ProtocolId::from(protocol_id_full) + ProtocolId::from(protocol_id_full) } }