Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/: Concurrent connection attempts #2248

Merged
merged 74 commits into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
75292bb
*: Implement concurrent dialing
mxinden Aug 30, 2021
10d941b
core/: Track pending connection via Endpoint
mxinden Sep 6, 2021
f01b323
core/: Add PendingPoint
mxinden Sep 6, 2021
d6da039
swarm/src/lib: Adjust
mxinden Sep 7, 2021
5887d69
*: Associate Multiaddr with transport error
mxinden Sep 8, 2021
08c196f
protocols: Update
mxinden Sep 8, 2021
6c1ce85
core/src/: Remove printlns
mxinden Sep 9, 2021
9070bd2
misc/metrics: Update
mxinden Sep 9, 2021
4043970
core/: Remove printlns
mxinden Sep 9, 2021
d594208
core/src/network/concurrent: Catch address error
mxinden Sep 9, 2021
bdfd5d1
Merge branch 'libp2p/master' into concurrent-dial
mxinden Sep 9, 2021
a705bb0
core/src/connection: Remove meta file
mxinden Sep 26, 2021
436b876
Merge branch 'libp2p/master' into concurrent-dial
mxinden Sep 26, 2021
db3fba7
Merge branch 'master' into concurrent-dial
mxinden Sep 28, 2021
63c4a32
core/: Stage connection task in pending and established
mxinden Sep 28, 2021
eb86b41
core/src/connection: Send pending result through events channel
mxinden Sep 30, 2021
ae21f9e
core/src/connection: Handle connection error
mxinden Sep 30, 2021
7d3114b
core/src/connection: Use oneshot Void for pending connection
mxinden Sep 30, 2021
c43496d
core/src/connection: Remove task
mxinden Sep 30, 2021
e06131c
core/: Bubble dial errors on success up
mxinden Sep 30, 2021
7c6a0fe
*: Pass dial success errors to behaviours
mxinden Sep 30, 2021
752bb70
core/src/network/concurrent_dial: Limit concurrency factor
mxinden Oct 1, 2021
0dcd1cf
core/src/network/concurrent_dial: Remove bound on Debug
mxinden Oct 1, 2021
2392249
core/src/connection: Remove task module
mxinden Oct 4, 2021
b27c735
core/src/connection: Revive Pool::disconnect
mxinden Oct 4, 2021
1703d40
core/src/connection: Fold manager into pool
mxinden Oct 5, 2021
794ca73
core/src/connection: Use different channel for pending and established
mxinden Oct 5, 2021
036c433
core/: Fix lints
mxinden Oct 5, 2021
6eda209
*: Adjust to changes in core
mxinden Oct 5, 2021
999b481
core/src/connection/pool: Take care of events before triggering more
mxinden Oct 5, 2021
9994a90
core/src/connection: Document PendingPoint
mxinden Oct 6, 2021
8eb278d
core/src/connection: Wrap listener error in task.rs not network.rs
mxinden Oct 6, 2021
73ffc76
core/src/connection/pool: Clean up trait bounds
mxinden Oct 6, 2021
1a97d9b
core/src/connection: Move ConnectionId to connection.rs
mxinden Oct 6, 2021
f5d0e91
protocols/rendezvous/tests: Await events sequentially to prevent race
mxinden Oct 6, 2021
62ed126
swarm/src/lib: Return dial errors in SwarmEvent::EstablishedConnection
mxinden Oct 6, 2021
42ef7f5
core/src/network: Lazily call Transport dial on concurrent dialing
mxinden Oct 6, 2021
520c4ec
core/src: Move ConcurrentDial into pool/
mxinden Oct 6, 2021
f76cf2b
core/src/connection: Only bound poll to TMuxer
mxinden Oct 8, 2021
0ba84ab
core/src/connection: Split PendingConnectionError
mxinden Oct 8, 2021
30401cb
core/src/network: Remove Network::dialing
mxinden Oct 8, 2021
2147b07
core/src/network/peer: Remove DialingAttempt::address
mxinden Oct 8, 2021
c1491aa
*: Adjust to changes in libp2p-core
mxinden Oct 8, 2021
77e36e9
core/src/connection: Log muxer closing error
mxinden Oct 8, 2021
a80fe6c
core/src/connection: Return handler on incoming connection limit
mxinden Oct 8, 2021
095e081
swarm/: Return handler on dial failure to unknown peer
mxinden Oct 8, 2021
b621ff0
Merge branch 'libp2p/master' into concurrent-dial
mxinden Oct 8, 2021
2c24542
*: Address minor TODOs
mxinden Oct 8, 2021
07ed888
swarm/src/behaviour: Fix doc example
mxinden Oct 10, 2021
dd49cfc
core/src/connection: Address clippy suggestions
mxinden Oct 10, 2021
a725fbf
swarm/src/lib: Fix doc comment
mxinden Oct 10, 2021
386f231
Merge branch 'libp2p/master' into concurrent-dial
mxinden Oct 10, 2021
66cb2be
*: Minor rewording on doc comments
mxinden Oct 10, 2021
4dba4c8
core/tests: Use MemoryTransport
mxinden Oct 10, 2021
dc3c46d
protocols/rendezvous: Implement assert_behaviour_events for single swarm
mxinden Oct 10, 2021
559bb31
core/src/connection: Move dial concurrency factor to constant
mxinden Oct 10, 2021
8547941
*: Add changelog entries
mxinden Oct 10, 2021
3aca8d0
{core,swarm}/: Make concurrent dialing configurable
mxinden Oct 13, 2021
ef36c5e
core/tests/: Test concurrent dialing
mxinden Oct 13, 2021
0fea620
core/src/connection: Revert Stream impl for Connection
mxinden Oct 13, 2021
d25c95b
core/: Replace "Stack of protocols" with "Address"
mxinden Oct 13, 2021
ecdb812
core/src/connection: Fix doc comment on to_pending_point
mxinden Oct 13, 2021
2a6dc54
core/src/connection: Rename trait bound TransportError to TTransErr
mxinden Oct 13, 2021
e7beec4
core/src/connection/pool: Impl start_close on EstablishedConnInfo
mxinden Oct 13, 2021
34daf90
core/src/network: Fix trait bounds on DialingOpts
mxinden Oct 13, 2021
d099106
core/src/network/peer: Fix doc comment on DialingAttempt
mxinden Oct 13, 2021
d5e06b6
core/src/network/peer: Remove outdated comment on DialingAttempt::abort
mxinden Oct 13, 2021
7331d90
{core,swarm}/: Rename outgoing to concurrent_dial_errors
mxinden Oct 13, 2021
9fc5870
core/src/connection/pool: Implement expect_occupied for hash_map::Entry
mxinden Oct 13, 2021
e840d65
Merge branch 'libp2p/master' into concurrent-dial
mxinden Oct 13, 2021
c35992a
Merge branch 'master' into concurrent-dial
mxinden Oct 14, 2021
cb553cf
{core,swarm}/CHANGELOG: Update now that concurrency is configurable
mxinden Oct 14, 2021
04c694f
protocols/src/identify: Update to changed address failure reporting
mxinden Oct 14, 2021
7aafc52
core/CHANGELOG.md: Fix typo
mxinden Oct 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
- Add `SignedEnvelope` and `PeerRecord` according to [RFC0002] and [RFC0003]
(see [PR 2107]).

