Skip to content

Commit

Permalink
swarm/src/lib: Refactor trait usage (#2182)
Browse files Browse the repository at this point in the history
- Removes the `Swarm` type alias, renaming `ExpandedSwarm` to `Swarm`.
- Remove `TInEvent`, `TOutEvent` and `THandler` trait parameters on
`Swarm`, instead deriving them through `TBehaviour`. Move derive logic
to separate type aliases.
- Simplify trait bounds on `Swarm` main `impl` and `Stream` `impl`.
  • Loading branch information
mxinden authored Aug 9, 2021
1 parent 68b5f74 commit a0d690e
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 64 deletions.
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
except for `new_handler`, `inject_event` and `poll`.
This should make it easier to create new implementations. See [PR 2150].

- Remove `Swarm` type alias and rename `ExpandedSwarm` to `Swarm`. Reduce direct
trait parameters on `Swarm` (previously `ExpandedSwarm`), deriving parameters
through associated types on `TBehaviour`. See [PR 2182].

[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150/
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182

# 0.30.0 [2021-07-12]

Expand Down
122 changes: 58 additions & 64 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,34 @@ use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;

/// Contains the state of the network, plus the way it should behave.
pub type Swarm<TBehaviour> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
>;

/// Substream for which a protocol has been chosen.
///
/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;

/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::OutEvent;

/// [`ProtocolsHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ProtocolsHandler;

/// Custom event that can be received by the [`ProtocolsHandler`] of the
/// [`NetworkBehaviour`].
type THandlerInEvent<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent;

/// Custom event that can be produced by the [`ProtocolsHandler`] of the [`NetworkBehaviour`].
type THandlerOutEvent<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent;

/// Custom error that can be produced by the [`ProtocolsHandler`] of the [`NetworkBehaviour`].
type THandlerErr<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error;

/// Event generated by the `Swarm`.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Event generated by the `NetworkBehaviour`.
Behaviour(TBvEv),
Behaviour(TBehaviourOutEvent),
/// A connection to the given peer has been opened.
ConnectionEstablished {
/// Identity of the peer that we have connected to.
Expand All @@ -160,7 +169,7 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
num_established: u32,
/// Reason for the disconnection, if it was not a successful
/// active close.
cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
cause: Option<ConnectionError<NodeHandlerWrapperError<THandlerErr>>>,
},
/// A new connection arrived on a listener and is in the process of protocol negotiation.
///
Expand Down Expand Up @@ -261,17 +270,17 @@ pub enum SwarmEvent<TBvEv, THandleErr> {

/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<ExpandedSwarm as Stream>` in order to make
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
pub struct Swarm<TBehaviour>
where
THandler: IntoProtocolsHandler,
TBehaviour: NetworkBehaviour,
{
network: Network<
transport::Boxed<(PeerId, StreamMuxerBox)>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
THandlerInEvent<TBehaviour>,
THandlerOutEvent<TBehaviour>,
NodeHandlerWrapperBuilder<THandler<TBehaviour>>,
>,

/// Handles which nodes to connect to and how to handle the events sent back by the protocol
Expand All @@ -294,27 +303,21 @@ where
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
impl<TBehaviour> Unpin for Swarm<TBehaviour>
where
THandler: IntoProtocolsHandler,
TBehaviour: NetworkBehaviour,
{
}

impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
impl<TBehaviour> Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
/// Builds a new `Swarm`.
pub fn new(
Expand Down Expand Up @@ -438,7 +441,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}

/// Removes an external address of the local node, regardless of
/// its current score. See [`ExpandedSwarm::add_external_address`]
/// its current score. See [`Swarm::add_external_address`]
/// for details.
///
/// Returns `true` if the address existed and was removed, `false`
Expand Down Expand Up @@ -473,7 +476,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// Returns `Ok(())` if there was one or more established connections to the peer.
///
/// Note: Closing a connection via [`ExpandedSwarm::disconnect_peer_id`] does
/// Note: Closing a connection via [`Swarm::disconnect_peer_id`] does
/// not inform the corresponding [`ProtocolsHandler`].
/// Closing a connection via a [`ProtocolsHandler`] can be done either in a
/// collaborative manner across [`ProtocolsHandler`]s
Expand Down Expand Up @@ -507,7 +510,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// Polls the `Swarm` for the next event.
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
-> Poll<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>>
{
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
Expand Down Expand Up @@ -708,7 +711,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
return Poll::Ready(SwarmEvent::Behaviour(event))
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
let _ = Swarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
if this.banned_peers.contains(&peer_id) {
Expand All @@ -720,7 +723,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
DialPeerCondition::Always => true,
};
if condition_matched {
if ExpandedSwarm::dial(this, &peer_id).is_ok() {
if Swarm::dial(this, &peer_id).is_ok() {
return Poll::Ready(SwarmEvent::Dialing(peer_id))
}
} else {
Expand Down Expand Up @@ -808,11 +811,11 @@ enum PendingNotifyHandler {
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, TInEvent>(
conn: &mut EstablishedConnection<'a, TInEvent>,
event: TInEvent,
fn notify_one<'a, THandlerInEvent>(
conn: &mut EstablishedConnection<'a, THandlerInEvent>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<TInEvent>
) -> Option<THandlerInEvent>
{
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
Expand All @@ -835,12 +838,12 @@ fn notify_one<'a, TInEvent>(
///
/// Returns `None` if either all connections are closing or the event
/// was successfully sent to a handler, in either case the event is consumed.
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
fn notify_any<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
peer: &mut ConnectedPeer<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
Expand Down Expand Up @@ -872,24 +875,18 @@ where
})
}

/// Stream of events returned by [`ExpandedSwarm`].
/// Stream of events returned by [`Swarm`].
///
/// Includes events from the [`NetworkBehaviour`] as well as events about
/// connection and listener status. See [`SwarmEvent`] for details.
///
/// Note: This stream is infinite and it is guaranteed that
/// [`Stream::poll_next`] will never return `Poll::Ready(None)`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler:
ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
impl<TBehaviour> Stream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
type Item = SwarmEvent<TBehaviour::OutEvent, THandleErr>;
type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut()
Expand All @@ -899,13 +896,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}

/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
fn is_terminated(&self) -> bool {
false
Expand Down Expand Up @@ -954,7 +947,8 @@ pub struct SwarmBuilder<TBehaviour> {
}

impl<TBehaviour> SwarmBuilder<TBehaviour>
where TBehaviour: NetworkBehaviour,
where
TBehaviour: NetworkBehaviour,
{
/// Creates a new `SwarmBuilder` from the given transport, behaviour and
/// local peer ID. The `Swarm` with its underlying `Network` is obtained
Expand Down Expand Up @@ -1073,7 +1067,7 @@ where TBehaviour: NetworkBehaviour,

let network = Network::new(self.transport, self.local_peer_id, network_cfg);

ExpandedSwarm {
Swarm {
network,
behaviour: self.behaviour,
supported_protocols,
Expand All @@ -1086,7 +1080,7 @@ where TBehaviour: NetworkBehaviour,
}
}

/// The possible failures of [`ExpandedSwarm::dial`].
/// The possible failures of [`Swarm::dial`].
#[derive(Debug)]
pub enum DialError {
/// The peer is currently banned.
Expand Down Expand Up @@ -1346,7 +1340,7 @@ mod tests {
}

/// Establishes multiple connections between two peers,
/// after which one peer disconnects the other using [`ExpandedSwarm::disconnect_peer_id`].
/// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
Expand Down

0 comments on commit a0d690e

Please sign in to comment.