diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c6595a4b73..bbf9976b24e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# Version ??? + +- Support for multiple connections per peer and configurable connection limits. + See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440), + [PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and + [issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details. + # Version 0.16.2 (2020-02-28) - Fixed yamux connections not properly closing and being stuck in the `CLOSE_WAIT` state. diff --git a/core/src/connection.rs b/core/src/connection.rs index de6a03d00c7..82747f602b1 100644 --- a/core/src/connection.rs +++ b/core/src/connection.rs @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti use crate::muxing::StreamMuxer; use crate::{Multiaddr, PeerId}; -use std::{fmt, pin::Pin, task::Context, task::Poll}; +use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; use std::hash::Hash; use substream::{Muxing, SubstreamEvent}; @@ -334,3 +334,6 @@ impl fmt::Display for ConnectionLimit { write!(f, "{}/{}", self.current, self.limit) } } + +/// A `ConnectionLimit` can represent an error if it has been exceeded. +impl Error for ConnectionLimit {} diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 300c649aa29..56e112369e1 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -225,7 +225,7 @@ where TPeerId: Clone + Send + 'static, { let endpoint = info.to_connected_point(); - if let Some(limit) = self.limits.max_pending_incoming { + if let Some(limit) = self.limits.max_incoming { let current = self.iter_pending_incoming().count(); if current >= limit { return Err(ConnectionLimit { limit, current }) @@ -834,8 +834,8 @@ where /// The configurable limits of a connection [`Pool`]. #[derive(Debug, Clone, Default)] pub struct PoolLimits { - pub max_pending_outgoing: Option, - pub max_pending_incoming: Option, + pub max_outgoing: Option, + pub max_incoming: Option, pub max_established_per_peer: Option, } @@ -851,7 +851,7 @@ impl PoolLimits { where F: FnOnce() -> usize { - Self::check(current, self.max_pending_outgoing) + Self::check(current, self.max_outgoing) } fn check(current: F, limit: Option) -> Result<(), ConnectionLimit> diff --git a/core/src/network.rs b/core/src/network.rs index 39ea68ed25c..10b6e063202 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -220,7 +220,7 @@ where /// [`Connection`](crate::connection::Connection) upon success and the /// connection ID is returned. pub fn dial(&mut self, address: &Multiaddr, handler: THandler) - -> Result> + -> Result where TTrans: Transport, TTrans::Error: Send + 'static, @@ -232,10 +232,17 @@ where TConnInfo: Send + 'static, TPeerId: Send + 'static, { - let future = self.transport().clone().dial(address.clone())? - .map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); let info = OutgoingInfo { address, peer_id: None }; - self.pool.add_outgoing(future, handler, info).map_err(DialError::MaxPending) + match self.transport().clone().dial(address.clone()) { + Ok(f) => { + let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); + self.pool.add_outgoing(f, handler, info) + } + Err(err) => { + let f = future::err(PendingConnectionError::Transport(err)); + self.pool.add_outgoing(f, handler, info) + } + } } /// Returns information about the state of the `Network`. @@ -275,6 +282,22 @@ where self.pool.iter_connected() } + /// Checks whether the network has an established connection to a peer. + pub fn is_connected(&self, peer: &TPeerId) -> bool { + self.pool.is_connected(peer) + } + + /// Checks whether the network has an ongoing dialing attempt to a peer. + pub fn is_dialing(&self, peer: &TPeerId) -> bool { + self.dialing.contains_key(peer) + } + + /// Checks whether the network has neither an ongoing dialing attempt, + /// nor an established connection to a peer. + pub fn is_disconnected(&self, peer: &TPeerId) -> bool { + !self.is_connected(peer) && !self.is_dialing(peer) + } + /// Returns a list of all the peers to whom a new outgoing connection /// is currently being established. pub fn dialing_peers(&self) -> impl Iterator { @@ -284,7 +307,7 @@ where /// Gets the configured limit on pending incoming connections, /// i.e. concurrent incoming connection attempts. pub fn incoming_limit(&self) -> Option { - self.pool.limits().max_pending_incoming + self.pool.limits().max_incoming } /// The total number of established connections in the `Network`. @@ -380,8 +403,9 @@ where } event } - Poll::Ready(PoolEvent::ConnectionError { connected, error, num_established, .. }) => { + Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => { NetworkEvent::ConnectionError { + id, connected, error, num_established, @@ -557,43 +581,6 @@ pub struct NetworkInfo { pub num_connections_established: usize, } -/// The possible errors of [`Network::dial`]. -#[derive(Debug)] -pub enum DialError { - /// The configured limit of pending outgoing connections has been reached. - MaxPending(ConnectionLimit), - /// A transport error occurred when creating the connection. - Transport(TransportError), -} - -impl fmt::Display for DialError -where T: fmt::Display, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - DialError::MaxPending(limit) => write!(f, "Dial error (pending limit): {}", limit.current), - DialError::Transport(err) => write!(f, "Dial error (transport): {}", err), - } - } -} - -impl std::error::Error for DialError -where T: std::error::Error + 'static, -{ - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - DialError::MaxPending(_) => None, - DialError::Transport(e) => Some(e), - } - } -} - -impl From> for DialError { - fn from(e: TransportError) -> DialError { - DialError::Transport(e) - } -} - /// The (optional) configuration for a [`Network`]. /// /// The default configuration specifies no dedicated task executor @@ -610,17 +597,29 @@ impl NetworkConfig { self } + /// Shortcut for calling `executor` with an object that calls the given closure. + pub fn set_executor_fn(mut self, f: impl Fn(Pin + Send>>) + Send + 'static) -> Self { + struct SpawnImpl(F); + impl + Send>>)> Executor for SpawnImpl { + fn exec(&self, f: Pin + Send>>) { + (self.0)(f) + } + } + self.set_executor(Box::new(SpawnImpl(f))); + self + } + pub fn executor(&self) -> Option<&Box> { self.executor.as_ref() } - pub fn set_pending_incoming_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_pending_incoming = Some(n); + pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self { + self.pool_limits.max_incoming = Some(n); self } - pub fn set_pending_outgoing_limit(&mut self, n: usize) -> &mut Self { - self.pool_limits.max_pending_outgoing = Some(n); + pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self { + self.pool_limits.max_outgoing = Some(n); self } diff --git a/core/src/network/event.rs b/core/src/network/event.rs index 233b35fd9d3..afbedcf77e2 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -114,6 +114,8 @@ where /// /// The connection is closed as a result of the error. ConnectionError { + /// The ID of the connection that encountered an error. + id: ConnectionId, /// Information about the connection that encountered the error. connected: Connected, /// The error that occurred. diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index d9f1bda4850..b06be772fe2 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -174,28 +174,66 @@ where TConnInfo: fmt::Debug + ConnectionInfo + Send + 'static, TPeerId: Eq + Hash + Clone + Send + 'static, { + /// Checks whether the peer is currently connected. + /// + /// Returns `true` iff [`Peer::into_connected`] returns `Some`. + pub fn is_connected(&self) -> bool { + match self { + Peer::Connected(..) => true, + Peer::Dialing(peer) => peer.is_connected(), + Peer::Disconnected(..) => false, + Peer::Local => false + } + } + + /// Checks whether the peer is currently being dialed. + /// + /// Returns `true` iff [`Peer::into_dialing`] returns `Some`. + pub fn is_dialing(&self) -> bool { + match self { + Peer::Dialing(_) => true, + Peer::Connected(peer) => peer.is_dialing(), + Peer::Disconnected(..) => false, + Peer::Local => false + } + } + + /// Checks whether the peer is currently disconnected. + /// + /// Returns `true` iff [`Peer::into_disconnected`] returns `Some`. + pub fn is_disconnected(&self) -> bool { + match self { + Peer::Disconnected(..) => true, + _ => false + } + } - /// If we are connected, returns the `ConnectedPeer`. + /// Converts the peer into a `ConnectedPeer`, if there an established connection exists. pub fn into_connected(self) -> Option< ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { match self { Peer::Connected(peer) => Some(peer), - _ => None, + Peer::Dialing(peer) => peer.into_connected(), + Peer::Disconnected(..) => None, + Peer::Local => None, } } - /// If a connection is pending, returns the `DialingPeer`. + /// Converts the peer into a `DialingPeer`, if a dialing attempt exists. pub fn into_dialing(self) -> Option< DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { match self { Peer::Dialing(peer) => Some(peer), - _ => None, + Peer::Connected(peer) => peer.into_dialing(), + Peer::Disconnected(..) => None, + Peer::Local => None } } - /// If we are not connected, returns the `DisconnectedPeer`. + /// Converts the peer into a `DisconnectedPeer`, if neither an established connection + /// nor a dialing attempt exists. pub fn into_disconnected(self) -> Option< DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> > { @@ -225,6 +263,10 @@ where TConnInfo: ConnectionInfo, TPeerId: Eq + Hash + Clone, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Attempts to establish a new connection to this peer using the given addresses, /// if there is currently no ongoing dialing attempt. /// @@ -294,7 +336,7 @@ where self.network.dialing.contains_key(&self.peer_id) } - /// Turns this peer into a [`DialingPeer`], if there is an ongoing + /// Converts this peer into a [`DialingPeer`], if there is an ongoing /// dialing attempt, `None` otherwise. pub fn into_dialing(self) -> Option< DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> @@ -373,12 +415,34 @@ where TConnInfo: ConnectionInfo, TPeerId: Eq + Hash + Clone, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Disconnects from this peer, closing all pending connections. pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> { self.network.disconnect(&self.peer_id); DisconnectedPeer { network: self.network, peer_id: self.peer_id } } + /// Checks whether there is an established connection to the peer. + /// + /// Returns `true` iff [`DialingPeer::into_connected`] returns `Some`. + pub fn is_connected(&self) -> bool { + self.network.pool.is_connected(&self.peer_id) + } + + /// Converts the peer into a `ConnectedPeer`, if an established connection exists. + pub fn into_connected(self) + -> Option> + { + if self.is_connected() { + Some(ConnectedPeer { peer_id: self.peer_id, network: self.network }) + } else { + None + } + } + /// Obtains the connection that is currently being established. pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> { let attempt = match self.network.dialing.entry(self.peer_id.clone()) { @@ -452,6 +516,10 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { + pub fn id(&self) -> &TPeerId { + &self.peer_id + } + /// Attempts to connect to this peer using the given addresses. pub fn connect(self, first: Multiaddr, rest: TIter, handler: THandler) -> Result, diff --git a/misc/core-derive/src/lib.rs b/misc/core-derive/src/lib.rs index 46a21fd3d46..205d4c45f0f 100644 --- a/misc/core-derive/src/lib.rs +++ b/misc/core-derive/src/lib.rs @@ -131,44 +131,52 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { // Build the list of statements to put in the body of `inject_connected()`. let inject_connected_stmts = { - let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { if is_ignored(&field) { return None; } - - Some(if field_n == num_fields - 1 { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_connected(peer_id, endpoint); }, - None => quote!{ self.#field_n.inject_connected(peer_id, endpoint); }, - } - } else { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_connected(peer_id.clone(), endpoint.clone()); }, - None => quote!{ self.#field_n.inject_connected(peer_id.clone(), endpoint.clone()); }, - } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connected(peer_id); }, + None => quote!{ self.#field_n.inject_connected(peer_id); }, }) }) }; // Build the list of statements to put in the body of `inject_disconnected()`. let inject_disconnected_stmts = { - let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count(); data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { if is_ignored(&field) { return None; } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id); }, + None => quote!{ self.#field_n.inject_disconnected(peer_id); }, + }) + }) + }; - Some(if field_n == num_fields - 1 { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint); }, - None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint); }, - } - } else { - match field.ident { - Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint.clone()); }, - None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint.clone()); }, - } + // Build the list of statements to put in the body of `inject_connection_established()`. + let inject_connection_established_stmts = { + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint); }, + None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint); }, + }) + }) + }; + + // Build the list of statements to put in the body of `inject_connection_closed()`. + let inject_connection_closed_stmts = { + data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| { + if is_ignored(&field) { + return None; + } + Some(match field.ident { + Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint); }, + None => quote!{ self.#field_n.inject_connection_closed(peer_id, connection_id, endpoint); }, }) }) }; @@ -383,8 +391,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => { return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }); } - std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => { - return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }); + std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }) => { + return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }); } std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => { return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { @@ -421,14 +429,22 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream { out } - fn inject_connected(&mut self, peer_id: #peer_id, endpoint: #connected_point) { + fn inject_connected(&mut self, peer_id: &#peer_id) { #(#inject_connected_stmts);* } - fn inject_disconnected(&mut self, peer_id: &#peer_id, endpoint: #connected_point) { + fn inject_disconnected(&mut self, peer_id: &#peer_id) { #(#inject_disconnected_stmts);* } + fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) { + #(#inject_connection_established_stmts);* + } + + fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) { + #(#inject_connection_closed_stmts);* + } + fn inject_addr_reach_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) { #(#inject_addr_reach_failure_stmts);* } diff --git a/protocols/floodsub/src/layer.rs b/protocols/floodsub/src/layer.rs index 18e87c542cc..1c837b2d0f0 100644 --- a/protocols/floodsub/src/layer.rs +++ b/protocols/floodsub/src/layer.rs @@ -22,14 +22,15 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubs use crate::topic::Topic; use cuckoofilter::CuckooFilter; use fnv::FnvHashSet; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, OneShotHandler, - NotifyHandler + NotifyHandler, + DialPeerCondition, }; use rand; use smallvec::SmallVec; @@ -96,7 +97,9 @@ impl Floodsub { } if self.target_peers.insert(peer_id.clone()) { - self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id }); + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id, condition: DialPeerCondition::Disconnected + }); } } @@ -236,9 +239,9 @@ impl NetworkBehaviour for Floodsub { Vec::new() } - fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + fn inject_connected(&mut self, id: &PeerId) { // We need to send our subscriptions to the newly-connected node. - if self.target_peers.contains(&id) { + if self.target_peers.contains(id) { for topic in self.subscribed_topics.iter().cloned() { self.events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: id.clone(), @@ -257,14 +260,17 @@ impl NetworkBehaviour for Floodsub { self.connected_peers.insert(id.clone(), SmallVec::new()); } - fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { let was_in = self.connected_peers.remove(id); debug_assert!(was_in.is_some()); // We can be disconnected by the remote in case of inactivity for example, so we always // try to reconnect. if self.target_peers.contains(id) { - self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: id.clone() }); + self.events.push_back(NetworkBehaviourAction::DialPeer { + peer_id: id.clone(), + condition: DialPeerCondition::Disconnected + }); } } diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2e55571ac84..63611df9820 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -27,7 +27,7 @@ use crate::protocol::{ }; use crate::topic::{Topic, TopicHash}; use futures::prelude::*; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{ NetworkBehaviour, NetworkBehaviourAction, @@ -1012,7 +1012,7 @@ impl NetworkBehaviour for Gossipsub { Vec::new() } - fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) { + fn inject_connected(&mut self, id: &PeerId) { info!("New peer connected: {:?}", id); // We need to send our subscriptions to the newly-connected node. let mut subscriptions = vec![]; @@ -1040,7 +1040,7 @@ impl NetworkBehaviour for Gossipsub { self.peer_topics.insert(id.clone(), Vec::new()); } - fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { // remove from mesh, topic_peers, peer_topic and fanout debug!("Peer disconnected: {:?}", id); { @@ -1164,8 +1164,8 @@ impl NetworkBehaviour for Gossipsub { NetworkBehaviourAction::DialAddress { address } => { return Poll::Ready(NetworkBehaviourAction::DialAddress { address }); } - NetworkBehaviourAction::DialPeer { peer_id } => { - return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }); + NetworkBehaviourAction::DialPeer { peer_id, condition } => { + return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }); } NetworkBehaviourAction::ReportObservedAddr { address } => { return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address }); diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index a3ca6ea5799..e207315111a 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -49,17 +49,13 @@ mod tests { // build and connect peer_no random peers let mut peers = vec![]; - let dummy_connected_point = ConnectedPoint::Dialer { - address: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), - }; for _ in 0..peer_no { let peer = PeerId::random(); peers.push(peer.clone()); ::inject_connected( &mut gs, - peer.clone(), - dummy_connected_point.clone(), + &peer, ); if to_subscribe { gs.handle_received_subscriptions( diff --git a/protocols/identify/src/identify.rs b/protocols/identify/src/identify.rs index fe78bc1b4bd..312e273ded1 100644 --- a/protocols/identify/src/identify.rs +++ b/protocols/identify/src/identify.rs @@ -37,7 +37,13 @@ use libp2p_swarm::{ ProtocolsHandler, ProtocolsHandlerUpgrErr }; -use std::{collections::HashMap, collections::VecDeque, io, pin::Pin, task::Context, task::Poll}; +use std::{ + collections::{HashMap, VecDeque}, + io, + pin::Pin, + task::Context, + task::Poll +}; /// Network behaviour that automatically identifies nodes periodically, returns information /// about them, and answers identify queries from other nodes. @@ -49,7 +55,7 @@ pub struct Identify { /// The public key of the local node. To report on the wire. local_public_key: PublicKey, /// For each peer we're connected to, the observed address to send back to it. - observed_addresses: HashMap, + observed_addresses: HashMap>, /// Pending replies to send. pending_replies: VecDeque, /// Pending events to be emitted when polled. @@ -97,23 +103,32 @@ impl NetworkBehaviour for Identify { Vec::new() } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { - let observed = match endpoint { - ConnectedPoint::Dialer { address } => address, - ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, + fn inject_connected(&mut self, _: &PeerId) { + } + + fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + let addr = match endpoint { + ConnectedPoint::Dialer { address } => address.clone(), + ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), }; - self.observed_addresses.insert(peer_id, observed); + self.observed_addresses.entry(peer_id.clone()).or_default().insert(*conn, addr); + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) { + if let Some(addrs) = self.observed_addresses.get_mut(peer_id) { + addrs.remove(conn); + } } - fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { + fn inject_disconnected(&mut self, peer_id: &PeerId) { self.observed_addresses.remove(peer_id); } fn inject_event( &mut self, peer_id: PeerId, - _connection: ConnectionId, + connection: ConnectionId, event: ::OutEvent, ) { match event { @@ -132,9 +147,9 @@ impl NetworkBehaviour for Identify { } IdentifyHandlerEvent::Identify(sender) => { let observed = self.observed_addresses.get(&peer_id) - .expect("We only receive events from nodes we're connected to. We insert \ - into the hashmap when we connect to a node and remove only when we \ - disconnect; QED"); + .and_then(|addrs| addrs.get(&connection)) + .expect("`inject_event` is only called with an established connection \ + and `inject_connection_established` ensures there is an entry; qed"); self.pending_replies.push_back( Reply::Queued { peer: peer_id, diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index d00d8e8bf19..fab9fbd6f72 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -33,6 +33,7 @@ use crate::record::{self, store::{self, RecordStore}, Record, ProviderRecord}; use fnv::{FnvHashMap, FnvHashSet}; use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{ + DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, @@ -343,6 +344,7 @@ where kbucket::InsertResult::Pending { disconnected } => { self.queued_events.push_back(NetworkBehaviourAction::DialPeer { peer_id: disconnected.into_preimage(), + condition: DialPeerCondition::Disconnected }) }, } @@ -675,6 +677,7 @@ where debug_assert!(!self.connected_peers.contains(disconnected.preimage())); self.queued_events.push_back(NetworkBehaviourAction::DialPeer { peer_id: disconnected.into_preimage(), + condition: DialPeerCondition::Disconnected }) }, } @@ -1100,12 +1103,25 @@ where peer_addrs } - fn inject_connected(&mut self, peer: PeerId, endpoint: ConnectedPoint) { + fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, endpoint: &ConnectedPoint) { + // The remote's address can only be put into the routing table, + // and thus shared with other nodes, if the local node is the dialer, + // since the remote address on an inbound connection is specific to + // that connection (e.g. typically the TCP port numbers). + let address = match endpoint { + ConnectedPoint::Dialer { address } => Some(address.clone()), + ConnectedPoint::Listener { .. } => None, + }; + + self.connection_updated(peer.clone(), address, NodeStatus::Connected); + } + + fn inject_connected(&mut self, peer: &PeerId) { // Queue events for sending pending RPCs to the connected peer. // There can be only one pending RPC for a particular peer and query per definition. for (peer_id, event) in self.queries.iter_mut().filter_map(|q| q.inner.pending_rpcs.iter() - .position(|(p, _)| p == &peer) + .position(|(p, _)| p == peer) .map(|p| q.inner.pending_rpcs.remove(p))) { self.queued_events.push_back(NetworkBehaviourAction::NotifyHandler { @@ -1113,17 +1129,7 @@ where }); } - // The remote's address can only be put into the routing table, - // and thus shared with other nodes, if the local node is the dialer, - // since the remote address on an inbound connection is specific to - // that connection (e.g. typically the TCP port numbers). - let address = match endpoint { - ConnectedPoint::Dialer { address } => Some(address), - ConnectedPoint::Listener { .. } => None, - }; - - self.connection_updated(peer.clone(), address, NodeStatus::Connected); - self.connected_peers.insert(peer); + self.connected_peers.insert(peer.clone()); } fn inject_addr_reach_failure( @@ -1173,7 +1179,7 @@ where } } - fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) { + fn inject_disconnected(&mut self, id: &PeerId) { for query in self.queries.iter_mut() { query.on_failure(id); } @@ -1441,7 +1447,7 @@ where } else if &peer_id != self.kbuckets.local_key().preimage() { query.inner.pending_rpcs.push((peer_id.clone(), event)); self.queued_events.push_back(NetworkBehaviourAction::DialPeer { - peer_id + peer_id, condition: DialPeerCondition::Disconnected }); } } diff --git a/protocols/mdns/src/behaviour.rs b/protocols/mdns/src/behaviour.rs index 85685f725e0..abc580ff536 100644 --- a/protocols/mdns/src/behaviour.rs +++ b/protocols/mdns/src/behaviour.rs @@ -21,7 +21,6 @@ use crate::service::{MdnsService, MdnsPacket, build_query_response, build_service_discovery_response}; use futures::prelude::*; use libp2p_core::{ - ConnectedPoint, Multiaddr, PeerId, address_translation, @@ -199,9 +198,9 @@ impl NetworkBehaviour for Mdns { .collect() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} fn inject_event( &mut self, diff --git a/protocols/ping/src/lib.rs b/protocols/ping/src/lib.rs index 691266446ff..82b828ad9d6 100644 --- a/protocols/ping/src/lib.rs +++ b/protocols/ping/src/lib.rs @@ -47,7 +47,7 @@ pub mod handler; pub use handler::{PingConfig, PingResult, PingSuccess, PingFailure}; use handler::PingHandler; -use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId}; +use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId}; use libp2p_swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters}; use std::{collections::VecDeque, task::Context, task::Poll}; use void::Void; @@ -100,9 +100,9 @@ impl NetworkBehaviour for Ping { Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} - fn inject_disconnected(&mut self, _: &PeerId, _: ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} fn inject_event(&mut self, peer: PeerId, _: ConnectionId, result: PingResult) { self.events.push_front(PingEvent { peer, result }) diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index 97497b8aebf..e7deaf7530f 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -72,18 +72,34 @@ pub trait NetworkBehaviour: Send + 'static { /// address should be the most likely to be reachable. fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec; - /// Indicates the behaviour that we connected to the node with the given peer id through the - /// given endpoint. + /// Indicates the behaviour that we connected to the node with the given peer id. /// /// This node now has a handler (as spawned by `new_handler`) running in the background. - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint); + /// + /// This method is only called when the connection to the peer is + /// established, preceded by `inject_connection_established`. + fn inject_connected(&mut self, peer_id: &PeerId); - /// Indicates the behaviour that we disconnected from the node with the given peer id. The - /// endpoint is the one we used to be connected to. + /// Indicates the behaviour that we disconnected from the node with the given peer id. /// /// There is no handler running anymore for this node. Any event that has been sent to it may /// or may not have been processed by the handler. - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint); + /// + /// This method is only called when the last established connection to the peer + /// is closed, preceded by `inject_connection_closed`. + fn inject_disconnected(&mut self, peer_id: &PeerId); + + /// Informs the behaviour about a newly established connection to a peer. + fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) + {} + + /// Informs the behaviour about a closed connection to a peer. + /// + /// A call to this method is always paired with an earlier call to + /// `inject_connection_established` with the same peer ID, connection ID and + /// endpoint. + fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) + {} /// Informs the behaviour about an event generated by the handler dedicated to the peer identified by `peer_id`. /// for the behaviour. @@ -204,6 +220,8 @@ pub enum NetworkBehaviourAction { DialPeer { /// The peer to try reach. peer_id: PeerId, + /// The condition for initiating a new dialing attempt. + condition: DialPeerCondition, }, /// Instructs the `Swarm` to send an event to the handler dedicated to a @@ -253,3 +271,37 @@ pub enum NotifyHandler { All } +/// The available conditions under which a new dialing attempt to +/// a peer is initiated when requested by [`NetworkBehaviourAction::DialPeer`]. +#[derive(Debug, Copy, Clone)] +#[non_exhaustive] +pub enum DialPeerCondition { + /// A new dialing attempt is initiated _only if_ the peer is currently + /// considered disconnected, i.e. there is no established connection + /// and no ongoing dialing attempt. + /// + /// If there is an ongoing dialing attempt, the addresses reported by + /// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing + /// dialing attempt, ignoring duplicates. + Disconnected, + /// A new dialing attempt is initiated _only if_ there is currently + /// no ongoing dialing attempt, i.e. the peer is either considered + /// disconnected or connected but without an ongoing dialing attempt. + /// + /// If there is an ongoing dialing attempt, the addresses reported by + /// [`NetworkBehaviour::addresses_of_peer`] are added to the ongoing + /// dialing attempt, ignoring duplicates. + /// + /// This condition implies [`DialPeerCondition::Disconnected`]. + NotDialing, + // TODO: Once multiple dialing attempts per peer are permitted. + // See https://github.com/libp2p/rust-libp2p/pull/1506. + // Always, +} + +impl Default for DialPeerCondition { + fn default() -> Self { + DialPeerCondition::Disconnected + } +} + diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index b9a2e6dc94a..0d4d125530c 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -65,7 +65,8 @@ pub use behaviour::{ NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, - NotifyHandler + NotifyHandler, + DialPeerCondition }; pub use protocols_handler::{ IntoProtocolsHandler, @@ -89,7 +90,6 @@ use futures::{ stream::FusedStream, }; use libp2p_core::{ - ConnectedPoint, Executor, Transport, Multiaddr, @@ -99,6 +99,8 @@ use libp2p_core::{ ConnectionError, ConnectionId, ConnectionInfo, + ConnectionLimit, + ConnectedPoint, EstablishedConnection, IntoConnectionHandler, ListenerId, @@ -108,7 +110,6 @@ use libp2p_core::{ transport::{TransportError, boxed::Boxed as BoxTransport}, muxing::{StreamMuxer, StreamMuxerBox}, network::{ - DialError, Network, NetworkInfo, NetworkEvent, @@ -201,12 +202,6 @@ pub enum SwarmEvent { /// Endpoint of the connection that has been closed. endpoint: ConnectedPoint, }, - /// Starting to try to reach the given peer. - /// - /// We are trying to connect to this peer until a [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) - /// event is reported, or a [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported - /// with `attempts_remaining` equal to 0. - Dialing(PeerId), /// Tried to dial an address but it ended up being unreachaable. UnreachableAddr { /// `PeerId` that we were trying to reach. @@ -246,6 +241,13 @@ pub enum SwarmEvent { /// The listener error. error: io::Error, }, + /// A new dialing attempt has been initiated. + /// + /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) + /// event is reported if the dialing attempt succeeds, otherwise a + /// [`UnreachableAddr`](SwarmEvent::UnreachableAddr) event is reported + /// with `attempts_remaining` equal to 0. + Dialing(PeerId), } /// Contains the state of the network, plus the way it should behave. @@ -367,31 +369,65 @@ where TBehaviour: NetworkBehaviour, /// Tries to dial the given address. /// /// Returns an error if the address is not supported. - pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> { + pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { let handler = me.behaviour.new_handler(); me.network.dial(&addr, handler.into_node_handler_builder()).map(|_id| ()) } - /// Tries to reach the given peer using the elements in the topology. + /// Tries to initiate a dialing attempt to the given peer. /// - /// Has no effect if we are already connected to that peer, or if no address is known for the - /// peer. - pub fn dial(me: &mut Self, peer_id: PeerId) { - let addrs = me.behaviour.addresses_of_peer(&peer_id); + /// If a new dialing attempt has been initiated, `Ok(true)` is returned. + /// + /// If there is an ongoing dialing attempt, the current addresses of the + /// peer, as reported by [`NetworkBehaviour::addresses_of_peer`] are added + /// to the ongoing dialing attempt, ignoring duplicates. In this case no + /// new dialing attempt is initiated. + /// + /// If no new dialing attempt has been initiated, meaning there is an ongoing + /// dialing attempt or `addresses_of_peer` reports no addresses, `Ok(false)` + /// is returned. + pub fn dial(me: &mut Self, peer_id: &PeerId) -> Result { + let mut addrs = me.behaviour.addresses_of_peer(peer_id).into_iter(); match me.network.peer(peer_id.clone()) { Peer::Disconnected(peer) => { - let mut addrs = addrs.into_iter(); if let Some(first) = addrs.next() { let handler = me.behaviour.new_handler().into_node_handler_builder(); - if peer.connect(first, addrs, handler).is_err() { - me.behaviour.inject_dial_failure(&peer_id); + match peer.connect(first, addrs, handler) { + Ok(_) => return Ok(true), + Err(error) => { + log::debug!( + "New dialing attempt to disconnected peer {:?} failed: {:?}.", + peer_id, error); + me.behaviour.inject_dial_failure(&peer_id); + return Err(error) + } } } + Ok(false) }, + Peer::Connected(peer) => { + if let Some(first) = addrs.next() { + let handler = me.behaviour.new_handler().into_node_handler_builder(); + match peer.connect(first, addrs, handler) { + Ok(_) => return Ok(true), + Err(error) => { + log::debug!( + "New dialing attempt to connected peer {:?} failed: {:?}.", + peer_id, error); + me.behaviour.inject_dial_failure(&peer_id); + return Err(error) + } + } + } + Ok(false) + } Peer::Dialing(mut peer) => { - peer.connection().add_addresses(addrs) + peer.connection().add_addresses(addrs); + Ok(false) }, - Peer::Connected(_) | Peer::Local => {} + Peer::Local => { + Err(ConnectionLimit { current: 0, limit: 0 }) + } } } @@ -498,35 +534,29 @@ where TBehaviour: NetworkBehaviour, peer_id, endpoint, }); - } else if num_established.get() == 1 { - this.behaviour.inject_connected(peer_id.clone(), endpoint.clone()); - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - }); } else { - // For now, secondary connections are not explicitly reported to - // the behaviour. A behaviour only gets awareness of the - // connections via the events emitted from the connection handlers. - log::trace!("Secondary connection established: {:?}; Total (peer): {}.", + log::debug!("Connection established: {:?}; Total (peer): {}.", connection.connected(), num_established); + let endpoint = connection.endpoint().clone(); + this.behaviour.inject_connection_established(&peer_id, &connection.id(), &endpoint); + if num_established.get() == 1 { + this.behaviour.inject_connected(&peer_id); + } return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, + peer_id, num_established, endpoint }); } }, - Poll::Ready(NetworkEvent::ConnectionError { connected, error, num_established }) => { - log::debug!("Connection {:?} closed by {:?}", connected, error); - let peer_id = connected.peer_id().clone(); + Poll::Ready(NetworkEvent::ConnectionError { id, connected, error, num_established }) => { + log::debug!("Connection {:?} closed: {:?}", connected, error); + let info = connected.info; let endpoint = connected.endpoint; + this.behaviour.inject_connection_closed(info.peer_id(), &id, &endpoint); if num_established == 0 { - this.behaviour.inject_disconnected(&peer_id, endpoint.clone()); + this.behaviour.inject_disconnected(info.peer_id()); } return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, + peer_id: info.peer_id().clone(), endpoint, cause: error, num_established, @@ -663,12 +693,40 @@ where TBehaviour: NetworkBehaviour, Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => { let _ = ExpandedSwarm::dial_addr(&mut *this, address); }, - Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id }) => { + Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => { if this.banned_peers.contains(&peer_id) { this.behaviour.inject_dial_failure(&peer_id); } else { - ExpandedSwarm::dial(&mut *this, peer_id.clone()); - return Poll::Ready(SwarmEvent::Dialing(peer_id)) + let result = match condition { + DialPeerCondition::Disconnected + if this.network.is_disconnected(&peer_id) => + { + ExpandedSwarm::dial(this, &peer_id) + } + DialPeerCondition::NotDialing + if !this.network.is_dialing(&peer_id) => + { + ExpandedSwarm::dial(this, &peer_id) + } + _ => { + log::trace!("Condition for new dialing attempt to {:?} not met: {:?}", + peer_id, condition); + if let Some(mut peer) = this.network.peer(peer_id.clone()).into_dialing() { + let addrs = this.behaviour.addresses_of_peer(peer.id()); + peer.connection().add_addresses(addrs); + } + Ok(false) + } + }; + match result { + Ok(false) => {}, + Ok(true) => return Poll::Ready(SwarmEvent::Dialing(peer_id)), + Err(err) => { + log::debug!("Initiating dialing attempt to {:?} failed: {:?}", + &peer_id, err); + this.behaviour.inject_dial_failure(&peer_id); + } + } } }, Poll::Ready(NetworkBehaviourAction::NotifyHandler { peer_id, handler, event }) => { @@ -922,28 +980,33 @@ impl<'a> PollParameters for SwarmPollParameters<'a> { } } +/// A `SwarmBuilder` provides an API for configuring and constructing a `Swarm`, +/// including the underlying [`Network`]. pub struct SwarmBuilder { local_peer_id: PeerId, transport: BoxTransport<(TConnInfo, StreamMuxerBox), io::Error>, behaviour: TBehaviour, - network: NetworkConfig, + network_config: NetworkConfig, } impl SwarmBuilder where TBehaviour: NetworkBehaviour, TConnInfo: ConnectionInfo + fmt::Debug + Clone + Send + 'static, { - pub fn new(transport: TTransport, behaviour: TBehaviour, local_peer_id: PeerId) -> Self + /// Creates a new `SwarmBuilder` from the given transport, behaviour and + /// local peer ID. The `Swarm` with its underlying `Network` is obtained + /// via [`SwarmBuilder::build`]. + pub fn new(transport: TTrans, behaviour: TBehaviour, local_peer_id: PeerId) -> Self where TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, ::OutboundSubstream: Send + 'static, ::Substream: Send + 'static, - TTransport: Transport + Clone + Send + Sync + 'static, - TTransport::Error: Send + Sync + 'static, - TTransport::Listener: Send + 'static, - TTransport::ListenerUpgrade: Send + 'static, - TTransport::Dial: Send + 'static, + TTrans: Transport + Clone + Send + Sync + 'static, + TTrans::Error: Send + Sync + 'static, + TTrans::Listener: Send + 'static, + TTrans::ListenerUpgrade: Send + 'static, + TTrans::Dial: Send + 'static, { let transport = transport .map(|(conn_info, muxer), _| (conn_info, StreamMuxerBox::new(muxer))) @@ -954,35 +1017,41 @@ where TBehaviour: NetworkBehaviour, local_peer_id, transport, behaviour, - network: NetworkConfig::default(), + network_config: Default::default(), } } - pub fn incoming_limit(mut self, incoming_limit: usize) -> Self { - self.network.set_pending_incoming_limit(incoming_limit); + /// Configures the `Executor` to use for spawning background tasks. + /// + /// By default, unless another executor has been configured, + /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`. + pub fn executor(mut self, e: Box) -> Self { + self.network_config.set_executor(e); self } - /// Sets the executor to use to spawn background tasks. - /// - /// By default, uses a threads pool. - pub fn executor(mut self, executor: impl Executor + Send + 'static) -> Self { - self.network.set_executor(Box::new(executor)); + /// Configures a limit for the number of simultaneous incoming + /// connection attempts. + pub fn incoming_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_incoming_limit(n); self } - /// Shortcut for calling `executor` with an object that calls the given closure. - pub fn executor_fn(mut self, executor: impl Fn(Pin + Send>>) + Send + 'static) -> Self { - struct SpawnImpl(F); - impl + Send>>)> Executor for SpawnImpl { - fn exec(&self, f: Pin + Send>>) { - (self.0)(f) - } - } - self.network.set_executor(Box::new(SpawnImpl(executor))); + /// Configures a limit for the number of simultaneous outgoing + /// connection attempts. + pub fn outgoing_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_outgoing_limit(n); + self + } + + /// Configures a limit for the number of simultaneous + /// established connections per peer. + pub fn peer_connection_limit(mut self, n: usize) -> Self { + self.network_config.set_established_per_peer_limit(n); self } + /// Builds a `Swarm` with the current configuration. pub fn build(mut self) -> Swarm { let supported_protocols = self.behaviour .new_handler() @@ -992,9 +1061,10 @@ where TBehaviour: NetworkBehaviour, .map(|info| info.protocol_name().to_vec()) .collect(); - // If no executor has been explicitly configured, try to set up - // a thread pool. - if self.network.executor().is_none() { + let mut network_cfg = self.network_config; + + // If no executor has been explicitly configured, try to set up a thread pool. + if network_cfg.executor().is_none() { struct PoolWrapper(ThreadPool); impl Executor for PoolWrapper { fn exec(&self, f: Pin + Send>>) { @@ -1002,21 +1072,17 @@ where TBehaviour: NetworkBehaviour, } } - if let Some(executor) = ThreadPoolBuilder::new() - .name_prefix("libp2p-task-") + match ThreadPoolBuilder::new() + .name_prefix("libp2p-swarm-task-") .create() - .ok() .map(|tp| Box::new(PoolWrapper(tp)) as Box<_>) { - self.network.set_executor(Box::new(executor)); + Ok(executor) => { network_cfg.set_executor(Box::new(executor)); }, + Err(err) => log::warn!("Failed to create executor thread pool: {:?}", err) } } - let network = Network::new( - self.transport, - self.local_peer_id, - self.network, - ); + let network = Network::new(self.transport, self.local_peer_id, network_cfg); ExpandedSwarm { network, @@ -1047,9 +1113,13 @@ impl NetworkBehaviour for DummyBehaviour { Vec::new() } - fn inject_connected(&mut self, _: PeerId, _: libp2p_core::ConnectedPoint) {} + fn inject_connected(&mut self, _: &PeerId) {} + + fn inject_connection_established(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} - fn inject_disconnected(&mut self, _: &PeerId, _: libp2p_core::ConnectedPoint) {} + fn inject_disconnected(&mut self, _: &PeerId) {} + + fn inject_connection_closed(&mut self, _: &PeerId, _: &ConnectionId, _: &ConnectedPoint) {} fn inject_event(&mut self, _: PeerId, _: ConnectionId, _: ::OutEvent) {} @@ -1067,9 +1137,9 @@ impl NetworkBehaviour for DummyBehaviour { mod tests { use crate::{DummyBehaviour, SwarmBuilder}; use libp2p_core::{ - identity, PeerId, PublicKey, + identity, transport::dummy::{DummyStream, DummyTransport} }; use libp2p_mplex::Multiplex; @@ -1084,7 +1154,8 @@ mod tests { let transport = DummyTransport::<(PeerId, Multiplex)>::new(); let behaviour = DummyBehaviour {}; let swarm = SwarmBuilder::new(transport, behaviour, id.into()) - .incoming_limit(4).build(); + .incoming_connection_limit(4) + .build(); assert_eq!(swarm.network.incoming_limit(), Some(4)); } diff --git a/swarm/src/protocols_handler/node_handler.rs b/swarm/src/protocols_handler/node_handler.rs index 3191ca9e3f3..8b04506170c 100644 --- a/swarm/src/protocols_handler/node_handler.rs +++ b/swarm/src/protocols_handler/node_handler.rs @@ -133,10 +133,10 @@ enum Shutdown { /// Error generated by the `NodeHandlerWrapper`. #[derive(Debug)] pub enum NodeHandlerWrapperError { - /// Error generated by the handler. + /// The connection handler encountered an error. Handler(TErr), - /// The connection has been deemed useless and has been closed. - UselessTimeout, + /// The connection keep-alive timeout expired. + KeepAliveTimeout, } impl From for NodeHandlerWrapperError { @@ -152,8 +152,8 @@ where fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { NodeHandlerWrapperError::Handler(err) => write!(f, "{}", err), - NodeHandlerWrapperError::UselessTimeout => - write!(f, "Node has been closed due to inactivity"), + NodeHandlerWrapperError::KeepAliveTimeout => + write!(f, "Connection closed due to expired keep-alive timeout."), } } } @@ -165,7 +165,7 @@ where fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { NodeHandlerWrapperError::Handler(err) => Some(err), - NodeHandlerWrapperError::UselessTimeout => None, + NodeHandlerWrapperError::KeepAliveTimeout => None, } } } @@ -314,9 +314,9 @@ where if self.negotiating_in.is_empty() && self.negotiating_out.is_empty() { match self.shutdown { Shutdown::None => {}, - Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)), + Shutdown::Asap => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)), Shutdown::Later(ref mut delay, _) => match Future::poll(Pin::new(delay), cx) { - Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::UselessTimeout)), + Poll::Ready(_) => return Poll::Ready(Err(NodeHandlerWrapperError::KeepAliveTimeout)), Poll::Pending => {} } } diff --git a/swarm/src/toggle.rs b/swarm/src/toggle.rs index f38e8b0653c..2ba58e9559a 100644 --- a/swarm/src/toggle.rs +++ b/swarm/src/toggle.rs @@ -76,15 +76,27 @@ where self.inner.as_mut().map(|b| b.addresses_of_peer(peer_id)).unwrap_or_else(Vec::new) } - fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) { + fn inject_connected(&mut self, peer_id: &PeerId) { if let Some(inner) = self.inner.as_mut() { - inner.inject_connected(peer_id, endpoint) + inner.inject_connected(peer_id) } } - fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) { + fn inject_disconnected(&mut self, peer_id: &PeerId) { if let Some(inner) = self.inner.as_mut() { - inner.inject_disconnected(peer_id, endpoint) + inner.inject_disconnected(peer_id) + } + } + + fn inject_connection_established(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) { + if let Some(inner) = self.inner.as_mut() { + inner.inject_connection_established(peer_id, connection, endpoint) + } + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, connection: &ConnectionId, endpoint: &ConnectedPoint) { + if let Some(inner) = self.inner.as_mut() { + inner.inject_connection_closed(peer_id, connection, endpoint) } }