- Concurrently dial up to 5 address candidates within a single dial attempt (see [PR 2248]).

- On success of a single address, provide errors of the thus far failed dials via
`NetworkEvent::ConnectionEstablished::outgoing`.

- On failure of all addresses, provide the errors via `NetworkEvent::DialError`.

[PR 2145]: https://github.com/libp2p/rust-libp2p/pull/2145
[PR 2213]: https://github.com/libp2p/rust-libp2p/pull/2213
[PR 2142]: https://github.com/libp2p/rust-libp2p/pull/2142
Expand All @@ -44,6 +51,7 @@
[PR 2191]: https://github.com/libp2p/rust-libp2p/pull/2191
[PR 2195]: https://github.com/libp2p/rust-libp2p/pull/2195
[PR 2107]: https://github.com/libp2p/rust-libp2p/pull/2107
[PR 2248]: https://github.com/libp2p/rust-libp2p/pull/2248
[RFC0002]: https://github.com/libp2p/specs/blob/master/RFC/0002-signed-envelopes.md
[RFC0003]: https://github.com/libp2p/specs/blob/master/RFC/0003-routing-records.md

Expand Down
102 changes: 75 additions & 27 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,40 @@ pub(crate) mod handler;
mod listeners;
mod substream;

pub(crate) mod manager;
pub(crate) mod pool;

