Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistream] Make the lazy variant more interoperable. #1855

Merged
merged 7 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# 0.9.0 [unreleased]

- Make the `V1Lazy` upgrade strategy more interoperable with `V1`. Specifically,
the listener now behaves identically with `V1` and `V1Lazy`. Furthermore, the
multistream-select protocol header is now also identical, making `V1` and `V1Lazy`
indistinguishable on the wire. The remaining central effect of `V1Lazy` is that the dialer,
if it only supports a single protocol in a negotiation, optimistically settles on that
protocol without immediately flushing the negotiation data (i.e. protocol proposal)
and without waiting for the corresponding confirmation before it is able to start
sending application data, expecting the used protocol to be confirmed with
the response.

- Fix the encoding and decoding of `ls` responses to
be spec-compliant and interoperable with other implementations.
For a clean upgrade, `0.8.4` must already be deployed.
Expand Down
29 changes: 20 additions & 9 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

//! Protocol negotiation strategies for the peer acting as the dialer.

use crate::{Negotiated, NegotiationError};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use crate::{Negotiated, NegotiationError, Version};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};

use futures::{future::Either, prelude::*};
use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
Expand All @@ -41,7 +41,7 @@ use std::{convert::TryFrom as _, iter, mem, pin::Pin, task::{Context, Poll}};
/// thus an inaccurate size estimate may result in a suboptimal choice.
///
/// Within the scope of this library, a dialer always commits to a specific
/// multistream-select protocol [`Version`], whereas a listener always supports
/// multistream-select [`Version`], whereas a listener always supports
/// all versions supported by this library. Frictionless multistream-select
/// protocol upgrades may thus proceed by deployments with updated listeners,
/// eventually followed by deployments of dialers choosing the newer protocol.
Expand Down Expand Up @@ -181,11 +181,15 @@ where
},
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
let h = HeaderLine::from(*this.version);
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(h)) {
return Poll::Ready(Err(From::from(err)));
}

let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;

// The dialer always sends the header and the first protocol
// proposal in one go for efficiency.
*this.state = SeqState::SendProtocol { io, protocol };
}

Expand All @@ -209,9 +213,14 @@ where
} else {
match this.version {
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
// This is the only effect that `V1Lazy` has compared to `V1`:
// Optimistically settling on the only protocol that
// the dialer supports for this negotiation. Notably,
// the dialer expects a regular `V1` response.
Version::V1Lazy => {
log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, *this.version);
let hl = HeaderLine::from(Version::V1Lazy);
let io = Negotiated::expecting(io.into_reader(), p, Some(hl));
return Poll::Ready(Ok((protocol, io)))
}
}
Expand Down Expand Up @@ -242,7 +251,7 @@ where
};

