diff --git a/client/authority-discovery/Cargo.toml b/client/authority-discovery/Cargo.toml index d335800d105bc..6b619ecb809a0 100644 --- a/client/authority-discovery/Cargo.toml +++ b/client/authority-discovery/Cargo.toml @@ -21,9 +21,9 @@ codec = { package = "parity-scale-codec", version = "3", default-features = fals futures = "0.3" futures-timer = "3" ip_network = "0.4.1" -libp2p = { version = "0.51", features = ["kad", "ed25519"] } -multihash = { version = "0.17", default-features = false, features = ["std", "sha2"] } -log = "0.4" +libp2p = { version = "0.52.1", features = ["kad", "ed25519"] } +multihash-codetable = { version = "0.1.0", features = ["sha2", "digest"] } +log = "0.4.17" prost = "0.11" rand = "0.8" thiserror = "1" diff --git a/client/authority-discovery/src/tests.rs b/client/authority-discovery/src/tests.rs index 4fbc196c5ecd1..16700e35ca517 100644 --- a/client/authority-discovery/src/tests.rs +++ b/client/authority-discovery/src/tests.rs @@ -56,7 +56,7 @@ fn get_addresses_and_authority_id() { let remote_addr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333" .parse::() .unwrap() - .with(Protocol::P2p(remote_peer_id.into())); + .with(Protocol::P2p(remote_peer_id)); let test_api = Arc::new(TestApi { authorities: vec![] }); diff --git a/client/authority-discovery/src/worker.rs b/client/authority-discovery/src/worker.rs index a29e74df9accc..2552646c12559 100644 --- a/client/authority-discovery/src/worker.rs +++ b/client/authority-discovery/src/worker.rs @@ -34,8 +34,8 @@ use futures::{channel::mpsc, future, stream::Fuse, FutureExt, Stream, StreamExt} use addr_cache::AddrCache; use codec::{Decode, Encode}; use ip_network::IpNetwork; -use libp2p::{core::multiaddr, identity::PublicKey, multihash::Multihash, Multiaddr, PeerId}; -use multihash::{Code, MultihashDigest}; +use libp2p::{core::multiaddr, identity::PublicKey, Multiaddr}; +use multihash_codetable::{Code, MultihashDigest}; use log::{debug, error, log_enabled}; use prometheus_endpoint::{register, Counter, CounterVec, Gauge, Opts, U64}; @@ -304,7 +304,7 @@ where } fn addresses_to_publish(&self) -> impl Iterator { - let peer_id: Multihash = self.network.local_peer_id().into(); + let peer_id = self.network.local_peer_id(); let publish_non_global_ips = self.publish_non_global_ips; self.network .external_addresses() @@ -529,7 +529,7 @@ where .map_err(Error::ParsingMultiaddress)?; let get_peer_id = |a: &Multiaddr| match a.iter().last() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key).ok(), + Some(multiaddr::Protocol::P2p(peer_id)) => Some(peer_id), _ => None, }; diff --git a/client/authority-discovery/src/worker/addr_cache.rs b/client/authority-discovery/src/worker/addr_cache.rs index 8084b7f0a6dff..ad7bf61d650f0 100644 --- a/client/authority-discovery/src/worker/addr_cache.rs +++ b/client/authority-discovery/src/worker/addr_cache.rs @@ -162,8 +162,8 @@ impl AddrCache { fn peer_id_from_multiaddr(addr: &Multiaddr) -> Option { addr.iter().last().and_then(|protocol| { - if let Protocol::P2p(multihash) = protocol { - PeerId::from_multihash(multihash).ok() + if let Protocol::P2p(peer_id) = protocol { + Some(peer_id) } else { None } @@ -178,7 +178,8 @@ fn addresses_to_peer_ids(addresses: &HashSet) -> HashSet { mod tests { use super::*; - use libp2p::multihash::{self, Multihash}; + use libp2p::multihash::Multihash; + use multihash_codetable::Code; use quickcheck::{Arbitrary, Gen, QuickCheck, TestResult}; use sp_authority_discovery::{AuthorityId, AuthorityPair}; @@ -200,14 +201,13 @@ mod tests { impl Arbitrary for TestMultiaddr { fn arbitrary(g: &mut Gen) -> Self { let seed = (0..32).map(|_| u8::arbitrary(g)).collect::>(); - let peer_id = PeerId::from_multihash( - Multihash::wrap(multihash::Code::Sha2_256.into(), &seed).unwrap(), - ) - .unwrap(); + let peer_id = + PeerId::from_multihash(Multihash::wrap(Code::Sha2_256.into(), &seed).unwrap()) + .unwrap(); let multiaddr = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333" .parse::() .unwrap() - .with(Protocol::P2p(peer_id.into())); + .with(Protocol::P2p(peer_id)); TestMultiaddr(multiaddr) } @@ -219,18 +219,17 @@ mod tests { impl Arbitrary for TestMultiaddrsSamePeerCombo { fn arbitrary(g: &mut Gen) -> Self { let seed = (0..32).map(|_| u8::arbitrary(g)).collect::>(); - let peer_id = PeerId::from_multihash( - Multihash::wrap(multihash::Code::Sha2_256.into(), &seed).unwrap(), - ) - .unwrap(); + let peer_id = + PeerId::from_multihash(Multihash::wrap(Code::Sha2_256.into(), &seed).unwrap()) + .unwrap(); let multiaddr1 = "/ip6/2001:db8:0:0:0:0:0:2/tcp/30333" .parse::() .unwrap() - .with(Protocol::P2p(peer_id.into())); + .with(Protocol::P2p(peer_id)); let multiaddr2 = "/ip6/2002:db8:0:0:0:0:0:2/tcp/30133" .parse::() .unwrap() - .with(Protocol::P2p(peer_id.into())); + .with(Protocol::P2p(peer_id)); TestMultiaddrsSamePeerCombo(multiaddr1, multiaddr2) } } @@ -367,7 +366,7 @@ mod tests { let mut addr_cache = AddrCache::new(); let peer_id = PeerId::random(); - let addr = Multiaddr::empty().with(Protocol::P2p(peer_id.into())); + let addr = Multiaddr::empty().with(Protocol::P2p(peer_id)); let authority_id0 = AuthorityPair::generate().0.public(); let authority_id1 = AuthorityPair::generate().0.public(); diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index c29120881940c..791f013cfedb7 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -415,7 +415,7 @@ fn dont_stop_polling_dht_event_stream_after_bogus_event() { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - address.with(multiaddr::Protocol::P2p(peer_id.into())) + address.with(multiaddr::Protocol::P2p(peer_id)) }; let remote_key_store = MemoryKeystore::new(); let remote_public_key: AuthorityId = remote_key_store @@ -526,7 +526,7 @@ impl DhtValueFoundTester { let address: Multiaddr = format!("/ip6/2001:db8:0:0:0:0:0:{:x}/tcp/30333", idx).parse().unwrap(); - address.with(multiaddr::Protocol::P2p(peer_id.into())) + address.with(multiaddr::Protocol::P2p(peer_id)) } fn process_value_found( @@ -749,7 +749,7 @@ fn lookup_throttling() { let peer_id = PeerId::random(); let address: Multiaddr = "/ip6/2001:db8:0:0:0:0:0:1/tcp/30333".parse().unwrap(); - address.with(multiaddr::Protocol::P2p(peer_id.into())) + address.with(multiaddr::Protocol::P2p(peer_id)) }; let remote_key_store = MemoryKeystore::new(); let remote_public_keys: Vec = (0..20) diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index fe0c709cd1b62..d79f62d189b1c 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -13,23 +13,23 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -array-bytes = "6" -chrono = "0.4" -clap = { version = "4", features = ["derive", "string"] } -fdlimit = "0.2" -futures = "0.3" -libp2p-identity = { version = "0.1", features = ["peerid", "ed25519"]} -log = "0.4" +array-bytes = "6.1" +chrono = "0.4.10" +clap = { version = "4.2.5", features = ["derive", "string"] } +fdlimit = "0.2.1" +futures = "0.3.21" +libp2p-identity = { version = "0.2.0", features = ["peerid", "ed25519"]} +log = "0.4.17" names = { version = "0.14", default-features = false } -parity-scale-codec = "3" -rand = "0.8" -regex = "1" -rpassword = "7" -serde = "1" -serde_json = "1" -thiserror = "1" -tiny-bip39 = "1" -tokio = { version = "1", features = ["signal", "rt-multi-thread", "parking_lot"] } +parity-scale-codec = "3.6.1" +rand = "0.8.5" +regex = "1.6.0" +rpassword = "7.0.0" +serde = "1.0.163" +serde_json = "1.0.85" +thiserror = "1.0.30" +tiny-bip39 = "1.0.0" +tokio = { version = "1.22.0", features = ["signal", "rt-multi-thread", "parking_lot"] } sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-client-db = { version = "0.10.0-dev", default-features = false, path = "../db" } sc-keystore = { version = "4.0.0-dev", path = "../keystore" } diff --git a/client/consensus/common/Cargo.toml b/client/consensus/common/Cargo.toml index 4eb704d84bd45..82319cacb08f9 100644 --- a/client/consensus/common/Cargo.toml +++ b/client/consensus/common/Cargo.toml @@ -13,12 +13,12 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -async-trait = "0.1" -futures = { version = "0.3", features = ["thread-pool"] } -futures-timer = "3" -libp2p-identity = { version = "0.1", features = ["peerid", "ed25519"] } -log = "0.4" -mockall = "0.11" +async-trait = "0.1.57" +futures = { version = "0.3.21", features = ["thread-pool"] } +futures-timer = "3.0.1" +libp2p-identity = { version = "0.2.0", features = ["peerid", "ed25519"] } +log = "0.4.17" +mockall = "0.11.3" parking_lot = "0.12.1" serde = { version = "1", features = ["derive"] } thiserror = "1" diff --git a/client/network-gossip/Cargo.toml b/client/network-gossip/Cargo.toml index f6ba1483b17f8..d49c03c7a9beb 100644 --- a/client/network-gossip/Cargo.toml +++ b/client/network-gossip/Cargo.toml @@ -14,13 +14,14 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -ahash = "0.8" -futures = "0.3" -futures-timer = "3" -libp2p = "0.51" -log = "0.4" -schnellru = "0.2" -tracing = "0.1" +ahash = "0.8.2" +futures = "0.3.21" +futures-timer = "3.0.1" +libp2p-identity = { version = "0.2.0", features = ["peerid", "ed25519"]} +multiaddr = "0.18.0" +log = "0.4.17" +schnellru = "0.2.1" +tracing = "0.1.29" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-network = { version = "0.10.0-dev", path = "../network/" } sc-network-common = { version = "0.10.0-dev", path = "../network/common" } diff --git a/client/network-gossip/src/bridge.rs b/client/network-gossip/src/bridge.rs index 2cd4e18171568..f3ad7983482b2 100644 --- a/client/network-gossip/src/bridge.rs +++ b/client/network-gossip/src/bridge.rs @@ -28,7 +28,7 @@ use futures::{ channel::mpsc::{channel, Receiver, Sender}, prelude::*, }; -use libp2p::PeerId; +use libp2p_identity::PeerId; use log::trace; use prometheus_endpoint::Registry; use sp_runtime::traits::Block as BlockT; @@ -330,12 +330,13 @@ impl futures::future::FusedFuture for GossipEngine { #[cfg(test)] mod tests { use super::*; - use crate::{multiaddr::Multiaddr, ValidationResult, ValidatorContext}; + use crate::{ValidationResult, ValidatorContext}; use futures::{ channel::mpsc::{unbounded, UnboundedSender}, executor::{block_on, block_on_stream}, future::poll_fn, }; + use multiaddr::Multiaddr; use quickcheck::{Arbitrary, Gen, QuickCheck}; use sc_network::{ config::MultiaddrWithPeerId, NetworkBlock, NetworkEventStream, NetworkNotification, diff --git a/client/network-gossip/src/lib.rs b/client/network-gossip/src/lib.rs index ef87dd599e010..d126f85646e6c 100644 --- a/client/network-gossip/src/lib.rs +++ b/client/network-gossip/src/lib.rs @@ -67,7 +67,8 @@ pub use self::{ validator::{DiscardAll, MessageIntent, ValidationResult, Validator, ValidatorContext}, }; -use libp2p::{multiaddr, PeerId}; +use libp2p_identity::PeerId; +use multiaddr::{Multiaddr, Protocol}; use sc_network::{ types::ProtocolName, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, }; @@ -82,8 +83,7 @@ mod validator; /// Abstraction over a network. pub trait Network: NetworkPeers + NetworkEventStream + NetworkNotification { fn add_set_reserved(&self, who: PeerId, protocol: ProtocolName) { - let addr = - iter::once(multiaddr::Protocol::P2p(who.into())).collect::(); + let addr = Multiaddr::empty().with(Protocol::P2p(who)); let result = self.add_peers_to_reserved_set(protocol, iter::once(addr).collect()); if let Err(err) = result { log::error!(target: "gossip", "add_set_reserved failed: {}", err); diff --git a/client/network-gossip/src/state_machine.rs b/client/network-gossip/src/state_machine.rs index f874a5c15b38b..4b474baab76bd 100644 --- a/client/network-gossip/src/state_machine.rs +++ b/client/network-gossip/src/state_machine.rs @@ -19,7 +19,7 @@ use crate::{MessageIntent, Network, ValidationResult, Validator, ValidatorContext}; use ahash::AHashSet; -use libp2p::PeerId; +use libp2p_identity::PeerId; use schnellru::{ByLength, LruMap}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; @@ -521,8 +521,8 @@ impl Metrics { #[cfg(test)] mod tests { use super::*; - use crate::multiaddr::Multiaddr; use futures::prelude::*; + use multiaddr::Multiaddr; use sc_network::{ config::MultiaddrWithPeerId, event::Event, NetworkBlock, NetworkEventStream, NetworkNotification, NetworkPeers, NotificationSenderError, diff --git a/client/network-gossip/src/validator.rs b/client/network-gossip/src/validator.rs index 2272efba50652..04585bb721849 100644 --- a/client/network-gossip/src/validator.rs +++ b/client/network-gossip/src/validator.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use libp2p::PeerId; +use libp2p_identity::PeerId; use sc_network_common::role::ObservedRole; use sp_runtime::traits::Block as BlockT; diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 434b559b9f3c1..c42f0c69f6bc2 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -19,26 +19,28 @@ async-channel = "1" async-trait = "0.1" asynchronous-codec = "0.6" bytes = "1" -codec = { package = "parity-scale-codec", version = "3", features = ["derive"] } -either = "1" -fnv = "1" -futures = "0.3" -futures-timer = "3" -ip_network = "0.4" -libp2p = { version = "0.51", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux", "websocket", "request-response"] } -linked_hash_set = "0.1" -log = "0.4" -mockall = "0.11" -parking_lot = "0.12" -partial_sort = "0.2" -pin-project = "1" -rand = "0.8" -serde = { version = "1", features = ["derive"] } -serde_json = "1" -smallvec = "1" -thiserror = "1" -unsigned-varint = { version = "0.7", features = ["futures", "asynchronous_codec"] } -zeroize = "1" +codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] } +either = "1.5.3" +fnv = "1.0.6" +futures = "0.3.21" +futures-timer = "3.0.2" +ip_network = "0.4.1" +libp2p = { version = "0.52.1", features = ["dns", "identify", "kad", "macros", "mdns", "noise", "ping", "tcp", "tokio", "yamux", "websocket", "request-response"] } +libp2p-kad = { version = "0.44.2" } +linked_hash_set = "0.1.3" +log = "0.4.17" +mockall = "0.11.3" +parking_lot = "0.12.1" +partial_sort = "0.2.0" +pin-project = "1.0.12" +rand = "0.8.5" +serde = { version = "1.0.163", features = ["derive"] } +serde_json = "1.0.85" +smallvec = "1.11.0" +thiserror = "1.0" +unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] } +void = "1" +zeroize = "1.4.3" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../utils/prometheus" } sc-client-api = { version = "4.0.0-dev", path = "../api" } sc-network-common = { version = "0.10.0-dev", path = "./common" } @@ -50,14 +52,14 @@ sp-runtime = { version = "24", path = "../../primitives/runtime" } wasm-timer = "0.2" [dev-dependencies] -assert_matches = "1" -mockall = "0.11" -multistream-select = "0.12" -rand = "0.8" -tempfile = "3" -tokio = { version = "1", features = ["macros"] } -tokio-util = { version = "0.7", features = ["compat"] } -tokio-test = "0.4" +assert_matches = "1.3" +mockall = "0.11.3" +multistream-select = "0.13.0" +rand = "0.8.5" +tempfile = "3.1.0" +tokio = { version = "1.22.0", features = ["macros"] } +tokio-util = { version = "0.7.4", features = ["compat"] } +tokio-test = "0.4.2" sc-network-light = { version = "0.10.0-dev", path = "./light" } sc-network-sync = { version = "0.10.0-dev", path = "./sync" } sp-test-primitives = { version = "2", path = "../../primitives/test-primitives" } diff --git a/client/network/bitswap/Cargo.toml b/client/network/bitswap/Cargo.toml index 73c6265b034fc..46d9cbf8a9251 100644 --- a/client/network/bitswap/Cargo.toml +++ b/client/network/bitswap/Cargo.toml @@ -17,10 +17,10 @@ prost-build = "0.11" [dependencies] async-channel = "1.8.0" -cid = "0.9" -futures = "0.3" -libp2p-identity = { version = "0.1", features = ["peerid"] } -log = "0.4" +cid = "0.9.0" +futures = "0.3.21" +libp2p-identity = { version = "0.2.0", features = ["peerid"] } +log = "0.4.17" prost = "0.11" thiserror = "1.0" unsigned-varint = { version = "0.7.1", features = ["futures", "asynchronous_codec"] } diff --git a/client/network/common/Cargo.toml b/client/network/common/Cargo.toml index f1857516d00bb..fdfa10ba23d1b 100644 --- a/client/network/common/Cargo.toml +++ b/client/network/common/Cargo.toml @@ -21,8 +21,8 @@ bitflags = "1" codec = { package = "parity-scale-codec", version = "3", features = [ "derive", ] } -futures = "0.3" -libp2p-identity = { version = "0.1", features = ["peerid"] } +futures = "0.3.21" +libp2p-identity = { version = "0.2.0", features = ["peerid"] } sc-consensus = { version = "0.10.0-dev", path = "../../consensus/common" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } sp-consensus-grandpa = { version = "4.0.0-dev", path = "../../../primitives/consensus/grandpa" } diff --git a/client/network/light/Cargo.toml b/client/network/light/Cargo.toml index 49d4f378afb0c..16dfa7076644a 100644 --- a/client/network/light/Cargo.toml +++ b/client/network/light/Cargo.toml @@ -21,9 +21,9 @@ array-bytes = "6" codec = { package = "parity-scale-codec", version = "3", features = [ "derive", ] } -futures = "0.3" -libp2p-identity = { version = "0.1", features = ["peerid"] } -log = "0.4" +futures = "0.3.21" +libp2p-identity = { version = "0.2.0", features = ["peerid"] } +log = "0.4.16" prost = "0.11" sp-blockchain = { version = "4.0.0-dev", path = "../../../primitives/blockchain" } sc-client-api = { version = "4.0.0-dev", path = "../../api" } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index a4c19c47c3a80..4799807d2cb33 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -30,8 +30,8 @@ use crate::{ use bytes::Bytes; use futures::channel::oneshot; use libp2p::{ - core::Multiaddr, identify::Info as IdentifyInfo, identity::PublicKey, kad::RecordKey, - swarm::NetworkBehaviour, PeerId, + connection_limits::ConnectionLimits, core::Multiaddr, identify::Info as IdentifyInfo, + identity::PublicKey, kad::RecordKey, swarm::NetworkBehaviour, PeerId, }; use sc_network_common::role::{ObservedRole, Roles}; @@ -42,7 +42,7 @@ pub use crate::request_responses::{InboundFailure, OutboundFailure, RequestId, R /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] -#[behaviour(out_event = "BehaviourOut")] +#[behaviour(to_swarm = "BehaviourOut")] pub struct Behaviour { /// All the substrate-specific protocols. substrate: Protocol, @@ -51,6 +51,8 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + /// Connection limits. + connection_limits: libp2p::connection_limits::Behaviour, /// Generic request-response protocols. request_responses: request_responses::RequestResponsesBehaviour, } @@ -171,11 +173,13 @@ impl Behaviour { disco_config: DiscoveryConfig, request_response_protocols: Vec, peerset: PeersetHandle, + connection_limits: ConnectionLimits, ) -> Result { Ok(Self { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), + connection_limits: libp2p::connection_limits::Behaviour::new(connection_limits), request_responses: request_responses::RequestResponsesBehaviour::new( request_response_protocols.into_iter(), peerset, @@ -247,7 +251,7 @@ impl Behaviour { pub fn add_self_reported_address_to_dht( &mut self, peer_id: &PeerId, - supported_protocols: &[impl AsRef<[u8]>], + supported_protocols: &[impl AsRef], addr: Multiaddr, ) { self.discovery.add_self_reported_address(peer_id, supported_protocols, addr); @@ -351,3 +355,9 @@ impl From for BehaviourOut { } } } + +impl From for BehaviourOut { + fn from(e: void::Void) -> Self { + void::unreachable(e) + } +} diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 4c1247f786fe6..e13b6ac0a370d 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -109,8 +109,7 @@ pub fn parse_str_addr(addr_str: &str) -> Result<(PeerId, Multiaddr), ParseErr> { /// Splits a Multiaddress into a Multiaddress and PeerId. pub fn parse_addr(mut addr: Multiaddr) -> Result<(PeerId, Multiaddr), ParseErr> { let who = match addr.pop() { - Some(multiaddr::Protocol::P2p(key)) => - PeerId::from_multihash(key).map_err(|_| ParseErr::InvalidPeerId)?, + Some(multiaddr::Protocol::P2p(peer_id)) => peer_id, _ => return Err(ParseErr::PeerIdMissing), }; @@ -143,7 +142,7 @@ pub struct MultiaddrWithPeerId { impl MultiaddrWithPeerId { /// Concatenates the multiaddress and peer ID into one multiaddress containing both. pub fn concat(&self) -> Multiaddr { - let proto = multiaddr::Protocol::P2p(From::from(self.peer_id)); + let proto = multiaddr::Protocol::P2p(self.peer_id); self.multiaddr.clone().with(proto) } } @@ -181,8 +180,6 @@ impl TryFrom for MultiaddrWithPeerId { pub enum ParseErr { /// Error while parsing the multiaddress. MultiaddrParse(multiaddr::Error), - /// Multihash of the peer ID is invalid. - InvalidPeerId, /// The peer ID is missing from the address. PeerIdMissing, } @@ -191,7 +188,6 @@ impl fmt::Display for ParseErr { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Self::MultiaddrParse(err) => write!(f, "{}", err), - Self::InvalidPeerId => write!(f, "Peer id at the end of the address is invalid"), Self::PeerIdMissing => write!(f, "Peer id is missing from the address"), } } @@ -201,7 +197,6 @@ impl std::error::Error for ParseErr { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { match self { Self::MultiaddrParse(err) => Some(err), - Self::InvalidPeerId => None, Self::PeerIdMissing => None, } } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index b6cb29584658f..152239987ae84 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -55,7 +55,6 @@ use ip_network::IpNetwork; use libp2p::{ core::{Endpoint, Multiaddr}, kad::{ - handler::KademliaHandler, record::store::{MemoryStore, RecordStore}, GetClosestPeersError, GetRecordOk, Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryId, QueryResult, Quorum, Record, RecordKey, @@ -65,10 +64,10 @@ use libp2p::{ swarm::{ behaviour::{ toggle::{Toggle, ToggleConnectionHandler}, - DialFailure, FromSwarm, NewExternalAddr, + DialFailure, ExternalAddrConfirmed, FromSwarm, }, - ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters, THandler, - THandlerInEvent, THandlerOutEvent, ToSwarm, + ConnectionDenied, ConnectionId, DialError, NetworkBehaviour, PollParameters, + StreamProtocol, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, PeerId, }; @@ -104,7 +103,7 @@ pub struct DiscoveryConfig { discovery_only_if_under_num: u64, enable_mdns: bool, kademlia_disjoint_query_paths: bool, - kademlia_protocols: Vec>, + kademlia_protocols: Vec, kademlia_replication_factor: NonZeroUsize, } @@ -353,7 +352,7 @@ impl DiscoveryBehaviour { pub fn add_self_reported_address( &mut self, peer_id: &PeerId, - supported_protocols: &[impl AsRef<[u8]>], + supported_protocols: &[impl AsRef], addr: Multiaddr, ) { if let Some(kademlia) = self.kademlia.as_mut() { @@ -372,7 +371,7 @@ impl DiscoveryBehaviour { trace!( target: "sub-libp2p", "Adding self-reported address {} from {} to Kademlia DHT {}.", - addr, peer_id, String::from_utf8_lossy(matching_protocol.as_ref()), + addr, peer_id, matching_protocol.as_ref(), ); kademlia.add_address(peer_id, addr.clone()); } else { @@ -498,8 +497,9 @@ pub enum DiscoveryOut { } impl NetworkBehaviour for DiscoveryBehaviour { - type ConnectionHandler = ToggleConnectionHandler>; - type OutEvent = DiscoveryOut; + type ConnectionHandler = + ToggleConnectionHandler< as NetworkBehaviour>::ConnectionHandler>; + type ToSwarm = DiscoveryOut; fn handle_established_inbound_connection( &mut self, @@ -629,11 +629,11 @@ impl NetworkBehaviour for DiscoveryBehaviour { FromSwarm::ListenerError(e) => { self.kademlia.on_swarm_event(FromSwarm::ListenerError(e)); }, - FromSwarm::ExpiredExternalAddr(e) => { + FromSwarm::ExternalAddrExpired(e) => { // We intentionally don't remove the element from `known_external_addresses` in // order to not print the log line again. - self.kademlia.on_swarm_event(FromSwarm::ExpiredExternalAddr(e)); + self.kademlia.on_swarm_event(FromSwarm::ExternalAddrExpired(e)); }, FromSwarm::NewListener(e) => { self.kademlia.on_swarm_event(FromSwarm::NewListener(e)); @@ -641,8 +641,21 @@ impl NetworkBehaviour for DiscoveryBehaviour { FromSwarm::ExpiredListenAddr(e) => { self.kademlia.on_swarm_event(FromSwarm::ExpiredListenAddr(e)); }, - FromSwarm::NewExternalAddr(e @ NewExternalAddr { addr }) => { - let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id.into())); + FromSwarm::NewExternalAddrCandidate(e) => { + self.kademlia.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e)); + }, + FromSwarm::AddressChange(e) => { + self.kademlia.on_swarm_event(FromSwarm::AddressChange(e)); + }, + FromSwarm::NewListenAddr(e) => { + self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e)); + + if let Some(ref mut mdns) = self.mdns { + mdns.on_swarm_event(FromSwarm::NewListenAddr(e)); + } + }, + FromSwarm::ExternalAddrConfirmed(e @ ExternalAddrConfirmed { addr }) => { + let new_addr = addr.clone().with(Protocol::P2p(self.local_peer_id)); if Self::can_add_to_dht(addr) { // NOTE: we might re-discover the same address multiple times @@ -656,17 +669,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } - self.kademlia.on_swarm_event(FromSwarm::NewExternalAddr(e)); - }, - FromSwarm::AddressChange(e) => { - self.kademlia.on_swarm_event(FromSwarm::AddressChange(e)); - }, - FromSwarm::NewListenAddr(e) => { - self.kademlia.on_swarm_event(FromSwarm::NewListenAddr(e)); - - if let Some(ref mut mdns) = self.mdns { - mdns.on_swarm_event(FromSwarm::NewListenAddr(e)); - } + self.kademlia.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); }, } } @@ -684,7 +687,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { // Immediately process the content of `discovered`. if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ToSwarm::GenerateEvent(ev)) @@ -895,10 +898,17 @@ impl NetworkBehaviour for DiscoveryBehaviour { ToSwarm::Dial { opts } => return Poll::Ready(ToSwarm::Dial { opts }), ToSwarm::NotifyHandler { peer_id, handler, event } => return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), - ToSwarm::ReportObservedAddr { address, score } => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), ToSwarm::CloseConnection { peer_id, connection } => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + ToSwarm::NewExternalAddrCandidate(observed) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + ToSwarm::ExternalAddrConfirmed(addr) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + ToSwarm::ExternalAddrExpired(addr) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), + ToSwarm::RemoveListener { id } => + return Poll::Ready(ToSwarm::RemoveListener { id }), } } @@ -912,8 +922,10 @@ impl NetworkBehaviour for DiscoveryBehaviour { continue } - self.pending_events - .extend(list.map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id))); + self.pending_events.extend( + list.into_iter() + .map(|(peer_id, _)| DiscoveryOut::Discovered(peer_id)), + ); if let Some(ev) = self.pending_events.pop_front() { return Poll::Ready(ToSwarm::GenerateEvent(ev)) } @@ -926,10 +938,17 @@ impl NetworkBehaviour for DiscoveryBehaviour { ToSwarm::NotifyHandler { event, .. } => match event {}, /* `event` is an */ // enum with no // variant - ToSwarm::ReportObservedAddr { address, score } => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), ToSwarm::CloseConnection { peer_id, connection } => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + ToSwarm::NewExternalAddrCandidate(observed) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + ToSwarm::ExternalAddrConfirmed(addr) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + ToSwarm::ExternalAddrExpired(addr) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + ToSwarm::ListenOn { opts } => return Poll::Ready(ToSwarm::ListenOn { opts }), + ToSwarm::RemoveListener { id } => + return Poll::Ready(ToSwarm::RemoveListener { id }), } } } @@ -939,21 +958,23 @@ impl NetworkBehaviour for DiscoveryBehaviour { } /// Legacy (fallback) Kademlia protocol name based on `protocol_id`. -fn legacy_kademlia_protocol_name(id: &ProtocolId) -> Vec { - let mut v = vec![b'/']; - v.extend_from_slice(id.as_ref().as_bytes()); - v.extend_from_slice(b"/kad"); - v +fn legacy_kademlia_protocol_name(id: &ProtocolId) -> StreamProtocol { + let name = format!("/{}/kad", id.as_ref()); + StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed") } /// Kademlia protocol name based on `genesis_hash` and `fork_id`. -fn kademlia_protocol_name>(genesis_hash: Hash, fork_id: Option<&str>) -> Vec { +fn kademlia_protocol_name>( + genesis_hash: Hash, + fork_id: Option<&str>, +) -> StreamProtocol { let genesis_hash_hex = bytes2hex("", genesis_hash.as_ref()); - if let Some(fork_id) = fork_id { - format!("/{}/{}/kad", genesis_hash_hex, fork_id).as_bytes().into() + let name = if let Some(fork_id) = fork_id { + format!("/{}/{}/kad", genesis_hash_hex, fork_id) } else { - format!("/{}/kad", genesis_hash_hex).as_bytes().into() - } + format!("/{}/kad", genesis_hash_hex) + }; + StreamProtocol::try_from_owned(name).expect("protocol name is valid. qed") } #[cfg(test)] @@ -969,7 +990,7 @@ mod tests { upgrade, }, identity::Keypair, - noise, + kad, noise, swarm::{Executor, Swarm, SwarmBuilder, SwarmEvent}, yamux, Multiaddr, }; @@ -1023,6 +1044,18 @@ mod tests { TokioExecutor(runtime), ) .build(); + + // Set the Kademlia mode to server so that it can accept incoming requests. + // + // Note: the server mode is set automatically when the node learns its external + // address, but that does not happen in tests => hence we set it manually. + swarm + .behaviour_mut() + .kademlia + .as_mut() + .unwrap() + .set_mode(Some(kad::Mode::Server)); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); @@ -1083,7 +1116,7 @@ mod tests { .add_self_reported_address( &other, &[protocol_name], - addr, + addr.clone(), ); to_discover[swarm_n].remove(&other); diff --git a/client/network/src/peer_info.rs b/client/network/src/peer_info.rs index aab3fc9487e8c..d7f9e6858fa31 100644 --- a/client/network/src/peer_info.rs +++ b/client/network/src/peer_info.rs @@ -31,13 +31,13 @@ use libp2p::{ Info as IdentifyInfo, }, identity::PublicKey, - ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent, Success as PingSuccess}, + ping::{Behaviour as Ping, Config as PingConfig, Event as PingEvent}, swarm::{ behaviour::{ AddressChange, ConnectionClosed, ConnectionEstablished, DialFailure, FromSwarm, ListenFailure, }, - ConnectionDenied, ConnectionHandler, ConnectionId, IntoConnectionHandlerSelect, + ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId, NetworkBehaviour, PollParameters, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, }, Multiaddr, PeerId, @@ -121,13 +121,18 @@ impl PeerInfoBehaviour { /// Inserts a ping time in the cache. Has no effect if we don't have any entry for that node, /// which shouldn't happen. - fn handle_ping_report(&mut self, peer_id: &PeerId, ping_time: Duration) { - trace!(target: "sub-libp2p", "Ping time with {:?}: {:?}", peer_id, ping_time); + fn handle_ping_report( + &mut self, + peer_id: &PeerId, + ping_time: Duration, + connection: ConnectionId, + ) { + trace!(target: "sub-libp2p", "Ping time with {:?} via {:?}: {:?}", peer_id, connection, ping_time); if let Some(entry) = self.nodes_info.get_mut(peer_id) { entry.latest_ping = Some(ping_time); } else { error!(target: "sub-libp2p", - "Received ping from node we're not connected to {:?}", peer_id); + "Received ping from node we're not connected to {:?} via {:?}", peer_id, connection); } } @@ -181,11 +186,11 @@ pub enum PeerInfoEvent { } impl NetworkBehaviour for PeerInfoBehaviour { - type ConnectionHandler = IntoConnectionHandlerSelect< + type ConnectionHandler = ConnectionHandlerSelect< ::ConnectionHandler, ::ConnectionHandler, >; - type OutEvent = PeerInfoEvent; + type ToSwarm = PeerInfoEvent; fn handle_pending_inbound_connection( &mut self, @@ -351,9 +356,9 @@ impl NetworkBehaviour for PeerInfoBehaviour { self.ping.on_swarm_event(FromSwarm::ListenerError(e)); self.identify.on_swarm_event(FromSwarm::ListenerError(e)); }, - FromSwarm::ExpiredExternalAddr(e) => { - self.ping.on_swarm_event(FromSwarm::ExpiredExternalAddr(e)); - self.identify.on_swarm_event(FromSwarm::ExpiredExternalAddr(e)); + FromSwarm::ExternalAddrExpired(e) => { + self.ping.on_swarm_event(FromSwarm::ExternalAddrExpired(e)); + self.identify.on_swarm_event(FromSwarm::ExternalAddrExpired(e)); }, FromSwarm::NewListener(e) => { self.ping.on_swarm_event(FromSwarm::NewListener(e)); @@ -363,9 +368,13 @@ impl NetworkBehaviour for PeerInfoBehaviour { self.ping.on_swarm_event(FromSwarm::ExpiredListenAddr(e)); self.identify.on_swarm_event(FromSwarm::ExpiredListenAddr(e)); }, - FromSwarm::NewExternalAddr(e) => { - self.ping.on_swarm_event(FromSwarm::NewExternalAddr(e)); - self.identify.on_swarm_event(FromSwarm::NewExternalAddr(e)); + FromSwarm::NewExternalAddrCandidate(e) => { + self.ping.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e)); + self.identify.on_swarm_event(FromSwarm::NewExternalAddrCandidate(e)); + }, + FromSwarm::ExternalAddrConfirmed(e) => { + self.ping.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); + self.identify.on_swarm_event(FromSwarm::ExternalAddrConfirmed(e)); }, FromSwarm::AddressChange(e @ AddressChange { peer_id, old, new, .. }) => { self.ping.on_swarm_event(FromSwarm::AddressChange(e)); @@ -408,13 +417,13 @@ impl NetworkBehaviour for PeerInfoBehaviour { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { loop { match self.ping.poll(cx, params) { Poll::Pending => break, Poll::Ready(ToSwarm::GenerateEvent(ev)) => { - if let PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } = ev { - self.handle_ping_report(&peer, rtt) + if let PingEvent { peer, result: Ok(rtt), connection } = ev { + self.handle_ping_report(&peer, rtt, connection) } }, Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), @@ -424,10 +433,18 @@ impl NetworkBehaviour for PeerInfoBehaviour { handler, event: Either::Left(event), }), - Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + Poll::Ready(ToSwarm::ListenOn { opts }) => + return Poll::Ready(ToSwarm::ListenOn { opts }), + Poll::Ready(ToSwarm::RemoveListener { id }) => + return Poll::Ready(ToSwarm::RemoveListener { id }), } } @@ -453,10 +470,18 @@ impl NetworkBehaviour for PeerInfoBehaviour { handler, event: Either::Right(event), }), - Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + Poll::Ready(ToSwarm::ListenOn { opts }) => + return Poll::Ready(ToSwarm::ListenOn { opts }), + Poll::Ready(ToSwarm::RemoveListener { id }) => + return Poll::Ready(ToSwarm::RemoveListener { id }), } } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index e57bc3e520be4..20f5707e2c764 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -353,7 +353,7 @@ pub enum CustomMessageOutcome { impl NetworkBehaviour for Protocol { type ConnectionHandler = ::ConnectionHandler; - type OutEvent = CustomMessageOutcome; + type ToSwarm = CustomMessageOutcome; fn handle_established_inbound_connection( &mut self, @@ -414,7 +414,7 @@ impl NetworkBehaviour for Protocol { &mut self, cx: &mut std::task::Context, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { while let Poll::Ready(Some(validation_result)) = self.sync_substream_validations.poll_next_unpin(cx) { @@ -438,10 +438,18 @@ impl NetworkBehaviour for Protocol { Poll::Ready(ToSwarm::Dial { opts }) => return Poll::Ready(ToSwarm::Dial { opts }), Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }) => return Poll::Ready(ToSwarm::NotifyHandler { peer_id, handler, event }), - Poll::Ready(ToSwarm::ReportObservedAddr { address, score }) => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }) => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + Poll::Ready(ToSwarm::ExternalAddrExpired(addr)) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + Poll::Ready(ToSwarm::ListenOn { opts }) => + return Poll::Ready(ToSwarm::ListenOn { opts }), + Poll::Ready(ToSwarm::RemoveListener { id }) => + return Poll::Ready(ToSwarm::RemoveListener { id }), }; let outcome = match event { diff --git a/client/network/src/protocol/notifications/behaviour.rs b/client/network/src/protocol/notifications/behaviour.rs index 1659626cae4c8..0927ecf318be9 100644 --- a/client/network/src/protocol/notifications/behaviour.rs +++ b/client/network/src/protocol/notifications/behaviour.rs @@ -997,7 +997,7 @@ impl Notifications { impl NetworkBehaviour for Notifications { type ConnectionHandler = NotifsHandler; - type OutEvent = NotificationsOut; + type ToSwarm = NotificationsOut; fn handle_pending_inbound_connection( &mut self, @@ -1466,10 +1466,11 @@ impl NetworkBehaviour for Notifications { FromSwarm::ListenerClosed(_) => {}, FromSwarm::ListenFailure(_) => {}, FromSwarm::ListenerError(_) => {}, - FromSwarm::ExpiredExternalAddr(_) => {}, + FromSwarm::ExternalAddrExpired(_) => {}, FromSwarm::NewListener(_) => {}, FromSwarm::ExpiredListenAddr(_) => {}, - FromSwarm::NewExternalAddr(_) => {}, + FromSwarm::NewExternalAddrCandidate(_) => {}, + FromSwarm::ExternalAddrConfirmed(_) => {}, FromSwarm::AddressChange(_) => {}, FromSwarm::NewListenAddr(_) => {}, } @@ -2004,7 +2005,7 @@ impl NetworkBehaviour for Notifications { &mut self, cx: &mut Context, _params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { if let Some(event) = self.events.pop_front() { return Poll::Ready(event) } @@ -2104,7 +2105,6 @@ impl NetworkBehaviour for Notifications { mod tests { use super::*; use crate::{peerset::IncomingIndex, protocol::notifications::handler::tests::*}; - use libp2p::swarm::AddressRecord; use std::{collections::HashSet, iter}; impl PartialEq for ConnectionState { @@ -2123,31 +2123,14 @@ mod tests { } #[derive(Clone)] - struct MockPollParams { - peer_id: PeerId, - addr: Multiaddr, - } + struct MockPollParams {} impl PollParameters for MockPollParams { type SupportedProtocolsIter = std::vec::IntoIter>; - type ListenedAddressesIter = std::vec::IntoIter; - type ExternalAddressesIter = std::vec::IntoIter; fn supported_protocols(&self) -> Self::SupportedProtocolsIter { vec![].into_iter() } - - fn listened_addresses(&self) -> Self::ListenedAddressesIter { - vec![self.addr.clone()].into_iter() - } - - fn external_addresses(&self) -> Self::ExternalAddressesIter { - vec![].into_iter() - } - - fn local_peer_id(&self) -> &PeerId { - &self.peer_id - } } fn development_notifs() -> (Notifications, crate::peerset::PeersetHandle) { @@ -3011,7 +2994,7 @@ mod tests { notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure { peer_id: Some(peer), - error: &libp2p::swarm::DialError::Banned, + error: &libp2p::swarm::DialError::Aborted, connection_id: ConnectionId::new_unchecked(1337), })); @@ -3548,7 +3531,7 @@ mod tests { let now = Instant::now(); notif.on_swarm_event(FromSwarm::DialFailure(libp2p::swarm::behaviour::DialFailure { peer_id: Some(peer), - error: &libp2p::swarm::DialError::Banned, + error: &libp2p::swarm::DialError::Aborted, connection_id: ConnectionId::new_unchecked(0), })); @@ -3668,7 +3651,7 @@ mod tests { assert!(notif.peers.get(&(peer, set_id)).is_some()); if tokio::time::timeout(Duration::from_secs(5), async { - let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() }; + let mut params = MockPollParams {}; loop { futures::future::poll_fn(|cx| { @@ -3777,7 +3760,7 @@ mod tests { // verify that the code continues to keep the peer disabled by resetting the timer // after the first one expired. if tokio::time::timeout(Duration::from_secs(5), async { - let mut params = MockPollParams { peer_id: PeerId::random(), addr: Multiaddr::empty() }; + let mut params = MockPollParams {}; loop { futures::future::poll_fn(|cx| { diff --git a/client/network/src/protocol/notifications/handler.rs b/client/network/src/protocol/notifications/handler.rs index 0ac2e250a2ea3..9ac47a93b1dbc 100644 --- a/client/network/src/protocol/notifications/handler.rs +++ b/client/network/src/protocol/notifications/handler.rs @@ -74,8 +74,8 @@ use futures::{ use libp2p::{ core::ConnectedPoint, swarm::{ - handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, - NegotiatedSubstream, SubstreamProtocol, + handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, KeepAlive, Stream, + SubstreamProtocol, }, PeerId, }; @@ -189,7 +189,7 @@ enum State { /// emitted. OpenDesiredByRemote { /// Substream opened by the remote and that hasn't been accepted/rejected yet. - in_substream: NotificationsInSubstream, + in_substream: NotificationsInSubstream, /// See [`State::Closed::pending_opening`]. pending_opening: bool, @@ -202,7 +202,9 @@ enum State { /// be emitted when transitionning to respectively [`State::Open`] or [`State::Closed`]. Opening { /// Substream opened by the remote. If `Some`, has been accepted. - in_substream: Option>, + in_substream: Option>, + /// Is the connection inbound. + inbound: bool, }, /// Protocol is in the "Open" state. @@ -224,14 +226,14 @@ enum State { /// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote /// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been /// emitted. - out_substream: Option>, + out_substream: Option>, /// Substream opened by the remote. /// /// Contrary to the `out_substream` field, operations continue as normal even if the /// substream has been closed by the remote. A `None` is treated the same way as if there /// was an idle substream. - in_substream: Option>, + in_substream: Option>, }, } @@ -435,8 +437,8 @@ pub enum NotifsHandlerError { } impl ConnectionHandler for NotifsHandler { - type InEvent = NotifsHandlerIn; - type OutEvent = NotifsHandlerOut; + type FromBehaviour = NotifsHandlerIn; + type ToBehaviour = NotifsHandlerOut; type Error = NotifsHandlerError; type InboundProtocol = UpgradeCollec; type OutboundProtocol = NotificationsOut; @@ -471,7 +473,7 @@ impl ConnectionHandler for NotifsHandler { match protocol_info.state { State::Closed { pending_opening } => { - self.events_queue.push_back(ConnectionHandlerEvent::Custom( + self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::OpenDesiredByRemote { protocol_index }, )); @@ -536,7 +538,7 @@ impl ConnectionHandler for NotifsHandler { in_substream: in_substream.take(), }; - self.events_queue.push_back(ConnectionHandlerEvent::Custom( + self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::OpenResultOk { protocol_index, negotiated_fallback: new_open.negotiated_fallback, @@ -549,6 +551,8 @@ impl ConnectionHandler for NotifsHandler { } }, ConnectionEvent::AddressChange(_address_change) => {}, + ConnectionEvent::LocalProtocolsChange(_protocols_change) => {}, + ConnectionEvent::RemoteProtocolsChange(_protocols_change) => {}, ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols [dial_upgrade_error.info] .state @@ -563,7 +567,7 @@ impl ConnectionHandler for NotifsHandler { self.protocols[dial_upgrade_error.info].state = State::Closed { pending_opening: false }; - self.events_queue.push_back(ConnectionHandlerEvent::Custom( + self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info }, )); }, @@ -649,7 +653,7 @@ impl ConnectionHandler for NotifsHandler { self.protocols[protocol_index].state = State::Closed { pending_opening: true }; - self.events_queue.push_back(ConnectionHandlerEvent::Custom( + self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::OpenResultErr { protocol_index }, )); }, @@ -659,7 +663,7 @@ impl ConnectionHandler for NotifsHandler { State::Closed { .. } => {}, } - self.events_queue.push_back(ConnectionHandlerEvent::Custom( + self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::CloseResult { protocol_index }, )); }, @@ -684,7 +688,7 @@ impl ConnectionHandler for NotifsHandler { ConnectionHandlerEvent< Self::OutboundProtocol, Self::OutboundOpenInfo, - Self::OutEvent, + Self::ToBehaviour, Self::Error, >, > { @@ -753,7 +757,7 @@ impl ConnectionHandler for NotifsHandler { Poll::Ready(Err(_)) => { *out_substream = None; let event = NotifsHandlerOut::CloseDesired { protocol_index }; - return Poll::Ready(ConnectionHandlerEvent::Custom(event)) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) }, }; }, @@ -775,11 +779,14 @@ impl ConnectionHandler for NotifsHandler { State::Opening { in_substream: None } => {}, State::Open { in_substream: in_substream @ Some(_), .. } => - match Stream::poll_next(Pin::new(in_substream.as_mut().unwrap()), cx) { + match futures::prelude::Stream::poll_next( + Pin::new(in_substream.as_mut().unwrap()), + cx, + ) { Poll::Pending => {}, Poll::Ready(Some(Ok(message))) => { let event = NotifsHandlerOut::Notification { protocol_index, message }; - return Poll::Ready(ConnectionHandlerEvent::Custom(event)) + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event)) }, Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None, }, @@ -791,7 +798,7 @@ impl ConnectionHandler for NotifsHandler { Poll::Ready(Err(_)) => { self.protocols[protocol_index].state = State::Closed { pending_opening: *pending_opening }; - return Poll::Ready(ConnectionHandlerEvent::Custom( + return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( NotifsHandlerOut::CloseDesired { protocol_index }, )) }, @@ -819,22 +826,18 @@ impl ConnectionHandler for NotifsHandler { #[cfg(test)] pub mod tests { use super::*; - use crate::protocol::notifications::upgrade::{ - NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen, - }; - use asynchronous_codec::Framed; - use libp2p::{ - core::muxing::SubstreamBox, - swarm::{handler, ConnectionHandlerUpgrErr}, - Multiaddr, - }; - use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}; + // use crate::protocol::notifications::upgrade::{ + // NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen, + // }; + // use asynchronous_codec::Framed; + // use libp2p::Multiaddr; + // use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version}; use std::{ collections::HashMap, io::{Error, IoSlice, IoSliceMut}, }; use tokio::sync::mpsc; - use unsigned_varint::codec::UviBytes; + // use unsigned_varint::codec::UviBytes; struct OpenSubstream { notifications: stream::Peekable< @@ -933,20 +936,20 @@ pub mod tests { ) } - /// Create new negotiated substream pair. - pub async fn negotiated() -> (Negotiated, Negotiated) { - let (socket1, socket2) = Self::new(); - let socket1 = SubstreamBox::new(socket1); - let socket2 = SubstreamBox::new(socket2); + // /// Create new negotiated substream pair. + // pub async fn negotiated() -> (Negotiated, Negotiated) { + // let (socket1, socket2) = Self::new(); + // let socket1 = SubstreamBox::new(socket1); + // let socket2 = SubstreamBox::new(socket2); - let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; - let (res1, res2) = tokio::join!( - dialer_select_proto(socket1, protos.clone(), Version::V1), - listener_select_proto(socket2, protos), - ); + // let protos = vec![b"/echo/1.0.0", b"/echo/2.5.0"]; + // let (res1, res2) = tokio::join!( + // dialer_select_proto(socket1, protos.clone(), Version::V1), + // listener_select_proto(socket2, protos), + // ); - (res1.unwrap().1, res2.unwrap().1) - } + // (res1.unwrap().1, res2.unwrap().1) + // } } impl AsyncWrite for MockSubstream { @@ -1011,596 +1014,596 @@ pub mod tests { } } - /// Create new [`NotifsHandler`]. - fn notifs_handler() -> NotifsHandler { - let proto = Protocol { - config: ProtocolConfig { - name: "/foo".into(), - fallback_names: vec![], - handshake: Arc::new(RwLock::new(b"hello, world".to_vec())), - max_notification_size: u64::MAX, - }, - in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX), - state: State::Closed { pending_opening: false }, - }; - - NotifsHandler { - protocols: vec![proto], - when_connection_open: Instant::now(), - endpoint: ConnectedPoint::Listener { - local_addr: Multiaddr::empty(), - send_back_addr: Multiaddr::empty(), - }, - peer_id: PeerId::random(), - events_queue: VecDeque::new(), - } - } - - // verify that if another substream is attempted to be opened by remote while an inbound - // substream already exists, the new inbound stream is rejected and closed by the local node. - #[tokio::test] - async fn second_open_desired_by_remote_rejected() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // attempt to open another inbound substream and verify that it is rejected - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the new substream is rejected and closed - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - - if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); - } - - Poll::Ready(()) - }) - .await; - } - - #[tokio::test] - async fn open_rejected_if_substream_is_opening() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // move the handler state to 'Opening' - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - // remote now tries to open another substream, verify that it is rejected and closed - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the new substream is rejected and closed but that the first substream is - // still in correct state - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - - if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); - } else { - panic!("unexpected result"); - } - - Poll::Ready(()) - }) - .await; - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - } - - #[tokio::test] - async fn open_rejected_if_substream_already_open() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // move the handler state to 'Opening' - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - // accept the substream and move its state to `Open` - let (io, _io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_out = NotificationsOutOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - )); - - assert!(std::matches!( - handler.protocols[0].state, - State::Open { in_substream: Some(_), .. } - )); - - // remote now tries to open another substream, verify that it is rejected and closed - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the new substream is rejected and closed but that the first substream is - // still in correct state - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - - if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { - assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); - } else { - panic!("unexpected result"); - } - - Poll::Ready(()) - }) - .await; - assert!(std::matches!( - handler.protocols[0].state, - State::Open { in_substream: Some(_), .. } - )); - } - - #[tokio::test] - async fn fully_negotiated_resets_state_for_closed_substream() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // first instruct the handler to open a connection and then close it right after - // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // verify that if the the outbound substream is successfully negotiated, the state is not - // changed as the substream was commanded to be closed by the handler. - let (io, _io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_out = NotificationsOutOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - )); - - assert!(std::matches!( - handler.protocols[0].state, - State::Closed { pending_opening: false } - )); - } - - #[tokio::test] - async fn fully_negotiated_resets_state_for_open_desired_substream() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // first instruct the handler to open a connection and then close it right after - // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // attempt to open another inbound substream and verify that it is rejected - let (io, _io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - assert!(std::matches!( - handler.protocols[0].state, - State::OpenDesiredByRemote { pending_opening: true, .. } - )); - - // verify that if the the outbound substream is successfully negotiated, the state is not - // changed as the substream was commanded to be closed by the handler. - let (io, _io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_out = NotificationsOutOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsOutSubstream::new(Framed::new(io, codec)), - }; - - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( - handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, - )); - - assert!(std::matches!( - handler.protocols[0].state, - State::OpenDesiredByRemote { pending_opening: false, .. } - )); - } - - #[tokio::test] - async fn dial_upgrade_error_resets_closed_outbound_state() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // first instruct the handler to open a connection and then close it right after - // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - // inject dial failure to an already closed substream and verify outbound state is reset - handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( - handler::DialUpgradeError { info: 0, error: ConnectionHandlerUpgrErr::Timeout }, - )); - assert!(std::matches!( - handler.protocols[0].state, - State::Closed { pending_opening: false } - )); - } - - #[tokio::test] - async fn dial_upgrade_error_resets_open_desired_state() { - let mut handler = notifs_handler(); - let (io, mut io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - // verify that the substream is in (partly) opened state - assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); - futures::future::poll_fn(|cx| { - let mut buf = Vec::with_capacity(512); - assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); - Poll::Ready(()) - }) - .await; - - // first instruct the handler to open a connection and then close it right after - // so the handler is in state `Closed { pending_opening: true }` - handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); - assert!(std::matches!( - handler.protocols[0].state, - State::Opening { in_substream: Some(_) } - )); - - handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); - assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); - - let (io, _io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::NotSent, - ), - }; - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - - assert!(std::matches!( - handler.protocols[0].state, - State::OpenDesiredByRemote { pending_opening: true, .. } - )); - - // inject dial failure to an already closed substream and verify outbound state is reset - handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( - handler::DialUpgradeError { info: 0, error: ConnectionHandlerUpgrErr::Timeout }, - )); - assert!(std::matches!( - handler.protocols[0].state, - State::OpenDesiredByRemote { pending_opening: false, .. } - )); - } - - #[tokio::test] - async fn sync_notifications_clogged() { - let mut handler = notifs_handler(); - let (io, _) = MockSubstream::negotiated().await; - let codec = UviBytes::default(); - - let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); - let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1); - let notifications_sink = NotificationsSink { - inner: Arc::new(NotificationsSinkInner { - peer_id: PeerId::random(), - async_channel: FuturesMutex::new(async_tx), - sync_channel: Mutex::new(Some(sync_tx)), - }), - }; - - handler.protocols[0].state = State::Open { - notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), - out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))), - in_substream: None, - }; - - notifications_sink.send_sync_notification(vec![1, 3, 3, 7]); - notifications_sink.send_sync_notification(vec![1, 3, 3, 8]); - notifications_sink.send_sync_notification(vec![1, 3, 3, 9]); - notifications_sink.send_sync_notification(vec![1, 3, 4, 0]); - - futures::future::poll_fn(|cx| { - assert!(std::matches!( - handler.poll(cx), - Poll::Ready(ConnectionHandlerEvent::Close( - NotifsHandlerError::SyncNotificationsClogged, - )) - )); - Poll::Ready(()) - }) - .await; - } - - #[tokio::test] - async fn close_desired_by_remote() { - let mut handler = notifs_handler(); - let (io, io2) = MockSubstream::negotiated().await; - let mut codec = UviBytes::default(); - codec.set_max_len(usize::MAX); - - let notif_in = NotificationsInOpen { - handshake: b"hello, world".to_vec(), - negotiated_fallback: None, - substream: NotificationsInSubstream::new( - Framed::new(io, codec), - NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]), - ), - }; - - // add new inbound substream but close it immediately and verify that correct events are - // emitted - handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( - handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, - )); - drop(io2); - - futures::future::poll_fn(|cx| { - assert!(std::matches!( - handler.poll(cx), - Poll::Ready(ConnectionHandlerEvent::Custom( - NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, - )) - )); - assert!(std::matches!( - handler.poll(cx), - Poll::Ready(ConnectionHandlerEvent::Custom(NotifsHandlerOut::CloseDesired { - protocol_index: 0 - },)) - )); - Poll::Ready(()) - }) - .await; - } + // /// Create new [`NotifsHandler`]. + // fn notifs_handler() -> NotifsHandler { + // let proto = Protocol { + // config: ProtocolConfig { + // name: "/foo".into(), + // fallback_names: vec![], + // handshake: Arc::new(RwLock::new(b"hello, world".to_vec())), + // max_notification_size: u64::MAX, + // }, + // in_upgrade: NotificationsIn::new("/foo", Vec::new(), u64::MAX), + // state: State::Closed { pending_opening: false }, + // }; + + // NotifsHandler { + // protocols: vec![proto], + // when_connection_open: Instant::now(), + // endpoint: ConnectedPoint::Listener { + // local_addr: Multiaddr::empty(), + // send_back_addr: Multiaddr::empty(), + // }, + // peer_id: PeerId::random(), + // events_queue: VecDeque::new(), + // } + // } + + // // verify that if another substream is attempted to be opened by remote while an inbound + // // substream already exists, the new inbound stream is rejected and closed by the local node. + // #[tokio::test] + // async fn second_open_desired_by_remote_rejected() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // attempt to open another inbound substream and verify that it is rejected + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the new substream is rejected and closed + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + + // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); + // } + + // Poll::Ready(()) + // }) + // .await; + // } + + // #[tokio::test] + // async fn open_rejected_if_substream_is_opening() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // move the handler state to 'Opening' + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // // remote now tries to open another substream, verify that it is rejected and closed + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the new substream is rejected and closed but that the first substream is + // // still in correct state + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + + // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,); + // } else { + // panic!("unexpected result"); + // } + + // Poll::Ready(()) + // }) + // .await; + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + // } + + // #[tokio::test] + // async fn open_rejected_if_substream_already_open() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // move the handler state to 'Opening' + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // // accept the substream and move its state to `Open` + // let (io, _io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_out = NotificationsOutOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + // )); + + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Open { in_substream: Some(_), .. } + // )); + + // // remote now tries to open another substream, verify that it is rejected and closed + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the new substream is rejected and closed but that the first substream is + // // still in correct state + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + + // if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) { + // assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof); + // } else { + // panic!("unexpected result"); + // } + + // Poll::Ready(()) + // }) + // .await; + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Open { in_substream: Some(_), .. } + // )); + // } + + // #[tokio::test] + // async fn fully_negotiated_resets_state_for_closed_substream() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // first instruct the handler to open a connection and then close it right after + // // so the handler is in state `Closed { pending_opening: true }` + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // // verify that if the the outbound substream is successfully negotiated, the state is not + // // changed as the substream was commanded to be closed by the handler. + // let (io, _io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_out = NotificationsOutOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + // )); + + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Closed { pending_opening: false } + // )); + // } + + // #[tokio::test] + // async fn fully_negotiated_resets_state_for_open_desired_substream() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // first instruct the handler to open a connection and then close it right after + // // so the handler is in state `Closed { pending_opening: true }` + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // // attempt to open another inbound substream and verify that it is rejected + // let (io, _io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // assert!(std::matches!( + // handler.protocols[0].state, + // State::OpenDesiredByRemote { pending_opening: true, .. } + // )); + + // // verify that if the the outbound substream is successfully negotiated, the state is not + // // changed as the substream was commanded to be closed by the handler. + // let (io, _io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_out = NotificationsOutOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsOutSubstream::new(Framed::new(io, codec)), + // }; + + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound( + // handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 }, + // )); + + // assert!(std::matches!( + // handler.protocols[0].state, + // State::OpenDesiredByRemote { pending_opening: false, .. } + // )); + // } + + // #[tokio::test] + // async fn dial_upgrade_error_resets_closed_outbound_state() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // first instruct the handler to open a connection and then close it right after + // // so the handler is in state `Closed { pending_opening: true }` + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // // inject dial failure to an already closed substream and verify outbound state is reset + // handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( + // handler::DialUpgradeError { info: 0, error: ConnectionHandlerUpgrErr::Timeout }, + // )); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Closed { pending_opening: false } + // )); + // } + + // #[tokio::test] + // async fn dial_upgrade_error_resets_open_desired_state() { + // let mut handler = notifs_handler(); + // let (io, mut io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // // verify that the substream is in (partly) opened state + // assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. })); + // futures::future::poll_fn(|cx| { + // let mut buf = Vec::with_capacity(512); + // assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending)); + // Poll::Ready(()) + // }) + // .await; + + // // first instruct the handler to open a connection and then close it right after + // // so the handler is in state `Closed { pending_opening: true }` + // handler.on_behaviour_event(NotifsHandlerIn::Open { protocol_index: 0 }); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::Opening { in_substream: Some(_), .. } + // )); + + // handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 }); + // assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true })); + + // let (io, _io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::NotSent, + // ), + // }; + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + + // assert!(std::matches!( + // handler.protocols[0].state, + // State::OpenDesiredByRemote { pending_opening: true, .. } + // )); + + // // inject dial failure to an already closed substream and verify outbound state is reset + // handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError( + // handler::DialUpgradeError { info: 0, error: ConnectionHandlerUpgrErr::Timeout }, + // )); + // assert!(std::matches!( + // handler.protocols[0].state, + // State::OpenDesiredByRemote { pending_opening: false, .. } + // )); + // } + + // #[tokio::test] + // async fn sync_notifications_clogged() { + // let mut handler = notifs_handler(); + // let (io, _) = MockSubstream::negotiated().await; + // let codec = UviBytes::default(); + + // let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE); + // let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1); + // let notifications_sink = NotificationsSink { + // inner: Arc::new(NotificationsSinkInner { + // peer_id: PeerId::random(), + // async_channel: FuturesMutex::new(async_tx), + // sync_channel: Mutex::new(Some(sync_tx)), + // }), + // }; + + // handler.protocols[0].state = State::Open { + // notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(), + // out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))), + // in_substream: None, + // }; + + // notifications_sink.send_sync_notification(vec![1, 3, 3, 7]); + // notifications_sink.send_sync_notification(vec![1, 3, 3, 8]); + // notifications_sink.send_sync_notification(vec![1, 3, 3, 9]); + // notifications_sink.send_sync_notification(vec![1, 3, 4, 0]); + + // futures::future::poll_fn(|cx| { + // assert!(std::matches!( + // handler.poll(cx), + // Poll::Ready(ConnectionHandlerEvent::Close( + // NotifsHandlerError::SyncNotificationsClogged, + // )) + // )); + // Poll::Ready(()) + // }) + // .await; + // } + + // #[tokio::test] + // async fn close_desired_by_remote() { + // let mut handler = notifs_handler(); + // let (io, io2) = MockSubstream::negotiated().await; + // let mut codec = UviBytes::default(); + // codec.set_max_len(usize::MAX); + + // let notif_in = NotificationsInOpen { + // handshake: b"hello, world".to_vec(), + // negotiated_fallback: None, + // substream: NotificationsInSubstream::new( + // Framed::new(io, codec), + // NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]), + // ), + // }; + + // // add new inbound substream but close it immediately and verify that correct events are + // // emitted + // handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound( + // handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () }, + // )); + // drop(io2); + + // futures::future::poll_fn(|cx| { + // assert!(std::matches!( + // handler.poll(cx), + // Poll::Ready(ConnectionHandlerEvent::Custom( + // NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0 }, + // )) + // )); + // assert!(std::matches!( + // handler.poll(cx), + // Poll::Ready(ConnectionHandlerEvent::Custom(NotifsHandlerOut::CloseDesired { + // protocol_index: 0 + // },)) + // )); + // Poll::Ready(()) + // }) + // .await; + // } } diff --git a/client/network/src/protocol/notifications/tests.rs b/client/network/src/protocol/notifications/tests.rs index 0c2eb89262f93..ddeea495a2737 100644 --- a/client/network/src/protocol/notifications/tests.rs +++ b/client/network/src/protocol/notifications/tests.rs @@ -151,7 +151,7 @@ impl std::ops::DerefMut for CustomProtoWithAddr { impl NetworkBehaviour for CustomProtoWithAddr { type ConnectionHandler = ::ConnectionHandler; - type OutEvent = ::OutEvent; + type ToSwarm = ::ToSwarm; fn handle_pending_inbound_connection( &mut self, @@ -229,7 +229,7 @@ impl NetworkBehaviour for CustomProtoWithAddr { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { self.inner.poll(cx, params) } } diff --git a/client/network/src/protocol/notifications/upgrade/collec.rs b/client/network/src/protocol/notifications/upgrade/collec.rs index 791821b3f75da..2b26410fbe8b6 100644 --- a/client/network/src/protocol/notifications/upgrade/collec.rs +++ b/client/network/src/protocol/notifications/upgrade/collec.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use futures::prelude::*; -use libp2p::core::upgrade::{InboundUpgrade, ProtocolName, UpgradeInfo}; +use libp2p::core::upgrade::{InboundUpgrade, UpgradeInfo}; use std::{ iter::FromIterator, pin::Pin, @@ -76,9 +76,9 @@ where #[derive(Debug, Clone, PartialEq)] pub struct ProtoNameWithUsize(T, usize); -impl ProtocolName for ProtoNameWithUsize { - fn protocol_name(&self) -> &[u8] { - self.0.protocol_name() +impl> AsRef for ProtoNameWithUsize { + fn as_ref(&self) -> &str { + self.0.as_ref() } } @@ -104,13 +104,13 @@ impl>, O, E> Future for FutWithUsize { mod tests { use super::*; use crate::types::ProtocolName as ProtoName; - use libp2p::core::upgrade::{ProtocolName, UpgradeInfo}; + use libp2p::core::upgrade::UpgradeInfo; // TODO: move to mocks mockall::mock! { pub ProtocolUpgrade {} - impl UpgradeInfo for ProtocolUpgrade { + impl> UpgradeInfo for ProtocolUpgrade { type Info = T; type InfoIter = vec::IntoIter; fn protocol_info(&self) -> vec::IntoIter; diff --git a/client/network/src/protocol/notifications/upgrade/notifications.rs b/client/network/src/protocol/notifications/upgrade/notifications.rs index 4e1c033f33b68..0148fd22bd883 100644 --- a/client/network/src/protocol/notifications/upgrade/notifications.rs +++ b/client/network/src/protocol/notifications/upgrade/notifications.rs @@ -114,13 +114,6 @@ pub struct NotificationsOutSubstream { socket: Framed>>>, } -#[cfg(test)] -impl NotificationsOutSubstream { - pub fn new(socket: Framed>>>) -> Self { - Self { socket } - } -} - impl NotificationsIn { /// Builds a new potential upgrade. pub fn new( @@ -203,13 +196,13 @@ impl NotificationsInSubstream where TSubstream: AsyncRead + AsyncWrite + Unpin, { - #[cfg(test)] - pub fn new( - socket: Framed>>>, - handshake: NotificationsInSubstreamHandshake, - ) -> Self { - Self { socket, handshake } - } + // #[cfg(test)] + // pub fn new( + // socket: Framed>>>, + // handshake: NotificationsInSubstreamHandshake, + // ) -> Self { + // Self { socket, handshake } + // } /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) { @@ -498,41 +491,92 @@ pub enum NotificationsOutError { #[cfg(test)] mod tests { - use super::{NotificationsIn, NotificationsInOpen, NotificationsOut, NotificationsOutOpen}; - use futures::{channel::oneshot, prelude::*}; - use libp2p::core::upgrade; + use super::*; + use futures::channel::oneshot; + use libp2p::core::{upgrade, InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use tokio::net::{TcpListener, TcpStream}; use tokio_util::compat::TokioAsyncReadCompatExt; + /// Opens a substream to the given address, negotiates the protocol, and returns the substream + /// along with the handshake message. + async fn dial( + addr: std::net::SocketAddr, + handshake: impl Into>, + ) -> Result< + ( + Vec, + NotificationsOutSubstream< + multistream_select::Negotiated>, + >, + ), + NotificationsHandshakeError, + > { + let socket = TcpStream::connect(addr).await.unwrap(); + let notifs_out = NotificationsOut::new("/test/proto/1", Vec::new(), handshake, 1024 * 1024); + let (_, substream) = multistream_select::dialer_select_proto( + socket.compat(), + notifs_out.protocol_info().into_iter(), + upgrade::Version::V1, + ) + .await + .unwrap(); + let NotificationsOutOpen { handshake, substream, .. } = + >::upgrade_outbound( + notifs_out, + substream, + "/test/proto/1".into(), + ) + .await?; + Ok((handshake, substream)) + } + + /// Listens on a localhost, negotiates the protocol, and returns the substream along with the + /// handshake message. + /// + /// Also sends the listener address through the given channel. + async fn listen_on_localhost( + listener_addr_tx: oneshot::Sender, + ) -> Result< + ( + Vec, + NotificationsInSubstream< + multistream_select::Negotiated>, + >, + ), + NotificationsHandshakeError, + > { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); + + let (socket, _) = listener.accept().await.unwrap(); + let notifs_in = NotificationsIn::new("/test/proto/1", Vec::new(), 1024 * 1024); + let (_, substream) = + multistream_select::listener_select_proto(socket.compat(), notifs_in.protocol_info()) + .await + .unwrap(); + let NotificationsInOpen { handshake, substream, .. } = + >::upgrade_inbound( + notifs_in, + substream, + "/test/proto/1".into(), + ) + .await?; + Ok((handshake, substream)) + } + #[tokio::test] async fn basic_works() { - const PROTO_NAME: &str = "/test/proto/1"; let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let client = tokio::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound( - socket.compat(), - NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024), - upgrade::Version::V1, - ) - .await - .unwrap(); + let (handshake, mut substream) = + dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await.unwrap(); assert_eq!(handshake, b"hello world"); substream.send(b"test message".to_vec()).await.unwrap(); }); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( - socket.compat(), - NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024), - ) - .await - .unwrap(); + let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap(); assert_eq!(handshake, b"initial message"); substream.send_handshake(&b"hello world"[..]); @@ -547,33 +591,17 @@ mod tests { async fn empty_handshake() { // Check that everything still works when the handshake messages are empty. - const PROTO_NAME: &str = "/test/proto/1"; let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let client = tokio::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let NotificationsOutOpen { handshake, mut substream, .. } = upgrade::apply_outbound( - socket.compat(), - NotificationsOut::new(PROTO_NAME, Vec::new(), vec![], 1024 * 1024), - upgrade::Version::V1, - ) - .await - .unwrap(); + let (handshake, mut substream) = + dial(listener_addr_rx.await.unwrap(), vec![]).await.unwrap(); assert!(handshake.is_empty()); substream.send(Default::default()).await.unwrap(); }); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( - socket.compat(), - NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024), - ) - .await - .unwrap(); + let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap(); assert!(handshake.is_empty()); substream.send_handshake(vec![]); @@ -586,17 +614,10 @@ mod tests { #[tokio::test] async fn refused() { - const PROTO_NAME: &str = "/test/proto/1"; let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let client = tokio::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let outcome = upgrade::apply_outbound( - socket.compat(), - NotificationsOut::new(PROTO_NAME, Vec::new(), &b"hello"[..], 1024 * 1024), - upgrade::Version::V1, - ) - .await; + let outcome = dial(listener_addr_rx.await.unwrap(), &b"hello"[..]).await; // Despite the protocol negotiation being successfully conducted on the listener // side, we have to receive an error here because the listener didn't send the @@ -604,16 +625,7 @@ mod tests { assert!(outcome.is_err()); }); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let NotificationsInOpen { handshake, substream, .. } = upgrade::apply_inbound( - socket.compat(), - NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024), - ) - .await - .unwrap(); + let (handshake, substream) = listen_on_localhost(listener_addr_tx).await.unwrap(); assert_eq!(handshake, b"hello"); @@ -625,35 +637,16 @@ mod tests { #[tokio::test] async fn large_initial_message_refused() { - const PROTO_NAME: &str = "/test/proto/1"; let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let client = tokio::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let ret = upgrade::apply_outbound( - socket.compat(), - // We check that an initial message that is too large gets refused. - NotificationsOut::new( - PROTO_NAME, - Vec::new(), - (0..32768).map(|_| 0).collect::>(), - 1024 * 1024, - ), - upgrade::Version::V1, - ) - .await; + let ret = + dial(listener_addr_rx.await.unwrap(), (0..32768).map(|_| 0).collect::>()) + .await; assert!(ret.is_err()); }); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let ret = upgrade::apply_inbound( - socket.compat(), - NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024), - ) - .await; + let ret = listen_on_localhost(listener_addr_tx).await; assert!(ret.is_err()); client.await.unwrap(); @@ -661,30 +654,14 @@ mod tests { #[tokio::test] async fn large_handshake_refused() { - const PROTO_NAME: &str = "/test/proto/1"; let (listener_addr_tx, listener_addr_rx) = oneshot::channel(); let client = tokio::spawn(async move { - let socket = TcpStream::connect(listener_addr_rx.await.unwrap()).await.unwrap(); - let ret = upgrade::apply_outbound( - socket.compat(), - NotificationsOut::new(PROTO_NAME, Vec::new(), &b"initial message"[..], 1024 * 1024), - upgrade::Version::V1, - ) - .await; + let ret = dial(listener_addr_rx.await.unwrap(), &b"initial message"[..]).await; assert!(ret.is_err()); }); - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - listener_addr_tx.send(listener.local_addr().unwrap()).unwrap(); - - let (socket, _) = listener.accept().await.unwrap(); - let NotificationsInOpen { handshake, mut substream, .. } = upgrade::apply_inbound( - socket.compat(), - NotificationsIn::new(PROTO_NAME, Vec::new(), 1024 * 1024), - ) - .await - .unwrap(); + let (handshake, mut substream) = listen_on_localhost(listener_addr_tx).await.unwrap(); assert_eq!(handshake, b"initial message"); // We check that a handshake that is too large gets refused. diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 44e6f588ab520..1937367a7489e 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -331,13 +331,13 @@ impl RequestResponsesBehaviour { ProtocolSupport::Outbound }; - let rq_rp = Behaviour::new( + let rq_rp = Behaviour::with_codec( GenericCodec { max_request_size: protocol.max_request_size, max_response_size: protocol.max_response_size, }, - iter::once(protocol.name.as_bytes().to_vec()) - .chain(protocol.fallback_names.iter().map(|name| name.as_bytes().to_vec())) + iter::once(protocol.name.clone()) + .chain(protocol.fallback_names) .zip(iter::repeat(protocol_support)), cfg, ); @@ -403,7 +403,7 @@ impl RequestResponsesBehaviour { impl NetworkBehaviour for RequestResponsesBehaviour { type ConnectionHandler = MultiHandler as NetworkBehaviour>::ConnectionHandler>; - type OutEvent = Event; + type ToSwarm = Event; fn handle_pending_inbound_connection( &mut self, @@ -519,9 +519,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { for (p, _) in self.protocols.values_mut() { NetworkBehaviour::on_swarm_event(p, FromSwarm::ListenerError(e)); }, - FromSwarm::ExpiredExternalAddr(e) => + FromSwarm::ExternalAddrExpired(e) => for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredExternalAddr(e)); + NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrExpired(e)); }, FromSwarm::NewListener(e) => for (p, _) in self.protocols.values_mut() { @@ -531,9 +531,13 @@ impl NetworkBehaviour for RequestResponsesBehaviour { for (p, _) in self.protocols.values_mut() { NetworkBehaviour::on_swarm_event(p, FromSwarm::ExpiredListenAddr(e)); }, - FromSwarm::NewExternalAddr(e) => + FromSwarm::NewExternalAddrCandidate(e) => for (p, _) in self.protocols.values_mut() { - NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddr(e)); + NetworkBehaviour::on_swarm_event(p, FromSwarm::NewExternalAddrCandidate(e)); + }, + FromSwarm::ExternalAddrConfirmed(e) => + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::on_swarm_event(p, FromSwarm::ExternalAddrConfirmed(e)); }, FromSwarm::AddressChange(e) => for (p, _) in self.protocols.values_mut() { @@ -568,7 +572,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { &mut self, cx: &mut Context, params: &mut impl PollParameters, - ) -> Poll>> { + ) -> Poll>> { 'poll_all: loop { if let Some(message_request) = self.message_request.take() { // Now we can can poll `MessageRequest` until we get the reputation @@ -723,10 +727,18 @@ impl NetworkBehaviour for RequestResponsesBehaviour { handler, event: ((*protocol).to_string(), event), }), - ToSwarm::ReportObservedAddr { address, score } => - return Poll::Ready(ToSwarm::ReportObservedAddr { address, score }), ToSwarm::CloseConnection { peer_id, connection } => return Poll::Ready(ToSwarm::CloseConnection { peer_id, connection }), + ToSwarm::NewExternalAddrCandidate(observed) => + return Poll::Ready(ToSwarm::NewExternalAddrCandidate(observed)), + ToSwarm::ExternalAddrConfirmed(addr) => + return Poll::Ready(ToSwarm::ExternalAddrConfirmed(addr)), + ToSwarm::ExternalAddrExpired(addr) => + return Poll::Ready(ToSwarm::ExternalAddrExpired(addr)), + ToSwarm::ListenOn { opts } => + return Poll::Ready(ToSwarm::ListenOn { opts }), + ToSwarm::RemoveListener { id } => + return Poll::Ready(ToSwarm::RemoveListener { id }), }; match ev { @@ -922,7 +934,7 @@ pub struct GenericCodec { #[async_trait::async_trait] impl Codec for GenericCodec { - type Protocol = Vec; + type Protocol = ProtocolName; type Request = Vec; type Response = Result, ()>; diff --git a/client/network/src/service.rs b/client/network/src/service.rs index b6fda5ab079cd..28ad85074c466 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -55,17 +55,15 @@ use crate::{ use either::Either; use futures::{channel::oneshot, prelude::*}; -#[allow(deprecated)] use libp2p::{ - connection_limits::Exceeded, + connection_limits::{ConnectionLimits, Exceeded}, core::{upgrade, ConnectedPoint, Endpoint}, identify::Info as IdentifyInfo, kad::record::Key as KademliaKey, multiaddr, - ping::Failure as PingFailure, swarm::{ - AddressScore, ConnectionError, ConnectionId, ConnectionLimits, DialError, Executor, - ListenError, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent, THandlerErr, + ConnectionError, ConnectionId, DialError, Executor, ListenError, NetworkBehaviour, Swarm, + SwarmBuilder, SwarmEvent, THandlerErr, }, Multiaddr, PeerId, }; @@ -351,6 +349,11 @@ where discovery_config, request_response_protocols, peerset_handle.clone(), + ConnectionLimits::default() + .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32)) + .with_max_established_incoming(Some( + crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING, + )), ); match result { @@ -374,15 +377,7 @@ where SpawnImpl(params.executor), ) }; - #[allow(deprecated)] let builder = builder - .connection_limits( - ConnectionLimits::default() - .with_max_established_per_peer(Some(crate::MAX_CONNECTIONS_PER_PEER as u32)) - .with_max_established_incoming(Some( - crate::MAX_CONNECTIONS_ESTABLISHED_INCOMING, - )), - ) .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) .notify_handler_buffer_size(NonZeroUsize::new(32).expect("32 != 0; qed")) // NOTE: 24 is somewhat arbitrary and should be tuned in the future if necessary. @@ -414,11 +409,7 @@ where // Add external addresses. for addr in &network_config.public_addresses { - Swarm::>::add_external_address( - &mut swarm, - addr.clone(), - AddressScore::Infinite, - ); + Swarm::>::add_external_address(&mut swarm, addr.clone()); } let external_addresses = Arc::new(Mutex::new(Vec::new())); @@ -602,7 +593,7 @@ where let peer_id = Swarm::>::local_peer_id(swarm).to_base58(); let listened_addresses = swarm.listeners().cloned().collect(); - let external_addresses = swarm.external_addresses().map(|r| &r.addr).cloned().collect(); + let external_addresses = swarm.external_addresses().cloned().collect(); NetworkState { peer_id, @@ -680,8 +671,7 @@ impl NetworkService { .into_iter() .map(|mut addr| { let peer = match addr.pop() { - Some(multiaddr::Protocol::P2p(key)) => PeerId::from_multihash(key) - .map_err(|_| "Invalid PeerId format".to_string())?, + Some(multiaddr::Protocol::P2p(peer_id)) => peer_id, _ => return Err("Missing PeerId from address".to_string()), }; @@ -1197,8 +1187,7 @@ where self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers(); self.num_connected.store(num_connected_peers, Ordering::Relaxed); { - let external_addresses = - self.network_service.external_addresses().map(|r| &r.addr).cloned().collect(); + let external_addresses = self.network_service.external_addresses().cloned().collect(); *self.external_addresses.lock() = external_addresses; let listen_addresses = @@ -1401,7 +1390,12 @@ where peer_id, info: IdentifyInfo { - protocol_version, agent_version, mut listen_addrs, protocols, .. + protocol_version, + agent_version, + mut listen_addrs, + protocols, + observed_addr, + .. }, }) => { if listen_addrs.len() > 30 { @@ -1413,11 +1407,17 @@ where listen_addrs.truncate(30); } for addr in listen_addrs { - self.network_service - .behaviour_mut() - .add_self_reported_address_to_dht(&peer_id, &protocols, addr); + self.network_service.behaviour_mut().add_self_reported_address_to_dht( + &peer_id, + &protocols, + addr.clone(), + ); } self.network_service.behaviour_mut().user_protocol_mut().add_known_peer(peer_id); + // Confirm the observed address manually since they are no longer trusted by + // default (libp2p >= 0.52) + // TODO: remove this when/if AutoNAT is implemented. + self.network_service.add_external_address(observed_addr); }, SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => { self.network_service.behaviour_mut().user_protocol_mut().add_known_peer(peer_id); @@ -1562,8 +1562,14 @@ where } } }, - SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, num_established } => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); + SwarmEvent::ConnectionClosed { + connection_id, + peer_id, + cause, + endpoint, + num_established, + } => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?} via {:?}: {:?})", peer_id, connection_id, cause); if let Some(metrics) = self.metrics.as_ref() { let direction = match endpoint { ConnectedPoint::Dialer { .. } => "out", @@ -1572,10 +1578,12 @@ where let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(Either::Left(Either::Left( - Either::Right(Either::Left(PingFailure::Timeout)), + Either::Left(Either::Right(_)), )))) => "ping-timeout", Some(ConnectionError::Handler(Either::Left(Either::Left( - Either::Left(NotifsHandlerError::SyncNotificationsClogged), + Either::Left(Either::Left( + NotifsHandlerError::SyncNotificationsClogged, + )), )))) => "sync-notifications-clogged", Some(ConnectionError::Handler(_)) => "protocol-error", Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", @@ -1601,12 +1609,12 @@ where metrics.listeners_local_addresses.dec(); } }, - SwarmEvent::OutgoingConnectionError { peer_id, error } => { + SwarmEvent::OutgoingConnectionError { connection_id, peer_id, error } => { if let Some(peer_id) = peer_id { trace!( target: "sub-libp2p", - "Libp2p => Failed to reach {:?}: {}", - peer_id, error, + "Libp2p => Failed to reach {:?} via {:?}: {}", + peer_id, connection_id, error, ); let not_reported = !self.reported_invalid_boot_nodes.contains(&peer_id); @@ -1644,12 +1652,9 @@ where } else { None }, - DialError::ConnectionLimit(_) => Some("limit-reached"), - DialError::InvalidPeerId(_) | - DialError::WrongPeerId { .. } | - DialError::LocalPeerId { .. } => Some("invalid-peer-id"), + DialError::WrongPeerId { .. } | DialError::LocalPeerId { .. } => + Some("invalid-peer-id"), DialError::Transport(_) => Some("transport-error"), - DialError::Banned | DialError::NoAddresses | DialError::DialPeerConditionFalse(_) | DialError::Aborted => None, // ignore them @@ -1659,21 +1664,26 @@ where } } }, - SwarmEvent::Dialing(peer_id) => { - trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id) + SwarmEvent::Dialing { peer_id, connection_id } => { + trace!(target: "sub-libp2p", "Libp2p => Dialing({:?} via {:?})", peer_id, connection_id) }, - SwarmEvent::IncomingConnection { local_addr, send_back_addr } => { - trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", - local_addr, send_back_addr); + SwarmEvent::IncomingConnection { connection_id, local_addr, send_back_addr } => { + trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{} via {:?}))", + local_addr, send_back_addr, connection_id); if let Some(metrics) = self.metrics.as_ref() { metrics.incoming_connections_total.inc(); } }, - SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => { + SwarmEvent::IncomingConnectionError { + connection_id, + local_addr, + send_back_addr, + error, + } => { debug!( target: "sub-libp2p", - "Libp2p => IncomingConnectionError({},{}): {}", - local_addr, send_back_addr, error, + "Libp2p => IncomingConnectionError({},{} via {:?}): {}", + local_addr, send_back_addr, connection_id, error, ); if let Some(metrics) = self.metrics.as_ref() { #[allow(deprecated)] @@ -1684,7 +1694,6 @@ where } else { None }, - ListenError::ConnectionLimit(_) => Some("limit-reached"), ListenError::WrongPeerId { .. } | ListenError::LocalPeerId { .. } => Some("invalid-peer-id"), ListenError::Transport(_) => Some("transport-error"), @@ -1699,17 +1708,6 @@ where } } }, - #[allow(deprecated)] - SwarmEvent::BannedPeer { peer_id, endpoint } => { - debug!( - target: "sub-libp2p", - "Libp2p => BannedPeer({}). Connected via {:?}.", - peer_id, endpoint, - ); - if let Some(metrics) = self.metrics.as_ref() { - metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc(); - } - }, SwarmEvent::ListenerClosed { reason, addresses, .. } => { if let Some(metrics) = self.metrics.as_ref() { metrics.listeners_local_addresses.sub(addresses.len() as u64); diff --git a/client/network/src/types.rs b/client/network/src/types.rs index b0e32ae109149..5444a1e060311 100644 --- a/client/network/src/types.rs +++ b/client/network/src/types.rs @@ -18,8 +18,6 @@ //! `sc-network` type definitions -use libp2p::core::upgrade; - use std::{ borrow::Borrow, fmt, @@ -92,9 +90,9 @@ impl fmt::Display for ProtocolName { } } -impl upgrade::ProtocolName for ProtocolName { - fn protocol_name(&self) -> &[u8] { - (self as &str).as_bytes() +impl AsRef for ProtocolName { + fn as_ref(&self) -> &str { + self as &str } } diff --git a/client/network/statement/Cargo.toml b/client/network/statement/Cargo.toml index f76c4357ad303..e70fc5e929ab3 100644 --- a/client/network/statement/Cargo.toml +++ b/client/network/statement/Cargo.toml @@ -13,12 +13,12 @@ documentation = "https://docs.rs/sc-network-statement" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -array-bytes = "6" -async-channel = "1" -codec = { package = "parity-scale-codec", version = "3", features = ["derive"] } -futures = "0.3" -libp2p = "0.51" -log = "0.4" +array-bytes = "6.1" +async-channel = "1.8.0" +codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] } +futures = "0.3.21" +libp2p = "0.52.1" +log = "0.4.17" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-network = { version = "0.10.0-dev", path = "../" } diff --git a/client/network/statement/src/lib.rs b/client/network/statement/src/lib.rs index 2c966a346ad87..a055cd07a0740 100644 --- a/client/network/statement/src/lib.rs +++ b/client/network/statement/src/lib.rs @@ -286,8 +286,8 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::(); + let addr = + iter::once(multiaddr::Protocol::P2p(remote)).collect::(); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), diff --git a/client/network/sync/Cargo.toml b/client/network/sync/Cargo.toml index 65ee46721f0f3..65fa6c7265a5d 100644 --- a/client/network/sync/Cargo.toml +++ b/client/network/sync/Cargo.toml @@ -16,18 +16,18 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.11" [dependencies] -array-bytes = "6" -async-channel = "1" -async-trait = "0.1" -codec = { package = "parity-scale-codec", version = "3", features = ["derive"] } -futures = "0.3" -futures-timer = "3" -libp2p = "0.51" -log = "0.4" -mockall = "0.11" +array-bytes = "6.1" +async-channel = "1.8.0" +async-trait = "0.1.58" +codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] } +futures = "0.3.21" +futures-timer = "3.0.2" +libp2p = "0.52.1" +log = "0.4.17" +mockall = "0.11.3" prost = "0.11" -schnellru = "0.2" -smallvec = "1" +schnellru = "0.2.1" +smallvec = "1.11.0" thiserror = "1.0" fork-tree = { version = "3", path = "../../../utils/fork-tree" } prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } diff --git a/client/network/test/Cargo.toml b/client/network/test/Cargo.toml index cc816ec4d9445..4edcf9b939d7c 100644 --- a/client/network/test/Cargo.toml +++ b/client/network/test/Cargo.toml @@ -13,12 +13,12 @@ repository = "https://github.com/paritytech/substrate/" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -tokio = "1" -async-trait = "0.1" -futures = "0.3" -futures-timer = "3" -libp2p = "0.51" -log = "0.4" +tokio = "1.22.0" +async-trait = "0.1.57" +futures = "0.3.21" +futures-timer = "3.0.1" +libp2p = "0.52.1" +log = "0.4.17" parking_lot = "0.12.1" rand = "0.8.5" sc-block-builder = { version = "0.10.0-dev", path = "../../block-builder" } diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml index b61651e1c3b6c..dcb6980db8f1c 100644 --- a/client/network/transactions/Cargo.toml +++ b/client/network/transactions/Cargo.toml @@ -13,11 +13,11 @@ documentation = "https://docs.rs/sc-network-transactions" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -array-bytes = "6" -codec = { package = "parity-scale-codec", version = "3", features = ["derive"] } -futures = "0.3" -libp2p = "0.51" -log = "0.4" +array-bytes = "6.1" +codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] } +futures = "0.3.21" +libp2p = "0.52.1" +log = "0.4.17" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } sc-network = { version = "0.10.0-dev", path = "../" } sc-network-common = { version = "0.10.0-dev", path = "../common" } diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 78fd432ab0c4e..5d86e3c35d8ab 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -327,8 +327,8 @@ where fn handle_sync_event(&mut self, event: SyncEvent) { match event { SyncEvent::PeerConnected(remote) => { - let addr = iter::once(multiaddr::Protocol::P2p(remote.into())) - .collect::(); + let addr = + iter::once(multiaddr::Protocol::P2p(remote)).collect::(); let result = self.network.add_peers_to_reserved_set( self.protocol_name.clone(), iter::once(addr).collect(), diff --git a/client/offchain/Cargo.toml b/client/offchain/Cargo.toml index a3e3207e2ffc1..299c1caa12424 100644 --- a/client/offchain/Cargo.toml +++ b/client/offchain/Cargo.toml @@ -13,17 +13,17 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -array-bytes = "6" -bytes = "1" -codec = { package = "parity-scale-codec", version = "3", features = ["derive"] } -fnv = "1" -futures = "0.3" -futures-timer = "3" -hyper = { version = "0.14", features = ["stream", "http2"] } -hyper-rustls = { version = "0.24", features = ["http2"] } -libp2p = "0.51.3" -num_cpus = "1.15" -once_cell = "1.17" +array-bytes = "6.1" +bytes = "1.1" +codec = { package = "parity-scale-codec", version = "3.6.1", features = ["derive"] } +fnv = "1.0.6" +futures = "0.3.21" +futures-timer = "3.0.2" +hyper = { version = "0.14.16", features = ["stream", "http2"] } +hyper-rustls = { version = "0.24.0", features = ["http2"] } +libp2p = "0.52.1" +num_cpus = "1.13" +once_cell = "1.8" parking_lot = "0.12.1" rand = "0.8.5" threadpool = "1.8" diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index c987c2471907d..ae138dab7f3ac 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -255,7 +255,7 @@ pub async fn build_system_rpc_future< let _ = sender.send(network_service.local_peer_id().to_base58()); }, sc_rpc::system::Request::LocalListenAddresses(sender) => { - let peer_id = (network_service.local_peer_id()).into(); + let peer_id = network_service.local_peer_id(); let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); let addresses = network_service .listen_addresses() diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index ec4784084dbb8..f61c6bf0164c7 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -14,12 +14,12 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -chrono = "0.4" -futures = "0.3" -libp2p = { version = "0.51", features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"] } -log = "0.4" -parking_lot = "0.12" -pin-project = "1" +chrono = "0.4.19" +futures = "0.3.21" +libp2p = { version = "0.52.1", features = ["dns", "tcp", "tokio", "wasm-ext", "websocket"] } +log = "0.4.17" +parking_lot = "0.12.1" +pin-project = "1.0.12" sc-utils = { version = "4.0.0-dev", path = "../utils" } rand = "0.8" serde = { version = "1", features = ["derive"] } diff --git a/frame/support/Cargo.toml b/frame/support/Cargo.toml index 097c84b071196..c8b18eeb7abd2 100644 --- a/frame/support/Cargo.toml +++ b/frame/support/Cargo.toml @@ -31,14 +31,14 @@ sp-debug-derive = { default-features = false, path = "../../primitives/debug-der tt-call = "1" macro_magic = "0.4" frame-support-procedural = { version = "4.0.0-dev", default-features = false, path = "./procedural" } -paste = "1" -sp-state-machine = { version = "0.28", default-features = false, optional = true, path = "../../primitives/state-machine" } -bitflags = "1" -impl-trait-for-tuples = "0.2" -smallvec = "1" -log = { version = "0.4", default-features = false } -sp-core-hashing-proc-macro = { version = "9", path = "../../primitives/core/hashing/proc-macro" } -k256 = { version = "0.13", default-features = false, features = ["ecdsa"] } +paste = "1.0" +sp-state-machine = { version = "0.28.0", default-features = false, optional = true, path = "../../primitives/state-machine" } +bitflags = "1.3" +impl-trait-for-tuples = "0.2.2" +smallvec = "1.11.0" +log = { version = "0.4.17", default-features = false } +sp-core-hashing-proc-macro = { version = "9.0.0", path = "../../primitives/core/hashing/proc-macro" } +k256 = { version = "0.13.1", default-features = false, features = ["ecdsa"] } environmental = { version = "1.1.4", default-features = false } [dev-dependencies] diff --git a/primitives/core/hashing/Cargo.toml b/primitives/core/hashing/Cargo.toml index 1e85b2e50dc90..43a13883358ea 100644 --- a/primitives/core/hashing/Cargo.toml +++ b/primitives/core/hashing/Cargo.toml @@ -13,12 +13,12 @@ documentation = "https://docs.rs/sp-core-hashing" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -blake2b_simd = { version = "1", default-features = false } -byteorder = { version = "1", default-features = false } -digest = { version = "0.10", default-features = false } -sha2 = { version = "0.10", default-features = false } -sha3 = { version = "0.10", default-features = false } -twox-hash = { version = "1", default-features = false, features = ["digest_0_10"] } +blake2b_simd = { version = "1.0.1", default-features = false } +byteorder = { version = "1.3.2", default-features = false } +digest = { version = "0.10.3", default-features = false } +sha2 = { version = "0.10.7", default-features = false } +sha3 = { version = "0.10.0", default-features = false } +twox-hash = { version = "1.6.3", default-features = false, features = ["digest_0_10"] } [features] default = ["std"] diff --git a/primitives/state-machine/Cargo.toml b/primitives/state-machine/Cargo.toml index 261efc8212de0..d35f87a254395 100644 --- a/primitives/state-machine/Cargo.toml +++ b/primitives/state-machine/Cargo.toml @@ -14,20 +14,20 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -codec = { package = "parity-scale-codec", version = "3", default-features = false } -hash-db = { version = "0.16", default-features = false } -log = { version = "0.4", default-features = false } -parking_lot = { version = "0.12", optional = true } -rand = { version = "0.8", optional = true } -smallvec = "1" -thiserror = { version = "1", optional = true } -tracing = { version = "0.1", optional = true } -sp-core = { version = "21", default-features = false, path = "../core" } -sp-externalities = { version = "0.19", default-features = false, path = "../externalities" } -sp-panic-handler = { version = "8", optional = true, path = "../panic-handler" } -sp-std = { version = "8", default-features = false, path = "../std" } -sp-trie = { version = "22", default-features = false, path = "../trie" } -trie-db = { version = "0.27", default-features = false } +codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false } +hash-db = { version = "0.16.0", default-features = false } +log = { version = "0.4.17", default-features = false } +parking_lot = { version = "0.12.1", optional = true } +rand = { version = "0.8.5", optional = true } +smallvec = "1.11.0" +thiserror = { version = "1.0.30", optional = true } +tracing = { version = "0.1.29", optional = true } +sp-core = { version = "21.0.0", default-features = false, path = "../core" } +sp-externalities = { version = "0.19.0", default-features = false, path = "../externalities" } +sp-panic-handler = { version = "8.0.0", optional = true, path = "../panic-handler" } +sp-std = { version = "8.0.0", default-features = false, path = "../std" } +sp-trie = { version = "22.0.0", default-features = false, path = "../trie" } +trie-db = { version = "0.27.1", default-features = false } [dev-dependencies] array-bytes = "6" diff --git a/primitives/statement-store/Cargo.toml b/primitives/statement-store/Cargo.toml index 2f3eff5dcf613..7e76395f0784b 100644 --- a/primitives/statement-store/Cargo.toml +++ b/primitives/statement-store/Cargo.toml @@ -19,10 +19,19 @@ sp-core = { version = "21", default-features = false, path = "../core" } sp-runtime = { version = "24", default-features = false, path = "../runtime" } sp-std = { version = "8", default-features = false, path = "../std" } sp-api = { version = "4.0.0-dev", default-features = false, path = "../api" } -sp-application-crypto = { version = "23", default-features = false, path = "../application-crypto" } -sp-runtime-interface = { version = "17", default-features = false, path = "../runtime-interface" } -sp-externalities = { version = "0.19", default-features = false, path = "../externalities" } -thiserror = { version = "1", optional = true } +sp-application-crypto = { version = "23.0.0", default-features = false, path = "../application-crypto" } +sp-runtime-interface = { version = "17.0.0", default-features = false, path = "../runtime-interface" } +sp-externalities = { version = "0.19.0", default-features = false, path = "../externalities" } +thiserror = { version = "1.0", optional = true } + +# ECIES dependencies +ed25519-dalek = { version = "1.0", optional = true } +x25519-dalek = { version = "2.0.0-pre.1", optional = true } +curve25519-dalek = { version = "3.2", optional = true } +aes-gcm = { version = "0.10", optional = true } +hkdf = { version = "0.12.0", optional = true } +sha2 = { version = "0.10.7", optional = true } +rand = { version = "0.8.5", features = ["small_rng"], optional = true } [features] default = ["std"] diff --git a/primitives/weights/Cargo.toml b/primitives/weights/Cargo.toml index 6e01c836b28b9..05fc58a10e105 100644 --- a/primitives/weights/Cargo.toml +++ b/primitives/weights/Cargo.toml @@ -13,14 +13,14 @@ documentation = "https://docs.rs/sp-wasm-interface" targets = ["x86_64-unknown-linux-gnu"] [dependencies] -codec = { package = "parity-scale-codec", version = "3", default-features = false, features = ["derive"] } -scale-info = { version = "2", default-features = false, features = ["derive"] } -serde = { version = "1", optional = true, default-features = false, features = ["derive", "alloc"] } -smallvec = "1" -sp-arithmetic = { version = "16", default-features = false, path = "../arithmetic" } -sp-core = { version = "21", default-features = false, path = "../core" } -sp-debug-derive = { version = "8", default-features = false, path = "../debug-derive" } -sp-std = { version = "8", default-features = false, path = "../std" } +codec = { package = "parity-scale-codec", version = "3.6.1", default-features = false, features = ["derive"] } +scale-info = { version = "2.5.0", default-features = false, features = ["derive"] } +serde = { version = "1.0.163", default-features = false, optional = true, features = ["derive", "alloc"] } +smallvec = "1.11.0" +sp-arithmetic = { version = "16.0.0", default-features = false, path = "../arithmetic" } +sp-core = { version = "21.0.0", default-features = false, path = "../core" } +sp-debug-derive = { version = "8.0.0", default-features = false, path = "../debug-derive" } +sp-std = { version = "8.0.0", default-features = false, path = "../std" } [features] default = [ "std" ]