From b63e05dad6b661c84b9392c2f4b93a3ddf048096 Mon Sep 17 00:00:00 2001 From: Oleg Kubrakov Date: Wed, 8 Mar 2023 16:17:33 +0800 Subject: [PATCH] refactor: move examples to common location Refactor examples into separate binary crates. Fixes https://github.com/libp2p/rust-libp2p/issues/3111. Pull-Request: #3509. --- .github/workflows/ci.yml | 2 +- Cargo.lock | 155 +++- Cargo.toml | 45 +- examples/README.md | 52 +- examples/chat-example/Cargo.toml | 12 + .../src/main.rs} | 2 +- examples/chat-tokio.rs | 157 ---- examples/chat.rs | 179 ----- examples/dcutr/Cargo.toml | 13 + .../dcutr.rs => examples/dcutr/src/main.rs | 41 +- .../distributed-key-value-store/Cargo.toml | 13 + .../src/main.rs} | 5 +- examples/file-sharing.rs | 751 ------------------ examples/file-sharing/Cargo.toml | 15 + examples/file-sharing/src/main.rs | 206 +++++ examples/file-sharing/src/network.rs | 541 +++++++++++++ examples/identify/Cargo.toml | 11 + .../identify/src/main.rs | 20 +- examples/ipfs-kad/Cargo.toml | 12 + .../{ipfs-kad.rs => ipfs-kad/src/main.rs} | 0 examples/ipfs-private/Cargo.toml | 14 + .../src/main.rs} | 0 examples/mdns-passive-discovery.rs | 65 -- examples/ping-example/Cargo.toml | 12 + .../{ping.rs => ping-example/src/main.rs} | 13 +- examples/rendezvous/Cargo.toml | 14 + .../rendezvous/src/bin/rzv-discover.rs | 60 +- .../rendezvous/src/bin/rzv-identify.rs | 86 +- .../rendezvous/src/bin/rzv-register.rs | 69 +- .../rendezvous/src/main.rs | 93 +-- 30 files changed, 1166 insertions(+), 1492 deletions(-) create mode 100644 examples/chat-example/Cargo.toml rename examples/{gossipsub-chat.rs => chat-example/src/main.rs} (99%) delete mode 100644 examples/chat-tokio.rs delete mode 100644 examples/chat.rs create mode 100644 examples/dcutr/Cargo.toml rename protocols/dcutr/examples/dcutr.rs => examples/dcutr/src/main.rs (92%) create mode 100644 examples/distributed-key-value-store/Cargo.toml rename examples/{distributed-key-value-store.rs => distributed-key-value-store/src/main.rs} (98%) delete mode 100644 examples/file-sharing.rs create mode 100644 examples/file-sharing/Cargo.toml create mode 100644 examples/file-sharing/src/main.rs create mode 100644 examples/file-sharing/src/network.rs create mode 100644 examples/identify/Cargo.toml rename protocols/identify/examples/identify.rs => examples/identify/src/main.rs (87%) create mode 100644 examples/ipfs-kad/Cargo.toml rename examples/{ipfs-kad.rs => ipfs-kad/src/main.rs} (100%) create mode 100644 examples/ipfs-private/Cargo.toml rename examples/{ipfs-private.rs => ipfs-private/src/main.rs} (100%) delete mode 100644 examples/mdns-passive-discovery.rs create mode 100644 examples/ping-example/Cargo.toml rename examples/{ping.rs => ping-example/src/main.rs} (89%) create mode 100644 examples/rendezvous/Cargo.toml rename protocols/rendezvous/examples/discover.rs => examples/rendezvous/src/bin/rzv-discover.rs (76%) rename protocols/rendezvous/examples/register_with_identify.rs => examples/rendezvous/src/bin/rzv-identify.rs (66%) rename protocols/rendezvous/examples/register.rs => examples/rendezvous/src/bin/rzv-register.rs (72%) rename protocols/rendezvous/examples/rendezvous_point.rs => examples/rendezvous/src/main.rs (62%) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb78b1814cc..93b5575fd33 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -213,7 +213,7 @@ jobs: save-if: ${{ github.ref == 'refs/heads/master' }} - name: Run ipfs-kad example - run: RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run --example ipfs-kad --features full + run: cd ./examples/ipfs-kad/ && RUST_LOG=libp2p_swarm=debug,libp2p_kad=trace,libp2p_tcp=debug cargo run rustfmt: runs-on: ubuntu-latest diff --git a/Cargo.lock b/Cargo.lock index 26051ac1b68..0cc9681b71b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -306,7 +306,7 @@ dependencies = [ "slab", "socket2", "waker-fn", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -346,7 +346,7 @@ dependencies = [ "futures-lite", "libc", "signal-hook", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -634,6 +634,17 @@ dependencies = [ "zeroize", ] +[[package]] +name = "chat-example" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "env_logger 0.10.0", + "futures", + "libp2p", +] + [[package]] name = "ciborium" version = "0.2.0" @@ -1104,6 +1115,18 @@ dependencies = [ "syn", ] +[[package]] +name = "dcutr" +version = "0.1.0" +dependencies = [ + "clap 4.1.6", + "env_logger 0.10.0", + "futures", + "futures-timer", + "libp2p", + "log", +] + [[package]] name = "der" version = "0.6.1" @@ -1205,6 +1228,18 @@ dependencies = [ "syn", ] +[[package]] +name = "distributed-key-value-store" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "env_logger 0.10.0", + "futures", + "libp2p", + "multiaddr", +] + [[package]] name = "dtoa" version = "1.0.5" @@ -1373,6 +1408,20 @@ version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a214f5bb88731d436478f3ae1f8a277b62124089ba9fb67f4f93fb100ef73c90" +[[package]] +name = "file-sharing" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "clap 4.1.6", + "either", + "env_logger 0.10.0", + "futures", + "libp2p", + "multiaddr", +] + [[package]] name = "flate2" version = "1.0.25" @@ -1823,6 +1872,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" +[[package]] +name = "identify" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "futures", + "libp2p", +] + [[package]] name = "idna" version = "0.2.3" @@ -1946,7 +2005,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -1961,6 +2020,30 @@ dependencies = [ "winreg", ] +[[package]] +name = "ipfs-kad" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "env_logger 0.10.0", + "futures", + "libp2p", +] + +[[package]] +name = "ipfs-private" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "either", + "env_logger 0.10.0", + "futures", + "libp2p", + "multiaddr", +] + [[package]] name = "ipnet" version = "2.7.1" @@ -1976,7 +2059,7 @@ dependencies = [ "hermit-abi 0.2.6", "io-lifetimes", "rustix", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -2942,7 +3025,7 @@ dependencies = [ "libc", "log", "wasi 0.11.0+wasi-snapshot-preview1", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -3305,7 +3388,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -3376,6 +3459,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "ping-example" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "futures", + "libp2p", + "multiaddr", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -3437,7 +3531,7 @@ dependencies = [ "libc", "log", "wepoll-ffi", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -3801,6 +3895,19 @@ version = "0.6.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" +[[package]] +name = "rendezvous-example" +version = "0.1.0" +dependencies = [ + "async-std", + "async-trait", + "env_logger 0.10.0", + "futures", + "libp2p", + "log", + "tokio", +] + [[package]] name = "resolv-conf" version = "0.7.0" @@ -3935,7 +4042,7 @@ dependencies = [ "io-lifetimes", "libc", "linux-raw-sys", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -4387,7 +4494,7 @@ dependencies = [ "fastrand", "redox_syscall", "rustix", - "windows-sys", + "windows-sys 0.42.0", ] [[package]] @@ -4479,9 +4586,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.25.0" +version = "1.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af" +checksum = "03201d01c3c27a29c8a5cee5b55a93ddae1ccf6f08f65365c2c918f8c1b76f64" dependencies = [ "autocfg", "bytes", @@ -4494,7 +4601,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "windows-sys", + "windows-sys 0.45.0", ] [[package]] @@ -5238,6 +5345,30 @@ dependencies = [ "windows_x86_64_msvc 0.42.1", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.42.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e2522491fbfcd58cc84d47aeb2958948c4b8982e9a2d8a2a35bbaed431390e7" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc 0.42.1", + "windows_i686_gnu 0.42.1", + "windows_i686_msvc 0.42.1", + "windows_x86_64_gnu 0.42.1", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc 0.42.1", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.1" diff --git a/Cargo.toml b/Cargo.toml index b148540e477..f4e5c9e95cf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -142,6 +142,15 @@ libp2p-tcp = { path = "transports/tcp", features = ["tokio"] } [workspace] members = [ "core", + "examples/chat-example", + "examples/dcutr", + "examples/distributed-key-value-store", + "examples/file-sharing", + "examples/identify", + "examples/ipfs-kad", + "examples/ipfs-private", + "examples/ping-example", + "examples/rendezvous", "misc/metrics", "misc/multistream-select", "misc/rw-stream-sink", @@ -180,42 +189,6 @@ members = [ "interop-tests" ] -[[example]] -name = "chat" -required-features = ["full"] - -[[example]] -name = "chat-tokio" -required-features = ["full"] - -[[example]] -name = "file-sharing" -required-features = ["full"] - -[[example]] -name = "gossipsub-chat" -required-features = ["full"] - -[[example]] -name = "ipfs-private" -required-features = ["full"] - -[[example]] -name = "ipfs-kad" -required-features = ["full"] - -[[example]] -name = "ping" -required-features = ["full"] - -[[example]] -name = "mdns-passive-discovery" -required-features = ["full"] - -[[example]] -name = "distributed-key-value-store" -required-features = ["full"] - # Passing arguments to the docsrs builder in order to properly document cfg's. # More information: https://docs.rs/about/builds#cross-compiling [package.metadata.docs.rs] diff --git a/examples/README.md b/examples/README.md index e791ebb803a..3e0babd96ff 100644 --- a/examples/README.md +++ b/examples/README.md @@ -4,56 +4,24 @@ A set of examples showcasing how to use rust-libp2p. ## Getting started -- [Ping](ping.rs) - - Small `ping` clone, sending a ping to a peer, expecting a pong as a response. See - [tutorial](../src/tutorials/ping.rs) for a step-by-step guide building the example. ## Individual libp2p features -- [Chat](./chat.rs) - - A basic chat application demonstrating libp2p and the mDNS and floodsub protocols. - - - [Gossipsub chat](./gossipsub-chat.rs) - - Same as the chat example but using mDNS and the Gossipsub protocol. - - - [Tokio based chat](./chat-tokio.rs) - - Same as the chat example but using tokio for all asynchronous tasks and I/O. - -- [Distributed key-value store](./distributed-key-value-store.rs) - - A basic key value store demonstrating libp2p and the mDNS and Kademlia protocol. - -- [Identify](../protocols/identify/examples/identify.rs) - - Demonstrates how to use identify protocol to query peer information. +- [Chat](./chat) A basic chat application demonstrating libp2p and the mDNS and Gossipsub protocols. +- [Distributed key-value store](./distributed-key-value-store) A basic key value store demonstrating libp2p and the mDNS and Kademlia protocol. -- [IPFS Kademlia](ipfs-kad.rs) +- [File sharing application](./file-sharing) Basic file sharing application with peers either providing or locating and getting files by name. - Demonstrates how to perform Kademlia queries on the IPFS network. - -- [IPFS Private](ipfs-private.rs) - - Implementation using the gossipsub, ping and identify protocols to implement the ipfs private - swarms feature. - -- [Passive Discovery via MDNS](mdns-passive-discovery.rs) + While obviously showcasing how to build a basic file sharing application with the Kademlia and + Request-Response protocol, the actual goal of this example is **to show how to integrate + rust-libp2p into a larger application**. - Discover peers on the same network via the MDNS protocol. - -- [Hole punching tutorial](https://docs.rs/libp2p/latest/libp2p/tutorials/hole_punching/index.html) +- [IPFS Kademlia](./ipfs-kad) Demonstrates how to perform Kademlia queries on the IPFS network. - Tutorial on how to overcome firewalls and NATs with libp2p’s hole punching mechanism. +- [IPFS Private](./ipfs-private) Implementation using the gossipsub, ping and identify protocols to implement the ipfs private swarms feature. -## Integration into a larger application +- [Ping](./ping) Small `ping` clone, sending a ping to a peer, expecting a pong as a response. See [tutorial](../src/tutorials/ping.rs) for a step-by-step guide building the example. -- [File sharing application](./file-sharing.rs) - Basic file sharing application with peers either providing or locating and getting files by name. +- [Rendezvous](./rendezvous) Rendezvous Protocol. See [specs](https://github.com/libp2p/specs/blob/master/rendezvous/README.md). - While obviously showcasing how to build a basic file sharing application with the Kademlia and - Request-Response protocol, the actual goal of this example is **to show how to integrate - rust-libp2p into a larger application**. diff --git a/examples/chat-example/Cargo.toml b/examples/chat-example/Cargo.toml new file mode 100644 index 00000000000..6d65b84d74d --- /dev/null +++ b/examples/chat-example/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "chat-example" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +env_logger = "0.10.0" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "gossipsub", "mdns", "mplex", "noise", "macros", "tcp", "websocket", "yamux"] } diff --git a/examples/gossipsub-chat.rs b/examples/chat-example/src/main.rs similarity index 99% rename from examples/gossipsub-chat.rs rename to examples/chat-example/src/main.rs index 0353b72ec5e..c388bd6adb1 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/chat-example/src/main.rs @@ -24,7 +24,7 @@ //! Using two terminal windows, start two instances, typing the following in each: //! //! ```sh -//! cargo run --example gossipsub-chat --features=full +//! cargo run //! ``` //! //! Mutual mDNS discovery may take a few seconds. When each peer does discover the other diff --git a/examples/chat-tokio.rs b/examples/chat-tokio.rs deleted file mode 100644 index e2f88201fb8..00000000000 --- a/examples/chat-tokio.rs +++ /dev/null @@ -1,157 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A basic chat application demonstrating libp2p with the mDNS and floodsub protocols -//! using tokio for all asynchronous tasks and I/O. In order for all used libp2p -//! crates to use tokio, it enables tokio-specific features for some crates. -//! -//! The example is run per node as follows: -//! -//! ```sh -//! cargo run --example chat-tokio --features=full -//! ``` - -use futures::StreamExt; -use libp2p::{ - core::upgrade, - floodsub::{self, Floodsub, FloodsubEvent}, - identity, mdns, mplex, noise, - swarm::{NetworkBehaviour, SwarmEvent}, - tcp, Multiaddr, PeerId, Transport, -}; -use std::error::Error; -use tokio::io::{self, AsyncBufReadExt}; - -/// The `tokio::main` attribute sets up a tokio runtime. -#[tokio::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - // Create a random PeerId - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = PeerId::from(id_keys.public()); - println!("Local peer id: {peer_id:?}"); - - // Create a tokio-based TCP transport use noise for authenticated - // encryption and Mplex for multiplexing of substreams on a TCP stream. - let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) - .upgrade(upgrade::Version::V1) - .authenticate( - noise::NoiseAuthenticated::xx(&id_keys) - .expect("Signing libp2p-noise static DH keypair failed."), - ) - .multiplex(mplex::MplexConfig::new()) - .boxed(); - - // Create a Floodsub topic - let floodsub_topic = floodsub::Topic::new("chat"); - - // We create a custom behaviour that combines floodsub and mDNS. - // The derive generates a delegating `NetworkBehaviour` impl. - #[derive(NetworkBehaviour)] - #[behaviour(out_event = "MyBehaviourEvent")] - struct MyBehaviour { - floodsub: Floodsub, - mdns: mdns::tokio::Behaviour, - } - - #[allow(clippy::large_enum_variant)] - enum MyBehaviourEvent { - Floodsub(FloodsubEvent), - Mdns(mdns::Event), - } - - impl From for MyBehaviourEvent { - fn from(event: FloodsubEvent) -> Self { - MyBehaviourEvent::Floodsub(event) - } - } - - impl From for MyBehaviourEvent { - fn from(event: mdns::Event) -> Self { - MyBehaviourEvent::Mdns(event) - } - } - - // Create a Swarm to manage peers and events. - let mdns_behaviour = mdns::Behaviour::new(Default::default(), peer_id)?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(peer_id), - mdns: mdns_behaviour, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - - let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id); - - // Reach out to another node if specified - if let Some(to_dial) = std::env::args().nth(1) { - let addr: Multiaddr = to_dial.parse()?; - swarm.dial(addr)?; - println!("Dialed {to_dial:?}"); - } - - // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines(); - - // Listen on all interfaces and whatever port the OS assigns - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - - // Kick it off - loop { - tokio::select! { - line = stdin.next_line() => { - let line = line?.expect("stdin closed"); - swarm.behaviour_mut().floodsub.publish(floodsub_topic.clone(), line.as_bytes()); - } - event = swarm.select_next_some() => { - match event { - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {address:?}"); - } - SwarmEvent::Behaviour(MyBehaviourEvent::Floodsub(FloodsubEvent::Message(message))) => { - println!( - "Received: '{:?}' from {:?}", - String::from_utf8_lossy(&message.data), - message.source - ); - } - SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(event)) => { - match event { - mdns::Event::Discovered(list) => { - for (peer, _) in list { - swarm.behaviour_mut().floodsub.add_node_to_partial_view(peer); - } - } - mdns::Event::Expired(list) => { - for (peer, _) in list { - if !swarm.behaviour().mdns.has_node(&peer) { - swarm.behaviour_mut().floodsub.remove_node_from_partial_view(&peer); - } - } - } - } - } - _ => {} - } - } - } - } -} diff --git a/examples/chat.rs b/examples/chat.rs deleted file mode 100644 index 6d6825125df..00000000000 --- a/examples/chat.rs +++ /dev/null @@ -1,179 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! A basic chat application demonstrating libp2p and the mDNS and floodsub protocols. -//! -//! Using two terminal windows, start two instances. If you local network allows mDNS, -//! they will automatically connect. Type a message in either terminal and hit return: the -//! message is sent and printed in the other terminal. Close with Ctrl-c. -//! -//! You can of course open more terminal windows and add more participants. -//! Dialing any of the other peers will propagate the new participant to all -//! chat members and everyone will receive all messages. -//! -//! # If they don't automatically connect -//! -//! If the nodes don't automatically connect, take note of the listening addresses of the first -//! instance and start the second with one of the addresses as the first argument. In the first -//! terminal window, run: -//! -//! ```sh -//! cargo run --example chat --features=full -//! ``` -//! -//! It will print the PeerId and the listening addresses, e.g. `Listening on -//! "/ip4/0.0.0.0/tcp/24915"` -//! -//! In the second terminal window, start a new instance of the example with: -//! -//! ```sh -//! cargo run --example chat --features=full -- /ip4/127.0.0.1/tcp/24915 -//! ``` -//! -//! The two nodes then connect. - -use async_std::io; -use futures::{ - prelude::{stream::StreamExt, *}, - select, -}; -use libp2p::{ - floodsub::{self, Floodsub, FloodsubEvent}, - identity, mdns, - swarm::{NetworkBehaviour, SwarmEvent}, - Multiaddr, PeerId, Swarm, -}; -use std::error::Error; - -#[async_std::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - // Create a random PeerId - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {local_peer_id:?}"); - - // Set up an encrypted DNS-enabled TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::development_transport(local_key).await?; - - // Create a Floodsub topic - let floodsub_topic = floodsub::Topic::new("chat"); - - // We create a custom network behaviour that combines floodsub and mDNS. - // Use the derive to generate delegating NetworkBehaviour impl. - #[derive(NetworkBehaviour)] - #[behaviour(out_event = "OutEvent")] - struct MyBehaviour { - floodsub: Floodsub, - mdns: mdns::async_io::Behaviour, - } - - #[allow(clippy::large_enum_variant)] - #[derive(Debug)] - enum OutEvent { - Floodsub(FloodsubEvent), - Mdns(mdns::Event), - } - - impl From for OutEvent { - fn from(v: mdns::Event) -> Self { - Self::Mdns(v) - } - } - - impl From for OutEvent { - fn from(v: FloodsubEvent) -> Self { - Self::Floodsub(v) - } - } - - // Create a Swarm to manage peers and events - let mut swarm = { - let mdns = mdns::async_io::Behaviour::new(mdns::Config::default(), local_peer_id)?; - let mut behaviour = MyBehaviour { - floodsub: Floodsub::new(local_peer_id), - mdns, - }; - - behaviour.floodsub.subscribe(floodsub_topic.clone()); - Swarm::with_threadpool_executor(transport, behaviour, local_peer_id) - }; - - // Reach out to another node if specified - if let Some(to_dial) = std::env::args().nth(1) { - let addr: Multiaddr = to_dial.parse()?; - swarm.dial(addr)?; - println!("Dialed {to_dial:?}") - } - - // Read full lines from stdin - let mut stdin = io::BufReader::new(io::stdin()).lines().fuse(); - - // Listen on all interfaces and whatever port the OS assigns - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - - // Kick it off - loop { - select! { - line = stdin.select_next_some() => swarm - .behaviour_mut() - .floodsub - .publish(floodsub_topic.clone(), line.expect("Stdin not to close").as_bytes()), - event = swarm.select_next_some() => match event { - SwarmEvent::NewListenAddr { address, .. } => { - println!("Listening on {address:?}"); - } - SwarmEvent::Behaviour(OutEvent::Floodsub( - FloodsubEvent::Message(message) - )) => { - println!( - "Received: '{:?}' from {:?}", - String::from_utf8_lossy(&message.data), - message.source - ); - } - SwarmEvent::Behaviour(OutEvent::Mdns( - mdns::Event::Discovered(list) - )) => { - for (peer, _) in list { - swarm - .behaviour_mut() - .floodsub - .add_node_to_partial_view(peer); - } - } - SwarmEvent::Behaviour(OutEvent::Mdns(mdns::Event::Expired( - list - ))) => { - for (peer, _) in list { - if !swarm.behaviour_mut().mdns.has_node(&peer) { - swarm - .behaviour_mut() - .floodsub - .remove_node_from_partial_view(&peer); - } - } - }, - _ => {} - } - } - } -} diff --git a/examples/dcutr/Cargo.toml b/examples/dcutr/Cargo.toml new file mode 100644 index 00000000000..dc678f8052a --- /dev/null +++ b/examples/dcutr/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "dcutr" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +clap = { version = "4.1.6", features = ["derive"] } +env_logger = "0.10.0" +futures = "0.3.26" +futures-timer = "3.0" +libp2p = { path = "../../", features = ["async-std", "dns", "dcutr", "identify", "macros", "mplex", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] } +log = "0.4" diff --git a/protocols/dcutr/examples/dcutr.rs b/examples/dcutr/src/main.rs similarity index 92% rename from protocols/dcutr/examples/dcutr.rs rename to examples/dcutr/src/main.rs index 5daa7dcb6b2..aa7b32af4ce 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/examples/dcutr/src/main.rs @@ -19,22 +19,23 @@ // DEALINGS IN THE SOFTWARE. use clap::Parser; -use futures::executor::{block_on, ThreadPool}; -use futures::future::FutureExt; -use futures::stream::StreamExt; -use libp2p_core::multiaddr::{Multiaddr, Protocol}; -use libp2p_core::transport::OrTransport; -use libp2p_core::upgrade; -use libp2p_core::Transport; -use libp2p_core::{identity, PeerId}; -use libp2p_dcutr as dcutr; -use libp2p_dns::DnsConfig; -use libp2p_identify as identify; -use libp2p_noise as noise; -use libp2p_ping as ping; -use libp2p_relay as relay; -use libp2p_swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}; -use libp2p_tcp as tcp; +use futures::{ + executor::{block_on, ThreadPool}, + future::FutureExt, + stream::StreamExt, +}; +use libp2p::{ + core::{ + multiaddr::{Multiaddr, Protocol}, + transport::{OrTransport, Transport}, + upgrade, PeerId, + }, + dcutr, + dns::DnsConfig, + identify, identity, noise, ping, relay, + swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent}, + tcp, yamux, +}; use log::info; use std::error::Error; use std::net::Ipv4Addr; @@ -100,15 +101,11 @@ fn main() -> Result<(), Box> { noise::NoiseAuthenticated::xx(&local_key) .expect("Signing libp2p-noise static DH keypair failed."), ) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .multiplex(yamux::YamuxConfig::default()) .boxed(); #[derive(NetworkBehaviour)] - #[behaviour( - out_event = "Event", - event_process = false, - prelude = "libp2p_swarm::derive_prelude" - )] + #[behaviour(out_event = "Event", event_process = false)] struct Behaviour { relay_client: relay::client::Behaviour, ping: ping::Behaviour, diff --git a/examples/distributed-key-value-store/Cargo.toml b/examples/distributed-key-value-store/Cargo.toml new file mode 100644 index 00000000000..bd3308f6fb6 --- /dev/null +++ b/examples/distributed-key-value-store/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "distributed-key-value-store" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +env_logger = "0.10" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mdns", "mplex", "noise", "macros", "tcp", "websocket", "yamux"] } +multiaddr = { version = "0.17.0" } diff --git a/examples/distributed-key-value-store.rs b/examples/distributed-key-value-store/src/main.rs similarity index 98% rename from examples/distributed-key-value-store.rs rename to examples/distributed-key-value-store/src/main.rs index 18d74270e7e..c2e73a85f07 100644 --- a/examples/distributed-key-value-store.rs +++ b/examples/distributed-key-value-store/src/main.rs @@ -44,15 +44,14 @@ use async_std::io; use futures::{prelude::*, select}; use libp2p::kad::record::store::MemoryStore; use libp2p::kad::{ - record::Key, AddProviderOk, Kademlia, KademliaEvent, PeerRecord, PutRecordOk, QueryResult, - Quorum, Record, + record::Key, AddProviderOk, GetProvidersOk, GetRecordOk, Kademlia, KademliaEvent, PeerRecord, + PutRecordOk, QueryResult, Quorum, Record, }; use libp2p::{ development_transport, identity, mdns, swarm::{NetworkBehaviour, SwarmEvent}, PeerId, Swarm, }; -use libp2p_kad::{GetProvidersOk, GetRecordOk}; use std::error::Error; #[async_std::main] diff --git a/examples/file-sharing.rs b/examples/file-sharing.rs deleted file mode 100644 index 6cae1f492a5..00000000000 --- a/examples/file-sharing.rs +++ /dev/null @@ -1,751 +0,0 @@ -// Copyright 2021 Protocol Labs. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -//! # File sharing example -//! -//! Basic file sharing application with peers either providing or locating and -//! getting files by name. -//! -//! While obviously showcasing how to build a basic file sharing application, -//! the actual goal of this example is **to show how to integrate rust-libp2p -//! into a larger application**. -//! -//! ## Sample plot -//! -//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C -//! retrieves a file. -//! -//! Provider nodes A and B each provide a file, file FA and FB respectively. -//! They do so by advertising themselves as a provider for their file on a DHT -//! via [`libp2p-kad`]. The two, among other nodes of the network, are -//! interconnected via the DHT. -//! -//! Node C can locate the providers for file FA or FB on the DHT via -//! [`libp2p-kad`] without being connected to the specific node providing the -//! file, but any node of the DHT. Node C then connects to the corresponding -//! node and requests the file content of the file via -//! [`libp2p-request-response`]. -//! -//! ## Architectural properties -//! -//! - Clean clonable async/await interface ([`Client`]) to interact with the -//! network layer. -//! -//! - Single task driving the network layer, no locks required. -//! -//! ## Usage -//! -//! A two node setup with one node providing the file and one node requesting the file. -//! -//! 1. Run command below in one terminal. -//! -//! ``` -//! cargo run --example file-sharing --features=full -- \ -//! --listen-address /ip4/127.0.0.1/tcp/40837 \ -//! --secret-key-seed 1 \ -//! provide \ -//! --path \ -//! --name -//! ``` -//! -//! 2. Run command below in another terminal. -//! -//! ``` -//! cargo run --example file-sharing --features=full -- \ -//! --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \ -//! get \ -//! --name -//! ``` -//! -//! Note: The client does not need to be directly connected to the providing -//! peer, as long as both are connected to some node on the same DHT. - -use async_std::io; -use async_std::task::spawn; -use clap::Parser; -use futures::prelude::*; -use libp2p::core::{Multiaddr, PeerId}; -use libp2p::multiaddr::Protocol; -use std::error::Error; -use std::io::Write; -use std::path::PathBuf; - -#[async_std::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - let opt = Opt::parse(); - - let (mut network_client, mut network_events, network_event_loop) = - network::new(opt.secret_key_seed).await?; - - // Spawn the network task for it to run in the background. - spawn(network_event_loop.run()); - - // In case a listen address was provided use it, otherwise listen on any - // address. - match opt.listen_address { - Some(addr) => network_client - .start_listening(addr) - .await - .expect("Listening not to fail."), - None => network_client - .start_listening("/ip4/0.0.0.0/tcp/0".parse()?) - .await - .expect("Listening not to fail."), - }; - - // In case the user provided an address of a peer on the CLI, dial it. - if let Some(addr) = opt.peer { - let peer_id = match addr.iter().last() { - Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."), - _ => return Err("Expect peer multiaddr to contain peer ID.".into()), - }; - network_client - .dial(peer_id, addr) - .await - .expect("Dial to succeed"); - } - - match opt.argument { - // Providing a file. - CliArgument::Provide { path, name } => { - // Advertise oneself as a provider of the file on the DHT. - network_client.start_providing(name.clone()).await; - - loop { - match network_events.next().await { - // Reply with the content of the file on incoming requests. - Some(network::Event::InboundRequest { request, channel }) => { - if request == name { - network_client - .respond_file(std::fs::read(&path)?, channel) - .await; - } - } - e => todo!("{:?}", e), - } - } - } - // Locating and getting a file. - CliArgument::Get { name } => { - // Locate all nodes providing the file. - let providers = network_client.get_providers(name.clone()).await; - if providers.is_empty() { - return Err(format!("Could not find provider for file {name}.").into()); - } - - // Request the content of the file from each node. - let requests = providers.into_iter().map(|p| { - let mut network_client = network_client.clone(); - let name = name.clone(); - async move { network_client.request_file(p, name).await }.boxed() - }); - - // Await the requests, ignore the remaining once a single one succeeds. - let file_content = futures::future::select_ok(requests) - .await - .map_err(|_| "None of the providers returned file.")? - .0; - - std::io::stdout().write_all(&file_content)?; - } - } - - Ok(()) -} - -#[derive(Parser, Debug)] -#[clap(name = "libp2p file sharing example")] -struct Opt { - /// Fixed value to generate deterministic peer ID. - #[clap(long)] - secret_key_seed: Option, - - #[clap(long)] - peer: Option, - - #[clap(long)] - listen_address: Option, - - #[clap(subcommand)] - argument: CliArgument, -} - -#[derive(Debug, Parser)] -enum CliArgument { - Provide { - #[clap(long)] - path: PathBuf, - #[clap(long)] - name: String, - }, - Get { - #[clap(long)] - name: String, - }, -} - -/// The network module, encapsulating all network related logic. -mod network { - use super::*; - use async_trait::async_trait; - use either::Either; - use futures::channel::{mpsc, oneshot}; - use libp2p::core::upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}; - use libp2p::identity; - use libp2p::identity::ed25519; - use libp2p::kad::record::store::MemoryStore; - use libp2p::kad::{GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult}; - use libp2p::multiaddr::Protocol; - use libp2p::request_response::{self, ProtocolSupport, RequestId, ResponseChannel}; - use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}; - use std::collections::{hash_map, HashMap, HashSet}; - use std::iter; - - /// Creates the network components, namely: - /// - /// - The network client to interact with the network layer from anywhere - /// within your application. - /// - /// - The network event stream, e.g. for incoming requests. - /// - /// - The network task driving the network itself. - pub async fn new( - secret_key_seed: Option, - ) -> Result<(Client, impl Stream, EventLoop), Box> { - // Create a public/private key pair, either random or based on a seed. - let id_keys = match secret_key_seed { - Some(seed) => { - let mut bytes = [0u8; 32]; - bytes[0] = seed; - let secret_key = ed25519::SecretKey::from_bytes(&mut bytes).expect( - "this returns `Err` only if the length is wrong; the length is correct; qed", - ); - identity::Keypair::Ed25519(secret_key.into()) - } - None => identity::Keypair::generate_ed25519(), - }; - let peer_id = id_keys.public().to_peer_id(); - - // Build the Swarm, connecting the lower layer transport logic with the - // higher layer network behaviour logic. - let swarm = Swarm::with_threadpool_executor( - libp2p::development_transport(id_keys).await?, - ComposedBehaviour { - kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), - request_response: request_response::Behaviour::new( - FileExchangeCodec(), - iter::once((FileExchangeProtocol(), ProtocolSupport::Full)), - Default::default(), - ), - }, - peer_id, - ); - - let (command_sender, command_receiver) = mpsc::channel(0); - let (event_sender, event_receiver) = mpsc::channel(0); - - Ok(( - Client { - sender: command_sender, - }, - event_receiver, - EventLoop::new(swarm, command_receiver, event_sender), - )) - } - - #[derive(Clone)] - pub struct Client { - sender: mpsc::Sender, - } - - impl Client { - /// Listen for incoming connections on the given address. - pub async fn start_listening( - &mut self, - addr: Multiaddr, - ) -> Result<(), Box> { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(Command::StartListening { addr, sender }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") - } - - /// Dial the given peer at the given address. - pub async fn dial( - &mut self, - peer_id: PeerId, - peer_addr: Multiaddr, - ) -> Result<(), Box> { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(Command::Dial { - peer_id, - peer_addr, - sender, - }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") - } - - /// Advertise the local node as the provider of the given file on the DHT. - pub async fn start_providing(&mut self, file_name: String) { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(Command::StartProviding { file_name, sender }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped."); - } - - /// Find the providers for the given file on the DHT. - pub async fn get_providers(&mut self, file_name: String) -> HashSet { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(Command::GetProviders { file_name, sender }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not to be dropped.") - } - - /// Request the content of the given file from the given peer. - pub async fn request_file( - &mut self, - peer: PeerId, - file_name: String, - ) -> Result, Box> { - let (sender, receiver) = oneshot::channel(); - self.sender - .send(Command::RequestFile { - file_name, - peer, - sender, - }) - .await - .expect("Command receiver not to be dropped."); - receiver.await.expect("Sender not be dropped.") - } - - /// Respond with the provided file content to the given request. - pub async fn respond_file( - &mut self, - file: Vec, - channel: ResponseChannel, - ) { - self.sender - .send(Command::RespondFile { file, channel }) - .await - .expect("Command receiver not to be dropped."); - } - } - - pub struct EventLoop { - swarm: Swarm, - command_receiver: mpsc::Receiver, - event_sender: mpsc::Sender, - pending_dial: HashMap>>>, - pending_start_providing: HashMap>, - pending_get_providers: HashMap>>, - pending_request_file: - HashMap, Box>>>, - } - - impl EventLoop { - fn new( - swarm: Swarm, - command_receiver: mpsc::Receiver, - event_sender: mpsc::Sender, - ) -> Self { - Self { - swarm, - command_receiver, - event_sender, - pending_dial: Default::default(), - pending_start_providing: Default::default(), - pending_get_providers: Default::default(), - pending_request_file: Default::default(), - } - } - - pub async fn run(mut self) { - loop { - futures::select! { - event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await , - command = self.command_receiver.next() => match command { - Some(c) => self.handle_command(c).await, - // Command channel closed, thus shutting down the network event loop. - None=> return, - }, - } - } - } - - async fn handle_event( - &mut self, - event: SwarmEvent< - ComposedEvent, - Either, io::Error>, - >, - ) { - match event { - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { - id, - result: QueryResult::StartProviding(_), - .. - }, - )) => { - let sender: oneshot::Sender<()> = self - .pending_start_providing - .remove(&id) - .expect("Completed query to be previously pending."); - let _ = sender.send(()); - } - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { - id, - result: - QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { - providers, - .. - })), - .. - }, - )) => { - if let Some(sender) = self.pending_get_providers.remove(&id) { - sender.send(providers).expect("Receiver not to be dropped"); - - // Finish the query. We are only interested in the first result. - self.swarm - .behaviour_mut() - .kademlia - .query_mut(&id) - .unwrap() - .finish(); - } - } - SwarmEvent::Behaviour(ComposedEvent::Kademlia( - KademliaEvent::OutboundQueryProgressed { - result: - QueryResult::GetProviders(Ok( - GetProvidersOk::FinishedWithNoAdditionalRecord { .. }, - )), - .. - }, - )) => {} - SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - request_response::Event::Message { message, .. }, - )) => match message { - request_response::Message::Request { - request, channel, .. - } => { - self.event_sender - .send(Event::InboundRequest { - request: request.0, - channel, - }) - .await - .expect("Event receiver not to be dropped."); - } - request_response::Message::Response { - request_id, - response, - } => { - let _ = self - .pending_request_file - .remove(&request_id) - .expect("Request to still be pending.") - .send(Ok(response.0)); - } - }, - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - request_response::Event::OutboundFailure { - request_id, error, .. - }, - )) => { - let _ = self - .pending_request_file - .remove(&request_id) - .expect("Request to still be pending.") - .send(Err(Box::new(error))); - } - SwarmEvent::Behaviour(ComposedEvent::RequestResponse( - request_response::Event::ResponseSent { .. }, - )) => {} - SwarmEvent::NewListenAddr { address, .. } => { - let local_peer_id = *self.swarm.local_peer_id(); - eprintln!( - "Local node is listening on {:?}", - address.with(Protocol::P2p(local_peer_id.into())) - ); - } - SwarmEvent::IncomingConnection { .. } => {} - SwarmEvent::ConnectionEstablished { - peer_id, endpoint, .. - } => { - if endpoint.is_dialer() { - if let Some(sender) = self.pending_dial.remove(&peer_id) { - let _ = sender.send(Ok(())); - } - } - } - SwarmEvent::ConnectionClosed { .. } => {} - SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { - if let Some(peer_id) = peer_id { - if let Some(sender) = self.pending_dial.remove(&peer_id) { - let _ = sender.send(Err(Box::new(error))); - } - } - } - SwarmEvent::IncomingConnectionError { .. } => {} - SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"), - e => panic!("{e:?}"), - } - } - - async fn handle_command(&mut self, command: Command) { - match command { - Command::StartListening { addr, sender } => { - let _ = match self.swarm.listen_on(addr) { - Ok(_) => sender.send(Ok(())), - Err(e) => sender.send(Err(Box::new(e))), - }; - } - Command::Dial { - peer_id, - peer_addr, - sender, - } => { - if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) { - self.swarm - .behaviour_mut() - .kademlia - .add_address(&peer_id, peer_addr.clone()); - match self - .swarm - .dial(peer_addr.with(Protocol::P2p(peer_id.into()))) - { - Ok(()) => { - e.insert(sender); - } - Err(e) => { - let _ = sender.send(Err(Box::new(e))); - } - } - } else { - todo!("Already dialing peer."); - } - } - Command::StartProviding { file_name, sender } => { - let query_id = self - .swarm - .behaviour_mut() - .kademlia - .start_providing(file_name.into_bytes().into()) - .expect("No store error."); - self.pending_start_providing.insert(query_id, sender); - } - Command::GetProviders { file_name, sender } => { - let query_id = self - .swarm - .behaviour_mut() - .kademlia - .get_providers(file_name.into_bytes().into()); - self.pending_get_providers.insert(query_id, sender); - } - Command::RequestFile { - file_name, - peer, - sender, - } => { - let request_id = self - .swarm - .behaviour_mut() - .request_response - .send_request(&peer, FileRequest(file_name)); - self.pending_request_file.insert(request_id, sender); - } - Command::RespondFile { file, channel } => { - self.swarm - .behaviour_mut() - .request_response - .send_response(channel, FileResponse(file)) - .expect("Connection to peer to be still open."); - } - } - } - } - - #[derive(NetworkBehaviour)] - #[behaviour(out_event = "ComposedEvent")] - struct ComposedBehaviour { - request_response: request_response::Behaviour, - kademlia: Kademlia, - } - - #[derive(Debug)] - enum ComposedEvent { - RequestResponse(request_response::Event), - Kademlia(KademliaEvent), - } - - impl From> for ComposedEvent { - fn from(event: request_response::Event) -> Self { - ComposedEvent::RequestResponse(event) - } - } - - impl From for ComposedEvent { - fn from(event: KademliaEvent) -> Self { - ComposedEvent::Kademlia(event) - } - } - - #[derive(Debug)] - enum Command { - StartListening { - addr: Multiaddr, - sender: oneshot::Sender>>, - }, - Dial { - peer_id: PeerId, - peer_addr: Multiaddr, - sender: oneshot::Sender>>, - }, - StartProviding { - file_name: String, - sender: oneshot::Sender<()>, - }, - GetProviders { - file_name: String, - sender: oneshot::Sender>, - }, - RequestFile { - file_name: String, - peer: PeerId, - sender: oneshot::Sender, Box>>, - }, - RespondFile { - file: Vec, - channel: ResponseChannel, - }, - } - - #[derive(Debug)] - pub enum Event { - InboundRequest { - request: String, - channel: ResponseChannel, - }, - } - - // Simple file exchange protocol - - #[derive(Debug, Clone)] - struct FileExchangeProtocol(); - #[derive(Clone)] - struct FileExchangeCodec(); - #[derive(Debug, Clone, PartialEq, Eq)] - struct FileRequest(String); - #[derive(Debug, Clone, PartialEq, Eq)] - pub struct FileResponse(Vec); - - impl ProtocolName for FileExchangeProtocol { - fn protocol_name(&self) -> &[u8] { - "/file-exchange/1".as_bytes() - } - } - - #[async_trait] - impl request_response::Codec for FileExchangeCodec { - type Protocol = FileExchangeProtocol; - type Request = FileRequest; - type Response = FileResponse; - - async fn read_request( - &mut self, - _: &FileExchangeProtocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let vec = read_length_prefixed(io, 1_000_000).await?; - - if vec.is_empty() { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok(FileRequest(String::from_utf8(vec).unwrap())) - } - - async fn read_response( - &mut self, - _: &FileExchangeProtocol, - io: &mut T, - ) -> io::Result - where - T: AsyncRead + Unpin + Send, - { - let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum - - if vec.is_empty() { - return Err(io::ErrorKind::UnexpectedEof.into()); - } - - Ok(FileResponse(vec)) - } - - async fn write_request( - &mut self, - _: &FileExchangeProtocol, - io: &mut T, - FileRequest(data): FileRequest, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, data).await?; - io.close().await?; - - Ok(()) - } - - async fn write_response( - &mut self, - _: &FileExchangeProtocol, - io: &mut T, - FileResponse(data): FileResponse, - ) -> io::Result<()> - where - T: AsyncWrite + Unpin + Send, - { - write_length_prefixed(io, data).await?; - io.close().await?; - - Ok(()) - } - } -} diff --git a/examples/file-sharing/Cargo.toml b/examples/file-sharing/Cargo.toml new file mode 100644 index 00000000000..9ffa026bac8 --- /dev/null +++ b/examples/file-sharing/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "file-sharing" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +clap = { version = "4.1.6", features = ["derive"] } +either = "1.8" +env_logger = "0.10" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mplex", "noise", "macros", "request-response", "tcp", "websocket", "yamux"] } +multiaddr = { version = "0.17.0" } diff --git a/examples/file-sharing/src/main.rs b/examples/file-sharing/src/main.rs new file mode 100644 index 00000000000..27649be9657 --- /dev/null +++ b/examples/file-sharing/src/main.rs @@ -0,0 +1,206 @@ +// Copyright 2021 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! # File sharing example +//! +//! Basic file sharing application with peers either providing or locating and +//! getting files by name. +//! +//! While obviously showcasing how to build a basic file sharing application, +//! the actual goal of this example is **to show how to integrate rust-libp2p +//! into a larger application**. +//! +//! ## Sample plot +//! +//! Assuming there are 3 nodes, A, B and C. A and B each provide a file while C +//! retrieves a file. +//! +//! Provider nodes A and B each provide a file, file FA and FB respectively. +//! They do so by advertising themselves as a provider for their file on a DHT +//! via [`libp2p-kad`]. The two, among other nodes of the network, are +//! interconnected via the DHT. +//! +//! Node C can locate the providers for file FA or FB on the DHT via +//! [`libp2p-kad`] without being connected to the specific node providing the +//! file, but any node of the DHT. Node C then connects to the corresponding +//! node and requests the file content of the file via +//! [`libp2p-request-response`]. +//! +//! ## Architectural properties +//! +//! - Clean clonable async/await interface ([`Client`](network::Client)) to interact with the +//! network layer. +//! +//! - Single task driving the network layer, no locks required. +//! +//! ## Usage +//! +//! A two node setup with one node providing the file and one node requesting the file. +//! +//! 1. Run command below in one terminal. +//! +//! ```sh +//! cargo run -- --listen-address /ip4/127.0.0.1/tcp/40837 \ +//! --secret-key-seed 1 \ +//! provide \ +//! --path \ +//! --name +//! ``` +//! +//! 2. Run command below in another terminal. +//! +//! ```sh +//! cargo run -- --peer /ip4/127.0.0.1/tcp/40837/p2p/12D3KooWPjceQrSwdWXPyLLeABRXmuqt69Rg3sBYbU1Nft9HyQ6X \ +//! get \ +//! --name +//! ``` +//! +//! Note: The client does not need to be directly connected to the providing +//! peer, as long as both are connected to some node on the same DHT. +mod network; + +use async_std::task::spawn; +use clap::Parser; + +use futures::prelude::*; +use futures::StreamExt; +use libp2p::{ + core::{Multiaddr, PeerId}, + multiaddr::Protocol, +}; +use std::error::Error; +use std::io::Write; +use std::path::PathBuf; + +#[async_std::main] +async fn main() -> Result<(), Box> { + env_logger::init(); + + let opt = Opt::parse(); + + let (mut network_client, mut network_events, network_event_loop) = + network::new(opt.secret_key_seed).await?; + + // Spawn the network task for it to run in the background. + spawn(network_event_loop.run()); + + // In case a listen address was provided use it, otherwise listen on any + // address. + match opt.listen_address { + Some(addr) => network_client + .start_listening(addr) + .await + .expect("Listening not to fail."), + None => network_client + .start_listening("/ip4/0.0.0.0/tcp/0".parse()?) + .await + .expect("Listening not to fail."), + }; + + // In case the user provided an address of a peer on the CLI, dial it. + if let Some(addr) = opt.peer { + let peer_id = match addr.iter().last() { + Some(Protocol::P2p(hash)) => PeerId::from_multihash(hash).expect("Valid hash."), + _ => return Err("Expect peer multiaddr to contain peer ID.".into()), + }; + network_client + .dial(peer_id, addr) + .await + .expect("Dial to succeed"); + } + + match opt.argument { + // Providing a file. + CliArgument::Provide { path, name } => { + // Advertise oneself as a provider of the file on the DHT. + network_client.start_providing(name.clone()).await; + + loop { + match network_events.next().await { + // Reply with the content of the file on incoming requests. + Some(network::Event::InboundRequest { request, channel }) => { + if request == name { + network_client + .respond_file(std::fs::read(&path)?, channel) + .await; + } + } + e => todo!("{:?}", e), + } + } + } + // Locating and getting a file. + CliArgument::Get { name } => { + // Locate all nodes providing the file. + let providers = network_client.get_providers(name.clone()).await; + if providers.is_empty() { + return Err(format!("Could not find provider for file {name}.").into()); + } + + // Request the content of the file from each node. + let requests = providers.into_iter().map(|p| { + let mut network_client = network_client.clone(); + let name = name.clone(); + async move { network_client.request_file(p, name).await }.boxed() + }); + + // Await the requests, ignore the remaining once a single one succeeds. + let file_content = futures::future::select_ok(requests) + .await + .map_err(|_| "None of the providers returned file.")? + .0; + + std::io::stdout().write_all(&file_content)?; + } + } + + Ok(()) +} + +#[derive(Parser, Debug)] +#[clap(name = "libp2p file sharing example")] +struct Opt { + /// Fixed value to generate deterministic peer ID. + #[clap(long)] + secret_key_seed: Option, + + #[clap(long)] + peer: Option, + + #[clap(long)] + listen_address: Option, + + #[clap(subcommand)] + argument: CliArgument, +} + +#[derive(Debug, Parser)] +enum CliArgument { + Provide { + #[clap(long)] + path: PathBuf, + #[clap(long)] + name: String, + }, + Get { + #[clap(long)] + name: String, + }, +} diff --git a/examples/file-sharing/src/network.rs b/examples/file-sharing/src/network.rs new file mode 100644 index 00000000000..a7dce68991f --- /dev/null +++ b/examples/file-sharing/src/network.rs @@ -0,0 +1,541 @@ +use async_std::io; +use async_trait::async_trait; +use either::Either; +use futures::channel::{mpsc, oneshot}; +use futures::prelude::*; + +use libp2p::{ + core::{ + upgrade::{read_length_prefixed, write_length_prefixed, ProtocolName}, + Multiaddr, PeerId, + }, + identity, + kad::{ + record::store::MemoryStore, GetProvidersOk, Kademlia, KademliaEvent, QueryId, QueryResult, + }, + multiaddr::Protocol, + request_response::{self, ProtocolSupport, RequestId, ResponseChannel}, + swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent}, +}; + +use std::collections::{hash_map, HashMap, HashSet}; +use std::error::Error; +use std::iter; + +/// Creates the network components, namely: +/// +/// - The network client to interact with the network layer from anywhere +/// within your application. +/// +/// - The network event stream, e.g. for incoming requests. +/// +/// - The network task driving the network itself. +pub async fn new( + secret_key_seed: Option, +) -> Result<(Client, impl Stream, EventLoop), Box> { + // Create a public/private key pair, either random or based on a seed. + let id_keys = match secret_key_seed { + Some(seed) => { + let mut bytes = [0u8; 32]; + bytes[0] = seed; + let secret_key = identity::ed25519::SecretKey::from_bytes(&mut bytes).expect( + "this returns `Err` only if the length is wrong; the length is correct; qed", + ); + identity::Keypair::Ed25519(secret_key.into()) + } + None => identity::Keypair::generate_ed25519(), + }; + let peer_id = id_keys.public().to_peer_id(); + + // Build the Swarm, connecting the lower layer transport logic with the + // higher layer network behaviour logic. + let swarm = Swarm::with_threadpool_executor( + libp2p::development_transport(id_keys).await?, + ComposedBehaviour { + kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)), + request_response: request_response::Behaviour::new( + FileExchangeCodec(), + iter::once((FileExchangeProtocol(), ProtocolSupport::Full)), + Default::default(), + ), + }, + peer_id, + ); + + let (command_sender, command_receiver) = mpsc::channel(0); + let (event_sender, event_receiver) = mpsc::channel(0); + + Ok(( + Client { + sender: command_sender, + }, + event_receiver, + EventLoop::new(swarm, command_receiver, event_sender), + )) +} + +#[derive(Clone)] +pub struct Client { + sender: mpsc::Sender, +} + +impl Client { + /// Listen for incoming connections on the given address. + pub async fn start_listening(&mut self, addr: Multiaddr) -> Result<(), Box> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::StartListening { addr, sender }) + .await + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped.") + } + + /// Dial the given peer at the given address. + pub async fn dial( + &mut self, + peer_id: PeerId, + peer_addr: Multiaddr, + ) -> Result<(), Box> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::Dial { + peer_id, + peer_addr, + sender, + }) + .await + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped.") + } + + /// Advertise the local node as the provider of the given file on the DHT. + pub async fn start_providing(&mut self, file_name: String) { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::StartProviding { file_name, sender }) + .await + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped."); + } + + /// Find the providers for the given file on the DHT. + pub async fn get_providers(&mut self, file_name: String) -> HashSet { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::GetProviders { file_name, sender }) + .await + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not to be dropped.") + } + + /// Request the content of the given file from the given peer. + pub async fn request_file( + &mut self, + peer: PeerId, + file_name: String, + ) -> Result, Box> { + let (sender, receiver) = oneshot::channel(); + self.sender + .send(Command::RequestFile { + file_name, + peer, + sender, + }) + .await + .expect("Command receiver not to be dropped."); + receiver.await.expect("Sender not be dropped.") + } + + /// Respond with the provided file content to the given request. + pub async fn respond_file(&mut self, file: Vec, channel: ResponseChannel) { + self.sender + .send(Command::RespondFile { file, channel }) + .await + .expect("Command receiver not to be dropped."); + } +} + +pub struct EventLoop { + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + pending_dial: HashMap>>>, + pending_start_providing: HashMap>, + pending_get_providers: HashMap>>, + pending_request_file: + HashMap, Box>>>, +} + +impl EventLoop { + fn new( + swarm: Swarm, + command_receiver: mpsc::Receiver, + event_sender: mpsc::Sender, + ) -> Self { + Self { + swarm, + command_receiver, + event_sender, + pending_dial: Default::default(), + pending_start_providing: Default::default(), + pending_get_providers: Default::default(), + pending_request_file: Default::default(), + } + } + + pub async fn run(mut self) { + loop { + futures::select! { + event = self.swarm.next() => self.handle_event(event.expect("Swarm stream to be infinite.")).await , + command = self.command_receiver.next() => match command { + Some(c) => self.handle_command(c).await, + // Command channel closed, thus shutting down the network event loop. + None=> return, + }, + } + } + } + + async fn handle_event( + &mut self, + event: SwarmEvent, io::Error>>, + ) { + match event { + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + id, + result: QueryResult::StartProviding(_), + .. + }, + )) => { + let sender: oneshot::Sender<()> = self + .pending_start_providing + .remove(&id) + .expect("Completed query to be previously pending."); + let _ = sender.send(()); + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + id, + result: + QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { + providers, .. + })), + .. + }, + )) => { + if let Some(sender) = self.pending_get_providers.remove(&id) { + sender.send(providers).expect("Receiver not to be dropped"); + + // Finish the query. We are only interested in the first result. + self.swarm + .behaviour_mut() + .kademlia + .query_mut(&id) + .unwrap() + .finish(); + } + } + SwarmEvent::Behaviour(ComposedEvent::Kademlia( + KademliaEvent::OutboundQueryProgressed { + result: + QueryResult::GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { + .. + })), + .. + }, + )) => {} + SwarmEvent::Behaviour(ComposedEvent::Kademlia(_)) => {} + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + request_response::Event::Message { message, .. }, + )) => match message { + request_response::Message::Request { + request, channel, .. + } => { + self.event_sender + .send(Event::InboundRequest { + request: request.0, + channel, + }) + .await + .expect("Event receiver not to be dropped."); + } + request_response::Message::Response { + request_id, + response, + } => { + let _ = self + .pending_request_file + .remove(&request_id) + .expect("Request to still be pending.") + .send(Ok(response.0)); + } + }, + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + request_response::Event::OutboundFailure { + request_id, error, .. + }, + )) => { + let _ = self + .pending_request_file + .remove(&request_id) + .expect("Request to still be pending.") + .send(Err(Box::new(error))); + } + SwarmEvent::Behaviour(ComposedEvent::RequestResponse( + request_response::Event::ResponseSent { .. }, + )) => {} + SwarmEvent::NewListenAddr { address, .. } => { + let local_peer_id = *self.swarm.local_peer_id(); + eprintln!( + "Local node is listening on {:?}", + address.with(Protocol::P2p(local_peer_id.into())) + ); + } + SwarmEvent::IncomingConnection { .. } => {} + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + if endpoint.is_dialer() { + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Ok(())); + } + } + } + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::OutgoingConnectionError { peer_id, error, .. } => { + if let Some(peer_id) = peer_id { + if let Some(sender) = self.pending_dial.remove(&peer_id) { + let _ = sender.send(Err(Box::new(error))); + } + } + } + SwarmEvent::IncomingConnectionError { .. } => {} + SwarmEvent::Dialing(peer_id) => eprintln!("Dialing {peer_id}"), + e => panic!("{e:?}"), + } + } + + async fn handle_command(&mut self, command: Command) { + match command { + Command::StartListening { addr, sender } => { + let _ = match self.swarm.listen_on(addr) { + Ok(_) => sender.send(Ok(())), + Err(e) => sender.send(Err(Box::new(e))), + }; + } + Command::Dial { + peer_id, + peer_addr, + sender, + } => { + if let hash_map::Entry::Vacant(e) = self.pending_dial.entry(peer_id) { + self.swarm + .behaviour_mut() + .kademlia + .add_address(&peer_id, peer_addr.clone()); + match self + .swarm + .dial(peer_addr.with(Protocol::P2p(peer_id.into()))) + { + Ok(()) => { + e.insert(sender); + } + Err(e) => { + let _ = sender.send(Err(Box::new(e))); + } + } + } else { + todo!("Already dialing peer."); + } + } + Command::StartProviding { file_name, sender } => { + let query_id = self + .swarm + .behaviour_mut() + .kademlia + .start_providing(file_name.into_bytes().into()) + .expect("No store error."); + self.pending_start_providing.insert(query_id, sender); + } + Command::GetProviders { file_name, sender } => { + let query_id = self + .swarm + .behaviour_mut() + .kademlia + .get_providers(file_name.into_bytes().into()); + self.pending_get_providers.insert(query_id, sender); + } + Command::RequestFile { + file_name, + peer, + sender, + } => { + let request_id = self + .swarm + .behaviour_mut() + .request_response + .send_request(&peer, FileRequest(file_name)); + self.pending_request_file.insert(request_id, sender); + } + Command::RespondFile { file, channel } => { + self.swarm + .behaviour_mut() + .request_response + .send_response(channel, FileResponse(file)) + .expect("Connection to peer to be still open."); + } + } + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "ComposedEvent")] +struct ComposedBehaviour { + request_response: request_response::Behaviour, + kademlia: Kademlia, +} + +#[derive(Debug)] +enum ComposedEvent { + RequestResponse(request_response::Event), + Kademlia(KademliaEvent), +} + +impl From> for ComposedEvent { + fn from(event: request_response::Event) -> Self { + ComposedEvent::RequestResponse(event) + } +} + +impl From for ComposedEvent { + fn from(event: KademliaEvent) -> Self { + ComposedEvent::Kademlia(event) + } +} + +#[derive(Debug)] +enum Command { + StartListening { + addr: Multiaddr, + sender: oneshot::Sender>>, + }, + Dial { + peer_id: PeerId, + peer_addr: Multiaddr, + sender: oneshot::Sender>>, + }, + StartProviding { + file_name: String, + sender: oneshot::Sender<()>, + }, + GetProviders { + file_name: String, + sender: oneshot::Sender>, + }, + RequestFile { + file_name: String, + peer: PeerId, + sender: oneshot::Sender, Box>>, + }, + RespondFile { + file: Vec, + channel: ResponseChannel, + }, +} + +#[derive(Debug)] +pub enum Event { + InboundRequest { + request: String, + channel: ResponseChannel, + }, +} + +// Simple file exchange protocol + +#[derive(Debug, Clone)] +struct FileExchangeProtocol(); +#[derive(Clone)] +struct FileExchangeCodec(); +#[derive(Debug, Clone, PartialEq, Eq)] +struct FileRequest(String); +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct FileResponse(Vec); + +impl ProtocolName for FileExchangeProtocol { + fn protocol_name(&self) -> &[u8] { + "/file-exchange/1".as_bytes() + } +} + +#[async_trait] +impl request_response::Codec for FileExchangeCodec { + type Protocol = FileExchangeProtocol; + type Request = FileRequest; + type Response = FileResponse; + + async fn read_request( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let vec = read_length_prefixed(io, 1_000_000).await?; + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok(FileRequest(String::from_utf8(vec).unwrap())) + } + + async fn read_response( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + let vec = read_length_prefixed(io, 500_000_000).await?; // update transfer maximum + + if vec.is_empty() { + return Err(io::ErrorKind::UnexpectedEof.into()); + } + + Ok(FileResponse(vec)) + } + + async fn write_request( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + FileRequest(data): FileRequest, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) + } + + async fn write_response( + &mut self, + _: &FileExchangeProtocol, + io: &mut T, + FileResponse(data): FileResponse, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + write_length_prefixed(io, data).await?; + io.close().await?; + + Ok(()) + } +} diff --git a/examples/identify/Cargo.toml b/examples/identify/Cargo.toml new file mode 100644 index 00000000000..1494d79b085 --- /dev/null +++ b/examples/identify/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "identify" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "dcutr", "identify", "macros", "mplex", "noise", "ping", "relay", "rendezvous", "tcp", "tokio", "yamux"] } diff --git a/protocols/identify/examples/identify.rs b/examples/identify/src/main.rs similarity index 87% rename from protocols/identify/examples/identify.rs rename to examples/identify/src/main.rs index 12fe084a989..cd136d585c8 100644 --- a/protocols/identify/examples/identify.rs +++ b/examples/identify/src/main.rs @@ -23,7 +23,7 @@ //! In the first terminal window, run: //! //! ```sh -//! cargo run --example identify +//! cargo run //! ``` //! It will print the [`PeerId`] and the listening addresses, e.g. `Listening on //! "/ip4/127.0.0.1/tcp/24915"` @@ -31,16 +31,18 @@ //! In the second terminal window, start a new instance of the example with: //! //! ```sh -//! cargo run --example identify -- /ip4/127.0.0.1/tcp/24915 +//! cargo run -- /ip4/127.0.0.1/tcp/24915 //! ``` //! The two nodes establish a connection, negotiate the identify protocol //! and will send each other identify info which is then printed to the console. use futures::prelude::*; -use libp2p_core::upgrade::Version; -use libp2p_core::{identity, Multiaddr, PeerId, Transport}; -use libp2p_identify as identify; -use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p::{ + core::{multiaddr::Multiaddr, upgrade::Version, PeerId}, + identify, identity, noise, + swarm::{Swarm, SwarmEvent}, + tcp, yamux, Transport, +}; use std::error::Error; #[async_std::main] @@ -49,10 +51,10 @@ async fn main() -> Result<(), Box> { let local_peer_id = PeerId::from(local_key.public()); println!("Local peer id: {local_peer_id:?}"); - let transport = libp2p_tcp::async_io::Transport::default() + let transport = tcp::async_io::Transport::default() .upgrade(Version::V1) - .authenticate(libp2p_noise::NoiseAuthenticated::xx(&local_key).unwrap()) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .authenticate(noise::NoiseAuthenticated::xx(&local_key).unwrap()) + .multiplex(yamux::YamuxConfig::default()) .boxed(); // Create a identify network behaviour. diff --git a/examples/ipfs-kad/Cargo.toml b/examples/ipfs-kad/Cargo.toml new file mode 100644 index 00000000000..20c8ad30fd1 --- /dev/null +++ b/examples/ipfs-kad/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "ipfs-kad" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +env_logger = "0.10" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "kad", "mplex", "noise", "tcp", "websocket", "yamux"] } diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad/src/main.rs similarity index 100% rename from examples/ipfs-kad.rs rename to examples/ipfs-kad/src/main.rs diff --git a/examples/ipfs-private/Cargo.toml b/examples/ipfs-private/Cargo.toml new file mode 100644 index 00000000000..f03790db3d4 --- /dev/null +++ b/examples/ipfs-private/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "ipfs-private" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +either = "1.8" +env_logger = "0.10" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "gossipsub", "dns", "identify", "kad", "macros", "mplex", "noise", "ping", "pnet", "tcp", "websocket", "yamux"] } +multiaddr = { version = "0.17.0" } diff --git a/examples/ipfs-private.rs b/examples/ipfs-private/src/main.rs similarity index 100% rename from examples/ipfs-private.rs rename to examples/ipfs-private/src/main.rs diff --git a/examples/mdns-passive-discovery.rs b/examples/mdns-passive-discovery.rs deleted file mode 100644 index 587d268d0e2..00000000000 --- a/examples/mdns-passive-discovery.rs +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2018 Parity Technologies (UK) Ltd. -// -// Permission is hereby granted, free of charge, to any person obtaining a -// copy of this software and associated documentation files (the "Software"), -// to deal in the Software without restriction, including without limitation -// the rights to use, copy, modify, merge, publish, distribute, sublicense, -// and/or sell copies of the Software, and to permit persons to whom the -// Software is furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS -// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING -// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER -// DEALINGS IN THE SOFTWARE. - -use futures::StreamExt; -use libp2p::{ - identity, mdns, - swarm::{Swarm, SwarmEvent}, - PeerId, -}; -use std::error::Error; - -#[async_std::main] -async fn main() -> Result<(), Box> { - env_logger::init(); - - // Create a random PeerId. - let id_keys = identity::Keypair::generate_ed25519(); - let peer_id = PeerId::from(id_keys.public()); - println!("Local peer id: {peer_id:?}"); - - // Create a transport. - let transport = libp2p::development_transport(id_keys).await?; - - // Create an MDNS network behaviour. - let behaviour = mdns::async_io::Behaviour::new(mdns::Config::default(), peer_id)?; - - // Create a Swarm that establishes connections through the given transport. - // Note that the MDNS behaviour itself will not actually inititiate any connections, - // as it only uses UDP. - let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id); - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; - - loop { - match swarm.select_next_some().await { - SwarmEvent::Behaviour(mdns::Event::Discovered(peers)) => { - for (peer, addr) in peers { - println!("discovered {peer} {addr}"); - } - } - SwarmEvent::Behaviour(mdns::Event::Expired(expired)) => { - for (peer, addr) in expired { - println!("expired {peer} {addr}"); - } - } - _ => {} - } - } -} diff --git a/examples/ping-example/Cargo.toml b/examples/ping-example/Cargo.toml new file mode 100644 index 00000000000..8776d5eab71 --- /dev/null +++ b/examples/ping-example/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "ping-example" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "dns", "macros", "mplex", "noise", "ping", "tcp", "websocket", "yamux"] } +multiaddr = { version = "0.17.0" } diff --git a/examples/ping.rs b/examples/ping-example/src/main.rs similarity index 89% rename from examples/ping.rs rename to examples/ping-example/src/main.rs index 08761a06eb3..29a1648963b 100644 --- a/examples/ping.rs +++ b/examples/ping-example/src/main.rs @@ -25,7 +25,7 @@ //! In the first terminal window, run: //! //! ```sh -//! cargo run --example ping --features=full +//! cargo run //! ``` //! //! It will print the PeerId and the listening addresses, e.g. `Listening on @@ -34,15 +34,18 @@ //! In the second terminal window, start a new instance of the example with: //! //! ```sh -//! cargo run --example ping --features=full -- /ip4/127.0.0.1/tcp/24915 +//! cargo run -- /ip4/127.0.0.1/tcp/24915 //! ``` //! //! The two nodes establish a connection, negotiate the ping protocol //! and begin pinging each other. use futures::prelude::*; -use libp2p::swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use libp2p::{identity, ping, Multiaddr, PeerId}; +use libp2p::{ + identity, ping, + swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}, + Multiaddr, PeerId, +}; use std::error::Error; #[async_std::main] @@ -78,7 +81,7 @@ async fn main() -> Result<(), Box> { /// Our network behaviour. /// -/// For illustrative purposes, this includes the [`KeepAlive`](behaviour::KeepAlive) behaviour so a continuous sequence of +/// For illustrative purposes, this includes the [`KeepAlive`](keep_alive::Behaviour) behaviour so a continuous sequence of /// pings can be observed. #[derive(NetworkBehaviour, Default)] struct Behaviour { diff --git a/examples/rendezvous/Cargo.toml b/examples/rendezvous/Cargo.toml new file mode 100644 index 00000000000..507e560b78b --- /dev/null +++ b/examples/rendezvous/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "rendezvous-example" +version = "0.1.0" +edition = "2021" +publish = false + +[dependencies] +async-std = { version = "1.12", features = ["attributes"] } +async-trait = "0.1" +env_logger = "0.10.0" +futures = "0.3.26" +libp2p = { path = "../../", features = ["async-std", "identify", "macros", "mplex", "noise", "ping", "rendezvous", "tcp", "tokio", "yamux"] } +log = "0.4" +tokio = { version = "1.25", features = [ "rt-multi-thread", "macros", "time" ] } diff --git a/protocols/rendezvous/examples/discover.rs b/examples/rendezvous/src/bin/rzv-discover.rs similarity index 76% rename from protocols/rendezvous/examples/discover.rs rename to examples/rendezvous/src/bin/rzv-discover.rs index d12dd5cee0c..a12cc3070c7 100644 --- a/protocols/rendezvous/examples/discover.rs +++ b/examples/rendezvous/src/bin/rzv-discover.rs @@ -19,12 +19,15 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p_core::{identity, multiaddr::Protocol, upgrade::Version, Multiaddr, PeerId, Transport}; -use libp2p_ping as ping; -use libp2p_rendezvous as rendezvous; -use libp2p_swarm::{keep_alive, Swarm, SwarmEvent}; +use libp2p::{ + core::transport::upgrade::Version, + identity, + multiaddr::Protocol, + noise, ping, rendezvous, + swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}, + tcp, yamux, Multiaddr, PeerId, Transport, +}; use std::time::Duration; -use void::Void; const NAMESPACE: &str = "rendezvous"; @@ -32,24 +35,24 @@ const NAMESPACE: &str = "rendezvous"; async fn main() { env_logger::init(); - let identity = identity::Keypair::generate_ed25519(); + let key_pair = identity::Keypair::generate_ed25519(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" .parse() .unwrap(); let mut swarm = Swarm::with_tokio_executor( - libp2p_tcp::tokio::Transport::default() + tcp::tokio::Transport::default() .upgrade(Version::V1) - .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) .boxed(), MyBehaviour { - rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), keep_alive: keep_alive::Behaviour, }, - PeerId::from(identity.public()), + PeerId::from(key_pair.public()), ); log::info!("Local peer id: {}", swarm.local_peer_id()); @@ -75,7 +78,7 @@ async fn main() { rendezvous_point, ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Discovered { + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous(rendezvous::client::Event::Discovered { registrations, cookie: new_cookie, .. @@ -99,7 +102,7 @@ async fn main() { } } } - SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { + SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event { peer, result: Ok(ping::Success::Ping { rtt }), })) if peer != rendezvous_point => { @@ -120,36 +123,7 @@ async fn main() { } } -#[derive(Debug)] -enum MyEvent { - Rendezvous(rendezvous::client::Event), - Ping(ping::Event), -} - -impl From for MyEvent { - fn from(event: rendezvous::client::Event) -> Self { - MyEvent::Rendezvous(event) - } -} - -impl From for MyEvent { - fn from(event: ping::Event) -> Self { - MyEvent::Ping(event) - } -} - -impl From for MyEvent { - fn from(event: Void) -> Self { - void::unreachable(event) - } -} - -#[derive(libp2p_swarm::NetworkBehaviour)] -#[behaviour( - out_event = "MyEvent", - event_process = false, - prelude = "libp2p_swarm::derive_prelude" -)] +#[derive(NetworkBehaviour)] struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, diff --git a/protocols/rendezvous/examples/register_with_identify.rs b/examples/rendezvous/src/bin/rzv-identify.rs similarity index 66% rename from protocols/rendezvous/examples/register_with_identify.rs rename to examples/rendezvous/src/bin/rzv-identify.rs index 8e0fac23a85..5e58bde9fcb 100644 --- a/protocols/rendezvous/examples/register_with_identify.rs +++ b/examples/rendezvous/src/bin/rzv-identify.rs @@ -19,48 +19,47 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; -use libp2p_identify as identify; -use libp2p_ping as ping; -use libp2p_rendezvous as rendezvous; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::{ + core::transport::upgrade::Version, + identify, identity, noise, ping, rendezvous, + swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}, + tcp, yamux, Multiaddr, PeerId, Transport, +}; use std::time::Duration; -use void::Void; #[tokio::main] async fn main() { env_logger::init(); + let key_pair = identity::Keypair::generate_ed25519(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" .parse() .unwrap(); - let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::with_tokio_executor( - libp2p_tcp::tokio::Transport::default() + tcp::tokio::Transport::default() .upgrade(Version::V1) - .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) .boxed(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), - identity.public(), + key_pair.public(), )), - rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), keep_alive: keep_alive::Behaviour, }, - PeerId::from(identity.public()), + PeerId::from(key_pair.public()), ); log::info!("Local peer id: {}", swarm.local_peer_id()); let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); - swarm.dial(rendezvous_point_address).unwrap(); + swarm.dial(rendezvous_point_address.clone()).unwrap(); while let Some(event) = swarm.next().await { match event { @@ -75,18 +74,22 @@ async fn main() { log::error!("Lost connection to rendezvous point {}", error); } // once `/identify` did its job, we know our external address and can register - SwarmEvent::Behaviour(MyEvent::Identify(identify::Event::Received { .. })) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Identify(identify::Event::Received { + .. + })) => { swarm.behaviour_mut().rendezvous.register( rendezvous::Namespace::from_static("rendezvous"), rendezvous_point, None, ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { - namespace, - ttl, - rendezvous_node, - })) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + }, + )) => { log::info!( "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", namespace, @@ -94,13 +97,13 @@ async fn main() { ttl ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous( + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( rendezvous::client::Event::RegisterFailed(error), )) => { log::error!("Failed to register {}", error); return; } - SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { + SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event { peer, result: Ok(ping::Success::Ping { rtt }), })) if peer != rendezvous_point => { @@ -113,44 +116,7 @@ async fn main() { } } -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum MyEvent { - Rendezvous(rendezvous::client::Event), - Identify(identify::Event), - Ping(ping::Event), -} - -impl From for MyEvent { - fn from(event: rendezvous::client::Event) -> Self { - MyEvent::Rendezvous(event) - } -} - -impl From for MyEvent { - fn from(event: identify::Event) -> Self { - MyEvent::Identify(event) - } -} - -impl From for MyEvent { - fn from(event: ping::Event) -> Self { - MyEvent::Ping(event) - } -} - -impl From for MyEvent { - fn from(event: Void) -> Self { - void::unreachable(event) - } -} - #[derive(NetworkBehaviour)] -#[behaviour( - out_event = "MyEvent", - event_process = false, - prelude = "libp2p_swarm::derive_prelude" -)] struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::client::Behaviour, diff --git a/protocols/rendezvous/examples/register.rs b/examples/rendezvous/src/bin/rzv-register.rs similarity index 72% rename from protocols/rendezvous/examples/register.rs rename to examples/rendezvous/src/bin/rzv-register.rs index 98060c5cd96..3923d9c763e 100644 --- a/protocols/rendezvous/examples/register.rs +++ b/examples/rendezvous/src/bin/rzv-register.rs @@ -19,35 +19,36 @@ // DEALINGS IN THE SOFTWARE. use futures::StreamExt; -use libp2p_core::{identity, upgrade::Version, Multiaddr, PeerId, Transport}; -use libp2p_ping as ping; -use libp2p_rendezvous as rendezvous; -use libp2p_swarm::AddressScore; -use libp2p_swarm::{NetworkBehaviour, Swarm, SwarmEvent}; +use libp2p::{ + core::transport::upgrade::Version, + identity, noise, ping, rendezvous, + swarm::{keep_alive, AddressScore, NetworkBehaviour, Swarm, SwarmEvent}, + tcp, yamux, Multiaddr, PeerId, Transport, +}; use std::time::Duration; #[tokio::main] async fn main() { env_logger::init(); + let key_pair = identity::Keypair::generate_ed25519(); let rendezvous_point_address = "/ip4/127.0.0.1/tcp/62649".parse::().unwrap(); let rendezvous_point = "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN" .parse() .unwrap(); - let identity = identity::Keypair::generate_ed25519(); - let mut swarm = Swarm::with_tokio_executor( - libp2p_tcp::tokio::Transport::default() + tcp::tokio::Transport::default() .upgrade(Version::V1) - .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) .boxed(), MyBehaviour { - rendezvous: rendezvous::client::Behaviour::new(identity.clone()), + rendezvous: rendezvous::client::Behaviour::new(key_pair.clone()), ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), + keep_alive: keep_alive::Behaviour, }, - PeerId::from(identity.public()), + PeerId::from(key_pair.public()), ); // In production the external address should be the publicly facing IP address of the rendezvous point. @@ -57,9 +58,7 @@ async fn main() { log::info!("Local peer id: {}", swarm.local_peer_id()); - let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); - - swarm.dial(rendezvous_point_address).unwrap(); + swarm.dial(rendezvous_point_address.clone()).unwrap(); while let Some(event) = swarm.next().await { match event { @@ -82,11 +81,13 @@ async fn main() { log::info!("Connection established with rendezvous point {}", peer_id); } // once `/identify` did its job, we know our external address and can register - SwarmEvent::Behaviour(MyEvent::Rendezvous(rendezvous::client::Event::Registered { - namespace, - ttl, - rendezvous_node, - })) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( + rendezvous::client::Event::Registered { + namespace, + ttl, + rendezvous_node, + }, + )) => { log::info!( "Registered for namespace '{}' at rendezvous point {} for the next {} seconds", namespace, @@ -94,13 +95,13 @@ async fn main() { ttl ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous( + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( rendezvous::client::Event::RegisterFailed(error), )) => { log::error!("Failed to register {}", error); return; } - SwarmEvent::Behaviour(MyEvent::Ping(ping::Event { + SwarmEvent::Behaviour(MyBehaviourEvent::Ping(ping::Event { peer, result: Ok(ping::Success::Ping { rtt }), })) if peer != rendezvous_point => { @@ -113,31 +114,9 @@ async fn main() { } } -#[derive(Debug)] -enum MyEvent { - Rendezvous(rendezvous::client::Event), - Ping(ping::Event), -} - -impl From for MyEvent { - fn from(event: rendezvous::client::Event) -> Self { - MyEvent::Rendezvous(event) - } -} - -impl From for MyEvent { - fn from(event: ping::Event) -> Self { - MyEvent::Ping(event) - } -} - #[derive(NetworkBehaviour)] -#[behaviour( - out_event = "MyEvent", - event_process = false, - prelude = "libp2p_swarm::derive_prelude" -)] struct MyBehaviour { rendezvous: rendezvous::client::Behaviour, ping: ping::Behaviour, + keep_alive: keep_alive::Behaviour, } diff --git a/protocols/rendezvous/examples/rendezvous_point.rs b/examples/rendezvous/src/main.rs similarity index 62% rename from protocols/rendezvous/examples/rendezvous_point.rs rename to examples/rendezvous/src/main.rs index 49680bfd940..d3f6ea7746e 100644 --- a/protocols/rendezvous/examples/rendezvous_point.rs +++ b/examples/rendezvous/src/main.rs @@ -18,53 +18,60 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use futures::StreamExt; -use libp2p_core::{identity, upgrade::Version, PeerId, Transport}; -use libp2p_identify as identify; -use libp2p_ping as ping; -use libp2p_rendezvous as rendezvous; -use libp2p_swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}; -use void::Void; - /// Examples for the rendezvous protocol: /// /// 1. Run the rendezvous server: -/// RUST_LOG=info cargo run --example rendezvous_point +/// ``` +/// RUST_LOG=info cargo run --bin rendezvous-example +/// ``` /// 2. Register a peer: -/// RUST_LOG=info cargo run --example register_with_identify +/// ``` +/// RUST_LOG=info cargo run --bin rzv-register +/// ``` /// 3. Try to discover the peer from (2): -/// RUST_LOG=info cargo run --example discover +/// ``` +/// RUST_LOG=info cargo run --bin rzv-discover +/// ``` +/// 4. Try to discover with identify: +/// ``` +/// RUST_LOG=info cargo run --bin rzv-identify +/// ``` +use futures::StreamExt; +use libp2p::{ + core::transport::upgrade::Version, + identify, identity, noise, ping, rendezvous, + swarm::{keep_alive, NetworkBehaviour, Swarm, SwarmEvent}, + tcp, yamux, PeerId, Transport, +}; +use std::time::Duration; + #[tokio::main] async fn main() { env_logger::init(); - let bytes = [0u8; 32]; - let key = identity::ed25519::SecretKey::from_bytes(bytes).expect("we always pass 32 bytes"); - let identity = identity::Keypair::Ed25519(key.into()); + let key_pair = identity::Keypair::generate_ed25519(); let mut swarm = Swarm::with_tokio_executor( - libp2p_tcp::tokio::Transport::default() + tcp::tokio::Transport::default() .upgrade(Version::V1) - .authenticate(libp2p_noise::NoiseAuthenticated::xx(&identity).unwrap()) - .multiplex(libp2p_yamux::YamuxConfig::default()) + .authenticate(noise::NoiseAuthenticated::xx(&key_pair).unwrap()) + .multiplex(yamux::YamuxConfig::default()) .boxed(), MyBehaviour { identify: identify::Behaviour::new(identify::Config::new( "rendezvous-example/1.0.0".to_string(), - identity.public(), + key_pair.public(), )), rendezvous: rendezvous::server::Behaviour::new(rendezvous::server::Config::default()), - ping: ping::Behaviour::new(ping::Config::new()), + ping: ping::Behaviour::new(ping::Config::new().with_interval(Duration::from_secs(1))), keep_alive: keep_alive::Behaviour, }, - PeerId::from(identity.public()), + PeerId::from(key_pair.public()), ); log::info!("Local peer id: {}", swarm.local_peer_id()); - swarm - .listen_on("/ip4/0.0.0.0/tcp/62649".parse().unwrap()) - .unwrap(); + let _ = swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()); while let Some(event) = swarm.next().await { match event { @@ -74,7 +81,7 @@ async fn main() { SwarmEvent::ConnectionClosed { peer_id, .. } => { log::info!("Disconnected from {}", peer_id); } - SwarmEvent::Behaviour(MyEvent::Rendezvous( + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( rendezvous::server::Event::PeerRegistered { peer, registration }, )) => { log::info!( @@ -83,7 +90,7 @@ async fn main() { registration.namespace ); } - SwarmEvent::Behaviour(MyEvent::Rendezvous( + SwarmEvent::Behaviour(MyBehaviourEvent::Rendezvous( rendezvous::server::Event::DiscoverServed { enquirer, registrations, @@ -102,43 +109,7 @@ async fn main() { } } -#[derive(Debug)] -enum MyEvent { - Rendezvous(rendezvous::server::Event), - Ping(ping::Event), - Identify(identify::Event), -} - -impl From for MyEvent { - fn from(event: rendezvous::server::Event) -> Self { - MyEvent::Rendezvous(event) - } -} - -impl From for MyEvent { - fn from(event: ping::Event) -> Self { - MyEvent::Ping(event) - } -} - -impl From for MyEvent { - fn from(event: identify::Event) -> Self { - MyEvent::Identify(event) - } -} - -impl From for MyEvent { - fn from(event: Void) -> Self { - void::unreachable(event) - } -} - #[derive(NetworkBehaviour)] -#[behaviour( - out_event = "MyEvent", - event_process = false, - prelude = "libp2p_swarm::derive_prelude" -)] struct MyBehaviour { identify: identify::Behaviour, rendezvous: rendezvous::server::Behaviour,