match msg {
Message::Header(v) if v == *this.version => {
Message::Header(v) if v == HeaderLine::from(*this.version) => {
*this.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
Expand Down Expand Up @@ -318,7 +327,8 @@ where
},
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(*this.version)) {
let msg = Message::Header(HeaderLine::from(*this.version));
if let Err(err) = Pin::new(&mut io).start_send(msg) {
return Poll::Ready(Err(From::from(err)));
}

Expand Down Expand Up @@ -366,7 +376,7 @@ where
};

match &msg {
Message::Header(v) if v == this.version => {
Message::Header(h) if h == &HeaderLine::from(*this.version) => {
*this.state = ParState::RecvProtocols { io }
}
Message::Protocols(supported) => {
Expand Down Expand Up @@ -395,9 +405,10 @@ where
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
return Poll::Ready(Err(From::from(err)));
}

log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, None);

let io = Negotiated::expecting(io.into_reader(), p, *this.version);
return Poll::Ready(Ok((protocol, io)))
}

Expand Down
89 changes: 66 additions & 23 deletions misc/multistream-select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,23 @@
//!
//! ## [`Negotiated`](self::Negotiated)
//!
//! When a dialer or listener participating in a negotiation settles
//! on a protocol to use, the [`DialerSelectFuture`] respectively
//! [`ListenerSelectFuture`] yields a [`Negotiated`](self::Negotiated)
//! I/O stream.
//!
//! Notably, when a `DialerSelectFuture` resolves to a `Negotiated`, it may not yet
//! have written the last negotiation message to the underlying I/O stream and may
//! still be expecting confirmation for that protocol, despite having settled on
//! a protocol to use.
//!
//! Similarly, when a `ListenerSelectFuture` resolves to a `Negotiated`, it may not
//! yet have sent the last negotiation message despite having settled on a protocol
//! proposed by the dialer that it supports.
//!
//! This behaviour allows both the dialer and the listener to send data
//! relating to the negotiated protocol together with the last negotiation
//! message(s), which, in the case of the dialer only supporting a single
//! protocol, results in 0-RTT negotiation. Note, however, that a dialer
//! that performs multiple 0-RTT negotiations in sequence for different
//! protocols layered on top of each other may trigger undesirable behaviour
//! for a listener not supporting one of the intermediate protocols.
//! See [`dialer_select_proto`](self::dialer_select_proto).
//! A `Negotiated` represents an I/O stream that has settled on a protocol
//! to use. By default, with [`Version::V1`], protocol negotiation is always
//! at least one dedicated round-trip message exchange, before application
//! data for the negotiated protocol can be sent by the dialer. There is
//! a variant [`Version::V1Lazy`] that permits 0-RTT negotiation if the
//! dialer only supports a single protocol. In that case, when a dialer
//! settles on a protocol to use, the [`DialerSelectFuture`] yields a
//! [`Negotiated`](self::Negotiated) I/O stream before the negotiation
//! data has been flushed. It is then expecting confirmation for that protocol
//! as the first messages read from the stream. This behaviour allows the dialer
//! to immediately send data relating to the negotiated protocol together with the
//! remaining negotiation message(s). Note, however, that a dialer that performs
//! multiple 0-RTT negotiations in sequence for different protocols layered on
//! top of each other may trigger undesirable behaviour for a listener not
//! supporting one of the intermediate protocols. See
//! [`dialer_select_proto`](self::dialer_select_proto) and the documentation
//! of [`Version::V1Lazy`] for further details.
//!
//! ## Examples
//!
Expand Down Expand Up @@ -100,6 +95,54 @@ mod protocol;
mod tests;

pub use self::negotiated::{Negotiated, NegotiatedComplete, NegotiationError};
pub use self::protocol::{ProtocolError, Version};
pub use self::protocol::ProtocolError;
pub use self::dialer_select::{dialer_select_proto, DialerSelectFuture};
pub use self::listener_select::{listener_select_proto, ListenerSelectFuture};

/// Supported multistream-select versions.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Version {
/// Version 1 of the multistream-select protocol. See [1] and [2].
///
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
/// [2]: https://github.com/multiformats/multistream-select
V1,
/// A "lazy" variant of version 1 that is identical on the wire but whereby
/// the dialer delays flushing protocol negotiation data in order to combine
/// it with initial application data, thus performing 0-RTT negotiation.
///
/// This strategy is only applicable for the node with the role of "dialer"
/// in the negotiation and only if the dialer supports just a single
/// application protocol. In that case the dialer immedidately "settles"
/// on that protocol, buffering the negotiation messages to be sent
/// with the first round of application protocol data (or an attempt
/// is made to read from the `Negotiated` I/O stream).
///
/// A listener will behave identically to `V1`. This ensures interoperability with `V1`.
/// Notably, it will immediately send the multistream header as well as the protocol
/// confirmation, resulting in multiple frames being sent on the underlying transport.
/// Nevertheless, if the listener supports the protocol that the dialer optimistically
/// settled on, it can be a 0-RTT negotiation.
///
/// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1`
/// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a
/// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the
/// > listener should either be known to support all of the dialer's optimistically chosen
/// > protocols or there is must be no intermediate protocol without a payload and none of
/// > the protocol payloads must have the potential for being mistaken for a multistream-select
/// > protocol message. This avoids rare edge-cases whereby the listener may not recognize
/// > upgrade boundaries and erroneously process a request despite not supporting one of
/// > the intermediate protocols that the dialer committed to. See [1] and [2].
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
V1Lazy,
// Draft: https://github.com/libp2p/specs/pull/95
// V2,
}

impl Default for Version {
fn default() -> Self {
Version::V1
}
}
22 changes: 11 additions & 11 deletions misc/multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! in a multistream-select protocol negotiation.

use crate::{Negotiated, NegotiationError};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, Version};
use crate::protocol::{Protocol, ProtocolError, MessageIO, Message, HeaderLine};