pub use error::{ConnectionError, PendingConnectionError};
pub use error::{
ConnectionError, PendingConnectionError, PendingInboundConnectionError,
PendingOutboundConnectionError,
};
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler};
pub use listeners::{ListenerId, ListenersEvent, ListenersStream};
pub use manager::ConnectionId;
pub use pool::{ConnectionCounters, ConnectionLimits};
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection};
pub use substream::{Close, Substream, SubstreamEndpoint};

use crate::muxing::StreamMuxer;
use crate::{Multiaddr, PeerId};
use futures::stream::Stream;
use std::hash::Hash;
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll};
use substream::{Muxing, SubstreamEvent};

/// Connection identifier.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(usize);

impl ConnectionId {
/// Creates a `ConnectionId` from a non-negative integer.
///
/// This is primarily useful for creating connection IDs
/// in test environments. There is in general no guarantee
/// that all connection IDs are based on non-negative integers.
pub fn new(id: usize) -> Self {
ConnectionId(id)
}
}

/// The endpoint roles associated with a peer-to-peer communication channel.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum Endpoint {
Expand Down Expand Up @@ -72,7 +89,40 @@ impl Endpoint {
}
}

/// The endpoint roles associated with a peer-to-peer connection.
/// The endpoint roles associated with a pending peer-to-peer connection.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PendingPoint {
/// The socket comes from a dialer.
///
/// There is no single address associated with the Dialer of a pending
/// connection. Addresses are dialed in parallel. Only once the first dial
/// is successful is the address of the connection known.
Dialer,
/// The socket comes from a listener.
Listener {
/// Local connection address.
local_addr: Multiaddr,
/// Stack of protocols used to send back data to the remote.
send_back_addr: Multiaddr,
},
}

impl From<ConnectedPoint> for PendingPoint {
fn from(endpoint: ConnectedPoint) -> Self {
match endpoint {
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer,
ConnectedPoint::Listener {
local_addr,
send_back_addr,
} => PendingPoint::Listener {
local_addr,
send_back_addr,
},
}
}
}

/// The endpoint roles associated with an established peer-to-peer connection.
#[derive(PartialEq, Eq, Debug, Clone, Hash)]
pub enum ConnectedPoint {
/// We dialed the node.
Expand Down Expand Up @@ -234,13 +284,18 @@ where
pub fn close(self) -> (THandler, Close<TMuxer>) {
(self.handler, self.muxing.close().0)
}
}

impl<TMuxer, THandler> Stream for Connection<TMuxer, THandler>
where
TMuxer: StreamMuxer,
THandler: ConnectionHandler<Substream = Substream<TMuxer>>,
{
type Item = Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>;

/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let mut io_pending = false;

Expand All @@ -260,9 +315,9 @@ where
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
return Poll::Ready(Some(Ok(Event::AddressChange(address))));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(ConnectionError::IO(err)))),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it really worth implementing Stream if we always return Some?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it is a trait off between ergonomics (e.g. being able to call next on Connection via StreamExt) and strict typing.

Thinking about it some more, I don't think the extra ergonomics are worth it. Reverted with 0fea620.

}

