Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

swarm/src/lib: Refactor trait usage #2182

Merged
merged 3 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions swarm/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
except for `new_handler`, `inject_event` and `poll`.
This should make it easier to create new implementations. See [PR 2150].

- Remove `Swarm` type alias and rename `ExpandedSwarm` to `Swarm`. Reduce direct
trait parameters on `Swarm` (previously `ExpandedSwarm`), deriving parameters
through associated types on `TBehaviour`. See [PR 2182].

[PR 2150]: https://github.com/libp2p/rust-libp2p/pull/2150/
[PR 2182]: https://github.com/libp2p/rust-libp2p/pull/2182

# 0.30.0 [2021-07-12]

Expand Down
122 changes: 58 additions & 64 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,25 +120,34 @@ use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroUsize};
use upgrade::UpgradeInfoSend as _;

/// Contains the state of the network, plus the way it should behave.
pub type Swarm<TBehaviour> = ExpandedSwarm<
TBehaviour,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent,
<<<TBehaviour as NetworkBehaviour>::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
<TBehaviour as NetworkBehaviour>::ProtocolsHandler,
>;

/// Substream for which a protocol has been chosen.
///
/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;

/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::OutEvent;

/// [`ProtocolsHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ProtocolsHandler;

/// Custom event that can be received by the [`ProtocolsHandler`] of the
/// [`NetworkBehaviour`].
type THandlerInEvent<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent;

/// Custom event that can be produced by the [`ProtocolsHandler`] of the [`NetworkBehaviour`].
type THandlerOutEvent<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent;

/// Custom error that can be produced by the [`ProtocolsHandler`] of the [`NetworkBehaviour`].
type THandlerErr<TBehaviour> = <<THandler<TBehaviour> as IntoProtocolsHandler>::Handler as ProtocolsHandler>::Error;

