From 3cfbf89a3aaad37bfa0731d7f27d0ae9e3e2b2b0 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 4 May 2022 10:33:40 +0200 Subject: [PATCH] swarm/src/connection/pool: Remove 'a lifetime in PoolEvent (#2625) Simplifies `PoolEvent`, no longer carrying a reference to an `EstablishedConnection` or the `Pool`, but instead the `PeerId`, `ConnectionId` and `ConnectedPoint` directly. Co-authored-by: Elena Frank --- swarm/src/connection/pool.rs | 121 ++++++----------------------------- swarm/src/lib.rs | 36 +++++------ 2 files changed, 36 insertions(+), 121 deletions(-) diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index e62f3ea643f..be3178ba8b2 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -155,13 +155,16 @@ impl fmt::Debug for Pool +#[derive(Debug)] +pub enum PoolEvent where TTrans: Transport, { /// A new connection has been established. ConnectionEstablished { - connection: EstablishedConnection<'a, ::InEvent>, + id: ConnectionId, + peer_id: PeerId, + endpoint: ConnectedPoint, /// List of other connections to the same peer. /// /// Note: Does not include the connection reported through this event. @@ -190,8 +193,6 @@ where /// The error that occurred, if any. If `None`, the connection /// was closed by the local peer. error: Option::Error>>, - /// A reference to the pool that used to manage the connection. - pool: &'a mut Pool, /// The remaining established connections to the same peer. remaining_established_connection_ids: Vec, handler: THandler::Handler, @@ -225,16 +226,16 @@ where /// A node has produced an event. ConnectionEvent { - /// The connection that has generated the event. - connection: EstablishedConnection<'a, THandlerInEvent>, + id: ConnectionId, + peer_id: PeerId, /// The produced event. event: THandlerOutEvent, }, /// The connection to a node has changed its address. AddressChange { - /// The connection that has changed address. - connection: EstablishedConnection<'a, THandlerInEvent>, + id: ConnectionId, + peer_id: PeerId, /// The new endpoint. new_endpoint: ConnectedPoint, /// The old endpoint. @@ -242,73 +243,6 @@ where }, } -impl<'a, THandler: IntoConnectionHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans> -where - TTrans: Transport, - TTrans::Error: fmt::Debug, -{ - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - match self { - PoolEvent::ConnectionEstablished { - connection, - concurrent_dial_errors, - .. - } => f - .debug_tuple("PoolEvent::ConnectionEstablished") - .field(connection) - .field(concurrent_dial_errors) - .finish(), - PoolEvent::ConnectionClosed { - id, - connected, - error, - .. - } => f - .debug_struct("PoolEvent::ConnectionClosed") - .field("id", id) - .field("connected", connected) - .field("error", error) - .finish(), - PoolEvent::PendingOutboundConnectionError { - id, error, peer, .. - } => f - .debug_struct("PoolEvent::PendingOutboundConnectionError") - .field("id", id) - .field("error", error) - .field("peer", peer) - .finish(), - PoolEvent::PendingInboundConnectionError { - id, - error, - send_back_addr, - local_addr, - .. - } => f - .debug_struct("PoolEvent::PendingInboundConnectionError") - .field("id", id) - .field("error", error) - .field("send_back_addr", send_back_addr) - .field("local_addr", local_addr) - .finish(), - PoolEvent::ConnectionEvent { connection, event } => f - .debug_struct("PoolEvent::ConnectionEvent") - .field("peer", &connection.peer_id()) - .field("event", event) - .finish(), - PoolEvent::AddressChange { - connection, - new_endpoint, - old_endpoint, - } => f - .debug_struct("PoolEvent::AddressChange") - .field("peer", &connection.peer_id()) - .field("new_endpoint", new_endpoint) - .field("old_endpoint", old_endpoint) - .finish(), - } - } -} - impl Pool where THandler: IntoConnectionHandler, @@ -583,10 +517,7 @@ where Ok(connection_id) } /// Polls the connection pool for events. - /// - /// > **Note**: We use a regular `poll` method instead of implementing `Stream`, - /// > because we want the `Pool` to stay borrowed if necessary. - pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll> + pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll> where TTrans: Transport, THandler: IntoConnectionHandler + 'static, @@ -602,16 +533,7 @@ where Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."), Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => { - let entry = self - .established - .get_mut(&peer_id) - .expect("Receive `Notify` event for established peer.") - .entry(id) - .expect_occupied("Receive `Notify` event from established connection"); - return Poll::Ready(PoolEvent::ConnectionEvent { - connection: EstablishedConnection { entry }, - event, - }); + return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event }); } Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange { id, @@ -629,16 +551,12 @@ where let old_endpoint = std::mem::replace(&mut connection.endpoint, new_endpoint.clone()); - match self.get(id) { - Some(PoolConnection::Established(connection)) => { - return Poll::Ready(PoolEvent::AddressChange { - connection, - new_endpoint, - old_endpoint, - }) - } - _ => unreachable!("since `entry` is an `EstablishedEntry`."), - } + return Poll::Ready(PoolEvent::AddressChange { + peer_id, + id, + new_endpoint, + old_endpoint, + }); } Poll::Ready(Some(task::EstablishedConnectionEvent::Closed { id, @@ -663,7 +581,6 @@ where connected: Connected { endpoint, peer_id }, error, remaining_established_connection_ids, - pool: self, handler, }); } @@ -841,7 +758,9 @@ where match self.get(id) { Some(PoolConnection::Established(connection)) => { return Poll::Ready(PoolEvent::ConnectionEstablished { - connection, + peer_id: connection.peer_id(), + endpoint: connection.endpoint().clone(), + id: connection.id(), other_established_connection_ids, concurrent_dial_errors, }) diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 03533b28377..abe3fcf0705 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -782,16 +782,16 @@ where connections_not_ready = true; } Poll::Ready(PoolEvent::ConnectionEstablished { - connection, + id, + peer_id, + endpoint, other_established_connection_ids, concurrent_dial_errors, }) => { - let peer_id = connection.peer_id(); - let endpoint = connection.endpoint().clone(); if this.banned_peers.contains(&peer_id) { // Mark the connection for the banned peer as banned, thus withholding any // future events from the connection to the behaviour. - this.banned_peer_connections.insert(connection.id()); + this.banned_peer_connections.insert(id); this.pool.disconnect(peer_id); return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }); } else { @@ -806,18 +806,17 @@ where log::debug!( "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}", - connection.peer_id(), - connection.endpoint(), + peer_id, + endpoint, num_established, non_banned_established + 1, ); - let endpoint = connection.endpoint().clone(); let failed_addresses = concurrent_dial_errors .as_ref() .map(|es| es.iter().map(|(a, _)| a).cloned().collect()); this.behaviour.inject_connection_established( &peer_id, - &connection.id(), + &id, &endpoint, failed_addresses.as_ref(), non_banned_established, @@ -914,26 +913,23 @@ where num_established, }); } - Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => { - let peer = connection.peer_id(); - let conn_id = connection.id(); - if this.banned_peer_connections.contains(&conn_id) { - log::debug!("Ignoring event from banned peer: {} {:?}.", peer, conn_id); + Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => { + if this.banned_peer_connections.contains(&id) { + log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id); } else { - this.behaviour.inject_event(peer, conn_id, event); + this.behaviour.inject_event(peer_id, id, event); } } Poll::Ready(PoolEvent::AddressChange { - connection, + id, + peer_id, new_endpoint, old_endpoint, }) => { - let peer = connection.peer_id(); - let conn_id = connection.id(); - if !this.banned_peer_connections.contains(&conn_id) { + if !this.banned_peer_connections.contains(&id) { this.behaviour.inject_address_change( - &peer, - &conn_id, + &peer_id, + &id, &old_endpoint, &new_endpoint, );