From e7f716f8bada815cfb879824595c2d6b93e08531 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 28 Sep 2021 14:36:19 +0200 Subject: [PATCH] core/: Merge pending and established connection limits Merge pending and established limits for both incoming and outgoing connections. More specifically merge `ConnectionLimits::with_max_pending_incoming` with `ConnectionLimits::with_max_established_incoming` and `ConnectionLimits::with_max_pending_outgoing` with `ConnectionLimits::with_max_established_outgoing`. Connection limits are checked on `Network::dial` for outgoing and on `Network::accept` for incoming connections. This (a) simplifies connection limits from an implementations and user perspective and (b) simplifies returning a connection handler on limit error as limits can only be exceeded at the start of dialing and accepting. See [1]. [1]: https://github.com/libp2p/rust-libp2p/issues/2242#issuecomment-928229155 --- core/CHANGELOG.md | 14 +- core/src/connection/error.rs | 9 -- core/src/connection/manager.rs | 8 +- core/src/connection/manager/task.rs | 14 +- core/src/connection/pool.rs | 198 ++++++++-------------------- core/src/network.rs | 2 + core/src/network/event.rs | 12 +- core/tests/connection_limits.rs | 63 +++------ core/tests/network_dial_error.rs | 5 +- swarm/src/lib.rs | 7 +- 10 files changed, 117 insertions(+), 215 deletions(-) diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 1c4ace6835f..30e7c138745 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,6 +1,13 @@ # 0.30.0 [unreleased] -- Add `ConnectionLimit::with_max_established` (see [PR 2137]). +- Add `ConnectionLimits::with_max_established` (see [PR 2137]). + +- Merge pending and established limits for both incoming and outgoing connections. More specifically + merge `ConnectionLimits::with_max_pending_incoming` with + `ConnectionLimits::with_max_established_incoming` and + `ConnectionLimits::with_max_pending_outgoing` with + `ConnectionLimits::with_max_established_outgoing`. Connection limits are checked on + `Network::dial` for outgoing and on `Network::accept` for incoming connections. - Add `Keypair::to_protobuf_encoding` (see [PR 2142]). @@ -22,11 +29,6 @@ - Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]). -- Report `ConnectionLimit` error through `ConnectionError` and thus through - `NetworkEvent::ConnectionClosed` instead of previously through - `PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError, - DialError}` (see [PR 2191]). - - Report abortion of pending connection through `DialError`, `UnknownPeerDialError` or `IncomingConnectionError` (see [PR 2191]). diff --git a/core/src/connection/error.rs b/core/src/connection/error.rs index ec4f7ff6e61..fb1f8cfbc9d 100644 --- a/core/src/connection/error.rs +++ b/core/src/connection/error.rs @@ -18,7 +18,6 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::connection::ConnectionLimit; use crate::transport::TransportError; use std::{fmt, io}; @@ -29,10 +28,6 @@ pub enum ConnectionError { // TODO: Eventually this should also be a custom error? IO(io::Error), - /// The connection was dropped because the connection limit - /// for a peer has been reached. - ConnectionLimit(ConnectionLimit), - /// The connection handler produced an error. Handler(THandlerErr), } @@ -45,9 +40,6 @@ where match self { ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err), ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err), - ConnectionError::ConnectionLimit(l) => { - write!(f, "Connection error: Connection limit: {}.", l) - } } } } @@ -60,7 +52,6 @@ where match self { ConnectionError::IO(err) => Some(err), ConnectionError::Handler(err) => Some(err), - ConnectionError::ConnectionLimit(..) => None, } } } diff --git a/core/src/connection/manager.rs b/core/src/connection/manager.rs index 51c08024202..dd87f760488 100644 --- a/core/src/connection/manager.rs +++ b/core/src/connection/manager.rs @@ -20,8 +20,8 @@ use super::{ handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit, - IntoConnectionHandler, PendingConnectionError, Substream, + Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler, + PendingConnectionError, Substream, }; use crate::{muxing::StreamMuxer, Executor}; use fnv::FnvHashMap; @@ -439,7 +439,7 @@ impl<'a, I> EstablishedEntry<'a, I> { /// /// When the connection is ultimately closed, [`Event::ConnectionClosed`] /// is emitted by [`Manager::poll`]. - pub fn start_close(mut self, error: Option) { + pub fn start_close(mut self) { // Clone the sender so that we are guaranteed to have // capacity for the close command (every sender gets a slot). match self @@ -447,7 +447,7 @@ impl<'a, I> EstablishedEntry<'a, I> { .get_mut() .sender .clone() - .try_send(task::Command::Close(error)) + .try_send(task::Command::Close) { Ok(()) => {} Err(e) => assert!(e.is_disconnected(), "No capacity for close command."), diff --git a/core/src/connection/manager/task.rs b/core/src/connection/manager/task.rs index 5860bcc6c9d..0f818de4e0c 100644 --- a/core/src/connection/manager/task.rs +++ b/core/src/connection/manager/task.rs @@ -23,8 +23,8 @@ use crate::{ connection::{ self, handler::{THandlerError, THandlerInEvent, THandlerOutEvent}, - Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit, - IntoConnectionHandler, PendingConnectionError, Substream, + Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler, + PendingConnectionError, Substream, }, muxing::StreamMuxer, Multiaddr, @@ -43,7 +43,7 @@ pub enum Command { NotifyHandler(T), /// Gracefully close the connection (active close) before /// terminating the task. - Close(Option), + Close, } /// Events that a task can emit to its manager. @@ -163,7 +163,6 @@ where Closing { closing_muxer: Close, handler: H::Handler, - error: Option, }, /// The task is terminating with a final event for the `Manager`. @@ -257,7 +256,7 @@ where Poll::Ready(Some(Command::NotifyHandler(event))) => { connection.inject_event(event) } - Poll::Ready(Some(Command::Close(error))) => { + Poll::Ready(Some(Command::Close)) => { // Don't accept any further commands. this.commands.get_mut().close(); // Discard the event, if any, and start a graceful close. @@ -265,7 +264,6 @@ where this.state = State::Closing { handler, closing_muxer, - error, }; continue 'poll; } @@ -340,7 +338,6 @@ where State::Closing { handler, - error, mut closing_muxer, } => { // Try to gracefully close the connection. @@ -348,7 +345,7 @@ where Poll::Ready(Ok(())) => { let event = Event::Closed { id: this.id, - error: error.map(ConnectionError::ConnectionLimit), + error: None, handler, }; this.state = State::Terminating(event); @@ -364,7 +361,6 @@ where Poll::Pending => { this.state = State::Closing { handler, - error, closing_muxer, }; return Poll::Pending; diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index f147f766d4d..bc4d9afd0d3 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -225,8 +225,9 @@ impl Pool { TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { - self.counters.check_max_pending_incoming()?; let endpoint = info.to_connected_point(); + // TODO: This drops the handler. + self.counters.check_max(&endpoint)?; Ok(self.add_pending(future, handler, endpoint, None)) } @@ -252,10 +253,10 @@ impl Pool { TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send + 'static, { - if let Err(limit) = self.counters.check_max_pending_outgoing() { + let endpoint = info.to_connected_point(); + if let Err(limit) = self.counters.check_max(&endpoint) { return Err(DialError::ConnectionLimit { limit, handler }); }; - let endpoint = info.to_connected_point(); Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned())) } @@ -304,7 +305,7 @@ impl Pool { }); let id = self.manager.add_pending(future, handler); - self.counters.inc_pending(&endpoint); + self.counters.inc(&endpoint); self.pending.insert(id, (endpoint, peer)); id } @@ -382,7 +383,7 @@ impl Pool { if let Some(conns) = self.established.get(peer) { for (&id, _endpoint) in conns.iter() { if let Some(manager::Entry::Established(e)) = self.manager.entry(id) { - e.start_close(None); + e.start_close(); } } } @@ -489,7 +490,7 @@ impl Pool { match item { manager::Event::PendingConnectionError { id, error, handler } => { if let Some((endpoint, peer)) = self.pending.remove(&id) { - self.counters.dec_pending(&endpoint); + self.counters.dec(&endpoint); return Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, @@ -509,7 +510,7 @@ impl Pool { let num_established = if let Some(conns) = self.established.get_mut(&connected.peer_id) { if let Some(endpoint) = conns.remove(&id) { - self.counters.dec_established(&endpoint); + self.counters.dec(&endpoint); } u32::try_from(conns.len()).unwrap() } else { @@ -530,22 +531,6 @@ impl Pool { manager::Event::ConnectionEstablished { entry } => { let id = entry.id(); if let Some((endpoint, peer)) = self.pending.remove(&id) { - self.counters.dec_pending(&endpoint); - - // Check general established connection limit. - if let Err(e) = self.counters.check_max_established(&endpoint) { - entry.start_close(Some(e)); - continue; - } - - // Check per-peer established connection limit. - let current = - num_peer_established(&self.established, &entry.connected().peer_id); - if let Err(e) = self.counters.check_max_established_per_peer(current) { - entry.start_close(Some(e)); - continue; - } - // Peer ID checks must already have happened. See `add_pending`. if cfg!(debug_assertions) { if self.local_id == entry.connected().peer_id { @@ -564,7 +549,6 @@ impl Pool { let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap()) .expect("n + 1 is always non-zero; qed"); - self.counters.inc_established(&endpoint); conns.insert(id, endpoint); match self.get(id) { Some(PoolConnection::Established(connection)) => { @@ -663,7 +647,7 @@ impl PendingConnection<'_, TInEvent> { .remove(&self.entry.id()) .expect("`entry` is a pending entry") .0; - self.counters.dec_pending(&endpoint); + self.counters.dec(&endpoint); self.entry.abort(); } } @@ -732,7 +716,7 @@ impl EstablishedConnection<'_, TInEvent> { /// /// Has no effect if the connection is already closing. pub fn start_close(self) { - self.entry.start_close(None) + self.entry.start_close() } } @@ -798,23 +782,17 @@ pub struct ConnectionCounters { /// The effective connection limits. limits: ConnectionLimits, /// The current number of incoming connections. - pending_incoming: u32, + incoming: u32, /// The current number of outgoing connections. - pending_outgoing: u32, - /// The current number of established inbound connections. - established_incoming: u32, - /// The current number of established outbound connections. - established_outgoing: u32, + outgoing: u32, } impl ConnectionCounters { fn new(limits: ConnectionLimits) -> Self { Self { limits, - pending_incoming: 0, - pending_outgoing: 0, - established_incoming: 0, - established_outgoing: 0, + incoming: 0, + outgoing: 0, } } @@ -823,111 +801,63 @@ impl ConnectionCounters { &self.limits } - /// The total number of connections, both pending and established. - pub fn num_connections(&self) -> u32 { - self.num_pending() + self.num_established() + /// The number of incoming connections. + pub fn num_incoming(&self) -> u32 { + self.incoming } - /// The total number of pending connections, both incoming and outgoing. - pub fn num_pending(&self) -> u32 { - self.pending_incoming + self.pending_outgoing + /// The number of outgoing connections. + pub fn num_outgoing(&self) -> u32 { + self.outgoing } - /// The number of incoming connections being established. - pub fn num_pending_incoming(&self) -> u32 { - self.pending_incoming + /// The total number of connections. + pub fn num_total(&self) -> u32 { + self.outgoing + self.incoming } - /// The number of outgoing connections being established. - pub fn num_pending_outgoing(&self) -> u32 { - self.pending_outgoing - } - - /// The number of established incoming connections. - pub fn num_established_incoming(&self) -> u32 { - self.established_incoming - } - - /// The number of established outgoing connections. - pub fn num_established_outgoing(&self) -> u32 { - self.established_outgoing - } - - /// The total number of established connections. - pub fn num_established(&self) -> u32 { - self.established_outgoing + self.established_incoming - } - - fn inc_pending(&mut self, endpoint: &ConnectedPoint) { + fn inc(&mut self, endpoint: &ConnectedPoint) { match endpoint { ConnectedPoint::Dialer { .. } => { - self.pending_outgoing += 1; + self.outgoing += 1; } ConnectedPoint::Listener { .. } => { - self.pending_incoming += 1; + self.incoming += 1; } } } - fn dec_pending(&mut self, endpoint: &ConnectedPoint) { + fn dec(&mut self, endpoint: &ConnectedPoint) { match endpoint { ConnectedPoint::Dialer { .. } => { - self.pending_outgoing -= 1; + self.outgoing -= 1; } ConnectedPoint::Listener { .. } => { - self.pending_incoming -= 1; + self.incoming -= 1; } } } - fn inc_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing += 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming += 1; - } - } - } - - fn dec_established(&mut self, endpoint: &ConnectedPoint) { - match endpoint { - ConnectedPoint::Dialer { .. } => { - self.established_outgoing -= 1; - } - ConnectedPoint::Listener { .. } => { - self.established_incoming -= 1; - } - } - } - - fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_outgoing, self.limits.max_pending_outgoing) + fn check_max_outgoing(&self) -> Result<(), ConnectionLimit> { + Self::check(self.outgoing, self.limits.max_outgoing) } - fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> { - Self::check(self.pending_incoming, self.limits.max_pending_incoming) + fn check_max_incoming(&self) -> Result<(), ConnectionLimit> { + Self::check(self.incoming, self.limits.max_incoming) } - fn check_max_established(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { + fn check_max(&self, endpoint: &ConnectedPoint) -> Result<(), ConnectionLimit> { // Check total connection limit. - Self::check(self.num_established(), self.limits.max_established_total)?; + Self::check(self.num_total(), self.limits.max_total)?; // Check incoming/outgoing connection limits match endpoint { - ConnectedPoint::Dialer { .. } => Self::check( - self.established_outgoing, - self.limits.max_established_outgoing, - ), - ConnectedPoint::Listener { .. } => Self::check( - self.established_incoming, - self.limits.max_established_incoming, - ), + ConnectedPoint::Dialer { .. } => Self::check(self.outgoing, self.limits.max_outgoing), + ConnectedPoint::Listener { .. } => Self::check(self.incoming, self.limits.max_incoming), } } - fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { - Self::check(current, self.limits.max_established_per_peer) + fn check_max_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> { + Self::check(current, self.limits.max_per_peer) } fn check(current: u32, limit: Option) -> Result<(), ConnectionLimit> { @@ -955,54 +885,40 @@ fn num_peer_established( /// By default no connection limits apply. #[derive(Debug, Clone, Default)] pub struct ConnectionLimits { - max_pending_incoming: Option, - max_pending_outgoing: Option, - max_established_incoming: Option, - max_established_outgoing: Option, - max_established_per_peer: Option, - max_established_total: Option, + max_incoming: Option, + max_outgoing: Option, + max_per_peer: Option, + max_total: Option, } impl ConnectionLimits { - /// Configures the maximum number of concurrently incoming connections being established. - pub fn with_max_pending_incoming(mut self, limit: Option) -> Self { - self.max_pending_incoming = limit; - self - } - - /// Configures the maximum number of concurrently outgoing connections being established. - pub fn with_max_pending_outgoing(mut self, limit: Option) -> Self { - self.max_pending_outgoing = limit; - self - } - - /// Configures the maximum number of concurrent established inbound connections. - pub fn with_max_established_incoming(mut self, limit: Option) -> Self { - self.max_established_incoming = limit; + /// Configures the maximum number of incoming connections. + pub fn with_max_incoming(mut self, limit: Option) -> Self { + self.max_incoming = limit; self } - /// Configures the maximum number of concurrent established outbound connections. - pub fn with_max_established_outgoing(mut self, limit: Option) -> Self { - self.max_established_outgoing = limit; + /// Configures the maximum number of outgoing connections. + pub fn with_max_outgoing(mut self, limit: Option) -> Self { + self.max_outgoing = limit; self } - /// Configures the maximum number of concurrent established connections (both - /// inbound and outbound). + /// Configures the maximum number of connections (both + /// incoming and outgoing). /// /// Note: This should be used in conjunction with - /// [`ConnectionLimits::with_max_established_incoming`] to prevent possible + /// [`ConnectionLimits::with_max_incoming`] to prevent possible /// eclipse attacks (all connections being inbound). - pub fn with_max_established(mut self, limit: Option) -> Self { - self.max_established_total = limit; + pub fn with_max_total(mut self, limit: Option) -> Self { + self.max_total = limit; self } - /// Configures the maximum number of concurrent established connections per peer, + /// Configures the maximum number of connections per peer, /// regardless of direction (incoming or outgoing). - pub fn with_max_established_per_peer(mut self, limit: Option) -> Self { - self.max_established_per_peer = limit; + pub fn with_max_per_peer(mut self, limit: Option) -> Self { + self.max_per_peer = limit; self } } diff --git a/core/src/network.rs b/core/src/network.rs index 831a99c4b01..bc93624a008 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -614,6 +614,7 @@ where ( opts, NetworkEvent::DialError { + id, attempts_remaining, peer_id, multiaddr: failed_addr, @@ -626,6 +627,7 @@ where ConnectedPoint::Dialer { address } => ( None, NetworkEvent::UnknownPeerDialError { + id, multiaddr: address, error, handler, diff --git a/core/src/network/event.rs b/core/src/network/event.rs index cea5bbddc21..5c12406a2d6 100644 --- a/core/src/network/event.rs +++ b/core/src/network/event.rs @@ -130,6 +130,8 @@ where /// A dialing attempt to an address of a peer failed. DialError { + /// The ID of the connection that encountered an error. + id: ConnectionId, /// The number of remaining dialing attempts. attempts_remaining: DialAttemptsRemaining, @@ -145,6 +147,8 @@ where /// Failed to reach a peer that we were trying to dial. UnknownPeerDialError { + /// The ID of the connection that encountered an error. + id: ConnectionId, /// The multiaddr we failed to reach. multiaddr: Multiaddr, @@ -262,21 +266,27 @@ where .field("error", error) .finish(), NetworkEvent::DialError { + id, attempts_remaining, peer_id, multiaddr, error, } => f .debug_struct("DialError") + .field("id", id) .field("attempts_remaining", &attempts_remaining.get_attempts()) .field("peer_id", peer_id) .field("multiaddr", multiaddr) .field("error", error) .finish(), NetworkEvent::UnknownPeerDialError { - multiaddr, error, .. + id, + multiaddr, + error, + .. } => f .debug_struct("UnknownPeerDialError") + .field("id", id) .field("multiaddr", multiaddr) .field("error", error) .finish(), diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index d5156664ffb..bdde7246376 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -35,7 +35,7 @@ use util::{test_network, TestHandler}; fn max_outgoing() { let outgoing_limit = rand::thread_rng().gen_range(1, 10); - let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit)); + let limits = ConnectionLimits::default().with_max_outgoing(Some(outgoing_limit)); let cfg = NetworkConfig::default().with_connection_limits(limits); let mut network = test_network(cfg); @@ -62,10 +62,7 @@ fn max_outgoing() { let info = network.info(); assert_eq!(info.num_peers(), 0); - assert_eq!( - info.connection_counters().num_pending_outgoing(), - outgoing_limit - ); + assert_eq!(info.connection_counters().num_outgoing(), outgoing_limit); // Abort all dialing attempts. let mut peer = network @@ -78,18 +75,15 @@ fn max_outgoing() { attempt.abort(); } - assert_eq!( - network.info().connection_counters().num_pending_outgoing(), - 0 - ); + assert_eq!(network.info().connection_counters().num_outgoing(), 0); } #[test] -fn max_established_incoming() { +fn max_incoming() { let limit = rand::thread_rng().gen_range(1, 10); fn config(limit: u32) -> NetworkConfig { - let limits = ConnectionLimits::default().with_max_established_incoming(Some(limit)); + let limits = ConnectionLimits::default().with_max_incoming(Some(limit)); NetworkConfig::default().with_connection_limits(limits) } @@ -102,28 +96,18 @@ fn max_established_incoming() { let mut addr_sender = Some(addr_sender); // Spawn the listener. - let listener = async_std::task::spawn(poll_fn(move |cx| loop { - match ready!(network1.poll(cx)) { - NetworkEvent::NewListenerAddress { listen_addr, .. } => { - addr_sender.take().unwrap().send(listen_addr).unwrap(); - } - NetworkEvent::IncomingConnection { connection, .. } => { - network1.accept(connection, TestHandler()).unwrap(); - } - NetworkEvent::ConnectionEstablished { .. } => {} - NetworkEvent::ConnectionClosed { - error: Some(ConnectionError::ConnectionLimit(err)), - .. - } => { - assert_eq!(err.limit, limit); - assert_eq!(err.limit, err.current); - let info = network1.info(); - let counters = info.connection_counters(); - assert_eq!(counters.num_established_incoming(), limit); - assert_eq!(counters.num_established(), limit); - return Poll::Ready(()); + async_std::task::spawn(poll_fn(move |cx| -> Poll<()> { + loop { + match ready!(network1.poll(cx)) { + NetworkEvent::NewListenerAddress { listen_addr, .. } => { + addr_sender.take().unwrap().send(listen_addr).unwrap(); + } + NetworkEvent::IncomingConnection { connection, .. } => { + network1.accept(connection, TestHandler()); + } + NetworkEvent::ConnectionEstablished { .. } => {} + e => panic!("Unexpected network event: {:?}", e), } - e => panic!("Unexpected network event: {:?}", e), } })); @@ -147,19 +131,15 @@ fn max_established_incoming() { expected_closed = Some(id); } } else { - // This connection exceeds the limit for the listener and - // is expected to close shortly. For the dialer, these connections - // will first appear established before the listener closes them as - // a result of the limit violation. - assert_eq!(Some(connection.id()), expected_closed); + panic!("Unexpected established connection beyond connection limit.") } } - NetworkEvent::ConnectionClosed { id, .. } => { + NetworkEvent::UnknownPeerDialError { id, .. } => { assert_eq!(Some(id), expected_closed); let info = network2.info(); let counters = info.connection_counters(); - assert_eq!(counters.num_established_outgoing(), limit); - assert_eq!(counters.num_established(), limit); + assert_eq!(counters.num_outgoing(), limit); + assert_eq!(counters.num_total(), limit); return Poll::Ready(()); } e => panic!("Unexpected network event: {:?}", e), @@ -168,7 +148,4 @@ fn max_established_incoming() { }) .await }); - - // Wait for the listener to complete. - async_std::task::block_on(listener); } diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index bed4c06e023..c8b8493e205 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -51,7 +51,7 @@ fn deny_incoming_connec() { _ => panic!("Was expecting the listen address to be reported"), })); - swarm2 + let (connection_id, _) = swarm2 .peer(swarm1.local_peer_id().clone()) .dial(address.clone(), Vec::new(), TestHandler()) .unwrap(); @@ -65,11 +65,13 @@ fn deny_incoming_connec() { match swarm2.poll(cx) { Poll::Ready(NetworkEvent::DialError { + id, attempts_remaining, peer_id, multiaddr, error: PendingConnectionError::Transport(_), }) => { + assert_eq!(id, connection_id); assert_eq!(0u32, attempts_remaining.get_attempts()); assert_eq!(&peer_id, swarm1.local_peer_id()); assert_eq!( @@ -191,6 +193,7 @@ fn multiple_addresses_err() { loop { match swarm.poll(cx) { Poll::Ready(NetworkEvent::DialError { + id: _, attempts_remaining, peer_id, multiaddr, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index a903113c2a9..02bf2b0b6c7 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -624,6 +624,7 @@ where ); let local_addr = connection.local_addr.clone(); let send_back_addr = connection.send_back_addr.clone(); + // TODO: Handler is lost here. if let Err(e) = this.network.accept(connection, handler) { log::warn!("Incoming connection rejected: {:?}", e); } @@ -709,6 +710,7 @@ where }); } Poll::Ready(NetworkEvent::DialError { + id: _, peer_id, multiaddr, error, @@ -745,7 +747,10 @@ where }); } Poll::Ready(NetworkEvent::UnknownPeerDialError { - multiaddr, error, .. + id: _, + multiaddr, + error, + .. }) => { log::debug!( "Connection attempt to address {:?} of unknown peer failed with {:?}",