/// Event generated by the `Swarm`.
#[derive(Debug)]
pub enum SwarmEvent<TBvEv, THandleErr> {
pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
/// Event generated by the `NetworkBehaviour`.
Behaviour(TBvEv),
Behaviour(TBehaviourOutEvent),
/// A connection to the given peer has been opened.
ConnectionEstablished {
/// Identity of the peer that we have connected to.
Expand All @@ -160,7 +169,7 @@ pub enum SwarmEvent<TBvEv, THandleErr> {
num_established: u32,
/// Reason for the disconnection, if it was not a successful
/// active close.
cause: Option<ConnectionError<NodeHandlerWrapperError<THandleErr>>>,
cause: Option<ConnectionError<NodeHandlerWrapperError<THandlerErr>>>,
},
/// A new connection arrived on a listener and is in the process of protocol negotiation.
///
Expand Down Expand Up @@ -261,17 +270,17 @@ pub enum SwarmEvent<TBvEv, THandleErr> {

/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<ExpandedSwarm as Stream>` in order to make
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
pub struct Swarm<TBehaviour>
where
THandler: IntoProtocolsHandler,
TBehaviour: NetworkBehaviour,
{
network: Network<
transport::Boxed<(PeerId, StreamMuxerBox)>,
TInEvent,
TOutEvent,
NodeHandlerWrapperBuilder<THandler>,
THandlerInEvent<TBehaviour>,
THandlerOutEvent<TBehaviour>,
NodeHandlerWrapperBuilder<THandler<TBehaviour>>,
>,

/// Handles which nodes to connect to and how to handle the events sent back by the protocol
Expand All @@ -294,27 +303,21 @@ where
/// Pending event to be delivered to connection handlers
/// (or dropped if the peer disconnected) before the `behaviour`
/// can be polled again.
pending_event: Option<(PeerId, PendingNotifyHandler, TInEvent)>,
pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,

/// The configured override for substream protocol upgrades, if any.
substream_upgrade_protocol_override: Option<libp2p_core::upgrade::Version>,
}

impl<TBehaviour, TInEvent, TOutEvent, THandler> Unpin for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
impl<TBehaviour> Unpin for Swarm<TBehaviour>
where
THandler: IntoProtocolsHandler,
TBehaviour: NetworkBehaviour,
{
}

impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr>
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler: IntoProtocolsHandler + Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
impl<TBehaviour> Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
/// Builds a new `Swarm`.
pub fn new(
Expand Down Expand Up @@ -438,7 +441,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}

/// Removes an external address of the local node, regardless of
/// its current score. See [`ExpandedSwarm::add_external_address`]
/// its current score. See [`Swarm::add_external_address`]
/// for details.
///
/// Returns `true` if the address existed and was removed, `false`
Expand Down Expand Up @@ -473,7 +476,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// Returns `Ok(())` if there was one or more established connections to the peer.
///
/// Note: Closing a connection via [`ExpandedSwarm::disconnect_peer_id`] does
/// Note: Closing a connection via [`Swarm::disconnect_peer_id`] does
/// not inform the corresponding [`ProtocolsHandler`].
/// Closing a connection via a [`ProtocolsHandler`] can be done either in a
/// collaborative manner across [`ProtocolsHandler`]s
Expand Down Expand Up @@ -507,7 +510,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
///
/// Polls the `Swarm` for the next event.
fn poll_next_event(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<SwarmEvent<TBehaviour::OutEvent, THandleErr>>
-> Poll<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>>
{
// We use a `this` variable because the compiler can't mutably borrow multiple times
// across a `Deref`.
Expand Down Expand Up @@ -708,7 +711,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
return Poll::Ready(SwarmEvent::Behaviour(event))
},
Poll::Ready(NetworkBehaviourAction::DialAddress { address }) => {
let _ = ExpandedSwarm::dial_addr(&mut *this, address);
let _ = Swarm::dial_addr(&mut *this, address);
},
Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition }) => {
if this.banned_peers.contains(&peer_id) {
Expand All @@ -720,7 +723,7 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
DialPeerCondition::Always => true,
};
if condition_matched {
if ExpandedSwarm::dial(this, &peer_id).is_ok() {
if Swarm::dial(this, &peer_id).is_ok() {
return Poll::Ready(SwarmEvent::Dialing(peer_id))
}
} else {
Expand Down Expand Up @@ -808,11 +811,11 @@ enum PendingNotifyHandler {
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, TInEvent>(
conn: &mut EstablishedConnection<'a, TInEvent>,
event: TInEvent,
fn notify_one<'a, THandlerInEvent>(
conn: &mut EstablishedConnection<'a, THandlerInEvent>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<TInEvent>
) -> Option<THandlerInEvent>
{
match conn.poll_ready_notify_handler(cx) {
Poll::Pending => Some(event),
Expand All @@ -835,12 +838,12 @@ fn notify_one<'a, TInEvent>(
///
/// Returns `None` if either all connections are closing or the event
/// was successfully sent to a handler, in either case the event is consumed.
fn notify_any<'a, TTrans, TInEvent, TOutEvent, THandler>(
fn notify_any<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>(
ids: SmallVec<[ConnectionId; 10]>,
peer: &mut ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
event: TInEvent,
peer: &mut ConnectedPeer<'a, TTrans, THandlerInEvent, THandlerOutEvent, THandler>,
event: THandlerInEvent,
cx: &mut Context<'_>,
) -> Option<(TInEvent, SmallVec<[ConnectionId; 10]>)>
) -> Option<(THandlerInEvent, SmallVec<[ConnectionId; 10]>)>
where
TTrans: Transport,
THandler: IntoConnectionHandler,
Expand Down Expand Up @@ -872,24 +875,18 @@ where
})
}

/// Stream of events returned by [`ExpandedSwarm`].
/// Stream of events returned by [`Swarm`].
///
/// Includes events from the [`NetworkBehaviour`] as well as events about
/// connection and listener status. See [`SwarmEvent`] for details.
///
/// Note: This stream is infinite and it is guaranteed that
/// [`Stream::poll_next`] will never return `Poll::Ready(None)`.
impl<TBehaviour, TInEvent, TOutEvent, THandler, THandleErr> Stream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler:
ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent, Error = THandleErr>,
THandleErr: error::Error + Send + 'static,
impl<TBehaviour> Stream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
type Item = SwarmEvent<TBehaviour::OutEvent, THandleErr>;
type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.as_mut()
Expand All @@ -899,13 +896,9 @@ where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
}

/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour, TInEvent, TOutEvent, THandler> FusedStream for
ExpandedSwarm<TBehaviour, TInEvent, TOutEvent, THandler>
where TBehaviour: NetworkBehaviour<ProtocolsHandler = THandler>,
THandler: IntoProtocolsHandler + Send + 'static,
TInEvent: Send + 'static,
TOutEvent: Send + 'static,
THandler::Handler: ProtocolsHandler<InEvent = TInEvent, OutEvent = TOutEvent>,
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
TBehaviour: NetworkBehaviour,
{
fn is_terminated(&self) -> bool {
false
Expand Down Expand Up @@ -954,7 +947,8 @@ pub struct SwarmBuilder<TBehaviour> {
}

impl<TBehaviour> SwarmBuilder<TBehaviour>
where TBehaviour: NetworkBehaviour,
where
TBehaviour: NetworkBehaviour,
{
/// Creates a new `SwarmBuilder` from the given transport, behaviour and
/// local peer ID. The `Swarm` with its underlying `Network` is obtained
Expand Down Expand Up @@ -1073,7 +1067,7 @@ where TBehaviour: NetworkBehaviour,

let network = Network::new(self.transport, self.local_peer_id, network_cfg);

ExpandedSwarm {
Swarm {
network,
behaviour: self.behaviour,
supported_protocols,
Expand All @@ -1086,7 +1080,7 @@ where TBehaviour: NetworkBehaviour,
}
}

/// The possible failures of [`ExpandedSwarm::dial`].
/// The possible failures of [`Swarm::dial`].
#[derive(Debug)]
pub enum DialError {
/// The peer is currently banned.
Expand Down Expand Up @@ -1346,7 +1340,7 @@ mod tests {
}

/// Establishes multiple connections between two peers,
/// after which one peer disconnects the other using [`ExpandedSwarm::disconnect_peer_id`].
/// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
///
/// The test expects both behaviours to be notified via pairs of
/// inject_connected / inject_disconnected as well as
Expand Down