diff --git a/misc/multistream-select/CHANGELOG.md b/misc/multistream-select/CHANGELOG.md index e61fa9b9d68..3ec5fb8d52d 100644 --- a/misc/multistream-select/CHANGELOG.md +++ b/misc/multistream-select/CHANGELOG.md @@ -1,5 +1,15 @@ # 0.9.0 [unreleased] +- Make the `V1Lazy` upgrade strategy more interoperable with `V1`. Specifically, + the listener now behaves identically with `V1` and `V1Lazy`. Furthermore, the + multistream-select protocol header is now also identical, making `V1` and `V1Lazy` + indistinguishable on the wire. The remaining central effect of `V1Lazy` is that the dialer, + if it only supports a single protocol in a negotiation, optimistically settles on that + protocol without immediately flushing the negotiation data (i.e. protocol proposal) + and without waiting for the corresponding confirmation before it is able to start + sending application data, expecting the used protocol to be confirmed with + the response. + - Fix the encoding and decoding of `ls` responses to be spec-compliant and interoperable with other implementations. For a clean upgrade, `0.8.4` must already be deployed. diff --git a/misc/multistream-select/src/dialer_select.rs b/misc/multistream-select/src/dialer_select.rs index 38111d99c25..733c4903e82 100644 --- a/misc/multistream-select/src/dialer_select.rs +++ b/misc/multistream-select/src/dialer_select.rs @@ -20,8 +20,8 @@ //! Protocol negotiation strategies for the peer acting as the dialer. -use crate::{Negotiated, NegotiationError}; -use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version}; +use crate::{Negotiated, NegotiationError, Version}; +use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine}; use futures::{future::Either, prelude::*}; use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; @@ -41,7 +41,7 @@ use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}}; /// thus an inaccurate size estimate may result in a suboptimal choice. /// /// Within the scope of this library, a dialer always commits to a specific -/// multistream-select protocol [`Version`], whereas a listener always supports +/// multistream-select [`Version`], whereas a listener always supports /// all versions supported by this library. Frictionless multistream-select /// protocol upgrades may thus proceed by deployments with updated listeners, /// eventually followed by deployments of dialers choosing the newer protocol. @@ -181,11 +181,15 @@ where }, } - if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) { + let h = HeaderLine::from(*this.version); + if let Err(err) = Pin::new(&mut io).start_send(Message::Header(h)) { return Poll::Ready(Err(From::from(err))); } let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?; + + // The dialer always sends the header and the first protocol + // proposal in one go for efficiency. *this.state = SeqState::SendProtocol { io, protocol }; } @@ -209,9 +213,14 @@ where } else { match this.version { Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol }, + // This is the only effect that `V1Lazy` has compared to `V1`: + // Optimistically settling on the only protocol that + // the dialer supports for this negotiation. Notably, + // the dialer expects a regular `V1` response. Version::V1Lazy => { log::debug!("Dialer: Expecting proposed protocol: {}", p); - let io = Negotiated::expecting(io.into_reader(), p, *this.version); + let hl = HeaderLine::from(Version::V1Lazy); + let io = Negotiated::expecting(io.into_reader(), p, Some(hl)); return Poll::Ready(Ok((protocol, io))) } } @@ -242,7 +251,7 @@ where }; match msg { - Message::Header(v) if v == *this.version => { + Message::Header(v) if v == HeaderLine::from(*this.version) => { *this.state = SeqState::AwaitProtocol { io, protocol }; } Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => { @@ -318,7 +327,8 @@ where }, } - if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) { + let msg = Message::Header(HeaderLine::from(*this.version)); + if let Err(err) = Pin::new(&mut io).start_send(msg) { return Poll::Ready(Err(From::from(err))); } @@ -366,7 +376,7 @@ where }; match &msg { - Message::Header(v) if v == this.version => { + Message::Header(h) if h == &HeaderLine::from(*this.version) => { *this.state = ParState::RecvProtocols { io } } Message::Protocols(supported) => { @@ -395,9 +405,10 @@ where if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) { return Poll::Ready(Err(From::from(err))); } + log::debug!("Dialer: Expecting proposed protocol: {}", p); + let io = Negotiated::expecting(io.into_reader(), p, None); - let io = Negotiated::expecting(io.into_reader(), p, *this.version); return Poll::Ready(Ok((protocol, io))) } diff --git a/misc/multistream-select/src/lib.rs b/misc/multistream-select/src/lib.rs index 246a620a4bb..087b2a2cb21 100644 --- a/misc/multistream-select/src/lib.rs +++ b/misc/multistream-select/src/lib.rs @@ -48,28 +48,23 @@ //! //! ## [`Negotiated`](self::Negotiated) //! -//! When a dialer or listener participating in a negotiation settles -//! on a protocol to use, the [`DialerSelectFuture`] respectively -//! [`ListenerSelectFuture`] yields a [`Negotiated`](self::Negotiated) -//! I/O stream. -//! -//! Notably, when a `DialerSelectFuture` resolves to a `Negotiated`, it may not yet -//! have written the last negotiation message to the underlying I/O stream and may -//! still be expecting confirmation for that protocol, despite having settled on -//! a protocol to use. -//! -//! Similarly, when a `ListenerSelectFuture` resolves to a `Negotiated`, it may not -//! yet have sent the last negotiation message despite having settled on a protocol -//! proposed by the dialer that it supports. -//! -//! This behaviour allows both the dialer and the listener to send data -//! relating to the negotiated protocol together with the last negotiation -//! message(s), which, in the case of the dialer only supporting a single -//! protocol, results in 0-RTT negotiation. Note, however, that a dialer -//! that performs multiple 0-RTT negotiations in sequence for different -//! protocols layered on top of each other may trigger undesirable behaviour -//! for a listener not supporting one of the intermediate protocols. -//! See [`dialer_select_proto`](self::dialer_select_proto). +//! A `Negotiated` represents an I/O stream that has settled on a protocol +//! to use. By default, with [`Version::V1`], protocol negotiation is always +//! at least one dedicated round-trip message exchange, before application +//! data for the negotiated protocol can be sent by the dialer. There is +//! a variant [`Version::V1Lazy`] that permits 0-RTT negotiation if the +//! dialer only supports a single protocol. In that case, when a dialer +//! settles on a protocol to use, the [`DialerSelectFuture`] yields a +//! [`Negotiated`](self::Negotiated) I/O stream before the negotiation +//! data has been flushed. It is then expecting confirmation for that protocol +//! as the first messages read from the stream. This behaviour allows the dialer +//! to immediately send data relating to the negotiated protocol together with the +//! remaining negotiation message(s). Note, however, that a dialer that performs +//! multiple 0-RTT negotiations in sequence for different protocols layered on +//! top of each other may trigger undesirable behaviour for a listener not +//! supporting one of the intermediate protocols. See +//! [`dialer_select_proto`](self::dialer_select_proto) and the documentation +//! of [`Version::V1Lazy`] for further details. //! //! ## Examples //! @@ -100,6 +95,54 @@ mod protocol; mod tests; pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError}; -pub use self::protocol::{ProtocolError, Version}; +pub use self::protocol::ProtocolError; pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture}; pub use self::listener_select::{listener_select_proto, ListenerSelectFuture}; + +/// Supported multistream-select versions. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Version { + /// Version 1 of the multistream-select protocol. See [1] and [2]. + /// + /// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation + /// [2]: https://github.com/multiformats/multistream-select + V1, + /// A "lazy" variant of version 1 that is identical on the wire but whereby + /// the dialer delays flushing protocol negotiation data in order to combine + /// it with initial application data, thus performing 0-RTT negotiation. + /// + /// This strategy is only applicable for the node with the role of "dialer" + /// in the negotiation and only if the dialer supports just a single + /// application protocol. In that case the dialer immedidately "settles" + /// on that protocol, buffering the negotiation messages to be sent + /// with the first round of application protocol data (or an attempt + /// is made to read from the `Negotiated` I/O stream). + /// + /// A listener will behave identically to `V1`. This ensures interoperability with `V1`. + /// Notably, it will immediately send the multistream header as well as the protocol + /// confirmation, resulting in multiple frames being sent on the underlying transport. + /// Nevertheless, if the listener supports the protocol that the dialer optimistically + /// settled on, it can be a 0-RTT negotiation. + /// + /// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1` + /// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a + /// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the + /// > listener should either be known to support all of the dialer's optimistically chosen + /// > protocols or there is must be no intermediate protocol without a payload and none of + /// > the protocol payloads must have the potential for being mistaken for a multistream-select + /// > protocol message. This avoids rare edge-cases whereby the listener may not recognize + /// > upgrade boundaries and erroneously process a request despite not supporting one of + /// > the intermediate protocols that the dialer committed to. See [1] and [2]. + /// + /// [1]: https://github.com/multiformats/go-multistream/issues/20 + /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 + V1Lazy, + // Draft: https://github.com/libp2p/specs/pull/95 + // V2, +} + +impl Default for Version { + fn default() -> Self { + Version::V1 + } +} \ No newline at end of file diff --git a/misc/multistream-select/src/listener_select.rs b/misc/multistream-select/src/listener_select.rs index 04f23228369..4b76d04ea04 100644 --- a/misc/multistream-select/src/listener_select.rs +++ b/misc/multistream-select/src/listener_select.rs @@ -22,7 +22,7 @@ //! in a multistream-select protocol negotiation. use crate::{Negotiated, NegotiationError}; -use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version}; +use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine}; use futures::prelude::*; use smallvec::SmallVec; @@ -81,7 +81,7 @@ where N: AsRef<[u8]> { RecvHeader { io: MessageIO }, - SendHeader { io: MessageIO, version: Version }, + SendHeader { io: MessageIO }, RecvMessage { io: MessageIO }, SendMessage { io: MessageIO, @@ -111,8 +111,10 @@ where match mem::replace(this.state, State::Done) { State::RecvHeader { mut io } => { match io.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(Message::Header(version)))) => { - *this.state = State::SendHeader { io, version } + Poll::Ready(Some(Ok(Message::Header(h)))) => { + match h { + HeaderLine::V1 => *this.state = State::SendHeader { io } + } } Poll::Ready(Some(Ok(_))) => { return Poll::Ready(Err(ProtocolError::InvalidMessage.into())) @@ -129,24 +131,22 @@ where } } - State::SendHeader { mut io, version } => { + State::SendHeader { mut io } => { match Pin::new(&mut io).poll_ready(cx) { Poll::Pending => { - *this.state = State::SendHeader { io, version }; + *this.state = State::SendHeader { io }; return Poll::Pending }, Poll::Ready(Ok(())) => {}, Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))), } - if let Err(err) = Pin::new(&mut io).start_send(Message::Header(version)) { + let msg = Message::Header(HeaderLine::V1); + if let Err(err) = Pin::new(&mut io).start_send(msg) { return Poll::Ready(Err(From::from(err))); } - *this.state = match version { - Version::V1 => State::Flush { io, protocol: None }, - Version::V1Lazy => State::RecvMessage { io }, - } + *this.state = State::Flush { io, protocol: None }; } State::RecvMessage { mut io } => { diff --git a/misc/multistream-select/src/negotiated.rs b/misc/multistream-select/src/negotiated.rs index 9e8f38efe2c..e80d579f2b4 100644 --- a/misc/multistream-select/src/negotiated.rs +++ b/misc/multistream-select/src/negotiated.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError}; +use crate::protocol::{Protocol, MessageReader, Message, ProtocolError, HeaderLine}; use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready}; use pin_project::pin_project; @@ -80,8 +80,12 @@ impl Negotiated { /// Creates a `Negotiated` in state [`State::Expecting`] that is still /// expecting confirmation of the given `protocol`. - pub(crate) fn expecting(io: MessageReader, protocol: Protocol, version: Version) -> Self { - Negotiated { state: State::Expecting { io, protocol, version } } + pub(crate) fn expecting( + io: MessageReader, + protocol: Protocol, + header: Option + ) -> Self { + Negotiated { state: State::Expecting { io, protocol, header } } } /// Polls the `Negotiated` for completion. @@ -111,11 +115,11 @@ impl Negotiated { // Read outstanding protocol negotiation messages. loop { match mem::replace(&mut *this.state, State::Invalid) { - State::Expecting { mut io, protocol, version } => { + State::Expecting { mut io, header, protocol } => { let msg = match Pin::new(&mut io).poll_next(cx)? { Poll::Ready(Some(msg)) => msg, Poll::Pending => { - *this.state = State::Expecting { io, protocol, version }; + *this.state = State::Expecting { io, header, protocol }; return Poll::Pending }, Poll::Ready(None) => { @@ -124,9 +128,9 @@ impl Negotiated { } }; - if let Message::Header(v) = &msg { - if *v == version { - *this.state = State::Expecting { io, protocol, version }; + if let Message::Header(h) = &msg { + if Some(h) == header.as_ref() { + *this.state = State::Expecting { io, protocol, header: None }; continue } } @@ -165,10 +169,11 @@ enum State { /// The underlying I/O stream. #[pin] io: MessageReader, - /// The expected protocol (i.e. name and version). + /// The expected negotiation header/preamble (i.e. multistream-select version), + /// if one is still expected to be received. + header: Option, + /// The expected application protocol (i.e. name and version). protocol: Protocol, - /// The expected multistream-select protocol version. - version: Version }, /// In this state, a protocol has been agreed upon and I/O diff --git a/misc/multistream-select/src/protocol.rs b/misc/multistream-select/src/protocol.rs index 4944e5f4459..1d056de75ec 100644 --- a/misc/multistream-select/src/protocol.rs +++ b/misc/multistream-select/src/protocol.rs @@ -25,6 +25,7 @@ //! `Stream` and `Sink` implementations of `MessageIO` and //! `MessageReader`. +use crate::Version; use crate::length_delimited::{LengthDelimited, LengthDelimitedReader}; use bytes::{Bytes, BytesMut, BufMut}; @@ -37,71 +38,25 @@ const MAX_PROTOCOLS: usize = 1000; /// The encoded form of a multistream-select 1.0.0 header message. const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n"; -/// The encoded form of a multistream-select 1.0.0 header message. -const MSG_MULTISTREAM_1_0_LAZY: &[u8] = b"/multistream-lazy/1\n"; /// The encoded form of a multistream-select 'na' message. const MSG_PROTOCOL_NA: &[u8] = b"na\n"; /// The encoded form of a multistream-select 'ls' message. const MSG_LS: &[u8] = b"ls\n"; -/// Supported multistream-select protocol versions. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum Version { - /// Version 1 of the multistream-select protocol. See [1] and [2]. - /// - /// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation - /// [2]: https://github.com/multiformats/multistream-select +/// The multistream-select header lines preceeding negotiation. +/// +/// Every [`Version`] has a corresponding header line. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum HeaderLine { + /// The `/multistream/1.0.0` header line. V1, - /// A lazy variant of version 1 that is identical on the wire but delays - /// sending of protocol negotiation data as much as possible. - /// - /// Delaying the sending of protocol negotiation data can result in - /// significantly fewer network roundtrips used for the negotiation, - /// up to 0-RTT negotiation. - /// - /// 0-RTT negotiation is achieved if the dialer supports only a single - /// application protocol. In that case the dialer immedidately settles - /// on that protocol, buffering the negotiation messages to be sent - /// with the first round of application protocol data (or an attempt - /// is made to read from the `Negotiated` I/O stream). - /// - /// A listener receiving a `V1Lazy` header will similarly delay sending - /// of the protocol confirmation. Though typically the listener will need - /// to read the request data before sending its response, thus triggering - /// sending of the protocol confirmation, which, in absence of additional - /// buffering on lower layers will result in at least two response frames - /// to be sent. - /// - /// `V1Lazy` is specific to `rust-libp2p`: While the wire protocol - /// is identical to `V1`, delayed sending of protocol negotiation frames - /// is only safe under the following assumptions: - /// - /// 1. The dialer is assumed to always send the first multistream-select - /// protocol message immediately after the multistream header, without - /// first waiting for confirmation of that header. Since the listener - /// delays sending the protocol confirmation, a deadlock situation may - /// otherwise occurs that is only resolved by a timeout. This assumption - /// is trivially satisfied if both peers support and use `V1Lazy`. - /// - /// 2. When nesting multiple protocol negotiations, the listener is either - /// known to support all of the dialer's optimistically chosen protocols - /// or there is no intermediate protocol without a payload and none of - /// the protocol payloads has the potential for being mistaken for a - /// multistream-select protocol message. This avoids rare edge-cases whereby - /// the listener may not recognize upgrade boundaries and erroneously - /// process a request despite not supporting one of the intermediate - /// protocols that the dialer committed to. See [1] and [2]. - /// - /// [1]: https://github.com/multiformats/go-multistream/issues/20 - /// [2]: https://github.com/libp2p/rust-libp2p/pull/1212 - V1Lazy, - // Draft: https://github.com/libp2p/specs/pull/95 - // V2, } -impl Default for Version { - fn default() -> Self { - Version::V1 +impl From for HeaderLine { + fn from(v: Version) -> HeaderLine { + match v { + Version::V1 | Version::V1Lazy => HeaderLine::V1, + } } } @@ -148,7 +103,7 @@ impl fmt::Display for Protocol { pub enum Message { /// A header message identifies the multistream-select protocol /// that the sender wishes to speak. - Header(Version), + Header(HeaderLine), /// A protocol message identifies a protocol request or acknowledgement. Protocol(Protocol), /// A message through which a peer requests the complete list of @@ -164,16 +119,11 @@ impl Message { /// Encodes a `Message` into its byte representation. pub fn encode(&self, dest: &mut BytesMut) -> Result<(), ProtocolError> { match self { - Message::Header(Version::V1) => { + Message::Header(HeaderLine::V1) => { dest.reserve(MSG_MULTISTREAM_1_0.len()); dest.put(MSG_MULTISTREAM_1_0); Ok(()) } - Message::Header(Version::V1Lazy) => { - dest.reserve(MSG_MULTISTREAM_1_0_LAZY.len()); - dest.put(MSG_MULTISTREAM_1_0_LAZY); - Ok(()) - } Message::Protocol(p) => { let len = p.0.as_ref().len() + 1; // + 1 for \n dest.reserve(len); @@ -209,12 +159,8 @@ impl Message { /// Decodes a `Message` from its byte representation. pub fn decode(mut msg: Bytes) -> Result { - if msg == MSG_MULTISTREAM_1_0_LAZY { - return Ok(Message::Header(Version::V1Lazy)) - } - if msg == MSG_MULTISTREAM_1_0 { - return Ok(Message::Header(Version::V1)) + return Ok(Message::Header(HeaderLine::V1)) } if msg == MSG_PROTOCOL_NA { @@ -506,7 +452,7 @@ mod tests { impl Arbitrary for Message { fn arbitrary(g: &mut G) -> Message { match g.gen_range(0, 5) { - 0 => Message::Header(Version::V1), + 0 => Message::Header(HeaderLine::V1), 1 => Message::NotAvailable, 2 => Message::ListProtocols, 3 => Message::Protocol(Protocol::arbitrary(g)), diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 71fede97ea9..81533a5b5b7 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -702,8 +702,8 @@ fn get_record_many() { .. })) => { assert_eq!(id, qid); - assert_eq!(records.len(), num_results); - assert_eq!(records.first().unwrap().record, record); + assert!(records.len() >= num_results); + assert!(records.into_iter().all(|r| r.record == record)); return Poll::Ready(()); } // Ignore any other event.