Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full support for multiple connections per peer in libp2p-swarm. #1519

Merged
merged 14 commits into from
Mar 31, 2020
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# Version ???

- Support for multiple connections per peer and configurable connection limits.
See [PR #1440](https://github.com/libp2p/rust-libp2p/pull/1440),
[PR #1519](https://github.com/libp2p/rust-libp2p/pull/1519) and
[issue #912](https://github.com/libp2p/rust-libp2p/issues/912) for details.

# Version 0.16.2 (2020-02-28)

- Fixed yamux connections not properly closing and being stuck in the `CLOSE_WAIT` state.
Expand Down
5 changes: 4 additions & 1 deletion core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnecti

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use std::{fmt, pin::Pin, task::Context, task::Poll};
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use std::hash::Hash;
use substream::{Muxing, SubstreamEvent};

Expand Down Expand Up @@ -334,3 +334,6 @@ impl fmt::Display for ConnectionLimit {
write!(f, "{}/{}", self.current, self.limit)
}
}

/// A `ConnectionLimit` can represent an error if it has been exceeded.
impl Error for ConnectionLimit {}
8 changes: 4 additions & 4 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
Expand Down Expand Up @@ -834,8 +834,8 @@ where
/// The configurable limits of a connection [`Pool`].
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_pending_outgoing: Option<usize>,
pub max_pending_incoming: Option<usize>,
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
}

Expand All @@ -851,7 +851,7 @@ impl PoolLimits {
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_outgoing)
Self::check(current, self.max_outgoing)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
Expand Down
93 changes: 46 additions & 47 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ where
/// [`Connection`](crate::connection::Connection) upon success and the
/// connection ID is returned.
pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
-> Result<ConnectionId, DialError<TTrans::Error>>
-> Result<ConnectionId, ConnectionLimit>
where
TTrans: Transport<Output = (TConnInfo, TMuxer)>,
TTrans::Error: Send + 'static,
Expand All @@ -232,10 +232,17 @@ where
TConnInfo: Send + 'static,
TPeerId: Send + 'static,
{
let future = self.transport().clone().dial(address.clone())?
.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
let info = OutgoingInfo { address, peer_id: None };
self.pool.add_outgoing(future, handler, info).map_err(DialError::MaxPending)
match self.transport().clone().dial(address.clone()) {
Ok(f) => {
let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
self.pool.add_outgoing(f, handler, info)
}
Err(err) => {
let f = future::err(PendingConnectionError::Transport(err));
self.pool.add_outgoing(f, handler, info)
}
}
}

/// Returns information about the state of the `Network`.
Expand Down Expand Up @@ -275,6 +282,22 @@ where
self.pool.iter_connected()
}

/// Checks whether the network has an established connection to a peer.
pub fn is_connected(&self, peer: &TPeerId) -> bool {
self.pool.is_connected(peer)
}

/// Checks whether the network has an ongoing dialing attempt to a peer.
pub fn is_dialing(&self, peer: &TPeerId) -> bool {
self.dialing.contains_key(peer)
}

/// Checks whether the network has neither an ongoing dialing attempt,
/// nor an established connection to a peer.
pub fn is_disconnected(&self, peer: &TPeerId) -> bool {
!self.is_connected(peer) && !self.is_dialing(peer)
}

/// Returns a list of all the peers to whom a new outgoing connection
/// is currently being established.
pub fn dialing_peers(&self) -> impl Iterator<Item = &TPeerId> {
Expand All @@ -284,7 +307,7 @@ where
/// Gets the configured limit on pending incoming connections,
/// i.e. concurrent incoming connection attempts.
pub fn incoming_limit(&self) -> Option<usize> {
self.pool.limits().max_pending_incoming
self.pool.limits().max_incoming
}

/// The total number of established connections in the `Network`.
Expand Down Expand Up @@ -380,8 +403,9 @@ where
}
event
}
Poll::Ready(PoolEvent::ConnectionError { connected, error, num_established, .. }) => {
Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => {
NetworkEvent::ConnectionError {
id,
connected,
error,
num_established,
Expand Down Expand Up @@ -557,43 +581,6 @@ pub struct NetworkInfo {
pub num_connections_established: usize,
}

/// The possible errors of [`Network::dial`].
#[derive(Debug)]
pub enum DialError<T> {
/// The configured limit of pending outgoing connections has been reached.
MaxPending(ConnectionLimit),
/// A transport error occurred when creating the connection.
Transport(TransportError<T>),
}

impl<T> fmt::Display for DialError<T>
where T: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DialError::MaxPending(limit) => write!(f, "Dial error (pending limit): {}", limit.current),
DialError::Transport(err) => write!(f, "Dial error (transport): {}", err),
}
}
}

impl<T> std::error::Error for DialError<T>
where T: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
DialError::MaxPending(_) => None,
DialError::Transport(e) => Some(e),
}
}
}

impl<T> From<TransportError<T>> for DialError<T> {
fn from(e: TransportError<T>) -> DialError<T> {
DialError::Transport(e)
}
}

