From 814ff4b2b981287d4ab03ac4ee8505b7d49c672a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Thu, 26 Aug 2021 22:07:17 +0200 Subject: [PATCH] swarm: Add doc example for carrying state in handler --- swarm/Cargo.toml | 3 +- swarm/src/behaviour.rs | 198 ++++++++++++++++++++++++++++++++++++++--- swarm/src/lib.rs | 23 +++-- 3 files changed, 201 insertions(+), 23 deletions(-) diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 33b482f2a0f..b8e5d77a242 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -20,7 +20,6 @@ wasm-timer = "0.2" void = "1" [dev-dependencies] -libp2p-mplex = { path = "../muxers/mplex" } -libp2p-noise = { path = "../transports/noise" } +libp2p = { path = "../", default-features = false, features = ["yamux", "plaintext"] } quickcheck = "0.9.0" rand = "0.7.2" diff --git a/swarm/src/behaviour.rs b/swarm/src/behaviour.rs index fc5edd35ca6..c3d121bdcc7 100644 --- a/swarm/src/behaviour.rs +++ b/swarm/src/behaviour.rs @@ -282,16 +282,17 @@ pub enum NetworkBehaviourAction< /// /// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. + /// + /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure + /// and connection closing. Thus it can be used to carry state, which otherwise would have to be + /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer + /// can be included in the handler, and thus directly send on connection success or extracted by + /// the [`NetworkBehaviour`] on connection failure. See [`NetworkBehaviourAction::DialPeer`] for + /// example. DialAddress { /// The address to dial. address: Multiaddr, /// The handler to be used to handle the connection to the peer. - /// - /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and - /// connection closing. Thus it can be used to carry state, which otherwise would have to be - /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected - /// peer can be included in the handler, and thus directly send on connection success or - /// extracted by the [`NetworkBehaviour`] on connection failure. handler: THandler, }, @@ -305,18 +306,191 @@ pub enum NetworkBehaviourAction< /// /// On success, [`NetworkBehaviour::inject_connection_established`] is invoked. /// On failure, [`NetworkBehaviour::inject_dial_failure`] is invoked. + /// + /// Note that the provided handler is returned to the [`NetworkBehaviour`] on connection failure + /// and connection closing. Thus it can be used to carry state, which otherwise would have to be + /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected peer + /// can be included in the handler, and thus directly send on connection success or extracted by + /// the [`NetworkBehaviour`] on connection failure. + /// + /// Example showcasing usage of handler to carry state: + /// + /// ```rust + /// # use futures::executor::block_on; + /// # use futures::stream::StreamExt; + /// # use libp2p::core::connection::ConnectionId; + /// # use libp2p::core::identity; + /// # use libp2p::core::transport::{MemoryTransport, Transport}; + /// # use libp2p::core::upgrade::{self, DeniedUpgrade, InboundUpgrade, OutboundUpgrade}; + /// # use libp2p::core::PeerId; + /// # use libp2p::plaintext::PlainText2Config; + /// # use libp2p::swarm::{ + /// # DialError, DialPeerCondition, IntoProtocolsHandler, KeepAlive, NegotiatedSubstream, + /// # NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler, + /// # ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, Swarm, SwarmEvent, + /// # }; + /// # use libp2p::yamux; + /// # use std::collections::VecDeque; + /// # use std::task::{Context, Poll}; + /// # use void::Void; + /// # + /// # let local_key = identity::Keypair::generate_ed25519(); + /// # let local_public_key = local_key.public(); + /// # let local_peer_id = PeerId::from(local_public_key.clone()); + /// # + /// # let transport = MemoryTransport::default() + /// # .upgrade(upgrade::Version::V1) + /// # .authenticate(PlainText2Config { local_public_key }) + /// # .multiplex(yamux::YamuxConfig::default()) + /// # .boxed(); + /// # + /// # let mut swarm = Swarm::new(transport, MyBehaviour::default(), local_peer_id); + /// # + /// // Super precious message that we should better not lose. + /// let message = PreciousMessage("My precious message".to_string()); + /// + /// // Unfortunately this peer is offline, thus sending our message to it will fail. + /// let offline_peer = PeerId::random(); + /// + /// // Let's send it anyways. We should get it back in case connecting to the peer fails. + /// swarm.behaviour_mut().send(offline_peer, message); + /// + /// block_on(async { + /// // As expected, sending failed. But great news, we got our message back. + /// matches!( + /// swarm.next().await.expect("Infinite stream"), + /// SwarmEvent::Behaviour(PreciousMessage(_)) + /// ); + /// }); + /// + /// # #[derive(Default)] + /// # struct MyBehaviour { + /// # outbox_to_swarm: VecDeque>, + /// # } + /// # + /// # impl MyBehaviour { + /// # fn send(&mut self, peer_id: PeerId, msg: PreciousMessage) { + /// # self.outbox_to_swarm + /// # .push_back(NetworkBehaviourAction::DialPeer { + /// # peer_id, + /// # condition: DialPeerCondition::Always, + /// # handler: MyHandler { message: Some(msg) }, + /// # }); + /// # } + /// # } + /// # + /// impl NetworkBehaviour for MyBehaviour { + /// # type ProtocolsHandler = MyHandler; + /// # type OutEvent = PreciousMessage; + /// # + /// # fn new_handler(&mut self) -> Self::ProtocolsHandler { + /// # MyHandler { message: None } + /// # } + /// # + /// # + /// # fn inject_event( + /// # &mut self, + /// # _: PeerId, + /// # _: ConnectionId, + /// # _: <::Handler as ProtocolsHandler>::OutEvent, + /// # ) { + /// # unreachable!(); + /// # } + /// # + /// fn inject_dial_failure( + /// &mut self, + /// _: &PeerId, + /// handler: Self::ProtocolsHandler, + /// _: DialError, + /// ) { + /// // As expected, sending the message failed. But lucky us, we got the handler back, thus + /// // the precious message is not lost and we can return it back to the user. + /// let msg = handler.message.unwrap(); + /// self.outbox_to_swarm + /// .push_back(NetworkBehaviourAction::GenerateEvent(msg)) + /// } + /// # + /// # fn poll( + /// # &mut self, + /// # _: &mut Context<'_>, + /// # _: &mut impl PollParameters, + /// # ) -> Poll> { + /// # if let Some(action) = self.outbox_to_swarm.pop_front() { + /// # return Poll::Ready(action); + /// # } + /// # Poll::Pending + /// # } + /// } + /// + /// # struct MyHandler { + /// # message: Option, + /// # } + /// # + /// # impl ProtocolsHandler for MyHandler { + /// # type InEvent = Void; + /// # type OutEvent = Void; + /// # type Error = Void; + /// # type InboundProtocol = DeniedUpgrade; + /// # type OutboundProtocol = DeniedUpgrade; + /// # type InboundOpenInfo = (); + /// # type OutboundOpenInfo = Void; + /// # + /// # fn listen_protocol( + /// # &self, + /// # ) -> SubstreamProtocol { + /// # SubstreamProtocol::new(DeniedUpgrade, ()) + /// # } + /// # + /// # fn inject_fully_negotiated_inbound( + /// # &mut self, + /// # _: >::Output, + /// # _: Self::InboundOpenInfo, + /// # ) { + /// # } + /// # + /// # fn inject_fully_negotiated_outbound( + /// # &mut self, + /// # _: >::Output, + /// # _: Self::OutboundOpenInfo, + /// # ) { + /// # } + /// # + /// # fn inject_event(&mut self, _event: Self::InEvent) {} + /// # + /// # fn inject_dial_upgrade_error( + /// # &mut self, + /// # _: Self::OutboundOpenInfo, + /// # _: ProtocolsHandlerUpgrErr, + /// # ) { + /// # } + /// # + /// # fn connection_keep_alive(&self) -> KeepAlive { + /// # KeepAlive::Yes + /// # } + /// # + /// # fn poll( + /// # &mut self, + /// # _: &mut Context<'_>, + /// # ) -> Poll< + /// # ProtocolsHandlerEvent< + /// # Self::OutboundProtocol, + /// # Self::OutboundOpenInfo, + /// # Self::OutEvent, + /// # Self::Error, + /// # >, + /// # > { + /// # todo!("If `Self::message.is_some()` send the message to the remote.") + /// # } + /// # } + /// # #[derive(Debug, PartialEq, Eq)] + /// # struct PreciousMessage(String); + /// ``` DialPeer { /// The peer to try reach. peer_id: PeerId, /// The condition for initiating a new dialing attempt. condition: DialPeerCondition, /// The handler to be used to handle the connection to the peer. - /// - /// Note that the handler is returned to the [`NetworkBehaviour`] on connection failure and - /// connection closing. Thus it can be used to carry state, which otherwise would have to be - /// tracked in the [`NetworkBehaviour`] itself. E.g. a message destined to an unconnected - /// peer can be included in the handler, and thus directly send on connection success or - /// extracted by the [`NetworkBehaviour`] on connection failure. handler: THandler, }, diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 925e47d644a..2fe2376c713 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -1326,8 +1326,9 @@ mod tests { use crate::protocols_handler::DummyProtocolsHandler; use crate::test::{CallTraceBehaviour, MockBehaviour}; use futures::{executor, future}; - use libp2p_core::{identity, multiaddr, transport, upgrade}; - use libp2p_noise as noise; + use libp2p::core::{identity, multiaddr, transport, upgrade}; + use libp2p::plaintext; + use libp2p::yamux; // Test execution state. // Connection => Disconnecting => Connecting. @@ -1343,17 +1344,16 @@ mod tests { O: Send + 'static, { let id_keys = identity::Keypair::generate_ed25519(); - let pubkey = id_keys.public(); - let noise_keys = noise::Keypair::::new() - .into_authentic(&id_keys) - .unwrap(); + let local_public_key = id_keys.public(); let transport = transport::MemoryTransport::default() .upgrade(upgrade::Version::V1) - .authenticate(noise::NoiseConfig::xx(noise_keys).into_authenticated()) - .multiplex(libp2p_mplex::MplexConfig::new()) + .authenticate(plaintext::PlainText2Config { + local_public_key: local_public_key.clone(), + }) + .multiplex(yamux::YamuxConfig::default()) .boxed(); let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto)); - SwarmBuilder::new(transport, behaviour, pubkey.into()).build() + SwarmBuilder::new(transport, behaviour, local_public_key.into()).build() } fn swarms_connected( @@ -1704,4 +1704,9 @@ mod tests { } })) } + + /// [`NetworkBehaviourAction::DialAddress`] and [`NetworkBehaviourAction::DialPeer`] require a + /// handler. This handler can be used to carry state. See corresponding doc comments. + #[test] + fn use_handler_to_carry_state() {} }