Skip to content

Commit

Permalink
swarm/src/connection/pool: Remove 'a lifetime in PoolEvent (#2625)
Browse files Browse the repository at this point in the history
Simplifies `PoolEvent`, no longer carrying a reference to an
`EstablishedConnection` or the `Pool`, but instead the `PeerId`,
`ConnectionId` and `ConnectedPoint` directly.

Co-authored-by: Elena Frank <elena.frank@protonmail.com>
  • Loading branch information
mxinden and elenaf9 authored May 4, 2022
1 parent 3e1ed95 commit 3cfbf89
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 121 deletions.
121 changes: 20 additions & 101 deletions swarm/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,16 @@ impl<THandler: IntoConnectionHandler, TTrans: Transport> fmt::Debug for Pool<THa
}

/// Event that can happen on the `Pool`.
pub enum PoolEvent<'a, THandler: IntoConnectionHandler, TTrans>
#[derive(Debug)]
pub enum PoolEvent<THandler: IntoConnectionHandler, TTrans>
where
TTrans: Transport,
{
/// A new connection has been established.
ConnectionEstablished {
connection: EstablishedConnection<'a, <THandler::Handler as ConnectionHandler>::InEvent>,
id: ConnectionId,
peer_id: PeerId,
endpoint: ConnectedPoint,
/// List of other connections to the same peer.
///
/// Note: Does not include the connection reported through this event.
Expand Down Expand Up @@ -190,8 +193,6 @@ where
/// The error that occurred, if any. If `None`, the connection
/// was closed by the local peer.
error: Option<ConnectionError<<THandler::Handler as ConnectionHandler>::Error>>,
/// A reference to the pool that used to manage the connection.
pool: &'a mut Pool<THandler, TTrans>,
/// The remaining established connections to the same peer.
remaining_established_connection_ids: Vec<ConnectionId>,
handler: THandler::Handler,
Expand Down Expand Up @@ -225,90 +226,23 @@ where

/// A node has produced an event.
ConnectionEvent {
/// The connection that has generated the event.
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>,
id: ConnectionId,
peer_id: PeerId,
/// The produced event.
event: THandlerOutEvent<THandler>,
},

/// The connection to a node has changed its address.
AddressChange {
/// The connection that has changed address.
connection: EstablishedConnection<'a, THandlerInEvent<THandler>>,
id: ConnectionId,
peer_id: PeerId,
/// The new endpoint.
new_endpoint: ConnectedPoint,
/// The old endpoint.
old_endpoint: ConnectedPoint,
},
}

impl<'a, THandler: IntoConnectionHandler, TTrans> fmt::Debug for PoolEvent<'a, THandler, TTrans>
where
TTrans: Transport,
TTrans::Error: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
match self {
PoolEvent::ConnectionEstablished {
connection,
concurrent_dial_errors,
..
} => f
.debug_tuple("PoolEvent::ConnectionEstablished")
.field(connection)
.field(concurrent_dial_errors)
.finish(),
PoolEvent::ConnectionClosed {
id,
connected,
error,
..
} => f
.debug_struct("PoolEvent::ConnectionClosed")
.field("id", id)
.field("connected", connected)
.field("error", error)
.finish(),
PoolEvent::PendingOutboundConnectionError {
id, error, peer, ..
} => f
.debug_struct("PoolEvent::PendingOutboundConnectionError")
.field("id", id)
.field("error", error)
.field("peer", peer)
.finish(),
PoolEvent::PendingInboundConnectionError {
id,
error,
send_back_addr,
local_addr,
..
} => f
.debug_struct("PoolEvent::PendingInboundConnectionError")
.field("id", id)
.field("error", error)
.field("send_back_addr", send_back_addr)
.field("local_addr", local_addr)
.finish(),
PoolEvent::ConnectionEvent { connection, event } => f
.debug_struct("PoolEvent::ConnectionEvent")
.field("peer", &connection.peer_id())
.field("event", event)
.finish(),
PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
} => f
.debug_struct("PoolEvent::AddressChange")
.field("peer", &connection.peer_id())
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish(),
}
}
}

impl<THandler, TTrans> Pool<THandler, TTrans>
where
THandler: IntoConnectionHandler,
Expand Down Expand Up @@ -583,10 +517,7 @@ where
Ok(connection_id)
}
/// Polls the connection pool for events.
///
/// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
/// > because we want the `Pool` to stay borrowed if necessary.
pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<'a, THandler, TTrans>>
pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<PoolEvent<THandler, TTrans>>
where
TTrans: Transport<Output = (PeerId, StreamMuxerBox)>,
THandler: IntoConnectionHandler + 'static,
Expand All @@ -602,16 +533,7 @@ where
Poll::Ready(None) => unreachable!("Pool holds both sender and receiver."),