// Poll the handler for new events.
Expand All @@ -276,9 +331,11 @@ where
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(Event::Handler(event)));
return Poll::Ready(Some(Ok(Event::Handler(event))));
}
Poll::Ready(Err(err)) => {
return Poll::Ready(Some(Err(ConnectionError::Handler(err))))
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
}
}
Expand All @@ -295,26 +352,17 @@ pub struct IncomingInfo<'a> {

impl<'a> IncomingInfo<'a> {
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
/// Builds the `PendingPoint` corresponding to the incoming connection.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in ecdb812.

pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Listener {
pub fn to_pending_point(&self) -> PendingPoint {
PendingPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}

/// Borrowed information about an outgoing connection currently being negotiated.
#[derive(Debug, Copy, Clone)]
pub struct OutgoingInfo<'a> {
pub address: &'a Multiaddr,
pub peer_id: Option<&'a PeerId>,
}

impl<'a> OutgoingInfo<'a> {
/// Builds a `ConnectedPoint` corresponding to the outgoing connection.
/// Builds the `ConnectedPoint` corresponding to the incoming connection.
pub fn to_connected_point(&self) -> ConnectedPoint {
ConnectedPoint::Dialer {
address: self.address.clone(),
ConnectedPoint::Listener {
local_addr: self.local_addr.clone(),
send_back_addr: self.send_back_addr.clone(),
}
}
}
Expand Down
51 changes: 34 additions & 17 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

use crate::connection::ConnectionLimit;
use crate::transport::TransportError;
use crate::Multiaddr;
use std::{fmt, io};

/// Errors that can occur in the context of an established `Connection`.
Expand All @@ -29,10 +30,6 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -45,9 +42,6 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -60,16 +54,31 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}

/// Errors that can occur in the context of a pending outgoing `Connection`.
///
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single
/// connection.
pub type PendingOutboundConnectionError<TTransErr> =
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>;

/// Errors that can occur in the context of a pending incoming `Connection`.
pub type PendingInboundConnectionError<TTransErr> =
PendingConnectionError<TransportError<TTransErr>>;

/// Errors that can occur in the context of a pending `Connection`.
#[derive(Debug)]
pub enum PendingConnectionError<TTransErr> {
/// An error occurred while negotiating the transport protocol(s).
Transport(TransportError<TTransErr>),
pub enum PendingConnectionError<TransportError> {
/// An error occurred while negotiating the transport protocol(s) on a connection.
Transport(TransportError),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider naming the generic here also TTransErr:

  1. it would be consistent with PendingInboundConnectionError / PendingOutboundConnectionError above, and with the general naming convention for generic types in rust-libp2p
  2. it's easier to identify the type as generic when reading the code. TransportError gives the impression that this is an actual error type.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Done via 2a6dc54.


/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// Pending connection attempt has been aborted.
Aborted,
Expand All @@ -83,16 +92,23 @@ pub enum PendingConnectionError<TTransErr> {
IO(io::Error),
}

impl<TTransErr> fmt::Display for PendingConnectionError<TTransErr>
impl<TransportError> fmt::Display for PendingConnectionError<TransportError>
where
TTransErr: fmt::Display,
TransportError: fmt::Display + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err),
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."),
PendingConnectionError::Transport(err) => {
write!(f, "Pending connection: Transport error: {}", err)
write!(
f,
"Pending connection: Transport error on connection: {}",
err
)
}
PendingConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
PendingConnectionError::InvalidPeerId => {
write!(f, "Pending connection: Invalid peer ID.")
Expand All @@ -101,16 +117,17 @@ where
}
}

impl<TTransErr> std::error::Error for PendingConnectionError<TTransErr>
impl<TransportError> std::error::Error for PendingConnectionError<TransportError>
where
TTransErr: std::error::Error + 'static,
TransportError: std::error::Error + 'static,
{
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
PendingConnectionError::IO(err) => Some(err),
PendingConnectionError::Transport(err) => Some(err),
PendingConnectionError::Transport(_) => None,
PendingConnectionError::InvalidPeerId => None,
PendingConnectionError::Aborted => None,
PendingConnectionError::ConnectionLimit(..) => None,
}
}
}
Loading