Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[multistream] Make the lazy variant more interoperable. #1855

Merged
merged 7 commits into from
Nov 25, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions misc/multistream-select/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# 0.9.0 [unreleased]

- Make the `V1Lazy` upgrade strategy more interoperable with `V1`.
romanb marked this conversation as resolved.
Show resolved Hide resolved
romanb marked this conversation as resolved.
Show resolved Hide resolved

- Fix the encoding and decoding of `ls` responses to
be spec-compliant and interoperable with other implementations.
For a clean upgrade, `0.8.4` must already be deployed.
Expand Down
16 changes: 12 additions & 4 deletions misc/multistream-select/src/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ where
}

let protocol = this.protocols.next().ok_or(NegotiationError::Failed)?;

// The dialer always sends the header and the first protocol
// proposal in one go for efficiency.
*this.state = SeqState::SendProtocol { io, protocol };
}

Expand All @@ -209,9 +212,13 @@ where
} else {
match this.version {
Version::V1 => *this.state = SeqState::FlushProtocol { io, protocol },
// This is the only effect that `V1Lazy` has compared to `V1`:
// Optimistically settling on the only protocol that
// the dialer supports for this negotiation. Notably,
// the dialer expects a regular `V1` response.
Version::V1Lazy => {
log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, *this.version);
let io = Negotiated::expecting(io.into_reader(), p, Some(Version::V1));
return Poll::Ready(Ok((protocol, io)))
}
}
Expand Down Expand Up @@ -242,7 +249,7 @@ where
};