/// The (optional) configuration for a [`Network`].
///
/// The default configuration specifies no dedicated task executor
Expand All @@ -610,17 +597,29 @@ impl NetworkConfig {
self
}

/// Shortcut for calling `executor` with an object that calls the given closure.
pub fn set_executor_fn(mut self, f: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + 'static) -> Self {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
self.set_executor(Box::new(SpawnImpl(f)));
self
}

pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
self.executor.as_ref()
}

pub fn set_pending_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_incoming = Some(n);
pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_incoming = Some(n);
self
}

pub fn set_pending_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_outgoing = Some(n);
pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing = Some(n);
self
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ where
///
/// The connection is closed as a result of the error.
ConnectionError {
/// The ID of the connection that encountered an error.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected<TConnInfo>,
/// The error that occurred.
Expand Down
80 changes: 74 additions & 6 deletions core/src/network/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,28 +174,66 @@ where
TConnInfo: fmt::Debug + ConnectionInfo<PeerId = TPeerId> + Send + 'static,
TPeerId: Eq + Hash + Clone + Send + 'static,
{
/// Checks whether the peer is currently connected.
///
/// Returns `true` iff [`Peer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
match self {
Peer::Connected(..) => true,
Peer::Dialing(peer) => peer.is_connected(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}

/// Checks whether the peer is currently being dialed.
///
/// Returns `true` iff [`Peer::into_dialing`] returns `Some`.
pub fn is_dialing(&self) -> bool {
match self {
Peer::Dialing(_) => true,
Peer::Connected(peer) => peer.is_dialing(),
Peer::Disconnected(..) => false,
Peer::Local => false
}
}

/// Checks whether the peer is currently disconnected.
///
/// Returns `true` iff [`Peer::into_disconnected`] returns `Some`.
pub fn is_disconnected(&self) -> bool {
match self {
Peer::Disconnected(..) => true,
_ => false
}
}

/// If we are connected, returns the `ConnectedPeer`.
/// Converts the peer into a `ConnectedPeer`, if there an established connection exists.
pub fn into_connected(self) -> Option<
ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Connected(peer) => Some(peer),
_ => None,
Peer::Dialing(peer) => peer.into_connected(),
Peer::Disconnected(..) => None,
Peer::Local => None,
}
}

/// If a connection is pending, returns the `DialingPeer`.
/// Converts the peer into a `DialingPeer`, if a dialing attempt exists.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
match self {
Peer::Dialing(peer) => Some(peer),
_ => None,
Peer::Connected(peer) => peer.into_dialing(),
Peer::Disconnected(..) => None,
Peer::Local => None
}
}

/// If we are not connected, returns the `DisconnectedPeer`.
/// Converts the peer into a `DisconnectedPeer`, if neither an established connection
/// nor a dialing attempt exists.
pub fn into_disconnected(self) -> Option<
DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
> {
Expand Down Expand Up @@ -225,6 +263,10 @@ where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Attempts to establish a new connection to this peer using the given addresses,
/// if there is currently no ongoing dialing attempt.
///
Expand Down Expand Up @@ -294,7 +336,7 @@ where
self.network.dialing.contains_key(&self.peer_id)
}

/// Turns this peer into a [`DialingPeer`], if there is an ongoing
/// Converts this peer into a [`DialingPeer`], if there is an ongoing
/// dialing attempt, `None` otherwise.
pub fn into_dialing(self) -> Option<
DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>
Expand Down Expand Up @@ -373,12 +415,34 @@ where
TConnInfo: ConnectionInfo<PeerId = TPeerId>,
TPeerId: Eq + Hash + Clone,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Disconnects from this peer, closing all pending connections.
pub fn disconnect(self) -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId> {
self.network.disconnect(&self.peer_id);
DisconnectedPeer { network: self.network, peer_id: self.peer_id }
}

/// Checks whether there is an established connection to the peer.
///
/// Returns `true` iff [`DialingPeer::into_connected`] returns `Some`.
pub fn is_connected(&self) -> bool {
self.network.pool.is_connected(&self.peer_id)
}

/// Converts the peer into a `ConnectedPeer`, if an established connection exists.
pub fn into_connected(self)
-> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>>
{
if self.is_connected() {
Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
} else {
None
}
}

/// Obtains the connection that is currently being established.
pub fn connection<'b>(&'b mut self) -> DialingConnection<'b, TInEvent, TConnInfo, TPeerId> {
let attempt = match self.network.dialing.entry(self.peer_id.clone()) {
Expand Down Expand Up @@ -452,6 +516,10 @@ where
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
{
pub fn id(&self) -> &TPeerId {
&self.peer_id
}

/// Attempts to connect to this peer using the given addresses.
pub fn connect<TIter>(self, first: Multiaddr, rest: TIter, handler: THandler)
-> Result<DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler, TConnInfo, TPeerId>,
Expand Down
Loading