From fb45ce37731f4d593f552a0a598750781c8367c1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 17 Dec 2022 21:49:09 +0100 Subject: [PATCH 01/11] fix(metrics): Update connections_establishment_duration buckets (#3256) Currently the `libp2p_swarm_connections_establishment_duration` metric has the following buckets: ``` // exponential_buckets(1e-3, 2., 10) [0.001, 0.002, 0.004, 0.008, 0.016, 0.032, 0.064, 0.128, 0.256, 0.512] ``` It is unlikely that a connection is established in 1ms, even within a datacenter. It is very likely that libp2p connection establishment takes longer than 512ms over the internet. This commit proposes the following buckets: ``` // exponential_buckets(0.01, 1.5, 20) [0.01, 0.015, 0.0225, 0.03375, 0.050625, 0.0759375, 0.11390625, 0.170859375, 0.2562890625, 0.38443359375, 0.576650390625, 0.8649755859375, 1.29746337890625, 1.946195068359375, 2.9192926025390626, 4.378938903808594, 6.568408355712891, 9.852612533569337, 14.778918800354004, 22.168378200531006] ``` - Buckets start at 10ms. - Reasonably high resolution in the sub-second area. - Largest bucket at 22s, e.g. for a relayed connection. - Unfortunately rather obscure numbers. --- misc/metrics/src/swarm.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/metrics/src/swarm.rs b/misc/metrics/src/swarm.rs index 065be8ba259..a003ab56570 100644 --- a/misc/metrics/src/swarm.rs +++ b/misc/metrics/src/swarm.rs @@ -396,5 +396,5 @@ impl From<&libp2p_swarm::PendingInboundConnectionError> } fn create_connection_establishment_duration_histogram() -> Histogram { - Histogram::new(exponential_buckets(1e-3, 2., 10)) + Histogram::new(exponential_buckets(0.01, 1.5, 20)) } From 160ddc58f2edacd706d044fe0f715a901d88df84 Mon Sep 17 00:00:00 2001 From: Hannes <55623006+umgefahren@users.noreply.github.com> Date: Mon, 19 Dec 2022 07:34:29 +0100 Subject: [PATCH 02/11] docs: Add link to rustdocs for `master` to README (#3197) I added a link to the master docs to the [README](README.md). --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 1ce2acc06a0..23dfbf75a14 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ [![dependency status](https://deps.rs/repo/github/libp2p/rust-libp2p/status.svg?style=flat-square)](https://deps.rs/repo/github/libp2p/rust-libp2p) [![Crates.io](https://img.shields.io/crates/v/libp2p.svg)](https://crates.io/crates/libp2p) [![docs.rs](https://img.shields.io/badge/api-rustdoc-blue.svg)](https://docs.rs/libp2p) +[![docs.rs master](https://img.shields.io/badge/docs-master-blueviolet)](https://libp2p.github.io/rust-libp2p/libp2p/) This repository is the central place for Rust development of the [libp2p](https://libp2p.io) spec. From 76abab9e20c29d03f787f39565de38dff42b0f80 Mon Sep 17 00:00:00 2001 From: Anton Date: Mon, 19 Dec 2022 11:26:19 +0400 Subject: [PATCH 03/11] feat: log bandwidth on substream instead of socket level (#3180) Previously, the `with_bandwidth_logging` extension to `Transport` would track the bytes sent and received on a socket level. This however only works in conjunction with `Transport` upgrades where a separate multiplexer is negotiated on top of a regular stream. With QUIC and WebRTC landing, this no longer works as those `Transport`s bring their own multiplexing stack. To still allow for tracking of bandwidth, we refactor the `with_bandwidth_logging` extension to count the bytes send on all substreams opened through a `StreamMuxer`. This works, regardless of the underlying transport technology. It does omit certain layers. However, there isn't necessarily a "correct" layer to count bandwidth on because you can always go down another layer (IP, Ethernet, etc). Closes #3157. --- CHANGELOG.md | 5 +- Cargo.toml | 4 ++ src/bandwidth.rs | 149 +++++++++++++++---------------------------- src/transport_ext.rs | 66 +++++++++++++++++-- 4 files changed, 120 insertions(+), 104 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8df2d37c210..511120d1a57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,8 +47,11 @@ # 0.51.0 [unreleased] +- Count bandwidth at the application level. Previously `BandwidthLogging` would implement `Transport` and now implements `StreamMuxer` ([PR 3180](https://github.com/libp2p/rust-libp2p/pull/3180)). + - `BandwidthLogging::new` now requires a 2nd argument: `Arc` + - Remove `BandwidthFuture` + - Rename `BandwidthConnecLogging` to `InstrumentedStream` - Remove `SimpleProtocol` due to being unused. See [`libp2p::core::upgrade`](https://docs.rs/libp2p/0.50.0/libp2p/core/upgrade/index.html) for alternatives. See [PR 3191]. - - Update individual crates. - Update to [`libp2p-dcutr` `v0.9.0`](protocols/dcutr/CHANGELOG.md#090). diff --git a/Cargo.toml b/Cargo.toml index eaed637a2d8..0ec146a9c07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,6 +136,10 @@ env_logger = "0.10.0" clap = { version = "4.0.13", features = ["derive"] } tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] } +libp2p-mplex = { version = "0.38.0", path = "muxers/mplex" } +libp2p-noise = { version = "0.41.0", path = "transports/noise" } +libp2p-tcp = { version = "0.38.0", path = "transports/tcp", features = ["tokio"] } + [workspace] members = [ "core", diff --git a/src/bandwidth.rs b/src/bandwidth.rs index a58eec95ddb..dc696ce07e2 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -18,20 +18,13 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{ - core::{ - transport::{TransportError, TransportEvent}, - Transport, - }, - Multiaddr, -}; +use crate::core::muxing::{StreamMuxer, StreamMuxerEvent}; use futures::{ io::{IoSlice, IoSliceMut}, prelude::*, ready, }; -use libp2p_core::transport::ListenerId; use std::{ convert::TryFrom as _, io, @@ -43,121 +36,86 @@ use std::{ task::{Context, Poll}, }; -/// Wraps around a `Transport` and counts the number of bytes that go through all the opened -/// connections. +/// Wraps around a [`StreamMuxer`] and counts the number of bytes that go through all the opened +/// streams. #[derive(Clone)] #[pin_project::pin_project] -pub struct BandwidthLogging { +pub(crate) struct BandwidthLogging { #[pin] - inner: TInner, + inner: SMInner, sinks: Arc, } -impl BandwidthLogging { - /// Creates a new [`BandwidthLogging`] around the transport. - pub fn new(inner: TInner) -> (Self, Arc) { - let sink = Arc::new(BandwidthSinks { - inbound: AtomicU64::new(0), - outbound: AtomicU64::new(0), - }); - - let trans = BandwidthLogging { - inner, - sinks: sink.clone(), - }; - - (trans, sink) +impl BandwidthLogging { + /// Creates a new [`BandwidthLogging`] around the stream muxer. + pub(crate) fn new(inner: SMInner, sinks: Arc) -> Self { + Self { inner, sinks } } } -impl Transport for BandwidthLogging +impl StreamMuxer for BandwidthLogging where - TInner: Transport, + SMInner: StreamMuxer, { - type Output = BandwidthConnecLogging; - type Error = TInner::Error; - type ListenerUpgrade = BandwidthFuture; - type Dial = BandwidthFuture; + type Substream = InstrumentedStream; + type Error = SMInner::Error; fn poll( self: Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll> { let this = self.project(); - match this.inner.poll(cx) { - Poll::Ready(event) => { - let event = event.map_upgrade({ - let sinks = this.sinks.clone(); - |inner| BandwidthFuture { inner, sinks } - }); - Poll::Ready(event) - } - Poll::Pending => Poll::Pending, - } - } - - fn listen_on(&mut self, addr: Multiaddr) -> Result> { - self.inner.listen_on(addr) + this.inner.poll(cx) } - fn remove_listener(&mut self, id: ListenerId) -> bool { - self.inner.remove_listener(id) - } - - fn dial(&mut self, addr: Multiaddr) -> Result> { - let sinks = self.sinks.clone(); - self.inner - .dial(addr) - .map(move |fut| BandwidthFuture { inner: fut, sinks }) - } - - fn dial_as_listener( - &mut self, - addr: Multiaddr, - ) -> Result> { - let sinks = self.sinks.clone(); - self.inner - .dial_as_listener(addr) - .map(move |fut| BandwidthFuture { inner: fut, sinks }) - } - - fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) + fn poll_inbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let this = self.project(); + let inner = ready!(this.inner.poll_inbound(cx)?); + let logged = InstrumentedStream { + inner, + sinks: this.sinks.clone(), + }; + Poll::Ready(Ok(logged)) } -} - -/// Wraps around a `Future` that produces a connection. Wraps the connection around a bandwidth -/// counter. -#[pin_project::pin_project] -pub struct BandwidthFuture { - #[pin] - inner: TInner, - sinks: Arc, -} - -impl Future for BandwidthFuture { - type Output = Result, TInner::Error>; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll_outbound( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let this = self.project(); - let inner = ready!(this.inner.try_poll(cx)?); - let logged = BandwidthConnecLogging { + let inner = ready!(this.inner.poll_outbound(cx)?); + let logged = InstrumentedStream { inner, sinks: this.sinks.clone(), }; Poll::Ready(Ok(logged)) } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.inner.poll_close(cx) + } } -/// Allows obtaining the average bandwidth of the connections created from a [`BandwidthLogging`]. +/// Allows obtaining the average bandwidth of the streams. pub struct BandwidthSinks { inbound: AtomicU64, outbound: AtomicU64, } impl BandwidthSinks { - /// Returns the total number of bytes that have been downloaded on all the connections spawned - /// through the [`BandwidthLogging`]. + /// Returns a new [`BandwidthSinks`]. + pub(crate) fn new() -> Arc { + Arc::new(Self { + inbound: AtomicU64::new(0), + outbound: AtomicU64::new(0), + }) + } + + /// Returns the total number of bytes that have been downloaded on all the streams. /// /// > **Note**: This method is by design subject to race conditions. The returned value should /// > only ever be used for statistics purposes. @@ -165,8 +123,7 @@ impl BandwidthSinks { self.inbound.load(Ordering::Relaxed) } - /// Returns the total number of bytes that have been uploaded on all the connections spawned - /// through the [`BandwidthLogging`]. + /// Returns the total number of bytes that have been uploaded on all the streams. /// /// > **Note**: This method is by design subject to race conditions. The returned value should /// > only ever be used for statistics purposes. @@ -175,15 +132,15 @@ impl BandwidthSinks { } } -/// Wraps around an `AsyncRead + AsyncWrite` and logs the bandwidth that goes through it. +/// Wraps around an [`AsyncRead`] + [`AsyncWrite`] and logs the bandwidth that goes through it. #[pin_project::pin_project] -pub struct BandwidthConnecLogging { +pub(crate) struct InstrumentedStream { #[pin] - inner: TInner, + inner: SMInner, sinks: Arc, } -impl AsyncRead for BandwidthConnecLogging { +impl AsyncRead for InstrumentedStream { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -213,7 +170,7 @@ impl AsyncRead for BandwidthConnecLogging { } } -impl AsyncWrite for BandwidthConnecLogging { +impl AsyncWrite for InstrumentedStream { fn poll_write( self: Pin<&mut Self>, cx: &mut Context<'_>, diff --git a/src/transport_ext.rs b/src/transport_ext.rs index fa8926c8380..2a4c30f17e3 100644 --- a/src/transport_ext.rs +++ b/src/transport_ext.rs @@ -20,22 +20,74 @@ //! Provides the `TransportExt` trait. -use crate::{bandwidth::BandwidthLogging, bandwidth::BandwidthSinks, Transport}; +use crate::core::{ + muxing::{StreamMuxer, StreamMuxerBox}, + transport::Boxed, + PeerId, +}; +use crate::{ + bandwidth::{BandwidthLogging, BandwidthSinks}, + Transport, +}; use std::sync::Arc; /// Trait automatically implemented on all objects that implement `Transport`. Provides some /// additional utilities. pub trait TransportExt: Transport { - /// Adds a layer on the `Transport` that logs all trafic that passes through the sockets + /// Adds a layer on the `Transport` that logs all trafic that passes through the streams /// created by it. /// - /// This method returns an `Arc` that can be used to retreive the total number - /// of bytes transferred through the sockets. - fn with_bandwidth_logging(self) -> (BandwidthLogging, Arc) + /// This method returns an `Arc` that can be used to retrieve the total number + /// of bytes transferred through the streams. + /// + /// # Example + /// + /// ``` + /// use libp2p_mplex as mplex; + /// use libp2p_noise as noise; + /// use libp2p_tcp as tcp; + /// use libp2p::{ + /// core::upgrade, + /// identity, + /// TransportExt, + /// Transport, + /// }; + /// + /// let id_keys = identity::Keypair::generate_ed25519(); + /// + /// let transport = tcp::tokio::Transport::new(tcp::Config::default().nodelay(true)) + /// .upgrade(upgrade::Version::V1) + /// .authenticate( + /// noise::NoiseAuthenticated::xx(&id_keys) + /// .expect("Signing libp2p-noise static DH keypair failed."), + /// ) + /// .multiplex(mplex::MplexConfig::new()) + /// .boxed(); + /// + /// let (transport, sinks) = transport.with_bandwidth_logging(); + /// ``` + fn with_bandwidth_logging(self) -> (Boxed<(PeerId, StreamMuxerBox)>, Arc) where - Self: Sized, + Self: Sized + Send + Unpin + 'static, + Self::Dial: Send + 'static, + Self::ListenerUpgrade: Send + 'static, + Self::Error: Send + Sync, + Self::Output: Into<(PeerId, S)>, + S: StreamMuxer + Send + 'static, + S::Substream: Send + 'static, + S::Error: Send + Sync + 'static, { - BandwidthLogging::new(self) + let sinks = BandwidthSinks::new(); + let sinks_copy = sinks.clone(); + let transport = Transport::map(self, |output, _| { + let (peer_id, stream_muxer_box) = output.into(); + ( + peer_id, + StreamMuxerBox::new(BandwidthLogging::new(stream_muxer_box, sinks_copy)), + ) + }) + .boxed(); + (transport, sinks) } } From 2dd188e8970abafdc5dfbb7bb1eaa597bb1e1303 Mon Sep 17 00:00:00 2001 From: Nick Loadholtes Date: Mon, 19 Dec 2022 06:41:38 -0500 Subject: [PATCH 04/11] feat(floodsub): make use of `prost-codec` (#3224) This patch addresses #2500 for the `libp2p-floodsub` crate. For this PR the existing code was upgraded to use `Framed` with the `prost_codec::Codec` as the standard codec for handling the RPC message format serialization/deserialization. --- protocols/floodsub/CHANGELOG.md | 4 ++ protocols/floodsub/Cargo.toml | 2 + protocols/floodsub/src/protocol.rs | 75 +++++++++++++++++------------- 3 files changed, 48 insertions(+), 33 deletions(-) diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 564476d987a..7b7293e6390 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -2,6 +2,10 @@ - Update to `libp2p-swarm` `v0.42.0`. +- Read and write protocols messages via `prost-codec`. See [PR 3224]. + +[pr 3224]: https://github.com/libp2p/rust-libp2p/pull/3224 + # 0.41.0 - Update to `libp2p-core` `v0.38.0`. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 5716da7d14d..778f949b655 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -11,6 +11,7 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] +asynchronous-codec = "0.6" cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" @@ -18,6 +19,7 @@ libp2p-core = { version = "0.38.0", path = "../../core" } libp2p-swarm = { version = "0.42.0", path = "../../swarm" } log = "0.4" prost = "0.11" +prost-codec = { version = "0.3", path = "../../misc/prost-codec" } rand = "0.8" smallvec = "1.6.1" thiserror = "1.0.37" diff --git a/protocols/floodsub/src/protocol.rs b/protocols/floodsub/src/protocol.rs index 104e92e49b8..fe3f2859437 100644 --- a/protocols/floodsub/src/protocol.rs +++ b/protocols/floodsub/src/protocol.rs @@ -20,14 +20,19 @@ use crate::rpc_proto; use crate::topic::Topic; +use asynchronous_codec::Framed; use futures::{ io::{AsyncRead, AsyncWrite}, - AsyncWriteExt, Future, + Future, }; -use libp2p_core::{upgrade, InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; -use prost::Message; +use futures::{SinkExt, StreamExt}; +use libp2p_core::{InboundUpgrade, OutboundUpgrade, PeerId, UpgradeInfo}; use std::{io, iter, pin::Pin}; +const MAX_MESSAGE_LEN_BYTES: usize = 2048; + +const PROTOCOL_NAME: &[u8] = b"/floodsub/1.0.0"; + /// Implementation of `ConnectionUpgrade` for the floodsub protocol. #[derive(Debug, Clone, Default)] pub struct FloodsubProtocol {} @@ -44,7 +49,7 @@ impl UpgradeInfo for FloodsubProtocol { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/floodsub/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -53,19 +58,27 @@ where TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static, { type Output = FloodsubRpc; - type Error = FloodsubDecodeError; + type Error = FloodsubError; type Future = Pin> + Send>>; - fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { - let packet = upgrade::read_length_prefixed(&mut socket, 2048).await?; - let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(DecodeError)?; + let mut framed = Framed::new( + socket, + prost_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + ); + + let rpc = framed + .next() + .await + .ok_or_else(|| FloodsubError::ReadError(io::ErrorKind::UnexpectedEof.into()))? + .map_err(CodecError)?; let mut messages = Vec::with_capacity(rpc.publish.len()); for publish in rpc.publish.into_iter() { messages.push(FloodsubMessage { source: PeerId::from_bytes(&publish.from.unwrap_or_default()) - .map_err(|_| FloodsubDecodeError::InvalidPeerId)?, + .map_err(|_| FloodsubError::InvalidPeerId)?, data: publish.data.unwrap_or_default(), sequence_number: publish.seqno.unwrap_or_default(), topics: publish.topic_ids.into_iter().map(Topic::new).collect(), @@ -93,21 +106,21 @@ where /// Reach attempt interrupt errors. #[derive(thiserror::Error, Debug)] -pub enum FloodsubDecodeError { - /// Error when reading the packet from the socket. - #[error("Failed to read from socket")] - ReadError(#[from] io::Error), - /// Error when decoding the raw buffer into a protobuf. - #[error("Failed to decode protobuf")] - ProtobufError(#[from] DecodeError), +pub enum FloodsubError { /// Error when parsing the `PeerId` in the message. #[error("Failed to decode PeerId from message")] InvalidPeerId, + /// Error when decoding the raw buffer into a protobuf. + #[error("Failed to decode protobuf")] + ProtobufError(#[from] CodecError), + /// Error when reading the packet from the socket. + #[error("Failed to read from socket")] + ReadError(#[from] io::Error), } #[derive(thiserror::Error, Debug)] #[error(transparent)] -pub struct DecodeError(prost::DecodeError); +pub struct CodecError(#[from] prost_codec::Error); /// An RPC received by the floodsub system. #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -123,7 +136,7 @@ impl UpgradeInfo for FloodsubRpc { type InfoIter = iter::Once; fn protocol_info(&self) -> Self::InfoIter { - iter::once(b"/floodsub/1.0.0") + iter::once(PROTOCOL_NAME) } } @@ -132,16 +145,17 @@ where TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static, { type Output = (); - type Error = io::Error; + type Error = CodecError; type Future = Pin> + Send>>; - fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future { + fn upgrade_outbound(self, socket: TSocket, _: Self::Info) -> Self::Future { Box::pin(async move { - let bytes = self.into_bytes(); - - upgrade::write_length_prefixed(&mut socket, bytes).await?; - socket.close().await?; - + let mut framed = Framed::new( + socket, + prost_codec::Codec::::new(MAX_MESSAGE_LEN_BYTES), + ); + framed.send(self.into_rpc()).await?; + framed.close().await?; Ok(()) }) } @@ -149,8 +163,8 @@ where impl FloodsubRpc { /// Turns this `FloodsubRpc` into a message that can be sent to a substream. - fn into_bytes(self) -> Vec { - let rpc = rpc_proto::Rpc { + fn into_rpc(self) -> rpc_proto::Rpc { + rpc_proto::Rpc { publish: self .messages .into_iter() @@ -170,12 +184,7 @@ impl FloodsubRpc { topic_id: Some(topic.topic.into()), }) .collect(), - }; - - let mut buf = Vec::with_capacity(rpc.encoded_len()); - rpc.encode(&mut buf) - .expect("Vec provides capacity as needed"); - buf + } } } From fbd4192e2abe5e0270686dfd7acc5e321f3ace17 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 19 Dec 2022 14:54:39 +0200 Subject: [PATCH 05/11] fix(kademlia): downgrade reusable stream dropping warning to debug log (#3234) There is nothing wrong about being near the edge of the concurrency Kademlia allows. If there was an older stream about to be reused, it doesn't mean there was anything wrong to warn about. --- protocols/kad/src/handler.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index d012c94d2e0..37f41a8cd8a 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -580,7 +580,7 @@ where ) }) { *s = InboundSubstreamState::Cancelled; - log::warn!( + log::debug!( "New inbound substream to {:?} exceeds inbound substream limit. \ Removed older substream waiting to be reused.", self.remote_peer_id, From 5458446205e61385b198c8d472d73ea6f40fea8c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 00:28:07 +1100 Subject: [PATCH 06/11] feat(noise): deprecate non-compliant implementation (#3227) For the last [two years](https://github.com/libp2p/rust-libp2p/blob/master/CHANGELOG.md#version-0190-2020-05-18), we have been carrying around a non-compliant implementation of the noise protocol for libp2p. It is time to follow through on the announcement in that changelog entry and deprecate it. Users should use the spec compliant implementation instead. 1. This will reduce the maintenance effort of our codebase. 2. We will improve compile-times because we no longer need to depend on `libsodium` to test cryptography that is only part of the non-compliant implementation. 3. It will simplify usage of `rust-libp2p` because users cannot accidentally choose the wrong implementation. --- transports/noise/CHANGELOG.md | 6 ++ transports/noise/Cargo.toml | 2 +- transports/noise/src/lib.rs | 4 +- transports/noise/src/protocol.rs | 8 +-- transports/noise/src/protocol/x25519.rs | 15 ++--- transports/noise/src/protocol/x25519_spec.rs | 20 ++++-- transports/noise/tests/smoke.rs | 64 ++++---------------- 7 files changed, 49 insertions(+), 70 deletions(-) diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index 7bbab360faa..4deec2a4baa 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.41.1 [unreleased] + +- Deprecate non-compliant noise implementation. We intend to remove it in a future release without replacement. See [PR 3227]. + +[PR 3227]: https://github.com/libp2p/rust-libp2p/pull/3227 + # 0.41.0 - Remove `prost::Error` from public API. See [PR 3058]. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index f0bf2864297..50aacafebac 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-noise" edition = "2021" rust-version = "1.60.0" description = "Cryptographic handshake protocol using the noise framework." -version = "0.41.0" +version = "0.41.1" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/transports/noise/src/lib.rs b/transports/noise/src/lib.rs index 45e5540df43..52f13ccac94 100644 --- a/transports/noise/src/lib.rs +++ b/transports/noise/src/lib.rs @@ -60,7 +60,9 @@ mod protocol; pub use io::handshake::RemoteIdentity; pub use io::NoiseOutput; -pub use protocol::{x25519::X25519, x25519_spec::X25519Spec}; +#[allow(deprecated)] +pub use protocol::x25519::X25519; +pub use protocol::x25519_spec::X25519Spec; pub use protocol::{AuthenticKeypair, Keypair, KeypairIdentity, PublicKey, SecretKey}; pub use protocol::{Protocol, ProtocolParams, IK, IX, XX}; diff --git a/transports/noise/src/protocol.rs b/transports/noise/src/protocol.rs index 5a4ea04f518..7ec9746d5bc 100644 --- a/transports/noise/src/protocol.rs +++ b/transports/noise/src/protocol.rs @@ -245,7 +245,7 @@ impl snow::resolvers::CryptoResolver for Resolver { fn resolve_dh(&self, choice: &snow::params::DHChoice) -> Option> { if let snow::params::DHChoice::Curve25519 = choice { - Some(Box::new(Keypair::::default())) + Some(Box::new(Keypair::::default())) } else { None } @@ -308,7 +308,7 @@ impl snow::types::Random for Rng {} #[cfg(test)] mod tests { use super::*; - use crate::X25519; + use crate::X25519Spec; use once_cell::sync::Lazy; #[test] @@ -334,9 +334,9 @@ mod tests { } fn xx_builder(prologue: &'static [u8]) -> snow::Builder<'static> { - X25519::params_xx().into_builder(prologue, TEST_KEY.secret(), None) + X25519Spec::params_xx().into_builder(prologue, TEST_KEY.secret(), None) } // Hack to work around borrow-checker. - static TEST_KEY: Lazy> = Lazy::new(Keypair::::new); + static TEST_KEY: Lazy> = Lazy::new(Keypair::::new); } diff --git a/transports/noise/src/protocol/x25519.rs b/transports/noise/src/protocol/x25519.rs index 067f67ca2c1..182d9905b39 100644 --- a/transports/noise/src/protocol/x25519.rs +++ b/transports/noise/src/protocol/x25519.rs @@ -23,6 +23,8 @@ //! **Note**: This set of protocols is not interoperable with other //! libp2p implementations. +#![allow(deprecated)] + use crate::{NoiseConfig, NoiseError, Protocol, ProtocolParams}; use curve25519_dalek::edwards::CompressedEdwardsY; use libp2p_core::UpgradeInfo; @@ -56,6 +58,10 @@ static PARAMS_XX: Lazy = Lazy::new(|| { /// A X25519 key. #[derive(Clone)] +#[deprecated( + since = "0.41.1", + note = "Will be removed because it is not compliant with the official libp2p specification. Use `X25519Spec` instead." +)] pub struct X25519([u8; 32]); impl AsRef<[u8]> for X25519 { @@ -135,15 +141,6 @@ impl Protocol for X25519 { } impl Keypair { - /// An "empty" keypair as a starting state for DH computations in `snow`, - /// which get manipulated through the `snow::types::Dh` interface. - pub(super) fn default() -> Self { - Keypair { - secret: SecretKey(X25519([0u8; 32])), - public: PublicKey(X25519([0u8; 32])), - } - } - /// Create a new X25519 keypair. pub fn new() -> Keypair { let mut sk_bytes = [0u8; 32]; diff --git a/transports/noise/src/protocol/x25519_spec.rs b/transports/noise/src/protocol/x25519_spec.rs index e114f10747c..87973463521 100644 --- a/transports/noise/src/protocol/x25519_spec.rs +++ b/transports/noise/src/protocol/x25519_spec.rs @@ -29,7 +29,7 @@ use rand::Rng; use x25519_dalek::{x25519, X25519_BASEPOINT_BYTES}; use zeroize::Zeroize; -use super::{x25519::X25519, *}; +use super::*; /// Prefix of static key signatures for domain separation. const STATIC_KEY_DOMAIN: &str = "noise-libp2p-static-key:"; @@ -51,6 +51,15 @@ impl Zeroize for X25519Spec { } impl Keypair { + /// An "empty" keypair as a starting state for DH computations in `snow`, + /// which get manipulated through the `snow::types::Dh` interface. + pub(super) fn default() -> Self { + Keypair { + secret: SecretKey(X25519Spec([0u8; 32])), + public: PublicKey(X25519Spec([0u8; 32])), + } + } + /// Create a new X25519 keypair. pub fn new() -> Keypair { let mut sk_bytes = [0u8; 32]; @@ -110,15 +119,18 @@ impl UpgradeInfo for NoiseConfig { /// interoperable with other libp2p implementations. impl Protocol for X25519Spec { fn params_ik() -> ProtocolParams { - X25519::params_ik() + #[allow(deprecated)] + x25519::X25519::params_ik() } fn params_ix() -> ProtocolParams { - X25519::params_ix() + #[allow(deprecated)] + x25519::X25519::params_ix() } fn params_xx() -> ProtocolParams { - X25519::params_xx() + #[allow(deprecated)] + x25519::X25519::params_xx() } fn public_from_bytes(bytes: &[u8]) -> Result, NoiseError> { diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index 56b5e139651..c92435e4c3e 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -28,7 +28,6 @@ use libp2p_core::upgrade::{apply_inbound, apply_outbound, Negotiated}; use libp2p_core::{identity, transport, upgrade}; use libp2p_noise::{ Keypair, NoiseAuthenticated, NoiseConfig, NoiseError, NoiseOutput, RemoteIdentity, X25519Spec, - X25519, }; use libp2p_tcp as tcp; use log::info; @@ -47,7 +46,7 @@ fn core_upgrade_compat() { } #[test] -fn xx_spec() { +fn xx() { let _ = env_logger::try_init(); fn prop(mut messages: Vec) -> bool { messages.truncate(5); @@ -95,51 +94,6 @@ fn xx_spec() { .quickcheck(prop as fn(Vec) -> bool) } -#[test] -fn xx() { - let _ = env_logger::try_init(); - fn prop(mut messages: Vec) -> bool { - messages.truncate(5); - let server_id = identity::Keypair::generate_ed25519(); - let client_id = identity::Keypair::generate_ed25519(); - - let server_id_public = server_id.public(); - let client_id_public = client_id.public(); - - let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); - let server_transport = tcp::async_io::Transport::default() - .and_then(move |output, endpoint| { - upgrade::apply( - output, - NoiseConfig::xx(server_dh), - endpoint, - upgrade::Version::V1, - ) - }) - .and_then(move |out, _| expect_identity(out, &client_id_public)) - .boxed(); - - let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); - let client_transport = tcp::async_io::Transport::default() - .and_then(move |output, endpoint| { - upgrade::apply( - output, - NoiseConfig::xx(client_dh), - endpoint, - upgrade::Version::V1, - ) - }) - .and_then(move |out, _| expect_identity(out, &server_id_public)) - .boxed(); - - run(server_transport, client_transport, messages); - true - } - QuickCheck::new() - .max_tests(30) - .quickcheck(prop as fn(Vec) -> bool) -} - #[test] fn ix() { let _ = env_logger::try_init(); @@ -151,7 +105,9 @@ fn ix() { let server_id_public = server_id.public(); let client_id_public = client_id.public(); - let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); + let server_dh = Keypair::::new() + .into_authentic(&server_id) + .unwrap(); let server_transport = tcp::async_io::Transport::default() .and_then(move |output, endpoint| { upgrade::apply( @@ -164,7 +120,9 @@ fn ix() { .and_then(move |out, _| expect_identity(out, &client_id_public)) .boxed(); - let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); + let client_dh = Keypair::::new() + .into_authentic(&client_id) + .unwrap(); let client_transport = tcp::async_io::Transport::default() .and_then(move |output, endpoint| { upgrade::apply( @@ -196,7 +154,9 @@ fn ik_xx() { let client_id = identity::Keypair::generate_ed25519(); let client_id_public = client_id.public(); - let server_dh = Keypair::::new().into_authentic(&server_id).unwrap(); + let server_dh = Keypair::::new() + .into_authentic(&server_id) + .unwrap(); let server_dh_public = server_dh.public_dh_key().clone(); let server_transport = tcp::async_io::Transport::default() .and_then(move |output, endpoint| { @@ -213,7 +173,9 @@ fn ik_xx() { .and_then(move |out, _| expect_identity(out, &client_id_public)) .boxed(); - let client_dh = Keypair::::new().into_authentic(&client_id).unwrap(); + let client_dh = Keypair::::new() + .into_authentic(&client_id) + .unwrap(); let server_id_public2 = server_id_public.clone(); let client_transport = tcp::async_io::Transport::default() .and_then(move |output, endpoint| { From de61a74d3379f4f84ec3b4df0be79f24972e5f29 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 20 Dec 2022 00:41:07 +0100 Subject: [PATCH 07/11] fix(libp2p): Don't specify monorepo dev dependencies by version (#3261) There is no need to specify dev dependencies from the monorepo by version. Versions can be inferred from the dependencies `Cargo.toml`. Specifying the version increases maintenance overhead, as they have to be bumped manually. --- Cargo.toml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0ec146a9c07..976d9fa0002 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -136,9 +136,9 @@ env_logger = "0.10.0" clap = { version = "4.0.13", features = ["derive"] } tokio = { version = "1.15", features = ["io-util", "io-std", "macros", "rt", "rt-multi-thread"] } -libp2p-mplex = { version = "0.38.0", path = "muxers/mplex" } -libp2p-noise = { version = "0.41.0", path = "transports/noise" } -libp2p-tcp = { version = "0.38.0", path = "transports/tcp", features = ["tokio"] } +libp2p-mplex = { path = "muxers/mplex" } +libp2p-noise = { path = "transports/noise" } +libp2p-tcp = { path = "transports/tcp", features = ["tokio"] } [workspace] members = [ From 88fa8e66b8623ec40bf4708cdfdbce0ba8ecdfed Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 11:59:25 +1100 Subject: [PATCH 08/11] refactor(gossipsub): don't store messages within `Arc` (#3243) Currently, we store messages to be sent to the `ConnectionHandler` in an `Arc`. However, we never actually clone these messages as we can see with this patch, hence we remove this wrapping. Related: https://github.com/libp2p/rust-libp2p/pull/3242 --- protocols/gossipsub/src/behaviour.rs | 23 +-- protocols/gossipsub/src/behaviour/tests.rs | 201 ++++++++++----------- protocols/gossipsub/src/handler.rs | 2 +- 3 files changed, 108 insertions(+), 118 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 5d1b69ec0fb..0803137323a 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -25,7 +25,6 @@ use std::{ collections::{BTreeSet, HashMap}, fmt, net::IpAddr, - sync::Arc, task::{Context, Poll}, time::Duration, }; @@ -201,9 +200,6 @@ impl From for PublishConfig { } } -type GossipsubNetworkBehaviourAction = - NetworkBehaviourAction>; - /// Network behaviour that handles the gossipsub protocol. /// /// NOTE: Initialisation requires a [`MessageAuthenticity`] and [`GossipsubConfig`] instance. If @@ -223,7 +219,7 @@ pub struct Gossipsub< config: GossipsubConfig, /// Events that need to be yielded to the outside when polling. - events: VecDeque, + events: VecDeque>, /// Pools non-urgent control messages between heartbeats. control_pool: HashMap>, @@ -2903,7 +2899,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::Message(message)), + event: GossipsubHandlerIn::Message(message), handler: NotifyHandler::Any, }) } @@ -3163,7 +3159,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), + event: GossipsubHandlerIn::JoinedMesh, handler: NotifyHandler::One(connections.connections[0]), }); break; @@ -3449,10 +3445,7 @@ where _: &mut impl PollParameters, ) -> Poll> { if let Some(event) = self.events.pop_front() { - return Poll::Ready(event.map_in(|e: Arc| { - // clone send event reference if others references are present - Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone()) - })); + return Poll::Ready(event); } // update scores @@ -3499,7 +3492,7 @@ fn peer_added_to_mesh( new_topics: Vec<&TopicHash>, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3527,7 +3520,7 @@ fn peer_added_to_mesh( // This is the first mesh the peer has joined, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::JoinedMesh), + event: GossipsubHandlerIn::JoinedMesh, handler: NotifyHandler::One(connection_id), }); } @@ -3540,7 +3533,7 @@ fn peer_removed_from_mesh( old_topic: &TopicHash, mesh: &HashMap>, known_topics: Option<&BTreeSet>, - events: &mut VecDeque, + events: &mut VecDeque>, connections: &HashMap, ) { // Ensure there is an active connection @@ -3566,7 +3559,7 @@ fn peer_removed_from_mesh( // The peer is not in any other mesh, inform the handler events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: Arc::new(GossipsubHandlerIn::LeftMesh), + event: GossipsubHandlerIn::LeftMesh, handler: NotifyHandler::One(*connection_id), }); } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 42c28061e88..f5fd8a3dc40 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -375,17 +375,17 @@ fn test_subscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; - } - collected_subscriptions + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + for s in &message.subscriptions { + if let Some(true) = s.subscribe { + collected_subscriptions.push(s.clone()) + }; } - _ => collected_subscriptions, - }, + collected_subscriptions + } _ => collected_subscriptions, }); @@ -443,17 +443,17 @@ fn test_unsubscribe() { .events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - for s in &message.subscriptions { - if let Some(true) = s.subscribe { - collected_subscriptions.push(s.clone()) - }; - } - collected_subscriptions + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + for s in &message.subscriptions { + if let Some(true) = s.subscribe { + collected_subscriptions.push(s.clone()) + }; } - _ => collected_subscriptions, - }, + collected_subscriptions + } _ => collected_subscriptions, }); @@ -630,16 +630,16 @@ fn test_publish_without_flood_publishing() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } - collected_publish + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + let event = proto_to_message(message); + for s in &event.messages { + collected_publish.push(s.clone()); } - _ => collected_publish, - }, + collected_publish + } _ => collected_publish, }); @@ -720,16 +720,16 @@ fn test_fanout() { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { - GossipsubHandlerIn::Message(ref message) => { - let event = proto_to_message(message); - for s in &event.messages { - collected_publish.push(s.clone()); - } - collected_publish + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref message), + .. + } => { + let event = proto_to_message(message); + for s in &event.messages { + collected_publish.push(s.clone()); } - _ => collected_publish, - }, + collected_publish + } _ => collected_publish, }); @@ -773,26 +773,25 @@ fn test_inject_connected() { .events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - !m.subscriptions.is_empty() - } else { - false - } - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => !m.subscriptions.is_empty(), _ => false, }) .collect(); // check that there are two subscriptions sent to each peer for sevent in send_events.clone() { - if let NetworkBehaviourAction::NotifyHandler { event, .. } = sevent { - if let GossipsubHandlerIn::Message(ref m) = **event { - assert!( - m.subscriptions.len() == 2, - "There should be two subscriptions sent to each peer (1 for each topic)." - ); - } + if let NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } = sevent + { + assert!( + m.subscriptions.len() == 2, + "There should be two subscriptions sent to each peer (1 for each topic)." + ); }; } @@ -1018,7 +1017,7 @@ fn test_handle_iwant_msg_cached() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push(c.clone()) @@ -1075,17 +1074,16 @@ fn test_handle_iwant_msg_cached_shifted() { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event - .messages - .iter() - .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) - .any(|msg| gs.config.message_id(&msg) == msg_id) - } else { - false - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event + .messages + .iter() + .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) + .any(|msg| gs.config.message_id(&msg) == msg_id) } _ => false, }); @@ -1317,17 +1315,17 @@ fn count_control_msgs( + gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event - .control_msgs - .iter() - .filter(|m| filter(peer_id, m)) - .count() - } else { - 0 - } + NetworkBehaviourAction::NotifyHandler { + peer_id, + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event + .control_msgs + .iter() + .filter(|m| filter(peer_id, m)) + .count() } _ => 0, }) @@ -1540,19 +1538,19 @@ fn do_forward_messages_to_explicit_peers() { gs.events .iter() .filter(|e| match e { - NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - peer_id == &peers[0] - && event - .messages - .iter() - .filter(|m| m.data == message.data) - .count() - > 0 - } else { - false - } + NetworkBehaviourAction::NotifyHandler { + peer_id, + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + peer_id == &peers[0] + && event + .messages + .iter() + .filter(|m| m.data == message.data) + .count() + > 0 } _ => false, }) @@ -2107,7 +2105,7 @@ fn test_flood_publish() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -2668,7 +2666,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for c in &event.messages { collected_messages.push((*peer_id, c.clone())) @@ -2816,7 +2814,7 @@ fn test_do_not_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -2873,7 +2871,7 @@ fn test_do_not_flood_publish_to_peer_below_publish_threshold() { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push((*peer_id, s.clone())); @@ -4407,13 +4405,12 @@ fn test_ignore_too_many_iwants_from_same_peer_for_same_message() { gs.events .iter() .map(|e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - if let GossipsubHandlerIn::Message(ref m) = **event { - let event = proto_to_message(m); - event.messages.len() - } else { - 0 - } + NetworkBehaviourAction::NotifyHandler { + event: GossipsubHandlerIn::Message(ref m), + .. + } => { + let event = proto_to_message(m); + event.messages.len() } _ => 0, }) @@ -4816,7 +4813,7 @@ fn test_publish_to_floodsub_peers_without_flood_publish() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -4873,7 +4870,7 @@ fn test_do_not_use_floodsub_in_fanout() { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - if let GossipsubHandlerIn::Message(ref m) = **event { + if let GossipsubHandlerIn::Message(ref m) = event { let event = proto_to_message(m); for s in &event.messages { collected_publish.push(s.clone()); @@ -5187,7 +5184,7 @@ fn test_subscribe_and_graft_with_negative_score() { let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == p1 { - if let GossipsubHandlerIn::Message(m) = Arc::try_unwrap(event).unwrap() { + if let GossipsubHandlerIn::Message(m) = event { Some(m) } else { None diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index 68bcf912975..8fd563c37a6 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -64,7 +64,7 @@ pub enum HandlerEvent { } /// A message sent from the behaviour to the handler. -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum GossipsubHandlerIn { /// A gossipsub message to send. Message(crate::rpc_proto::Rpc), From ee775137f30c19dfe43598400b1dda9fcdd46f51 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Dec 2022 01:48:15 +0000 Subject: [PATCH 09/11] deps(noise): Update ed25519-compact requirement from 1.0.11 to 2.0.4 (#3232) --- transports/noise/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 50aacafebac..942618d2c57 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -31,7 +31,7 @@ snow = { version = "0.9.0", features = ["default-resolver"], default-features = [dev-dependencies] async-io = "1.2.0" -ed25519-compact = "1.0.11" +ed25519-compact = "2.0.4" env_logger = "0.10.0" libp2p-tcp = { path = "../tcp", features = ["async-io"] } libsodium-sys-stable = { version = "1.19.22", features = ["fetch-latest"] } From 06aa694d0a5c0d9d61f712a402f9bc0364280c8a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 20 Dec 2022 03:57:26 +0000 Subject: [PATCH 10/11] deps: Bump actions/upload-pages-artifact from 1.0.6 to 1.0.7 (#3259) --- .github/workflows/docs.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index ab636bcb386..9901be942be 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -28,7 +28,7 @@ jobs: echo "" > target/doc/index.html cp -r target/doc/* ./host-docs - name: Upload documentation - uses: actions/upload-pages-artifact@v1.0.6 + uses: actions/upload-pages-artifact@v1.0.7 with: path: "host-docs/" From 93335b8818bfe70ae9daf5c2d1b623a34cd0ffa6 Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Tue, 20 Dec 2022 16:03:40 +1100 Subject: [PATCH 11/11] refactor(dcutr): reshape public API to follow naming guidelines (#3214) With this patch, the naming of types follows the guidelines discussed in #2217. --- misc/metrics/src/dcutr.rs | 20 +++++------ misc/metrics/src/lib.rs | 4 +-- protocols/dcutr/CHANGELOG.md | 4 +++ protocols/dcutr/examples/dcutr.rs | 10 +++--- .../src/{behaviour.rs => behaviour_impl.rs} | 10 +++--- protocols/dcutr/src/lib.rs | 35 +++++++++++++++++-- protocols/dcutr/tests/lib.rs | 14 ++++---- 7 files changed, 65 insertions(+), 32 deletions(-) rename protocols/dcutr/src/{behaviour.rs => behaviour_impl.rs} (98%) diff --git a/misc/metrics/src/dcutr.rs b/misc/metrics/src/dcutr.rs index b90e784f9b7..9e6f06bb497 100644 --- a/misc/metrics/src/dcutr.rs +++ b/misc/metrics/src/dcutr.rs @@ -55,21 +55,21 @@ enum EventType { DirectConnectionUpgradeFailed, } -impl From<&libp2p_dcutr::behaviour::Event> for EventType { - fn from(event: &libp2p_dcutr::behaviour::Event) -> Self { +impl From<&libp2p_dcutr::Event> for EventType { + fn from(event: &libp2p_dcutr::Event) -> Self { match event { - libp2p_dcutr::behaviour::Event::InitiatedDirectConnectionUpgrade { + libp2p_dcutr::Event::InitiatedDirectConnectionUpgrade { remote_peer_id: _, local_relayed_addr: _, } => EventType::InitiateDirectConnectionUpgrade, - libp2p_dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + libp2p_dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id: _, remote_relayed_addr: _, } => EventType::RemoteInitiatedDirectConnectionUpgrade, - libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeSucceeded { - remote_peer_id: _, - } => EventType::DirectConnectionUpgradeSucceeded, - libp2p_dcutr::behaviour::Event::DirectConnectionUpgradeFailed { + libp2p_dcutr::Event::DirectConnectionUpgradeSucceeded { remote_peer_id: _ } => { + EventType::DirectConnectionUpgradeSucceeded + } + libp2p_dcutr::Event::DirectConnectionUpgradeFailed { remote_peer_id: _, error: _, } => EventType::DirectConnectionUpgradeFailed, @@ -77,8 +77,8 @@ impl From<&libp2p_dcutr::behaviour::Event> for EventType { } } -impl super::Recorder for Metrics { - fn record(&self, event: &libp2p_dcutr::behaviour::Event) { +impl super::Recorder for Metrics { + fn record(&self, event: &libp2p_dcutr::Event) { self.events .get_or_create(&EventLabels { event: event.into(), diff --git a/misc/metrics/src/lib.rs b/misc/metrics/src/lib.rs index 351887260df..15a938c7d7b 100644 --- a/misc/metrics/src/lib.rs +++ b/misc/metrics/src/lib.rs @@ -100,8 +100,8 @@ pub trait Recorder { } #[cfg(feature = "dcutr")] -impl Recorder for Metrics { - fn record(&self, event: &libp2p_dcutr::behaviour::Event) { +impl Recorder for Metrics { + fn record(&self, event: &libp2p_dcutr::Event) { self.dcutr.record(event) } } diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index 23c9b616541..39f92d2772d 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -7,8 +7,12 @@ - Require the node's local `PeerId` to be passed into the constructor of `libp2p_dcutr::Behaviour`. See [PR 3153]. +- Rename types in public API to follow naming conventions defined in [issue 2217]. See [PR 3214]. + [PR 3213]: https://github.com/libp2p/rust-libp2p/pull/3213 [PR 3153]: https://github.com/libp2p/rust-libp2p/pull/3153 +[issue 2217]: https://github.com/libp2p/rust-libp2p/issues/2217 +[PR 3214]: https://github.com/libp2p/rust-libp2p/pull/3214 # 0.8.0 diff --git a/protocols/dcutr/examples/dcutr.rs b/protocols/dcutr/examples/dcutr.rs index 73cc735072b..ac8b284c2e0 100644 --- a/protocols/dcutr/examples/dcutr.rs +++ b/protocols/dcutr/examples/dcutr.rs @@ -114,7 +114,7 @@ fn main() -> Result<(), Box> { relay_client: Client, ping: ping::Behaviour, identify: identify::Behaviour, - dcutr: dcutr::behaviour::Behaviour, + dcutr: dcutr::Behaviour, } #[derive(Debug)] @@ -123,7 +123,7 @@ fn main() -> Result<(), Box> { Ping(ping::Event), Identify(identify::Event), Relay(client::Event), - Dcutr(dcutr::behaviour::Event), + Dcutr(dcutr::Event), } impl From for Event { @@ -144,8 +144,8 @@ fn main() -> Result<(), Box> { } } - impl From for Event { - fn from(e: dcutr::behaviour::Event) -> Self { + impl From for Event { + fn from(e: dcutr::Event) -> Self { Event::Dcutr(e) } } @@ -157,7 +157,7 @@ fn main() -> Result<(), Box> { "/TODO/0.0.1".to_string(), local_key.public(), )), - dcutr: dcutr::behaviour::Behaviour::new(local_peer_id), + dcutr: dcutr::Behaviour::new(local_peer_id), }; let mut swarm = match ThreadPool::new() { diff --git a/protocols/dcutr/src/behaviour.rs b/protocols/dcutr/src/behaviour_impl.rs similarity index 98% rename from protocols/dcutr/src/behaviour.rs rename to protocols/dcutr/src/behaviour_impl.rs index 15dfe078bfe..2d65197c5f8 100644 --- a/protocols/dcutr/src/behaviour.rs +++ b/protocols/dcutr/src/behaviour_impl.rs @@ -54,12 +54,12 @@ pub enum Event { }, DirectConnectionUpgradeFailed { remote_peer_id: PeerId, - error: UpgradeError, + error: Error, }, } #[derive(Debug, Error)] -pub enum UpgradeError { +pub enum Error { #[error("Failed to dial peer.")] Dial, #[error("Failed to establish substream: {0}.")] @@ -164,7 +164,7 @@ impl Behaviour { .into(), NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: peer_id, - error: UpgradeError::Dial, + error: Error::Dial, }) .into(), ]); @@ -236,7 +236,7 @@ impl NetworkBehaviour for Behaviour { self.queued_actions.push_back( NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source, - error: UpgradeError::Handler(error), + error: Error::Handler(error), }) .into(), ); @@ -260,7 +260,7 @@ impl NetworkBehaviour for Behaviour { self.queued_actions.push_back( NetworkBehaviourAction::GenerateEvent(Event::DirectConnectionUpgradeFailed { remote_peer_id: event_source, - error: UpgradeError::Handler(error), + error: Error::Handler(error), }) .into(), ); diff --git a/protocols/dcutr/src/lib.rs b/protocols/dcutr/src/lib.rs index 525b2a08d20..d32ad011b5b 100644 --- a/protocols/dcutr/src/lib.rs +++ b/protocols/dcutr/src/lib.rs @@ -23,7 +23,7 @@ #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -pub mod behaviour; +mod behaviour_impl; // TODO: Rename back `behaviour` once deprecation symbols are removed. mod handler; mod protocol; #[allow(clippy::derive_partial_eq_without_eq)] @@ -31,6 +31,35 @@ mod message_proto { include!(concat!(env!("OUT_DIR"), "/holepunch.pb.rs")); } +pub use behaviour_impl::Behaviour; +pub use behaviour_impl::Error; +pub use behaviour_impl::Event; pub use protocol::PROTOCOL_NAME; -pub type InboundUpgradeError = protocol::inbound::UpgradeError; -pub type OutboundUpgradeError = protocol::outbound::UpgradeError; +pub mod inbound { + pub use crate::protocol::inbound::UpgradeError; +} +pub mod outbound { + pub use crate::protocol::outbound::UpgradeError; +} + +#[deprecated( + since = "0.9.0", + note = "Use `libp2p_dcutr::inbound::UpgradeError` instead.`" +)] +pub type InboundUpgradeError = inbound::UpgradeError; + +#[deprecated( + since = "0.9.0", + note = "Use `libp2p_dcutr::outbound::UpgradeError` instead.`" +)] +pub type OutboundUpgradeError = outbound::UpgradeError; +pub mod behaviour { + #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Behaviour` instead.`")] + pub type Behaviour = crate::Behaviour; + + #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Event` instead.`")] + pub type Event = crate::Event; + + #[deprecated(since = "0.9.0", note = "Use `libp2p_dcutr::Error` instead.`")] + pub type UpgradeError = crate::Error; +} diff --git a/protocols/dcutr/tests/lib.rs b/protocols/dcutr/tests/lib.rs index b204d40738f..38c5b550b71 100644 --- a/protocols/dcutr/tests/lib.rs +++ b/protocols/dcutr/tests/lib.rs @@ -79,7 +79,7 @@ fn connect() { pool.run_until(wait_for_connection_established(&mut src, &dst_relayed_addr)); match pool.run_until(wait_for_dcutr_event(&mut src)) { - dcutr::behaviour::Event::RemoteInitiatedDirectConnectionUpgrade { + dcutr::Event::RemoteInitiatedDirectConnectionUpgrade { remote_peer_id, remote_relayed_addr, } if remote_peer_id == dst_peer_id && remote_relayed_addr == dst_relayed_addr => {} @@ -126,7 +126,7 @@ fn build_client() -> Swarm { transport, Client { relay: behaviour, - dcutr: dcutr::behaviour::Behaviour::new(local_peer_id), + dcutr: dcutr::Behaviour::new(local_peer_id), }, local_peer_id, ) @@ -154,13 +154,13 @@ where )] struct Client { relay: client::Client, - dcutr: dcutr::behaviour::Behaviour, + dcutr: dcutr::Behaviour, } #[derive(Debug)] enum ClientEvent { Relay(client::Event), - Dcutr(dcutr::behaviour::Event), + Dcutr(dcutr::Event), } impl From for ClientEvent { @@ -169,8 +169,8 @@ impl From for ClientEvent { } } -impl From for ClientEvent { - fn from(event: dcutr::behaviour::Event) -> Self { +impl From for ClientEvent { + fn from(event: dcutr::Event) -> Self { ClientEvent::Dcutr(event) } } @@ -241,7 +241,7 @@ async fn wait_for_new_listen_addr(client: &mut Swarm, new_addr: &Multiad } } -async fn wait_for_dcutr_event(client: &mut Swarm) -> dcutr::behaviour::Event { +async fn wait_for_dcutr_event(client: &mut Swarm) -> dcutr::Event { loop { match client.select_next_some().await { SwarmEvent::Behaviour(ClientEvent::Dcutr(e)) => return e,