From c967b1d486741f6428693e09b38f09ecaf9d81d2 Mon Sep 17 00:00:00 2001 From: Toralf Wittner Date: Tue, 13 Aug 2019 16:00:52 +0200 Subject: [PATCH] network: Use "one shot" protocol handler. Add two new `NetworkBehaviour`s, one handling remote block requests and another one to handle light client requests (both local and from remote). The change is motivated by the desire to use multiple substreams of a single connection for different protocols. To achieve this, libp2p's `OneShotHandler` is used as a protocol handler in each behaviour. It will open a fresh substream for the duration of the request and close it afterwards. For block requests, we currently only handle incoming requests from remote and tests are missing. For light client handling we support incoming requests from remote and also ported a substantial amount of functionality over from `light_dispatch.rs` (including several tests). However the result lacks in at least two aspects: (1) We require external updates w.r.t. the best block per peer and currently nothing updates this information. (2) We carry a lot of peer-related state around. Both aspects could be simplified by externalising peer selection and just requiring a specific peer ID where the request should be sent to. We still have to maintain some peer related state due to the way libp2p's swarm and network behaviour work (e.g. we must make sure to always issue `NetworkBehaviourAction::SendEvent`s to peers we are connected to, otherwise the actions die a silent death. Another change implemented here is the use of protocol buffers as the encoding for network messages. Certain individual fields of messages are still SCALE encoded. There has been some discussion about this in another PR (https://github.com/paritytech/substrate/pull/3452), so far without resolution. --- Cargo.lock | 78 +- core/network/Cargo.toml | 5 + core/network/build.rs | 8 + core/network/src/behaviour.rs | 17 +- core/network/src/protocol.rs | 15 + core/network/src/protocol/block_requests.rs | 319 ++++ .../src/protocol/light_client_handler.rs | 1622 +++++++++++++++++ core/network/src/protocol/light_dispatch.rs | 6 +- core/network/src/protocol/schema/api.v1.proto | 59 + .../src/protocol/schema/light.v1.proto | 121 ++ core/network/src/service.rs | 28 +- 11 files changed, 2264 insertions(+), 14 deletions(-) create mode 100644 core/network/build.rs create mode 100644 core/network/src/protocol/block_requests.rs create mode 100644 core/network/src/protocol/light_client_handler.rs create mode 100644 core/network/src/protocol/schema/api.v1.proto create mode 100644 core/network/src/protocol/schema/light.v1.proto diff --git a/Cargo.lock b/Cargo.lock index 7cdf58413b3de..cb99dba5908b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -876,6 +876,11 @@ dependencies = [ "static_assertions 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "fixedbitset" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "flate2" version = "1.0.9" @@ -2236,6 +2241,11 @@ dependencies = [ "ws2_32-sys 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "multimap" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" + [[package]] name = "multistream-select" version = "0.5.1" @@ -2934,6 +2944,14 @@ name = "percent-encoding" version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "petgraph" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "pin-utils" version = "0.1.0-alpha.4" @@ -2997,6 +3015,54 @@ dependencies = [ "unicode-xid 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "prost" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)", + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-build" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "heck 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)", + "petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "which 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-derive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "failure 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", + "itertools 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)", + "proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)", + "quote 0.6.13 (registry+https://github.com/rust-lang/crates.io-index)", + "syn 0.15.42 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "prost-types" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "protobuf" version = "2.8.0" @@ -4831,6 +4897,7 @@ dependencies = [ name = "substrate-network" version = "2.0.0" dependencies = [ + "assert_matches 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)", "bitflags 1.1.0 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "derive_more 0.14.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -4849,6 +4916,8 @@ dependencies = [ "lru-cache 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)", "parity-scale-codec 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", + "prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)", "quickcheck 0.8.5 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "rustc-hex 2.0.1 (registry+https://github.com/rust-lang/crates.io-index)", @@ -5772,7 +5841,7 @@ name = "twox-hash" version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" dependencies = [ - "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.3.23 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -6364,6 +6433,7 @@ dependencies = [ "checksum fdlimit 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "b1ee15a7050e5580b3712877157068ea713b245b080ff302ae2ca973cfcd9baa" "checksum finality-grandpa 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "9681c1f75941ea47584573dd2bc10558b2067d460612945887e00744e43393be" "checksum fixed-hash 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "516877b7b9a1cc2d0293cbce23cd6203f0edbfd4090e6ca4489fecb5aa73050e" +"checksum fixedbitset 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "86d4de0081402f5e88cdac65c8dcdcc73118c1a7a465e2a05f0da05843a8ea33" "checksum flate2 1.0.9 (registry+https://github.com/rust-lang/crates.io-index)" = "550934ad4808d5d39365e5d61727309bf18b3b02c6c56b729cb92e7dd84bc3d8" "checksum fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)" = "2fad85553e09a6f881f739c29f0b00b0f01357c743266d478b68951ce23285f3" "checksum foreign-types 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)" = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" @@ -6492,6 +6562,7 @@ dependencies = [ "checksum mio-extras 2.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "46e73a04c2fa6250b8d802134d56d554a9ec2922bf977777c805ea5def61ce40" "checksum mio-uds 0.6.7 (registry+https://github.com/rust-lang/crates.io-index)" = "966257a94e196b11bb43aca423754d87429960a768de9414f3691d6957abf125" "checksum miow 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +"checksum multimap 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" "checksum multistream-select 0.5.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e8f3cb4c93f2d79811fc11fa01faab99d8b7b8cbe024b602c27434ff2b08a59d" "checksum names 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)" = "ef320dab323286b50fb5cdda23f61c796a72a89998ab565ca32525c5c556f2da" "checksum native-tls 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "4b2df1a4c22fd44a62147fd8f13dd0f95c9d8ca7b2610299b2a2f9cf8964274e" @@ -6538,6 +6609,7 @@ dependencies = [ "checksum peeking_take_while 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)" = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099" "checksum percent-encoding 1.0.1 (registry+https://github.com/rust-lang/crates.io-index)" = "31010dd2e1ac33d5b46a5b413495239882813e0369f8ed8a5e266f173602f831" "checksum percent-encoding 2.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" +"checksum petgraph 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)" = "9c3659d1ee90221741f65dd128d9998311b0e40c5d3c23a62445938214abce4f" "checksum pin-utils 0.1.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5894c618ce612a3fa23881b152b608bafb8c56cfc22f434a3ba3120b40f7b587" "checksum pkg-config 0.3.15 (registry+https://github.com/rust-lang/crates.io-index)" = "a7c1d2cfa5a714db3b5f24f0915e74fcdf91d09d496ba61329705dda7774d2af" "checksum ppv-lite86 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "e3cbf9f658cdb5000fcf6f362b8ea2ba154b9f146a61c7a20d647034c6b6561b" @@ -6546,6 +6618,10 @@ dependencies = [ "checksum proc-macro-crate 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "e10d4b51f154c8a7fb96fd6dad097cb74b863943ec010ac94b9fd1be8861fe1e" "checksum proc-macro-hack 0.5.8 (registry+https://github.com/rust-lang/crates.io-index)" = "982a35d1194084ba319d65c4a68d24ca28f5fdb5b8bc20899e4eef8641ea5178" "checksum proc-macro2 0.4.30 (registry+https://github.com/rust-lang/crates.io-index)" = "cf3d2011ab5c909338f7887f4fc896d35932e29146c12c8d01da6b22a80ba759" +"checksum prost 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" +"checksum prost-build 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" +"checksum prost-derive 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5e7dc378b94ac374644181a2247cebf59a6ec1c88b49ac77f3a94b86b79d0e11" +"checksum prost-types 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" "checksum protobuf 2.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8aefcec9f142b524d98fc81d07827743be89dd6586a1ba6ab21fa66a500b3fa5" "checksum pwasm-utils 0.6.2 (registry+https://github.com/rust-lang/crates.io-index)" = "efb0dcbddbb600f47a7098d33762a00552c671992171637f5bb310b37fe1f0e4" "checksum quick-error 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "5fb6ccf8db7bbcb9c2eae558db5ab4f3da1c2a87e4e597ed394726bc8ea6ca1d" diff --git a/core/network/Cargo.toml b/core/network/Cargo.toml index 00dc04254d053..89809cf20d96a 100644 --- a/core/network/Cargo.toml +++ b/core/network/Cargo.toml @@ -6,6 +6,9 @@ license = "GPL-3.0" authors = ["Parity Technologies "] edition = "2018" +[build-dependencies] +prost-build = "0.5" + [dependencies] bytes = "0.4" derive_more = "0.14.0" @@ -30,6 +33,7 @@ sr-primitives = { path = "../../core/sr-primitives" } primitives = { package = "substrate-primitives", path = "../../core/primitives" } codec = { package = "parity-scale-codec", version = "1.0.0", features = ["derive"] } peerset = { package = "substrate-peerset", path = "../../core/peerset" } +prost = "0.5" serde = { version = "1.0.70", features = ["derive"] } serde_json = "1.0.24" slog = { version = "^2", features = ["nested-values"] } @@ -47,6 +51,7 @@ zeroize = "0.9.0" babe-primitives = { package = "substrate-consensus-babe-primitives", path = "../consensus/babe/primitives" } [dev-dependencies] +assert_matches = "1.3.0" env_logger = { version = "0.6" } keyring = { package = "substrate-keyring", path = "../../core/keyring" } quickcheck = "0.8.5" diff --git a/core/network/build.rs b/core/network/build.rs new file mode 100644 index 0000000000000..0fd1f128660e9 --- /dev/null +++ b/core/network/build.rs @@ -0,0 +1,8 @@ +const PROTOS: &[&str] = &[ + "src/protocol/schema/api.v1.proto", + "src/protocol/schema/light.v1.proto" +]; + +fn main() { + prost_build::compile_protos(PROTOS, &["src/protocol"]).unwrap(); +} diff --git a/core/network/src/behaviour.rs b/core/network/src/behaviour.rs index 2471cbcaaf26a..5969a900323db 100644 --- a/core/network/src/behaviour.rs +++ b/core/network/src/behaviour.rs @@ -19,7 +19,7 @@ use crate::{ protocol::event::DhtEvent }; use crate::{ExHashT, specialization::NetworkSpecialization}; -use crate::protocol::{CustomMessageOutcome, Protocol}; +use crate::protocol::{self, CustomMessageOutcome, Protocol}; use futures::prelude::*; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; @@ -42,6 +42,10 @@ pub struct Behaviour, H: ExHashT> { debug_info: debug_info::DebugInfoBehaviour>, /// Discovers nodes of the network. discovery: DiscoveryBehaviour>, + /// Block request handling. + block_requests: protocol::BlockRequests, B>, + /// Light client request handling. + light_client_handler: protocol::LightClientHandler, B>, /// Queue of events to produce for the outside. #[behaviour(ignore)] @@ -62,12 +66,16 @@ impl, H: ExHashT> Behaviour { local_public_key: PublicKey, known_addresses: Vec<(PeerId, Multiaddr)>, enable_mdns: bool, + block_requests: protocol::BlockRequests, B>, + light_client_handler: protocol::LightClientHandler, B> ) -> Self { Behaviour { substrate, debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), discovery: DiscoveryBehaviour::new(local_public_key, known_addresses, enable_mdns), - events: Vec::new(), + block_requests, + light_client_handler, + events: Vec::new() } } @@ -109,6 +117,11 @@ impl, H: ExHashT> Behaviour { pub fn put_value(&mut self, key: record::Key, value: Vec) { self.discovery.put_value(key, value); } + + /// Get unique access to the light client handler. + pub fn light_client_handler(&mut self) -> &mut protocol::LightClientHandler, B> { + &mut self.light_client_handler + } } impl, H: ExHashT> NetworkBehaviourEventProcess for diff --git a/core/network/src/protocol.rs b/core/network/src/protocol.rs index b561322b5bbf8..986a6090a9d9d 100644 --- a/core/network/src/protocol.rs +++ b/core/network/src/protocol.rs @@ -49,14 +49,29 @@ use client::light::fetcher::{FetchChecker, ChangesProof}; use crate::error; use util::LruHashSet; +// Include sources generated from protobuf definitions. +pub mod api { + pub mod v1 { + include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); + pub mod light { + include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs")); + } + } +} + mod util; +pub mod block_requests; pub mod consensus_gossip; pub mod message; pub mod event; +pub mod light_client_handler; pub mod light_dispatch; pub mod specialization; pub mod sync; +pub use block_requests::BlockRequests; +pub use light_client_handler::LightClientHandler; + const REQUEST_TIMEOUT_SEC: u64 = 40; /// Interval at which we perform time based maintenance const TICK_TIMEOUT: time::Duration = time::Duration::from_millis(1100); diff --git a/core/network/src/protocol/block_requests.rs b/core/network/src/protocol/block_requests.rs new file mode 100644 index 0000000000000..526b7addd9ef5 --- /dev/null +++ b/core/network/src/protocol/block_requests.rs @@ -0,0 +1,319 @@ +//! `NetworkBehaviour` implementation which handles incoming block requests. +//! +//! Every request is coming in on a separate connection substream which gets +//! closed after we have sent the response back. Incoming requests are encoded +//! as protocol buffers (cf. `api.v1.proto`). + +use codec::{Encode, Decode}; +use crate::{ + chain::Client, + protocol::{api, message::BlockAttributes} +}; +use futures::prelude::*; +use libp2p::{ + core::{ + ConnectedPoint, + Multiaddr, + PeerId, + upgrade::{InboundUpgrade, ReadOneError, ReadRespond, UpgradeInfo, WriteOne, Negotiated, read_respond}, + upgrade::{DeniedUpgrade, write_one} + }, + swarm::{NetworkBehaviour, NetworkBehaviourAction, OneShotHandler, PollParameters, SubstreamProtocol} +}; +use log::{debug, trace}; +use prost::Message; +use sr_primitives::{generic::BlockId, traits::{Block, Header, One, Zero}}; +use std::{ + cmp::min, + collections::VecDeque, + io, + iter, + sync::Arc, + time::Duration +}; +use tokio_io::{AsyncRead, AsyncWrite}; +use void::{Void, unreachable}; + +// Type alias for convenience. +pub type Error = Box; + +/// Configuration options for `BlockRequests`. +#[derive(Debug, Clone)] +pub struct Config { + max_block_data_response: u32, + max_request_len: usize, + inactivity_timeout: Duration +} + +impl Default for Config { + fn default() -> Self { + Config::new() + } +} + +#[allow(unused)] +impl Config { + pub fn new() -> Self { + Config { + max_block_data_response: 128, + max_request_len: 1024 * 1024, + inactivity_timeout: Duration::from_secs(5) + } + } + + /// Limit the max. number of block data in a response. + pub fn set_max_block_data_response(&mut self, v: u32) -> &mut Self { + self.max_block_data_response = v; + self + } + + /// Limit the max. length of incoming block request bytes. + pub fn set_max_request_len(&mut self, v: usize) -> &mut Self { + self.max_request_len = v; + self + } + + /// Limit the max. duration the substream may remain inactive before closing it. + pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { + self.inactivity_timeout = v; + self + } +} + +/// The block request handling behaviour. +pub struct BlockRequests { + /// This behaviour's configuration. + config: Config, + /// Blockchain client. + chain: Arc>, + /// Pending futures, sending back the block request response. + outgoing: VecDeque, Vec>> +} + +impl BlockRequests +where + T: AsyncRead + AsyncWrite, + B: Block +{ + pub fn new(cfg: Config, chain: Arc>) -> Self { + BlockRequests { + config: cfg, + chain, + outgoing: VecDeque::new() + } + } + + /// Callback, invoked when a new block request has been received from remote. + fn on_block_request + ( &mut self + , peer: &PeerId + , request: &api::v1::BlockRequest + ) -> Result + { + trace!("block request {} from peer {}: from block {:?} to block {:?}, max blocks {:?}", + request.id, + peer, + request.from_block, + request.to_block, + request.max_blocks); + + let from_block_id = + match request.from_block { + Some(api::v1::block_request::FromBlock::Hash(ref h)) => { + let h = Decode::decode(&mut h.as_ref())?; + BlockId::::Hash(h) + } + Some(api::v1::block_request::FromBlock::Number(ref n)) => { + let n = Decode::decode(&mut n.as_ref())?; + BlockId::::Number(n) + } + None => { + let msg = "missing `BlockRequest::from_block` field"; + return Err(io::Error::new(io::ErrorKind::Other, msg).into()) + } + }; + + let max_blocks = + if request.max_blocks == 0 { + self.config.max_block_data_response + } else { + min(request.max_blocks, self.config.max_block_data_response) + }; + + let direction = + if request.direction == api::v1::Direction::Ascending as i32 { + api::v1::Direction::Ascending + } else if request.direction == api::v1::Direction::Descending as i32 { + api::v1::Direction::Descending + } else { + let msg = format!("invalid `BlockRequest::direction` value: {}", request.direction); + return Err(io::Error::new(io::ErrorKind::Other, msg).into()) + }; + + let attributes = BlockAttributes::decode(&mut request.fields.to_be_bytes().as_ref())?; + let get_header = attributes.contains(BlockAttributes::HEADER); + let get_body = attributes.contains(BlockAttributes::BODY); + let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION); + + let mut blocks = Vec::new(); + let mut block_id = from_block_id; + while let Some(header) = self.chain.header(&block_id).unwrap_or(None) { + if blocks.len() >= max_blocks as usize { + break + } + + let number = header.number().clone(); + let hash = header.hash(); + let parent_hash = header.parent_hash().clone(); + + let block_data = api::v1::BlockData { + hash: hash.encode(), + header: if get_header { + header.encode() + } else { + Vec::new() + }, + body: if get_body { + self.chain.body(&BlockId::Hash(hash))? + .unwrap_or(Vec::new()) + .iter_mut() + .map(|extrinsic| extrinsic.encode()) + .collect() + } else { + Vec::new() + }, + receipt: Vec::new(), + message_queue: Vec::new(), + justification: if get_justification { + self.chain.justification(&BlockId::Hash(hash))?.unwrap_or(Vec::new()) + } else { + Vec::new() + } + }; + + blocks.push(block_data); + + match direction { + api::v1::Direction::Ascending => { + block_id = BlockId::Number(number + One::one()) + } + api::v1::Direction::Descending => { + if number.is_zero() { + break + } + block_id = BlockId::Hash(parent_hash) + } + } + } + + Ok(api::v1::BlockResponse { id: request.id, blocks }) + } +} + +impl NetworkBehaviour for BlockRequests +where + T: AsyncRead + AsyncWrite, + B: Block +{ + type ProtocolsHandler = OneShotHandler>; + type OutEvent = Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let p = Protocol { + max_request_len: self.config.max_request_len + }; + OneShotHandler::new(SubstreamProtocol::new(p), self.config.inactivity_timeout) + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, _peer: PeerId, _info: ConnectedPoint) { + } + + fn inject_disconnected(&mut self, _peer: &PeerId, _info: ConnectedPoint) { + } + + fn inject_node_event(&mut self, peer: PeerId, Request(request, stream): Request) { + match self.on_block_request(&peer, &request) { + Ok(res) => { + trace!("enqueueing block response {} for peer {} with {} blocks", res.id, peer, res.blocks.len()); + let mut data = Vec::with_capacity(res.encoded_len()); + if let Err(e) = res.encode(&mut data) { + debug!("error encoding block response {} for peer {}: {}", res.id, peer, e) + } else { + self.outgoing.push_back(write_one(stream, data)) + } + } + Err(e) => debug!("error handling block request {} from peer {}: {}", request.id, peer, e) + } + } + + fn poll(&mut self, _: &mut impl PollParameters) -> Async> { + let mut remaining = self.outgoing.len(); + while let Some(mut write_future) = self.outgoing.pop_front() { + remaining -= 1; + match write_future.poll() { + Ok(Async::NotReady) => self.outgoing.push_back(write_future), + Ok(Async::Ready(())) => {} + Err(e) => debug!("error writing block response: {}", e) + } + if remaining == 0 { + break + } + } + Async::NotReady + } +} + +/// The incoming block request. +/// +/// Holds the protobuf value and the connection substream which made the +/// request and over which to send the response. +// TODO (after https://github.com/libp2p/rust-libp2p/pull/1226): #[derive(Debug)] +pub struct Request(api::v1::BlockRequest, Negotiated); + +impl From for Request { + fn from(v: Void) -> Self { + unreachable(v) + } +} + +/// Substream upgrade protocol. +/// +/// We attempt to parse an incoming protobuf encoded request (cf. `Request`) +/// which will be handled by the `BlockRequests` behaviour, i.e. the request +/// will become visible via `inject_node_event` which then dispatches to the +/// relevant callback to process the message and prepare a response. +#[derive(Debug, Clone)] +pub struct Protocol { + /// The max. request length in bytes. + max_request_len: usize +} + +impl UpgradeInfo for Protocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/polkadot/sync/1") + } +} + +impl InboundUpgrade for Protocol { + type Output = Request; + type Error = ReadOneError; + type Future = ReadRespond, (), fn(Negotiated, Vec, ()) -> Result>; + + fn upgrade_inbound(self, s: Negotiated, _: Self::Info) -> Self::Future { + read_respond(s, self.max_request_len, (), |s, buf, ()| { + api::v1::BlockRequest::decode(buf) + .map(move |r| Request(r, s)) + .map_err(|decode_error| { + ReadOneError::Io(std::io::Error::new(std::io::ErrorKind::Other, decode_error)) + }) + }) + } +} + diff --git a/core/network/src/protocol/light_client_handler.rs b/core/network/src/protocol/light_client_handler.rs new file mode 100644 index 0000000000000..30ddebfaa4160 --- /dev/null +++ b/core/network/src/protocol/light_client_handler.rs @@ -0,0 +1,1622 @@ +//! [`NetworkBehaviour`] implementation which handles light client requests. +//! +//! Every request is coming in on a separate connection substream which gets +//! closed after we have sent the response back. Requests and responses are +//! encoded as protocol buffers (cf. `api.v1.proto`). +//! +//! For every outgoing request we likewise open a separate substream. + +use codec::{self, Encode, Decode}; +use client::{error::Error as ClientError, light::fetcher}; +use crate::{chain::Client, protocol::api}; +use futures::{prelude::*, sync::oneshot}; +use libp2p::{ + core::{ + ConnectedPoint, + Multiaddr, + PeerId, + upgrade::{InboundUpgrade, ReadOneError, ReadRespond, UpgradeInfo, WriteOne, Negotiated, read_respond}, + upgrade::{OutboundUpgrade, write_one, RequestResponse, request_response} + }, + swarm::{NetworkBehaviour, NetworkBehaviourAction, OneShotHandler, PollParameters, SubstreamProtocol} +}; +use log::{debug, error, trace}; +use primitives::storage::StorageKey; +use prost::Message; +use rustc_hex::ToHex; +use sr_primitives::traits::{Block, Header, NumberFor, Zero}; +use std::{ + collections::{BTreeMap, VecDeque, HashMap}, + fmt, + iter, + sync::Arc, + time::{Duration, Instant} +}; +use tokio_io::{AsyncRead, AsyncWrite}; +use void::Void; + +/// Configuration options for `LightClientHandler` behaviour. +#[derive(Debug, Clone)] +pub struct Config { + max_data_size: usize, + max_pending_requests: usize, + inactivity_timeout: Duration, + request_timeout: Duration +} + +impl Default for Config { + fn default() -> Self { + Config::new() + } +} + +#[allow(unused)] +impl Config { + /// Create a fresh configuration with the following options: + /// + /// - `max_data_size` = 1 MiB + /// - `max_pending_requests` = 128 + /// - `inactivity_timeout` = 15s + /// - `request_timeout` = 15s + pub fn new() -> Self { + Config { + max_data_size: 1024 * 1024, + max_pending_requests: 128, + inactivity_timeout: Duration::from_secs(15), + request_timeout: Duration::from_secs(15) + } + } + + /// Limit the max. length of incoming request bytes. + pub fn set_max_data_size(&mut self, v: usize) -> &mut Self { + self.max_data_size = v; + self + } + + /// Limit the max. number of pending requests. + pub fn set_max_pending_requests(&mut self, v: usize) -> &mut Self { + self.max_pending_requests = v; + self + } + + /// Limit the max. duration the connection may remain inactive before closing it. + pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { + self.inactivity_timeout = v; + self + } + + /// Limit the max. request duration. + pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self { + self.request_timeout = v; + self + } +} + +/// Possible errors while handling light clients. +#[derive(Debug)] +pub enum Error { + /// There are currently too many pending request. + TooManyRequests, + /// The response type does not correspond to the issued request. + UnexpectedResponse, + /// The chain client errored. + Client(ClientError), + /// Encoding or decoding of some data failed. + Codec(codec::Error) +} + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Error::TooManyRequests => f.write_str("too many pending requests"), + Error::UnexpectedResponse => f.write_str("unexpected response"), + Error::Client(e) => write!(f, "client error: {}", e), + Error::Codec(e) => write!(f, "codec error: {}", e) + } + } +} + +impl std::error::Error for Error { + fn cause(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + Error::Client(e) => Some(e), + Error::Codec(e) => Some(e), + Error::UnexpectedResponse | Error::TooManyRequests => None + } + } +} + +impl From for Error { + fn from(e: ClientError) -> Self { + Error::Client(e) + } +} + +impl From for Error { + fn from(e: codec::Error) -> Self { + Error::Codec(e) + } +} + +/// The possible light client requests we support. +/// +/// The associated `oneshot::Sender` will be used to convey the result of +/// their request back to them (cf. `Reply`). +// +// This is modeled after light_dispatch.rs's `RequestData` which is not +// used because we currently only support a subset of those. +#[derive(Debug)] +pub enum LightClientRequest { + Header(fetcher::RemoteHeaderRequest, oneshot::Sender>), + Read(fetcher::RemoteReadRequest, oneshot::Sender>, ClientError>>), + ReadChild(fetcher::RemoteReadChildRequest, oneshot::Sender>, ClientError>>), + Call(fetcher::RemoteCallRequest, oneshot::Sender, ClientError>>), + Changes(fetcher::RemoteChangesRequest, oneshot::Sender, u32)>, ClientError>>) +} + +/// The data to send back to the light client over the oneshot channel. +// +// It is unified here in order to be able to return it as a function +// result instead of delivering it to the client as a side effect of +// response processing. +#[derive(Debug)] +enum Reply { + VecU8(Vec), + VecNumberU32(Vec<(::Number, u32)>), + OptVecU8(Option>), + Header(B::Header) +} + +/// Augments a light client request with metadata. +#[derive(Debug)] +struct RequestWrapper { + /// Time when this value was created. + timestamp: Instant, + /// Remaining retries. + retries: usize, + /// The actual request. + request: LightClientRequest, + /// Peer information, e.g. `PeerId`. + peer: P +} + +/// Information we have about some peer. +#[derive(Debug)] +struct PeerInfo { + address: Multiaddr, + best_block: Option>, + status: PeerStatus +} + +/// A peer is either idle or busy processing a request from us. +#[derive(Debug, Clone, PartialEq, Eq)] +enum PeerStatus { + /// The peer is available. + Idle, + /// We wait for the peer to return us a response for the given request ID. + BusyWith(u64) +} + +/// The light client handler behaviour. +pub struct LightClientHandler { + /// This behaviour's configuration. + config: Config, + /// Blockchain client. + chain: Arc>, + /// Verifies that received responses are correct. + checker: Arc>, + /// Peer information (addresses, their best block, etc.) + peers: HashMap>, + /// Pending futures sending back response to remote clients. + responses: VecDeque, Vec>>, + /// Pending (local) requests. + pending_requests: VecDeque>, + /// Requests on their way to remote peers. + outstanding: HashMap>, + /// (Local) Request ID counter + next_request_id: u64, + /// Handle to use for reporting misbehaviour of peers. + peerset: peerset::PeersetHandle +} + +impl LightClientHandler +where + T: AsyncRead + AsyncWrite, + B: Block +{ + /// Construct a new light client handler. + pub fn new + ( cfg: Config + , chain: Arc> + , checker: Arc> + , peerset: peerset::PeersetHandle + ) -> Self + { + LightClientHandler { + config: cfg, + chain, + checker, + peers: HashMap::new(), + responses: VecDeque::new(), + pending_requests: VecDeque::new(), + outstanding: HashMap::new(), + next_request_id: 1, + peerset + } + } + + /// We rely on external information about peers best blocks as we lack the + /// means to determine it ourselves. + pub fn update_best_block(&mut self, peer: &PeerId, num: NumberFor) { + if let Some(info) = self.peers.get_mut(peer) { + info.best_block = Some(num) + } + } + + /// Issue a new light client request. + pub fn request(&mut self, req: LightClientRequest) -> Result<(), Error> { + if self.pending_requests.len() >= self.config.max_pending_requests { + return Err(Error::TooManyRequests) + } + let rw = RequestWrapper { + timestamp: Instant::now(), + retries: retries(&req), + request: req, + peer: () // we do not know the peer yet + }; + self.pending_requests.push_back(rw); + Ok(()) + } + + fn next_request_id(&mut self) -> u64 { + let id = self.next_request_id; + self.next_request_id += 1; + id + } + + // Iterate over peers known to possess a certain block. + fn idle_peers_with_block(&mut self, num: NumberFor) -> impl Iterator + '_ { + self.peers.iter() + .filter(move |(_, info)| { + info.status == PeerStatus::Idle && info.best_block >= Some(num) + }) + .map(|(peer, _)| peer.clone()) + } + + // Iterate over peers without a known block. + fn idle_peers_with_unknown_block(&mut self) -> impl Iterator + '_ { + self.peers.iter() + .filter(|(_, info)| { + info.status == PeerStatus::Idle && info.best_block.is_none() + }) + .map(|(peer, _)| peer.clone()) + } + + /// Remove the given peer. + /// + /// If we have a request to this peer in flight, we move it back to + /// the pending requests queue. + fn remove_peer(&mut self, peer: &PeerId) { + if let Some(id) = self.outstanding.iter().find(|(_, rw)| &rw.peer == peer).map(|(k, _)| *k) { + let rw = self.outstanding.remove(&id).expect("key belongs to entry in this map"); + let rw = RequestWrapper { + timestamp: rw.timestamp, + retries: rw.retries, + request: rw.request, + peer: () // need to find another peer + }; + self.pending_requests.push_back(rw); + } + self.peers.remove(peer); + } + + /// Process a local request's response from remote. + /// + /// If successful, this will give us the actual, checked data we should be + /// sending back to the client, otherwise an error. + fn on_response + ( &mut self + , peer: &PeerId + , request: &LightClientRequest + , response: api::v1::light::Response + ) -> Result, Error> + { + trace!("response {} from {}", response.id, peer); + use api::v1::light::response::Response; + match response.response { + Some(Response::RemoteCallResponse(res)) => + if let LightClientRequest::Call(req, _) = request { + let reply = self.checker.check_execution_proof(req, res.proof)?; + Ok(Reply::VecU8(reply)) + } else { + Err(Error::UnexpectedResponse) + } + Some(Response::RemoteReadResponse(res)) => + match request { + LightClientRequest::Read(req, _) => { + let reply = self.checker.check_read_proof(&req, res.proof)?; + Ok(Reply::OptVecU8(reply)) + } + LightClientRequest::ReadChild(req, _) => { + let reply = self.checker.check_read_child_proof(&req, res.proof)?; + Ok(Reply::OptVecU8(reply)) + } + _ => Err(Error::UnexpectedResponse) + } + Some(Response::RemoteChangesResponse(res)) => + if let LightClientRequest::Changes(req, _) = request { + let max_block = Decode::decode(&mut res.max.as_ref())?; + let roots = { + let mut r = BTreeMap::new(); + for pair in res.roots { + let k = Decode::decode(&mut pair.fst.as_ref())?; + let v = Decode::decode(&mut pair.snd.as_ref())?; + r.insert(k, v); + } + r + }; + let reply = self.checker.check_changes_proof(&req, fetcher::ChangesProof { + max_block, + proof: res.proof, + roots, + roots_proof: res.roots_proof + })?; + Ok(Reply::VecNumberU32(reply)) + } else { + Err(Error::UnexpectedResponse) + } + Some(Response::RemoteHeaderResponse(res)) => + if let LightClientRequest::Header(req, _) = request { + let header = + if res.header.is_empty() { + None + } else { + Some(Decode::decode(&mut res.header.as_ref())?) + }; + let reply = self.checker.check_header_proof(&req, header, res.proof)?; + Ok(Reply::Header(reply)) + } else { + Err(Error::UnexpectedResponse) + } + None => Err(Error::UnexpectedResponse) + } + } + + fn on_remote_call_request + ( &mut self + , peer: &PeerId + , request_id: u64 + , request: &api::v1::light::RemoteCallRequest + ) -> Result + { + trace!("remote call request {} from {} ({} at {:?})", request_id, peer, request.method, request.block); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let proof = match self.chain.execution_proof(&block, &request.method, &request.data) { + Ok((_, proof)) => proof, + Err(e) => { + trace!("remote call request {} from {} ({} at {:?}) failed with: {}", + request_id, + peer, + request.method, + request.block, + e); + Vec::new() + } + }; + + let response = { + let r = api::v1::light::RemoteCallResponse { proof }; + api::v1::light::response::Response::RemoteCallResponse(r) + }; + + Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + } + + fn on_remote_read_request + ( &mut self + , peer: &PeerId + , request_id: u64 + , request: &api::v1::light::RemoteReadRequest + ) -> Result + { + trace!("remote read request {} from {} ({} at {:?})", + request_id, + peer, + request.key.to_hex::(), + request.block); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let proof = match self.chain.read_proof(&block, &request.key) { + Ok(proof) => proof, + Err(error) => { + trace!("remote read request {} from {} ({} at {:?}) failed with: {}", + request_id, + peer, + request.key.to_hex::(), + request.block, + error); + Vec::new() + } + }; + + let response = { + let r = api::v1::light::RemoteReadResponse { proof }; + api::v1::light::response::Response::RemoteReadResponse(r) + }; + + Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + } + + fn on_remote_header_request + ( &mut self + , peer: &PeerId + , request_id: u64 + , request: &api::v1::light::RemoteHeaderRequest + ) -> Result + { + trace!("remote header proof request {} from {} ({:?})", request_id, peer, request.block); + + let block = Decode::decode(&mut request.block.as_ref())?; + + let (header, proof) = match self.chain.header_proof(block) { + Ok((header, proof)) => (header.encode(), proof), + Err(error) => { + trace!("remote header proof request {} from {} ({:?}) failed with: {}", + request_id, + peer, + request.block, + error); + (Default::default(), Vec::new()) + } + }; + + let response = { + let r = api::v1::light::RemoteHeaderResponse { header, proof }; + api::v1::light::response::Response::RemoteHeaderResponse(r) + }; + + Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + } + + fn on_remote_changes_request + ( &mut self + , peer: &PeerId + , request_id: u64 + , request: &api::v1::light::RemoteChangesRequest + ) -> Result + { + trace!("remote changes proof request {} from {} for key {} ({:?}..{:?})", + request_id, + peer, + request.key.to_hex::(), + request.first, + request.last + ); + + let first = Decode::decode(&mut request.first.as_ref())?; + let last = Decode::decode(&mut request.last.as_ref())?; + let min = Decode::decode(&mut request.min.as_ref())?; + let max = Decode::decode(&mut request.max.as_ref())?; + let key = StorageKey(request.key.clone()); + + let proof = match self.chain.key_changes_proof(first, last, min, max, &key) { + Ok(proof) => proof, + Err(error) => { + trace!("remote changes proof request {} from {} for key {} ({:?}..{:?}) failed with: {}", + request_id, + peer, + key.0.to_hex::(), + request.first, + request.last, + error + ); + fetcher::ChangesProof:: { + max_block: Zero::zero(), + proof: Vec::new(), + roots: BTreeMap::new(), + roots_proof: Vec::new() + } + } + }; + + let response = { + let r = api::v1::light::RemoteChangesResponse { + max: proof.max_block.encode(), + proof: proof.proof, + roots: proof.roots.into_iter() + .map(|(k, v)| api::v1::light::Pair { fst: k.encode(), snd: v.encode() }) + .collect(), + roots_proof: proof.roots_proof + }; + api::v1::light::response::Response::RemoteChangesResponse(r) + }; + + Ok(api::v1::light::Response { id: request_id, response: Some(response) }) + } +} + +impl NetworkBehaviour for LightClientHandler +where + T: AsyncRead + AsyncWrite, + B: Block +{ + type ProtocolsHandler = OneShotHandler>; + type OutEvent = Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let p = InboundProtocol { + max_data_size: self.config.max_data_size + }; + OneShotHandler::new(SubstreamProtocol::new(p), self.config.inactivity_timeout) + } + + fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { + self.peers.get(peer) + .map(|info| vec![info.address.clone()]) + .unwrap_or_default() + } + + fn inject_connected(&mut self, peer: PeerId, info: ConnectedPoint) { + let peer_address = match info { + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + ConnectedPoint::Dialer { address } => address + }; + + trace!("peer {} connected with address {}", peer, peer_address); + + let info = PeerInfo { + address: peer_address, + best_block: None, + status: PeerStatus::Idle + }; + + self.peers.insert(peer, info); + } + + fn inject_disconnected(&mut self, peer: &PeerId, _: ConnectedPoint) { + trace!("peer {} disconnected", peer); + self.remove_peer(peer) + } + + fn inject_node_event(&mut self, peer: PeerId, event: Event) { + match event { + // An incoming request from remote has been received. + Event::Request(request, stream) => { + let result = match &request.request { + Some(api::v1::light::request::Request::RemoteCallRequest(r)) => + self.on_remote_call_request(&peer, request.id, r), + Some(api::v1::light::request::Request::RemoteReadRequest(r)) => + self.on_remote_read_request(&peer, request.id, r), + Some(api::v1::light::request::Request::RemoteHeaderRequest(r)) => + self.on_remote_header_request(&peer, request.id, r), + Some(api::v1::light::request::Request::RemoteReadChildRequest(_)) => { + // Match protocol.rs behaviour. + // Cf. https://github.com/paritytech/substrate/blob/9b3e9f/core/network/src/protocol.rs#L550 + trace!("ignoring remote read child request {} from {}", request.id, peer); + return + } + Some(api::v1::light::request::Request::RemoteChangesRequest(r)) => + self.on_remote_changes_request(&peer, request.id, r), + None => { + debug!("ignoring request {} without request data from peer {}", request.id, peer); + return + } + }; + match result { + Ok(response) => { + trace!("enqueueing response {} for peer {}", response.id, peer); + let mut data = Vec::new(); + if let Err(e) = response.encode(&mut data) { + debug!("error encoding response {} for peer {}: {}", response.id, peer, e) + } else { + self.responses.push_back(write_one(stream, data)) + } + } + Err(e) => debug!("error handling request {} from peer {}: {}", request.id, peer, e) + } + } + // A response to one of our own requests has been received. + Event::Response(response) => { + let id = response.id; + if let Some(request) = self.outstanding.remove(&id) { + // We first just check if the response originates from the expected peer. + if request.peer != peer { + debug!("was expecting response {} from {} instead of {}", id, request.peer, peer); + self.outstanding.insert(id, request); + self.peerset.report_peer(peer.clone(), i32::min_value()); + self.remove_peer(&peer); + return + } + + if let Some(info) = self.peers.get_mut(&peer) { + if info.status != PeerStatus::BusyWith(id) { + // If we get here, something is wrong with our internal handling of peer + // status information. At any time, a single peer processes at most one + // request from us and its status should contain the request ID we are + // expecting a response for. If a peer would send us a response with a + // random ID, we should not have an entry for it with this peer ID in + // our `outstanding` map, so a malicious peer should not be able to get + // us here. It is our own fault and must be fixed! + panic!("unexpected peer status {:?} for {}", info.status, peer); + } + + info.status = PeerStatus::Idle; // Make peer available again. + + match self.on_response(&peer, &request.request, response) { + Ok(reply) => send_reply(Ok(reply), request.request), + Err(Error::UnexpectedResponse) => { + debug!("unexpected response {} from peer {}", id, peer); + self.peerset.report_peer(peer.clone(), i32::min_value()); + self.remove_peer(&peer); + let rw = RequestWrapper { + timestamp: request.timestamp, + retries: request.retries, + request: request.request, + peer: () + }; + self.pending_requests.push_back(rw); + } + Err(other) => { + debug!("error handling response {} from peer {}: {}", id, peer, other); + self.peerset.report_peer(peer.clone(), i32::min_value()); + self.remove_peer(&peer); + if request.retries > 0 { + let rw = RequestWrapper { + timestamp: request.timestamp, + retries: request.retries - 1, + request: request.request, + peer: () + }; + self.pending_requests.push_back(rw) + } else { + send_reply(Err(ClientError::RemoteFetchFailed), request.request); + } + } + } + } else { + // If we get here, something is wrong with our internal handling of peers. + // We apparently have an entry in our `outstanding` map and the peer is the one we + // expected. So, if we can not find an entry for it in our peer information table, + // then these two collections are out of sync which must not happen and is a clear + // programmer error that must be fixed! + panic!("missing peer information for {}; response {}", peer, id); + } + } else { + debug!("unexpected response {} from peer {}", id, peer); + self.peerset.report_peer(peer.clone(), i32::min_value()); + self.remove_peer(&peer); + } + } + } + } + + fn poll(&mut self, _: &mut impl PollParameters) -> Async> { + // Process response sending futures. + let mut remaining = self.responses.len(); + while let Some(mut io) = self.responses.pop_front() { + remaining -= 1; + match io.poll() { + Ok(Async::NotReady) => self.responses.push_back(io), + Ok(Async::Ready(())) => {} + Err(e) => debug!("error writing response: {}", e) + } + if remaining == 0 { + break + } + } + + // If we have a pending request to send, try to find an available peer and send it. + let now = Instant::now(); + while let Some(mut request) = self.pending_requests.pop_front() { + if now > request.timestamp + self.config.request_timeout { + if request.retries == 0 { + send_reply(Err(ClientError::RemoteFetchFailed), request.request); + continue + } + request.timestamp = Instant::now(); + request.retries -= 1 + } + let number = required_block(&request.request); + let available_peer = { + let p = self.idle_peers_with_block(number).next(); + if p.is_none() { + self.idle_peers_with_unknown_block().next() + } else { + p + } + }; + if let Some(peer) = available_peer { + let id = self.next_request_id(); + let rq = serialise_request(id, &request.request); + let mut buf = Vec::with_capacity(rq.encoded_len()); + if let Err(e) = rq.encode(&mut buf) { + debug!("failed to serialise request {}: {}", id, e); + send_reply(Err(ClientError::RemoteFetchFailed), request.request) + } else { + let protocol = OutboundProtocol { + request: buf, + max_data_size: self.config.max_data_size + }; + self.peers.get_mut(&peer).map(|info| info.status = PeerStatus::BusyWith(id)); + let rw = RequestWrapper { + timestamp: request.timestamp, + retries: request.retries, + request: request.request, + peer: peer.clone() + }; + self.outstanding.insert(id, rw); + return Async::Ready(NetworkBehaviourAction::SendEvent { peer_id: peer, event: protocol }) + } + } else { + self.pending_requests.push_front(request); + debug!("no peer available to send request to"); + break + } + } + + // Look for ongoing requests that have timed out. + let mut expired = Vec::new(); + for (id, rw) in &self.outstanding { + if now > rw.timestamp + self.config.request_timeout { + debug!("request {} timed out", id); + expired.push(*id) + } + } + for id in expired { + if let Some(rw) = self.outstanding.remove(&id) { + self.remove_peer(&rw.peer); + self.peerset.report_peer(rw.peer.clone(), crate::protocol::light_dispatch::TIMEOUT_REPUTATION_CHANGE); + if rw.retries == 0 { + send_reply(Err(ClientError::RemoteFetchFailed), rw.request); + continue + } + let rw = RequestWrapper { + timestamp: Instant::now(), + retries: rw.retries - 1, + request: rw.request, + peer: () + }; + self.pending_requests.push_back(rw) + } + } + + Async::NotReady + } +} + +fn required_block(request: &LightClientRequest) -> NumberFor { + match request { + LightClientRequest::Header(data, _) => data.block, + LightClientRequest::Read(data, _) => *data.header.number(), + LightClientRequest::ReadChild(data, _) => *data.header.number(), + LightClientRequest::Call(data, _) => *data.header.number(), + LightClientRequest::Changes(data, _) => data.max_block.0 + } +} + +fn retries(request: &LightClientRequest) -> usize { + let rc = match request { + LightClientRequest::Header(data, _) => data.retry_count, + LightClientRequest::Read(data, _) => data.retry_count, + LightClientRequest::ReadChild(data, _) => data.retry_count, + LightClientRequest::Call(data, _) => data.retry_count, + LightClientRequest::Changes(data, _) => data.retry_count + }; + rc.unwrap_or(0) +} + +fn serialise_request(id: u64, request: &LightClientRequest) -> api::v1::light::Request { + let request = match request { + LightClientRequest::Header(data, _) => { + let r = api::v1::light::RemoteHeaderRequest { block: data.block.encode() }; + api::v1::light::request::Request::RemoteHeaderRequest(r) + } + LightClientRequest::Read(data, _) => { + let r = api::v1::light::RemoteReadRequest { + block: data.block.encode(), + key: data.key.clone() + }; + api::v1::light::request::Request::RemoteReadRequest(r) + } + LightClientRequest::ReadChild(data, _) => { + let r = api::v1::light::RemoteReadChildRequest { + block: data.block.encode(), + storage_key: data.storage_key.clone(), + key: data.key.clone() + }; + api::v1::light::request::Request::RemoteReadChildRequest(r) + } + LightClientRequest::Call(data, _) => { + let r = api::v1::light::RemoteCallRequest { + block: data.block.encode(), + method: data.method.clone(), + data: data.call_data.clone() + }; + api::v1::light::request::Request::RemoteCallRequest(r) + } + LightClientRequest::Changes(data, _) => { + let r = api::v1::light::RemoteChangesRequest { + first: data.first_block.1.encode(), + last: data.last_block.1.encode(), + min: data.tries_roots.1.encode(), + max: data.max_block.1.encode(), + key: data.key.clone() + }; + api::v1::light::request::Request::RemoteChangesRequest(r) + } + }; + + api::v1::light::Request { id, request: Some(request) } +} + +fn send_reply(result: Result, ClientError>, request: LightClientRequest) { + fn send(item: T, sender: oneshot::Sender) { + let _ = sender.send(item); // It is okay if the other end already hung up. + } + match request { + LightClientRequest::Header(req, sender) => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::Header(x)) => send(Ok(x), sender), + reply => error!("invalid reply for header request: {:?}, {:?}", reply, req) + } + LightClientRequest::Read(req, sender) => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::OptVecU8(x)) => send(Ok(x), sender), + reply => error!("invalid reply for read request: {:?}, {:?}", reply, req) + } + LightClientRequest::ReadChild(req, sender) => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::OptVecU8(x)) => send(Ok(x), sender), + reply => error!("invalid reply for read child request: {:?}, {:?}", reply, req) + } + LightClientRequest::Call(req, sender) => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::VecU8(x)) => send(Ok(x), sender), + reply => error!("invalid reply for call request: {:?}, {:?}", reply, req) + } + LightClientRequest::Changes(req, sender) => match result { + Err(e) => send(Err(e), sender), + Ok(Reply::VecNumberU32(x)) => send(Ok(x), sender), + reply => error!("invalid reply for changes request: {:?}, {:?}", reply, req) + } + } +} + +/// Output type of inbound and outbound substream upgrades. +// TODO (after https://github.com/libp2p/rust-libp2p/pull/1226): #[derive(Debug)] +pub enum Event { + /// Incoming request from remote and substream to use for the response. + Request(api::v1::light::Request, Negotiated), + /// Incoming response from remote. + Response(api::v1::light::Response) +} + +/// Substream upgrade protocol. +/// +/// Reads incoming requests from remote. +#[derive(Debug, Clone)] +pub struct InboundProtocol { + /// The max. request length in bytes. + max_data_size: usize +} + +impl UpgradeInfo for InboundProtocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/polkadot/light/1") + } +} + +impl InboundUpgrade for InboundProtocol { + type Output = Event; + type Error = ReadOneError; + type Future = ReadRespond, (), fn(Negotiated, Vec, ()) -> Result>; + + fn upgrade_inbound(self, s: Negotiated, _: Self::Info) -> Self::Future { + read_respond(s, self.max_data_size, (), |s, buf, ()| { + api::v1::light::Request::decode(buf) + .map(move |r| Event::Request(r, s)) + .map_err(|decode_error| { + ReadOneError::Io(std::io::Error::new(std::io::ErrorKind::Other, decode_error)) + }) + }) + } +} + +/// Substream upgrade protocol. +/// +/// Sends a request to remote and awaits the response. +#[derive(Debug, Clone)] +pub struct OutboundProtocol { + /// The serialised protobuf request. + request: Vec, + /// The max. request length in bytes. + max_data_size: usize +} + +impl UpgradeInfo for OutboundProtocol { + type Info = &'static [u8]; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(b"/polkadot/light/1") + } +} + +impl OutboundUpgrade for OutboundProtocol { + type Output = Event; + type Error = ReadOneError; + type Future = RequestResponse, (), fn(Vec, ()) -> Result, ReadOneError>, Vec>; + + fn upgrade_outbound(self, s: Negotiated, _: Self::Info) -> Self::Future { + request_response(s, self.request, self.max_data_size, (), |data, ()| { + api::v1::light::Response::decode(data) + .map(Event::Response) + .map_err(|decode_error| { + ReadOneError::Io(std::io::Error::new(std::io::ErrorKind::Other, decode_error)) + }) + }) + } +} + +#[cfg(test)] +mod tests { + use assert_matches::assert_matches; + use client::{error::Error as ClientError, light::fetcher}; + use codec::Encode; + use crate::protocol::{api, light_dispatch::tests::{DummyFetchChecker, dummy_header}}; + use futures::{prelude::*, sync::oneshot}; + use libp2p::{ + PeerId, + Multiaddr, + core::{ + ConnectedPoint, + identity, + muxing::{StreamMuxerBox, SubstreamRef}, + transport::{Transport, boxed::Boxed, memory::MemoryTransport}, + upgrade + }, + noise::{self, Keypair, X25519, NoiseConfig}, + swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}, + yamux + }; + use std::{collections::HashSet, io, iter::{self, FromIterator}, sync::Arc}; + use super::{Event, LightClientHandler, LightClientRequest, OutboundProtocol, PeerStatus}; + use test_client::runtime::{changes_trie_config, Block}; + use tokio_io::{AsyncRead, AsyncWrite}; + use void::Void; + + type Handler = LightClientHandler>, Block>; + type Swarm = libp2p::swarm::Swarm, Handler>; + + fn make_swarm(ok: bool, ps: peerset::PeersetHandle, cf: super::Config) -> Swarm { + let id_key = identity::Keypair::generate_ed25519(); + let dh_key = Keypair::::new().into_authentic(&id_key).unwrap(); + let local_peer = id_key.public().into_peer_id(); + let transport = MemoryTransport::default() + .with_upgrade(NoiseConfig::xx(dh_key)) + .and_then(move |(remote, stream), endpoint| { + let peer = + if let noise::RemoteIdentity::IdentityKey(k) = remote { + k.into_peer_id() + } else { + panic!("Expected IdentityKey") + }; + upgrade::apply(stream, yamux::Config::default(), endpoint).map(|m| (peer, StreamMuxerBox::new(m))) + }) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .boxed(); + let client = Arc::new(test_client::new()); + let checker = Arc::new(DummyFetchChecker { ok }); + libp2p::swarm::Swarm::new(transport, LightClientHandler::new(cf, client, checker, ps), local_peer) + } + + struct EmptyPollParams(PeerId); + + impl PollParameters for EmptyPollParams { + type SupportedProtocolsIter = iter::Empty>; + type ListenedAddressesIter = iter::Empty; + type ExternalAddressesIter = iter::Empty; + + fn supported_protocols(&self) -> Self::SupportedProtocolsIter { + iter::empty() + } + + fn listened_addresses(&self) -> Self::ListenedAddressesIter { + iter::empty() + } + + fn external_addresses(&self) -> Self::ExternalAddressesIter { + iter::empty() + } + + fn local_peer_id(&self) -> &PeerId { + &self.0 + } + } + + fn peerset() -> (peerset::Peerset, peerset::PeersetHandle) { + let cfg = peerset::PeersetConfig { + in_peers: 128, + out_peers: 128, + bootnodes: Vec::new(), + reserved_only: false, + reserved_nodes: Vec::new() + }; + peerset::Peerset::from_config(cfg) + } + + fn make_behaviour + ( ok: bool + , ps: peerset::PeersetHandle + , cf: super::Config + ) -> LightClientHandler>, Block> + { + let client = Arc::new(test_client::new()); + let checker = Arc::new(DummyFetchChecker { ok }); + LightClientHandler::new(cf, client, checker, ps) + } + + fn empty_dialer() -> ConnectedPoint { + ConnectedPoint::Dialer { address: Multiaddr::empty() } + } + + fn poll(b: &mut LightClientHandler) -> Async> + where T: AsyncRead + AsyncWrite + { + b.poll(&mut EmptyPollParams(PeerId::random())) + } + + #[test] + fn disconnects_from_peer_if_told() { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + + behaviour.inject_disconnected(&peer, empty_dialer()); + assert_eq!(0, behaviour.peers.len()) + } + + #[test] + fn disconnects_from_peer_if_request_times_out() { + let peer0 = PeerId::random(); + let peer1 = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer0.clone(), empty_dialer()); + behaviour.inject_connected(peer1.clone(), empty_dialer()); + + // We now know about two peers. + assert_eq!(HashSet::from_iter(&[peer0.clone(), peer1.clone()]), behaviour.peers.keys().collect::>()); + + // No requests have been made yet. + assert!(behaviour.pending_requests.is_empty()); + assert!(behaviour.outstanding.is_empty()); + + // Issue our first request! + let chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + behaviour.request(LightClientRequest::Call(request, chan.0)).unwrap(); + assert_eq!(1, behaviour.pending_requests.len()); + + // The behaviour should now attempt to send the request. + assert_matches!(poll(&mut behaviour), Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, .. }) => { + assert!(peer_id == peer0 || peer_id == peer1) + }); + + // And we should have one busy peer. + assert!({ + let (idle, busy): (Vec<_>, Vec<_>) = + behaviour.peers.iter().partition(|(_, info)| info.status == PeerStatus::Idle); + + idle.len() == 1 && busy.len() == 1 + && (idle[0].0 == &peer0 || busy[0].0 == &peer0) + && (idle[0].0 == &peer1 || busy[0].0 == &peer1) + }); + + // No more pending requests, but one should be outstanding. + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + + // We now set back the timestamp of the outstanding request to make it expire. + let request = behaviour.outstanding.values_mut().next().unwrap(); + request.timestamp -= super::Config::default().request_timeout; + + // Make progress, but do not expect some action. + assert_matches!(poll(&mut behaviour), Async::NotReady); + + // The request should have timed out by now and the corresponding peer be removed. + assert_eq!(1, behaviour.peers.len()); + // Since we asked for one retry, the request should be back in the pending queue. + assert_eq!(1, behaviour.pending_requests.len()); + // No other request should be ongoing. + assert_eq!(0, behaviour.outstanding.len()); + } + + #[test] + fn disconnects_from_peer_on_response_with_wrong_id() { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + + let chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + behaviour.request(LightClientRequest::Call(request, chan.0)).unwrap(); + + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + poll(&mut behaviour); // Make progress + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + + // Construct response with bogus ID + let response = { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: 2365789, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + }; + + // Make sure our bogus ID is really not used. + assert!(!behaviour.outstanding.keys().any(|id| id == &response.id)); + + behaviour.inject_node_event(peer.clone(), Event::Response(response)); + assert!(behaviour.peers.is_empty()); + + poll(&mut behaviour); // More progress + + // The request should be back in the pending queue + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + } + + #[test] + fn disconnects_from_peer_on_incorrect_response() { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(false, pset.1, super::Config::default()); + // ^--- Making sure the response data check fails. + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + + let chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + behaviour.request(LightClientRequest::Call(request, chan.0)).unwrap(); + + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + poll(&mut behaviour); // Make progress + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + + let request_id = *behaviour.outstanding.keys().next().unwrap(); + + let response = { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: request_id, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + }; + + behaviour.inject_node_event(peer.clone(), Event::Response(response)); + assert!(behaviour.peers.is_empty()); + + poll(&mut behaviour); // More progress + + // The request should be back in the pending queue + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + } + + #[test] + fn disconnects_from_peer_on_unexpected_response() { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + + // Some unsolicited response + let response = { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: 2347895932, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + }; + + behaviour.inject_node_event(peer.clone(), Event::Response(response)); + + assert!(behaviour.peers.is_empty()); + poll(&mut behaviour); + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + } + + #[test] + fn disconnects_from_peer_on_wrong_response_type() { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + + let chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(1), + }; + behaviour.request(LightClientRequest::Call(request, chan.0)).unwrap(); + + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + poll(&mut behaviour); // Make progress + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + + let request_id = *behaviour.outstanding.keys().next().unwrap(); + + let response = { + let r = api::v1::light::RemoteReadResponse { proof: Vec::new() }; // Not a RemoteCallResponse! + api::v1::light::Response { + id: request_id, + response: Some(api::v1::light::response::Response::RemoteReadResponse(r)) + } + }; + + behaviour.inject_node_event(peer.clone(), Event::Response(response)); + assert!(behaviour.peers.is_empty()); + + poll(&mut behaviour); // More progress + + // The request should be back in the pending queue + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + } + + #[test] + fn receives_remote_failure_after_retry_count_failures() { + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let peer3 = PeerId::random(); + let peer4 = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(false, pset.1, super::Config::default()); + // ^--- Making sure the response data check fails. + + behaviour.inject_connected(peer1.clone(), empty_dialer()); + behaviour.inject_connected(peer2.clone(), empty_dialer()); + behaviour.inject_connected(peer3.clone(), empty_dialer()); + behaviour.inject_connected(peer4.clone(), empty_dialer()); + assert_eq!(4, behaviour.peers.len()); + + let mut chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: Some(3), // Attempt up to three retries. + }; + behaviour.request(LightClientRequest::Call(request, chan.0)).unwrap(); + + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + assert_matches!(poll(&mut behaviour), Async::Ready(NetworkBehaviourAction::SendEvent { .. })); + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + + for _ in 0 .. 3 { + // Construct an invalid response + let request_id = *behaviour.outstanding.keys().next().unwrap(); + let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); + let response = { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: request_id, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + }; + behaviour.inject_node_event(responding_peer, Event::Response(response.clone())); + assert_matches!(poll(&mut behaviour), Async::Ready(NetworkBehaviourAction::SendEvent { .. })); + assert_matches!(chan.1.try_recv(), Ok(None)) + } + // Final invalid response + let request_id = *behaviour.outstanding.keys().next().unwrap(); + let responding_peer = behaviour.outstanding.values().next().unwrap().peer.clone(); + let response = { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: request_id, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + }; + behaviour.inject_node_event(responding_peer, Event::Response(response)); + assert_matches!(poll(&mut behaviour), Async::NotReady); + assert_matches!(chan.1.try_recv(), Ok(Some(Err(ClientError::RemoteFetchFailed)))) + } + + fn issue_request(request: LightClientRequest) { + let peer = PeerId::random(); + let pset = peerset(); + let mut behaviour = make_behaviour(true, pset.1, super::Config::default()); + + behaviour.inject_connected(peer.clone(), empty_dialer()); + assert_eq!(1, behaviour.peers.len()); + + let response = match request { + LightClientRequest::Header(..) => { + let r = api::v1::light::RemoteHeaderResponse { + header: dummy_header().encode(), + proof: Vec::new() + }; + api::v1::light::Response { + id: 1, + response: Some(api::v1::light::response::Response::RemoteHeaderResponse(r)) + } + } + LightClientRequest::Read(..) => { + let r = api::v1::light::RemoteReadResponse { proof: Vec::new() }; + api::v1::light::Response { + id: 1, + response: Some(api::v1::light::response::Response::RemoteReadResponse(r)) + } + } + LightClientRequest::ReadChild(..) => { + let r = api::v1::light::RemoteReadResponse { proof: Vec::new() }; + api::v1::light::Response { + id: 1, + response: Some(api::v1::light::response::Response::RemoteReadResponse(r)) + } + } + LightClientRequest::Call(..) => { + let r = api::v1::light::RemoteCallResponse { proof: Vec::new() }; + api::v1::light::Response { + id: 1, + response: Some(api::v1::light::response::Response::RemoteCallResponse(r)) + } + } + LightClientRequest::Changes(..) => { + let r = api::v1::light::RemoteChangesResponse { + max: iter::repeat(1).take(32).collect(), + proof: Vec::new(), + roots: Vec::new(), + roots_proof: Vec::new() + }; + api::v1::light::Response { + id: 1, + response: Some(api::v1::light::response::Response::RemoteChangesResponse(r)) + } + } + }; + + behaviour.request(request).unwrap(); + + assert_eq!(1, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()); + assert_matches!(poll(&mut behaviour), Async::Ready(NetworkBehaviourAction::SendEvent { .. })); + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(1, behaviour.outstanding.len()); + assert_eq!(1, *behaviour.outstanding.keys().next().unwrap()); + + behaviour.inject_node_event(peer.clone(), Event::Response(response)); + + poll(&mut behaviour); + + assert_eq!(0, behaviour.pending_requests.len()); + assert_eq!(0, behaviour.outstanding.len()) + } + + #[test] + fn receives_remote_call_response() { + let mut chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: None + }; + issue_request(LightClientRequest::Call(request, chan.0)); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_read_response() { + let mut chan = oneshot::channel(); + let request = fetcher::RemoteReadRequest { + header: dummy_header(), + block: Default::default(), + key: b":key".to_vec(), + retry_count: None + }; + issue_request(LightClientRequest::Read(request, chan.0)); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_read_child_response() { + let mut chan = oneshot::channel(); + let request = fetcher::RemoteReadChildRequest { + header: dummy_header(), + block: Default::default(), + storage_key: b":child_storage:sub".to_vec(), + key: b":key".to_vec(), + retry_count: None + }; + issue_request(LightClientRequest::ReadChild(request, chan.0)); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_header_response() { + let mut chan = oneshot::channel(); + let request = fetcher::RemoteHeaderRequest { + cht_root: Default::default(), + block: 1, + retry_count: None + }; + issue_request(LightClientRequest::Header(request, chan.0)); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + #[test] + fn receives_remote_changes_response() { + let mut chan = oneshot::channel(); + let request = fetcher::RemoteChangesRequest { + changes_trie_config: changes_trie_config(), + first_block: (1, Default::default()), + last_block: (100, Default::default()), + max_block: (100, Default::default()), + tries_roots: (1, Default::default(), Vec::new()), + key: Vec::new(), + retry_count: None + }; + issue_request(LightClientRequest::Changes(request, chan.0)); + assert_matches!(chan.1.try_recv(), Ok(Some(Ok(_)))) + } + + fn send_receive(runtime: &mut tokio::runtime::Runtime, request: LightClientRequest) { + // We start a swarm on the listening side which awaits incoming requests and answers them: + let local_pset = peerset(); + let local_listen_addr: libp2p::Multiaddr = libp2p::multiaddr::Protocol::Memory(rand::random()).into(); + let mut local_swarm = make_swarm(true, local_pset.1, super::Config::default()); + Swarm::listen_on(&mut local_swarm, local_listen_addr.clone()).unwrap(); + + // We also start a swarm that makes requests and awaits responses: + let remote_pset = peerset(); + let mut remote_swarm = make_swarm(true, remote_pset.1, super::Config::default()); + + // We now schedule a request, dial the remote and let the two swarm work it out: + remote_swarm.request(request).unwrap(); + Swarm::dial_addr(&mut remote_swarm, local_listen_addr).unwrap(); + + runtime.spawn(local_swarm.for_each(|_| Ok(())) + .join(remote_swarm.for_each(|_| Ok(()))) + .map(|_| ()) + .map_err(|e| panic!("{}", e))); + } + + #[test] + fn send_receive_call() { + let chan = oneshot::channel(); + let request = fetcher::RemoteCallRequest { + block: Default::default(), + header: dummy_header(), + method: "test".into(), + call_data: vec![], + retry_count: None + }; + let mut runtime = tokio::runtime::Runtime::new().expect("new tokio runtime"); + send_receive(&mut runtime, LightClientRequest::Call(request, chan.0)); + assert_eq!(vec![42], chan.1.wait().unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_execution_proof` + } + + #[test] + fn send_receive_read() { + let chan = oneshot::channel(); + let request = fetcher::RemoteReadRequest { + header: dummy_header(), + block: Default::default(), + key: b":key".to_vec(), + retry_count: None + }; + let mut runtime = tokio::runtime::Runtime::new().expect("new tokio runtime"); + send_receive(&mut runtime, LightClientRequest::Read(request, chan.0)); + assert_eq!(Some(vec![42]), chan.1.wait().unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_read_proof` + } + + // At the moment, remote read child requests are ignored. + // Cf. https://github.com/paritytech/substrate/blob/9b3e9f/core/network/src/protocol.rs#L550 + #[test] + #[ignore] + fn send_receive_read_child() { + let chan = oneshot::channel(); + let request = fetcher::RemoteReadChildRequest { + header: dummy_header(), + block: Default::default(), + storage_key: b":child_storage:sub".to_vec(), + key: b":key".to_vec(), + retry_count: None + }; + let mut runtime = tokio::runtime::Runtime::new().expect("new tokio runtime"); + send_receive(&mut runtime, LightClientRequest::ReadChild(request, chan.0)); + assert_eq!(Some(vec![42]), chan.1.wait().unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_read_child_proof` + } + + #[test] + fn send_receive_header() { + let _ = env_logger::try_init(); + let chan = oneshot::channel(); + let request = fetcher::RemoteHeaderRequest { + cht_root: Default::default(), + block: 1, + retry_count: None + }; + let mut runtime = tokio::runtime::Runtime::new().expect("new tokio runtime"); + send_receive(&mut runtime, LightClientRequest::Header(request, chan.0)); + // The remote does not know block 1: + assert_matches!(chan.1.wait().unwrap(), Err(ClientError::RemoteFetchFailed)); + } + + #[test] + fn send_receive_changes() { + let chan = oneshot::channel(); + let request = fetcher::RemoteChangesRequest { + changes_trie_config: changes_trie_config(), + first_block: (1, Default::default()), + last_block: (100, Default::default()), + max_block: (100, Default::default()), + tries_roots: (1, Default::default(), Vec::new()), + key: Vec::new(), + retry_count: None + }; + let mut runtime = tokio::runtime::Runtime::new().expect("new tokio runtime"); + send_receive(&mut runtime, LightClientRequest::Changes(request, chan.0)); + assert_eq!(vec![(100, 2)], chan.1.wait().unwrap().unwrap()); + // ^--- from `DummyFetchChecker::check_changes_proof` + } +} diff --git a/core/network/src/protocol/light_dispatch.rs b/core/network/src/protocol/light_dispatch.rs index e56ed3a08eee9..4dc9ca43b1094 100644 --- a/core/network/src/protocol/light_dispatch.rs +++ b/core/network/src/protocol/light_dispatch.rs @@ -39,7 +39,7 @@ const REQUEST_TIMEOUT: Duration = Duration::from_secs(15); /// Default request retry count. const RETRY_COUNT: usize = 1; /// Reputation change for a peer when a request timed out. -const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); +pub(crate) const TIMEOUT_REPUTATION_CHANGE: i32 = -(1 << 8); /// Trait used by the `LightDispatch` service to communicate messages back to the network. pub trait LightDispatchNetwork { @@ -676,7 +676,7 @@ pub mod tests { use super::{REQUEST_TIMEOUT, LightDispatch, LightDispatchNetwork, RequestData}; use test_client::runtime::{changes_trie_config, Block, Extrinsic, Header}; - struct DummyFetchChecker { ok: bool } + pub(crate) struct DummyFetchChecker { pub(crate) ok: bool } impl FetchChecker for DummyFetchChecker { fn check_header_proof( @@ -759,7 +759,7 @@ pub mod tests { }); } - fn dummy_header() -> Header { + pub(crate) fn dummy_header() -> Header { Header { parent_hash: Default::default(), number: 0, diff --git a/core/network/src/protocol/schema/api.v1.proto b/core/network/src/protocol/schema/api.v1.proto new file mode 100644 index 0000000000000..2f9de4dd40c7d --- /dev/null +++ b/core/network/src/protocol/schema/api.v1.proto @@ -0,0 +1,59 @@ +// Schema definition for block request/response messages. + +syntax = "proto3"; + +package api.v1; + +// Block enumeration direction. +enum Direction { + // Enumerate in ascending order (from child to parent). + Ascending = 0; + // Enumerate in descendfing order (from parent to canonical child). + Descending = 1; +} + +// Request block data from a peer. +message BlockRequest { + // Unique request id. + uint64 id = 1; + // Bits of block data to request. + uint32 fields = 2; + // Start from this block. + oneof from_block { + // Start with given hash. + bytes hash = 3; + // Start with given block number. + bytes number = 4; + } + // End at this block. An implementation defined maximum is used when unspecified. + bytes to_block = 5; + // Sequence direction. + Direction direction = 6; + // Maximum number of blocks to return. An implementation defined maximum is used when unspecified. + uint32 max_blocks = 7; +} + +// Response to `BlockRequest` +message BlockResponse { + // Id of a request this response was made for. + uint64 id = 1; + // Block data for the requested sequence. + repeated BlockData blocks = 2; +} + +// Block data sent in the response. +message BlockData { + // Block header hash. + bytes hash = 1; + // Block header if requested. + bytes header = 2; + // Block body if requested. + repeated bytes body = 3; + // Block receipt if requested. + bytes receipt = 4; + // Block message queue if requested. + bytes message_queue = 5; + // Justification if requested. + bytes justification = 6; +} + diff --git a/core/network/src/protocol/schema/light.v1.proto b/core/network/src/protocol/schema/light.v1.proto new file mode 100644 index 0000000000000..50e78176536e4 --- /dev/null +++ b/core/network/src/protocol/schema/light.v1.proto @@ -0,0 +1,121 @@ +// Schema definition for light client messages. + +syntax = "proto3"; + +package api.v1.light; + +// A pair of arbitrary bytes. +message Pair { + // The first element of the pair. + bytes fst = 1; + // The second element of the pair. + bytes snd = 2; +} + +// Enumerate all possible light client request messages. +message Request { + // Unique request id. + uint64 id = 1; + oneof request { + RemoteCallRequest remote_call_request = 2; + RemoteReadRequest remote_read_request = 3; + RemoteHeaderRequest remote_header_request = 4; + RemoteReadChildRequest remote_read_child_request = 5; + RemoteChangesRequest remote_changes_request = 6; + } +} + +// Enumerate all possible light client response messages. +message Response { + /// Id of a request this response was made for. + uint64 id = 1; + oneof response { + RemoteCallResponse remote_call_response = 2; + RemoteReadResponse remote_read_response = 3; + RemoteHeaderResponse remote_header_response = 4; + RemoteChangesResponse remote_changes_response = 6; + } +} + +// Remote call request. +message RemoteCallRequest { + // Block at which to perform call. + bytes block = 2; + // Method name. + string method = 3; + // Call data. + bytes data = 4; +} + +// Remote call response. +message RemoteCallResponse { + // Execution proof. + repeated bytes proof = 2; +} + +// Remote storage read request. +message RemoteReadRequest { + // Block at which to perform call. + bytes block = 2; + // Storage key. + bytes key = 3; +} + +// Remote read response. +message RemoteReadResponse { + // Read proof. + repeated bytes proof = 2; +} + +// Remote storage read child request. +message RemoteReadChildRequest { + // Block at which to perform call. + bytes block = 2; + // Child Storage key. + bytes storage_key = 3; + // Storage key. + bytes key = 4; +} + +// Remote header request. +message RemoteHeaderRequest { + // Block number to request header for. + bytes block =2; +} + +// Remote header response. +message RemoteHeaderResponse { + // Header. None if proof generation has failed (e.g. header is unknown). + bytes header = 2; + // Header proof. + repeated bytes proof = 3; +} + +/// Remote changes request. +message RemoteChangesRequest { + /// Hash of the first block of the range (including first) where changes are requested. + bytes first = 2; + /// Hash of the last block of the range (including last) where changes are requested. + bytes last = 3; + /// Hash of the first block for which the requester has the changes trie root. All other + /// affected roots must be proved. + bytes min = 4; + /// Hash of the last block that we can use when querying changes. + bytes max = 5; + /// Storage key which changes are requested. + bytes key = 6; +} + +// Remote changes response. +message RemoteChangesResponse { + // Proof has been generated using block with this number as a max block. Should be + // less than or equal to the RemoteChangesRequest::max block number. + bytes max = 2; + // Changes proof. + repeated bytes proof = 3; + // Changes tries roots missing on the requester' node. + repeated Pair roots = 4; + // Missing changes tries roots proof. + repeated bytes roots_proof = 5; +} + diff --git a/core/network/src/service.rs b/core/network/src/service.rs index c3f773e232e7a..da5eb3d07a269 100644 --- a/core/network/src/service.rs +++ b/core/network/src/service.rs @@ -113,9 +113,7 @@ impl, H: ExHashT> NetworkWorker /// Returns a `NetworkWorker` that implements `Future` and must be regularly polled in order /// for the network processing to advance. From it, you can extract a `NetworkService` using /// `worker.service()`. The `NetworkService` can be shared through the codebase. - pub fn new( - params: Params, - ) -> Result, Error> { + pub fn new(params: Params) -> Result, Error> { let (to_worker, from_worker) = mpsc::unbounded(); if let Some(ref path) = params.network_config.net_config_path { @@ -167,14 +165,18 @@ impl, H: ExHashT> NetworkWorker let num_connected = Arc::new(AtomicUsize::new(0)); let is_major_syncing = Arc::new(AtomicBool::new(false)); + + let checker = params.on_demand.as_ref() + .map(|od| od.checker().clone()) + .unwrap_or(Arc::new(AlwaysBadChecker)); + let (protocol, peerset_handle) = Protocol::new( protocol::ProtocolConfig { roles: params.roles }, - params.chain, - params.on_demand.as_ref().map(|od| od.checker().clone()) - .unwrap_or(Arc::new(AlwaysBadChecker)), + params.chain.clone(), + checker.clone(), params.specialization, params.transaction_pool, - params.finality_proof_provider, + params.finality_proof_provider.clone(), params.finality_proof_request_builder, params.protocol_id, peerset_config, @@ -187,6 +189,14 @@ impl, H: ExHashT> NetworkWorker params.network_config.client_version, params.network_config.node_name ); + let block_requests = { + let config = protocol::block_requests::Config::default(); + protocol::BlockRequests::new(config, params.chain.clone()) + }; + let light_client_handler = { + let config = protocol::light_client_handler::Config::default(); + protocol::LightClientHandler::new(config, params.chain, checker, peerset_handle.clone()) + }; let behaviour = Behaviour::new( protocol, user_agent, @@ -195,7 +205,9 @@ impl, H: ExHashT> NetworkWorker match params.network_config.transport { TransportConfig::MemoryOnly => false, TransportConfig::Normal { enable_mdns, .. } => enable_mdns, - } + }, + block_requests, + light_client_handler ); let (transport, bandwidth) = { let (config_mem, config_wasm) = match params.network_config.transport {