diff --git a/CHANGELOG.md b/CHANGELOG.md index d7a0b3a38ff..1cf9f7c9559 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ - [`libp2p-deflate` CHANGELOG](transports/deflate/CHANGELOG.md) - [`libp2p-dns` CHANGELOG](transports/dns/CHANGELOG.md) - [`libp2p-noise` CHANGELOG](transports/noise/CHANGELOG.md) +- [`libp2p-perf` CHANGELOG](transports/perf/CHANGELOG.md) - [`libp2p-plaintext` CHANGELOG](transports/plaintext/CHANGELOG.md) - [`libp2p-pnet` CHANGELOG](transports/pnet/CHANGELOG.md) - [`libp2p-quic` CHANGELOG](transports/quic/CHANGELOG.md) @@ -45,4 +46,4 @@ - [`libp2p-metrics` CHANGELOG](misc/metrics/CHANGELOG.md) - [`multistream-select` CHANGELOG](misc/multistream-select/CHANGELOG.md) - [`rw-stream-sink` CHANGELOG](misc/rw-stream-sink/CHANGELOG.md) -- [`quick-protobuf-codec` CHANGELOG](misc/quick-protobuf-codec/CHANGELOG.md) \ No newline at end of file +- [`quick-protobuf-codec` CHANGELOG](misc/quick-protobuf-codec/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 11103424c4c..3c63611c013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2163,6 +2163,7 @@ dependencies = [ "libp2p-metrics", "libp2p-mplex", "libp2p-noise", + "libp2p-perf", "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", @@ -2551,6 +2552,32 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-perf" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-std", + "clap 4.1.8", + "env_logger 0.10.0", + "futures", + "instant", + "libp2p-core", + "libp2p-dns", + "libp2p-identity", + "libp2p-noise", + "libp2p-plaintext", + "libp2p-quic", + "libp2p-swarm", + "libp2p-swarm-test", + "libp2p-tcp", + "libp2p-yamux", + "log", + "rand 0.8.5", + "thiserror", + "void", +] + [[package]] name = "libp2p-ping" version = "0.42.0" diff --git a/Cargo.toml b/Cargo.toml index 5d8a55cf211..dbe2947fb46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "protocols/identify", "protocols/kad", "protocols/mdns", + "protocols/perf", "protocols/ping", "protocols/relay", "protocols/request-response", diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 83e97d1dbd2..ee4d4a2d5c1 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -28,6 +28,7 @@ full = [ "metrics", "mplex", "noise", + "perf", "ping", "plaintext", "pnet", @@ -66,6 +67,7 @@ mdns = ["dep:libp2p-mdns"] metrics = ["dep:libp2p-metrics"] mplex = ["dep:libp2p-mplex"] noise = ["dep:libp2p-noise"] +perf = ["dep:libp2p-perf"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] @@ -120,6 +122,7 @@ pin-project = "1.0.0" libp2p-deflate = { version = "0.39.0", path = "../transports/deflate", optional = true } libp2p-dns = { version = "0.39.0", path = "../transports/dns", optional = true } libp2p-mdns = { version = "0.43.0", path = "../protocols/mdns", optional = true } +libp2p-perf = { version = "0.1.0", path = "../protocols/perf", optional = true } libp2p-quic = { version = "0.7.0-alpha.3", path = "../transports/quic", optional = true } libp2p-tcp = { version = "0.39.0", path = "../transports/tcp", optional = true } libp2p-tls = { version = "0.1.0", path = "../transports/tls", optional = true } diff --git a/protocols/perf/CHANGELOG.md b/protocols/perf/CHANGELOG.md new file mode 100644 index 00000000000..d1e4ec5a44e --- /dev/null +++ b/protocols/perf/CHANGELOG.md @@ -0,0 +1,3 @@ +# 0.1.0 - unreleased + +- Initial release. diff --git a/protocols/perf/Cargo.toml b/protocols/perf/Cargo.toml new file mode 100644 index 00000000000..e56338354e9 --- /dev/null +++ b/protocols/perf/Cargo.toml @@ -0,0 +1,42 @@ +[package] +name = "libp2p-perf" +edition = "2021" +rust-version = "1.64.0" +description = "libp2p perf protocol implementation" +version = "0.1.0" +authors = ["Max Inden "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +keywords = ["peer-to-peer", "libp2p", "networking"] +categories = ["network-programming", "asynchronous"] + +[dependencies] +anyhow = "1" +async-std = { version = "1.9.0", features = ["attributes"] } +clap = { version = "4.1.6", features = ["derive"] } +env_logger = "0.10.0" +futures = "0.3.26" +instant = "0.1.11" +libp2p-core = { version = "0.39.0", path = "../../core" } +libp2p-dns = { version = "0.39.0", path = "../../transports/dns", features = ["async-std"] } +libp2p-identity = { version = "0.1.0", path = "../../identity" } +libp2p-noise = { version = "0.42.0", path = "../../transports/noise" } +libp2p-quic = { version = "0.7.0-alpha.2", path = "../../transports/quic", features = ["async-std"] } +libp2p-swarm = { version = "0.42.0", path = "../../swarm", features = ["macros", "async-std"] } +libp2p-swarm-test = { path = "../../swarm-test"} +libp2p-tcp = { version = "0.39.0", path = "../../transports/tcp", features = ["async-io"] } +libp2p-yamux = { version = "0.43.0", path = "../../muxers/yamux" } +log = "0.4" +thiserror = "1.0" +void = "1" + +[dev-dependencies] +rand = "0.8" +libp2p-plaintext = { path = "../../transports/plaintext" } + +# Passing arguments to the docsrs builder in order to properly document cfg's. +# More information: https://docs.rs/about/builds#cross-compiling +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] +rustc-args = ["--cfg", "docsrs"] diff --git a/protocols/perf/Dockerfile b/protocols/perf/Dockerfile new file mode 100644 index 00000000000..aef8eed1cad --- /dev/null +++ b/protocols/perf/Dockerfile @@ -0,0 +1,22 @@ +# syntax=docker/dockerfile:1.5-labs +FROM rust:1.67.0 as builder + +# Run with access to the target cache to speed up builds +WORKDIR /workspace +ADD . . +RUN --mount=type=cache,target=./target \ + --mount=type=cache,target=/usr/local/cargo/registry \ + cargo build --release --package libp2p-perf + +RUN --mount=type=cache,target=./target \ + mv ./target/release/perf-server /usr/local/bin/perf-server + +RUN --mount=type=cache,target=./target \ + mv ./target/release/perf-client /usr/local/bin/perf-client + +FROM debian:bullseye-slim + +COPY --from=builder /usr/local/bin/perf-server /usr/local/bin/perf-server +COPY --from=builder /usr/local/bin/perf-client /usr/local/bin/perf-client + +ENTRYPOINT [ "perf-server"] diff --git a/protocols/perf/src/bin/perf-client.rs b/protocols/perf/src/bin/perf-client.rs new file mode 100644 index 00000000000..c12ab5cbe74 --- /dev/null +++ b/protocols/perf/src/bin/perf-client.rs @@ -0,0 +1,140 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use anyhow::{bail, Result}; +use clap::Parser; +use futures::{future::Either, StreamExt}; +use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Multiaddr, Transport}; +use libp2p_dns::DnsConfig; +use libp2p_identity::PeerId; +use libp2p_perf::client::RunParams; +use libp2p_swarm::{SwarmBuilder, SwarmEvent}; +use log::info; + +#[derive(Debug, Parser)] +#[clap(name = "libp2p perf client")] +struct Opts { + #[arg(long)] + server_address: Multiaddr, +} + +#[async_std::main] +async fn main() -> Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let opts = Opts::parse(); + + info!("Initiating performance tests with {}", opts.server_address); + + // Create a random PeerId + let local_key = libp2p_identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + + let transport = { + let tcp = + libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true)) + .upgrade(upgrade::Version::V1Lazy) + .authenticate( + libp2p_noise::NoiseAuthenticated::xx(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."), + ) + .multiplex(libp2p_yamux::YamuxConfig::default()); + + let quic = { + let mut config = libp2p_quic::Config::new(&local_key); + config.support_draft_29 = true; + libp2p_quic::async_std::Transport::new(config) + }; + + let dns = DnsConfig::system(OrTransport::new(quic, tcp)) + .await + .unwrap(); + + dns.map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed() + }; + + let mut swarm = SwarmBuilder::with_async_std_executor( + transport, + libp2p_perf::client::Behaviour::default(), + local_peer_id, + ) + .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) + .build(); + + swarm.dial(opts.server_address.clone()).unwrap(); + let server_peer_id = loop { + match swarm.next().await.unwrap() { + SwarmEvent::ConnectionEstablished { peer_id, .. } => break peer_id, + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + bail!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + e => panic!("{e:?}"), + } + }; + + info!( + "Connection to {} established. Launching benchmarks.", + opts.server_address + ); + + swarm.behaviour_mut().perf( + server_peer_id, + RunParams { + to_send: 10 * 1024 * 1024, + to_receive: 10 * 1024 * 1024, + }, + )?; + + let stats = loop { + match swarm.next().await.unwrap() { + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + info!("Established connection to {:?} via {:?}", peer_id, endpoint); + } + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + info!("Outgoing connection error to {:?}: {:?}", peer_id, error); + } + SwarmEvent::Behaviour(libp2p_perf::client::Event { id: _, result }) => break result?, + e => panic!("{e:?}"), + } + }; + + let sent_mebibytes = stats.params.to_send as f64 / 1024.0 / 1024.0; + let sent_time = (stats.timers.write_done - stats.timers.write_start).as_secs_f64(); + let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time; + + let received_mebibytes = stats.params.to_receive as f64 / 1024.0 / 1024.0; + let receive_time = (stats.timers.read_done - stats.timers.write_done).as_secs_f64(); + let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time; + + info!( + "Finished run: Sent {sent_mebibytes:.2} MiB in {sent_time:.2} s with \ + {sent_bandwidth_mebibit_second:.2} MiBit/s and received \ + {received_mebibytes:.2} MiB in {receive_time:.2} s with \ + {receive_bandwidth_mebibit_second:.2} MiBit/s", + ); + + Ok(()) +} diff --git a/protocols/perf/src/bin/perf-server.rs b/protocols/perf/src/bin/perf-server.rs new file mode 100644 index 00000000000..b12972e3c38 --- /dev/null +++ b/protocols/perf/src/bin/perf-server.rs @@ -0,0 +1,128 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use clap::Parser; +use futures::{future::Either, StreamExt}; +use libp2p_core::{muxing::StreamMuxerBox, transport::OrTransport, upgrade, Transport}; +use libp2p_dns::DnsConfig; +use libp2p_identity::PeerId; +use libp2p_swarm::{SwarmBuilder, SwarmEvent}; +use log::{error, info}; + +#[derive(Debug, Parser)] +#[clap(name = "libp2p perf server")] +struct Opts {} + +#[async_std::main] +async fn main() { + env_logger::init(); + + let _opts = Opts::parse(); + + // Create a random PeerId + let local_key = libp2p_identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {local_peer_id}"); + + let transport = { + let tcp = + libp2p_tcp::async_io::Transport::new(libp2p_tcp::Config::default().port_reuse(true)) + .upgrade(upgrade::Version::V1Lazy) + .authenticate( + libp2p_noise::NoiseAuthenticated::xx(&local_key) + .expect("Signing libp2p-noise static DH keypair failed."), + ) + .multiplex(libp2p_yamux::YamuxConfig::default()); + + let quic = { + let mut config = libp2p_quic::Config::new(&local_key); + config.support_draft_29 = true; + libp2p_quic::async_std::Transport::new(config) + }; + + let dns = DnsConfig::system(OrTransport::new(quic, tcp)) + .await + .unwrap(); + + dns.map(|either_output, _| match either_output { + Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)), + }) + .boxed() + }; + + let mut swarm = SwarmBuilder::with_async_std_executor( + transport, + libp2p_perf::server::Behaviour::default(), + local_peer_id, + ) + .substream_upgrade_protocol_override(upgrade::Version::V1Lazy) + .build(); + + swarm + .listen_on("/ip4/0.0.0.0/tcp/4001".parse().unwrap()) + .unwrap(); + + swarm + .listen_on("/ip4/0.0.0.0/udp/4001/quic-v1".parse().unwrap()) + .unwrap(); + + loop { + match swarm.next().await.unwrap() { + SwarmEvent::NewListenAddr { address, .. } => { + info!("Listening on {address}"); + } + SwarmEvent::IncomingConnection { .. } => {} + e @ SwarmEvent::IncomingConnectionError { .. } => { + error!("{e:?}"); + } + SwarmEvent::ConnectionEstablished { + peer_id, endpoint, .. + } => { + info!("Established connection to {:?} via {:?}", peer_id, endpoint); + } + SwarmEvent::ConnectionClosed { .. } => {} + SwarmEvent::Behaviour(libp2p_perf::server::Event { + remote_peer_id, + stats, + }) => { + let received_mebibytes = stats.params.received as f64 / 1024.0 / 1024.0; + let receive_time = (stats.timers.read_done - stats.timers.read_start).as_secs_f64(); + let receive_bandwidth_mebibit_second = (received_mebibytes * 8.0) / receive_time; + + let sent_mebibytes = stats.params.sent as f64 / 1024.0 / 1024.0; + let sent_time = (stats.timers.write_done - stats.timers.read_done).as_secs_f64(); + let sent_bandwidth_mebibit_second = (sent_mebibytes * 8.0) / sent_time; + + info!( + "Finished run with {}: Received {:.2} MiB in {:.2} s with {:.2} MiBit/s and sent {:.2} MiB in {:.2} s with {:.2} MiBit/s", + remote_peer_id, + received_mebibytes, + receive_time, + receive_bandwidth_mebibit_second, + sent_mebibytes, + sent_time, + sent_bandwidth_mebibit_second, + ) + } + e => panic!("{e:?}"), + } + } +} diff --git a/protocols/perf/src/client.rs b/protocols/perf/src/client.rs new file mode 100644 index 00000000000..c320b18ea32 --- /dev/null +++ b/protocols/perf/src/client.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod behaviour; +mod handler; + +use instant::Instant; +use std::sync::atomic::{AtomicUsize, Ordering}; + +pub use behaviour::{Behaviour, Event}; + +/// Parameters for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug, Clone, Copy)] +pub struct RunParams { + pub to_send: usize, + pub to_receive: usize, +} + +/// Timers for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug, Clone, Copy)] +pub struct RunTimers { + pub write_start: Instant, + pub write_done: Instant, + pub read_done: Instant, +} + +/// Statistics for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug)] +pub struct RunStats { + pub params: RunParams, + pub timers: RunTimers, +} + +static NEXT_RUN_ID: AtomicUsize = AtomicUsize::new(1); + +/// Connection identifier. +#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub struct RunId(usize); + +impl RunId { + /// Returns the next available [`RunId`]. + pub(crate) fn next() -> Self { + Self(NEXT_RUN_ID.fetch_add(1, Ordering::SeqCst)) + } +} diff --git a/protocols/perf/src/client/behaviour.rs b/protocols/perf/src/client/behaviour.rs new file mode 100644 index 00000000000..f7d9d8453e0 --- /dev/null +++ b/protocols/perf/src/client/behaviour.rs @@ -0,0 +1,158 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] of the libp2p perf client protocol. + +use std::{ + collections::{HashSet, VecDeque}, + task::{Context, Poll}, +}; + +use libp2p_core::Multiaddr; +use libp2p_identity::PeerId; +use libp2p_swarm::{ + derive_prelude::ConnectionEstablished, ConnectionClosed, ConnectionHandlerUpgrErr, + ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, + PollParameters, THandlerInEvent, THandlerOutEvent, +}; +use void::Void; + +use crate::client::handler::Handler; + +use super::{RunId, RunParams, RunStats}; + +#[derive(Debug)] +pub struct Event { + pub id: RunId, + pub result: Result>, +} + +#[derive(Default)] +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_events: VecDeque>>, + /// Set of connected peers. + connected: HashSet, +} + +impl Behaviour { + pub fn new() -> Self { + Self::default() + } + + pub fn perf(&mut self, server: PeerId, params: RunParams) -> Result { + if !self.connected.contains(&server) { + return Err(PerfError::NotConnected); + } + + let id = RunId::next(); + + self.queued_events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: server, + handler: NotifyHandler::Any, + event: crate::client::handler::Command { id, params }, + }); + + Ok(id) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum PerfError { + #[error("Not connected to peer")] + NotConnected, +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type OutEvent = Event; + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, .. }) => { + self.connected.insert(peer_id); + } + FromSwarm::ConnectionClosed(ConnectionClosed { + peer_id, + connection_id: _, + endpoint: _, + handler: _, + remaining_established, + }) => { + if remaining_established == 0 { + assert!(self.connected.remove(&peer_id)); + } + } + FromSwarm::AddressChange(_) + | FromSwarm::DialFailure(_) + | FromSwarm::ListenFailure(_) + | FromSwarm::NewListener(_) + | FromSwarm::NewListenAddr(_) + | FromSwarm::ExpiredListenAddr(_) + | FromSwarm::ListenerError(_) + | FromSwarm::ListenerClosed(_) + | FromSwarm::NewExternalAddr(_) + | FromSwarm::ExpiredExternalAddr(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + _event_source: PeerId, + _connection_id: ConnectionId, + super::handler::Event { id, result }: THandlerOutEvent, + ) { + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent(Event { id, result })); + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/client/handler.rs b/protocols/perf/src/client/handler.rs new file mode 100644 index 00000000000..f75a43f0a4e --- /dev/null +++ b/protocols/perf/src/client/handler.rs @@ -0,0 +1,196 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt, TryFutureExt}; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, + }, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, +}; +use void::Void; + +use super::{RunId, RunParams, RunStats}; + +#[derive(Debug)] +pub struct Command { + pub(crate) id: RunId, + pub(crate) params: RunParams, +} + +#[derive(Debug)] +pub struct Event { + pub(crate) id: RunId, + pub(crate) result: Result>, +} + +pub struct Handler { + /// Queue of events to return when polled. + queued_events: VecDeque< + ConnectionHandlerEvent< + ::OutboundProtocol, + ::OutboundOpenInfo, + ::OutEvent, + ::Error, + >, + >, + + outbound: FuturesUnordered>>, + + keep_alive: KeepAlive, +} + +impl Handler { + pub fn new() -> Self { + Self { + queued_events: Default::default(), + outbound: Default::default(), + keep_alive: KeepAlive::Yes, + } + } +} + +impl Default for Handler { + fn default() -> Self { + Self::new() + } +} + +impl ConnectionHandler for Handler { + type InEvent = Command; + type OutEvent = Event; + type Error = Void; + type InboundProtocol = DeniedUpgrade; + type OutboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundOpenInfo = Command; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(DeniedUpgrade, ()) + } + + fn on_behaviour_event(&mut self, command: Self::InEvent) { + self.queued_events + .push_back(ConnectionHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), command), + }) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, .. + }) => void::unreachable(protocol), + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { + protocol, + info: Command { params, id }, + }) => self.outbound.push( + crate::protocol::send_receive(params, protocol) + .map_ok(move |timers| Event { + id, + result: Ok(RunStats { params, timers }), + }) + .boxed(), + ), + + ConnectionEvent::AddressChange(_) => {} + ConnectionEvent::DialUpgradeError(DialUpgradeError { + info: Command { id, .. }, + error, + }) => self + .queued_events + .push_back(ConnectionHandlerEvent::Custom(Event { + id, + result: Err(error), + })), + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { + match error { + ConnectionHandlerUpgrErr::Timeout => {} + ConnectionHandlerUpgrErr::Timer => {} + ConnectionHandlerUpgrErr::Upgrade(error) => match error { + libp2p_core::UpgradeError::Select(_) => {} + libp2p_core::UpgradeError::Apply(v) => void::unreachable(v), + }, + } + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + // Return queued events. + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + while let Poll::Ready(Some(result)) = self.outbound.poll_next_unpin(cx) { + match result { + Ok(event) => return Poll::Ready(ConnectionHandlerEvent::Custom(event)), + Err(e) => { + panic!("{e:?}") + } + } + } + + if self.outbound.is_empty() { + match self.keep_alive { + KeepAlive::Yes => { + self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); + } + KeepAlive::Until(_) => {} + KeepAlive::No => panic!("Handler never sets KeepAlive::No."), + } + } else { + self.keep_alive = KeepAlive::Yes + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/lib.rs b/protocols/perf/src/lib.rs new file mode 100644 index 00000000000..19bb956a1d2 --- /dev/null +++ b/protocols/perf/src/lib.rs @@ -0,0 +1,31 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! Implementation of the [libp2p perf protocol](https://github.com/libp2p/specs/pull/478/). +//! +//! Do not use in untrusted environments. + +#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] + +pub mod client; +mod protocol; +pub mod server; + +pub const PROTOCOL_NAME: &[u8; 11] = b"/perf/1.0.0"; diff --git a/protocols/perf/src/protocol.rs b/protocols/perf/src/protocol.rs new file mode 100644 index 00000000000..808ea45752b --- /dev/null +++ b/protocols/perf/src/protocol.rs @@ -0,0 +1,206 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use instant::Instant; + +use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; + +use crate::{client, server}; + +const BUF: [u8; 1024] = [0; 1024]; + +pub async fn send_receive( + params: client::RunParams, + mut stream: S, +) -> Result { + let client::RunParams { + to_send, + to_receive, + } = params; + + let mut receive_buf = vec![0; 1024]; + + stream.write_all(&(to_receive as u64).to_be_bytes()).await?; + + let write_start = Instant::now(); + + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += stream.write(buf).await?; + } + + stream.close().await?; + + let write_done = Instant::now(); + + let mut received = 0; + while received < to_receive { + received += stream.read(&mut receive_buf).await?; + } + + let read_done = Instant::now(); + + Ok(client::RunTimers { + write_start, + write_done, + read_done, + }) +} + +pub async fn receive_send( + mut stream: S, +) -> Result { + let to_send = { + let mut buf = [0; 8]; + stream.read_exact(&mut buf).await?; + + u64::from_be_bytes(buf) as usize + }; + + let read_start = Instant::now(); + + let mut receive_buf = vec![0; 1024]; + let mut received = 0; + loop { + let n = stream.read(&mut receive_buf).await?; + received += n; + if n == 0 { + break; + } + } + + let read_done = Instant::now(); + + let mut sent = 0; + while sent < to_send { + let n = std::cmp::min(to_send - sent, BUF.len()); + let buf = &BUF[..n]; + + sent += stream.write(buf).await?; + } + + stream.close().await?; + let write_done = Instant::now(); + + Ok(server::RunStats { + params: server::RunParams { sent, received }, + timers: server::RunTimers { + read_start, + read_done, + write_done, + }, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{executor::block_on, AsyncRead, AsyncWrite}; + use std::{ + pin::Pin, + sync::{Arc, Mutex}, + task::Poll, + }; + + #[derive(Clone)] + struct DummyStream { + inner: Arc>, + } + + struct DummyStreamInner { + read: Vec, + write: Vec, + } + + impl DummyStream { + fn new(read: Vec) -> Self { + Self { + inner: Arc::new(Mutex::new(DummyStreamInner { + read, + write: Vec::new(), + })), + } + } + } + + impl Unpin for DummyStream {} + + impl AsyncWrite for DummyStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_flush(cx) + } + + fn poll_close( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + Pin::new(&mut self.inner.lock().unwrap().write).poll_close(cx) + } + } + + impl AsyncRead for DummyStream { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + buf: &mut [u8], + ) -> std::task::Poll> { + let amt = std::cmp::min(buf.len(), self.inner.lock().unwrap().read.len()); + let new = self.inner.lock().unwrap().read.split_off(amt); + + buf[..amt].copy_from_slice(self.inner.lock().unwrap().read.as_slice()); + + self.inner.lock().unwrap().read = new; + Poll::Ready(Ok(amt)) + } + } + + #[test] + fn test_client() { + let stream = DummyStream::new(vec![0]); + + block_on(send_receive( + client::RunParams { + to_send: 0, + to_receive: 0, + }, + stream.clone(), + )) + .unwrap(); + + assert_eq!( + stream.inner.lock().unwrap().write, + 0u64.to_be_bytes().to_vec() + ); + } +} diff --git a/protocols/perf/src/server.rs b/protocols/perf/src/server.rs new file mode 100644 index 00000000000..fd0643a0079 --- /dev/null +++ b/protocols/perf/src/server.rs @@ -0,0 +1,48 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +mod behaviour; +mod handler; + +use instant::Instant; + +pub use behaviour::{Behaviour, Event}; + +/// Parameters for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug, Clone, Copy)] +pub struct RunParams { + pub sent: usize, + pub received: usize, +} + +/// Timers for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug, Clone, Copy)] +pub struct RunTimers { + pub read_start: Instant, + pub read_done: Instant, + pub write_done: Instant, +} + +/// Statistics for a single run, i.e. one stream, sending and receiving data. +#[derive(Debug)] +pub struct RunStats { + pub params: RunParams, + pub timers: RunTimers, +} diff --git a/protocols/perf/src/server/behaviour.rs b/protocols/perf/src/server/behaviour.rs new file mode 100644 index 00000000000..32c0dbda892 --- /dev/null +++ b/protocols/perf/src/server/behaviour.rs @@ -0,0 +1,121 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +//! [`NetworkBehaviour`] of the libp2p perf server protocol. + +use std::{ + collections::VecDeque, + task::{Context, Poll}, +}; + +use libp2p_identity::PeerId; +use libp2p_swarm::{ + ConnectionId, FromSwarm, NetworkBehaviour, NetworkBehaviourAction, PollParameters, + THandlerInEvent, THandlerOutEvent, +}; + +use crate::server::handler::Handler; + +use super::RunStats; + +#[derive(Debug)] +pub struct Event { + pub remote_peer_id: PeerId, + pub stats: RunStats, +} + +#[derive(Default)] +pub struct Behaviour { + /// Queue of actions to return when polled. + queued_events: VecDeque>>, +} + +impl Behaviour { + pub fn new() -> Self { + Self::default() + } +} + +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = Handler; + type OutEvent = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &libp2p_core::Multiaddr, + _remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + Ok(Handler::default()) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(_) => {} + FromSwarm::ConnectionClosed(_) => {} + FromSwarm::AddressChange(_) => {} + FromSwarm::DialFailure(_) => {} + FromSwarm::ListenFailure(_) => {} + FromSwarm::NewListener(_) => {} + FromSwarm::NewListenAddr(_) => {} + FromSwarm::ExpiredListenAddr(_) => {} + FromSwarm::ListenerError(_) => {} + FromSwarm::ListenerClosed(_) => {} + FromSwarm::NewExternalAddr(_) => {} + FromSwarm::ExpiredExternalAddr(_) => {} + } + } + + fn on_connection_handler_event( + &mut self, + event_source: PeerId, + _connection_id: ConnectionId, + super::handler::Event { stats }: THandlerOutEvent, + ) { + self.queued_events + .push_back(NetworkBehaviourAction::GenerateEvent(Event { + remote_peer_id: event_source, + stats, + })) + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + _: &mut impl PollParameters, + ) -> Poll>> { + if let Some(event) = self.queued_events.pop_front() { + return Poll::Ready(event); + } + + Poll::Pending + } +} diff --git a/protocols/perf/src/server/handler.rs b/protocols/perf/src/server/handler.rs new file mode 100644 index 00000000000..2946b6d4a4c --- /dev/null +++ b/protocols/perf/src/server/handler.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::{ + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use libp2p_core::upgrade::{DeniedUpgrade, ReadyUpgrade}; +use libp2p_swarm::{ + handler::{ + ConnectionEvent, DialUpgradeError, FullyNegotiatedInbound, FullyNegotiatedOutbound, + ListenUpgradeError, + }, + ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerUpgrErr, KeepAlive, + SubstreamProtocol, +}; +use log::error; +use void::Void; + +use super::RunStats; + +#[derive(Debug)] +pub struct Event { + pub stats: RunStats, +} + +pub struct Handler { + inbound: FuturesUnordered>>, + keep_alive: KeepAlive, +} + +impl Handler { + pub fn new() -> Self { + Self { + inbound: Default::default(), + keep_alive: KeepAlive::Yes, + } + } +} + +impl Default for Handler { + fn default() -> Self { + Self::new() + } +} + +impl ConnectionHandler for Handler { + type InEvent = Void; + type OutEvent = Event; + type Error = Void; + type InboundProtocol = ReadyUpgrade<&'static [u8]>; + type OutboundProtocol = DeniedUpgrade; + type OutboundOpenInfo = Void; + type InboundOpenInfo = (); + + fn listen_protocol(&self) -> SubstreamProtocol { + SubstreamProtocol::new(ReadyUpgrade::new(crate::PROTOCOL_NAME), ()) + } + + fn on_behaviour_event(&mut self, v: Self::InEvent) { + void::unreachable(v) + } + + fn on_connection_event( + &mut self, + event: ConnectionEvent< + Self::InboundProtocol, + Self::OutboundProtocol, + Self::InboundOpenInfo, + Self::OutboundOpenInfo, + >, + ) { + match event { + ConnectionEvent::FullyNegotiatedInbound(FullyNegotiatedInbound { + protocol, + info: _, + }) => { + self.inbound + .push(crate::protocol::receive_send(protocol).boxed()); + } + ConnectionEvent::FullyNegotiatedOutbound(FullyNegotiatedOutbound { info, .. }) => { + void::unreachable(info) + } + + ConnectionEvent::DialUpgradeError(DialUpgradeError { info, .. }) => { + void::unreachable(info) + } + ConnectionEvent::AddressChange(_) => {} + ConnectionEvent::ListenUpgradeError(ListenUpgradeError { info: (), error }) => { + match error { + ConnectionHandlerUpgrErr::Timeout => {} + ConnectionHandlerUpgrErr::Timer => {} + ConnectionHandlerUpgrErr::Upgrade(error) => match error { + libp2p_core::UpgradeError::Select(_) => {} + libp2p_core::UpgradeError::Apply(v) => void::unreachable(v), + }, + } + } + } + } + + fn connection_keep_alive(&self) -> KeepAlive { + self.keep_alive + } + + fn poll( + &mut self, + cx: &mut Context<'_>, + ) -> Poll< + ConnectionHandlerEvent< + Self::OutboundProtocol, + Self::OutboundOpenInfo, + Self::OutEvent, + Self::Error, + >, + > { + while let Poll::Ready(Some(result)) = self.inbound.poll_next_unpin(cx) { + match result { + Ok(stats) => return Poll::Ready(ConnectionHandlerEvent::Custom(Event { stats })), + Err(e) => { + error!("{e:?}") + } + } + } + + if self.inbound.is_empty() { + match self.keep_alive { + KeepAlive::Yes => { + self.keep_alive = KeepAlive::Until(Instant::now() + Duration::from_secs(10)); + } + KeepAlive::Until(_) => {} + KeepAlive::No => panic!("Handler never sets KeepAlive::No."), + } + } else { + self.keep_alive = KeepAlive::Yes + } + + Poll::Pending + } +} diff --git a/protocols/perf/tests/lib.rs b/protocols/perf/tests/lib.rs new file mode 100644 index 00000000000..1d93ce3ee8d --- /dev/null +++ b/protocols/perf/tests/lib.rs @@ -0,0 +1,62 @@ +// Copyright 2023 Protocol Labs. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use libp2p_perf::{ + client::{self, RunParams}, + server, +}; +use libp2p_swarm::{Swarm, SwarmEvent}; +use libp2p_swarm_test::SwarmExt; + +#[async_std::test] +async fn perf() { + let _ = env_logger::try_init(); + + let mut server = Swarm::new_ephemeral(|_| server::Behaviour::new()); + let server_peer_id = *server.local_peer_id(); + let mut client = Swarm::new_ephemeral(|_| client::Behaviour::new()); + + server.listen().await; + client.connect(&mut server).await; + + async_std::task::spawn(server.loop_on_next()); + + client + .behaviour_mut() + .perf( + server_peer_id, + RunParams { + to_send: 0, + to_receive: 0, + }, + ) + .unwrap(); + + client + .wait(|e| match e { + SwarmEvent::IncomingConnection { .. } => panic!(), + SwarmEvent::ConnectionEstablished { .. } => None, + SwarmEvent::Dialing(_) => None, + SwarmEvent::Behaviour(client::Event { result, .. }) => Some(result), + e => panic!("{e:?}"), + }) + .await + .unwrap(); +}