-
Notifications
You must be signed in to change notification settings - Fork 930
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
core/: Concurrent connection attempts #2248
Changes from 57 commits
75292bb
10d941b
f01b323
d6da039
5887d69
08c196f
6c1ce85
9070bd2
4043970
d594208
bdfd5d1
a705bb0
436b876
db3fba7
63c4a32
eb86b41
ae21f9e
7d3114b
c43496d
e06131c
7c6a0fe
752bb70
0dcd1cf
2392249
b27c735
1703d40
794ca73
036c433
6eda209
999b481
9994a90
8eb278d
73ffc76
1a97d9b
f5d0e91
62ed126
42ef7f5
520c4ec
f76cf2b
0ba84ab
30401cb
2147b07
c1491aa
77e36e9
a80fe6c
095e081
b621ff0
2c24542
07ed888
dd49cfc
a725fbf
386f231
66cb2be
4dba4c8
dc3c46d
559bb31
8547941
3aca8d0
ef36c5e
0fea620
d25c95b
ecdb812
2a6dc54
e7beec4
34daf90
d099106
d5e06b6
7331d90
9fc5870
e840d65
c35992a
cb553cf
04c694f
7aafc52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -23,23 +23,40 @@ pub(crate) mod handler; | |||||
mod listeners; | ||||||
mod substream; | ||||||
|
||||||
pub(crate) mod manager; | ||||||
pub(crate) mod pool; | ||||||
|
||||||
pub use error::{ConnectionError, PendingConnectionError}; | ||||||
pub use error::{ | ||||||
ConnectionError, PendingConnectionError, PendingInboundConnectionError, | ||||||
PendingOutboundConnectionError, | ||||||
}; | ||||||
pub use handler::{ConnectionHandler, ConnectionHandlerEvent, IntoConnectionHandler}; | ||||||
pub use listeners::{ListenerId, ListenersEvent, ListenersStream}; | ||||||
pub use manager::ConnectionId; | ||||||
pub use pool::{ConnectionCounters, ConnectionLimits}; | ||||||
pub use pool::{EstablishedConnection, EstablishedConnectionIter, PendingConnection}; | ||||||
pub use substream::{Close, Substream, SubstreamEndpoint}; | ||||||
|
||||||
use crate::muxing::StreamMuxer; | ||||||
use crate::{Multiaddr, PeerId}; | ||||||
use futures::stream::Stream; | ||||||
use std::hash::Hash; | ||||||
use std::{error::Error, fmt, pin::Pin, task::Context, task::Poll}; | ||||||
use substream::{Muxing, SubstreamEvent}; | ||||||
|
||||||
/// Connection identifier. | ||||||
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] | ||||||
pub struct ConnectionId(usize); | ||||||
|
||||||
impl ConnectionId { | ||||||
/// Creates a `ConnectionId` from a non-negative integer. | ||||||
/// | ||||||
/// This is primarily useful for creating connection IDs | ||||||
/// in test environments. There is in general no guarantee | ||||||
/// that all connection IDs are based on non-negative integers. | ||||||
pub fn new(id: usize) -> Self { | ||||||
ConnectionId(id) | ||||||
} | ||||||
} | ||||||
|
||||||
/// The endpoint roles associated with a peer-to-peer communication channel. | ||||||
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] | ||||||
pub enum Endpoint { | ||||||
|
@@ -72,7 +89,40 @@ impl Endpoint { | |||||
} | ||||||
} | ||||||
|
||||||
/// The endpoint roles associated with a peer-to-peer connection. | ||||||
/// The endpoint roles associated with a pending peer-to-peer connection. | ||||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)] | ||||||
pub enum PendingPoint { | ||||||
/// The socket comes from a dialer. | ||||||
/// | ||||||
/// There is no single address associated with the Dialer of a pending | ||||||
/// connection. Addresses are dialed in parallel. Only once the first dial | ||||||
/// is successful is the address of the connection known. | ||||||
Dialer, | ||||||
/// The socket comes from a listener. | ||||||
Listener { | ||||||
/// Local connection address. | ||||||
local_addr: Multiaddr, | ||||||
/// Stack of protocols used to send back data to the remote. | ||||||
send_back_addr: Multiaddr, | ||||||
}, | ||||||
} | ||||||
|
||||||
impl From<ConnectedPoint> for PendingPoint { | ||||||
fn from(endpoint: ConnectedPoint) -> Self { | ||||||
match endpoint { | ||||||
ConnectedPoint::Dialer { .. } => PendingPoint::Dialer, | ||||||
ConnectedPoint::Listener { | ||||||
local_addr, | ||||||
send_back_addr, | ||||||
} => PendingPoint::Listener { | ||||||
local_addr, | ||||||
send_back_addr, | ||||||
}, | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
/// The endpoint roles associated with an established peer-to-peer connection. | ||||||
#[derive(PartialEq, Eq, Debug, Clone, Hash)] | ||||||
pub enum ConnectedPoint { | ||||||
/// We dialed the node. | ||||||
|
@@ -234,13 +284,18 @@ where | |||||
pub fn close(self) -> (THandler, Close<TMuxer>) { | ||||||
(self.handler, self.muxing.close().0) | ||||||
} | ||||||
} | ||||||
|
||||||
impl<TMuxer, THandler> Stream for Connection<TMuxer, THandler> | ||||||
where | ||||||
TMuxer: StreamMuxer, | ||||||
THandler: ConnectionHandler<Substream = Substream<TMuxer>>, | ||||||
{ | ||||||
type Item = Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>; | ||||||
|
||||||
/// Polls the connection for events produced by the associated handler | ||||||
/// as a result of I/O activity on the substream multiplexer. | ||||||
pub fn poll( | ||||||
mut self: Pin<&mut Self>, | ||||||
cx: &mut Context<'_>, | ||||||
) -> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>> { | ||||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||||||
loop { | ||||||
let mut io_pending = false; | ||||||
|
||||||
|
@@ -260,9 +315,9 @@ where | |||||
} | ||||||
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => { | ||||||
self.handler.inject_address_change(&address); | ||||||
return Poll::Ready(Ok(Event::AddressChange(address))); | ||||||
return Poll::Ready(Some(Ok(Event::AddressChange(address)))); | ||||||
} | ||||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))), | ||||||
Poll::Ready(Err(err)) => return Poll::Ready(Some(Err(ConnectionError::IO(err)))), | ||||||
} | ||||||
|
||||||
// Poll the handler for new events. | ||||||
|
@@ -276,9 +331,11 @@ where | |||||
self.muxing.open_substream(user_data); | ||||||
} | ||||||
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => { | ||||||
return Poll::Ready(Ok(Event::Handler(event))); | ||||||
return Poll::Ready(Some(Ok(Event::Handler(event)))); | ||||||
} | ||||||
Poll::Ready(Err(err)) => { | ||||||
return Poll::Ready(Some(Err(ConnectionError::Handler(err)))) | ||||||
} | ||||||
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
@@ -295,26 +352,17 @@ pub struct IncomingInfo<'a> { | |||||
|
||||||
impl<'a> IncomingInfo<'a> { | ||||||
/// Builds the `ConnectedPoint` corresponding to the incoming connection. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed in ecdb812. |
||||||
pub fn to_connected_point(&self) -> ConnectedPoint { | ||||||
ConnectedPoint::Listener { | ||||||
pub fn to_pending_point(&self) -> PendingPoint { | ||||||
PendingPoint::Listener { | ||||||
local_addr: self.local_addr.clone(), | ||||||
send_back_addr: self.send_back_addr.clone(), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
/// Borrowed information about an outgoing connection currently being negotiated. | ||||||
#[derive(Debug, Copy, Clone)] | ||||||
pub struct OutgoingInfo<'a> { | ||||||
pub address: &'a Multiaddr, | ||||||
pub peer_id: Option<&'a PeerId>, | ||||||
} | ||||||
|
||||||
impl<'a> OutgoingInfo<'a> { | ||||||
/// Builds a `ConnectedPoint` corresponding to the outgoing connection. | ||||||
/// Builds the `ConnectedPoint` corresponding to the incoming connection. | ||||||
pub fn to_connected_point(&self) -> ConnectedPoint { | ||||||
ConnectedPoint::Dialer { | ||||||
address: self.address.clone(), | ||||||
ConnectedPoint::Listener { | ||||||
local_addr: self.local_addr.clone(), | ||||||
send_back_addr: self.send_back_addr.clone(), | ||||||
} | ||||||
} | ||||||
} | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ | |
|
||
use crate::connection::ConnectionLimit; | ||
use crate::transport::TransportError; | ||
use crate::Multiaddr; | ||
use std::{fmt, io}; | ||
|
||
/// Errors that can occur in the context of an established `Connection`. | ||
|
@@ -29,10 +30,6 @@ pub enum ConnectionError<THandlerErr> { | |
// TODO: Eventually this should also be a custom error? | ||
IO(io::Error), | ||
|
||
/// The connection was dropped because the connection limit | ||
/// for a peer has been reached. | ||
ConnectionLimit(ConnectionLimit), | ||
|
||
/// The connection handler produced an error. | ||
Handler(THandlerErr), | ||
} | ||
|
@@ -45,9 +42,6 @@ where | |
match self { | ||
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err), | ||
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), | ||
ConnectionError::ConnectionLimit(l) => { | ||
write!(f, "Connection error: Connection limit: {}.", l) | ||
} | ||
} | ||
} | ||
} | ||
|
@@ -60,16 +54,31 @@ where | |
match self { | ||
ConnectionError::IO(err) => Some(err), | ||
ConnectionError::Handler(err) => Some(err), | ||
ConnectionError::ConnectionLimit(..) => None, | ||
} | ||
} | ||
} | ||
|
||
/// Errors that can occur in the context of a pending outgoing `Connection`. | ||
/// | ||
/// Note: Addresses for an outbound connection are dialed in parallel. Thus, compared to | ||
/// [`PendingInboundConnectionError`], one or more [`TransportError`]s can occur for a single | ||
/// connection. | ||
pub type PendingOutboundConnectionError<TTransErr> = | ||
PendingConnectionError<Vec<(Multiaddr, TransportError<TTransErr>)>>; | ||
|
||
/// Errors that can occur in the context of a pending incoming `Connection`. | ||
pub type PendingInboundConnectionError<TTransErr> = | ||
PendingConnectionError<TransportError<TTransErr>>; | ||
|
||
/// Errors that can occur in the context of a pending `Connection`. | ||
#[derive(Debug)] | ||
pub enum PendingConnectionError<TTransErr> { | ||
/// An error occurred while negotiating the transport protocol(s). | ||
Transport(TransportError<TTransErr>), | ||
pub enum PendingConnectionError<TransportError> { | ||
/// An error occurred while negotiating the transport protocol(s) on a connection. | ||
Transport(TransportError), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Consider naming the generic here also
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 Done via 2a6dc54. |
||
|
||
/// The connection was dropped because the connection limit | ||
/// for a peer has been reached. | ||
ConnectionLimit(ConnectionLimit), | ||
|
||
/// Pending connection attempt has been aborted. | ||
Aborted, | ||
|
@@ -83,16 +92,23 @@ pub enum PendingConnectionError<TTransErr> { | |
IO(io::Error), | ||
} | ||
|
||
impl<TTransErr> fmt::Display for PendingConnectionError<TTransErr> | ||
impl<TransportError> fmt::Display for PendingConnectionError<TransportError> | ||
where | ||
TTransErr: fmt::Display, | ||
TransportError: fmt::Display + fmt::Debug, | ||
{ | ||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
match self { | ||
PendingConnectionError::IO(err) => write!(f, "Pending connection: I/O error: {}", err), | ||
PendingConnectionError::Aborted => write!(f, "Pending connection: Aborted."), | ||
PendingConnectionError::Transport(err) => { | ||
write!(f, "Pending connection: Transport error: {}", err) | ||
write!( | ||
f, | ||
"Pending connection: Transport error on connection: {}", | ||
err | ||
) | ||
} | ||
PendingConnectionError::ConnectionLimit(l) => { | ||
write!(f, "Connection error: Connection limit: {}.", l) | ||
} | ||
PendingConnectionError::InvalidPeerId => { | ||
write!(f, "Pending connection: Invalid peer ID.") | ||
|
@@ -101,16 +117,17 @@ where | |
} | ||
} | ||
|
||
impl<TTransErr> std::error::Error for PendingConnectionError<TTransErr> | ||
impl<TransportError> std::error::Error for PendingConnectionError<TransportError> | ||
where | ||
TTransErr: std::error::Error + 'static, | ||
TransportError: std::error::Error + 'static, | ||
{ | ||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { | ||
match self { | ||
PendingConnectionError::IO(err) => Some(err), | ||
PendingConnectionError::Transport(err) => Some(err), | ||
PendingConnectionError::Transport(_) => None, | ||
PendingConnectionError::InvalidPeerId => None, | ||
PendingConnectionError::Aborted => None, | ||
PendingConnectionError::ConnectionLimit(..) => None, | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it really worth implementing
Stream
if we always returnSome
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it is a trait off between ergonomics (e.g. being able to call
next
onConnection
viaStreamExt
) and strict typing.Thinking about it some more, I don't think the extra ergonomics are worth it. Reverted with 0fea620.