Skip to content

Commit

Permalink
swarm: Add doc example for carrying state in handler
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 26, 2021
1 parent ceb77e5 commit 814ff4b
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 23 deletions.
3 changes: 1 addition & 2 deletions swarm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ wasm-timer = "0.2"
void = "1"

[dev-dependencies]
libp2p-mplex = { path = "../muxers/mplex" }
libp2p-noise = { path = "../transports/noise" }
libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] }
quickcheck = "0.9.0"
rand = "0.7.2"
198 changes: 186 additions & 12 deletions swarm/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,16 +282,17 @@ pub enum NetworkBehaviourAction<
///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
/// can be included in the handler, and thus directly send on connection success or extracted by
/// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for
/// example.
DialAddress {
/// The address to dial.
address: Multiaddr,
/// The handler to be used to handle the connection to the peer.
///
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
/// peer can be included in the handler, and thus directly send on connection success or
/// extracted by the [`NetworkBehaviour`] on connection failure.
handler: THandler,
},

Expand All @@ -305,18 +306,191 @@ pub enum NetworkBehaviourAction<
///
/// On success, [`NetworkBehaviour::inject_connection_established`] is invoked.
/// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked.
///
/// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure
/// and connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer
/// can be included in the handler, and thus directly send on connection success or extracted by
/// the [`NetworkBehaviour`] on connection failure.
///
/// Example showcasing usage of handler to carry state:
///
/// ```rust
/// # use futures::executor::block_on;
/// # use futures::stream::StreamExt;
/// # use libp2p::core::connection::ConnectionId;
/// # use libp2p::core::identity;
/// # use libp2p::core::transport::{MemoryTransport, Transport};
/// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade};
/// # use libp2p::core::PeerId;
/// # use libp2p::plaintext::PlainText2Config;
/// # use libp2p::swarm::{
/// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream,
/// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler,
/// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent,
/// # };
/// # use libp2p::yamux;
/// # use std::collections::VecDeque;
/// # use std::task::{Context, Poll};
/// # use void::Void;
/// #
/// # let local_key = identity::Keypair::generate_ed25519();
/// # let local_public_key = local_key.public();
/// # let local_peer_id = PeerId::from(local_public_key.clone());
/// #
/// # let transport = MemoryTransport::default()
/// # .upgrade(upgrade::Version::V1)
/// # .authenticate(PlainText2Config { local_public_key })
/// # .multiplex(yamux::YamuxConfig::default())
/// # .boxed();
/// #
/// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id);
/// #
/// // Super precious message that we should better not lose.
/// let message = PreciousMessage("My precious message".to_string());
///
/// // Unfortunately this peer is offline, thus sending our message to it will fail.
/// let offline_peer = PeerId::random();
///
/// // Let's send it anyways. We should get it back in case connecting to the peer fails.
/// swarm.behaviour_mut().send(offline_peer, message);
///
/// block_on(async {
/// // As expected, sending failed. But great news, we got our message back.
/// matches!(
/// swarm.next().await.expect("Infinite stream"),
/// SwarmEvent::Behaviour(PreciousMessage(_))
/// );
/// });
///
/// # #[derive(Default)]
/// # struct MyBehaviour {
/// # outbox_to_swarm: VecDeque<NetworkBehaviourAction<PreciousMessage, MyHandler>>,
/// # }
/// #
/// # impl MyBehaviour {
/// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) {
/// # self.outbox_to_swarm
/// # .push_back(NetworkBehaviourAction::DialPeer {
/// # peer_id,
/// # condition: DialPeerCondition::Always,
/// # handler: MyHandler { message: Some(msg) },
/// # });
/// # }
/// # }
/// #
/// impl NetworkBehaviour for MyBehaviour {
/// # type ProtocolsHandler = MyHandler;
/// # type OutEvent = PreciousMessage;
/// #
/// # fn new_handler(&mut self) -> Self::ProtocolsHandler {
/// # MyHandler { message: None }
/// # }
/// #
/// #
/// # fn inject_event(
/// # &mut self,
/// # _: PeerId,
/// # _: ConnectionId,
/// # _: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent,
/// # ) {
/// # unreachable!();
/// # }
/// #
/// fn inject_dial_failure(
/// &mut self,
/// _: &PeerId,
/// handler: Self::ProtocolsHandler,
/// _: DialError,
/// ) {
/// // As expected, sending the message failed. But lucky us, we got the handler back, thus
/// // the precious message is not lost and we can return it back to the user.
/// let msg = handler.message.unwrap();
/// self.outbox_to_swarm
/// .push_back(NetworkBehaviourAction::GenerateEvent(msg))
/// }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # _: &mut impl PollParameters,
/// # ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
/// # if let Some(action) = self.outbox_to_swarm.pop_front() {
/// # return Poll::Ready(action);
/// # }
/// # Poll::Pending
/// # }
/// }
///
/// # struct MyHandler {
/// # message: Option<PreciousMessage>,
/// # }
/// #
/// # impl ProtocolsHandler for MyHandler {
/// # type InEvent = Void;
/// # type OutEvent = Void;
/// # type Error = Void;
/// # type InboundProtocol = DeniedUpgrade;
/// # type OutboundProtocol = DeniedUpgrade;
/// # type InboundOpenInfo = ();
/// # type OutboundOpenInfo = Void;
/// #
/// # fn listen_protocol(
/// # &self,
/// # ) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
/// # SubstreamProtocol::new(DeniedUpgrade, ())
/// # }
/// #
/// # fn inject_fully_negotiated_inbound(
/// # &mut self,
/// # _: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::InboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_fully_negotiated_outbound(
/// # &mut self,
/// # _: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
/// # _: Self::OutboundOpenInfo,
/// # ) {
/// # }
/// #
/// # fn inject_event(&mut self, _event: Self::InEvent) {}
/// #
/// # fn inject_dial_upgrade_error(
/// # &mut self,
/// # _: Self::OutboundOpenInfo,
/// # _: ProtocolsHandlerUpgrErr<Void>,
/// # ) {
/// # }
/// #
/// # fn connection_keep_alive(&self) -> KeepAlive {
/// # KeepAlive::Yes
/// # }
/// #
/// # fn poll(
/// # &mut self,
/// # _: &mut Context<'_>,
/// # ) -> Poll<
/// # ProtocolsHandlerEvent<
/// # Self::OutboundProtocol,
/// # Self::OutboundOpenInfo,
/// # Self::OutEvent,
/// # Self::Error,
/// # >,
/// # > {
/// # todo!("If `Self::message.is_some()` send the message to the remote.")
/// # }
/// # }
/// # #[derive(Debug, PartialEq, Eq)]
/// # struct PreciousMessage(String);
/// ```
DialPeer {
/// The peer to try reach.
peer_id: PeerId,
/// The condition for initiating a new dialing attempt.
condition: DialPeerCondition,
/// The handler to be used to handle the connection to the peer.
///
/// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and
/// connection closing. Thus it can be used to carry state, which otherwise would have to be
/// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected
/// peer can be included in the handler, and thus directly send on connection success or
/// extracted by the [`NetworkBehaviour`] on connection failure.
handler: THandler,
},