use futures::prelude::*;
use smallvec::SmallVec;
Expand Down Expand Up @@ -81,7 +81,7 @@ where
N: AsRef<[u8]>
{
RecvHeader { io: MessageIO<R> },
SendHeader { io: MessageIO<R>, version: Version },
SendHeader { io: MessageIO<R> },
RecvMessage { io: MessageIO<R> },
SendMessage {
io: MessageIO<R>,
Expand Down Expand Up @@ -111,8 +111,10 @@ where
match mem::replace(this.state, State::Done) {
State::RecvHeader { mut io } => {
match io.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Message::Header(version)))) => {
*this.state = State::SendHeader { io, version }
Poll::Ready(Some(Ok(Message::Header(h)))) => {
match h {
HeaderLine::V1 => *this.state = State::SendHeader { io }
}
}
Poll::Ready(Some(Ok(_))) => {
return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
Expand All @@ -129,24 +131,22 @@ where
}
}

State::SendHeader { mut io, version } => {
State::SendHeader { mut io } => {
match Pin::new(&mut io).poll_ready(cx) {
Poll::Pending => {
*this.state = State::SendHeader { io, version };
*this.state = State::SendHeader { io };
return Poll::Pending
},
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(version)) {
let msg = Message::Header(HeaderLine::V1);
if let Err(err) = Pin::new(&mut io).start_send(msg) {
return Poll::Ready(Err(From::from(err)));
}

*this.state = match version {
Version::V1 => State::Flush { io, protocol: None },
Version::V1Lazy => State::RecvMessage { io },
}
*this.state = State::Flush { io, protocol: None };
}

State::RecvMessage { mut io } => {
Expand Down
27 changes: 16 additions & 11 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{Protocol, MessageReader, Message, Version, ProtocolError};
use crate::protocol::{Protocol, MessageReader, Message, ProtocolError, HeaderLine};

use futures::{prelude::*, io::{IoSlice, IoSliceMut}, ready};
use pin_project::pin_project;
Expand Down Expand Up @@ -80,8 +80,12 @@ impl<TInner> Negotiated<TInner> {

/// Creates a `Negotiated` in state [`State::Expecting`] that is still
/// expecting confirmation of the given `protocol`.
pub(crate) fn expecting(io: MessageReader<TInner>, protocol: Protocol, version: Version) -> Self {
Negotiated { state: State::Expecting { io, protocol, version } }
pub(crate) fn expecting(
io: MessageReader<TInner>,
protocol: Protocol,
header: Option<HeaderLine>
) -> Self {
Negotiated { state: State::Expecting { io, protocol, header } }
}

/// Polls the `Negotiated` for completion.
Expand Down Expand Up @@ -111,11 +115,11 @@ impl<TInner> Negotiated<TInner> {
// Read outstanding protocol negotiation messages.
loop {
match mem::replace(&mut *this.state, State::Invalid) {
State::Expecting { mut io, protocol, version } => {
State::Expecting { mut io, header, protocol } => {
let msg = match Pin::new(&mut io).poll_next(cx)? {
Poll::Ready(Some(msg)) => msg,
Poll::Pending => {
*this.state = State::Expecting { io, protocol, version };
*this.state = State::Expecting { io, header, protocol };
return Poll::Pending
},
Poll::Ready(None) => {
Expand All @@ -124,9 +128,9 @@ impl<TInner> Negotiated<TInner> {
}
};

if let Message::Header(v) = &msg {
if *v == version {
*this.state = State::Expecting { io, protocol, version };
if let Message::Header(h) = &msg {
if Some(h) == header.as_ref() {
*this.state = State::Expecting { io, protocol, header: None };
continue
}
}
Expand Down Expand Up @@ -165,10 +169,11 @@ enum State<R> {
/// The underlying I/O stream.
#[pin]
io: MessageReader<R>,
/// The expected protocol (i.e. name and version).
/// The expected negotiation header/preamble (i.e. multistream-select version),
/// if one is still expected to be received.
header: Option<HeaderLine>,
/// The expected application protocol (i.e. name and version).
protocol: Protocol,
/// The expected multistream-select protocol version.
version: Version
},

/// In this state, a protocol has been agreed upon and I/O
Expand Down
Loading