diff --git a/Cargo.toml b/Cargo.toml index dfe9c360516..656bbebdf04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,35 +64,35 @@ atomic = "0.5.0" bytes = "1" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.27.2", path = "core", default-features = false } +libp2p-core = { version = "0.28.0", path = "core", default-features = false } libp2p-floodsub = { version = "0.28.0", path = "protocols/floodsub", optional = true } libp2p-gossipsub = { version = "0.29.0", path = "./protocols/gossipsub", optional = true } libp2p-identify = { version = "0.28.0", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.29.0", path = "protocols/kad", optional = true } -libp2p-mplex = { version = "0.27.2", path = "muxers/mplex", optional = true } -libp2p-noise = { version = "0.29.0", path = "transports/noise", optional = true } +libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true } +libp2p-noise = { version = "0.30.0", path = "transports/noise", optional = true } libp2p-ping = { version = "0.28.0", path = "protocols/ping", optional = true } -libp2p-plaintext = { version = "0.27.1", path = "transports/plaintext", optional = true } +libp2p-plaintext = { version = "0.28.0", path = "transports/plaintext", optional = true } libp2p-pnet = { version = "0.20.0", path = "transports/pnet", optional = true } libp2p-relay = { version = "0.1.0", path = "protocols/relay", optional = true } libp2p-request-response = { version = "0.10.0", path = "protocols/request-response", optional = true } libp2p-swarm = { version = "0.28.0", path = "swarm" } libp2p-swarm-derive = { version = "0.22.0", path = "swarm-derive" } -libp2p-uds = { version = "0.27.0", path = "transports/uds", optional = true } -libp2p-wasm-ext = { version = "0.27.0", path = "transports/wasm-ext", default-features = false, optional = true } -libp2p-yamux = { version = "0.30.1", path = "muxers/yamux", optional = true } -multiaddr = { package = "parity-multiaddr", version = "0.11.1", path = "misc/multiaddr" } +libp2p-uds = { version = "0.28.0", path = "transports/uds", optional = true } +libp2p-wasm-ext = { version = "0.28.0", path = "transports/wasm-ext", default-features = false, optional = true } +libp2p-yamux = { version = "0.31.0", path = "muxers/yamux", optional = true } +multiaddr = { package = "parity-multiaddr", version = "0.11.2", path = "misc/multiaddr" } parking_lot = "0.11.0" pin-project = "1.0.0" smallvec = "1.6.1" wasm-timer = "0.2.4" [target.'cfg(not(any(target_os = "emscripten", target_os = "wasi", target_os = "unknown")))'.dependencies] -libp2p-deflate = { version = "0.27.1", path = "transports/deflate", optional = true } +libp2p-deflate = { version = "0.28.0", path = "transports/deflate", optional = true } libp2p-dns = { version = "0.28.0", path = "transports/dns", optional = true, default-features = false } libp2p-mdns = { version = "0.29.0", path = "protocols/mdns", optional = true } -libp2p-tcp = { version = "0.27.1", path = "transports/tcp", default-features = false, optional = true } -libp2p-websocket = { version = "0.28.0", path = "transports/websocket", optional = true } +libp2p-tcp = { version = "0.28.0", path = "transports/tcp", default-features = false, optional = true } +libp2p-websocket = { version = "0.29.0", path = "transports/websocket", optional = true } [dev-dependencies] async-std = { version = "1.6.2", features = ["attributes"] } diff --git a/core/CHANGELOG.md b/core/CHANGELOG.md index 091b62b3f93..bc4b0558ecc 100644 --- a/core/CHANGELOG.md +++ b/core/CHANGELOG.md @@ -1,4 +1,12 @@ -# 0.27.2 [unreleased] +# 0.28.0 [unreleased] + +- `Network::dial()` understands `/p2p` addresses and `Transport::dial` + gets a "fully qualified" `/p2p` address when dialing a specific peer, + whether through the `Network::peer()` API or via `Network::dial()` + with a `/p2p` address. + +- `Network::dial()` and `network::Peer::dial()` return a `DialError` + on error. - Shorten and unify `Debug` impls of public keys. diff --git a/core/Cargo.toml b/core/Cargo.toml index 21650fcb254..83f12d2b3f7 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-core" edition = "2018" description = "Core traits and structs of libp2p" -version = "0.27.2" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/core/src/connection/pool.rs b/core/src/connection/pool.rs index 2bc7ca087a2..6615219ea44 100644 --- a/core/src/connection/pool.rs +++ b/core/src/connection/pool.rs @@ -554,7 +554,7 @@ impl /// Returns an iterator over all connected peers, i.e. those that have /// at least one established connection in the pool. - pub fn iter_connected<'a>(&'a self) -> impl Iterator + 'a { + pub fn iter_connected(&self) -> impl Iterator { self.established.keys() } diff --git a/core/src/network.rs b/core/src/network.rs index 2667ca41601..806112a4730 100644 --- a/core/src/network.rs +++ b/core/src/network.rs @@ -209,13 +209,14 @@ where &self.local_peer_id } - /// Dials a multiaddress without expecting a particular remote peer ID. + /// Dials a [`Multiaddr`] that may or may not encapsulate a + /// specific expected remote peer ID. /// /// The given `handler` will be used to create the /// [`Connection`](crate::connection::Connection) upon success and the /// connection ID is returned. pub fn dial(&mut self, address: &Multiaddr, handler: THandler) - -> Result + -> Result where TTrans: Transport, TTrans::Error: Send + 'static, @@ -225,15 +226,32 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { + // If the address ultimately encapsulates an expected peer ID, dial that peer + // such that any mismatch is detected. We do not "pop off" the `P2p` protocol + // from the address, because it may be used by the `Transport`, i.e. `P2p` + // is a protocol component that can influence any transport, like `libp2p-dns`. + if let Some(multiaddr::Protocol::P2p(ma)) = address.iter().last() { + if let Ok(peer) = PeerId::try_from(ma) { + return self.dial_peer(DialingOpts { + peer, + address: address.clone(), + handler, + remaining: Vec::new(), + }) + } + } + + // The address does not specify an expected peer, so just try to dial it as-is, + // accepting any peer ID that the remote identifies as. let info = OutgoingInfo { address, peer_id: None }; match self.transport().clone().dial(address.clone()) { Ok(f) => { let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err))); - self.pool.add_outgoing(f, handler, info) + self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit) } Err(err) => { let f = future::err(PendingConnectionError::Transport(err)); - self.pool.add_outgoing(f, handler, info) + self.pool.add_outgoing(f, handler, info).map_err(DialError::ConnectionLimit) } } } @@ -430,7 +448,7 @@ where /// Initiates a connection attempt to a known peer. fn dial_peer(&mut self, opts: DialingOpts) - -> Result + -> Result where TTrans: Transport, TTrans::Dial: Send + 'static, @@ -460,7 +478,7 @@ fn dial_peer_impl( ::Error>, dialing: &mut FnvHashMap>, opts: DialingOpts -) -> Result +) -> Result where THandler: IntoConnectionHandler + Send + 'static, ::Error: error::Error + Send + 'static, @@ -478,23 +496,28 @@ where TInEvent: Send + 'static, TOutEvent: Send + 'static, { - let result = match transport.dial(opts.address.clone()) { + // Ensure the address to dial encapsulates the `p2p` protocol for the + // targeted peer, so that the transport has a "fully qualified" address + // to work with. + let addr = p2p_addr(opts.peer, opts.address).map_err(DialError::InvalidAddress)?; + + let result = match transport.dial(addr.clone()) { Ok(fut) => { let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e))); - let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) }; - pool.add_outgoing(fut, opts.handler, info) + let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) }; + pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit) }, Err(err) => { let fut = future::err(PendingConnectionError::Transport(err)); - let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) }; - pool.add_outgoing(fut, opts.handler, info) + let info = OutgoingInfo { address: &addr, peer_id: Some(&opts.peer) }; + pool.add_outgoing(fut, opts.handler, info).map_err(DialError::ConnectionLimit) }, }; if let Ok(id) = &result { dialing.entry(opts.peer).or_default().push( peer::DialingState { - current: (*id, opts.address), + current: (*id, addr), remaining: opts.remaining, }, ); @@ -668,6 +691,37 @@ impl NetworkConfig { } } +/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer. +/// +/// If the given address is already a `p2p` address for the given peer, +/// i.e. the last encapsulated protocol is `/p2p/`, this is a no-op. +/// +/// If the given address is already a `p2p` address for a different peer +/// than the one given, the given `Multiaddr` is returned as an `Err`. +/// +/// If the given address is not yet a `p2p` address for the given peer, +/// the `/p2p/` protocol is appended to the returned address. +fn p2p_addr(peer: PeerId, addr: Multiaddr) -> Result { + if let Some(multiaddr::Protocol::P2p(hash)) = addr.iter().last() { + if &hash != peer.as_ref() { + return Err(addr) + } + Ok(addr) + } else { + Ok(addr.with(multiaddr::Protocol::P2p(peer.into()))) + } +} + +/// Possible (synchronous) errors when dialing a peer. +#[derive(Clone, Debug)] +pub enum DialError { + /// The dialing attempt is rejected because of a connection limit. + ConnectionLimit(ConnectionLimit), + /// The address being dialed is invalid, e.g. if it refers to a different + /// remote peer than the one being dialed. + InvalidAddress(Multiaddr), +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/src/network/peer.rs b/core/src/network/peer.rs index 3e66930234a..c62c937e43d 100644 --- a/core/src/network/peer.rs +++ b/core/src/network/peer.rs @@ -45,7 +45,7 @@ use std::{ error, fmt, }; -use super::{Network, DialingOpts}; +use super::{Network, DialingOpts, DialError}; /// The possible representations of a peer in a [`Network`], as /// seen by the local node. @@ -210,7 +210,7 @@ where pub fn dial(self, address: Multiaddr, remaining: I, handler: THandler) -> Result< (ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>), - ConnectionLimit + DialError > where I: IntoIterator, @@ -219,7 +219,9 @@ where Peer::Connected(p) => (p.peer_id, p.network), Peer::Dialing(p) => (p.peer_id, p.network), Peer::Disconnected(p) => (p.peer_id, p.network), - Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 }) + Peer::Local => return Err(DialError::ConnectionLimit(ConnectionLimit { + current: 0, limit: 0 + })) }; let id = network.dial_peer(DialingOpts { diff --git a/core/src/transport/memory.rs b/core/src/transport/memory.rs index 366abd4e9c8..043dcee06b7 100644 --- a/core/src/transport/memory.rs +++ b/core/src/transport/memory.rs @@ -263,19 +263,14 @@ impl Drop for Listener { /// If the address is `/memory/n`, returns the value of `n`. fn parse_memory_addr(a: &Multiaddr) -> Result { - let mut iter = a.iter(); - - let port = if let Some(Protocol::Memory(port)) = iter.next() { - port - } else { - return Err(()); - }; - - if iter.next().is_some() { - return Err(()); + let mut protocols = a.iter(); + match protocols.next() { + Some(Protocol::Memory(port)) => match protocols.next() { + None | Some(Protocol::P2p(_)) => Ok(port), + _ => Err(()) + } + _ => Err(()) } - - Ok(port) } /// A channel represents an established, in-memory, logical connection between two endpoints. diff --git a/core/tests/connection_limits.rs b/core/tests/connection_limits.rs index 4c9c3193066..178eacbd192 100644 --- a/core/tests/connection_limits.rs +++ b/core/tests/connection_limits.rs @@ -25,7 +25,7 @@ use libp2p_core::multiaddr::{multiaddr, Multiaddr}; use libp2p_core::{ PeerId, connection::PendingConnectionError, - network::{NetworkEvent, NetworkConfig, ConnectionLimits}, + network::{NetworkEvent, NetworkConfig, ConnectionLimits, DialError}, }; use rand::Rng; use std::task::Poll; @@ -47,12 +47,16 @@ fn max_outgoing() { .expect("Unexpected connection limit."); } - let err = network.peer(target.clone()) + match network.peer(target.clone()) .dial(Multiaddr::empty(), Vec::new(), TestHandler()) - .expect_err("Unexpected dialing success."); - - assert_eq!(err.current, outgoing_limit); - assert_eq!(err.limit, outgoing_limit); + .expect_err("Unexpected dialing success.") + { + DialError::ConnectionLimit(err) => { + assert_eq!(err.current, outgoing_limit); + assert_eq!(err.limit, outgoing_limit); + } + e => panic!("Unexpected error: {:?}", e), + } let info = network.info(); assert_eq!(info.num_peers(), 0); diff --git a/core/tests/network_dial_error.rs b/core/tests/network_dial_error.rs index 85b2186d206..2edb133ffc9 100644 --- a/core/tests/network_dial_error.rs +++ b/core/tests/network_dial_error.rs @@ -25,6 +25,7 @@ use libp2p_core::multiaddr::multiaddr; use libp2p_core::{ PeerId, connection::PendingConnectionError, + multiaddr::Protocol, network::{NetworkEvent, NetworkConfig}, }; use rand::seq::SliceRandom; @@ -70,7 +71,7 @@ fn deny_incoming_connec() { error: PendingConnectionError::Transport(_) }) => { assert_eq!(&peer_id, swarm1.local_peer_id()); - assert_eq!(multiaddr, address); + assert_eq!(multiaddr, address.clone().with(Protocol::P2p(peer_id.into()))); return Poll::Ready(Ok(())); }, Poll::Ready(_) => unreachable!(), @@ -162,21 +163,27 @@ fn dial_self_by_id() { fn multiple_addresses_err() { // Tries dialing multiple addresses, and makes sure there's one dialing error per address. + let target = PeerId::random(); + let mut swarm = test_network(NetworkConfig::default()); let mut addresses = Vec::new(); for _ in 0 .. 3 { - addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::())]); + addresses.push(multiaddr![ + Ip4([0, 0, 0, 0]), + Tcp(rand::random::()) + ]); } for _ in 0 .. 5 { - addresses.push(multiaddr![Udp(rand::random::())]); + addresses.push(multiaddr![ + Udp(rand::random::()) + ]); } addresses.shuffle(&mut rand::thread_rng()); let first = addresses[0].clone(); let rest = (&addresses[1..]).iter().cloned(); - let target = PeerId::random(); swarm.peer(target.clone()) .dial(first, rest, TestHandler()) .unwrap(); @@ -191,7 +198,7 @@ fn multiple_addresses_err() { error: PendingConnectionError::Transport(_) }) => { assert_eq!(peer_id, target); - let expected = addresses.remove(0); + let expected = addresses.remove(0).with(Protocol::P2p(target.clone().into())); assert_eq!(multiaddr, expected); if addresses.is_empty() { assert_eq!(attempts_remaining, 0); diff --git a/examples/ipfs-kad.rs b/examples/ipfs-kad.rs index 99eabe54551..6ab1d16ea89 100644 --- a/examples/ipfs-kad.rs +++ b/examples/ipfs-kad.rs @@ -25,6 +25,7 @@ use async_std::task; use libp2p::{ + Multiaddr, Swarm, PeerId, identity, @@ -38,7 +39,14 @@ use libp2p::kad::{ QueryResult, }; use libp2p::kad::record::store::MemoryStore; -use std::{env, error::Error, time::Duration}; +use std::{env, error::Error, str::FromStr, time::Duration}; + +const BOOTNODES: [&'static str; 4] = [ + "QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", + "QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", + "QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", + "QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt" +]; #[async_std::main] async fn main() -> Result<(), Box> { @@ -59,28 +67,14 @@ async fn main() -> Result<(), Box> { let store = MemoryStore::new(local_peer_id.clone()); let mut behaviour = Kademlia::with_config(local_peer_id.clone(), store, cfg); - // TODO: the /dnsaddr/ scheme is not supported (https://github.com/libp2p/rust-libp2p/issues/967) - /*behaviour.add_address(&"QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); - behaviour.add_address(&"QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); - behaviour.add_address(&"QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap()); - behaviour.add_address(&"QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt".parse().unwrap(), "/dnsaddr/bootstrap.libp2p.io".parse().unwrap());*/ - - // The only address that currently works. - behaviour.add_address(&"QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ".parse()?, "/ip4/104.131.131.82/tcp/4001".parse()?); - - // The following addresses always fail signature verification, possibly due to - // RSA keys with < 2048 bits. - // behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip4/104.236.179.241/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip4/128.199.219.111/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip4/104.236.76.40/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip4/178.62.158.247/tcp/4001".parse().unwrap()); + // Add the bootnodes to the local routing table. `libp2p-dns` built + // into the `transport` resolves the `dnsaddr` when Kademlia tries + // to dial these nodes. + let bootaddr = Multiaddr::from_str("/dnsaddr/bootstrap.libp2p.io")?; + for peer in &BOOTNODES { + behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone()); + } - // The following addresses are permanently unreachable: - // Other(Other(A(Transport(A(Underlying(Os { code: 101, kind: Other, message: "Network is unreachable" })))))) - // behaviour.add_address(&"QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM".parse().unwrap(), "/ip6/2604:a880:1:20::203:d001/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu".parse().unwrap(), "/ip6/2400:6180:0:d0::151:6001/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64".parse().unwrap(), "/ip6/2604:a880:800:10::4a:5001/tcp/4001".parse().unwrap()); - // behaviour.add_address(&"QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd".parse().unwrap(), "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001".parse().unwrap()); Swarm::new(transport, behaviour, local_peer_id) }; diff --git a/misc/multiaddr/CHANGELOG.md b/misc/multiaddr/CHANGELOG.md index 663f5336c0d..baf6410f511 100644 --- a/misc/multiaddr/CHANGELOG.md +++ b/misc/multiaddr/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.11.2 [unreleased] + +- Add `Multiaddr::ends_with()`. + # 0.11.1 [2021-02-15] - Update dependencies diff --git a/misc/multiaddr/Cargo.toml b/misc/multiaddr/Cargo.toml index 75c446b9a3b..12b7a75a135 100644 --- a/misc/multiaddr/Cargo.toml +++ b/misc/multiaddr/Cargo.toml @@ -6,7 +6,7 @@ description = "Implementation of the multiaddr format" homepage = "https://github.com/libp2p/rust-libp2p" keywords = ["multiaddr", "ipfs"] license = "MIT" -version = "0.11.1" +version = "0.11.2" [features] default = ["url"] diff --git a/misc/multiaddr/src/lib.rs b/misc/multiaddr/src/lib.rs index 0174e33c754..01632e3142e 100644 --- a/misc/multiaddr/src/lib.rs +++ b/misc/multiaddr/src/lib.rs @@ -174,6 +174,16 @@ impl Multiaddr { if replaced { Some(address) } else { None } } + + /// Checks whether the given `Multiaddr` is a suffix of this `Multiaddr`. + pub fn ends_with(&self, other: &Multiaddr) -> bool { + let n = self.bytes.len(); + let m = other.bytes.len(); + if n < m { + return false + } + self.bytes[(n - m) ..] == other.bytes[..] + } } impl fmt::Debug for Multiaddr { diff --git a/misc/multiaddr/tests/lib.rs b/misc/multiaddr/tests/lib.rs index 55e22db1576..15993e4672b 100644 --- a/misc/multiaddr/tests/lib.rs +++ b/misc/multiaddr/tests/lib.rs @@ -56,6 +56,18 @@ fn push_pop_identity() { QuickCheck::new().quickcheck(prop as fn(Ma, Proto) -> bool) } +#[test] +fn ends_with() { + fn prop(Ma(m): Ma) { + let n = m.iter().count(); + for i in 0 .. n { + let suffix = m.iter().skip(i).collect::(); + assert!(m.ends_with(&suffix)); + } + } + QuickCheck::new().quickcheck(prop as fn(_)) +} + // Arbitrary impls diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index cd2de6529e2..482d31a56b3 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -260,7 +260,7 @@ where let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; *this.state = SeqState::SendProtocol { io, protocol } } - _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), + _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())) } } diff --git a/muxers/mplex/CHANGELOG.md b/muxers/mplex/CHANGELOG.md index c6cc002b54c..e5034c979e0 100644 --- a/muxers/mplex/CHANGELOG.md +++ b/muxers/mplex/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.27.2 [unreleased] +# 0.28.0 [unreleased] - Update dependencies. diff --git a/muxers/mplex/Cargo.toml b/muxers/mplex/Cargo.toml index ebdea534e3f..4a90d7b4255 100644 --- a/muxers/mplex/Cargo.toml +++ b/muxers/mplex/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-mplex" edition = "2018" description = "Mplex multiplexing protocol for libp2p" -version = "0.27.2" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4" nohash-hasher = "0.2" parking_lot = "0.11" diff --git a/muxers/mplex/src/io.rs b/muxers/mplex/src/io.rs index dcc1c4a4aa4..d750b6bd83f 100644 --- a/muxers/mplex/src/io.rs +++ b/muxers/mplex/src/io.rs @@ -238,7 +238,7 @@ where num_buffered += 1; } Frame::Close { stream_id } => { - self.on_close(stream_id.into_local())?; + self.on_close(stream_id.into_local()); } Frame::Reset { stream_id } => { self.on_reset(stream_id.into_local()) @@ -460,7 +460,7 @@ where } Frame::Close { stream_id } => { let stream_id = stream_id.into_local(); - self.on_close(stream_id)?; + self.on_close(stream_id); if id == stream_id { return Poll::Ready(Ok(None)) } @@ -683,7 +683,7 @@ where } /// Processes an inbound `Close` frame. - fn on_close(&mut self, id: LocalStreamId) -> io::Result<()> { + fn on_close(&mut self, id: LocalStreamId) { if let Some(state) = self.substreams.remove(&id) { match state { SubstreamState::RecvClosed { .. } | SubstreamState::Closed { .. } => { @@ -715,8 +715,6 @@ where trace!("{}: Ignoring `Close` for unknown substream {}. Possibly dropped earlier.", self.id, id); } - - Ok(()) } /// Generates the next outbound stream ID. diff --git a/muxers/yamux/CHANGELOG.md b/muxers/yamux/CHANGELOG.md index 1a3cfdd8e98..2b4636e61db 100644 --- a/muxers/yamux/CHANGELOG.md +++ b/muxers/yamux/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.31.0 [unreleased] + +- Update `libp2p-core`. + # 0.30.1 [2021-02-17] - Update `yamux` to `0.8.1`. diff --git a/muxers/yamux/Cargo.toml b/muxers/yamux/Cargo.toml index df47e5939f3..88490b74a60 100644 --- a/muxers/yamux/Cargo.toml +++ b/muxers/yamux/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-yamux" edition = "2018" description = "Yamux multiplexing protocol for libp2p" -version = "0.30.1" +version = "0.31.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } parking_lot = "0.11" thiserror = "1.0" yamux = "0.8.1" diff --git a/protocols/floodsub/Cargo.toml b/protocols/floodsub/Cargo.toml index 53133259986..348d5aa681d 100644 --- a/protocols/floodsub/Cargo.toml +++ b/protocols/floodsub/Cargo.toml @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] cuckoofilter = "0.5.0" fnv = "1.0" futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } log = "0.4" prost = "0.7" diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index 66330adfd94..e64eb9d2df2 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] libp2p-swarm = { version = "0.28.0", path = "../../swarm" } -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } bytes = "1.0" byteorder = "1.3.4" fnv = "1.0.7" diff --git a/protocols/identify/Cargo.toml b/protocols/identify/Cargo.toml index f3825603c2b..79c466e9632 100644 --- a/protocols/identify/Cargo.toml +++ b/protocols/identify/Cargo.toml @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } log = "0.4.1" prost = "0.7" diff --git a/protocols/kad/Cargo.toml b/protocols/kad/Cargo.toml index 444dde53b9e..19c154e7787 100644 --- a/protocols/kad/Cargo.toml +++ b/protocols/kad/Cargo.toml @@ -17,7 +17,7 @@ fnv = "1.0" asynchronous-codec = "0.6" futures = "0.3.1" log = "0.4" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } prost = "0.7" rand = "0.7.2" diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 4275d55c091..0125bc6c492 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -1072,7 +1072,11 @@ fn manual_bucket_inserts() { let mut swarms = build_connected_nodes_with_config(3, 1, cfg); // The peers and their addresses for which we expect `RoutablePeer` events. let mut expected = swarms.iter().skip(2) - .map(|(a, s)| (a.clone(), Swarm::local_peer_id(s).clone())) + .map(|(a, s)| { + let pid = *Swarm::local_peer_id(s); + let addr = a.clone().with(Protocol::P2p(pid.into())); + (addr, pid) + }) .collect::>(); // We collect the peers for which a `RoutablePeer` event // was received in here to check at the end of the test @@ -1087,7 +1091,7 @@ fn manual_bucket_inserts() { Poll::Ready(Some(KademliaEvent::RoutablePeer { peer, address })) => { - assert_eq!(peer, expected.remove(&address).expect("Unexpected address")); + assert_eq!(peer, expected.remove(&address).expect("Missing address")); routable.push(peer); if expected.is_empty() { for peer in routable.iter() { diff --git a/protocols/mdns/Cargo.toml b/protocols/mdns/Cargo.toml index 1aa3d77f1b5..8fad91af47c 100644 --- a/protocols/mdns/Cargo.toml +++ b/protocols/mdns/Cargo.toml @@ -16,7 +16,7 @@ dns-parser = "0.8.0" futures = "0.3.13" if-watch = "0.2.0" lazy_static = "1.4.0" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } log = "0.4.14" rand = "0.8.3" diff --git a/protocols/ping/Cargo.toml b/protocols/ping/Cargo.toml index 975a8d9a7f9..d174198ca88 100644 --- a/protocols/ping/Cargo.toml +++ b/protocols/ping/Cargo.toml @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } log = "0.4.1" rand = "0.7.2" diff --git a/protocols/relay/Cargo.toml b/protocols/relay/Cargo.toml index 61d6a6fac58..20639acbcd7 100644 --- a/protocols/relay/Cargo.toml +++ b/protocols/relay/Cargo.toml @@ -14,7 +14,7 @@ asynchronous-codec = "0.6" bytes = "1" futures = "0.3.1" futures-timer = "3" -libp2p-core = { version = "0.27", path = "../../core" } +libp2p-core = { version = "0.28", path = "../../core" } libp2p-swarm = { version = "0.28", path = "../../swarm" } log = "0.4" pin-project = "1" diff --git a/protocols/relay/src/transport.rs b/protocols/relay/src/transport.rs index 1d5ff1bf57b..4aa96f69933 100644 --- a/protocols/relay/src/transport.rs +++ b/protocols/relay/src/transport.rs @@ -401,14 +401,12 @@ impl Stream for RelayListener { Poll::Ready(Some(BehaviourToListenerMsg::IncomingRelayedConnection { stream, src_peer_id, - relay_peer_id, relay_addr, + relay_peer_id: _ })) => { return Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade: RelayedListenerUpgrade::Relayed(Some(stream)), - local_addr: relay_addr - .with(Protocol::P2p(relay_peer_id.into())) - .with(Protocol::P2pCircuit), + local_addr: relay_addr.with(Protocol::P2pCircuit), remote_addr: Protocol::P2p(src_peer_id.into()).into(), }))); } diff --git a/protocols/relay/tests/lib.rs b/protocols/relay/tests/lib.rs index 5699cba8300..c8c1692ccf0 100644 --- a/protocols/relay/tests/lib.rs +++ b/protocols/relay/tests/lib.rs @@ -381,9 +381,10 @@ fn src_try_connect_to_offline_dst() { loop { match src_swarm.next_event().await { - SwarmEvent::UnknownPeerUnreachableAddr { address, .. } + SwarmEvent::UnreachableAddr { address, peer_id, .. } if address == dst_addr_via_relay => { + assert_eq!(peer_id, dst_peer_id); break; } SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} @@ -437,9 +438,10 @@ fn src_try_connect_to_unsupported_dst() { loop { match src_swarm.next_event().await { - SwarmEvent::UnknownPeerUnreachableAddr { address, .. } + SwarmEvent::UnreachableAddr { address, peer_id, .. } if address == dst_addr_via_relay => { + assert_eq!(peer_id, dst_peer_id); break; } SwarmEvent::ConnectionClosed { peer_id, .. } if peer_id == relay_peer_id => {} @@ -486,8 +488,10 @@ fn src_try_connect_to_offline_dst_via_offline_relay() { // Source Node fail to reach Destination Node due to failure reaching Relay. match src_swarm.next_event().await { - SwarmEvent::UnknownPeerUnreachableAddr { address, .. } - if address == dst_addr_via_relay => {} + SwarmEvent::UnreachableAddr { address, peer_id, .. } + if address == dst_addr_via_relay => { + assert_eq!(peer_id, dst_peer_id); + } e => panic!("{:?}", e), } }); @@ -573,6 +577,9 @@ fn firewalled_src_discover_firewalled_dst_via_kad_and_connect_to_dst_via_routabl } } SwarmEvent::Behaviour(CombinedEvent::Ping(_)) => {} + SwarmEvent::Behaviour(CombinedEvent::Kad(KademliaEvent::RoutingUpdated { + .. + })) => {} e => panic!("{:?}", e), } } @@ -1005,9 +1012,10 @@ fn yield_incoming_connection_through_correct_listener() { } match src_3_swarm.next_event().boxed().poll_unpin(cx) { - Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, .. }) + Poll::Ready(SwarmEvent::UnreachableAddr { address, peer_id, .. }) if address == dst_addr_via_relay_3 => { + assert_eq!(peer_id, dst_peer_id); return Poll::Ready(()); } Poll::Ready(SwarmEvent::Dialing { .. }) => {} diff --git a/protocols/request-response/Cargo.toml b/protocols/request-response/Cargo.toml index d0275fa44dd..0c404c79402 100644 --- a/protocols/request-response/Cargo.toml +++ b/protocols/request-response/Cargo.toml @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] async-trait = "0.1" bytes = "1" futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } libp2p-swarm = { version = "0.28.0", path = "../../swarm" } log = "0.4.11" lru = "0.6" diff --git a/src/lib.rs b/src/lib.rs index 41d3ee70968..cf3f824fb32 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -283,9 +283,9 @@ pub async fn development_transport(keypair: identity::Keypair) { let transport = { let tcp = tcp::TcpConfig::new().nodelay(true); - let transport = dns::DnsConfig::system(tcp).await?; - let websockets = websocket::WsConfig::new(transport.clone()); - transport.or_transport(websockets) + let dns_tcp = dns::DnsConfig::system(tcp).await?; + let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone()); + dns_tcp.or_transport(ws_dns_tcp) }; let noise_keys = noise::Keypair::::new() @@ -318,9 +318,9 @@ pub fn tokio_development_transport(keypair: identity::Keypair) { let transport = { let tcp = tcp::TokioTcpConfig::new().nodelay(true); - let transport = dns::TokioDnsConfig::system(tcp)?; - let websockets = websocket::WsConfig::new(transport.clone()); - transport.or_transport(websockets) + let dns_tcp = dns::TokioDnsConfig::system(tcp)?; + let ws_dns_tcp = websocket::WsConfig::new(dns_tcp.clone()); + dns_tcp.or_transport(ws_dns_tcp) }; let noise_keys = noise::Keypair::::new() diff --git a/swarm/CHANGELOG.md b/swarm/CHANGELOG.md index ad1bbdfdee5..f965d12e055 100644 --- a/swarm/CHANGELOG.md +++ b/swarm/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.28.0 [unreleased] +- New error variant `DialError::InvalidAddress` + +- `Swarm::dial_addr()` now returns a `DialError` on error. + - Remove the option for a substream-specific multistream select protocol override. The override at this granularity is no longer deemed useful, in particular because it can usually not be configured for existing protocols like `libp2p-kad` and others. diff --git a/swarm/Cargo.toml b/swarm/Cargo.toml index 459cd09f61b..712b0c586ef 100644 --- a/swarm/Cargo.toml +++ b/swarm/Cargo.toml @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] either = "1.6.0" futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../core" } +libp2p-core = { version = "0.28.0", path = "../core" } log = "0.4" rand = "0.7" smallvec = "1.6.1" diff --git a/swarm/src/lib.rs b/swarm/src/lib.rs index c61f12e1de3..b243969dbfe 100644 --- a/swarm/src/lib.rs +++ b/swarm/src/lib.rs @@ -113,6 +113,7 @@ use libp2p_core::{ transport::{self, TransportError}, muxing::StreamMuxerBox, network::{ + self, ConnectionLimits, Network, NetworkInfo, @@ -359,11 +360,11 @@ where TBehaviour: NetworkBehaviour, } /// Initiates a new dialing attempt to the given address. - pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), ConnectionLimit> { + pub fn dial_addr(me: &mut Self, addr: Multiaddr) -> Result<(), DialError> { let handler = me.behaviour.new_handler() .into_node_handler_builder() .with_substream_upgrade_protocol_override(me.substream_upgrade_protocol_override); - me.network.dial(&addr, handler).map(|_id| ()) + Ok(me.network.dial(&addr, handler).map(|_id| ())?) } /// Initiates a new dialing attempt to the given peer. @@ -386,7 +387,7 @@ where TBehaviour: NetworkBehaviour, me.network.peer(*peer_id) .dial(first, addrs, handler) .map(|_| ()) - .map_err(DialError::ConnectionLimit) + .map_err(DialError::from) } else { Err(DialError::NoAddresses) }; @@ -1053,16 +1054,28 @@ pub enum DialError { /// The configured limit for simultaneous outgoing connections /// has been reached. ConnectionLimit(ConnectionLimit), + /// The address given for dialing is invalid. + InvalidAddress(Multiaddr), /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses /// for the peer to dial. NoAddresses } +impl From for DialError { + fn from(err: network::DialError) -> DialError { + match err { + network::DialError::ConnectionLimit(l) => DialError::ConnectionLimit(l), + network::DialError::InvalidAddress(a) => DialError::InvalidAddress(a), + } + } +} + impl fmt::Display for DialError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err), DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."), + DialError::InvalidAddress(a) => write!(f, "Dial error: invalid address: {}", a), DialError::Banned => write!(f, "Dial error: peer is banned.") } } @@ -1072,6 +1085,7 @@ impl error::Error for DialError { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { DialError::ConnectionLimit(err) => Some(err), + DialError::InvalidAddress(_) => None, DialError::NoAddresses => None, DialError::Banned => None } diff --git a/transports/deflate/CHANGELOG.md b/transports/deflate/CHANGELOG.md index 26b92b2813e..6d61a443ba9 100644 --- a/transports/deflate/CHANGELOG.md +++ b/transports/deflate/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.28.0 [unreleased] + +- Update `libp2p-core`. + # 0.27.1 [2021-01-27] - Ensure read buffers are initialised. diff --git a/transports/deflate/Cargo.toml b/transports/deflate/Cargo.toml index 2b01990eeea..a25bf4f5c34 100644 --- a/transports/deflate/Cargo.toml +++ b/transports/deflate/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-deflate" edition = "2018" description = "Deflate encryption protocol for libp2p" -version = "0.27.1" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } flate2 = "1.0" [dev-dependencies] diff --git a/transports/dns/CHANGELOG.md b/transports/dns/CHANGELOG.md index 80200ca71eb..4899092ba94 100644 --- a/transports/dns/CHANGELOG.md +++ b/transports/dns/CHANGELOG.md @@ -1,5 +1,9 @@ # 0.28.0 [unreleased] +- Update `libp2p-core`. + +- Add support for resolving `/dnsaddr` addresses. + - Use `trust-dns-resolver`, removing the internal thread pool and expanding the configurability of `libp2p-dns` by largely exposing the configuration of `trust-dns-resolver`. diff --git a/transports/dns/Cargo.toml b/transports/dns/Cargo.toml index 280acab75a0..c65103090f9 100644 --- a/transports/dns/Cargo.toml +++ b/transports/dns/Cargo.toml @@ -10,11 +10,12 @@ keywords = ["peer-to-peer", "libp2p", "networking"] categories = ["network-programming", "asynchronous"] [dependencies] -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" trust-dns-resolver = { version = "0.20", default-features = false, features = ["system-config"] } async-std-resolver = { version = "0.20", optional = true } +smallvec = "1.6" [dev-dependencies] env_logger = "0.6" diff --git a/transports/dns/src/lib.rs b/transports/dns/src/lib.rs index a9eb0234610..0ecd3e3dacb 100644 --- a/transports/dns/src/lib.rs +++ b/transports/dns/src/lib.rs @@ -24,10 +24,11 @@ //! [`DnsConfig`] and `TokioDnsConfig` for use with `async-std` and `tokio`, //! respectively. //! -//! A [`GenDnsConfig`] is a [`Transport`] wrapper that is created around +//! A [`GenDnsConfig`] is an address-rewriting [`Transport`] wrapper around //! an inner `Transport`. The composed transport behaves like the inner -//! transport, except that [`Transport::dial`] resolves `/dns`, `/dns4/` and -//! `/dns6/` components of a given `Multiaddr` through a DNS. +//! transport, except that [`Transport::dial`] resolves `/dns/...`, `/dns4/...`, +//! `/dns6/...` and `/dnsaddr/...` components of the given `Multiaddr` through +//! a DNS, replacing them with the resolved protocols (typically TCP/IP). //! //! The `async-std` feature and hence the `DnsConfig` are //! enabled by default. Tokio users can furthermore opt-in @@ -37,14 +38,14 @@ //! //![trust-dns-resolver]: https://docs.rs/trust-dns-resolver/latest/trust_dns_resolver/#dns-over-tls-and-dns-over-https -use futures::{prelude::*, future::BoxFuture, stream::FuturesOrdered}; +use futures::{prelude::*, future::BoxFuture}; use libp2p_core::{ Transport, multiaddr::{Protocol, Multiaddr}, transport::{TransportError, ListenerEvent} }; -use log::{debug, trace}; -use std::{error, fmt, net::IpAddr}; +use smallvec::SmallVec; +use std::{borrow::Cow, convert::TryFrom, error, fmt, iter, net::IpAddr, str}; #[cfg(any(feature = "async-std", feature = "tokio"))] use std::io; #[cfg(any(feature = "async-std", feature = "tokio"))] @@ -62,6 +63,24 @@ use async_std_resolver::{AsyncStdConnection, AsyncStdConnectionProvider}; pub use trust_dns_resolver::config::{ResolverConfig, ResolverOpts}; pub use trust_dns_resolver::error::{ResolveError, ResolveErrorKind}; +/// The prefix for `dnsaddr` protocol TXT record lookups. +const DNSADDR_PREFIX: &'static str = "_dnsaddr."; + +/// The maximum number of dialing attempts to resolved addresses. +const MAX_DIAL_ATTEMPTS: usize = 16; + +/// The maximum number of DNS lookups when dialing. +/// +/// This limit is primarily a safeguard against too many, possibly +/// even cyclic, indirections in the addresses obtained from the +/// TXT records of a `/dnsaddr`. +const MAX_DNS_LOOKUPS: usize = 32; + +/// The maximum number of TXT records applicable for the address +/// being dialed that are considered for further lookups as a +/// result of a single `/dnsaddr` lookup. +const MAX_TXT_RECORDS: usize = 16; + /// A `Transport` wrapper for performing DNS lookups when dialing `Multiaddr`esses /// using `async-std` for all async I/O. #[cfg(feature = "async-std")] @@ -137,7 +156,7 @@ where impl Transport for GenDnsConfig where - T: Transport + Send + 'static, + T: Transport + Clone + Send + 'static, T::Error: Send, T::Dial: Send, C: DnsHandle, @@ -171,44 +190,120 @@ where } fn dial(self, addr: Multiaddr) -> Result> { - // Check if there are any domain names in the address. If not, proceed - // straight away with dialing on the underlying transport. - if !addr.iter().any(|p| match p { - Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) => true, - _ => false - }) { - trace!("Pass-through address without DNS: {}", addr); - let inner_dial = self.inner.dial(addr) - .map_err(|err| err.map(DnsErr::Transport))?; - return Ok(inner_dial.map_err::<_, fn(_) -> _>(DnsErr::Transport).left_future()); - } - // Asynchronlously resolve all DNS names in the address before proceeding // with dialing on the underlying transport. Ok(async move { let resolver = self.resolver; let inner = self.inner; - trace!("Resolving DNS: {}", addr); - - let resolved = addr.into_iter() - .map(|proto| resolve(proto, &resolver)) - .collect::>() - .collect::, Self::Error>>>() - .await - .into_iter() - .collect::>, Self::Error>>()? - .into_iter() - .collect::(); - - debug!("DNS resolved: {} => {}", addr, resolved); - - match inner.dial(resolved) { - Ok(out) => out.await.map_err(DnsErr::Transport), - Err(TransportError::MultiaddrNotSupported(a)) => - Err(DnsErr::MultiaddrNotSupported(a)), - Err(TransportError::Other(err)) => Err(DnsErr::Transport(err)) + let mut last_err = None; + let mut dns_lookups = 0; + let mut dial_attempts = 0; + // We optimise for the common case of a single DNS component + // in the address that is resolved with a single lookup. + let mut unresolved = SmallVec::<[Multiaddr; 1]>::new(); + unresolved.push(addr.clone()); + + // Resolve (i.e. replace) all DNS protocol components, initiating + // dialing attempts as soon as there is another fully resolved + // address. + while let Some(addr) = unresolved.pop() { + if let Some((i, name)) = addr.iter().enumerate().find(|(_, p)| match p { + Protocol::Dns(_) | + Protocol::Dns4(_) | + Protocol::Dns6(_) | + Protocol::Dnsaddr(_) => true, + _ => false + }) { + if dns_lookups == MAX_DNS_LOOKUPS { + log::debug!("Too many DNS lookups. Dropping unresolved {}.", addr); + last_err = Some(DnsErr::TooManyLookups); + // There may still be fully resolved addresses in `unresolved`, + // so keep going until `unresolved` is empty. + continue + } + dns_lookups += 1; + match resolve(&name, &resolver).await { + Err(e) => { + if unresolved.is_empty() { + return Err(e) + } + // If there are still unresolved addresses, there is + // a chance of success, but we track the last error. + last_err = Some(e); + } + Ok(Resolved::One(ip)) => { + log::trace!("Resolved {} -> {}", name, ip); + let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index"); + unresolved.push(addr); + } + Ok(Resolved::Many(ips)) => { + for ip in ips { + log::trace!("Resolved {} -> {}", name, ip); + let addr = addr.replace(i, |_| Some(ip)).expect("`i` is a valid index"); + unresolved.push(addr); + } + } + Ok(Resolved::Addrs(addrs)) => { + let suffix = addr.iter().skip(i + 1).collect::(); + let prefix = addr.iter().take(i).collect::(); + let mut n = 0; + for a in addrs { + if a.ends_with(&suffix) { + if n < MAX_TXT_RECORDS { + n += 1; + log::trace!("Resolved {} -> {}", name, a); + let addr = prefix.iter().chain(a.iter()).collect::(); + unresolved.push(addr); + } else { + log::debug!("Too many TXT records. Dropping resolved {}.", a); + } + } + } + } + } + } else { + // We have a fully resolved address, so try to dial it. + log::debug!("Dialing {}", addr); + + let transport = inner.clone(); + let result = match transport.dial(addr) { + Ok(out) => { + // We only count attempts that the inner transport + // actually accepted, i.e. for which it produced + // a dialing future. + dial_attempts += 1; + out.await.map_err(DnsErr::Transport) + } + Err(TransportError::MultiaddrNotSupported(a)) => + Err(DnsErr::MultiaddrNotSupported(a)), + Err(TransportError::Other(err)) => Err(DnsErr::Transport(err)) + }; + + match result { + Ok(out) => return Ok(out), + Err(err) => { + log::debug!("Dial error: {:?}.", err); + if unresolved.is_empty() { + return Err(err) + } + if dial_attempts == MAX_DIAL_ATTEMPTS { + log::debug!("Aborting dialing after {} attempts.", MAX_DIAL_ATTEMPTS); + return Err(err) + } + last_err = Some(err); + } + } + } } + + // At this point, if there was at least one failed dialing + // attempt, return that error. Otherwise there were no valid DNS records + // for the given address to begin with (i.e. DNS lookups succeeded but + // produced no records relevant for the given `addr`). + Err(last_err.unwrap_or_else(|| + DnsErr::ResolveError( + ResolveErrorKind::Message("No matching records found.").into()))) }.boxed().right_future()) } @@ -226,6 +321,13 @@ pub enum DnsErr { ResolveError(ResolveError), /// DNS resolution was successful, but the underlying transport refused the resolved address. MultiaddrNotSupported(Multiaddr), + /// DNS resolution involved too many lookups. + /// + /// DNS resolution on dialing performs up to 32 DNS lookups. If these + /// are not sufficient to obtain a fully-resolved address, this error + /// is returned and the DNS records for the domain(s) being dialed + /// should be investigated. + TooManyLookups, } impl fmt::Display for DnsErr @@ -236,6 +338,7 @@ where TErr: fmt::Display DnsErr::Transport(err) => write!(f, "{}", err), DnsErr::ResolveError(err) => write!(f, "{}", err), DnsErr::MultiaddrNotSupported(a) => write!(f, "Unsupported resolved address: {}", a), + DnsErr::TooManyLookups => write!(f, "Too many DNS lookups"), } } } @@ -248,14 +351,33 @@ where TErr: error::Error + 'static DnsErr::Transport(err) => Some(err), DnsErr::ResolveError(err) => Some(err), DnsErr::MultiaddrNotSupported(_) => None, + DnsErr::TooManyLookups => None, } } } -/// Asynchronously resolves the domain name of a `Dns`, `Dns4` or `Dns6` protocol -/// component. If the given protocol is not a DNS component, it is returned unchanged. -fn resolve<'a, E: 'a, C, P>(proto: Protocol<'a>, resolver: &'a AsyncResolver) - -> impl Future, DnsErr>> + 'a +/// The successful outcome of [`resolve`] for a given [`Protocol`]. +enum Resolved<'a> { + /// The given `Protocol` has been resolved to a single `Protocol`, + /// which may be identical to the one given, in case it is not + /// a DNS protocol component. + One(Protocol<'a>), + /// The given `Protocol` has been resolved to multiple alternative + /// `Protocol`s as a result of a DNS lookup. + Many(Vec>), + /// The given `Protocol` has been resolved to a new list of `Multiaddr`s + /// obtained from DNS TXT records representing possible alternatives. + /// These addresses may contain further DNS names that need resolving. + Addrs(Vec), +} + +/// Asynchronously resolves the domain name of a `Dns`, `Dns4`, `Dns6` or `Dnsaddr` protocol +/// component. If the given protocol is of a different type, it is returned unchanged as a +/// [`Resolved::One`]. +fn resolve<'a, E: 'a + Send, C, P>( + proto: &Protocol<'a>, + resolver: &'a AsyncResolver, +) -> BoxFuture<'a, Result, DnsErr>> where C: DnsHandle, P: ConnectionProvider, @@ -263,39 +385,105 @@ where match proto { Protocol::Dns(ref name) => { resolver.lookup_ip(fqdn(name)).map(move |res| match res { - Ok(ips) => Ok(ips.into_iter() - .next() - .map(Protocol::from) - .expect("If there are no results, `Err(NoRecordsFound)` is expected.")), - Err(e) => return Err(DnsErr::ResolveError(e)) - }).left_future() + Ok(ips) => { + let mut ips = ips.into_iter(); + let one = ips.next() + .expect("If there are no results, `Err(NoRecordsFound)` is expected."); + if let Some(two) = ips.next() { + Ok(Resolved::Many( + iter::once(one).chain(iter::once(two)) + .chain(ips) + .map(Protocol::from) + .collect())) + } else { + Ok(Resolved::One(Protocol::from(one))) + } + } + Err(e) => Err(DnsErr::ResolveError(e)) + }).boxed() } Protocol::Dns4(ref name) => { resolver.ipv4_lookup(fqdn(name)).map(move |res| match res { - Ok(ips) => Ok(ips.into_iter() - .map(IpAddr::from) - .next() - .map(Protocol::from) - .expect("If there are no results, `Err(NoRecordsFound)` is expected.")), - Err(e) => return Err(DnsErr::ResolveError(e)) - }).left_future().left_future().right_future() + Ok(ips) => { + let mut ips = ips.into_iter(); + let one = ips.next() + .expect("If there are no results, `Err(NoRecordsFound)` is expected."); + if let Some(two) = ips.next() { + Ok(Resolved::Many( + iter::once(one).chain(iter::once(two)) + .chain(ips) + .map(IpAddr::from) + .map(Protocol::from) + .collect())) + } else { + Ok(Resolved::One(Protocol::from(IpAddr::from(one)))) + } + } + Err(e) => Err(DnsErr::ResolveError(e)) + }).boxed() } Protocol::Dns6(ref name) => { resolver.ipv6_lookup(fqdn(name)).map(move |res| match res { - Ok(ips) => Ok(ips.into_iter() - .map(IpAddr::from) - .next() - .map(Protocol::from) - .expect("If there are no results, `Err(NoRecordsFound)` is expected.")), - Err(e) => return Err(DnsErr::ResolveError(e)) - }).right_future().left_future().right_future() + Ok(ips) => { + let mut ips = ips.into_iter(); + let one = ips.next() + .expect("If there are no results, `Err(NoRecordsFound)` is expected."); + if let Some(two) = ips.next() { + Ok(Resolved::Many( + iter::once(one).chain(iter::once(two)) + .chain(ips) + .map(IpAddr::from) + .map(Protocol::from) + .collect())) + } else { + Ok(Resolved::One(Protocol::from(IpAddr::from(one)))) + } + } + Err(e) => Err(DnsErr::ResolveError(e)) + }).boxed() }, - proto => future::ready(Ok(proto)).right_future().right_future() + Protocol::Dnsaddr(ref name) => { + let name = Cow::Owned([DNSADDR_PREFIX, name].concat()); + resolver.txt_lookup(fqdn(&name)).map(move |res| match res { + Ok(txts) => { + let mut addrs = Vec::new(); + for txt in txts { + if let Some(chars) = txt.txt_data().first() { + match parse_dnsaddr_txt(chars) { + Err(e) => { + // Skip over seemingly invalid entries. + log::debug!("Invalid TXT record: {:?}", e); + } + Ok(a) => { + addrs.push(a); + } + } + } + } + Ok(Resolved::Addrs(addrs)) + } + Err(e) => Err(DnsErr::ResolveError(e)) + }).boxed() + } + proto => future::ready(Ok(Resolved::One(proto.clone()))).boxed() + } +} + +/// Parses a `` of a `dnsaddr` TXT record. +fn parse_dnsaddr_txt(txt: &[u8]) -> io::Result { + let s = str::from_utf8(txt).map_err(invalid_data)?; + match s.strip_prefix("dnsaddr=") { + None => Err(invalid_data("Missing `dnsaddr=` prefix.")), + Some(a) => Ok(Multiaddr::try_from(a).map_err(invalid_data)?) } } -fn fqdn(name: &std::borrow::Cow<'_, str>) -> String { - if name.ends_with(".") { +fn invalid_data(e: impl Into>) -> io::Error { + io::Error::new(io::ErrorKind::InvalidData, e) +} + +fn fqdn(name: &Cow<'_, str>) -> String { + if name.ends_with('.') { name.to_string() } else { format!("{}.", name) @@ -308,6 +496,7 @@ mod tests { use futures::{future::BoxFuture, stream::BoxStream}; use libp2p_core::{ Transport, + PeerId, multiaddr::{Protocol, Multiaddr}, transport::ListenerEvent, transport::TransportError, @@ -332,17 +521,12 @@ mod tests { } fn dial(self, addr: Multiaddr) -> Result> { - let addr = addr.iter().collect::>(); - assert_eq!(addr.len(), 2); - match addr[1] { - Protocol::Tcp(_) => (), - _ => panic!(), - }; - match addr[0] { - Protocol::Ip4(_) => (), - Protocol::Ip6(_) => (), - _ => panic!(), - }; + // Check that all DNS components have been resolved, i.e. replaced. + assert!(!addr.iter().any(|p| match p { + Protocol::Dns(_) | Protocol::Dns4(_) | Protocol::Dns6(_) | Protocol::Dnsaddr(_) + => true, + _ => false, + })); Ok(Box::pin(future::ready(Ok(())))) } @@ -383,6 +567,37 @@ mod tests { .await .unwrap(); + // Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io. + let _ = transport + .clone() + .dial("/dnsaddr/bootstrap.libp2p.io".parse().unwrap()) + .unwrap() + .await + .unwrap(); + + // Success due to the DNS TXT records at _dnsaddr.bootstrap.libp2p.io having + // an entry with suffix `/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN`, + // i.e. a bootnode with such a peer ID. + let _ = transport + .clone() + .dial("/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN".parse().unwrap()) + .unwrap() + .await + .unwrap(); + + // Failure due to the DNS TXT records at _dnsaddr.libp2p.io not having + // an entry with a random `p2p` suffix. + match transport + .clone() + .dial(format!("/dnsaddr/bootstrap.libp2p.io/p2p/{}", PeerId::random()).parse().unwrap()) + .unwrap() + .await + { + Err(DnsErr::ResolveError(_)) => {}, + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(_) => panic!("Unexpected success.") + } + // Failure due to no records. match transport .clone() @@ -401,19 +616,27 @@ mod tests { #[cfg(feature = "async-std")] { + // Be explicit about the resolver used. At least on github CI, TXT + // type record lookups may not work with the system DNS resolver. + let config = ResolverConfig::quad9(); + let opts = ResolverOpts::default(); async_std_crate::task::block_on( - DnsConfig::system(CustomTransport).then(|dns| run(dns.unwrap())) + DnsConfig::custom(CustomTransport, config, opts).then(|dns| run(dns.unwrap())) ); } #[cfg(feature = "tokio")] { + // Be explicit about the resolver used. At least on github CI, TXT + // type record lookups may not work with the system DNS resolver. + let config = ResolverConfig::quad9(); + let opts = ResolverOpts::default(); let rt = tokio_crate::runtime::Builder::new_current_thread() .enable_io() .enable_time() .build() .unwrap(); - rt.block_on(run(TokioDnsConfig::system(CustomTransport).unwrap())); + rt.block_on(run(TokioDnsConfig::custom(CustomTransport, config, opts).unwrap())); } } } diff --git a/transports/noise/CHANGELOG.md b/transports/noise/CHANGELOG.md index d1d0a6ba0ef..daa66574952 100644 --- a/transports/noise/CHANGELOG.md +++ b/transports/noise/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.30.0 [unreleased] + +- Update `libp2p-core`. + # 0.29.0 [2021-01-12] - Update dependencies. diff --git a/transports/noise/Cargo.toml b/transports/noise/Cargo.toml index 829d907c411..1385b32291e 100644 --- a/transports/noise/Cargo.toml +++ b/transports/noise/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "libp2p-noise" description = "Cryptographic handshake protocol using the noise framework." -version = "0.29.0" +version = "0.30.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -12,7 +12,7 @@ bytes = "1" curve25519-dalek = "3.0.0" futures = "0.3.1" lazy_static = "1.2" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4" prost = "0.7" rand = "0.7.2" diff --git a/transports/plaintext/CHANGELOG.md b/transports/plaintext/CHANGELOG.md index bb8ba04053c..626c6725124 100644 --- a/transports/plaintext/CHANGELOG.md +++ b/transports/plaintext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.28.0 [unreleased] + +- Update `libp2p-core`. + # 0.27.1 [2021-02-15] - Update dependencies. diff --git a/transports/plaintext/Cargo.toml b/transports/plaintext/Cargo.toml index 4d50e68da2b..7507579bebe 100644 --- a/transports/plaintext/Cargo.toml +++ b/transports/plaintext/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-plaintext" edition = "2018" description = "Plaintext encryption dummy protocol for libp2p" -version = "0.27.1" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] bytes = "1" futures = "0.3.1" asynchronous-codec = "0.6" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4.8" prost = "0.7" unsigned-varint = { version = "0.7", features = ["asynchronous_codec"] } diff --git a/transports/plaintext/src/handshake.rs b/transports/plaintext/src/handshake.rs index 078a5109b96..a77c265a112 100644 --- a/transports/plaintext/src/handshake.rs +++ b/transports/plaintext/src/handshake.rs @@ -51,7 +51,7 @@ pub struct Remote { } impl HandshakeContext { - fn new(config: PlainText2Config) -> Result { + fn new(config: PlainText2Config) -> Self { let exchange = Exchange { id: Some(config.local_public_key.clone().into_peer_id().to_bytes()), pubkey: Some(config.local_public_key.clone().into_protobuf_encoding()) @@ -59,12 +59,12 @@ impl HandshakeContext { let mut buf = Vec::with_capacity(exchange.encoded_len()); exchange.encode(&mut buf).expect("Vec provides capacity as needed"); - Ok(Self { + Self { config, state: Local { exchange_bytes: buf } - }) + } } fn with_remote(self, exchange_bytes: BytesMut) @@ -119,7 +119,7 @@ where let mut framed_socket = Framed::new(socket, UviBytes::default()); trace!("starting handshake"); - let context = HandshakeContext::new(config)?; + let context = HandshakeContext::new(config); trace!("sending exchange to remote"); framed_socket.send(BytesMut::from(&context.state.exchange_bytes[..])).await?; diff --git a/transports/tcp/CHANGELOG.md b/transports/tcp/CHANGELOG.md index b7d98c14e3b..8c2724154ed 100644 --- a/transports/tcp/CHANGELOG.md +++ b/transports/tcp/CHANGELOG.md @@ -1,4 +1,8 @@ -# 0.27.2 [unreleased] +# 0.28.0 [unreleased] + +- Update `libp2p-core`. + +- Permit `/p2p` addresses. - Update to `if-watch-0.2`. diff --git a/transports/tcp/Cargo.toml b/transports/tcp/Cargo.toml index cbd3363345f..54698d76a6b 100644 --- a/transports/tcp/Cargo.toml +++ b/transports/tcp/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-tcp" edition = "2018" description = "TCP/IP transport protocol for libp2p" -version = "0.27.2" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -17,7 +17,7 @@ if-watch = { version = "0.2.0", optional = true } if-addrs = { version = "0.6.4", optional = true } ipnet = "2.0.0" libc = "0.2.80" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4.11" socket2 = { version = "0.4.0", features = ["all"] } tokio-crate = { package = "tokio", version = "1.0.1", default-features = false, features = ["net"], optional = true } diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index 7902eeecbb3..5cf4f0fcebd 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -379,7 +379,7 @@ where type ListenerUpgrade = Ready>; fn listen_on(self, addr: Multiaddr) -> Result> { - let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) { + let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(addr.clone()) { sa } else { return Err(TransportError::MultiaddrNotSupported(addr)); @@ -390,7 +390,7 @@ where } fn dial(self, addr: Multiaddr) -> Result> { - let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) { + let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) { if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() { return Err(TransportError::MultiaddrNotSupported(addr)); } @@ -653,21 +653,34 @@ where } } -// This type of logic should probably be moved into the multiaddr package -fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result { - let mut iter = addr.iter(); - let proto1 = iter.next().ok_or(())?; - let proto2 = iter.next().ok_or(())?; - - if iter.next().is_some() { - return Err(()); - } - - match (proto1, proto2) { - (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), - (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)), - _ => Err(()), +/// Extracts a `SocketAddr` from a given `Multiaddr`. +/// +/// Fails if the given `Multiaddr` does not begin with an IP +/// protocol encapsulating a TCP port. +fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result { + // "Pop" the IP address and TCP port from the end of the address, + // ignoring a `/p2p/...` suffix as well as any prefix of possibly + // outer protocols, if present. + let mut port = None; + while let Some(proto) = addr.pop() { + match proto { + Protocol::Ip4(ipv4) => match port { + Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)), + None => return Err(()) + }, + Protocol::Ip6(ipv6) => match port { + Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)), + None => return Err(()) + }, + Protocol::Tcp(portnum) => match port { + Some(_) => return Err(()), + None => { port = Some(portnum) } + } + Protocol::P2p(_) => {} + _ => return Err(()) + } } + Err(()) } // Create a [`Multiaddr`] from the given IP address and port number. @@ -687,12 +700,12 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; assert!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::().unwrap()) + multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::().unwrap()) .is_err() ); assert_eq!( - multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::().unwrap()), + multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 12345, @@ -700,7 +713,7 @@ mod tests { ); assert_eq!( multiaddr_to_socketaddr( - &"/ip4/255.255.255.255/tcp/8080" + "/ip4/255.255.255.255/tcp/8080" .parse::() .unwrap() ), @@ -710,7 +723,7 @@ mod tests { )) ); assert_eq!( - multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::().unwrap()), + multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::().unwrap()), Ok(SocketAddr::new( IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 12345, @@ -718,7 +731,7 @@ mod tests { ); assert_eq!( multiaddr_to_socketaddr( - &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" + "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080" .parse::() .unwrap() ), diff --git a/transports/uds/CHANGELOG.md b/transports/uds/CHANGELOG.md index 209187e66ca..45e240f478e 100644 --- a/transports/uds/CHANGELOG.md +++ b/transports/uds/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.28.0 [unreleased] + +- Update `libp2p-core`. + +- Permit `/p2p` addresses. + # 0.27.0 [2021-01-12] - Update dependencies. diff --git a/transports/uds/Cargo.toml b/transports/uds/Cargo.toml index ef765bf0da9..db582c56ecc 100644 --- a/transports/uds/Cargo.toml +++ b/transports/uds/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-uds" edition = "2018" description = "Unix domain sockets transport for libp2p" -version = "0.27.0" +version = "0.28.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -11,7 +11,7 @@ categories = ["network-programming", "asynchronous"] [target.'cfg(all(unix, not(target_os = "emscripten")))'.dependencies] async-std = { version = "1.6.2", optional = true } -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4.1" futures = "0.3.1" tokio = { version = "1.0.1", default-features = false, features = ["net"], optional = true } diff --git a/transports/uds/src/lib.rs b/transports/uds/src/lib.rs index ce698e22f5c..67da6c5fd85 100644 --- a/transports/uds/src/lib.rs +++ b/transports/uds/src/lib.rs @@ -140,23 +140,20 @@ codegen!( /// paths. // This type of logic should probably be moved into the multiaddr package fn multiaddr_to_path(addr: &Multiaddr) -> Result { - let mut iter = addr.iter(); - let path = iter.next(); - - if iter.next().is_some() { - return Err(()); - } - - let out: PathBuf = match path { - Some(Protocol::Unix(ref path)) => path.as_ref().into(), - _ => return Err(()) - }; - - if !out.is_absolute() { - return Err(()); + let mut protocols = addr.iter(); + match protocols.next() { + Some(Protocol::Unix(ref path)) => { + let path = PathBuf::from(path.as_ref()); + if !path.is_absolute() { + return Err(()) + } + match protocols.next() { + None | Some(Protocol::P2p(_)) => Ok(path), + Some(_) => Err(()) + } + } + _ => Err(()) } - - Ok(out) } #[cfg(all(test, feature = "async-std"))] diff --git a/transports/wasm-ext/CHANGELOG.md b/transports/wasm-ext/CHANGELOG.md index bd36be01090..52350a71969 100644 --- a/transports/wasm-ext/CHANGELOG.md +++ b/transports/wasm-ext/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.28.0 [unreleased] + +- Update `libp2p-core`. + # 0.27.0 [2021-01-12] - Update dependencies. diff --git a/transports/wasm-ext/Cargo.toml b/transports/wasm-ext/Cargo.toml index 1f1f0611c60..b1c842d3a3a 100644 --- a/transports/wasm-ext/Cargo.toml +++ b/transports/wasm-ext/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libp2p-wasm-ext" -version = "0.27.0" +version = "0.28.0" authors = ["Pierre Krieger "] edition = "2018" description = "Allows passing in an external transport in a WASM environment" @@ -12,7 +12,7 @@ categories = ["network-programming", "asynchronous"] [dependencies] futures = "0.3.1" js-sys = "0.3.19" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } parity-send-wrapper = "0.1.0" wasm-bindgen = "0.2.42" wasm-bindgen-futures = "0.4.4" diff --git a/transports/websocket/CHANGELOG.md b/transports/websocket/CHANGELOG.md index 9e4d51609ec..7969f7309ec 100644 --- a/transports/websocket/CHANGELOG.md +++ b/transports/websocket/CHANGELOG.md @@ -1,3 +1,9 @@ +# 0.29.0 [unreleased] + +- Update `libp2p-core`. + +- Permit dialing `/p2p` addresses. + # 0.28.0 [2021-01-12] - Update dependencies. diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index af8a44507bb..64458370347 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-websocket" edition = "2018" description = "WebSocket transport for libp2p" -version = "0.28.0" +version = "0.29.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" @@ -13,7 +13,7 @@ categories = ["network-programming", "asynchronous"] futures-rustls = "0.21" either = "1.5.3" futures = "0.3.1" -libp2p-core = { version = "0.27.0", path = "../../core" } +libp2p-core = { version = "0.28.0", path = "../../core" } log = "0.4.8" quicksink = "0.1" rw-stream-sink = "0.2.0" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index d5bd63b82c6..204eddd836f 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -231,13 +231,11 @@ where } fn dial(self, addr: Multiaddr) -> Result> { - // Quick sanity check of the provided Multiaddr. - if let Some(Protocol::Ws(_)) | Some(Protocol::Wss(_)) = addr.iter().last() { - // ok - } else { - debug!("{} is not a websocket multiaddr", addr); - return Err(TransportError::MultiaddrNotSupported(addr)) - } + let addr = match parse_ws_dial_addr(addr) { + Ok(addr) => addr, + Err(Error::InvalidMultiaddr(a)) => return Err(TransportError::MultiaddrNotSupported(a)), + Err(e) => return Err(TransportError::Other(e)), + }; // We are looping here in order to follow redirects (if any): let mut remaining_redirects = self.max_redirects; @@ -248,11 +246,11 @@ where match this.dial_once(addr).await { Ok(Either::Left(redirect)) => { if remaining_redirects == 0 { - debug!("too many redirects"); + debug!("Too many redirects (> {})", self.max_redirects); return Err(Error::TooManyRedirects) } remaining_redirects -= 1; - addr = location_to_multiaddr(&redirect)? + addr = parse_ws_dial_addr(location_to_multiaddr(&redirect)?)? } Ok(Either::Right(conn)) => return Ok(conn), Err(e) => return Err(e) @@ -273,46 +271,26 @@ where T: Transport, T::Output: AsyncRead + AsyncWrite + Send + Unpin + 'static { - /// Attempty to dial the given address and perform a websocket handshake. - async fn dial_once(self, address: Multiaddr) -> Result>, Error> { - trace!("dial address: {}", address); - - let (host_port, dns_name) = host_and_dnsname(&address)?; - - let mut inner_addr = address.clone(); - - let (use_tls, path) = - match inner_addr.pop() { - Some(Protocol::Ws(path)) => (false, path), - Some(Protocol::Wss(path)) => { - if dns_name.is_none() { - debug!("no DNS name in {}", address); - return Err(Error::InvalidMultiaddr(address)) - } - (true, path) - } - _ => { - debug!("{} is not a websocket multiaddr", address); - return Err(Error::InvalidMultiaddr(address)) - } - }; + /// Attempts to dial the given address and perform a websocket handshake. + async fn dial_once(self, addr: WsAddress) -> Result>, Error> { + trace!("Dialing websocket address: {:?}", addr); - let dial = self.transport.dial(inner_addr) + let dial = self.transport.dial(addr.tcp_addr) .map_err(|e| match e { TransportError::MultiaddrNotSupported(a) => Error::InvalidMultiaddr(a), TransportError::Other(e) => Error::Transport(e) })?; let stream = dial.map_err(Error::Transport).await?; - trace!("connected to {}", address); + trace!("TCP connection to {} established.", addr.host_port); let stream = - if use_tls { // begin TLS session - let dns_name = dns_name.expect("for use_tls we have checked that dns_name is some"); - trace!("starting TLS handshake with {}", address); + if addr.use_tls { // begin TLS session + let dns_name = addr.dns_name.expect("for use_tls we have checked that dns_name is some"); + trace!("Starting TLS handshake with {:?}", dns_name); let stream = self.tls_config.client.connect(dns_name.as_ref(), stream) .map_err(|e| { - debug!("TLS handshake with {} failed: {}", address, e); + debug!("TLS handshake with {:?} failed: {}", dns_name, e); Error::Tls(tls::Error::from(e)) }) .await?; @@ -323,9 +301,9 @@ where EitherOutput::Second(stream) }; - trace!("sending websocket handshake request to {}", address); + trace!("Sending websocket handshake to {}", addr.host_port); - let mut client = handshake::Client::new(stream, &host_port, path.as_ref()); + let mut client = handshake::Client::new(stream, &addr.host_port, addr.path.as_ref()); if self.use_deflate { client.add_extension(Box::new(Deflate::new(connection::Mode::Client))); @@ -341,32 +319,87 @@ where Err(Error::Handshake(msg.into())) } handshake::ServerResponse::Accepted { .. } => { - trace!("websocket handshake with {} successful", address); + trace!("websocket handshake with {} successful", addr.host_port); Ok(Either::Right(Connection::new(client.into_builder()))) } } } } -// Extract host, port and optionally the DNS name from the given [`Multiaddr`]. -fn host_and_dnsname(addr: &Multiaddr) -> Result<(String, Option), Error> { - let mut iter = addr.iter(); - match (iter.next(), iter.next()) { - (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) => - Ok((format!("{}:{}", ip, port), None)), - (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) => - Ok((format!("{}:{}", ip, port), None)), - (Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) => - Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), - (Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) => - Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), - (Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) => - Ok((format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned()))), - _ => { - debug!("multi-address format not supported: {}", addr); - Err(Error::InvalidMultiaddr(addr.clone())) +#[derive(Debug)] +struct WsAddress { + host_port: String, + path: String, + dns_name: Option, + use_tls: bool, + tcp_addr: Multiaddr, +} + +/// Tries to parse the given `Multiaddr` into a `WsAddress` used +/// for dialing. +/// +/// Fails if the given `Multiaddr` does not represent a TCP/IP-based +/// websocket protocol stack. +fn parse_ws_dial_addr(addr: Multiaddr) -> Result> { + // The encapsulating protocol must be based on TCP/IP, possibly via DNS. + // We peek at it in order to learn the hostname and port to use for + // the websocket handshake. + let mut protocols = addr.iter(); + let mut ip = protocols.next(); + let mut tcp = protocols.next(); + let (host_port, dns_name) = loop { + match (ip, tcp) { + (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port))) + => break (format!("{}:{}", ip, port), None), + (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port))) + => break (format!("{}:{}", ip, port), None), + (Some(Protocol::Dns(h)), Some(Protocol::Tcp(port))) | + (Some(Protocol::Dns4(h)), Some(Protocol::Tcp(port))) | + (Some(Protocol::Dns6(h)), Some(Protocol::Tcp(port))) | + (Some(Protocol::Dnsaddr(h)), Some(Protocol::Tcp(port))) + => break (format!("{}:{}", &h, port), Some(tls::dns_name_ref(&h)?.to_owned())), + (Some(_), Some(p)) => { + ip = Some(p); + tcp = protocols.next(); + } + _ => return Err(Error::InvalidMultiaddr(addr)) } - } + }; + + // Now consume the `Ws` / `Wss` protocol from the end of the address, + // preserving the trailing `P2p` protocol that identifies the remote, + // if any. + let mut protocols = addr.clone(); + let mut p2p = None; + let (use_tls, path) = loop { + match protocols.pop() { + p@Some(Protocol::P2p(_)) => { p2p = p } + Some(Protocol::Ws(path)) => break (false, path.into_owned()), + Some(Protocol::Wss(path)) => { + if dns_name.is_none() { + debug!("Missing DNS name in WSS address: {}", addr); + return Err(Error::InvalidMultiaddr(addr)) + } + break (true, path.into_owned()) + } + _ => return Err(Error::InvalidMultiaddr(addr)) + } + }; + + // The original address, stripped of the `/ws` and `/wss` protocols, + // makes up the the address for the inner TCP-based transport. + let tcp_addr = match p2p { + Some(p) => protocols.with(p), + None => protocols + }; + + Ok(WsAddress { + host_port, + dns_name, + path, + use_tls, + tcp_addr, + }) } // Given a location URL, build a new websocket [`Multiaddr`]. diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index 0ee346fdfa8..4473ed65d73 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -44,6 +44,13 @@ pub struct WsConfig { impl WsConfig { /// Create a new websocket transport based on the given transport. + /// + /// > **Note*: The given transport must be based on TCP/IP and should + /// > usually incorporate DNS resolution, though the latter is not + /// > strictly necessary if one wishes to only use the `Ws` protocol + /// > with known IP addresses and ports. See [`libp2p-tcp`](https://docs.rs/libp2p-tcp/) + /// > and [`libp2p-dns`](https://docs.rs/libp2p-dns) for constructing + /// > the inner transport. pub fn new(transport: T) -> Self { framed::WsConfig::new(transport).into() } @@ -187,10 +194,9 @@ where #[cfg(test)] mod tests { - use libp2p_core::Multiaddr; + use libp2p_core::{Multiaddr, PeerId, Transport, multiaddr::Protocol}; use libp2p_tcp as tcp; use futures::prelude::*; - use libp2p_core::{Transport, multiaddr::Protocol}; use super::WsConfig; #[test] @@ -230,7 +236,7 @@ mod tests { conn.await }; - let outbound = ws_config.dial(addr).unwrap(); + let outbound = ws_config.dial(addr.with(Protocol::P2p(PeerId::random().into()))).unwrap(); let (a, b) = futures::join!(inbound, outbound); a.and(b).unwrap();