Expand Down
23 changes: 14 additions & 9 deletions swarm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1326,8 +1326,9 @@ mod tests {
use crate::protocols_handler::DummyProtocolsHandler;
use crate::test::{CallTraceBehaviour, MockBehaviour};
use futures::{executor, future};
use libp2p_core::{identity, multiaddr, transport, upgrade};
use libp2p_noise as noise;
use libp2p::core::{identity, multiaddr, transport, upgrade};
use libp2p::plaintext;
use libp2p::yamux;

// Test execution state.
// Connection => Disconnecting => Connecting.
Expand All @@ -1343,17 +1344,16 @@ mod tests {
O: Send + 'static,
{
let id_keys = identity::Keypair::generate_ed25519();
let pubkey = id_keys.public();
let noise_keys = noise::Keypair::<noise::X25519Spec>::new()
.into_authentic(&id_keys)
.unwrap();
let local_public_key = id_keys.public();
let transport = transport::MemoryTransport::default()
.upgrade(upgrade::Version::V1)
.authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(libp2p_mplex::MplexConfig::new())
.authenticate(plaintext::PlainText2Config {
local_public_key: local_public_key.clone(),
})
.multiplex(yamux::YamuxConfig::default())
.boxed();
let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
SwarmBuilder::new(transport, behaviour, pubkey.into()).build()
SwarmBuilder::new(transport, behaviour, local_public_key.into()).build()
}

fn swarms_connected<TBehaviour>(
Expand Down Expand Up @@ -1704,4 +1704,9 @@ mod tests {
}
}))
}

/// [`NetworkBehaviourAction::DialAddress`] and [`NetworkBehaviourAction::DialPeer`] require a
/// handler. This handler can be used to carry state. See corresponding doc comments.
#[test]
fn use_handler_to_carry_state() {}
}

0 comments on commit 814ff4b

Please sign in to comment.