diff --git a/CHANGELOG.md b/CHANGELOG.md index 912f341ad6b..668048a6af2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -42,6 +42,27 @@ # `libp2p` facade crate +# 0.45.0 [unreleased] + +- Update individual crates. + - Update to [`libp2p-plaintext` `v0.33.0`](transports/plaintext/CHANGELOG.md). + - Update to [`libp2p-noise` `v0.36.0`](transports/noise/CHANGELOG.md). + - Update to [`libp2p-wasm-ext` `v0.33.0`](transports/wasm-ext/CHANGELOG.md). + - Update to [`libp2p-yamux` `v0.37.0`](muxers/yamux/CHANGELOG.md). + - Update to [`libp2p-mplex` `v0.33.0`](muxers/mplex/CHANGELOG.md). + - Update to [`libp2p-dcutr` `v0.3.0`](protocols/dcutr/CHANGELOG.md). + - Update to [`libp2p-rendezvous` `v0.6.0`](protocols/rendezvous/CHANGELOG.md). + - Update to [`libp2p-ping` `v0.36.0`](protocols/ping/CHANGELOG.md). + - Update to [`libp2p-identify` `v0.36.0`](protocols/identify/CHANGELOG.md). + - Update to [`libp2p-floodsub` `v0.36.0`](protocols/floodsub/CHANGELOG.md). + - Update to [`libp2p-relay` `v0.9.0`](protocols/relay/CHANGELOG.md). + - Update to [`libp2p-metrics` `v0.6.0`](misc/metrics/CHANGELOG.md). + - Update to [`libp2p-kad` `v0.37.0`](protocols/kad/CHANGELOG.md). + - Update to [`libp2p-autonat` `v0.4.0`](protocols/autonat/CHANGELOG.md). + - Update to [`libp2p-request-response` `v0.18.0`](protocols/request-response/CHANGELOG.md). + - Update to [`libp2p-swarm` `v0.36.0`](swarm/CHANGELOG.md). + - Update to [`libp2p-core` `v0.33.0`](core/CHANGELOG.md). + ## 0.44.0 - Update individual crates. diff --git a/Cargo.toml b/Cargo.toml index 31d5308da29..d2f361cb1b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p" edition = "2021" rust-version = "1.56.1" description = "Peer-to-peer networking library" -version = "0.44.0" +version = "0.45.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -77,26 +77,26 @@ getrandom = "0.2.3" # Explicit dependency to be used in `wasm-bindgen` feature instant = "0.1.11" # Explicit dependency to be used in `wasm-bindgen` feature lazy_static = "1.2" -libp2p-autonat = { version = "0.3.0", path = "protocols/autonat", optional = true } -libp2p-core = { version = "0.32.1", path = "core", default-features = false } -libp2p-dcutr = { version = "0.2.0", path = "protocols/dcutr", optional = true } -libp2p-floodsub = { version = "0.35.0", path = "protocols/floodsub", optional = true } -libp2p-identify = { version = "0.35.0", path = "protocols/identify", optional = true } -libp2p-kad = { version = "0.36.0", path = "protocols/kad", optional = true } -libp2p-metrics = { version = "0.5.0", path = "misc/metrics", optional = true } -libp2p-mplex = { version = "0.32.0", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.35.0", path = "transports/noise", optional = true } -libp2p-ping = { version = "0.35.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.32.0", path = "transports/plaintext", optional = true } +libp2p-autonat = { version = "0.4.0", path = "protocols/autonat", optional = true } +libp2p-core = { version = "0.33.0", path = "core", default-features = false } +libp2p-dcutr = { version = "0.3.0", path = "protocols/dcutr", optional = true } +libp2p-floodsub = { version = "0.36.0", path = "protocols/floodsub", optional = true } +libp2p-identify = { version = "0.36.0", path = "protocols/identify", optional = true } +libp2p-kad = { version = "0.37.0", path = "protocols/kad", optional = true } +libp2p-metrics = { version = "0.6.0", path = "misc/metrics", optional = true } +libp2p-mplex = { version = "0.33.0", path = "muxers/mplex", optional = true } +libp2p-noise = { version = "0.36.0", path = "transports/noise", optional = true } +libp2p-ping = { version = "0.36.0", path = "protocols/ping", optional = true } +libp2p-plaintext = { version = "0.33.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.22.0", path = "transports/pnet", optional = true } -libp2p-relay = { version = "0.8.0", path = "protocols/relay", optional = true } -libp2p-rendezvous = { version = "0.5.0", path = "protocols/rendezvous", optional = true } -libp2p-request-response = { version = "0.17.0", path = "protocols/request-response", optional = true } -libp2p-swarm = { version = "0.35.0", path = "swarm" } +libp2p-relay = { version = "0.9.0", path = "protocols/relay", optional = true } +libp2p-rendezvous = { version = "0.6.0", path = "protocols/rendezvous", optional = true } +libp2p-request-response = { version = "0.18.0", path = "protocols/request-response", optional = true } +libp2p-swarm = { version = "0.36.0", path = "swarm" } libp2p-swarm-derive = { version = "0.27.0", path = "swarm-derive" } libp2p-uds = { version = "0.32.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.32.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.36.0", path = "muxers/yamux", optional = true } +libp2p-wasm-ext = { version = "0.33.0", path = "transports/wasm-ext", default-features = false, optional = true } +libp2p-yamux = { version = "0.37.0", path = "muxers/yamux", optional = true } multiaddr = { version = "0.14.0" } parking_lot = "0.12.0" pin-project = "1.0.0" @@ -104,14 +104,14 @@ rand = "0.7.3" # Explicit dependency to be used in `wasm-bindgen` feature smallvec = "1.6.1" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.32.0", path = "transports/deflate", optional = true } -libp2p-dns = { version = "0.32.1", path = "transports/dns", optional = true, default-features = false } -libp2p-mdns = { version = "0.36.0", path = "protocols/mdns", optional = true } -libp2p-tcp = { version = "0.32.0", path = "transports/tcp", default-features = false, optional = true } -libp2p-websocket = { version = "0.34.0", path = "transports/websocket", optional = true } +libp2p-deflate = { version = "0.33.0", path = "transports/deflate", optional = true } +libp2p-dns = { version = "0.33.0", path = "transports/dns", optional = true, default-features = false } +libp2p-mdns = { version = "0.37.0", path = "protocols/mdns", optional = true } +libp2p-tcp = { version = "0.33.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-websocket = { version = "0.35.0", path = "transports/websocket", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] -libp2p-gossipsub = { version = "0.37.0", path = "protocols/gossipsub", optional = true } +libp2p-gossipsub = { version = "0.38.0", path = "protocols/gossipsub", optional = true } [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 71035395bab..3a2cffea52a 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.33.0 [unreleased] + +- Have methods on `Transport` take `&mut self` instead of `self`. See [PR 2529]. + +[PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529 + # 0.32.1 - Add `PeerId::try_from_multiaddr` to extract a `PeerId` from a `Multiaddr` that ends in `/p2p/`. diff --git a/core/Cargo.toml b/core/Cargo.toml index 37e311862ac..de923fce311 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-core" edition = "2021" rust-version = "1.56.1" description = "Core traits and structs of libp2p" -version = "0.32.1" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/core/src/either.rs b/core/src/either.rs index 46f8ba833cc..df7caf600bd 100644 --- a/core/src/either.rs +++ b/core/src/either.rs @@ -497,7 +497,10 @@ where type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { use TransportError::*; match self { EitherTransport::Left(a) => match a.listen_on(addr) { @@ -513,7 +516,7 @@ where } } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { use TransportError::*; match self { EitherTransport::Left(a) => match a.dial(addr) { @@ -529,7 +532,10 @@ where } } - fn dial_as_listener(self, addr: Multiaddr) -> Result> + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> where Self: Sized, { diff --git a/core/src/transport.rs b/core/src/transport.rs index e13aecce96c..ce64f2dd923 100644 --- a/core/src/transport.rs +++ b/core/src/transport.rs @@ -70,10 +70,12 @@ pub use self::upgrade::Upgrade; /// by a [`Transport`] through an upgrade mechanism that is initiated via /// [`upgrade`](Transport::upgrade). /// -/// > **Note**: The methods of this trait use `self` and not `&self` or `&mut self`. In other -/// > words, listening or dialing consumes the transport object. This has been designed -/// > so that you would implement this trait on `&Foo` or `&mut Foo` instead of directly -/// > on `Foo`. +/// Note for implementors: Futures returned by [`Transport::dial`] should only +/// do work once polled for the first time. E.g. in the case of TCP, connecting +/// to the remote should not happen immediately on [`Transport::dial`] but only +/// once the returned [`Future`] is polled. The caller of [`Transport::dial`] +/// may call the method multiple times with a set of addresses, racing a subset +/// of the returned dials to success concurrently. pub trait Transport { /// The result of a connection setup process, including protocol upgrades. /// @@ -118,7 +120,7 @@ pub trait Transport { /// /// Returning an error from the stream is considered fatal. The listener can also report /// non-fatal errors by producing a [`ListenerEvent::Error`]. - fn listen_on(self, addr: Multiaddr) -> Result> + fn listen_on(&mut self, addr: Multiaddr) -> Result> where Self: Sized; @@ -126,7 +128,7 @@ pub trait Transport { /// /// If [`TransportError::MultiaddrNotSupported`] is returned, it may be desirable to /// try an alternative [`Transport`], if available. - fn dial(self, addr: Multiaddr) -> Result> + fn dial(&mut self, addr: Multiaddr) -> Result> where Self: Sized; @@ -135,7 +137,10 @@ pub trait Transport { /// This option is needed for NAT and firewall hole punching. /// /// See [`ConnectedPoint::Dialer`](crate::connection::ConnectedPoint::Dialer) for related option. - fn dial_as_listener(self, addr: Multiaddr) -> Result> + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> where Self: Sized; @@ -147,7 +152,7 @@ pub trait Transport { /// Boxes the transport, including custom transport errors. fn boxed(self) -> boxed::Boxed where - Self: Transport + Sized + Clone + Send + Sync + 'static, + Self: Transport + Sized + Send + Sync + 'static, Self::Dial: Send + 'static, Self::Listener: Send + 'static, Self::ListenerUpgrade: Send + 'static, @@ -160,7 +165,7 @@ pub trait Transport { fn map(self, f: F) -> map::Map where Self: Sized, - F: FnOnce(Self::Output, ConnectedPoint) -> O + Clone, + F: FnOnce(Self::Output, ConnectedPoint) -> O, { map::Map::new(self, f) } @@ -169,7 +174,7 @@ pub trait Transport { fn map_err(self, f: F) -> map_err::MapErr where Self: Sized, - F: FnOnce(Self::Error) -> E + Clone, + F: FnOnce(Self::Error) -> E, { map_err::MapErr::new(self, f) } @@ -198,7 +203,7 @@ pub trait Transport { fn and_then(self, f: C) -> and_then::AndThen where Self: Sized, - C: FnOnce(Self::Output, ConnectedPoint) -> F + Clone, + C: FnOnce(Self::Output, ConnectedPoint) -> F, F: TryFuture, ::Error: Error + 'static, { diff --git a/core/src/transport/and_then.rs b/core/src/transport/and_then.rs index 219887fa6df..f73a0caf8e6 100644 --- a/core/src/transport/and_then.rs +++ b/core/src/transport/and_then.rs @@ -53,7 +53,10 @@ where type ListenerUpgrade = AndThenFuture; type Dial = AndThenFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let listener = self .transport .listen_on(addr) @@ -64,12 +67,12 @@ where // `stream` can only produce an `Err` if `listening_stream` produces an `Err`. let stream = AndThenStream { stream: listener, - fun: self.fun, + fun: self.fun.clone(), }; Ok(stream) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let dialed_fut = self .transport .dial(addr.clone()) @@ -77,7 +80,7 @@ where let future = AndThenFuture { inner: Either::Left(Box::pin(dialed_fut)), args: Some(( - self.fun, + self.fun.clone(), ConnectedPoint::Dialer { address: addr, role_override: Endpoint::Dialer, @@ -88,7 +91,10 @@ where Ok(future) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { let dialed_fut = self .transport .dial_as_listener(addr.clone()) @@ -96,7 +102,7 @@ where let future = AndThenFuture { inner: Either::Left(Box::pin(dialed_fut)), args: Some(( - self.fun, + self.fun.clone(), ConnectedPoint::Dialer { address: addr, role_override: Endpoint::Listener, diff --git a/core/src/transport/boxed.rs b/core/src/transport/boxed.rs index 0f47470fcad..6ddf34ad950 100644 --- a/core/src/transport/boxed.rs +++ b/core/src/transport/boxed.rs @@ -21,19 +21,19 @@ use crate::transport::{ListenerEvent, Transport, TransportError}; use futures::prelude::*; use multiaddr::Multiaddr; -use std::{error::Error, fmt, io, pin::Pin, sync::Arc}; +use std::{error::Error, fmt, io, pin::Pin}; /// Creates a new [`Boxed`] transport from the given transport. pub fn boxed(transport: T) -> Boxed where - T: Transport + Clone + Send + Sync + 'static, + T: Transport + Send + Sync + 'static, T::Error: Send + Sync, T::Dial: Send + 'static, T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, { Boxed { - inner: Arc::new(transport) as Arc<_>, + inner: Box::new(transport) as Box<_>, } } @@ -41,7 +41,7 @@ where /// and `ListenerUpgrade` futures are `Box`ed and only the `Output` /// and `Error` types are captured in type variables. pub struct Boxed { - inner: Arc + Send + Sync>, + inner: Box + Send + Sync>, } type Dial = Pin> + Send>>; @@ -50,22 +50,22 @@ type Listener = type ListenerUpgrade = Pin> + Send>>; trait Abstract { - fn listen_on(&self, addr: Multiaddr) -> Result, TransportError>; - fn dial(&self, addr: Multiaddr) -> Result, TransportError>; - fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError>; + fn listen_on(&mut self, addr: Multiaddr) -> Result, TransportError>; + fn dial(&mut self, addr: Multiaddr) -> Result, TransportError>; + fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError>; fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option; } impl Abstract for T where - T: Transport + Clone + 'static, + T: Transport + 'static, T::Error: Send + Sync, T::Dial: Send + 'static, T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, { - fn listen_on(&self, addr: Multiaddr) -> Result, TransportError> { - let listener = Transport::listen_on(self.clone(), addr).map_err(|e| e.map(box_err))?; + fn listen_on(&mut self, addr: Multiaddr) -> Result, TransportError> { + let listener = Transport::listen_on(self, addr).map_err(|e| e.map(box_err))?; let fut = listener .map_ok(|event| { event @@ -79,15 +79,15 @@ where Ok(Box::pin(fut)) } - fn dial(&self, addr: Multiaddr) -> Result, TransportError> { - let fut = Transport::dial(self.clone(), addr) + fn dial(&mut self, addr: Multiaddr) -> Result, TransportError> { + let fut = Transport::dial(self, addr) .map(|r| r.map_err(box_err)) .map_err(|e| e.map(box_err))?; Ok(Box::pin(fut) as Dial<_>) } - fn dial_as_listener(&self, addr: Multiaddr) -> Result, TransportError> { - let fut = Transport::dial_as_listener(self.clone(), addr) + fn dial_as_listener(&mut self, addr: Multiaddr) -> Result, TransportError> { + let fut = Transport::dial_as_listener(self, addr) .map(|r| r.map_err(box_err)) .map_err(|e| e.map(box_err))?; Ok(Box::pin(fut) as Dial<_>) @@ -104,14 +104,6 @@ impl fmt::Debug for Boxed { } } -impl Clone for Boxed { - fn clone(&self) -> Self { - Boxed { - inner: self.inner.clone(), - } - } -} - impl Transport for Boxed { type Output = O; type Error = io::Error; @@ -119,15 +111,21 @@ impl Transport for Boxed { type ListenerUpgrade = ListenerUpgrade; type Dial = Dial; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { self.inner.listen_on(addr) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { self.inner.dial(addr) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.inner.dial_as_listener(addr) } diff --git a/core/src/transport/choice.rs b/core/src/transport/choice.rs index 3f2d87064cb..f1d21cfa30c 100644 --- a/core/src/transport/choice.rs +++ b/core/src/transport/choice.rs @@ -43,7 +43,10 @@ where type ListenerUpgrade = EitherFuture; type Dial = EitherFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let addr = match self.0.listen_on(addr) { Ok(listener) => return Ok(EitherListenStream::First(listener)), Err(TransportError::MultiaddrNotSupported(addr)) => addr, @@ -63,7 +66,7 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let addr = match self.0.dial(addr) { Ok(connec) => return Ok(EitherFuture::First(connec)), Err(TransportError::MultiaddrNotSupported(addr)) => addr, @@ -83,7 +86,10 @@ where Err(TransportError::MultiaddrNotSupported(addr)) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { let addr = match self.0.dial_as_listener(addr) { Ok(connec) => return Ok(EitherFuture::First(connec)), Err(TransportError::MultiaddrNotSupported(addr)) => addr, diff --git a/core/src/transport/dummy.rs b/core/src/transport/dummy.rs index ead00ff9609..5862348b0d4 100644 --- a/core/src/transport/dummy.rs +++ b/core/src/transport/dummy.rs @@ -62,15 +62,21 @@ impl Transport for DummyTransport { type ListenerUpgrade = futures::future::Pending>; type Dial = futures::future::Pending>; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { Err(TransportError::MultiaddrNotSupported(addr)) } diff --git a/core/src/transport/map.rs b/core/src/transport/map.rs index ec5d8c4a1be..703e1ea430b 100644 --- a/core/src/transport/map.rs +++ b/core/src/transport/map.rs @@ -37,6 +37,14 @@ impl Map { pub(crate) fn new(transport: T, fun: F) -> Self { Map { transport, fun } } + + pub fn inner(&self) -> &T { + &self.transport + } + + pub fn inner_mut(&mut self) -> &mut T { + &mut self.transport + } } impl Transport for Map @@ -50,15 +58,18 @@ where type ListenerUpgrade = MapFuture; type Dial = MapFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let stream = self.transport.listen_on(addr)?; Ok(MapStream { stream, - fun: self.fun, + fun: self.fun.clone(), }) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let future = self.transport.dial(addr.clone())?; let p = ConnectedPoint::Dialer { address: addr, @@ -66,11 +77,14 @@ where }; Ok(MapFuture { inner: future, - args: Some((self.fun, p)), + args: Some((self.fun.clone(), p)), }) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { let future = self.transport.dial_as_listener(addr.clone())?; let p = ConnectedPoint::Dialer { address: addr, @@ -78,7 +92,7 @@ where }; Ok(MapFuture { inner: future, - args: Some((self.fun, p)), + args: Some((self.fun.clone(), p)), }) } diff --git a/core/src/transport/map_err.rs b/core/src/transport/map_err.rs index eba3e042552..6cc2c5c3662 100644 --- a/core/src/transport/map_err.rs +++ b/core/src/transport/map_err.rs @@ -49,16 +49,19 @@ where type ListenerUpgrade = MapErrListenerUpgrade; type Dial = MapErrDial; - fn listen_on(self, addr: Multiaddr) -> Result> { - let map = self.map; + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { + let map = self.map.clone(); match self.transport.listen_on(addr) { Ok(stream) => Ok(MapErrListener { inner: stream, map }), Err(err) => Err(err.map(map)), } } - fn dial(self, addr: Multiaddr) -> Result> { - let map = self.map; + fn dial(&mut self, addr: Multiaddr) -> Result> { + let map = self.map.clone(); match self.transport.dial(addr) { Ok(future) => Ok(MapErrDial { inner: future, @@ -68,8 +71,11 @@ where } } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { - let map = self.map; + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + let map = self.map.clone(); match self.transport.dial_as_listener(addr) { Ok(future) => Ok(MapErrDial { inner: future, diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 2197f76b7ac..2685ef72d06 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -169,7 +169,10 @@ impl Transport for MemoryTransport { type ListenerUpgrade = Ready>; type Dial = DialFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let port = if let Ok(port) = parse_memory_addr(&addr) { port } else { @@ -191,7 +194,7 @@ impl Transport for MemoryTransport { Ok(listener) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let port = if let Ok(port) = parse_memory_addr(&addr) { if let Some(port) = NonZeroU64::new(port) { port @@ -205,7 +208,10 @@ impl Transport for MemoryTransport { DialFuture::new(port).ok_or(TransportError::Other(MemoryTransportError::Unreachable)) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.dial(addr) } @@ -408,7 +414,7 @@ mod tests { #[test] fn listening_twice() { - let transport = MemoryTransport::default(); + let mut transport = MemoryTransport::default(); assert!(transport .listen_on("/memory/1639174018481".parse().unwrap()) .is_ok()); @@ -435,7 +441,7 @@ mod tests { #[test] fn port_not_in_use() { - let transport = MemoryTransport::default(); + let mut transport = MemoryTransport::default(); assert!(transport .dial("/memory/810172461024613".parse().unwrap()) .is_err()); @@ -457,7 +463,7 @@ mod tests { let t1_addr: Multiaddr = format!("/memory/{}", rand_port).parse().unwrap(); let cloned_t1_addr = t1_addr.clone(); - let t1 = MemoryTransport::default(); + let mut t1 = MemoryTransport::default(); let listener = async move { let listener = t1.listen_on(t1_addr.clone()).unwrap(); @@ -478,7 +484,7 @@ mod tests { // Setup dialer. - let t2 = MemoryTransport::default(); + let mut t2 = MemoryTransport::default(); let dialer = async move { let mut socket = t2.dial(cloned_t1_addr).unwrap().await.unwrap(); socket.write_all(&msg).await.unwrap(); @@ -495,7 +501,7 @@ mod tests { Protocol::Memory(rand::random::().saturating_add(1)).into(); let listener_addr_cloned = listener_addr.clone(); - let listener_transport = MemoryTransport::default(); + let mut listener_transport = MemoryTransport::default(); let listener = async move { let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap(); @@ -530,7 +536,7 @@ mod tests { Protocol::Memory(rand::random::().saturating_add(1)).into(); let listener_addr_cloned = listener_addr.clone(); - let listener_transport = MemoryTransport::default(); + let mut listener_transport = MemoryTransport::default(); let listener = async move { let mut listener = listener_transport.listen_on(listener_addr.clone()).unwrap(); diff --git a/core/src/transport/optional.rs b/core/src/transport/optional.rs index a3bfc744add..cb10c35e133 100644 --- a/core/src/transport/optional.rs +++ b/core/src/transport/optional.rs @@ -59,24 +59,30 @@ where type ListenerUpgrade = T::ListenerUpgrade; type Dial = T::Dial; - fn listen_on(self, addr: Multiaddr) -> Result> { - if let Some(inner) = self.0 { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { + if let Some(inner) = self.0.as_mut() { inner.listen_on(addr) } else { Err(TransportError::MultiaddrNotSupported(addr)) } } - fn dial(self, addr: Multiaddr) -> Result> { - if let Some(inner) = self.0 { + fn dial(&mut self, addr: Multiaddr) -> Result> { + if let Some(inner) = self.0.as_mut() { inner.dial(addr) } else { Err(TransportError::MultiaddrNotSupported(addr)) } } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { - if let Some(inner) = self.0 { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + if let Some(inner) = self.0.as_mut() { inner.dial_as_listener(addr) } else { Err(TransportError::MultiaddrNotSupported(addr)) diff --git a/core/src/transport/timeout.rs b/core/src/transport/timeout.rs index 51fead8d42e..bb413cf8909 100644 --- a/core/src/transport/timeout.rs +++ b/core/src/transport/timeout.rs @@ -84,7 +84,10 @@ where type ListenerUpgrade = Timeout; type Dial = Timeout; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let listener = self .inner .listen_on(addr) @@ -98,7 +101,7 @@ where Ok(listener) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let dial = self .inner .dial(addr) @@ -109,7 +112,10 @@ where }) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { let dial = self .inner .dial_as_listener(addr) diff --git a/core/src/transport/upgrade.rs b/core/src/transport/upgrade.rs index 3483b709d82..c72859eb57d 100644 --- a/core/src/transport/upgrade.rs +++ b/core/src/transport/upgrade.rs @@ -294,7 +294,7 @@ impl Multiplexed { /// the [`StreamMuxer`] and custom transport errors. pub fn boxed(self) -> super::Boxed<(PeerId, StreamMuxerBox)> where - T: Transport + Sized + Clone + Send + Sync + 'static, + T: Transport + Sized + Send + Sync + 'static, T::Dial: Send + 'static, T::Listener: Send + 'static, T::ListenerUpgrade: Send + 'static, @@ -335,15 +335,21 @@ where type ListenerUpgrade = T::ListenerUpgrade; type Dial = T::Dial; - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { self.0.dial(addr) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.0.dial_as_listener(addr) } - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { self.0.listen_on(addr) } @@ -385,36 +391,42 @@ where type ListenerUpgrade = ListenerUpgradeFuture; type Dial = DialUpgradeFuture; - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let future = self .inner .dial(addr) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(DialUpgradeFuture { future: Box::pin(future), - upgrade: future::Either::Left(Some(self.upgrade)), + upgrade: future::Either::Left(Some(self.upgrade.clone())), }) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { let future = self .inner .dial_as_listener(addr) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(DialUpgradeFuture { future: Box::pin(future), - upgrade: future::Either::Left(Some(self.upgrade)), + upgrade: future::Either::Left(Some(self.upgrade.clone())), }) } - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let stream = self .inner .listen_on(addr) .map_err(|err| err.map(TransportUpgradeError::Transport))?; Ok(ListenerStream { stream: Box::pin(stream), - upgrade: self.upgrade, + upgrade: self.upgrade.clone(), }) } diff --git a/core/tests/transport_upgrade.rs b/core/tests/transport_upgrade.rs index f02fb2f3bd7..9fd1e8eaabb 100644 --- a/core/tests/transport_upgrade.rs +++ b/core/tests/transport_upgrade.rs @@ -84,7 +84,7 @@ fn upgrade_pipeline() { let listener_noise_keys = noise::Keypair::::new() .into_authentic(&listener_keys) .unwrap(); - let listener_transport = MemoryTransport::default() + let mut listener_transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(listener_noise_keys).into_authenticated()) .apply(HelloUpgrade {}) @@ -102,7 +102,7 @@ fn upgrade_pipeline() { let dialer_noise_keys = noise::Keypair::::new() .into_authentic(&dialer_keys) .unwrap(); - let dialer_transport = MemoryTransport::default() + let mut dialer_transport = MemoryTransport::default() .upgrade(upgrade::Version::V1) .authenticate(noise::NoiseConfig::xx(dialer_noise_keys).into_authenticated()) .apply(HelloUpgrade {}) diff --git a/misc/keygen/Cargo.toml b/misc/keygen/Cargo.toml index 58f29c8b103..434845b55fc 100644 --- a/misc/keygen/Cargo.toml +++ b/misc/keygen/Cargo.toml @@ -13,5 +13,5 @@ clap = {version = "3.1.6", features = ["derive"]} zeroize = "1" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -libp2p-core = { path = "../../core", default-features = false, version = "0.32.0"} +libp2p-core = { path = "../../core", default-features = false, version = "0.33.0"} base64 = "0.13.0" diff --git a/misc/metrics/CHANGELOG.md b/misc/metrics/CHANGELOG.md index 699331dccb6..474fe768749 100644 --- a/misc/metrics/CHANGELOG.md +++ b/misc/metrics/CHANGELOG.md @@ -1,3 +1,19 @@ +# 0.6.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + +- Update to `libp2p-dcutr` `v0.3.0`. + +- Update to `libp2p-ping` `v0.36.0`. + +- Update to `libp2p-identify` `v0.36.0`. + +- Update to `libp2p-relay` `v0.9.0`. + +- Update to `libp2p-kad` `v0.37.0`. + # 0.5.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/misc/metrics/Cargo.toml b/misc/metrics/Cargo.toml index dc8becc5023..84eb68cb016 100644 --- a/misc/metrics/Cargo.toml +++ b/misc/metrics/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-metrics" edition = "2021" rust-version = "1.56.1" description = "Metrics for libp2p" -version = "0.5.0" +version = "0.6.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -19,17 +19,17 @@ relay = ["libp2p-relay"] dcutr = ["libp2p-dcutr"] [dependencies] -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-dcutr = { version = "0.2.0", path = "../../protocols/dcutr", optional = true } -libp2p-identify = { version = "0.35.0", path = "../../protocols/identify", optional = true } -libp2p-kad = { version = "0.36.0", path = "../../protocols/kad", optional = true } -libp2p-ping = { version = "0.35.0", path = "../../protocols/ping", optional = true } -libp2p-relay = { version = "0.8.0", path = "../../protocols/relay", optional = true } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-dcutr = { version = "0.3.0", path = "../../protocols/dcutr", optional = true } +libp2p-identify = { version = "0.36.0", path = "../../protocols/identify", optional = true } +libp2p-kad = { version = "0.37.0", path = "../../protocols/kad", optional = true } +libp2p-ping = { version = "0.36.0", path = "../../protocols/ping", optional = true } +libp2p-relay = { version = "0.9.0", path = "../../protocols/relay", optional = true } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } prometheus-client = "0.15.0" [target.'cfg(not(target_os = "unknown"))'.dependencies] -libp2p-gossipsub = { version = "0.37.0", path = "../../protocols/gossipsub", optional = true } +libp2p-gossipsub = { version = "0.38.0", path = "../../protocols/gossipsub", optional = true } [dev-dependencies] log = "0.4.0" diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index 98583211b1e..af84e6639e7 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index 85950b4c3a1..dd1fb65523d 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-mplex" edition = "2021" rust-version = "1.56.1" description = "Mplex multiplexing protocol for libp2p" -version = "0.32.0" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4" nohash-hasher = "0.2" parking_lot = "0.12" diff --git a/muxers/mplex/benches/split_send_size.rs b/muxers/mplex/benches/split_send_size.rs index 5380f21cc6b..a9704b8e767 100644 --- a/muxers/mplex/benches/split_send_size.rs +++ b/muxers/mplex/benches/split_send_size.rs @@ -57,9 +57,15 @@ fn prepare(c: &mut Criterion) { let tcp_addr = multiaddr![Ip4(std::net::Ipv4Addr::new(127, 0, 0, 1)), Tcp(0u16)]; for &size in BENCH_SIZES.iter() { tcp.throughput(Throughput::Bytes(payload.len() as u64)); - let trans = tcp_transport(size); + let mut trans = tcp_transport(size); tcp.bench_function(format!("{}", size), |b| { - b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&tcp_addr))) + b.iter(|| { + run( + black_box(&mut trans), + black_box(&payload), + black_box(&tcp_addr), + ) + }) }); } tcp.finish(); @@ -68,17 +74,23 @@ fn prepare(c: &mut Criterion) { let mem_addr = multiaddr![Memory(0u64)]; for &size in BENCH_SIZES.iter() { mem.throughput(Throughput::Bytes(payload.len() as u64)); - let trans = mem_transport(size); + let mut trans = mem_transport(size); mem.bench_function(format!("{}", size), |b| { - b.iter(|| run(black_box(&trans), black_box(&payload), black_box(&mem_addr))) + b.iter(|| { + run( + black_box(&mut trans), + black_box(&payload), + black_box(&mem_addr), + ) + }) }); } mem.finish(); } /// Transfers the given payload between two nodes using the given transport. -fn run(transport: &BenchTransport, payload: &Vec, listen_addr: &Multiaddr) { - let mut listener = transport.clone().listen_on(listen_addr.clone()).unwrap(); +fn run(transport: &mut BenchTransport, payload: &Vec, listen_addr: &Multiaddr) { + let mut listener = transport.listen_on(listen_addr.clone()).unwrap(); let (addr_sender, addr_receiver) = oneshot::channel(); let mut addr_sender = Some(addr_sender); let payload_len = payload.len(); @@ -122,7 +134,7 @@ fn run(transport: &BenchTransport, payload: &Vec, listen_addr: &Multiaddr) { // Spawn and block on the sender, i.e. until all data is sent. task::block_on(async move { let addr = addr_receiver.await.unwrap(); - let (_peer, conn) = transport.clone().dial(addr).unwrap().await.unwrap(); + let (_peer, conn) = transport.dial(addr).unwrap().await.unwrap(); let mut handle = conn.open_outbound(); let mut stream = poll_fn(|cx| conn.poll_outbound(cx, &mut handle)) .await diff --git a/muxers/mplex/tests/async_write.rs b/muxers/mplex/tests/async_write.rs index d4a1df7c4c5..96b608a68ad 100644 --- a/muxers/mplex/tests/async_write.rs +++ b/muxers/mplex/tests/async_write.rs @@ -32,7 +32,7 @@ fn async_write() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -71,7 +71,7 @@ fn async_write() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); diff --git a/muxers/mplex/tests/two_peers.rs b/muxers/mplex/tests/two_peers.rs index 6be1cc4a4de..77e1a09997b 100644 --- a/muxers/mplex/tests/two_peers.rs +++ b/muxers/mplex/tests/two_peers.rs @@ -32,7 +32,7 @@ fn client_to_server_outbound() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -71,7 +71,7 @@ fn client_to_server_outbound() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = Arc::new(transport.dial(rx.await.unwrap()).unwrap().await.unwrap()); @@ -100,7 +100,7 @@ fn client_to_server_inbound() { let bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -147,7 +147,7 @@ fn client_to_server_inbound() { async_std::task::block_on(async { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let client = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); @@ -168,7 +168,7 @@ fn protocol_not_match() { let _bg_thread = async_std::task::spawn(async move { let mplex = libp2p_mplex::MplexConfig::new(); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); let mut listener = transport @@ -209,7 +209,7 @@ fn protocol_not_match() { // Make sure they do not connect when protocols do not match let mut mplex = libp2p_mplex::MplexConfig::new(); mplex.set_protocol_name(b"/mplextest/1.0.0"); - let transport = TcpConfig::new() + let mut transport = TcpConfig::new() .and_then(move |c, e| upgrade::apply(c, mplex, e, upgrade::Version::V1)); match transport.dial(rx.await.unwrap()).unwrap().await { Ok(_) => { diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index c73c0855dc9..e49a8845885 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.37.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.36.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index c2ce4244be5..461ddb8b3aa 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-yamux" edition = "2021" rust-version = "1.56.1" description = "Yamux multiplexing protocol for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } parking_lot = "0.12" thiserror = "1.0" yamux = "0.10.0" diff --git a/protocols/autonat/CHANGELOG.md b/protocols/autonat/CHANGELOG.md index 89baa415434..f76cf47e123 100644 --- a/protocols/autonat/CHANGELOG.md +++ b/protocols/autonat/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.4.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + +- Update to `libp2p-request-response` `v0.18.0`. + # 0.3.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/autonat/Cargo.toml b/protocols/autonat/Cargo.toml index 52197731ea8..cb5866b091e 100644 --- a/protocols/autonat/Cargo.toml +++ b/protocols/autonat/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-autonat" edition = "2021" rust-version = "1.56.1" description = "NAT and firewall detection for libp2p" -version = "0.3.0" +version = "0.4.0" authors = ["David Craven ", "Elena Frank "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -18,9 +18,9 @@ async-trait = "0.1" futures = "0.3" futures-timer = "3.0" instant = "0.1" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } -libp2p-request-response = { version = "0.17.0", path = "../request-response" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } +libp2p-request-response = { version = "0.18.0", path = "../request-response" } log = "0.4" rand = "0.8" prost = "0.10" diff --git a/protocols/dcutr/CHANGELOG.md b/protocols/dcutr/CHANGELOG.md index d852e4ba4c6..62e470b67c8 100644 --- a/protocols/dcutr/CHANGELOG.md +++ b/protocols/dcutr/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.3.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.2.0 - Expose `InboundUpgradeError` and `OutboundUpgradeError`. See [PR, 2586]. diff --git a/protocols/dcutr/Cargo.toml b/protocols/dcutr/Cargo.toml index 2ec1b224a42..6b8a84e399d 100644 --- a/protocols/dcutr/Cargo.toml +++ b/protocols/dcutr/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-dcutr" edition = "2021" rust-version = "1.56.1" description = "Direct connection upgrade through relay" -version = "0.2.0" +version = "0.3.0" authors = ["Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,8 +17,8 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3.0" instant = "0.1.11" -libp2p-core = { version = "0.32", path = "../../core" } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core" } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4" prost = "0.10" thiserror = "1.0" diff --git a/protocols/floodsub/CHANGELOG.md b/protocols/floodsub/CHANGELOG.md index 22e2c89c004..9896bc356f8 100644 --- a/protocols/floodsub/CHANGELOG.md +++ b/protocols/floodsub/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.36.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.35.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 2d433a802bb..81f53a3e16b 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-floodsub" edition = "2021" rust-version = "1.56.1" description = "Floodsub protocol for libp2p" -version = "0.35.0" +version = "0.36.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,8 +14,8 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4" prost = "0.10" rand = "0.7" diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 50d291e4459..2c8d7d37ee2 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.38.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.37.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 20e5b3168f8..33182e791aa 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-gossipsub" edition = "2021" rust-version = "1.56.1" description = "Gossipsub protocol for libp2p" -version = "0.37.0" +version = "0.38.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,8 +11,8 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" diff --git a/protocols/identify/CHANGELOG.md b/protocols/identify/CHANGELOG.md index 058f776d7a2..4a0356a7530 100644 --- a/protocols/identify/CHANGELOG.md +++ b/protocols/identify/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.36.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.35.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index ebcd40f1240..69945251e65 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-identify" edition = "2021" rust-version = "1.56.1" description = "Nodes identifcation protocol for libp2p" -version = "0.35.0" +version = "0.36.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,8 +13,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" futures-timer = "3.0.2" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4.1" lru = "0.7.2" prost = "0.10" diff --git a/protocols/identify/src/protocol.rs b/protocols/identify/src/protocol.rs index 2df49bf54f3..d54f5f4f9f7 100644 --- a/protocols/identify/src/protocol.rs +++ b/protocols/identify/src/protocol.rs @@ -276,7 +276,7 @@ mod tests { let (tx, rx) = oneshot::channel(); let bg_task = async_std::task::spawn(async move { - let transport = TcpConfig::new(); + let mut transport = TcpConfig::new(); let mut listener = transport .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap()) @@ -321,7 +321,7 @@ mod tests { }); async_std::task::block_on(async move { - let transport = TcpConfig::new(); + let mut transport = TcpConfig::new(); let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap(); let info = apply_outbound(socket, IdentifyProtocol, upgrade::Version::V1) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index c299f886720..e9aaa910c7c 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.37.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.36.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 26ec540bfcb..aea82612238 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-kad" edition = "2021" rust-version = "1.56.1" description = "Kademlia protocol for libp2p" -version = "0.36.0" +version = "0.37.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -18,8 +18,8 @@ fnv = "1.0" asynchronous-codec = "0.6" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } prost = "0.10" rand = "0.7.2" sha2 = "0.10.0" diff --git a/protocols/mdns/CHANGELOG.md b/protocols/mdns/CHANGELOG.md index 6fc082bb5f9..23adb9c74fd 100644 --- a/protocols/mdns/CHANGELOG.md +++ b/protocols/mdns/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.37.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.36.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 8873e701dcc..800ab92b17a 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mdns" edition = "2021" rust-version = "1.56.1" -version = "0.36.0" +version = "0.37.0" description = "Implementation of the libp2p mDNS discovery method" authors = ["Parity Technologies "] license = "MIT" @@ -17,8 +17,8 @@ dns-parser = "0.8.0" futures = "0.3.13" if-watch = "1.0.0" lazy_static = "1.4.0" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4.14" rand = "0.8.3" smallvec = "1.6.1" diff --git a/protocols/ping/CHANGELOG.md b/protocols/ping/CHANGELOG.md index d941b753176..9e0deab9c38 100644 --- a/protocols/ping/CHANGELOG.md +++ b/protocols/ping/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.36.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.35.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index e238f402deb..25c163034f5 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-ping" edition = "2021" rust-version = "1.56.1" description = "Ping protocol for libp2p" -version = "0.35.0" +version = "0.36.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,8 +14,8 @@ categories = ["network-programming", "asynchronous"] futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" void = "1.0" diff --git a/protocols/relay/CHANGELOG.md b/protocols/relay/CHANGELOG.md index 23dd2745e95..fe4f01624cf 100644 --- a/protocols/relay/CHANGELOG.md +++ b/protocols/relay/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.9.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.8.0 - Expose `{Inbound,Outbound}{Hop,Stop}UpgradeError`. See [PR 2586]. diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 8c49e41b4f1..a3aa84e9e53 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-relay" edition = "2021" rust-version = "1.56.1" description = "Communications relaying for libp2p" -version = "0.8.0" +version = "0.9.0" authors = ["Parity Technologies ", "Max Inden "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,8 +17,8 @@ either = "1.6.0" futures = "0.3.1" futures-timer = "3" instant = "0.1.11" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4" pin-project = "1" prost = "0.10" diff --git a/protocols/relay/src/v2/client/transport.rs b/protocols/relay/src/v2/client/transport.rs index 0a1cffad0bb..40eb6031fcc 100644 --- a/protocols/relay/src/v2/client/transport.rs +++ b/protocols/relay/src/v2/client/transport.rs @@ -54,7 +54,7 @@ use thiserror::Error; /// let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour( /// PeerId::random(), /// ); -/// let transport = OrTransport::new(relay_transport, actual_transport); +/// let mut transport = OrTransport::new(relay_transport, actual_transport); /// # let relay_id = PeerId::random(); /// # let destination_id = PeerId::random(); /// let dst_addr_via_relay = Multiaddr::empty() @@ -78,7 +78,7 @@ use thiserror::Error; /// let (relay_transport, behaviour) = client::Client::new_transport_and_behaviour( /// local_peer_id, /// ); -/// let transport = OrTransport::new(relay_transport, actual_transport); +/// let mut transport = OrTransport::new(relay_transport, actual_transport); /// let relay_addr = Multiaddr::empty() /// .with(Protocol::Memory(40)) // Relay address. /// .with(Protocol::P2p(relay_id.into())) // Relay peer id. @@ -108,7 +108,7 @@ impl ClientTransport { /// ); /// /// // To reduce unnecessary connection attempts, put `relay_transport` first. - /// let transport = OrTransport::new(relay_transport, actual_transport); + /// let mut transport = OrTransport::new(relay_transport, actual_transport); /// ``` pub(crate) fn new() -> (Self, mpsc::Receiver) { let (to_behaviour, from_transport) = mpsc::channel(0); @@ -124,7 +124,10 @@ impl Transport for ClientTransport { type ListenerUpgrade = Ready>; type Dial = RelayedDial; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let (relay_peer_id, relay_addr) = match parse_relayed_multiaddr(addr)? { RelayedMultiaddr { relay_peer_id: None, @@ -144,7 +147,7 @@ impl Transport for ClientTransport { }; let (to_listener, from_behaviour) = mpsc::channel(0); - let mut to_behaviour = self.to_behaviour; + let mut to_behaviour = self.to_behaviour.clone(); let msg_to_behaviour = Some( async move { to_behaviour @@ -165,7 +168,7 @@ impl Transport for ClientTransport { }) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let RelayedMultiaddr { relay_peer_id, relay_addr, @@ -178,7 +181,7 @@ impl Transport for ClientTransport { let relay_addr = relay_addr.ok_or(RelayError::MissingRelayAddr)?; let dst_peer_id = dst_peer_id.ok_or(RelayError::MissingDstPeerId)?; - let mut to_behaviour = self.to_behaviour; + let mut to_behaviour = self.to_behaviour.clone(); Ok(async move { let (tx, rx) = oneshot::channel(); to_behaviour @@ -197,7 +200,10 @@ impl Transport for ClientTransport { .boxed()) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> where Self: Sized, { diff --git a/protocols/rendezvous/CHANGELOG.md b/protocols/rendezvous/CHANGELOG.md index aa3a76802f3..d2c09314457 100644 --- a/protocols/rendezvous/CHANGELOG.md +++ b/protocols/rendezvous/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.6.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.5.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/rendezvous/Cargo.toml b/protocols/rendezvous/Cargo.toml index 93d6e9877f6..eee90d9154e 100644 --- a/protocols/rendezvous/Cargo.toml +++ b/protocols/rendezvous/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-rendezvous" edition = "2021" rust-version = "1.56.1" description = "Rendezvous protocol for libp2p" -version = "0.5.0" +version = "0.6.0" authors = ["The COMIT guys "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,8 +12,8 @@ categories = ["network-programming", "asynchronous"] [dependencies] asynchronous-codec = "0.6" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } prost = "0.10" void = "1" log = "0.4" diff --git a/protocols/request-response/CHANGELOG.md b/protocols/request-response/CHANGELOG.md index 65c933839a4..66e8b5bac1b 100644 --- a/protocols/request-response/CHANGELOG.md +++ b/protocols/request-response/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.18.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + +- Update to `libp2p-swarm` `v0.36.0`. + # 0.17.0 - Update to `libp2p-swarm` `v0.35.0`. diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index 8d696ec7b10..5d4dd1ca7b3 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-request-response" edition = "2021" rust-version = "1.56.1" description = "Generic Request/Response Protocols" -version = "0.17.0" +version = "0.18.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -15,8 +15,8 @@ async-trait = "0.1" bytes = "1" futures = "0.3.1" instant = "0.1.11" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } -libp2p-swarm = { version = "0.35.0", path = "../../swarm" } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } +libp2p-swarm = { version = "0.36.0", path = "../../swarm" } log = "0.4.11" rand = "0.7" smallvec = "1.6.1" diff --git a/src/bandwidth.rs b/src/bandwidth.rs index 1ff63cc5ec8..965c47d8bab 100644 --- a/src/bandwidth.rs +++ b/src/bandwidth.rs @@ -75,22 +75,28 @@ where type ListenerUpgrade = BandwidthFuture; type Dial = BandwidthFuture; - fn listen_on(self, addr: Multiaddr) -> Result> { - let sinks = self.sinks; + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { + let sinks = self.sinks.clone(); self.inner .listen_on(addr) .map(move |inner| BandwidthListener { inner, sinks }) } - fn dial(self, addr: Multiaddr) -> Result> { - let sinks = self.sinks; + fn dial(&mut self, addr: Multiaddr) -> Result> { + let sinks = self.sinks.clone(); self.inner .dial(addr) .map(move |fut| BandwidthFuture { inner: fut, sinks }) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { - let sinks = self.sinks; + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + let sinks = self.sinks.clone(); self.inner .dial_as_listener(addr) .map(move |fut| BandwidthFuture { inner: fut, sinks }) diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index 7ebe5883ccb..a334f46c6fa 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,3 +1,11 @@ +# 0.36.0 [unreleased] + +- Don't require `Transport` to be `Clone`. See [PR 2529]. + +- Update to `libp2p-core` `v0.33.0`. + +[PR 2529]: https://github.com/libp2p/rust-libp2p/pull/2529 + # 0.35.0 - Add impl `IntoIterator` for `MultiHandler`. See [PR 2572]. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 63709d36e8d..446b18b111b 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-swarm" edition = "2021" rust-version = "1.56.1" description = "The libp2p swarm" -version = "0.35.0" +version = "0.36.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -16,7 +16,7 @@ fnv = "1.0" futures = "0.3.1" futures-timer = "3.0.2" instant = "0.1.11" -libp2p-core = { version = "0.32.0", path = "../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../core", default-features = false } log = "0.4" pin-project = "1.0.0" rand = "0.7" diff --git a/swarm/src/connection/listeners.rs b/swarm/src/connection/listeners.rs index 8b4617afdbb..484a36dc15d 100644 --- a/swarm/src/connection/listeners.rs +++ b/swarm/src/connection/listeners.rs @@ -145,11 +145,8 @@ where pub fn listen_on( &mut self, addr: Multiaddr, - ) -> Result> - where - TTrans: Clone, - { - let listener = self.transport.clone().listen_on(addr)?; + ) -> Result> { + let listener = self.transport.listen_on(addr)?; self.listeners.push_back(Box::pin(Listener { id: self.next_id, listener, @@ -183,11 +180,16 @@ where } } - /// Returns the transport passed when building this object. + /// Returns a reference to the transport passed when building this object. pub fn transport(&self) -> &TTrans { &self.transport } + /// Returns a mutable reference to the transport passed when building this object. + pub fn transport_mut(&mut self) -> &mut TTrans { + &mut self.transport + } + /// Returns an iterator that produces the list of addresses we're listening on. pub fn listen_addrs(&self) -> impl Iterator { self.listeners.iter().flat_map(|l| l.addresses.iter()) @@ -365,7 +367,7 @@ mod tests { #[test] fn incoming_event() { async_std::task::block_on(async move { - let mem_transport = transport::MemoryTransport::default(); + let mut mem_transport = transport::MemoryTransport::default(); let mut listeners = ListenersStream::new(mem_transport); listeners.listen_on("/memory/0".parse().unwrap()).unwrap(); @@ -416,7 +418,7 @@ mod tests { type Dial = BoxFuture<'static, Result>; fn listen_on( - self, + &mut self, _: Multiaddr, ) -> Result> { Ok(Box::pin(stream::unfold((), |()| async move { @@ -430,14 +432,14 @@ mod tests { } fn dial( - self, + &mut self, _: Multiaddr, ) -> Result> { panic!() } fn dial_as_listener( - self, + &mut self, _: Multiaddr, ) -> Result> { panic!() @@ -479,7 +481,7 @@ mod tests { type Dial = BoxFuture<'static, Result>; fn listen_on( - self, + &mut self, _: Multiaddr, ) -> Result> { Ok(Box::pin(stream::unfold((), |()| async move { @@ -488,14 +490,14 @@ mod tests { } fn dial( - self, + &mut self, _: Multiaddr, ) -> Result> { panic!() } fn dial_as_listener( - self, + &mut self, _: Multiaddr, ) -> Result> { panic!() diff --git a/swarm/src/connection/pool.rs b/swarm/src/connection/pool.rs index bf21976f749..5fa1da1ef3f 100644 --- a/swarm/src/connection/pool.rs +++ b/swarm/src/connection/pool.rs @@ -477,15 +477,25 @@ where /// has been reached. pub fn add_outgoing( &mut self, - transport: TTrans, - addresses: impl Iterator + Send + 'static, + dials: Vec< + BoxFuture< + 'static, + ( + Multiaddr, + Result< + ::Output, + TransportError<::Error>, + >, + ), + >, + >, peer: Option, handler: THandler, role_override: Endpoint, dial_concurrency_factor_override: Option, ) -> Result where - TTrans: Clone + Send, + TTrans: Send, TTrans::Dial: Send + 'static, { if let Err(limit) = self.counters.check_max_pending_outgoing() { @@ -493,11 +503,8 @@ where }; let dial = ConcurrentDial::new( - transport, - peer, - addresses, + dials, dial_concurrency_factor_override.unwrap_or(self.dial_concurrency_factor), - role_override, ); let connection_id = self.next_connection_id(); diff --git a/swarm/src/connection/pool/concurrent_dial.rs b/swarm/src/connection/pool/concurrent_dial.rs index 194f18c8e5e..8f607ac7166 100644 --- a/swarm/src/connection/pool/concurrent_dial.rs +++ b/swarm/src/connection/pool/concurrent_dial.rs @@ -20,15 +20,13 @@ use crate::{ transport::{Transport, TransportError}, - Multiaddr, PeerId, + Multiaddr, }; use futures::{ - future::{BoxFuture, Future, FutureExt}, + future::{BoxFuture, Future}, ready, stream::{FuturesUnordered, StreamExt}, }; -use libp2p_core::connection::Endpoint; -use libp2p_core::multiaddr::Protocol; use std::{ num::NonZeroU8, pin::Pin, @@ -53,37 +51,13 @@ impl Unpin for ConcurrentDial {} impl ConcurrentDial where - TTrans: Transport + Clone + Send + 'static, + TTrans: Transport + Send + 'static, TTrans::Output: Send, TTrans::Error: Send, TTrans::Dial: Send + 'static, { - pub(crate) fn new( - transport: TTrans, - peer: Option, - addresses: impl Iterator + Send + 'static, - concurrency_factor: NonZeroU8, - role_override: Endpoint, - ) -> Self { - let mut pending_dials = addresses.map(move |address| match p2p_addr(peer, address) { - Ok(address) => { - let dial = match role_override { - Endpoint::Dialer => transport.clone().dial(address.clone()), - Endpoint::Listener => transport.clone().dial_as_listener(address.clone()), - }; - match dial { - Ok(fut) => fut - .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) - .boxed(), - Err(err) => futures::future::ready((address, Err(err))).boxed(), - } - } - Err(address) => futures::future::ready(( - address.clone(), - Err(TransportError::MultiaddrNotSupported(address)), - )) - .boxed(), - }); + pub(crate) fn new(pending_dials: Vec>, concurrency_factor: NonZeroU8) -> Self { + let mut pending_dials = pending_dials.into_iter(); let dials = FuturesUnordered::new(); while let Some(dial) = pending_dials.next() { @@ -137,29 +111,3 @@ where } } } - -/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer. -/// -/// If the given address is already a `p2p` address for the given peer, -/// i.e. the last encapsulated protocol is `/p2p/`, this is a no-op. -/// -/// If the given address is already a `p2p` address for a different peer -/// than the one given, the given `Multiaddr` is returned as an `Err`. -/// -/// If the given address is not yet a `p2p` address for the given peer, -/// the `/p2p/` protocol is appended to the returned address. -fn p2p_addr(peer: Option, addr: Multiaddr) -> Result { - let peer = match peer { - Some(p) => p, - None => return Ok(addr), - }; - - if let Some(Protocol::P2p(hash)) = addr.iter().last() { - if &hash != peer.as_ref() { - return Err(addr); - } - Ok(addr) - } else { - Ok(addr.with(Protocol::P2p(peer.into()))) - } -} diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index 5cc69030274..0242f2b25a9 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -91,7 +91,7 @@ use libp2p_core::{ muxing::StreamMuxerBox, transport::{self, TransportError}, upgrade::ProtocolName, - Executor, Multiaddr, Negotiated, PeerId, Transport, + Endpoint, Executor, Multiaddr, Negotiated, PeerId, Transport, }; use registry::{AddressIntoIter, Addresses}; use smallvec::SmallVec; @@ -502,9 +502,33 @@ where } }; + let dials = addresses + .map(|a| match p2p_addr(peer_id, a) { + Ok(address) => { + let dial = match role_override { + Endpoint::Dialer => self.listeners.transport_mut().dial(address.clone()), + Endpoint::Listener => self + .listeners + .transport_mut() + .dial_as_listener(address.clone()), + }; + match dial { + Ok(fut) => fut + .map(|r| (address, r.map_err(|e| TransportError::Other(e)))) + .boxed(), + Err(err) => futures::future::ready((address, Err(err))).boxed(), + } + } + Err(address) => futures::future::ready(( + address.clone(), + Err(TransportError::MultiaddrNotSupported(address)), + )) + .boxed(), + }) + .collect(); + match self.pool.add_outgoing( - self.listeners.transport().clone(), - addresses, + dials, peer_id, handler, role_override, @@ -1527,6 +1551,32 @@ impl NetworkInfo { } } +/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer. +/// +/// If the given address is already a `p2p` address for the given peer, +/// i.e. the last encapsulated protocol is `/p2p/`, this is a no-op. +/// +/// If the given address is already a `p2p` address for a different peer +/// than the one given, the given `Multiaddr` is returned as an `Err`. +/// +/// If the given address is not yet a `p2p` address for the given peer, +/// the `/p2p/` protocol is appended to the returned address. +fn p2p_addr(peer: Option, addr: Multiaddr) -> Result { + let peer = match peer { + Some(p) => p, + None => return Ok(addr), + }; + + if let Some(Protocol::P2p(hash)) = addr.iter().last() { + if &hash != peer.as_ref() { + return Err(addr); + } + Ok(addr) + } else { + Ok(addr.with(Protocol::P2p(peer.into()))) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/transports/deflate/CHANGELOG.md b/transports/deflate/CHANGELOG.md index bf0c4437f14..1bda3f3c23c 100644 --- a/transports/deflate/CHANGELOG.md +++ b/transports/deflate/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/deflate/Cargo.toml b/transports/deflate/Cargo.toml index 9ad297030ac..c728f34629f 100644 --- a/transports/deflate/Cargo.toml +++ b/transports/deflate/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-deflate" edition = "2021" rust-version = "1.56.1" description = "Deflate encryption protocol for libp2p" -version = "0.32.0" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } flate2 = "1.0" [dev-dependencies] diff --git a/transports/deflate/tests/test.rs b/transports/deflate/tests/test.rs index 6027c4f4afb..0bb27ed8d85 100644 --- a/transports/deflate/tests/test.rs +++ b/transports/deflate/tests/test.rs @@ -44,7 +44,7 @@ fn lot_of_data() { } async fn run(message1: Vec) { - let transport = TcpConfig::new().and_then(|conn, endpoint| { + let mut transport = TcpConfig::new().and_then(|conn, endpoint| { upgrade::apply( conn, DeflateConfig::default(), diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index 208bb73677f..64ee464b3c9 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.1 - Update to `trust-dns` `v0.21`. See [PR 2543]. diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 248e5c2c616..6b5c433ee35 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-dns" edition = "2021" rust-version = "1.56.1" description = "DNS transport implementation for libp2p" -version = "0.32.1" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,10 +11,11 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" async-std-resolver = { version = "0.21", optional = true } +parking_lot = "0.12.0" trust-dns-resolver = { version = "0.21", default-features = false, features = ["system-config"] } smallvec = "1.6.1" diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index 55684140152..7bdc9552688 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -63,9 +63,11 @@ use libp2p_core::{ transport::{ListenerEvent, TransportError}, Transport, }; +use parking_lot::Mutex; use smallvec::SmallVec; #[cfg(any(feature = "async-std", feature = "tokio"))] use std::io; +use std::sync::Arc; use std::{convert::TryFrom, error, fmt, iter, net::IpAddr, str}; #[cfg(any(feature = "async-std", feature = "tokio"))] use trust_dns_resolver::system_conf; @@ -112,7 +114,7 @@ where P: ConnectionProvider, { /// The underlying transport. - inner: T, + inner: Arc>, /// The DNS resolver used when dialing addresses with DNS components. resolver: AsyncResolver, } @@ -132,7 +134,7 @@ impl DnsConfig { opts: ResolverOpts, ) -> Result, io::Error> { Ok(DnsConfig { - inner, + inner: Arc::new(Mutex::new(inner)), resolver: async_std_resolver::resolver(cfg, opts).await?, }) } @@ -154,7 +156,7 @@ impl TokioDnsConfig { opts: ResolverOpts, ) -> Result, io::Error> { Ok(TokioDnsConfig { - inner, + inner: Arc::new(Mutex::new(inner)), resolver: TokioAsyncResolver::tokio(cfg, opts)?, }) } @@ -196,9 +198,13 @@ where BoxFuture<'static, Result>, >; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let listener = self .inner + .lock() .listen_on(addr) .map_err(|err| err.map(DnsErr::Transport))?; let listener = listener @@ -211,16 +217,19 @@ where Ok(listener) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { self.do_dial(addr, Endpoint::Dialer) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.do_dial(addr, Endpoint::Listener) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.inner.address_translation(server, observed) + self.inner.lock().address_translation(server, observed) } } @@ -233,16 +242,16 @@ where P: ConnectionProvider, { fn do_dial( - self, + &mut self, addr: Multiaddr, role_override: Endpoint, ) -> Result<::Dial, TransportError<::Error>> { + let resolver = self.resolver.clone(); + let inner = self.inner.clone(); + // Asynchronlously resolve all DNS names in the address before proceeding // with dialing on the underlying transport. Ok(async move { - let resolver = self.resolver; - let inner = self.inner; - let mut last_err = None; let mut dns_lookups = 0; let mut dial_attempts = 0; @@ -320,8 +329,8 @@ where let transport = inner.clone(); let dial = match role_override { - Endpoint::Dialer => transport.dial(addr), - Endpoint::Listener => transport.dial_as_listener(addr), + Endpoint::Dialer => transport.lock().dial(addr), + Endpoint::Listener => transport.lock().dial_as_listener(addr), }; let result = match dial { Ok(out) => { @@ -587,13 +596,13 @@ mod tests { type Dial = BoxFuture<'static, Result>; fn listen_on( - self, + &mut self, _: Multiaddr, ) -> Result> { unreachable!() } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { // Check that all DNS components have been resolved, i.e. replaced. assert!(!addr.iter().any(|p| match p { Protocol::Dns(_) @@ -606,7 +615,7 @@ mod tests { } fn dial_as_listener( - self, + &mut self, addr: Multiaddr, ) -> Result> { self.dial(addr) diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index d5e4dae5fa6..f6a5c71c402 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.36.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.35.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 7dff60f4c69..8bbed75cd7a 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-noise" edition = "2021" rust-version = "1.56.1" description = "Cryptographic handshake protocol using the noise framework." -version = "0.35.0" +version = "0.36.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ bytes = "1" curve25519-dalek = "3.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4" prost = "0.10" rand = "0.8.3" diff --git a/transports/noise/tests/smoke.rs b/transports/noise/tests/smoke.rs index e1e1e1c0c04..5c745c9463c 100644 --- a/transports/noise/tests/smoke.rs +++ b/transports/noise/tests/smoke.rs @@ -232,7 +232,7 @@ fn ik_xx() { type Output = (RemoteIdentity, NoiseOutput>>); -fn run(server_transport: T, client_transport: U, messages: I) +fn run(mut server_transport: T, mut client_transport: U, messages: I) where T: Transport>, T::Dial: Send + 'static, diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index 121ea426130..980ca1842a3 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index 2190c6baadb..a5678d014ca 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-plaintext" edition = "2021" rust-version = "1.56.1" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.32.0" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,7 +14,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4.8" prost = "0.10" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/transports/plaintext/tests/smoke.rs b/transports/plaintext/tests/smoke.rs index ce155bdd92e..ec20e8ff20e 100644 --- a/transports/plaintext/tests/smoke.rs +++ b/transports/plaintext/tests/smoke.rs @@ -45,7 +45,7 @@ fn variable_msg_length() { let client_id_public = client_id.public(); futures::executor::block_on(async { - let server_transport = + let mut server_transport = libp2p_core::transport::MemoryTransport {}.and_then(move |output, endpoint| { upgrade::apply( output, @@ -57,7 +57,7 @@ fn variable_msg_length() { ) }); - let client_transport = + let mut client_transport = libp2p_core::transport::MemoryTransport {}.and_then(move |output, endpoint| { upgrade::apply( output, diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index 9c787c8b4fa..0e44d181e31 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index f3234a7716b..67e28b35ff7 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-tcp" edition = "2021" rust-version = "1.56.1" description = "TCP/IP transport protocol for libp2p" -version = "0.32.0" +version = "0.33.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -18,7 +18,7 @@ if-watch = { version = "1.0.0", optional = true } if-addrs = { version = "0.7.0", optional = true } ipnet = "2.0.0" libc = "0.2.80" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true } diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 99eb8f46c12..cacf13bb7d5 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -59,7 +59,6 @@ use std::{ io, net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener}, pin::Pin, - sync::{Arc, RwLock}, task::{Context, Poll}, time::Duration, }; @@ -67,11 +66,6 @@ use std::{ use provider::{IfEvent, Provider}; /// The configuration for a TCP/IP transport capability for libp2p. -/// -/// A [`GenTcpConfig`] implements the [`Transport`] interface and thus -/// is consumed on [`Transport::listen_on`] and [`Transport::dial`]. -/// However, the config can be cheaply cloned to perform multiple such -/// operations with the same config. #[derive(Clone, Debug)] pub struct GenTcpConfig { /// The type of the I/O provider. @@ -101,7 +95,7 @@ enum PortReuse { Enabled { /// The addresses and ports of the listening sockets /// registered as eligible for port reuse when dialing. - listen_addrs: Arc>>, + listen_addrs: HashSet<(IpAddr, Port)>, }, } @@ -112,10 +106,7 @@ impl PortReuse { fn register(&mut self, ip: IpAddr, port: Port) { if let PortReuse::Enabled { listen_addrs } = self { log::trace!("Registering for port reuse: {}:{}", ip, port); - listen_addrs - .write() - .expect("`register()` and `unregister()` never panic while holding the lock") - .insert((ip, port)); + listen_addrs.insert((ip, port)); } } @@ -125,10 +116,7 @@ impl PortReuse { fn unregister(&mut self, ip: IpAddr, port: Port) { if let PortReuse::Enabled { listen_addrs } = self { log::trace!("Unregistering for port reuse: {}:{}", ip, port); - listen_addrs - .write() - .expect("`register()` and `unregister()` never panic while holding the lock") - .remove(&(ip, port)); + listen_addrs.remove(&(ip, port)); } } @@ -143,11 +131,7 @@ impl PortReuse { /// listening socket address is found. fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option { if let PortReuse::Enabled { listen_addrs } = self { - for (ip, port) in listen_addrs - .read() - .expect("`register()` and `unregister()` never panic while holding the lock") - .iter() - { + for (ip, port) in listen_addrs.iter() { if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() { @@ -243,15 +227,12 @@ where /// > a single outgoing connection to a particular address and port /// > of a peer per local listening socket address. /// - /// If enabled, the returned `GenTcpConfig` and all of its `Clone`s - /// keep track of the listen socket addresses as they are reported - /// by polling [`TcpListenStream`]s obtained from [`GenTcpConfig::listen_on()`]. - /// - /// In contrast, two `GenTcpConfig`s constructed separately via [`GenTcpConfig::new()`] - /// maintain these addresses independently. It is thus possible to listen on - /// multiple addresses, enabling port reuse for each, knowing exactly which - /// listen address is reused when dialing with a specific `GenTcpConfig`, as in - /// the following example: + /// `GenTcpConfig` keeps track of the listen socket addresses as they + /// are reported by polling [`TcpListenStream`]s obtained from + /// [`GenTcpConfig::listen_on()`]. It is possible to listen on multiple + /// addresses, enabling port reuse for each, knowing exactly which listen + /// address is reused when dialing with a specific `GenTcpConfig`, as in the + /// following example: /// /// ```no_run /// # use libp2p_core::transport::ListenerEvent; @@ -265,7 +246,7 @@ where /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap(); /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap(); /// - /// let tcp1 = TcpConfig::new().port_reuse(true); + /// let mut tcp1 = TcpConfig::new().port_reuse(true); /// let mut listener1 = tcp1.clone().listen_on(listen_addr1.clone()).expect("listener"); /// match listener1.next().await.expect("event")? { /// ListenerEvent::NewAddress(listen_addr) => { @@ -276,7 +257,7 @@ where /// _ => {} /// } /// - /// let tcp2 = TcpConfig::new().port_reuse(true); + /// let mut tcp2 = TcpConfig::new().port_reuse(true); /// let mut listener2 = tcp2.clone().listen_on(listen_addr2).expect("listener"); /// match listener2.next().await.expect("event")? { /// ListenerEvent::NewAddress(listen_addr) => { @@ -290,15 +271,14 @@ where /// } /// ``` /// - /// If a single `GenTcpConfig` is used and cloned for the creation of multiple - /// listening sockets or a wildcard listen socket address is used to listen - /// on any interface, there can be multiple such addresses registered for - /// port reuse. In this case, one is chosen whose IP protocol version and - /// loopback status is the same as that of the remote address. Consequently, for - /// maximum control of the local listening addresses and ports that are used - /// for outgoing connections, a new `GenTcpConfig` should be created for each - /// listening socket, avoiding the use of wildcard addresses which bind a - /// socket to all network interfaces. + /// If a wildcard listen socket address is used to listen on any interface, + /// there can be multiple such addresses registered for port reuse. In this + /// case, one is chosen whose IP protocol version and loopback status is the + /// same as that of the remote address. Consequently, for maximum control of + /// the local listening addresses and ports that are used for outgoing + /// connections, a new `GenTcpConfig` should be created for each listening + /// socket, avoiding the use of wildcard addresses which bind a socket to + /// all network interfaces. /// /// When this option is enabled on a unix system, the socket /// option `SO_REUSEPORT` is set, if available, to permit @@ -306,7 +286,7 @@ where pub fn port_reuse(mut self, port_reuse: bool) -> Self { self.port_reuse = if port_reuse { PortReuse::Enabled { - listen_addrs: Arc::new(RwLock::new(HashSet::new())), + listen_addrs: HashSet::new(), } } else { PortReuse::Disabled @@ -339,33 +319,12 @@ where Ok(socket) } - fn do_listen(self, socket_addr: SocketAddr) -> io::Result> { + fn do_listen(&mut self, socket_addr: SocketAddr) -> io::Result> { let socket = self.create_socket(&socket_addr)?; socket.bind(&socket_addr.into())?; socket.listen(self.backlog as _)?; socket.set_nonblocking(true)?; - TcpListenStream::::new(socket.into(), self.port_reuse) - } - - async fn do_dial(self, socket_addr: SocketAddr) -> Result { - let socket = self.create_socket(&socket_addr)?; - - if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) { - log::trace!("Binding dial socket to listen socket {}", addr); - socket.bind(&addr.into())?; - } - - socket.set_nonblocking(true)?; - - match socket.connect(&socket_addr.into()) { - Ok(()) => {} - Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} - Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} - Err(err) => return Err(err), - }; - - let stream = T::new_stream(socket.into()).await?; - Ok(stream) + TcpListenStream::::new(socket.into(), self.port_reuse.clone()) } } @@ -382,7 +341,10 @@ where type Listener = TcpListenStream; type ListenerUpgrade = Ready>; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) { sa } else { @@ -392,7 +354,7 @@ where self.do_listen(socket_addr).map_err(TransportError::Other) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) { if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -402,10 +364,40 @@ where return Err(TransportError::MultiaddrNotSupported(addr)); }; log::debug!("dialing {}", socket_addr); - Ok(Box::pin(self.do_dial(socket_addr))) + + let socket = self + .create_socket(&socket_addr) + .map_err(TransportError::Other)?; + + if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) { + log::trace!("Binding dial socket to listen socket {}", addr); + socket.bind(&addr.into()).map_err(TransportError::Other)?; + } + + socket + .set_nonblocking(true) + .map_err(TransportError::Other)?; + + Ok(async move { + // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus + // do the `connect` call within the [`Future`]. + match socket.connect(&socket_addr.into()) { + Ok(()) => {} + Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {} + Err(err) if err.kind() == io::ErrorKind::WouldBlock => {} + Err(err) => return Err(err), + }; + + let stream = T::new_stream(socket.into()).await?; + Ok(stream) + } + .boxed()) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.dial(addr) } @@ -765,7 +757,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let tcp = GenTcpConfig::::new(); + let mut tcp = GenTcpConfig::::new(); let mut listener = tcp.listen_on(addr).unwrap(); loop { match listener.next().await.unwrap().unwrap() { @@ -787,7 +779,7 @@ mod tests { async fn dialer(mut ready_rx: mpsc::Receiver) { let addr = ready_rx.next().await.unwrap(); - let tcp = GenTcpConfig::::new(); + let mut tcp = GenTcpConfig::::new(); // Obtain a future socket through dialing let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap(); @@ -834,7 +826,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let tcp = GenTcpConfig::::new(); + let mut tcp = GenTcpConfig::::new(); let mut listener = tcp.listen_on(addr).unwrap(); loop { @@ -863,7 +855,7 @@ mod tests { async fn dialer(mut ready_rx: mpsc::Receiver) { let dest_addr = ready_rx.next().await.unwrap(); - let tcp = GenTcpConfig::::new(); + let mut tcp = GenTcpConfig::::new(); tcp.dial(dest_addr).unwrap().await.unwrap(); } @@ -903,7 +895,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let tcp = GenTcpConfig::::new(); + let mut tcp = GenTcpConfig::::new(); let mut listener = tcp.listen_on(addr).unwrap(); loop { match listener.next().await.unwrap().unwrap() { @@ -925,7 +917,7 @@ mod tests { async fn dialer(addr: Multiaddr, mut ready_rx: mpsc::Receiver) { let dest_addr = ready_rx.next().await.unwrap(); - let tcp = GenTcpConfig::::new().port_reuse(true); + let mut tcp = GenTcpConfig::::new().port_reuse(true); let mut listener = tcp.clone().listen_on(addr).unwrap(); match listener.next().await.unwrap().unwrap() { ListenerEvent::NewAddress(_) => { @@ -1061,13 +1053,13 @@ mod tests { fn test(addr: Multiaddr) { #[cfg(feature = "async-io")] { - let tcp = TcpConfig::new(); + let mut tcp = TcpConfig::new(); assert!(tcp.listen_on(addr.clone()).is_err()); } #[cfg(feature = "tokio")] { - let tcp = TokioTcpConfig::new(); + let mut tcp = TokioTcpConfig::new(); assert!(tcp.listen_on(addr.clone()).is_err()); } } diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index 1a433d16850..2e9f390b887 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] async-std = { version = "1.6.2", optional = true } -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4.1" futures = "0.3.1" tokio = { version = "1.15", default-features = false, features = ["net"], optional = true } diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index a5fabc162a0..6dd0852396c 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -70,7 +70,7 @@ impl Transport for $uds_config { type ListenerUpgrade = Ready>; type Dial = BoxFuture<'static, Result>; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on(&mut self, addr: Multiaddr) -> Result> { if let Ok(path) = multiaddr_to_path(&addr) { Ok(async move { $build_listener(&path).await } .map_ok(move |listener| { @@ -104,7 +104,7 @@ impl Transport for $uds_config { } } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { // TODO: Should we dial at all? if let Ok(path) = multiaddr_to_path(&addr) { debug!("Dialing {}", addr); @@ -114,7 +114,7 @@ impl Transport for $uds_config { } } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener(&mut self, addr: Multiaddr) -> Result> { self.dial(addr) } @@ -228,7 +228,7 @@ mod tests { }); async_std::task::block_on(async move { - let uds = UdsConfig::new(); + let mut uds = UdsConfig::new(); let addr = rx.await.unwrap(); let mut socket = uds.dial(addr).unwrap().await.unwrap(); socket.write(&[1, 2, 3]).await.unwrap(); @@ -238,7 +238,7 @@ mod tests { #[test] #[ignore] // TODO: for the moment unix addresses fail to parse fn larger_addr_denied() { - let uds = UdsConfig::new(); + let mut uds = UdsConfig::new(); let addr = "/unix//foo/bar".parse::().unwrap(); assert!(uds.listen_on(addr).is_err()); diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index 3b741c2e494..f08d26a6800 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.33.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.32.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index c189a80a6c0..1531f48fa0a 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-wasm-ext" edition = "2021" rust-version = "1.56.1" description = "Allows passing in an external transport in a WASM environment" -version = "0.32.0" +version = "0.33.0" authors = ["Pierre Krieger "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.50" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/wasm-ext/src/lib.rs b/transports/wasm-ext/src/lib.rs index be32907d2f5..44f3dcbd54a 100644 --- a/transports/wasm-ext/src/lib.rs +++ b/transports/wasm-ext/src/lib.rs @@ -157,7 +157,7 @@ impl ExtTransport { } } fn do_dial( - self, + &mut self, addr: Multiaddr, role_override: Endpoint, ) -> Result<::Dial, TransportError<::Error>> { @@ -202,7 +202,10 @@ impl Transport for ExtTransport { type ListenerUpgrade = Ready>; type Dial = Dial; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let iter = self.inner.listen_on(&addr.to_string()).map_err(|err| { if is_not_supported_error(&err) { TransportError::MultiaddrNotSupported(addr) @@ -218,14 +221,17 @@ impl Transport for ExtTransport { }) } - fn dial(self, addr: Multiaddr) -> Result> + fn dial(&mut self, addr: Multiaddr) -> Result> where Self: Sized, { self.do_dial(addr, Endpoint::Dialer) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> where Self: Sized, { diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 97a5189a2c6..9d8a32c861a 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.35.0 [unreleased] + +- Update to `libp2p-core` `v0.33.0`. + # 0.34.0 [2022-02-22] - Update to `libp2p-core` `v0.32.0`. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index 2797562d51f..bc93facae9b 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -3,7 +3,7 @@ name = "libp2p-websocket" edition = "2021" rust-version = "1.56.1" description = "WebSocket transport for libp2p" -version = "0.34.0" +version = "0.35.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -14,8 +14,9 @@ categories = ["network-programming", "asynchronous"] futures-rustls = "0.22" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.32.0", path = "../../core", default-features = false } +libp2p-core = { version = "0.33.0", path = "../../core", default-features = false } log = "0.4.8" +parking_lot = "0.12.0" quicksink = "0.1" rw-stream-sink = "0.2.0" soketto = { version = "0.7.0", features = ["deflate"] } diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index 1e009248fad..c04e7354587 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -30,11 +30,13 @@ use libp2p_core::{ Transport, }; use log::{debug, trace}; +use parking_lot::Mutex; use soketto::{ connection::{self, CloseReason}, extension::deflate::Deflate, handshake, }; +use std::sync::Arc; use std::{convert::TryInto, fmt, io, mem, pin::Pin, task::Context, task::Poll}; use url::Url; @@ -44,20 +46,32 @@ const MAX_DATA_SIZE: usize = 256 * 1024 * 1024; /// A Websocket transport whose output type is a [`Stream`] and [`Sink`] of /// frame payloads which does not implement [`AsyncRead`] or /// [`AsyncWrite`]. See [`crate::WsConfig`] if you require the latter. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct WsConfig { - transport: T, + transport: Arc>, max_data_size: usize, tls_config: tls::Config, max_redirects: u8, use_deflate: bool, } +impl Clone for WsConfig { + fn clone(&self) -> Self { + Self { + transport: self.transport.clone(), + max_data_size: self.max_data_size, + tls_config: self.tls_config.clone(), + max_redirects: self.max_redirects, + use_deflate: self.use_deflate, + } + } +} + impl WsConfig { /// Create a new websocket transport based on another transport. pub fn new(transport: T) -> Self { WsConfig { - transport, + transport: Arc::new(Mutex::new(transport)), max_data_size: MAX_DATA_SIZE, tls_config: tls::Config::client(), max_redirects: 0, @@ -104,7 +118,7 @@ type TlsOrPlain = EitherOutput, server::Tls impl Transport for WsConfig where - T: Transport + Send + Clone + 'static, + T: Transport + Send + 'static, T::Error: Send + 'static, T::Dial: Send + 'static, T::Listener: Send + 'static, @@ -118,7 +132,10 @@ where type ListenerUpgrade = BoxFuture<'static, Result>; type Dial = BoxFuture<'static, Result>; - fn listen_on(self, addr: Multiaddr) -> Result> { + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { let mut inner_addr = addr.clone(); let (use_tls, proto) = match inner_addr.pop() { @@ -137,11 +154,12 @@ where } }; - let tls_config = self.tls_config; + let tls_config = self.tls_config.clone(); let max_size = self.max_data_size; let use_deflate = self.use_deflate; let transport = self .transport + .lock() .listen_on(inner_addr) .map_err(|e| e.map(Error::Transport))?; let listen = transport @@ -245,22 +263,25 @@ where Ok(Box::pin(listen)) } - fn dial(self, addr: Multiaddr) -> Result> { + fn dial(&mut self, addr: Multiaddr) -> Result> { self.do_dial(addr, Endpoint::Dialer) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { self.do_dial(addr, Endpoint::Listener) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { - self.transport.address_translation(server, observed) + self.transport.lock().address_translation(server, observed) } } impl WsConfig where - T: Transport + Send + Clone + 'static, + T: Transport + Send + 'static, T::Error: Send + 'static, T::Dial: Send + 'static, T::Listener: Send + 'static, @@ -268,11 +289,11 @@ where T::Output: AsyncRead + AsyncWrite + Unpin + Send + 'static, { fn do_dial( - self, + &mut self, addr: Multiaddr, role_override: Endpoint, ) -> Result<::Dial, TransportError<::Error>> { - let addr = match parse_ws_dial_addr(addr) { + let mut addr = match parse_ws_dial_addr(addr) { Ok(addr) => addr, Err(Error::InvalidMultiaddr(a)) => { return Err(TransportError::MultiaddrNotSupported(a)) @@ -282,14 +303,14 @@ where // We are looping here in order to follow redirects (if any): let mut remaining_redirects = self.max_redirects; - let mut addr = addr; + + let mut this = self.clone(); let future = async move { loop { - let this = self.clone(); match this.dial_once(addr, role_override).await { Ok(Either::Left(redirect)) => { if remaining_redirects == 0 { - debug!("Too many redirects (> {})", self.max_redirects); + debug!("Too many redirects (> {})", this.max_redirects); return Err(Error::TooManyRedirects); } remaining_redirects -= 1; @@ -305,15 +326,15 @@ where } /// Attempts to dial the given address and perform a websocket handshake. async fn dial_once( - self, + &mut self, addr: WsAddress, role_override: Endpoint, ) -> Result>, Error> { trace!("Dialing websocket address: {:?}", addr); let dial = match role_override { - Endpoint::Dialer => self.transport.dial(addr.tcp_addr), - Endpoint::Listener => self.transport.dial_as_listener(addr.tcp_addr), + Endpoint::Dialer => self.transport.lock().dial(addr.tcp_addr), + Endpoint::Listener => self.transport.lock().dial_as_listener(addr.tcp_addr), } .map_err(|e| match e { TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 369e9000855..a6a1c0971c1 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -45,11 +45,23 @@ use std::{ /// A Websocket transport. #[derive(Debug, Clone)] -pub struct WsConfig { - transport: framed::WsConfig, +pub struct WsConfig +where + T: Transport, + T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ + transport: libp2p_core::transport::map::Map, WrapperFn>, } -impl WsConfig { +impl WsConfig +where + T: Transport + Send + 'static, + T::Error: Send + 'static, + T::Dial: Send + 'static, + T::Listener: Send + 'static, + T::ListenerUpgrade: Send + 'static, + T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static, +{ /// Create a new websocket transport based on the given transport. /// /// > **Note*: The given transport must be based on TCP/IP and should @@ -59,53 +71,50 @@ impl WsConfig { /// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing /// > the inner transport. pub fn new(transport: T) -> Self { - framed::WsConfig::new(transport).into() + Self { + transport: framed::WsConfig::new(transport) + .map(wrap_connection as WrapperFn), + } } /// Return the configured maximum number of redirects. pub fn max_redirects(&self) -> u8 { - self.transport.max_redirects() + self.transport.inner().max_redirects() } /// Set max. number of redirects to follow. pub fn set_max_redirects(&mut self, max: u8) -> &mut Self { - self.transport.set_max_redirects(max); + self.transport.inner_mut().set_max_redirects(max); self } /// Get the max. frame data size we support. pub fn max_data_size(&self) -> usize { - self.transport.max_data_size() + self.transport.inner().max_data_size() } /// Set the max. frame data size we support. pub fn set_max_data_size(&mut self, size: usize) -> &mut Self { - self.transport.set_max_data_size(size); + self.transport.inner_mut().set_max_data_size(size); self } /// Set the TLS configuration if TLS support is desired. pub fn set_tls_config(&mut self, c: tls::Config) -> &mut Self { - self.transport.set_tls_config(c); + self.transport.inner_mut().set_tls_config(c); self } /// Should the deflate extension (RFC 7692) be used if supported? pub fn use_deflate(&mut self, flag: bool) -> &mut Self { - self.transport.use_deflate(flag); + self.transport.inner_mut().use_deflate(flag); self } } -impl From> for WsConfig { - fn from(framed: framed::WsConfig) -> Self { - WsConfig { transport: framed } - } -} - impl Transport for WsConfig where - T: Transport + Send + Clone + 'static, + T: Transport + Send + 'static, T::Error: Send + 'static, T::Dial: Send + 'static, T::Listener: Send + 'static, @@ -118,22 +127,22 @@ where type ListenerUpgrade = MapFuture, WrapperFn>; type Dial = MapFuture, WrapperFn>; - fn listen_on(self, addr: Multiaddr) -> Result> { - self.transport - .map(wrap_connection as WrapperFn) - .listen_on(addr) + fn listen_on( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.transport.listen_on(addr) } - fn dial(self, addr: Multiaddr) -> Result> { - self.transport - .map(wrap_connection as WrapperFn) - .dial(addr) + fn dial(&mut self, addr: Multiaddr) -> Result> { + self.transport.dial(addr) } - fn dial_as_listener(self, addr: Multiaddr) -> Result> { - self.transport - .map(wrap_connection as WrapperFn) - .dial_as_listener(addr) + fn dial_as_listener( + &mut self, + addr: Multiaddr, + ) -> Result> { + self.transport.dial_as_listener(addr) } fn address_translation(&self, server: &Multiaddr, observed: &Multiaddr) -> Option { @@ -228,9 +237,9 @@ mod tests { } async fn connect(listen_addr: Multiaddr) { - let ws_config = WsConfig::new(tcp::TcpConfig::new()); + let ws_config = || WsConfig::new(tcp::TcpConfig::new()); - let mut listener = ws_config.clone().listen_on(listen_addr).expect("listener"); + let mut listener = ws_config().listen_on(listen_addr).expect("listener"); let addr = listener .try_next() @@ -253,7 +262,7 @@ mod tests { conn.await }; - let outbound = ws_config + let outbound = ws_config() .dial(addr.with(Protocol::P2p(PeerId::random().into()))) .unwrap();