match msg {
Message::Header(v) if v == *this.version => {
Message::Header(Version::V1) => {
*this.state = SeqState::AwaitProtocol { io, protocol };
}
Message::Protocol(ref p) if p.as_ref() == protocol.as_ref() => {
Expand Down Expand Up @@ -366,7 +373,7 @@ where
};

match &msg {
Message::Header(v) if v == this.version => {
Message::Header(Version::V1) => {
*this.state = ParState::RecvProtocols { io }
}
Message::Protocols(supported) => {
Expand Down Expand Up @@ -395,9 +402,10 @@ where
if let Err(err) = Pin::new(&mut io).start_send(Message::Protocol(p.clone())) {
return Poll::Ready(Err(From::from(err)));
}

log::debug!("Dialer: Expecting proposed protocol: {}", p);
let io = Negotiated::expecting(io.into_reader(), p, None);

let io = Negotiated::expecting(io.into_reader(), p, *this.version);
return Poll::Ready(Ok((protocol, io)))
}

Expand Down
39 changes: 17 additions & 22 deletions misc/multistream-select/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,23 @@
//!
//! ## [`Negotiated`](self::Negotiated)
//!
//! When a dialer or listener participating in a negotiation settles
//! on a protocol to use, the [`DialerSelectFuture`] respectively
//! [`ListenerSelectFuture`] yields a [`Negotiated`](self::Negotiated)
//! I/O stream.
//!
//! Notably, when a `DialerSelectFuture` resolves to a `Negotiated`, it may not yet
//! have written the last negotiation message to the underlying I/O stream and may
//! still be expecting confirmation for that protocol, despite having settled on
//! a protocol to use.
//!
//! Similarly, when a `ListenerSelectFuture` resolves to a `Negotiated`, it may not
//! yet have sent the last negotiation message despite having settled on a protocol
//! proposed by the dialer that it supports.
//!
//! This behaviour allows both the dialer and the listener to send data
//! relating to the negotiated protocol together with the last negotiation
//! message(s), which, in the case of the dialer only supporting a single
//! protocol, results in 0-RTT negotiation. Note, however, that a dialer
//! that performs multiple 0-RTT negotiations in sequence for different
//! protocols layered on top of each other may trigger undesirable behaviour
//! for a listener not supporting one of the intermediate protocols.
//! See [`dialer_select_proto`](self::dialer_select_proto).
//! A `Negotiated` represents an I/O stream that has settled on a protocol
//! to use. By default, with [`Version::V1`], protocol negotiation is always
//! at least one dedicated round-trip message exchange, before application
//! data for the negotiated protocol can be sent by the dialer. There is
//! a variant [`Version::V1Lazy`] that permits 0-RTT negotiation if the
//! dialer only supports a single protocol. In that case, when a dialer
//! settles on a protocol to use, the [`DialerSelectFuture`] yields a
//! [`Negotiated`](self::Negotiated) I/O stream before the negotiation
//! data has been flushed. It is then expecting confirmation for that protocol
//! as the first messages read from the stream. This behaviour allows the dialer
//! to immediately send data relating to the negotiated protocol together with the
//! remaining negotiation message(s). Note, however, that a dialer that performs
//! multiple 0-RTT negotiations in sequence for different protocols layered on
//! top of each other may trigger undesirable behaviour for a listener not
//! supporting one of the intermediate protocols. See
//! [`dialer_select_proto`](self::dialer_select_proto) and the documentation
//! of [`Version::V1Lazy`] for further details.
//!
//! ## Examples
//!
Expand Down
17 changes: 7 additions & 10 deletions misc/multistream-select/src/listener_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
N: AsRef<[u8]>
{
RecvHeader { io: MessageIO<R> },
SendHeader { io: MessageIO<R>, version: Version },
SendHeader { io: MessageIO<R> },
RecvMessage { io: MessageIO<R> },
SendMessage {
io: MessageIO<R>,
Expand Down Expand Up @@ -111,8 +111,8 @@ where
match mem::replace(this.state, State::Done) {
State::RecvHeader { mut io } => {
match io.poll_next_unpin(cx) {
Poll::Ready(Some(Ok(Message::Header(version)))) => {
*this.state = State::SendHeader { io, version }
Poll::Ready(Some(Ok(Message::Header(Version::V1)))) => {
*this.state = State::SendHeader { io }
}
Poll::Ready(Some(Ok(_))) => {
return Poll::Ready(Err(ProtocolError::InvalidMessage.into()))
Expand All @@ -129,24 +129,21 @@ where
}
}

State::SendHeader { mut io, version } => {
State::SendHeader { mut io } => {
match Pin::new(&mut io).poll_ready(cx) {
Poll::Pending => {
*this.state = State::SendHeader { io, version };
*this.state = State::SendHeader { io };
return Poll::Pending
},
Poll::Ready(Ok(())) => {},
Poll::Ready(Err(err)) => return Poll::Ready(Err(From::from(err))),
}

if let Err(err) = Pin::new(&mut io).start_send(Message::Header(version)) {
if let Err(err) = Pin::new(&mut io).start_send(Message::Header(Version::V1)) {
return Poll::Ready(Err(From::from(err)));
}

*this.state = match version {
Version::V1 => State::Flush { io, protocol: None },
Version::V1Lazy => State::RecvMessage { io },
}
*this.state = State::Flush { io, protocol: None };
}

State::RecvMessage { mut io } => {
Expand Down
18 changes: 12 additions & 6 deletions misc/multistream-select/src/negotiated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ impl<TInner> Negotiated<TInner> {

/// Creates a `Negotiated` in state [`State::Expecting`] that is still
/// expecting confirmation of the given `protocol`.
pub(crate) fn expecting(io: MessageReader<TInner>, protocol: Protocol, version: Version) -> Self {
pub(crate) fn expecting(
io: MessageReader<TInner>,
protocol: Protocol,
version: Option<Version>
) -> Self {
Negotiated { state: State::Expecting { io, protocol, version } }
}

Expand Down Expand Up @@ -125,11 +129,12 @@ impl<TInner> Negotiated<TInner> {
}
};

if let Message::Header(v) = &msg {
if *v == version {
*this.state = State::Expecting { io, protocol, version };
match (&msg, version) {
(Message::Header(Version::V1), Some(Version::V1)) => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we assume that version can never be V1Lazy looks a bit like a hack to me.

In an ideal world, we would differentiate the Version enum (containing V1 and V1Lazy) from the VersionHandshake (containing only V1), no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The fact that we assume that version can never be V1Lazy looks a bit like a hack to me.

By definition of these versions, they're now indistinguishable on the wire and it is always expected to receive V1. So receiving V1Lazy is considered an error (in the decoder really).

In an ideal world, we would differentiate the Version enum (containing V1 and V1Lazy) from the VersionHandshake (containing only V1), no?

Since V1Lazy is now really the same negotiation protocol as V1, only differing subtly in the behaviour of the dialer, it may be cleaner to separate this "lazy flush yes/no" setting from the Version, making it a separate configuration option for the dialer only, since V1Lazy used for a listener now has no effect at all. However, for transport and substream upgrades we currently only have this choice of Version without a concept of separate parameters for the roles of dialer and/or listener. Maybe Version::V1 { dialer_lazy_flush: bool } would already be an improvement, though be another API change. It would always be decoded from the wire with dialer_lazy_flush: false since that option is local. Should I give that a try? So due to the fact that this would require larger API changes, together with my hope that V1Lazy will eventually disappear altogether, merging its behaviour into V1, led me to the current approach, but I'm open to alternatives.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would always be decoded from the wire with dialer_lazy_flush: false since that option is local.

What I had in mind is that you wouldn't decode a Version, but a new enum called for example VersionHandshake that would only contain V1 without any parameter.

I don't know if that makes sense, and I'm also not strongly opinionated. I'm fine with these changes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I had in mind is that you wouldn't decode a Version, but a new enum called for example VersionHandshake that would only contain V1 without any parameter.

I see, separating these two kinds of versions. I will give that a try.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My proposal is 70f041b.

*this.state = State::Expecting { io, protocol, version: None };
continue
}
_ => {}
}

if let Message::Protocol(p) = &msg {
Expand Down Expand Up @@ -168,8 +173,9 @@ enum State<R> {
io: MessageReader<R>,
/// The expected protocol (i.e. name and version).
protocol: Protocol,
/// The expected multistream-select protocol version.
version: Version
/// The expected protocol negotiation (i.e. multistream-select) header,
/// if one is still expected to be received.
version: Option<Version>,
},

/// In this state, a protocol has been agreed upon and I/O
Expand Down
66 changes: 25 additions & 41 deletions misc/multistream-select/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ const MAX_PROTOCOLS: usize = 1000;

/// The encoded form of a multistream-select 1.0.0 header message.
const MSG_MULTISTREAM_1_0: &[u8] = b"/multistream/1.0.0\n";
/// The encoded form of a multistream-select 1.0.0 header message.
const MSG_MULTISTREAM_1_0_LAZY: &[u8] = b"/multistream-lazy/1\n";
/// The encoded form of a multistream-select 'na' message.
const MSG_PROTOCOL_NA: &[u8] = b"na\n";
/// The encoded form of a multistream-select 'ls' message.
Expand All @@ -52,45 +50,32 @@ pub enum Version {
/// [1]: https://github.com/libp2p/specs/blob/master/connections/README.md#protocol-negotiation
/// [2]: https://github.com/multiformats/multistream-select
V1,
/// A lazy variant of version 1 that is identical on the wire but delays
/// sending of protocol negotiation data as much as possible.
///
/// Delaying the sending of protocol negotiation data can result in
/// significantly fewer network roundtrips used for the negotiation,
/// up to 0-RTT negotiation.
/// A "lazy" variant of version 1 that is identical on the wire but whereby
/// the dialer delays flushing protocol negotiation data in order to combine
/// it with initial application data, thus performing 0-RTT negotiation.
///
/// 0-RTT negotiation is achieved if the dialer supports only a single
/// application protocol. In that case the dialer immedidately settles
/// This strategy is only applicable for the node with the role of "dialer"
/// in the negotiation and only if the dialer supports just a single
/// application protocol. In that case the dialer immedidately "settles"
/// on that protocol, buffering the negotiation messages to be sent
/// with the first round of application protocol data (or an attempt
/// is made to read from the `Negotiated` I/O stream).
///
/// A listener receiving a `V1Lazy` header will similarly delay sending
/// of the protocol confirmation. Though typically the listener will need
/// to read the request data before sending its response, thus triggering
/// sending of the protocol confirmation, which, in absence of additional
/// buffering on lower layers will result in at least two response frames
/// to be sent.
///
/// `V1Lazy` is specific to `rust-libp2p`: While the wire protocol
/// is identical to `V1`, delayed sending of protocol negotiation frames
/// is only safe under the following assumptions:
///
/// 1. The dialer is assumed to always send the first multistream-select
/// protocol message immediately after the multistream header, without
/// first waiting for confirmation of that header. Since the listener
/// delays sending the protocol confirmation, a deadlock situation may
/// otherwise occurs that is only resolved by a timeout. This assumption
/// is trivially satisfied if both peers support and use `V1Lazy`.
/// A listener will behave identically to `V1`. This ensures interoperability with `V1`.
/// Notably, it will immediately send the multistream header as well as the protocol
/// confirmation, resulting in multiple frames being sent on the underlying transport.
/// Nevertheless, if the listener supports the protocol that the dialer optimistically
/// settled on, it can be a 0-RTT negotiation.
///
/// 2. When nesting multiple protocol negotiations, the listener is either
/// known to support all of the dialer's optimistically chosen protocols
/// or there is no intermediate protocol without a payload and none of
/// the protocol payloads has the potential for being mistaken for a
/// multistream-select protocol message. This avoids rare edge-cases whereby
/// the listener may not recognize upgrade boundaries and erroneously
/// process a request despite not supporting one of the intermediate
/// protocols that the dialer committed to. See [1] and [2].
/// > **Note**: `V1Lazy` is specific to `rust-libp2p`. The wire protocol is identical to `V1`
/// > and generally interoperable with peers only supporting `V1`. Nevertheless, there is a
/// > pitfall that is rarely encountered: When nesting multiple protocol negotiations, the
/// > listener should either be known to support all of the dialer's optimistically chosen
/// > protocols or there is must be no intermediate protocol without a payload and none of
/// > the protocol payloads must have the potential for being mistaken for a multistream-select
/// > protocol message. This avoids rare edge-cases whereby the listener may not recognize
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if I understand correctly:

If I create a protocol named /foo and the first message sent after opening a /foo substream is /bar\nhello world, then, if the listener doesn't support protocol /foo, it will misinterpret the payload as the dialer opening the protocol /bar and sending hello world.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, see multiformats/go-multistream#20 for more details (that link is also in the code).

/// > upgrade boundaries and erroneously process a request despite not supporting one of
/// > the intermediate protocols that the dialer committed to. See [1] and [2].
///
/// [1]: https://github.com/multiformats/go-multistream/issues/20
/// [2]: https://github.com/libp2p/rust-libp2p/pull/1212
Expand Down Expand Up @@ -170,8 +155,11 @@ impl Message {
Ok(())
}
Message::Header(Version::V1Lazy) => {
dest.reserve(MSG_MULTISTREAM_1_0_LAZY.len());
dest.put(MSG_MULTISTREAM_1_0_LAZY);
// Note: Encoded identically to `V1`. `V1Lazy` only
// has a local effect for the dialer, determining when
// it flushes negotiation data.
dest.reserve(MSG_MULTISTREAM_1_0.len());
dest.put(MSG_MULTISTREAM_1_0);
Ok(())
}
Message::Protocol(p) => {
Expand Down Expand Up @@ -209,10 +197,6 @@ impl Message {

/// Decodes a `Message` from its byte representation.
pub fn decode(mut msg: Bytes) -> Result<Message, ProtocolError> {
if msg == MSG_MULTISTREAM_1_0_LAZY {
return Ok(Message::Header(Version::V1Lazy))
}

if msg == MSG_MULTISTREAM_1_0 {
return Ok(Message::Header(Version::V1))
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,8 @@ fn get_record_many() {
..
})) => {
assert_eq!(id, qid);
assert_eq!(records.len(), num_results);
assert_eq!(records.first().unwrap().record, record);
assert!(records.len() >= num_results);
assert!(records.into_iter().all(|r| r.record == record));
return Poll::Ready(());
}
// Ignore any other event.
Expand Down