From 3dd27e181e887cb9a17c9f7e6f22b9fa830b866a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 12:44:19 +0200 Subject: [PATCH 1/6] swarm/src/connection/pool: Remove 'a lifetime in PoolEvent Simplifies `PoolEvent`, no longer carrying a reference to an `EstablishedConnection` or the `Pool`, but instead the `PeerId`, `ConnectionId` and `ConnectedPoint` directly. --- swarm/src/connection/pool.rs | 118 ++++++----------------------------- swarm/src/lib.rs | 36 +++++------ 2 files changed, 36 insertions(+), 118 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index 5fa1da1ef3f..d4943417dcf 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -155,13 +155,16 @@ impl fmt::Debug for Pool +#[derive(Debug)] +pub enum PoolEvent where TTrans: Transport, { /// A new connection has been established. ConnectionEstablished { - connection: EstablishedConnection<'a, ::InEvent>, + id: ConnectionId, + peer_id: PeerId, + endpoint: ConnectedPoint, /// List of other connections to the same peer. /// /// Note: Does not include the connection reported through this event. @@ -190,8 +193,6 @@ where /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. error: Option::Error>>, - /// A reference to the pool that used to manage the connection. - pool: &'a mut Pool, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, handler: THandler::Handler, @@ -225,16 +226,16 @@ where /// A node has produced an event. ConnectionEvent { - /// The connection that has generated the event. - connection: EstablishedConnection<'a, THandlerInEvent>, + id: ConnectionId, + peer_id: PeerId, /// The produced event. event: THandlerOutEvent, }, /// The connection to a node has changed its address. AddressChange { - /// The connection that has changed address. - connection: EstablishedConnection<'a, THandlerInEvent>, + id: ConnectionId, + peer_id: PeerId, /// The new endpoint. new_endpoint: ConnectedPoint, /// The old endpoint. @@ -242,73 +243,6 @@ where }, } -impl<'a, THandler: IntoConnectionHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans> -where - TTrans: Transport, - TTrans::Error: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - PoolEvent::ConnectionEstablished { - connection, - concurrent_dial_errors, - .. - } => f - .debug_tuple("PoolEvent::ConnectionEstablished") - .field(connection) - .field(concurrent_dial_errors) - .finish(), - PoolEvent::ConnectionClosed { - id, - connected, - error, - .. - } => f - .debug_struct("PoolEvent::ConnectionClosed") - .field("id", id) - .field("connected", connected) - .field("error", error) - .finish(), - PoolEvent::PendingOutboundConnectionError { - id, error, peer, .. - } => f - .debug_struct("PoolEvent::PendingOutboundConnectionError") - .field("id", id) - .field("error", error) - .field("peer", peer) - .finish(), - PoolEvent::PendingInboundConnectionError { - id, - error, - send_back_addr, - local_addr, - .. - } => f - .debug_struct("PoolEvent::PendingInboundConnectionError") - .field("id", id) - .field("error", error) - .field("send_back_addr", send_back_addr) - .field("local_addr", local_addr) - .finish(), - PoolEvent::ConnectionEvent { connection, event } => f - .debug_struct("PoolEvent::ConnectionEvent") - .field("peer", &connection.peer_id()) - .field("event", event) - .finish(), - PoolEvent::AddressChange { - connection, - new_endpoint, - old_endpoint, - } => f - .debug_struct("PoolEvent::AddressChange") - .field("peer", &connection.peer_id()) - .field("new_endpoint", new_endpoint) - .field("old_endpoint", old_endpoint) - .finish(), - } - } -} - impl Pool where THandler: IntoConnectionHandler, @@ -586,7 +520,7 @@ where /// /// > **Note**: We use a regular `poll` method instead of implementing `Stream`, /// > because we want the `Pool` to stay borrowed if necessary. - pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> where TTrans: Transport, THandler: IntoConnectionHandler + 'static, @@ -602,16 +536,7 @@ where Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."), Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => { - let entry = self - .established - .get_mut(&peer_id) - .expect("Receive `Notify` event for established peer.") - .entry(id) - .expect_occupied("Receive `Notify` event from established connection"); - return Poll::Ready(PoolEvent::ConnectionEvent { - connection: EstablishedConnection { entry }, - event, - }); + return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event }); } Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange { id, @@ -629,16 +554,12 @@ where let old_endpoint = std::mem::replace(&mut connection.endpoint, new_endpoint.clone()); - match self.get(id) { - Some(PoolConnection::Established(connection)) => { - return Poll::Ready(PoolEvent::AddressChange { - connection, - new_endpoint, - old_endpoint, - }) - } - _ => unreachable!("since `entry` is an `EstablishedEntry`."), - } + return Poll::Ready(PoolEvent::AddressChange { + peer_id, + id, + new_endpoint, + old_endpoint, + }); } Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, @@ -663,7 +584,6 @@ where connected: Connected { endpoint, peer_id }, error, remaining_established_connection_ids, - pool: self, handler, }); } @@ -841,7 +761,9 @@ where match self.get(id) { Some(PoolConnection::Established(connection)) => { return Poll::Ready(PoolEvent::ConnectionEstablished { - connection, + peer_id: connection.peer_id(), + endpoint: connection.endpoint().clone(), + id: connection.id(), other_established_connection_ids, concurrent_dial_errors, }) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 4c3a16fecda..27eff34eb24 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -781,16 +781,16 @@ where connections_not_ready = true; } Poll::Ready(PoolEvent::ConnectionEstablished { - connection, + id, + peer_id, + endpoint, other_established_connection_ids, concurrent_dial_errors, }) => { - let peer_id = connection.peer_id(); - let endpoint = connection.endpoint().clone(); if this.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any // future events from the connection to the behaviour. - this.banned_peer_connections.insert(connection.id()); + this.banned_peer_connections.insert(id); this.pool.disconnect(peer_id); return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }); } else { @@ -805,18 +805,17 @@ where log::debug!( "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", - connection.peer_id(), - connection.endpoint(), + peer_id, + endpoint, num_established, non_banned_established + 1, ); - let endpoint = connection.endpoint().clone(); let failed_addresses = concurrent_dial_errors .as_ref() .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); this.behaviour.inject_connection_established( &peer_id, - &connection.id(), + &id, &endpoint, failed_addresses.as_ref(), non_banned_established, @@ -913,26 +912,23 @@ where num_established, }); } - Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => { - let peer = connection.peer_id(); - let conn_id = connection.id(); - if this.banned_peer_connections.contains(&conn_id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer, conn_id); + Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => { + if this.banned_peer_connections.contains(&id) { + log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); } else { - this.behaviour.inject_event(peer, conn_id, event); + this.behaviour.inject_event(peer_id, id, event); } } Poll::Ready(PoolEvent::AddressChange { - connection, + id, + peer_id, new_endpoint, old_endpoint, }) => { - let peer = connection.peer_id(); - let conn_id = connection.id(); - if !this.banned_peer_connections.contains(&conn_id) { + if !this.banned_peer_connections.contains(&id) { this.behaviour.inject_address_change( - &peer, - &conn_id, + &peer_id, + &id, &old_endpoint, &new_endpoint, ); From 9f0ca957d170a4364f18991a1c10f4d8fbc19436 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 12:55:01 +0200 Subject: [PATCH 2/6] swarm/src/lib: Extract PoolEvent handling into new method --- swarm/src/lib.rs | 328 ++++++++++++++++++++++++----------------------- 1 file changed, 171 insertions(+), 157 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 27eff34eb24..6d81b82bda6 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -514,7 +514,7 @@ where }; match dial { Ok(fut) => fut - .map(|r| (address, r.map_err(TransportError::Other))) + .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) .boxed(), Err(err) => futures::future::ready((address, Err(err))).boxed(), } @@ -538,7 +538,7 @@ where Err((connection_limit, handler)) => { let error = DialError::ConnectionLimit(connection_limit); self.behaviour.inject_dial_failure(None, handler, &error); - Err(error) + return Err(error); } } } @@ -664,6 +664,171 @@ where &mut self.behaviour } + fn handle_pool_event( + &mut self, + event: PoolEvent, transport::Boxed<(PeerId, StreamMuxerBox)>>, + ) -> Option>> { + match event { + PoolEvent::ConnectionEstablished { + peer_id, + id, + endpoint, + other_established_connection_ids, + concurrent_dial_errors, + } => { + if self.banned_peers.contains(&peer_id) { + // Mark the connection for the banned peer as banned, thus withholding any + // future events from the connection to the behaviour. + self.banned_peer_connections.insert(id); + self.pool.disconnect(peer_id); + return Some(SwarmEvent::BannedPeer { peer_id, endpoint }); + } else { + let num_established = NonZeroU32::new( + u32::try_from(other_established_connection_ids.len() + 1).unwrap(), + ) + .expect("n + 1 is always non-zero; qed"); + let non_banned_established = other_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(&conn_id)) + .count(); + + log::debug!( + "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", + peer_id, + endpoint, + num_established, + non_banned_established + 1, + ); + let endpoint = endpoint.clone(); + let failed_addresses = concurrent_dial_errors + .as_ref() + .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); + self.behaviour.inject_connection_established( + &peer_id, + &id, + &endpoint, + failed_addresses.as_ref(), + non_banned_established, + ); + return Some(SwarmEvent::ConnectionEstablished { + peer_id, + num_established, + endpoint, + concurrent_dial_errors, + }); + } + } + PoolEvent::PendingOutboundConnectionError { + id: _, + error, + handler, + peer, + } => { + let error = error.into(); + + self.behaviour.inject_dial_failure(peer, handler, &error); + + if let Some(peer) = peer { + log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); + } else { + log::debug!("Connection attempt to unknown peer failed with {:?}", error); + } + + return Some(SwarmEvent::OutgoingConnectionError { + peer_id: peer, + error, + }); + } + PoolEvent::PendingInboundConnectionError { + id: _, + send_back_addr, + local_addr, + error, + handler, + } => { + log::debug!("Incoming connection failed: {:?}", error); + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + return Some(SwarmEvent::IncomingConnectionError { + local_addr, + send_back_addr, + error, + }); + } + PoolEvent::ConnectionClosed { + id, + connected, + error, + remaining_established_connection_ids, + handler, + .. + } => { + if let Some(error) = error.as_ref() { + log::debug!( + "Connection closed with error {:?}: {:?}; Total (peer): {}.", + error, + connected, + remaining_established_connection_ids.len() + ); + } else { + log::debug!( + "Connection closed: {:?}; Total (peer): {}.", + connected, + remaining_established_connection_ids.len() + ); + } + let peer_id = connected.peer_id; + let endpoint = connected.endpoint; + let num_established = + u32::try_from(remaining_established_connection_ids.len()).unwrap(); + let conn_was_reported = !self.banned_peer_connections.remove(&id); + if conn_was_reported { + let remaining_non_banned = remaining_established_connection_ids + .into_iter() + .filter(|conn_id| !self.banned_peer_connections.contains(&conn_id)) + .count(); + self.behaviour.inject_connection_closed( + &peer_id, + &id, + &endpoint, + handler, + remaining_non_banned, + ); + } + return Some(SwarmEvent::ConnectionClosed { + peer_id, + endpoint, + cause: error, + num_established, + }); + } + PoolEvent::ConnectionEvent { peer_id, id, event } => { + if self.banned_peer_connections.contains(&id) { + log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); + } else { + self.behaviour.inject_event(peer_id, id, event); + } + } + PoolEvent::AddressChange { + peer_id, + id, + new_endpoint, + old_endpoint, + } => { + if !self.banned_peer_connections.contains(&id) { + self.behaviour.inject_address_change( + &peer_id, + &id, + &old_endpoint, + &new_endpoint, + ); + } + } + } + + None + } + /// Internal function used by everything event-related. /// /// Polls the `Swarm` for the next event. @@ -777,161 +942,10 @@ where // Poll the known peers. match this.pool.poll(cx) { - Poll::Pending => { - connections_not_ready = true; - } - Poll::Ready(PoolEvent::ConnectionEstablished { - id, - peer_id, - endpoint, - other_established_connection_ids, - concurrent_dial_errors, - }) => { - if this.banned_peers.contains(&peer_id) { - // Mark the connection for the banned peer as banned, thus withholding any - // future events from the connection to the behaviour. - this.banned_peer_connections.insert(id); - this.pool.disconnect(peer_id); - return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }); - } else { - let num_established = NonZeroU32::new( - u32::try_from(other_established_connection_ids.len() + 1).unwrap(), - ) - .expect("n + 1 is always non-zero; qed"); - let non_banned_established = other_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - - log::debug!( - "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", - peer_id, - endpoint, - num_established, - non_banned_established + 1, - ); - let failed_addresses = concurrent_dial_errors - .as_ref() - .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); - this.behaviour.inject_connection_established( - &peer_id, - &id, - &endpoint, - failed_addresses.as_ref(), - non_banned_established, - ); - return Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - num_established, - endpoint, - concurrent_dial_errors, - }); - } - } - Poll::Ready(PoolEvent::PendingOutboundConnectionError { - id: _, - error, - handler, - peer, - }) => { - let error = error.into(); - - this.behaviour.inject_dial_failure(peer, handler, &error); - - if let Some(peer) = peer { - log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,); - } else { - log::debug!("Connection attempt to unknown peer failed with {:?}", error); - } - - return Poll::Ready(SwarmEvent::OutgoingConnectionError { - peer_id: peer, - error, - }); - } - Poll::Ready(PoolEvent::PendingInboundConnectionError { - id: _, - send_back_addr, - local_addr, - error, - handler, - }) => { - log::debug!("Incoming connection failed: {:?}", error); - this.behaviour - .inject_listen_failure(&local_addr, &send_back_addr, handler); - return Poll::Ready(SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - }); - } - Poll::Ready(PoolEvent::ConnectionClosed { - id, - connected, - error, - remaining_established_connection_ids, - handler, - .. - }) => { - if let Some(error) = error.as_ref() { - log::debug!( - "Connection closed with error {:?}: {:?}; Total (peer): {}.", - error, - connected, - remaining_established_connection_ids.len() - ); - } else { - log::debug!( - "Connection closed: {:?}; Total (peer): {}.", - connected, - remaining_established_connection_ids.len() - ); - } - let peer_id = connected.peer_id; - let endpoint = connected.endpoint; - let num_established = - u32::try_from(remaining_established_connection_ids.len()).unwrap(); - let conn_was_reported = !this.banned_peer_connections.remove(&id); - if conn_was_reported { - let remaining_non_banned = remaining_established_connection_ids - .into_iter() - .filter(|conn_id| !this.banned_peer_connections.contains(conn_id)) - .count(); - this.behaviour.inject_connection_closed( - &peer_id, - &id, - &endpoint, - handler, - remaining_non_banned, - ); - } - return Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, - endpoint, - cause: error, - num_established, - }); - } - Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => { - if this.banned_peer_connections.contains(&id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); - } else { - this.behaviour.inject_event(peer_id, id, event); - } - } - Poll::Ready(PoolEvent::AddressChange { - id, - peer_id, - new_endpoint, - old_endpoint, - }) => { - if !this.banned_peer_connections.contains(&id) { - this.behaviour.inject_address_change( - &peer_id, - &id, - &old_endpoint, - &new_endpoint, - ); + Poll::Pending => connections_not_ready = true, + Poll::Ready(pool_event) => { + if let Some(swarm_event) = this.handle_pool_event(pool_event) { + return Poll::Ready(swarm_event); } } }; From 87b7cb3cb59ec25731e130bc4e966b75c9a0d4d8 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 13:09:01 +0200 Subject: [PATCH 3/6] swarm/src/lib: Extract ListenersEvent handling into new method --- swarm/src/lib.rs | 189 +++++++++++++++++++++++++---------------------- 1 file changed, 99 insertions(+), 90 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6d81b82bda6..7ddc158bc60 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -829,6 +829,101 @@ where None } + fn handle_listeners_event( + &mut self, + event: ListenersEvent>, + ) -> Option>> { + match event { + ListenersEvent::Incoming { + listener_id: _, + upgrade, + local_addr, + send_back_addr, + } => { + let handler = self.behaviour.new_handler(); + match self.pool.add_incoming( + upgrade, + handler, + IncomingInfo { + local_addr: &local_addr, + send_back_addr: &send_back_addr, + }, + ) { + Ok(_connection_id) => { + return Some(SwarmEvent::IncomingConnection { + local_addr, + send_back_addr, + }); + } + Err((connection_limit, handler)) => { + self.behaviour + .inject_listen_failure(&local_addr, &send_back_addr, handler); + log::warn!("Incoming connection rejected: {:?}", connection_limit); + } + }; + } + ListenersEvent::NewAddress { + listener_id, + listen_addr, + } => { + log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); + if !self.listened_addrs.contains(&listen_addr) { + self.listened_addrs.push(listen_addr.clone()) + } + self.behaviour + .inject_new_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::NewListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::AddressExpired { + listener_id, + listen_addr, + } => { + log::debug!( + "Listener {:?}; Expired address {:?}.", + listener_id, + listen_addr + ); + self.listened_addrs.retain(|a| a != &listen_addr); + self.behaviour + .inject_expired_listen_addr(listener_id, &listen_addr); + return Some(SwarmEvent::ExpiredListenAddr { + listener_id, + address: listen_addr, + }); + } + ListenersEvent::Closed { + listener_id, + addresses, + reason, + } => { + log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); + for addr in addresses.iter() { + self.behaviour.inject_expired_listen_addr(listener_id, addr); + } + self.behaviour.inject_listener_closed( + listener_id, + match &reason { + Ok(()) => Ok(()), + Err(err) => Err(err), + }, + ); + return Some(SwarmEvent::ListenerClosed { + listener_id, + addresses, + reason, + }); + } + ListenersEvent::Error { listener_id, error } => { + self.behaviour.inject_listener_error(listener_id, &error); + return Some(SwarmEvent::ListenerError { listener_id, error }); + } + } + None + } + /// Internal function used by everything event-related. /// /// Polls the `Swarm` for the next event. @@ -846,97 +941,11 @@ where // Poll the listener(s) for new connections. match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { - Poll::Pending => { - listeners_not_ready = true; - } - Poll::Ready(ListenersEvent::Incoming { - listener_id: _, - upgrade, - local_addr, - send_back_addr, - }) => { - let handler = this.behaviour.new_handler(); - match this.pool.add_incoming( - upgrade, - handler, - IncomingInfo { - local_addr: &local_addr, - send_back_addr: &send_back_addr, - }, - ) { - Ok(_connection_id) => { - return Poll::Ready(SwarmEvent::IncomingConnection { - local_addr, - send_back_addr, - }); - } - Err((connection_limit, handler)) => { - this.behaviour.inject_listen_failure( - &local_addr, - &send_back_addr, - handler, - ); - log::warn!("Incoming connection rejected: {:?}", connection_limit); - } - }; - } - Poll::Ready(ListenersEvent::NewAddress { - listener_id, - listen_addr, - }) => { - log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr); - if !this.listened_addrs.contains(&listen_addr) { - this.listened_addrs.push(listen_addr.clone()) - } - this.behaviour - .inject_new_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::NewListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::AddressExpired { - listener_id, - listen_addr, - }) => { - log::debug!( - "Listener {:?}; Expired address {:?}.", - listener_id, - listen_addr - ); - this.listened_addrs.retain(|a| a != &listen_addr); - this.behaviour - .inject_expired_listen_addr(listener_id, &listen_addr); - return Poll::Ready(SwarmEvent::ExpiredListenAddr { - listener_id, - address: listen_addr, - }); - } - Poll::Ready(ListenersEvent::Closed { - listener_id, - addresses, - reason, - }) => { - log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason); - for addr in addresses.iter() { - this.behaviour.inject_expired_listen_addr(listener_id, addr); + Poll::Pending => listeners_not_ready = true, + Poll::Ready(listeners_event) => { + if let Some(swarm_event) = this.handle_listeners_event(listeners_event) { + return Poll::Ready(swarm_event); } - this.behaviour.inject_listener_closed( - listener_id, - match &reason { - Ok(()) => Ok(()), - Err(err) => Err(err), - }, - ); - return Poll::Ready(SwarmEvent::ListenerClosed { - listener_id, - addresses, - reason, - }); - } - Poll::Ready(ListenersEvent::Error { listener_id, error }) => { - this.behaviour.inject_listener_error(listener_id, &error); - return Poll::Ready(SwarmEvent::ListenerError { listener_id, error }); } } From 00346de814a950f317018e4aa79ed8c79a85f41d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 14:32:16 +0200 Subject: [PATCH 4/6] swarm/src/lib: Extract NetworkBehaviourAction handling into new method --- swarm/src/lib.rs | 178 +++++++++++++++++++++++------------------------ 1 file changed, 87 insertions(+), 91 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 7ddc158bc60..6b81d366718 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -924,6 +924,89 @@ where None } + fn handle_behaviour_event( + &mut self, + event: NetworkBehaviourAction, + ) -> Option>> { + match event { + NetworkBehaviourAction::GenerateEvent(event) => { + return Some(SwarmEvent::Behaviour(event)) + } + NetworkBehaviourAction::Dial { opts, handler } => { + let peer_id = opts.get_peer_id(); + if let Ok(()) = self.dial_with_handler(opts, handler) { + if let Some(peer_id) = peer_id { + return Some(SwarmEvent::Dialing(peer_id)); + } + } + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => { + assert!(self.pending_event.is_none()); + let handler = match handler { + NotifyHandler::One(connection) => PendingNotifyHandler::One(connection), + NotifyHandler::Any => { + let ids = self + .pool + .iter_established_connections_of_peer(&peer_id) + .collect(); + PendingNotifyHandler::Any(ids) + } + }; + + self.pending_event = Some((peer_id, handler, event)); + } + NetworkBehaviourAction::ReportObservedAddr { address, score } => { + // Maps the given `observed_addr`, representing an address of the local + // node observed by a remote peer, onto the locally known listen addresses + // to yield one or more addresses of the local node that may be publicly + // reachable. + // + // I.e. self method incorporates the view of other peers into the listen + // addresses seen by the local node to account for possible IP and port + // mappings performed by intermediate network devices in an effort to + // obtain addresses for the local peer that are also reachable for peers + // other than the peer who reported the `observed_addr`. + // + // The translation is transport-specific. See [`Transport::address_translation`]. + let translated_addresses = { + let transport = self.listeners.transport(); + let mut addrs: Vec<_> = self + .listeners + .listen_addrs() + .filter_map(move |server| transport.address_translation(server, &address)) + .collect(); + + // remove duplicates + addrs.sort_unstable(); + addrs.dedup(); + addrs + }; + for addr in translated_addresses { + self.add_external_address(addr, score); + } + } + NetworkBehaviourAction::CloseConnection { + peer_id, + connection, + } => match connection { + CloseConnection::One(connection_id) => { + if let Some(conn) = self.pool.get_established(connection_id) { + conn.start_close(); + } + } + CloseConnection::All => { + self.pool.disconnect(peer_id); + } + }, + } + + None + } + /// Internal function used by everything event-related. /// /// Polls the `Swarm` for the next event. @@ -1010,99 +1093,12 @@ where Poll::Pending if listeners_not_ready && connections_not_ready => { return Poll::Pending } - Poll::Pending => (), - Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => { - return Poll::Ready(SwarmEvent::Behaviour(event)) - } - Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => { - let peer_id = opts.get_peer_id(); - if let Ok(()) = this.dial_with_handler(opts, handler) { - if let Some(peer_id) = peer_id { - return Poll::Ready(SwarmEvent::Dialing(peer_id)); - } - } - } - Poll::Ready(NetworkBehaviourAction::NotifyHandler { - peer_id, - handler, - event, - }) => match handler { - NotifyHandler::One(connection) => { - if let Some(mut conn) = this.pool.get_established(connection) { - if let Some(event) = notify_one(&mut conn, event, cx) { - let handler = PendingNotifyHandler::One(connection); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } - } - NotifyHandler::Any => { - let ids = this - .pool - .iter_established_connections_of_peer(&peer_id) - .collect(); - if let Some((event, ids)) = - notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) - { - let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } - }, - Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => { - // Maps the given `observed_addr`, representing an address of the local - // node observed by a remote peer, onto the locally known listen addresses - // to yield one or more addresses of the local node that may be publicly - // reachable. - // - // I.e. this method incorporates the view of other peers into the listen - // addresses seen by the local node to account for possible IP and port - // mappings performed by intermediate network devices in an effort to - // obtain addresses for the local peer that are also reachable for peers - // other than the peer who reported the `observed_addr`. - // - // The translation is transport-specific. See [`Transport::address_translation`]. - let translated_addresses = { - let transport = this.listeners.transport(); - let mut addrs: Vec<_> = this - .listeners - .listen_addrs() - .filter_map(move |server| { - transport.address_translation(server, &address) - }) - .collect(); - - // remove duplicates - addrs.sort_unstable(); - addrs.dedup(); - addrs - }; - for addr in translated_addresses { - this.add_external_address(addr, score); + Poll::Pending => {} + Poll::Ready(behaviour_event) => { + if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) { + return Poll::Ready(swarm_event); } } - Poll::Ready(NetworkBehaviourAction::CloseConnection { - peer_id, - connection, - }) => match connection { - CloseConnection::One(connection_id) => { - if let Some(conn) = this.pool.get_established(connection_id) { - conn.start_close(); - } - } - CloseConnection::All => { - this.pool.disconnect(peer_id); - } - }, } } } From 426023ecdfb35f78c6a03a8f1d5397700adf97d7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 2 May 2022 15:30:25 +0200 Subject: [PATCH 5/6] swarm/src/lib: Prioritize Behaviour over Pool and Pool over Listeners Have the main event loop (`Swarm::poll_next_event`) prioritize: 1. Work on `NetworkBehaviour` over work on `Pool`, thus prioritizing local work over work coming from a remote. 2. Work on `Pool` over work on `ListenersStream`, thus prioritizing work on existing connections over upgrading new incoming connections. --- swarm/src/lib.rs | 126 ++++++++++++++++++++++++----------------------- 1 file changed, 65 insertions(+), 61 deletions(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 6b81d366718..2bf02bba144 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1018,88 +1018,92 @@ where // across a `Deref`. let this = &mut *self; + // This loop polls the components below in a prioritized order. + // + // 1. [`NetworkBehaviour`] + // 2. Connection [`Pool`] + // 3. [`ListenersStream`] + // + // (1) is polled before (2) to prioritize local work over work coming from a remote. + // + // (2) is polled before (3) to prioritize existing connections over upgrading new incoming connections. loop { - let mut listeners_not_ready = false; - let mut connections_not_ready = false; + match this.pending_event.take() { + // Try to deliver the pending event emitted by the [`NetworkBehaviour`] in the previous + // iteration to the connection handler(s). + Some((peer_id, handler, event)) => match handler { + PendingNotifyHandler::One(conn_id) => { + match this.pool.get_established(conn_id) { + Some(mut conn) => match notify_one(&mut conn, event, cx) { + None => continue, + Some(event) => { + this.pending_event = Some((peer_id, handler, event)); + } + }, + None => continue, + } + } + PendingNotifyHandler::Any(ids) => { + match notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) { + None => continue, + Some((event, ids)) => { + let handler = PendingNotifyHandler::Any(ids); + this.pending_event = Some((peer_id, handler, event)); + } + } + } + }, + // No pending event. Allow the [`NetworkBehaviour`] to make progress. + None => { + let behaviour_poll = { + let mut parameters = SwarmPollParameters { + local_peer_id: &this.local_peer_id, + supported_protocols: &this.supported_protocols, + listened_addrs: &this.listened_addrs, + external_addrs: &this.external_addrs, + }; + this.behaviour.poll(cx, &mut parameters) + }; - // Poll the listener(s) for new connections. - match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { - Poll::Pending => listeners_not_ready = true, - Poll::Ready(listeners_event) => { - if let Some(swarm_event) = this.handle_listeners_event(listeners_event) { - return Poll::Ready(swarm_event); + match behaviour_poll { + Poll::Pending => {} + Poll::Ready(behaviour_event) => { + if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) + { + return Poll::Ready(swarm_event); + } + + continue; + } } } } // Poll the known peers. match this.pool.poll(cx) { - Poll::Pending => connections_not_ready = true, + Poll::Pending => {} Poll::Ready(pool_event) => { if let Some(swarm_event) = this.handle_pool_event(pool_event) { return Poll::Ready(swarm_event); } - } - }; - // After the network had a chance to make progress, try to deliver - // the pending event emitted by the behaviour in the previous iteration - // to the connection handler(s). The pending event must be delivered - // before polling the behaviour again. If the targeted peer - // meanwhie disconnected, the event is discarded. - if let Some((peer_id, handler, event)) = this.pending_event.take() { - match handler { - PendingNotifyHandler::One(conn_id) => { - if let Some(mut conn) = this.pool.get_established(conn_id) { - if let Some(event) = notify_one(&mut conn, event, cx) { - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } - } - PendingNotifyHandler::Any(ids) => { - if let Some((event, ids)) = - notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx) - { - let handler = PendingNotifyHandler::Any(ids); - this.pending_event = Some((peer_id, handler, event)); - if listeners_not_ready && connections_not_ready { - return Poll::Pending; - } else { - continue; - } - } - } + continue; } - } - - debug_assert!(this.pending_event.is_none()); - - let behaviour_poll = { - let mut parameters = SwarmPollParameters { - local_peer_id: &this.local_peer_id, - supported_protocols: &this.supported_protocols, - listened_addrs: &this.listened_addrs, - external_addrs: &this.external_addrs, - }; - this.behaviour.poll(cx, &mut parameters) }; - match behaviour_poll { - Poll::Pending if listeners_not_ready && connections_not_ready => { - return Poll::Pending - } + // Poll the listener(s) for new connections. + match ListenersStream::poll(Pin::new(&mut this.listeners), cx) { Poll::Pending => {} - Poll::Ready(behaviour_event) => { - if let Some(swarm_event) = this.handle_behaviour_event(behaviour_event) { + Poll::Ready(listeners_event) => { + if let Some(swarm_event) = this.handle_listeners_event(listeners_event) { return Poll::Ready(swarm_event); } + + continue; } } + + return Poll::Pending; } } } From 03afd9bf392a6099a8fe7b0a59a3ba6a3da7afb7 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 3 May 2022 22:06:09 +0200 Subject: [PATCH 6/6] Update swarm/src/lib.rs Co-authored-by: Thomas Eizinger --- swarm/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 2bf02bba144..5f0b2e7cb61 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -514,7 +514,7 @@ where }; match dial { Ok(fut) => fut - .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) + .map(|r| (address, r.map_err(TransportError::Other))) .boxed(), Err(err) => futures::future::ready((address, Err(err))).boxed(), }