From d326beea450321e2a4027190a6f7db5fbce55f49 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 23 Sep 2022 08:42:33 +0200 Subject: [PATCH 1/6] fix(multistream-select): remove parallel dialing optimization This is to avoid the usage of the now optional `ls` command, and stay compatible with go-multistream. Closes #2925 --- misc/multistream-select/CHANGELOG.md | 4 + misc/multistream-select/src/dialer_select.rs | 211 +------------------ misc/multistream-select/src/tests.rs | 35 +-- 3 files changed, 10 insertions(+), 240 deletions(-) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 5b3b336775b..7f6ec7bfbc0 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,3 +1,7 @@ +# Unreleased + +- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. + # 0.11.0 [2022-01-27] - Migrate to Rust edition 2021 (see [PR 2339]). diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 7a8c75daa6f..ed6dcfe6e96 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -23,7 +23,7 @@ use crate::protocol::{HeaderLine, Message, MessageIO, Protocol, ProtocolError}; use crate::{Negotiated, NegotiationError, Version}; -use futures::{future::Either, prelude::*}; +use futures::prelude::*; use std::{ convert::TryFrom as _, iter, mem, @@ -39,12 +39,6 @@ use std::{ /// returned `Future` resolves with the name of the negotiated protocol and /// a [`Negotiated`] I/O stream. /// -/// The chosen message flow for protocol negotiation depends on the numbers of -/// supported protocols given. That is, this function delegates to serial or -/// parallel variant based on the number of protocols given. The number of -/// protocols is determined through the `size_hint` of the given iterator and -/// thus an inaccurate size estimate may result in a suboptimal choice. -/// /// Within the scope of this library, a dialer always commits to a specific /// multistream-select [`Version`], whereas a listener always supports /// all versions supported by this library. Frictionless multistream-select @@ -55,44 +49,13 @@ pub fn dialer_select_proto( protocols: I, version: Version, ) -> DialerSelectFuture -where - R: AsyncRead + AsyncWrite, - I: IntoIterator, - I::Item: AsRef<[u8]>, -{ - let iter = protocols.into_iter(); - // We choose between the "serial" and "parallel" strategies based on the number of protocols. - if iter.size_hint().1.map(|n| n <= 3).unwrap_or(false) { - Either::Left(dialer_select_proto_serial(inner, iter, version)) - } else { - Either::Right(dialer_select_proto_parallel(inner, iter, version)) - } -} - -/// Future, returned by `dialer_select_proto`, which selects a protocol and dialer -/// either trying protocols in-order, or by requesting all protocols supported -/// by the remote upfront, from which the first protocol found in the dialer's -/// list of protocols is selected. -pub type DialerSelectFuture = Either, DialerSelectPar>; - -/// Returns a `Future` that negotiates a protocol on the given I/O stream. -/// -/// Just like [`dialer_select_proto`] but always using an iterative message flow, -/// trying the given list of supported protocols one-by-one. -/// -/// This strategy is preferable if the dialer only supports a few protocols. -pub(crate) fn dialer_select_proto_serial( - inner: R, - protocols: I, - version: Version, -) -> DialerSelectSeq where R: AsyncRead + AsyncWrite, I: IntoIterator, I::Item: AsRef<[u8]>, { let protocols = protocols.into_iter().peekable(); - DialerSelectSeq { + DialerSelectFuture { version, protocols, state: SeqState::SendHeader { @@ -101,39 +64,10 @@ where } } -/// Returns a `Future` that negotiates a protocol on the given I/O stream. -/// -/// Just like [`dialer_select_proto`] but always using a message flow that first -/// requests all supported protocols from the remote, selecting the first -/// protocol from the given list of supported protocols that is supported -/// by the remote. -/// -/// This strategy may be beneficial if the dialer supports many protocols -/// and it is unclear whether the remote supports one of the first few. -pub(crate) fn dialer_select_proto_parallel( - inner: R, - protocols: I, - version: Version, -) -> DialerSelectPar -where - R: AsyncRead + AsyncWrite, - I: IntoIterator, - I::Item: AsRef<[u8]>, -{ - let protocols = protocols.into_iter(); - DialerSelectPar { - version, - protocols, - state: ParState::SendHeader { - io: MessageIO::new(inner), - }, - } -} - -/// A `Future` returned by [`dialer_select_proto_serial`] which negotiates +/// A `Future` returned by [`dialer_select_proto`] which negotiates /// a protocol iteratively by considering one protocol after the other. #[pin_project::pin_project] -pub struct DialerSelectSeq { +pub struct DialerSelectFuture { // TODO: It would be nice if eventually N = I::Item = Protocol. protocols: iter::Peekable, state: SeqState, @@ -148,7 +82,7 @@ enum SeqState { Done, } -impl Future for DialerSelectSeq +impl Future for DialerSelectFuture where // The Unpin bound here is required because we produce a `Negotiated` as the output. // It also makes the implementation considerably easier to write. @@ -267,138 +201,3 @@ where } } } - -/// A `Future` returned by [`dialer_select_proto_parallel`] which negotiates -/// a protocol selectively by considering all supported protocols of the remote -/// "in parallel". -#[pin_project::pin_project] -pub struct DialerSelectPar { - protocols: I, - state: ParState, - version: Version, -} - -enum ParState { - SendHeader { io: MessageIO }, - SendProtocolsRequest { io: MessageIO }, - Flush { io: MessageIO }, - RecvProtocols { io: MessageIO }, - SendProtocol { io: MessageIO, protocol: N }, - Done, -} - -impl Future for DialerSelectPar -where - // The Unpin bound here is required because we produce a `Negotiated` as the output. - // It also makes the implementation considerably easier to write. - R: AsyncRead + AsyncWrite + Unpin, - I: Iterator, - I::Item: AsRef<[u8]>, -{ - type Output = Result<(I::Item, Negotiated), NegotiationError>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - loop { - match mem::replace(this.state, ParState::Done) { - ParState::SendHeader { mut io } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendHeader { io }; - return Poll::Pending; - } - } - - let msg = Message::Header(HeaderLine::from(*this.version)); - if let Err(err) = Pin::new(&mut io).start_send(msg) { - return Poll::Ready(Err(From::from(err))); - } - - *this.state = ParState::SendProtocolsRequest { io }; - } - - ParState::SendProtocolsRequest { mut io } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendProtocolsRequest { io }; - return Poll::Pending; - } - } - - if let Err(err) = Pin::new(&mut io).start_send(Message::ListProtocols) { - return Poll::Ready(Err(From::from(err))); - } - - log::debug!("Dialer: Requested supported protocols."); - *this.state = ParState::Flush { io } - } - - ParState::Flush { mut io } => match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = ParState::RecvProtocols { io }, - Poll::Pending => { - *this.state = ParState::Flush { io }; - return Poll::Pending; - } - }, - - ParState::RecvProtocols { mut io } => { - let msg = match Pin::new(&mut io).poll_next(cx)? { - Poll::Ready(Some(msg)) => msg, - Poll::Pending => { - *this.state = ParState::RecvProtocols { io }; - return Poll::Pending; - } - // Treat EOF error as [`NegotiationError::Failed`], not as - // [`NegotiationError::ProtocolError`], allowing dropping or closing an I/O - // stream as a permissible way to "gracefully" fail a negotiation. - Poll::Ready(None) => return Poll::Ready(Err(NegotiationError::Failed)), - }; - - match &msg { - Message::Header(h) if h == &HeaderLine::from(*this.version) => { - *this.state = ParState::RecvProtocols { io } - } - Message::Protocols(supported) => { - let protocol = this - .protocols - .by_ref() - .find(|p| supported.iter().any(|s| s.as_ref() == p.as_ref())) - .ok_or(NegotiationError::Failed)?; - log::debug!( - "Dialer: Found supported protocol: {}", - String::from_utf8_lossy(protocol.as_ref()) - ); - *this.state = ParState::SendProtocol { io, protocol }; - } - _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), - } - } - - ParState::SendProtocol { mut io, protocol } => { - match Pin::new(&mut io).poll_ready(cx)? { - Poll::Ready(()) => {} - Poll::Pending => { - *this.state = ParState::SendProtocol { io, protocol }; - return Poll::Pending; - } - } - - let p = Protocol::try_from(protocol.as_ref())?; - if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { - return Poll::Ready(Err(From::from(err))); - } - - log::debug!("Dialer: Expecting proposed protocol: {}", p); - let io = Negotiated::expecting(io.into_reader(), p, None); - - return Poll::Ready(Ok((protocol, io))); - } - - ParState::Done => panic!("ParState::poll called after completion"), - } - } - } -} diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/src/tests.rs index ca627d24fcf..4a1592b741f 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/src/tests.rs @@ -22,7 +22,6 @@ #![cfg(test)] -use crate::dialer_select::{dialer_select_proto_parallel, dialer_select_proto_serial}; use crate::{dialer_select_proto, listener_select_proto}; use crate::{NegotiationError, Version}; @@ -182,38 +181,6 @@ fn negotiation_failed() { } } -#[test] -fn select_proto_parallel() { - async fn run(version: Version) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = async_std::task::spawn(async move { - let connec = listener.accept().await.unwrap().0; - let protos = vec![b"/proto1", b"/proto2"]; - let (proto, io) = listener_select_proto(connec, protos).await.unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - let client = async_std::task::spawn(async move { - let connec = TcpStream::connect(&listener_addr).await.unwrap(); - let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_parallel(connec, protos.into_iter(), version) - .await - .unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - server.await; - client.await; - } - - async_std::task::block_on(run(Version::V1)); - async_std::task::block_on(run(Version::V1Lazy)); -} - #[test] fn select_proto_serial() { async fn run(version: Version) { @@ -231,7 +198,7 @@ fn select_proto_serial() { let client = async_std::task::spawn(async move { let connec = TcpStream::connect(&listener_addr).await.unwrap(); let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto_serial(connec, protos.into_iter(), version) + let (proto, io) = dialer_select_proto(connec, protos.into_iter(), version) .await .unwrap(); assert_eq!(proto, b"/proto2"); From ee19c59ab573ac544293d9d0fafa1aff6ecaeb27 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 23 Sep 2022 11:35:59 +0200 Subject: [PATCH 2/6] apply CR --- core/Cargo.toml | 2 +- misc/multistream-select/CHANGELOG.md | 2 +- misc/multistream-select/Cargo.toml | 2 +- misc/multistream-select/src/dialer_select.rs | 38 +++++++++---------- misc/multistream-select/src/lib.rs | 1 - .../{src/tests.rs => tests/dialer_select.rs} | 36 +----------------- 6 files changed, 23 insertions(+), 58 deletions(-) rename misc/multistream-select/{src/tests.rs => tests/dialer_select.rs} (84%) diff --git a/core/Cargo.toml b/core/Cargo.toml index 201f5220a4e..0807190dc9d 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,7 @@ libsecp256k1 = { version = "0.7.0", optional = true } log = "0.4" multiaddr = { version = "0.14.0" } multihash = { version = "0.16", default-features = false, features = ["std", "multihash-impl", "identity", "sha2"] } -multistream-select = { version = "0.11", path = "../misc/multistream-select" } +multistream-select = { version = "0.12", path = "../misc/multistream-select" } p256 = { version = "0.11.1", default-features = false, features = ["ecdsa"], optional = true } parking_lot = "0.12.0" pin-project = "1.0.0" diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 7f6ec7bfbc0..c6f3525da57 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,4 +1,4 @@ -# Unreleased +# 0.12.0 [Unreleased] - Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. diff --git a/misc/multistream-select/Cargo.toml b/misc/multistream-select/Cargo.toml index 5fd65270d97..0e0dde2af75 100644 --- a/misc/multistream-select/Cargo.toml +++ b/misc/multistream-select/Cargo.toml @@ -3,7 +3,7 @@ name = "multistream-select" edition = "2021" rust-version = "1.56.1" description = "Multistream-select negotiation protocol for libp2p" -version = "0.11.0" +version = "0.12.0" authors = ["Parity Technologies "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index ed6dcfe6e96..893c86f8867 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -58,7 +58,7 @@ where DialerSelectFuture { version, protocols, - state: SeqState::SendHeader { + state: State::SendHeader { io: MessageIO::new(inner), }, } @@ -70,11 +70,11 @@ where pub struct DialerSelectFuture { // TODO: It would be nice if eventually N = I::Item = Protocol. protocols: iter::Peekable, - state: SeqState, + state: State, version: Version, } -enum SeqState { +enum State { SendHeader { io: MessageIO }, SendProtocol { io: MessageIO, protocol: N }, FlushProtocol { io: MessageIO, protocol: N }, @@ -96,12 +96,12 @@ where let this = self.project(); loop { - match mem::replace(this.state, SeqState::Done) { - SeqState::SendHeader { mut io } => { + match mem::replace(this.state, State::Done) { + State::SendHeader { mut io } => { match Pin::new(&mut io).poll_ready(cx)? { Poll::Ready(()) => {} Poll::Pending => { - *this.state = SeqState::SendHeader { io }; + *this.state = State::SendHeader { io }; return Poll::Pending; } } @@ -115,14 +115,14 @@ where // The dialer always sends the header and the first protocol // proposal in one go for efficiency. - *this.state = SeqState::SendProtocol { io, protocol }; + *this.state = State::SendProtocol { io, protocol }; } - SeqState::SendProtocol { mut io, protocol } => { + State::SendProtocol { mut io, protocol } => { match Pin::new(&mut io).poll_ready(cx)? { Poll::Ready(()) => {} Poll::Pending => { - *this.state = SeqState::SendProtocol { io, protocol }; + *this.state = State::SendProtocol { io, protocol }; return Poll::Pending; } } @@ -134,10 +134,10 @@ where log::debug!("Dialer: Proposed protocol: {}", p); if this.protocols.peek().is_some() { - *this.state = SeqState::FlushProtocol { io, protocol } + *this.state = State::FlushProtocol { io, protocol } } else { match this.version { - Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, + Version::V1 => *this.state = State::FlushProtocol { io, protocol }, // This is the only effect that `V1Lazy` has compared to `V1`: // Optimistically settling on the only protocol that // the dialer supports for this negotiation. Notably, @@ -152,21 +152,21 @@ where } } - SeqState::FlushProtocol { mut io, protocol } => { + State::FlushProtocol { mut io, protocol } => { match Pin::new(&mut io).poll_flush(cx)? { - Poll::Ready(()) => *this.state = SeqState::AwaitProtocol { io, protocol }, + Poll::Ready(()) => *this.state = State::AwaitProtocol { io, protocol }, Poll::Pending => { - *this.state = SeqState::FlushProtocol { io, protocol }; + *this.state = State::FlushProtocol { io, protocol }; return Poll::Pending; } } } - SeqState::AwaitProtocol { mut io, protocol } => { + State::AwaitProtocol { mut io, protocol } => { let msg = match Pin::new(&mut io).poll_next(cx)? { Poll::Ready(Some(msg)) => msg, Poll::Pending => { - *this.state = SeqState::AwaitProtocol { io, protocol }; + *this.state = State::AwaitProtocol { io, protocol }; return Poll::Pending; } // Treat EOF error as [`NegotiationError::Failed`], not as @@ -177,7 +177,7 @@ where match msg { Message::Header(v) if v == HeaderLine::from(*this.version) => { - *this.state = SeqState::AwaitProtocol { io, protocol }; + *this.state = State::AwaitProtocol { io, protocol }; } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { log::debug!("Dialer: Received confirmation for protocol: {}", p); @@ -190,13 +190,13 @@ where String::from_utf8_lossy(protocol.as_ref()) ); let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; - *this.state = SeqState::SendProtocol { io, protocol } + *this.state = State::SendProtocol { io, protocol } } _ => return Poll::Ready(Err(ProtocolError::InvalidMessage.into())), } } - SeqState::Done => panic!("SeqState::poll called after completion"), + State::Done => panic!("State::poll called after completion"), } } } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 00291f4ece8..0d7346750bc 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -92,7 +92,6 @@ mod length_delimited; mod listener_select; mod negotiated; mod protocol; -mod tests; pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; diff --git a/misc/multistream-select/src/tests.rs b/misc/multistream-select/tests/dialer_select.rs similarity index 84% rename from misc/multistream-select/src/tests.rs rename to misc/multistream-select/tests/dialer_select.rs index 4a1592b741f..9e31fda532a 100644 --- a/misc/multistream-select/src/tests.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -22,11 +22,9 @@ #![cfg(test)] -use crate::{dialer_select_proto, listener_select_proto}; -use crate::{NegotiationError, Version}; - use async_std::net::{TcpListener, TcpStream}; use futures::prelude::*; +use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version}; #[test] fn select_proto_basic() { @@ -180,35 +178,3 @@ fn negotiation_failed() { } } } - -#[test] -fn select_proto_serial() { - async fn run(version: Version) { - let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); - let listener_addr = listener.local_addr().unwrap(); - - let server = async_std::task::spawn(async move { - let connec = listener.accept().await.unwrap().0; - let protos = vec![b"/proto1", b"/proto2"]; - let (proto, io) = listener_select_proto(connec, protos).await.unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - let client = async_std::task::spawn(async move { - let connec = TcpStream::connect(&listener_addr).await.unwrap(); - let protos = vec![b"/proto3", b"/proto2"]; - let (proto, io) = dialer_select_proto(connec, protos.into_iter(), version) - .await - .unwrap(); - assert_eq!(proto, b"/proto2"); - io.complete().await.unwrap(); - }); - - server.await; - client.await; - } - - async_std::task::block_on(run(Version::V1)); - async_std::task::block_on(run(Version::V1Lazy)); -} From 0becbbf3873cc652437cebec16303faf62d0441f Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Fri, 23 Sep 2022 20:30:11 +1000 Subject: [PATCH 3/6] Update misc/multistream-select/CHANGELOG.md --- misc/multistream-select/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index c6f3525da57..6a72e37c112 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,6 +1,8 @@ # 0.12.0 [Unreleased] -- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. +- Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. See [PR 2934]. + +[PR 2934]: https://github.com/libp2p/rust-libp2p/pull/2934 # 0.11.0 [2022-01-27] From 0e6d27c608fb0d642542a6bf77a825d7cc2b0f79 Mon Sep 17 00:00:00 2001 From: Friedel Ziegelmayer Date: Fri, 23 Sep 2022 13:28:55 +0200 Subject: [PATCH 4/6] Update misc/multistream-select/tests/dialer_select.rs Co-authored-by: Thomas Eizinger --- misc/multistream-select/tests/dialer_select.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index 9e31fda532a..31c358cd1a2 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -20,7 +20,6 @@ //! Integration tests for protocol negotiation. -#![cfg(test)] use async_std::net::{TcpListener, TcpStream}; use futures::prelude::*; From 8620a404fbb2f4ac7bf00c5cce26a47bd7a38b0c Mon Sep 17 00:00:00 2001 From: Thomas Eizinger Date: Sat, 24 Sep 2022 00:24:40 +1000 Subject: [PATCH 5/6] Update misc/multistream-select/tests/dialer_select.rs --- misc/multistream-select/tests/dialer_select.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/misc/multistream-select/tests/dialer_select.rs b/misc/multistream-select/tests/dialer_select.rs index 31c358cd1a2..66fd1593a62 100644 --- a/misc/multistream-select/tests/dialer_select.rs +++ b/misc/multistream-select/tests/dialer_select.rs @@ -20,7 +20,6 @@ //! Integration tests for protocol negotiation. - use async_std::net::{TcpListener, TcpStream}; use futures::prelude::*; use multistream_select::{dialer_select_proto, listener_select_proto, NegotiationError, Version}; From 855c91817751283cb087c299deebea284d913d0a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 27 Sep 2022 18:22:22 +0100 Subject: [PATCH 6/6] Update misc/multistream-select/CHANGELOG.md --- misc/multistream-select/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index 6a72e37c112..4b15cc51ba1 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,4 +1,4 @@ -# 0.12.0 [Unreleased] +# 0.12.0 [unreleased] - Remove parallel dialing optimization, to avoid requiring the use of the `ls` command. See [PR 2934].