Poll::Ready(Some(task::EstablishedConnectionEvent::Notify { id, peer_id, event })) => {
let entry = self
.established
.get_mut(&peer_id)
.expect("Receive `Notify` event for established peer.")
.entry(id)
.expect_occupied("Receive `Notify` event from established connection");
return Poll::Ready(PoolEvent::ConnectionEvent {
connection: EstablishedConnection { entry },
event,
});
return Poll::Ready(PoolEvent::ConnectionEvent { peer_id, id, event });
}
Poll::Ready(Some(task::EstablishedConnectionEvent::AddressChange {
id,
Expand All @@ -629,16 +551,12 @@ where
let old_endpoint =
std::mem::replace(&mut connection.endpoint, new_endpoint.clone());

match self.get(id) {
Some(PoolConnection::Established(connection)) => {
return Poll::Ready(PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
})
}
_ => unreachable!("since `entry` is an `EstablishedEntry`."),
}
return Poll::Ready(PoolEvent::AddressChange {
peer_id,
id,
new_endpoint,
old_endpoint,
});
}
Poll::Ready(Some(task::EstablishedConnectionEvent::Closed {
id,
Expand All @@ -663,7 +581,6 @@ where
connected: Connected { endpoint, peer_id },
error,
remaining_established_connection_ids,
pool: self,
handler,
});
}
Expand Down Expand Up @@ -841,7 +758,9 @@ where
match self.get(id) {
Some(PoolConnection::Established(connection)) => {
return Poll::Ready(PoolEvent::ConnectionEstablished {
connection,
peer_id: connection.peer_id(),
endpoint: connection.endpoint().clone(),
id: connection.id(),
other_established_connection_ids,
concurrent_dial_errors,
})
Expand Down
36 changes: 16 additions & 20 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -782,16 +782,16 @@ where
connections_not_ready = true;
}
Poll::Ready(PoolEvent::ConnectionEstablished {
connection,
id,
peer_id,
endpoint,
other_established_connection_ids,
concurrent_dial_errors,
}) => {
let peer_id = connection.peer_id();
let endpoint = connection.endpoint().clone();
if this.banned_peers.contains(&peer_id) {
// Mark the connection for the banned peer as banned, thus withholding any
// future events from the connection to the behaviour.
this.banned_peer_connections.insert(connection.id());
this.banned_peer_connections.insert(id);
this.pool.disconnect(peer_id);
return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
} else {
Expand All @@ -806,18 +806,17 @@ where

log::debug!(
"Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
connection.peer_id(),
connection.endpoint(),
peer_id,
endpoint,
num_established,
non_banned_established + 1,
);
let endpoint = connection.endpoint().clone();
let failed_addresses = concurrent_dial_errors
.as_ref()
.map(|es| es.iter().map(|(a, _)| a).cloned().collect());
this.behaviour.inject_connection_established(
&peer_id,
&connection.id(),
&id,
&endpoint,
failed_addresses.as_ref(),
non_banned_established,
Expand Down Expand Up @@ -914,26 +913,23 @@ where
num_established,
});
}
Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
let peer = connection.peer_id();
let conn_id = connection.id();
if this.banned_peer_connections.contains(&conn_id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer, conn_id);
Poll::Ready(PoolEvent::ConnectionEvent { id, peer_id, event }) => {
if this.banned_peer_connections.contains(&id) {
log::debug!("Ignoring event from banned peer: {} {:?}.", peer_id, id);
} else {
this.behaviour.inject_event(peer, conn_id, event);
this.behaviour.inject_event(peer_id, id, event);
}
}
Poll::Ready(PoolEvent::AddressChange {
connection,
id,
peer_id,
new_endpoint,
old_endpoint,
}) => {
let peer = connection.peer_id();
let conn_id = connection.id();
if !this.banned_peer_connections.contains(&conn_id) {
if !this.banned_peer_connections.contains(&id) {
this.behaviour.inject_address_change(
&peer,
&conn_id,
&peer_id,
&id,
&old_endpoint,
&new_endpoint,
);
Expand Down

0 comments on commit 3cfbf89

Please sign in to comment.