From 514bd4657dc2f064d1fc82ff727caebac00f6a03 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:39:30 +0200 Subject: [PATCH 1/8] cargo: Add rustlibp2p-yamux to our dependency tree Signed-off-by: Alexandru Vasile --- Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/Cargo.toml b/Cargo.toml index 41e68f24..2d7caac5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ yasna = "0.5.0" zeroize = "1.8.1" nohash-hasher = "0.2.0" static_assertions = "1.1.0" +libp2p-yamux = { version = "0.13.4", package = "yamux" } # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] From 2e23baa5de108207811e3fee8d43c9f647af0400 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:39:57 +0200 Subject: [PATCH 2/8] cargo: Update cargo lock Signed-off-by: Alexandru Vasile --- Cargo.lock | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index f1211dd7..d772aa0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2765,7 +2765,7 @@ dependencies = [ "libp2p-core", "log", "thiserror", - "yamux", + "yamux 0.10.2", ] [[package]] @@ -2922,6 +2922,7 @@ dependencies = [ "webpki", "x25519-dalek 2.0.1", "x509-parser 0.16.0", + "yamux 0.13.4", "yasna", "zeroize", ] @@ -6927,6 +6928,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" @@ -7352,6 +7363,22 @@ dependencies = [ "static_assertions", ] +[[package]] +name = "yamux" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17610762a1207ee816c6fadc29220904753648aba0a9ed61c7b8336e80a559c4" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand 0.8.5", + "static_assertions", + "web-time", +] + [[package]] name = "yasna" version = "0.5.2" From db17185dba522bbd279e66eb310561674f1ffe46 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:45:05 +0200 Subject: [PATCH 3/8] yamux: Use upstream and keep controller API Signed-off-by: Alexandru Vasile --- src/yamux/mod.rs | 152 +++-------------------------------------------- 1 file changed, 7 insertions(+), 145 deletions(-) diff --git a/src/yamux/mod.rs b/src/yamux/mod.rs index 2671e937..76c60cb8 100644 --- a/src/yamux/mod.rs +++ b/src/yamux/mod.rs @@ -23,158 +23,20 @@ #![forbid(unsafe_code)] -mod chunks; mod control; -mod error; -mod frame; -pub(crate) mod connection; -mod tagged_stream; - -pub use crate::yamux::{ - connection::{Connection, Mode, Packet, Stream}, - control::{Control, ControlledConnection}, - error::ConnectionError, - frame::{ - header::{HeaderDecodeError, StreamId}, - FrameDecodeError, - }, +pub use libp2p_yamux::{ + Config, Connection, ConnectionError, FrameDecodeError, HeaderDecodeError, Mode, Packet, Result, + Stream, StreamId, }; -pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification +// Switching to the "poll" based yamux API is a massive breaking change for litep2p. +// Instead, we rely on the upstream yamux and keep the old controller API. +pub use crate::yamux::control::{Control, ControlledConnection}; -pub type Result = std::result::Result; +pub const DEFAULT_CREDIT: u32 = 256 * 1024; // as per yamux specification /// The maximum number of streams we will open without an acknowledgement from the other peer. /// /// This enables a very basic form of backpressure on the creation of streams. const MAX_ACK_BACKLOG: usize = 256; - -/// Default maximum number of bytes a Yamux data frame might carry as its -/// payload when being send. Larger Payloads will be split. -/// -/// The data frame payload size is not restricted by the yamux specification. -/// Still, this implementation restricts the size to: -/// -/// 1. Reduce delays sending time-sensitive frames, e.g. window updates. -/// 2. Minimize head-of-line blocking across streams. -/// 3. Enable better interleaving of send and receive operations, as each is carried out atomically -/// instead of concurrently with its respective counterpart. -/// -/// For details on why this concrete value was chosen, see -/// . -const DEFAULT_SPLIT_SEND_SIZE: usize = 16 * 1024; - -/// Specifies when window update frames are sent. -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub enum WindowUpdateMode { - /// Send window updates as soon as a [`Stream`]'s receive window drops to 0. - /// - /// This ensures that the sender can resume sending more data as soon as possible - /// but a slow reader on the receiving side may be overwhelmed, i.e. it accumulates - /// data in its buffer which may reach its limit (see `set_max_buffer_size`). - /// In this mode, window updates merely prevent head of line blocking but do not - /// effectively exercise back pressure on senders. - OnReceive, - - /// Send window updates only when data is read on the receiving end. - /// - /// This ensures that senders do not overwhelm receivers and keeps buffer usage - /// low. However, depending on the protocol, there is a risk of deadlock, namely - /// if both endpoints want to send data larger than the receivers window and they - /// do not read before finishing their writes. Use this mode only if you are sure - /// that this will never happen, i.e. if - /// - /// - Endpoints *A* and *B* never write at the same time, *or* - /// - Endpoints *A* and *B* write at most *n* frames concurrently such that the sum of the - /// frame lengths is less or equal to the available credit of *A* and *B* respectively. - OnRead, -} - -/// Yamux configuration. -/// -/// The default configuration values are as follows: -/// -/// - receive window = 256 KiB -/// - max. buffer size (per stream) = 1 MiB -/// - max. number of streams = 8192 -/// - window update mode = on read -/// - read after close = true -/// - split send size = 16 KiB -#[derive(Debug, Clone)] -pub struct Config { - receive_window: u32, - max_buffer_size: usize, - max_num_streams: usize, - window_update_mode: WindowUpdateMode, - read_after_close: bool, - split_send_size: usize, -} - -impl Default for Config { - fn default() -> Self { - Config { - receive_window: DEFAULT_CREDIT, - max_buffer_size: 1024 * 1024, - max_num_streams: 8192, - window_update_mode: WindowUpdateMode::OnRead, - read_after_close: true, - split_send_size: DEFAULT_SPLIT_SEND_SIZE, - } - } -} - -impl Config { - /// Set the receive window per stream (must be >= 256 KiB). - /// - /// # Panics - /// - /// If the given receive window is < 256 KiB. - pub fn set_receive_window(&mut self, n: u32) -> &mut Self { - assert!(n >= DEFAULT_CREDIT); - self.receive_window = n; - self - } - - /// Set the max. buffer size per stream. - pub fn set_max_buffer_size(&mut self, n: usize) -> &mut Self { - self.max_buffer_size = n; - self - } - - /// Set the max. number of streams. - pub fn set_max_num_streams(&mut self, n: usize) -> &mut Self { - self.max_num_streams = n; - self - } - - /// Set the window update mode to use. - pub fn set_window_update_mode(&mut self, m: WindowUpdateMode) -> &mut Self { - self.window_update_mode = m; - self - } - - /// Allow or disallow streams to read from buffered data after - /// the connection has been closed. - pub fn set_read_after_close(&mut self, b: bool) -> &mut Self { - self.read_after_close = b; - self - } - - /// Set the max. payload size used when sending data frames. Payloads larger - /// than the configured max. will be split. - pub fn set_split_send_size(&mut self, n: usize) -> &mut Self { - self.split_send_size = n; - self - } -} - -// Check that we can safely cast a `usize` to a `u64`. -static_assertions::const_assert! { - std::mem::size_of::() <= std::mem::size_of::() -} - -// Check that we can safely cast a `u32` to a `usize`. -static_assertions::const_assert! { - std::mem::size_of::() <= std::mem::size_of::() -} From 1e008de34c2e70b02e8d45a88677fc48282c0083 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:45:44 +0200 Subject: [PATCH 4/8] yamux/control: Rely on upstream objects Signed-off-by: Alexandru Vasile --- src/yamux/control.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/yamux/control.rs b/src/yamux/control.rs index 89e3c350..1114f745 100644 --- a/src/yamux/control.rs +++ b/src/yamux/control.rs @@ -8,7 +8,8 @@ // at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license // at https://opensource.org/licenses/MIT. -use crate::yamux::{error::ConnectionError, Connection, Result, Stream, MAX_ACK_BACKLOG}; +use crate::yamux::{Connection, ConnectionError, Result, Stream, MAX_ACK_BACKLOG}; + use futures::{ channel::{mpsc, oneshot}, prelude::*, From 68da86b16e65b1ee86af71a3589d2c1f7f75fa3a Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:46:44 +0200 Subject: [PATCH 5/8] error: Implement PartialEq for YamuxConnectionErrors Signed-off-by: Alexandru Vasile --- src/error.rs | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 604d00e9..edd3eb87 100644 --- a/src/error.rs +++ b/src/error.rs @@ -182,7 +182,7 @@ pub enum ParseError { InvalidData, } -#[derive(Debug, thiserror::Error, PartialEq)] +#[derive(Debug, thiserror::Error)] pub enum SubstreamError { #[error("Connection closed")] ConnectionClosed, @@ -202,6 +202,76 @@ pub enum SubstreamError { NegotiationError(#[from] NegotiationError), } +// Libp2p yamux does not implement PartialEq for ConnectionError. +impl PartialEq for SubstreamError { + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (Self::ConnectionClosed, Self::ConnectionClosed) => true, + (Self::ChannelClogged, Self::ChannelClogged) => true, + (Self::PeerDoesNotExist(lhs), Self::PeerDoesNotExist(rhs)) => lhs == rhs, + (Self::IoError(lhs), Self::IoError(rhs)) => lhs == rhs, + (Self::YamuxError(lhs, lhs_1), Self::YamuxError(rhs, rhs_1)) => { + if lhs_1 != rhs_1 { + return false; + } + + match (lhs, rhs) { + ( + crate::yamux::ConnectionError::Io(lhs), + crate::yamux::ConnectionError::Io(rhs), + ) => lhs.kind() == rhs.kind(), + ( + crate::yamux::ConnectionError::Decode(lhs), + crate::yamux::ConnectionError::Decode(rhs), + ) => match (lhs, rhs) { + ( + crate::yamux::FrameDecodeError::Io(lhs), + crate::yamux::FrameDecodeError::Io(rhs), + ) => lhs.kind() == rhs.kind(), + ( + crate::yamux::FrameDecodeError::FrameTooLarge(lhs), + crate::yamux::FrameDecodeError::FrameTooLarge(rhs), + ) => lhs == rhs, + ( + crate::yamux::FrameDecodeError::Header(lhs), + crate::yamux::FrameDecodeError::Header(rhs), + ) => match (lhs, rhs) { + ( + crate::yamux::HeaderDecodeError::Version(lhs), + crate::yamux::HeaderDecodeError::Version(rhs), + ) => lhs == rhs, + ( + crate::yamux::HeaderDecodeError::Type(lhs), + crate::yamux::HeaderDecodeError::Type(rhs), + ) => lhs == rhs, + _ => false, + }, + _ => false, + }, + ( + crate::yamux::ConnectionError::NoMoreStreamIds, + crate::yamux::ConnectionError::NoMoreStreamIds, + ) => true, + ( + crate::yamux::ConnectionError::Closed, + crate::yamux::ConnectionError::Closed, + ) => true, + ( + crate::yamux::ConnectionError::TooManyStreams, + crate::yamux::ConnectionError::TooManyStreams, + ) => true, + _ => false, + } + } + + (Self::ReadFailure(lhs), Self::ReadFailure(rhs)) => lhs == rhs, + (Self::WriteFailure(lhs), Self::WriteFailure(rhs)) => lhs == rhs, + (Self::NegotiationError(lhs), Self::NegotiationError(rhs)) => lhs == rhs, + _ => false, + } + } +} + /// Error during the negotiation phase. #[derive(Debug, thiserror::Error)] pub enum NegotiationError { From 638779015ea2399ae0bd3e0e8376509539bf3044 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 11:50:32 +0200 Subject: [PATCH 6/8] yamux: Remove inhouse yamux code in favor of upstream yamux Signed-off-by: Alexandru Vasile --- src/yamux/chunks.rs | 111 ---- src/yamux/connection.rs | 983 -------------------------------- src/yamux/connection/cleanup.rs | 64 --- src/yamux/connection/closing.rs | 101 ---- src/yamux/connection/stream.rs | 524 ----------------- src/yamux/error.rs | 89 --- src/yamux/frame.rs | 156 ----- src/yamux/frame/header.rs | 443 -------------- src/yamux/frame/io.rs | 384 ------------- src/yamux/tagged_stream.rs | 54 -- 10 files changed, 2909 deletions(-) delete mode 100644 src/yamux/chunks.rs delete mode 100644 src/yamux/connection.rs delete mode 100644 src/yamux/connection/cleanup.rs delete mode 100644 src/yamux/connection/closing.rs delete mode 100644 src/yamux/connection/stream.rs delete mode 100644 src/yamux/error.rs delete mode 100644 src/yamux/frame.rs delete mode 100644 src/yamux/frame/header.rs delete mode 100644 src/yamux/frame/io.rs delete mode 100644 src/yamux/tagged_stream.rs diff --git a/src/yamux/chunks.rs b/src/yamux/chunks.rs deleted file mode 100644 index 19515dc7..00000000 --- a/src/yamux/chunks.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Copyright (c) 2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use std::{collections::VecDeque, io}; - -/// A sequence of [`Chunk`] values. -/// -/// [`Chunks::len`] considers all [`Chunk`] elements and computes the total -/// result, i.e. the length of all bytes, by summing up the lengths of all -/// [`Chunk`] elements. -#[derive(Debug)] -pub(crate) struct Chunks { - seq: VecDeque, - len: usize, -} - -impl Chunks { - /// A new empty chunk list. - pub(crate) fn new() -> Self { - Chunks { - seq: VecDeque::new(), - len: 0, - } - } - - /// The total length of bytes yet-to-be-read in all `Chunk`s. - pub(crate) fn len(&self) -> usize { - self.len - self.seq.front().map(|c| c.offset()).unwrap_or(0) - } - - /// Add another chunk of bytes to the end. - pub(crate) fn push(&mut self, x: Vec) { - self.len += x.len(); - if !x.is_empty() { - self.seq.push_back(Chunk { - cursor: io::Cursor::new(x), - }) - } - } - - /// Remove and return the first chunk. - pub(crate) fn pop(&mut self) -> Option { - let chunk = self.seq.pop_front(); - self.len -= chunk.as_ref().map(|c| c.len() + c.offset()).unwrap_or(0); - chunk - } - - /// Get a mutable reference to the first chunk. - pub(crate) fn front_mut(&mut self) -> Option<&mut Chunk> { - self.seq.front_mut() - } -} - -/// A `Chunk` wraps a `std::io::Cursor>`. -/// -/// It provides a byte-slice view and a way to advance the cursor so the -/// vector can be consumed in steps. -#[derive(Debug)] -pub(crate) struct Chunk { - cursor: io::Cursor>, -} - -impl Chunk { - /// Is this chunk empty? - pub(crate) fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// The remaining number of bytes in this `Chunk`. - pub(crate) fn len(&self) -> usize { - self.cursor.get_ref().len() - self.offset() - } - - /// The sum of bytes that the cursor has been `advance`d over. - pub(crate) fn offset(&self) -> usize { - self.cursor.position() as usize - } - - /// Move the cursor position by `amount` bytes. - /// - /// The `AsRef<[u8]>` impl of `Chunk` provides a byte-slice view - /// from the current position to the end. - pub(crate) fn advance(&mut self, amount: usize) { - assert!({ - // the new position must not exceed the vector's length - let pos = self.offset().checked_add(amount); - let max = self.cursor.get_ref().len(); - pos.is_some() && pos <= Some(max) - }); - - self.cursor.set_position(self.cursor.position() + amount as u64); - } - - /// Consume `self` and return the inner vector. - pub(crate) fn into_vec(self) -> Vec { - self.cursor.into_inner() - } -} - -impl AsRef<[u8]> for Chunk { - fn as_ref(&self) -> &[u8] { - &self.cursor.get_ref()[self.offset()..] - } -} diff --git a/src/yamux/connection.rs b/src/yamux/connection.rs deleted file mode 100644 index f6ba5b39..00000000 --- a/src/yamux/connection.rs +++ /dev/null @@ -1,983 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -// This module contains the `Connection` type and associated helpers. -// A `Connection` wraps an underlying (async) I/O resource and multiplexes -// `Stream`s over it. -// -// The overall idea is as follows: The `Connection` makes progress via calls -// to its `next_stream` method which polls several futures, one that decodes -// `Frame`s from the I/O resource, one that consumes `ControlCommand`s -// from an MPSC channel and another one that consumes `StreamCommand`s from -// yet another MPSC channel. The latter channel is shared with every `Stream` -// created and whenever a `Stream` wishes to send a `Frame` to the remote end, -// it enqueues it into this channel (waiting if the channel is full). The -// former is shared with every `Control` clone and used to open new outbound -// streams or to trigger a connection close. -// -// The `Connection` updates the `Stream` state based on incoming frames, e.g. -// it pushes incoming data to the `Stream`'s buffer or increases the sending -// credit if the remote has sent us a corresponding `Frame::`. -// Updating a `Stream`'s state acquires a `Mutex`, which every `Stream` has -// around its `Shared` state. While blocking, we make sure the lock is only -// held for brief moments and *never* while doing I/O. The only contention is -// between the `Connection` and a single `Stream`, which should resolve -// quickly. Ideally, we could use `futures::lock::Mutex` but it does not offer -// a poll-based API as of futures-preview 0.3.0-alpha.19, which makes it -// difficult to use in a `Stream`'s `AsyncRead` and `AsyncWrite` trait -// implementations. -// -// Closing a `Connection` -// ---------------------- -// -// Every `Control` may send a `ControlCommand::Close` at any time and then -// waits on a `oneshot::Receiver` for confirmation that the connection is -// closed. The closing proceeds as follows: -// -// 1. As soon as we receive the close command we close the MPSC receiver of `StreamCommand`s. We -// want to process any stream commands which are already enqueued at this point but no more. -// 2. We change the internal shutdown state to `Shutdown::InProgress` which contains the -// `oneshot::Sender` of the `Control` which triggered the closure and which we need to notify -// eventually. -// 3. Crucially -- while closing -- we no longer process further control commands, because opening -// new streams should no longer be allowed and further close commands would mean we need to save -// those `oneshot::Sender`s for later. On the other hand we also do not simply close the control -// channel as this would signal to `Control`s that try to send close commands, that the -// connection is already closed, which it is not. So we just pause processing control commands -// which means such `Control`s will wait. -// 4. We keep processing I/O and stream commands until the remaining stream commands have all been -// consumed, at which point we transition the shutdown state to `Shutdown::Complete`, which -// entails sending the final termination frame to the remote, informing the `Control` and now -// also closing the control channel. -// 5. Now that we are closed we go through all pending control commands and tell the `Control`s that -// we are closed and we are finally done. -// -// While all of this may look complicated, it ensures that `Control`s are -// only informed about a closed connection when it really is closed. -// -// Potential improvements -// ---------------------- -// -// There is always more work that can be done to make this a better crate, -// for example: -// -// - Instead of `futures::mpsc` a more efficient channel implementation could be used, e.g. -// `tokio-sync`. Unfortunately `tokio-sync` is about to be merged into `tokio` and depending on -// this large crate is not attractive, especially given the dire situation around cargo's flag -// resolution. -// - Flushing could be optimised. This would also require adding a `StreamCommand::Flush` so that -// `Stream`s can trigger a flush, which they would have to when they run out of credit, or else a -// series of send operations might never finish. -// - If Rust gets async destructors, the `garbage_collect()` method can be removed. Instead a -// `Stream` would send a `StreamCommand::Dropped(..)` or something similar and the removal logic -// could happen within regular command processing instead of having to scan the whole collection -// of `Stream`s on each loop iteration, which is not great. - -mod cleanup; -mod closing; -mod stream; - -use crate::yamux::{ - error::ConnectionError, - frame::{ - self, - header::{self, Data, GoAway, Header, Ping, StreamId, Tag, WindowUpdate, CONNECTION_ID}, - Frame, - }, - tagged_stream::TaggedStream, - Config, Result, WindowUpdateMode, DEFAULT_CREDIT, MAX_ACK_BACKLOG, -}; -use cleanup::Cleanup; -use closing::Closing; -use futures::{ - channel::mpsc, - future::Either, - prelude::*, - sink::SinkExt, - stream::{Fuse, SelectAll}, -}; -use nohash_hasher::IntMap; -use parking_lot::Mutex; -use std::{ - collections::VecDeque, - fmt, - sync::Arc, - task::{Context, Poll, Waker}, -}; - -pub use stream::{Packet, State, Stream}; - -/// Logging target for the file. -const LOG_TARGET: &str = "litep2p::yamux"; - -/// How the connection is used. -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)] -pub enum Mode { - /// Client to server connection. - Client, - /// Server to client connection. - Server, -} - -/// The connection identifier. -/// -/// Randomly generated, this is mainly intended to improve log output. -#[derive(Clone, Copy)] -pub(crate) struct Id(u32); - -impl Id { - /// Create a random connection ID. - pub(crate) fn random() -> Self { - Id(rand::random()) - } -} - -impl fmt::Debug for Id { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:08x}", self.0) - } -} - -impl fmt::Display for Id { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{:08x}", self.0) - } -} - -#[derive(Debug)] -pub struct Connection { - inner: ConnectionState, -} - -impl Connection { - pub fn new(socket: T, cfg: Config, mode: Mode) -> Self { - Self { - inner: ConnectionState::Active(Active::new(socket, cfg, mode)), - } - } - - /// Poll for a new outbound stream. - /// - /// This function will fail if the current state does not allow opening new outbound streams. - pub fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) { - ConnectionState::Active(mut active) => match active.poll_new_outbound(cx) { - Poll::Ready(Ok(stream)) => { - self.inner = ConnectionState::Active(active); - return Poll::Ready(Ok(stream)); - } - Poll::Pending => { - self.inner = ConnectionState::Active(active); - return Poll::Pending; - } - Poll::Ready(Err(e)) => { - self.inner = ConnectionState::Cleanup(active.cleanup(e)); - continue; - } - }, - ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx) { - Poll::Ready(Ok(())) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Err(ConnectionError::Closed)); - } - Poll::Ready(Err(e)) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Err(e)); - } - Poll::Pending => { - self.inner = ConnectionState::Closing(inner); - return Poll::Pending; - } - }, - ConnectionState::Cleanup(mut inner) => match inner.poll_unpin(cx) { - Poll::Ready(e) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Err(e)); - } - Poll::Pending => { - self.inner = ConnectionState::Cleanup(inner); - return Poll::Pending; - } - }, - ConnectionState::Closed => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Err(ConnectionError::Closed)); - } - ConnectionState::Poisoned => unreachable!(), - } - } - } - - /// Poll for the next inbound stream. - /// - /// If this function returns `None`, the underlying connection is closed. - pub fn poll_next_inbound(&mut self, cx: &mut Context<'_>) -> Poll>> { - loop { - match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) { - ConnectionState::Active(mut active) => match active.poll(cx) { - Poll::Ready(Ok(stream)) => { - self.inner = ConnectionState::Active(active); - return Poll::Ready(Some(Ok(stream))); - } - Poll::Ready(Err(e)) => { - self.inner = ConnectionState::Cleanup(active.cleanup(e)); - continue; - } - Poll::Pending => { - self.inner = ConnectionState::Active(active); - return Poll::Pending; - } - }, - ConnectionState::Closing(mut closing) => match closing.poll_unpin(cx) { - Poll::Ready(Ok(())) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(None); - } - Poll::Ready(Err(e)) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Some(Err(e))); - } - Poll::Pending => { - self.inner = ConnectionState::Closing(closing); - return Poll::Pending; - } - }, - ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) { - Poll::Ready(ConnectionError::Closed) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(None); - } - Poll::Ready(other) => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Some(Err(other))); - } - Poll::Pending => { - self.inner = ConnectionState::Cleanup(cleanup); - return Poll::Pending; - } - }, - ConnectionState::Closed => { - self.inner = ConnectionState::Closed; - return Poll::Ready(None); - } - ConnectionState::Poisoned => unreachable!(), - } - } - } - - /// Close the connection. - pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - match std::mem::replace(&mut self.inner, ConnectionState::Poisoned) { - ConnectionState::Active(active) => { - self.inner = ConnectionState::Closing(active.close()); - } - ConnectionState::Closing(mut inner) => match inner.poll_unpin(cx)? { - Poll::Ready(()) => { - self.inner = ConnectionState::Closed; - } - Poll::Pending => { - self.inner = ConnectionState::Closing(inner); - return Poll::Pending; - } - }, - ConnectionState::Cleanup(mut cleanup) => match cleanup.poll_unpin(cx) { - Poll::Ready(reason) => { - tracing::warn!(target: LOG_TARGET, "Failure while closing connection: {}", reason); - self.inner = ConnectionState::Closed; - return Poll::Ready(Ok(())); - } - Poll::Pending => { - self.inner = ConnectionState::Cleanup(cleanup); - return Poll::Pending; - } - }, - ConnectionState::Closed => { - self.inner = ConnectionState::Closed; - return Poll::Ready(Ok(())); - } - ConnectionState::Poisoned => { - unreachable!() - } - } - } - } -} - -impl Drop for Connection { - fn drop(&mut self) { - match &mut self.inner { - ConnectionState::Active(active) => active.drop_all_streams(), - ConnectionState::Closing(_) => {} - ConnectionState::Cleanup(_) => {} - ConnectionState::Closed => {} - ConnectionState::Poisoned => {} - } - } -} - -enum ConnectionState { - /// The connection is alive and healthy. - Active(Active), - /// Our user requested to shutdown the connection, we are working on it. - Closing(Closing), - /// An error occurred and we are cleaning up our resources. - Cleanup(Cleanup), - /// The connection is closed. - Closed, - /// Something went wrong during our state transitions. Should never happen unless there is a - /// bug. - Poisoned, -} - -impl fmt::Debug for ConnectionState { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ConnectionState::Active(_) => write!(f, "Active"), - ConnectionState::Closing(_) => write!(f, "Closing"), - ConnectionState::Cleanup(_) => write!(f, "Cleanup"), - ConnectionState::Closed => write!(f, "Closed"), - ConnectionState::Poisoned => write!(f, "Poisoned"), - } - } -} - -/// A Yamux connection object. -/// -/// Wraps the underlying I/O resource and makes progress via its -/// [`Connection::poll_next_inbound`] method which must be called repeatedly -/// until `Ok(None)` signals EOF or an error is encountered. -struct Active { - id: Id, - mode: Mode, - config: Arc, - socket: Fuse>, - next_id: u32, - - streams: IntMap>>, - stream_receivers: SelectAll>>, - no_streams_waker: Option, - - pending_frames: VecDeque>, - new_outbound_stream_waker: Option, -} - -/// `Stream` to `Connection` commands. -#[derive(Debug)] -pub(crate) enum StreamCommand { - /// A new frame should be sent to the remote. - SendFrame(Frame>), - /// Close a stream. - CloseStream { ack: bool }, -} - -/// Possible actions as a result of incoming frame handling. -#[derive(Debug)] -enum Action { - /// Nothing to be done. - None, - /// A new stream has been opened by the remote. - New(Stream, Option>), - /// A window update should be sent to the remote. - Update(Frame), - /// A ping should be answered. - Ping(Frame), - /// A stream should be reset. - Reset(Frame), - /// The connection should be terminated. - Terminate(Frame), -} - -impl fmt::Debug for Active { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Connection") - .field("id", &self.id) - .field("mode", &self.mode) - .field("streams", &self.streams.len()) - .field("next_id", &self.next_id) - .finish() - } -} - -impl fmt::Display for Active { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "(Connection {} {:?} (streams {}))", - self.id, - self.mode, - self.streams.len() - ) - } -} - -impl Active { - /// Create a new `Connection` from the given I/O resource. - fn new(socket: T, cfg: Config, mode: Mode) -> Self { - let id = Id::random(); - tracing::debug!(target: LOG_TARGET, "new connection: {} ({:?})", id, mode); - let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse(); - Active { - id, - mode, - config: Arc::new(cfg), - socket, - streams: IntMap::default(), - stream_receivers: SelectAll::default(), - no_streams_waker: None, - next_id: match mode { - Mode::Client => 1, - Mode::Server => 2, - }, - pending_frames: VecDeque::default(), - new_outbound_stream_waker: None, - } - } - - /// Gracefully close the connection to the remote. - fn close(self) -> Closing { - Closing::new(self.stream_receivers, self.pending_frames, self.socket) - } - - /// Cleanup all our resources. - /// - /// This should be called in the context of an unrecoverable error on the connection. - fn cleanup(mut self, error: ConnectionError) -> Cleanup { - self.drop_all_streams(); - - Cleanup::new(self.stream_receivers, error) - } - - fn poll(&mut self, cx: &mut Context<'_>) -> Poll> { - loop { - if self.socket.poll_ready_unpin(cx).is_ready() { - if let Some(frame) = self.pending_frames.pop_front() { - self.socket.start_send_unpin(frame)?; - continue; - } - } - - match self.socket.poll_flush_unpin(cx)? { - Poll::Ready(()) => {} - Poll::Pending => {} - } - - match self.stream_receivers.poll_next_unpin(cx) { - Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => { - self.on_send_frame(frame); - continue; - } - Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - self.on_close_stream(id, ack); - continue; - } - Poll::Ready(Some((id, None))) => { - self.on_drop_stream(id); - continue; - } - Poll::Ready(None) => { - self.no_streams_waker = Some(cx.waker().clone()); - } - Poll::Pending => {} - } - - match self.socket.poll_next_unpin(cx) { - Poll::Ready(Some(frame)) => { - if let Some(stream) = self.on_frame(frame?)? { - return Poll::Ready(Ok(stream)); - } - continue; - } - Poll::Ready(None) => { - return Poll::Ready(Err(ConnectionError::Closed)); - } - Poll::Pending => {} - } - - // If we make it this far, at least one of the above must have registered a waker. - return Poll::Pending; - } - } - - fn poll_new_outbound(&mut self, cx: &mut Context<'_>) -> Poll> { - if self.streams.len() >= self.config.max_num_streams { - tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id); - return Poll::Ready(Err(ConnectionError::TooManyStreams)); - } - - if self.ack_backlog() >= MAX_ACK_BACKLOG { - tracing::debug!(target: LOG_TARGET, "{MAX_ACK_BACKLOG} streams waiting for ACK, registering task for wake-up until remote acknowledges at least one stream"); - self.new_outbound_stream_waker = Some(cx.waker().clone()); - return Poll::Pending; - } - - tracing::trace!(target: LOG_TARGET, "{}: creating new outbound stream", self.id); - - let id = self.next_stream_id()?; - let extra_credit = self.config.receive_window - DEFAULT_CREDIT; - - if extra_credit > 0 { - let mut frame = Frame::window_update(id, extra_credit); - frame.header_mut().syn(); - tracing::trace!(target: LOG_TARGET, "{}/{}: sending initial {}", self.id, id, frame.header()); - self.pending_frames.push_back(frame.into()); - } - - let mut stream = self.make_new_outbound_stream(id, self.config.receive_window); - - if extra_credit == 0 { - stream.set_flag(stream::Flag::Syn) - } - - tracing::debug!(target: LOG_TARGET, "{}: new outbound {} of {}", self.id, stream, self); - self.streams.insert(id, stream.clone_shared()); - - Poll::Ready(Ok(stream)) - } - - fn on_send_frame(&mut self, frame: Frame>) { - tracing::trace!(target: LOG_TARGET, - "{}/{}: sending: {}", - self.id, - frame.header().stream_id(), - frame.header() - ); - self.pending_frames.push_back(frame.into()); - } - - fn on_close_stream(&mut self, id: StreamId, ack: bool) { - tracing::trace!(target: LOG_TARGET, "{}/{}: sending close", self.id, id); - self.pending_frames.push_back(Frame::close_stream(id, ack).into()); - } - - fn on_drop_stream(&mut self, stream_id: StreamId) { - let s = self.streams.remove(&stream_id).expect("stream not found"); - - tracing::trace!(target: LOG_TARGET, "{}: removing dropped stream {}", self.id, stream_id); - let frame = { - let mut shared = s.lock(); - let frame = match shared.update_state(self.id, stream_id, State::Closed) { - // The stream was dropped without calling `poll_close`. - // We reset the stream to inform the remote of the closure. - State::Open { .. } => { - let mut header = Header::data(stream_id, 0); - header.rst(); - Some(Frame::new(header)) - } - // The stream was dropped without calling `poll_close`. - // We have already received a FIN from remote and send one - // back which closes the stream for good. - State::RecvClosed => { - let mut header = Header::data(stream_id, 0); - header.fin(); - Some(Frame::new(header)) - } - // The stream was properly closed. We already sent our FIN frame. - // The remote may be out of credit though and blocked on - // writing more data. We may need to reset the stream. - State::SendClosed => { - if self.config.window_update_mode == WindowUpdateMode::OnRead - && shared.window == 0 - { - // The remote may be waiting for a window update - // which we will never send, so reset the stream now. - let mut header = Header::data(stream_id, 0); - header.rst(); - Some(Frame::new(header)) - } else { - // The remote has either still credit or will be given more - // (due to an enqueued window update or because the update - // mode is `OnReceive`) or we already have inbound frames in - // the socket buffer which will be processed later. In any - // case we will reply with an RST in `Connection::on_data` - // because the stream will no longer be known. - None - } - } - // The stream was properly closed. We already have sent our FIN frame. The - // remote end has already done so in the past. - State::Closed => None, - }; - if let Some(w) = shared.reader.take() { - w.wake() - } - if let Some(w) = shared.writer.take() { - w.wake() - } - frame - }; - if let Some(f) = frame { - tracing::trace!(target: LOG_TARGET, "{}/{}: sending: {}", self.id, stream_id, f.header()); - self.pending_frames.push_back(f.into()); - } - } - - /// Process the result of reading from the socket. - /// - /// Unless `frame` is `Ok(Some(_))` we will assume the connection got closed - /// and return a corresponding error, which terminates the connection. - /// Otherwise we process the frame and potentially return a new `Stream` - /// if one was opened by the remote. - fn on_frame(&mut self, frame: Frame<()>) -> Result> { - tracing::trace!(target: LOG_TARGET, "{}: received: {}", self.id, frame.header()); - - if frame.header().flags().contains(header::ACK) { - let id = frame.header().stream_id(); - if let Some(stream) = self.streams.get(&id) { - stream.lock().update_state(self.id, id, State::Open { acknowledged: true }); - } - if let Some(waker) = self.new_outbound_stream_waker.take() { - waker.wake(); - } - } - - let action = match frame.header().tag() { - Tag::Data => self.on_data(frame.into_data()), - Tag::WindowUpdate => self.on_window_update(&frame.into_window_update()), - Tag::Ping => self.on_ping(&frame.into_ping()), - Tag::GoAway => return Err(ConnectionError::Closed), - }; - match action { - Action::None => {} - Action::New(stream, update) => { - tracing::trace!(target: LOG_TARGET, "{}: new inbound {} of {}", self.id, stream, self); - if let Some(f) = update { - tracing::trace!(target: LOG_TARGET, "{}/{}: sending update", self.id, f.header().stream_id()); - self.pending_frames.push_back(f.into()); - } - return Ok(Some(stream)); - } - Action::Update(f) => { - tracing::trace!(target: LOG_TARGET, "{}: sending update: {:?}", self.id, f.header()); - self.pending_frames.push_back(f.into()); - } - Action::Ping(f) => { - tracing::trace!(target: LOG_TARGET, "{}/{}: pong", self.id, f.header().stream_id()); - self.pending_frames.push_back(f.into()); - } - Action::Reset(f) => { - tracing::trace!(target: LOG_TARGET, "{}/{}: sending reset", self.id, f.header().stream_id()); - self.pending_frames.push_back(f.into()); - } - Action::Terminate(f) => { - tracing::trace!(target: LOG_TARGET, "{}: sending term", self.id); - self.pending_frames.push_back(f.into()); - } - } - - Ok(None) - } - - fn on_data(&mut self, frame: Frame) -> Action { - let stream_id = frame.header().stream_id(); - - if frame.header().flags().contains(header::RST) { - // stream reset - if let Some(s) = self.streams.get_mut(&stream_id) { - let mut shared = s.lock(); - shared.update_state(self.id, stream_id, State::Closed); - if let Some(w) = shared.reader.take() { - w.wake() - } - if let Some(w) = shared.writer.take() { - w.wake() - } - } - return Action::None; - } - - let is_finish = frame.header().flags().contains(header::FIN); // half-close - - if frame.header().flags().contains(header::SYN) { - // new stream - if !self.is_valid_remote_id(stream_id, Tag::Data) { - tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id); - return Action::Terminate(Frame::protocol_error()); - } - if frame.body().len() > DEFAULT_CREDIT as usize { - tracing::error!(target: LOG_TARGET, - "{}/{}: 1st body of stream exceeds default credit", - self.id, - stream_id - ); - return Action::Terminate(Frame::protocol_error()); - } - if self.streams.contains_key(&stream_id) { - tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id); - return Action::Terminate(Frame::protocol_error()); - } - if self.streams.len() == self.config.max_num_streams { - tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id); - return Action::Terminate(Frame::internal_error()); - } - let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT); - let mut window_update = None; - { - let mut shared = stream.shared(); - if is_finish { - shared.update_state(self.id, stream_id, State::RecvClosed); - } - shared.window = shared.window.saturating_sub(frame.body_len()); - shared.buffer.push(frame.into_body()); - - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - if let Some(credit) = shared.next_window_update() { - shared.window += credit; - let mut frame = Frame::window_update(stream_id, credit); - frame.header_mut().ack(); - window_update = Some(frame) - } - } - } - if window_update.is_none() { - stream.set_flag(stream::Flag::Ack) - } - self.streams.insert(stream_id, stream.clone_shared()); - return Action::New(stream, window_update); - } - - if let Some(s) = self.streams.get_mut(&stream_id) { - let mut shared = s.lock(); - if frame.body().len() > shared.window as usize { - tracing::error!(target: LOG_TARGET, - "{}/{}: frame body larger than window of stream", - self.id, - stream_id - ); - return Action::Terminate(Frame::protocol_error()); - } - if is_finish { - shared.update_state(self.id, stream_id, State::RecvClosed); - } - let max_buffer_size = self.config.max_buffer_size; - if shared.buffer.len() >= max_buffer_size { - tracing::error!(target: LOG_TARGET, - "{}/{}: buffer of stream grows beyond limit", - self.id, - stream_id - ); - let mut header = Header::data(stream_id, 0); - header.rst(); - return Action::Reset(Frame::new(header)); - } - shared.window = shared.window.saturating_sub(frame.body_len()); - shared.buffer.push(frame.into_body()); - if let Some(w) = shared.reader.take() { - w.wake() - } - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - if let Some(credit) = shared.next_window_update() { - shared.window += credit; - let frame = Frame::window_update(stream_id, credit); - return Action::Update(frame); - } - } - } else { - tracing::trace!(target: LOG_TARGET, - "{}/{}: data frame for unknown stream, possibly dropped earlier: {:?}", - self.id, - stream_id, - frame - ); - // We do not consider this a protocol violation and thus do not send a stream reset - // because we may still be processing pending `StreamCommand`s of this stream that were - // sent before it has been dropped and "garbage collected". Such a stream reset would - // interfere with the frames that still need to be sent, causing premature stream - // termination for the remote. - // - // See https://github.com/paritytech/yamux/issues/110 for details. - } - - Action::None - } - - fn on_window_update(&mut self, frame: &Frame) -> Action { - let stream_id = frame.header().stream_id(); - - if frame.header().flags().contains(header::RST) { - // stream reset - if let Some(s) = self.streams.get_mut(&stream_id) { - let mut shared = s.lock(); - shared.update_state(self.id, stream_id, State::Closed); - if let Some(w) = shared.reader.take() { - w.wake() - } - if let Some(w) = shared.writer.take() { - w.wake() - } - } - return Action::None; - } - - let is_finish = frame.header().flags().contains(header::FIN); // half-close - - if frame.header().flags().contains(header::SYN) { - // new stream - if !self.is_valid_remote_id(stream_id, Tag::WindowUpdate) { - tracing::error!(target: LOG_TARGET, "{}: invalid stream id {}", self.id, stream_id); - return Action::Terminate(Frame::protocol_error()); - } - if self.streams.contains_key(&stream_id) { - tracing::error!(target: LOG_TARGET, "{}/{}: stream already exists", self.id, stream_id); - return Action::Terminate(Frame::protocol_error()); - } - if self.streams.len() == self.config.max_num_streams { - tracing::error!(target: LOG_TARGET, "{}: maximum number of streams reached", self.id); - return Action::Terminate(Frame::protocol_error()); - } - - let credit = frame.header().credit() + DEFAULT_CREDIT; - let mut stream = self.make_new_inbound_stream(stream_id, credit); - stream.set_flag(stream::Flag::Ack); - - if is_finish { - stream.shared().update_state(self.id, stream_id, State::RecvClosed); - } - self.streams.insert(stream_id, stream.clone_shared()); - return Action::New(stream, None); - } - - if let Some(s) = self.streams.get_mut(&stream_id) { - let mut shared = s.lock(); - shared.credit += frame.header().credit(); - if is_finish { - shared.update_state(self.id, stream_id, State::RecvClosed); - } - if let Some(w) = shared.writer.take() { - w.wake() - } - } else { - tracing::trace!(target: LOG_TARGET, - "{}/{}: window update for unknown stream, possibly dropped earlier: {:?}", - self.id, - stream_id, - frame - ); - // We do not consider this a protocol violation and thus do not send a stream reset - // because we may still be processing pending `StreamCommand`s of this stream that were - // sent before it has been dropped and "garbage collected". Such a stream reset would - // interfere with the frames that still need to be sent, causing premature stream - // termination for the remote. - // - // See https://github.com/paritytech/yamux/issues/110 for details. - } - - Action::None - } - - fn on_ping(&mut self, frame: &Frame) -> Action { - let stream_id = frame.header().stream_id(); - if frame.header().flags().contains(header::ACK) { - // pong - return Action::None; - } - if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) { - let mut hdr = Header::ping(frame.header().nonce()); - hdr.ack(); - return Action::Ping(Frame::new(hdr)); - } - tracing::trace!(target: LOG_TARGET, - "{}/{}: ping for unknown stream, possibly dropped earlier: {:?}", - self.id, - stream_id, - frame - ); - // We do not consider this a protocol violation and thus do not send a stream reset because - // we may still be processing pending `StreamCommand`s of this stream that were sent before - // it has been dropped and "garbage collected". Such a stream reset would interfere with the - // frames that still need to be sent, causing premature stream termination for the remote. - // - // See https://github.com/paritytech/yamux/issues/110 for details. - - Action::None - } - - fn make_new_inbound_stream(&mut self, id: StreamId, credit: u32) -> Stream { - let config = self.config.clone(); - - let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. - self.stream_receivers.push(TaggedStream::new(id, receiver)); - if let Some(waker) = self.no_streams_waker.take() { - waker.wake(); - } - - Stream::new_inbound(id, self.id, config, credit, sender) - } - - fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream { - let config = self.config.clone(); - - let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number. - self.stream_receivers.push(TaggedStream::new(id, receiver)); - if let Some(waker) = self.no_streams_waker.take() { - waker.wake(); - } - - Stream::new_outbound(id, self.id, config, window, sender) - } - - fn next_stream_id(&mut self) -> Result { - let proposed = StreamId::new(self.next_id); - self.next_id = self.next_id.checked_add(2).ok_or(ConnectionError::NoMoreStreamIds)?; - match self.mode { - Mode::Client => assert!(proposed.is_client()), - Mode::Server => assert!(proposed.is_server()), - } - Ok(proposed) - } - - /// The ACK backlog is defined as the number of outbound streams that have not yet been - /// acknowledged. - fn ack_backlog(&mut self) -> usize { - self.streams - .iter() - // Whether this is an outbound stream. - // - // Clients use odd IDs and servers use even IDs. - // A stream is outbound if: - // - // - Its ID is odd and we are the client. - // - Its ID is even and we are the server. - .filter(|(id, _)| match self.mode { - Mode::Client => id.is_client(), - Mode::Server => id.is_server(), - }) - .filter(|(_, s)| s.lock().is_pending_ack()) - .count() - } - - // Check if the given stream ID is valid w.r.t. the provided tag and our connection mode. - fn is_valid_remote_id(&self, id: StreamId, tag: Tag) -> bool { - if tag == Tag::Ping || tag == Tag::GoAway { - return id.is_session(); - } - match self.mode { - Mode::Client => id.is_server(), - Mode::Server => id.is_client(), - } - } -} - -impl Active { - /// Close and drop all `Stream`s and wake any pending `Waker`s. - fn drop_all_streams(&mut self) { - for (id, s) in self.streams.drain() { - let mut shared = s.lock(); - shared.update_state(self.id, id, State::Closed); - if let Some(w) = shared.reader.take() { - w.wake() - } - if let Some(w) = shared.writer.take() { - w.wake() - } - } - } -} diff --git a/src/yamux/connection/cleanup.rs b/src/yamux/connection/cleanup.rs deleted file mode 100644 index d1be682c..00000000 --- a/src/yamux/connection/cleanup.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::yamux::{ - connection::StreamCommand, tagged_stream::TaggedStream, ConnectionError, StreamId, -}; -use futures::{channel::mpsc, stream::SelectAll, StreamExt}; -use std::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -/// A [`Future`] that cleans up resources in case of an error. -#[must_use] -pub struct Cleanup { - state: State, - stream_receivers: SelectAll>>, - error: Option, -} - -impl Cleanup { - pub(crate) fn new( - stream_receivers: SelectAll>>, - error: ConnectionError, - ) -> Self { - Self { - state: State::ClosingStreamReceiver, - stream_receivers, - error: Some(error), - } - } -} - -impl Future for Cleanup { - type Output = ConnectionError; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - match this.state { - State::ClosingStreamReceiver => { - for stream in this.stream_receivers.iter_mut() { - stream.inner_mut().close(); - } - this.state = State::DrainingStreamReceiver; - } - State::DrainingStreamReceiver => match this.stream_receivers.poll_next_unpin(cx) { - Poll::Ready(Some(cmd)) => { - drop(cmd); - } - Poll::Ready(None) | Poll::Pending => - return Poll::Ready( - this.error.take().expect("to not be called after completion"), - ), - }, - } - } - } -} - -#[allow(clippy::enum_variant_names)] -enum State { - ClosingStreamReceiver, - DrainingStreamReceiver, -} diff --git a/src/yamux/connection/closing.rs b/src/yamux/connection/closing.rs deleted file mode 100644 index 6ae541e4..00000000 --- a/src/yamux/connection/closing.rs +++ /dev/null @@ -1,101 +0,0 @@ -use crate::yamux::{ - connection::StreamCommand, frame, frame::Frame, tagged_stream::TaggedStream, Result, StreamId, -}; -use futures::{ - channel::mpsc, - ready, - stream::{Fuse, SelectAll}, - AsyncRead, AsyncWrite, SinkExt, StreamExt, -}; -use std::{ - collections::VecDeque, - future::Future, - pin::Pin, - task::{Context, Poll}, -}; - -/// A [`Future`] that gracefully closes the yamux connection. -#[must_use] -pub struct Closing { - state: State, - stream_receivers: SelectAll>>, - pending_frames: VecDeque>, - socket: Fuse>, -} - -impl Closing -where - T: AsyncRead + AsyncWrite + Unpin, -{ - pub(crate) fn new( - stream_receivers: SelectAll>>, - pending_frames: VecDeque>, - socket: Fuse>, - ) -> Self { - Self { - state: State::ClosingStreamReceiver, - stream_receivers, - pending_frames, - socket, - } - } -} - -impl Future for Closing -where - T: AsyncRead + AsyncWrite + Unpin, -{ - type Output = Result<()>; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); - - loop { - match this.state { - State::ClosingStreamReceiver => { - for stream in this.stream_receivers.iter_mut() { - stream.inner_mut().close(); - } - this.state = State::DrainingStreamReceiver; - } - - State::DrainingStreamReceiver => { - match this.stream_receivers.poll_next_unpin(cx) { - Poll::Ready(Some((_, Some(StreamCommand::SendFrame(frame))))) => - this.pending_frames.push_back(frame.into()), - Poll::Ready(Some((id, Some(StreamCommand::CloseStream { ack })))) => { - this.pending_frames.push_back(Frame::close_stream(id, ack).into()); - } - Poll::Ready(Some((_, None))) => {} - Poll::Pending | Poll::Ready(None) => { - // No more frames from streams, append `Term` frame and flush them all. - this.pending_frames.push_back(Frame::term().into()); - this.state = State::FlushingPendingFrames; - continue; - } - } - } - State::FlushingPendingFrames => { - ready!(this.socket.poll_ready_unpin(cx))?; - - match this.pending_frames.pop_front() { - Some(frame) => this.socket.start_send_unpin(frame)?, - None => this.state = State::ClosingSocket, - } - } - State::ClosingSocket => { - ready!(this.socket.poll_close_unpin(cx))?; - - return Poll::Ready(Ok(())); - } - } - } - } -} - -enum State { - ClosingStreamReceiver, - DrainingStreamReceiver, - FlushingPendingFrames, - ClosingSocket, -} diff --git a/src/yamux/connection/stream.rs b/src/yamux/connection/stream.rs deleted file mode 100644 index d6762bb2..00000000 --- a/src/yamux/connection/stream.rs +++ /dev/null @@ -1,524 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use crate::yamux::{ - chunks::Chunks, - connection::{self, StreamCommand}, - frame::{ - header::{Data, Header, StreamId, WindowUpdate, ACK}, - Frame, - }, - Config, WindowUpdateMode, DEFAULT_CREDIT, -}; -use futures::{ - channel::mpsc, - future::Either, - io::{AsyncRead, AsyncWrite}, - ready, SinkExt, -}; -use parking_lot::{Mutex, MutexGuard}; -use std::{ - convert::TryInto, - fmt, io, - pin::Pin, - sync::Arc, - task::{Context, Poll, Waker}, -}; - -/// Logging target for the file. -const LOG_TARGET: &str = "litep2p::yamux"; - -/// The state of a Yamux stream. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum State { - /// Open bidirectionally. - Open { - /// Whether the stream is acknowledged. - /// - /// For outbound streams, this tracks whether the remote has acknowledged our stream. - /// For inbound streams, this tracks whether we have acknowledged the stream to the remote. - /// - /// This starts out with `false` and is set to `true` when we receive or send an `ACK` flag - /// for this stream. We may also directly transition: - /// - from `Open` to `RecvClosed` if the remote immediately sends `FIN`. - /// - from `Open` to `Closed` if the remote immediately sends `RST`. - acknowledged: bool, - }, - /// Open for incoming messages. - SendClosed, - /// Open for outgoing messages. - RecvClosed, - /// Closed (terminal state). - Closed, -} - -impl State { - /// Can we receive messages over this stream? - pub fn can_read(self) -> bool { - !matches!(self, State::RecvClosed | State::Closed) - } - - /// Can we send messages over this stream? - pub fn can_write(self) -> bool { - !matches!(self, State::SendClosed | State::Closed) - } -} - -/// Indicate if a flag still needs to be set on an outbound header. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub(crate) enum Flag { - /// No flag needs to be set. - None, - /// The stream was opened lazily, so set the initial SYN flag. - Syn, - /// The stream still needs acknowledgement, so set the ACK flag. - Ack, -} - -/// A multiplexed Yamux stream. -/// -/// Streams are created either outbound via [`crate::yamux::Control::open_stream`] -/// or inbound via [`crate::yamux::Connection::poll_next_inbound`]. -/// -/// [`Stream`] implements [`AsyncRead`] and [`AsyncWrite`] and also -/// [`futures::stream::Stream`]. -pub struct Stream { - id: StreamId, - conn: connection::Id, - config: Arc, - sender: mpsc::Sender, - flag: Flag, - shared: Arc>, -} - -impl fmt::Debug for Stream { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.debug_struct("Stream") - .field("id", &self.id.val()) - .field("connection", &self.conn) - .finish() - } -} - -impl fmt::Display for Stream { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "(Stream {}/{})", self.conn, self.id.val()) - } -} - -impl Stream { - pub(crate) fn new_inbound( - id: StreamId, - conn: connection::Id, - config: Arc, - credit: u32, - sender: mpsc::Sender, - ) -> Self { - Self { - id, - conn, - config: config.clone(), - sender, - flag: Flag::None, - shared: Arc::new(Mutex::new(Shared::new(DEFAULT_CREDIT, credit, config))), - } - } - - pub(crate) fn new_outbound( - id: StreamId, - conn: connection::Id, - config: Arc, - window: u32, - sender: mpsc::Sender, - ) -> Self { - Self { - id, - conn, - config: config.clone(), - sender, - flag: Flag::None, - shared: Arc::new(Mutex::new(Shared::new(window, DEFAULT_CREDIT, config))), - } - } - - /// Get this stream's identifier. - pub fn id(&self) -> StreamId { - self.id - } - - pub fn is_write_closed(&self) -> bool { - matches!(self.shared().state(), State::SendClosed) - } - - pub fn is_closed(&self) -> bool { - matches!(self.shared().state(), State::Closed) - } - - /// Whether we are still waiting for the remote to acknowledge this stream. - pub fn is_pending_ack(&self) -> bool { - self.shared().is_pending_ack() - } - - /// Set the flag that should be set on the next outbound frame header. - pub(crate) fn set_flag(&mut self, flag: Flag) { - self.flag = flag - } - - pub(crate) fn shared(&self) -> MutexGuard<'_, Shared> { - self.shared.lock() - } - - pub(crate) fn clone_shared(&self) -> Arc> { - self.shared.clone() - } - - fn write_zero_err(&self) -> io::Error { - let msg = format!("{}/{}: connection is closed", self.conn, self.id); - io::Error::new(io::ErrorKind::WriteZero, msg) - } - - /// Set ACK or SYN flag if necessary. - fn add_flag(&mut self, header: &mut Header>) { - match self.flag { - Flag::None => (), - Flag::Syn => { - header.syn(); - self.flag = Flag::None - } - Flag::Ack => { - header.ack(); - self.flag = Flag::None - } - } - } - - /// Send new credit to the sending side via a window update message if - /// permitted. - fn send_window_update(&mut self, cx: &mut Context) -> Poll> { - // When using [`WindowUpdateMode::OnReceive`] window update messages are - // send early on data receival (see [`crate::Connection::on_frame`]). - if matches!(self.config.window_update_mode, WindowUpdateMode::OnReceive) { - return Poll::Ready(Ok(())); - } - - let mut shared = self.shared.lock(); - - if let Some(credit) = shared.next_window_update() { - ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?); - - shared.window += credit; - drop(shared); - - let mut frame = Frame::window_update(self.id, credit).right(); - self.add_flag(frame.header_mut()); - let cmd = StreamCommand::SendFrame(frame); - self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?; - } - - Poll::Ready(Ok(())) - } -} - -/// Byte data produced by the [`futures::stream::Stream`] impl of [`Stream`]. -#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct Packet(Vec); - -impl AsRef<[u8]> for Packet { - fn as_ref(&self) -> &[u8] { - self.0.as_ref() - } -} - -impl futures::stream::Stream for Stream { - type Item = io::Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if !self.config.read_after_close && self.sender.is_closed() { - return Poll::Ready(None); - } - - match self.send_window_update(cx) { - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))), - // Continue reading buffered data even though sending a window update blocked. - Poll::Pending => {} - } - - let mut shared = self.shared(); - - if let Some(bytes) = shared.buffer.pop() { - let off = bytes.offset(); - let mut vec = bytes.into_vec(); - if off != 0 { - // This should generally not happen when the stream is used only as - // a `futures::stream::Stream` since the whole point of this impl is - // to consume chunks atomically. It may perhaps happen when mixing - // this impl and the `AsyncRead` one. - tracing::debug!( - target: LOG_TARGET, - "{}/{}: chunk has been partially consumed", - self.conn, - self.id - ); - vec = vec.split_off(off) - } - return Poll::Ready(Some(Ok(Packet(vec)))); - } - - // Buffer is empty, let's check if we can expect to read more data. - if !shared.state().can_read() { - tracing::debug!(target: LOG_TARGET, "{}/{}: eof", self.conn, self.id); - return Poll::Ready(None); // stream has been reset - } - - // Since we have no more data at this point, we want to be woken up - // by the connection when more becomes available for us. - shared.reader = Some(cx.waker().clone()); - - Poll::Pending - } -} - -// Like the `futures::stream::Stream` impl above, but copies bytes into the -// provided mutable slice. -impl AsyncRead for Stream { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &mut [u8], - ) -> Poll> { - if !self.config.read_after_close && self.sender.is_closed() { - return Poll::Ready(Ok(0)); - } - - match self.send_window_update(cx) { - Poll::Ready(Ok(())) => {} - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - // Continue reading buffered data even though sending a window update blocked. - Poll::Pending => {} - } - - // Copy data from stream buffer. - let mut shared = self.shared(); - let mut n = 0; - while let Some(chunk) = shared.buffer.front_mut() { - if chunk.is_empty() { - shared.buffer.pop(); - continue; - } - let k = std::cmp::min(chunk.len(), buf.len() - n); - buf[n..n + k].copy_from_slice(&chunk.as_ref()[..k]); - n += k; - chunk.advance(k); - if n == buf.len() { - break; - } - } - - if n > 0 { - tracing::trace!(target: LOG_TARGET,"{}/{}: read {} bytes", self.conn, self.id, n); - return Poll::Ready(Ok(n)); - } - - // Buffer is empty, let's check if we can expect to read more data. - if !shared.state().can_read() { - tracing::debug!(target: LOG_TARGET,"{}/{}: eof", self.conn, self.id); - return Poll::Ready(Ok(0)); // stream has been reset - } - - // Since we have no more data at this point, we want to be woken up - // by the connection when more becomes available for us. - shared.reader = Some(cx.waker().clone()); - - Poll::Pending - } -} - -impl AsyncWrite for Stream { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context, - buf: &[u8], - ) -> Poll> { - ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?); - let body = { - let mut shared = self.shared(); - if !shared.state().can_write() { - tracing::debug!(target: LOG_TARGET,"{}/{}: can no longer write", self.conn, self.id); - return Poll::Ready(Err(self.write_zero_err())); - } - if shared.credit == 0 { - tracing::trace!(target: LOG_TARGET,"{}/{}: no more credit left", self.conn, self.id); - shared.writer = Some(cx.waker().clone()); - return Poll::Pending; - } - let k = std::cmp::min(shared.credit as usize, buf.len()); - let k = std::cmp::min(k, self.config.split_send_size); - shared.credit = shared.credit.saturating_sub(k as u32); - Vec::from(&buf[..k]) - }; - let n = body.len(); - let mut frame = Frame::data(self.id, body).expect("body <= u32::MAX").left(); - self.add_flag(frame.header_mut()); - tracing::trace!(target: LOG_TARGET,"{}/{}: write {} bytes", self.conn, self.id, n); - - // technically, the frame hasn't been sent yet on the wire but from the perspective of this - // data structure, we've queued the frame for sending We are tracking this - // information: a) to be consistent with outbound streams - // b) to correctly test our behaviour around timing of when ACKs are sent. See - // `ack_timing.rs` test. - if frame.header().flags().contains(ACK) { - self.shared() - .update_state(self.conn, self.id, State::Open { acknowledged: true }); - } - - let cmd = StreamCommand::SendFrame(frame); - self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?; - Poll::Ready(Ok(n)) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - self.sender.poll_flush_unpin(cx).map_err(|_| self.write_zero_err()) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - if self.is_closed() { - return Poll::Ready(Ok(())); - } - ready!(self.sender.poll_ready(cx).map_err(|_| self.write_zero_err())?); - let ack = if self.flag == Flag::Ack { - self.flag = Flag::None; - true - } else { - false - }; - tracing::trace!(target: LOG_TARGET,"{}/{}: close", self.conn, self.id); - let cmd = StreamCommand::CloseStream { ack }; - self.sender.start_send(cmd).map_err(|_| self.write_zero_err())?; - self.shared().update_state(self.conn, self.id, State::SendClosed); - Poll::Ready(Ok(())) - } -} - -#[derive(Debug)] -pub(crate) struct Shared { - state: State, - pub(crate) window: u32, - pub(crate) credit: u32, - pub(crate) buffer: Chunks, - pub(crate) reader: Option, - pub(crate) writer: Option, - config: Arc, -} - -impl Shared { - fn new(window: u32, credit: u32, config: Arc) -> Self { - Shared { - state: State::Open { - acknowledged: false, - }, - window, - credit, - buffer: Chunks::new(), - reader: None, - writer: None, - config, - } - } - - pub(crate) fn state(&self) -> State { - self.state - } - - /// Update the stream state and return the state before it was updated. - pub(crate) fn update_state( - &mut self, - cid: connection::Id, - sid: StreamId, - next: State, - ) -> State { - use self::State::*; - - let current = self.state; - - match (current, next) { - (Closed, _) => {} - (Open { .. }, _) => self.state = next, - (RecvClosed, Closed) => self.state = Closed, - (RecvClosed, Open { .. }) => {} - (RecvClosed, RecvClosed) => {} - (RecvClosed, SendClosed) => self.state = Closed, - (SendClosed, Closed) => self.state = Closed, - (SendClosed, Open { .. }) => {} - (SendClosed, RecvClosed) => self.state = Closed, - (SendClosed, SendClosed) => {} - } - - tracing::trace!(target: LOG_TARGET, - "{}/{}: update state: (from {:?} to {:?} -> {:?})", - cid, - sid, - current, - next, - self.state - ); - - current // Return the previous stream state for informational purposes. - } - - /// Calculate the number of additional window bytes the receiving side - /// should grant the sending side via a window update message. - /// - /// Returns `None` if too small to justify a window update message. - /// - /// Note: Once a caller successfully sent a window update message, the - /// locally tracked window size needs to be updated manually by the caller. - pub(crate) fn next_window_update(&mut self) -> Option { - if !self.state.can_read() { - return None; - } - - let new_credit = match self.config.window_update_mode { - WindowUpdateMode::OnReceive => { - debug_assert!(self.config.receive_window >= self.window); - - self.config.receive_window.saturating_sub(self.window) - } - WindowUpdateMode::OnRead => { - debug_assert!(self.config.receive_window >= self.window); - let bytes_received = self.config.receive_window.saturating_sub(self.window); - let buffer_len: u32 = self.buffer.len().try_into().unwrap_or(u32::MAX); - - bytes_received.saturating_sub(buffer_len) - } - }; - - // Send WindowUpdate message when half or more of the configured receive - // window can be granted as additional credit to the sender. - // - // See https://github.com/paritytech/yamux/issues/100 for a detailed - // discussion. - if new_credit >= self.config.receive_window / 2 { - Some(new_credit) - } else { - None - } - } - - /// Whether we are still waiting for the remote to acknowledge this stream. - pub fn is_pending_ack(&self) -> bool { - matches!( - self.state(), - State::Open { - acknowledged: false - } - ) - } -} diff --git a/src/yamux/error.rs b/src/yamux/error.rs deleted file mode 100644 index 672e4ad9..00000000 --- a/src/yamux/error.rs +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use crate::yamux::frame::FrameDecodeError; - -/// The various error cases a connection may encounter. -#[non_exhaustive] -#[derive(Debug)] -pub enum ConnectionError { - /// An underlying I/O error occured. - Io(std::io::Error), - /// Decoding a Yamux message frame failed. - Decode(FrameDecodeError), - /// The whole range of stream IDs has been used up. - NoMoreStreamIds, - /// An operation fails because the connection is closed. - Closed, - /// Too many streams are open, so no further ones can be opened at this time. - TooManyStreams, -} - -impl PartialEq for ConnectionError { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (ConnectionError::Io(e1), ConnectionError::Io(e2)) => e1.kind() == e2.kind(), - (ConnectionError::Decode(e1), ConnectionError::Decode(e2)) => e1 == e2, - (ConnectionError::NoMoreStreamIds, ConnectionError::NoMoreStreamIds) - | (ConnectionError::Closed, ConnectionError::Closed) - | (ConnectionError::TooManyStreams, ConnectionError::TooManyStreams) => true, - _ => false, - } - } -} - -impl std::fmt::Display for ConnectionError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - ConnectionError::Io(e) => write!(f, "i/o error: {}", e), - ConnectionError::Decode(e) => write!(f, "decode error: {}", e), - ConnectionError::NoMoreStreamIds => - f.write_str("number of stream ids has been exhausted"), - ConnectionError::Closed => f.write_str("connection is closed"), - ConnectionError::TooManyStreams => f.write_str("maximum number of streams reached"), - } - } -} - -impl std::error::Error for ConnectionError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - ConnectionError::Io(e) => Some(e), - ConnectionError::Decode(e) => Some(e), - ConnectionError::NoMoreStreamIds - | ConnectionError::Closed - | ConnectionError::TooManyStreams => None, - } - } -} - -impl From for ConnectionError { - fn from(e: std::io::Error) -> Self { - ConnectionError::Io(e) - } -} - -impl From for ConnectionError { - fn from(e: FrameDecodeError) -> Self { - ConnectionError::Decode(e) - } -} - -impl From for ConnectionError { - fn from(_: futures::channel::mpsc::SendError) -> Self { - ConnectionError::Closed - } -} - -impl From for ConnectionError { - fn from(_: futures::channel::oneshot::Canceled) -> Self { - ConnectionError::Closed - } -} diff --git a/src/yamux/frame.rs b/src/yamux/frame.rs deleted file mode 100644 index 692840a4..00000000 --- a/src/yamux/frame.rs +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -pub mod header; -mod io; - -use futures::future::Either; -use header::{Data, GoAway, Header, Ping, StreamId, WindowUpdate}; -use std::{convert::TryInto, num::TryFromIntError}; - -pub use io::FrameDecodeError; -pub(crate) use io::Io; - -/// A Yamux message frame consisting of header and body. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Frame { - header: Header, - body: Vec, -} - -impl Frame { - pub fn new(header: Header) -> Self { - Frame { - header, - body: Vec::new(), - } - } - - pub fn header(&self) -> &Header { - &self.header - } - - pub fn header_mut(&mut self) -> &mut Header { - &mut self.header - } - - /// Introduce this frame to the right of a binary frame type. - pub(crate) fn right(self) -> Frame> { - Frame { - header: self.header.right(), - body: self.body, - } - } - - /// Introduce this frame to the left of a binary frame type. - pub(crate) fn left(self) -> Frame> { - Frame { - header: self.header.left(), - body: self.body, - } - } -} - -impl From> for Frame<()> { - fn from(f: Frame) -> Frame<()> { - Frame { - header: f.header.into(), - body: f.body, - } - } -} - -impl Frame<()> { - pub(crate) fn into_data(self) -> Frame { - Frame { - header: self.header.into_data(), - body: self.body, - } - } - - pub(crate) fn into_window_update(self) -> Frame { - Frame { - header: self.header.into_window_update(), - body: self.body, - } - } - - pub(crate) fn into_ping(self) -> Frame { - Frame { - header: self.header.into_ping(), - body: self.body, - } - } -} - -impl Frame { - pub fn data(id: StreamId, b: Vec) -> Result { - Ok(Frame { - header: Header::data(id, b.len().try_into()?), - body: b, - }) - } - - pub fn close_stream(id: StreamId, ack: bool) -> Self { - let mut header = Header::data(id, 0); - header.fin(); - if ack { - header.ack() - } - - Frame::new(header) - } - - pub fn body(&self) -> &[u8] { - &self.body - } - - pub fn body_len(&self) -> u32 { - // Safe cast since we construct `Frame::`s only with - // `Vec` of length [0, u32::MAX] in `Frame::data` above. - self.body().len() as u32 - } - - pub fn into_body(self) -> Vec { - self.body - } -} - -impl Frame { - pub fn window_update(id: StreamId, credit: u32) -> Self { - Frame { - header: Header::window_update(id, credit), - body: Vec::new(), - } - } -} - -impl Frame { - pub fn term() -> Self { - Frame { - header: Header::term(), - body: Vec::new(), - } - } - - pub fn protocol_error() -> Self { - Frame { - header: Header::protocol_error(), - body: Vec::new(), - } - } - - pub fn internal_error() -> Self { - Frame { - header: Header::internal_error(), - body: Vec::new(), - } - } -} diff --git a/src/yamux/frame/header.rs b/src/yamux/frame/header.rs deleted file mode 100644 index cad9ed64..00000000 --- a/src/yamux/frame/header.rs +++ /dev/null @@ -1,443 +0,0 @@ -// Copyright (c) 2018-2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use futures::future::Either; -use std::fmt; - -/// The message frame header. -#[derive(Clone, Debug, PartialEq, Eq)] -pub struct Header { - version: Version, - tag: Tag, - flags: Flags, - stream_id: StreamId, - length: Len, - _marker: std::marker::PhantomData, -} - -impl fmt::Display for Header { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "(Header {:?} {} (len {}) (flags {:?}))", - self.tag, - self.stream_id, - self.length.val(), - self.flags.val() - ) - } -} - -impl Header { - pub fn tag(&self) -> Tag { - self.tag - } - - pub fn flags(&self) -> Flags { - self.flags - } - - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - pub fn len(&self) -> Len { - self.length - } - - #[cfg(test)] - pub fn set_len(&mut self, len: u32) { - self.length = Len(len) - } - - /// Arbitrary type cast, use with caution. - fn cast(self) -> Header { - Header { - version: self.version, - tag: self.tag, - flags: self.flags, - stream_id: self.stream_id, - length: self.length, - _marker: std::marker::PhantomData, - } - } - - /// Introduce this header to the right of a binary header type. - pub(crate) fn right(self) -> Header> { - self.cast() - } - - /// Introduce this header to the left of a binary header type. - pub(crate) fn left(self) -> Header> { - self.cast() - } -} - -impl From> for Header<()> { - fn from(h: Header) -> Header<()> { - h.cast() - } -} - -impl Header<()> { - pub(crate) fn into_data(self) -> Header { - debug_assert_eq!(self.tag, Tag::Data); - self.cast() - } - - pub(crate) fn into_window_update(self) -> Header { - debug_assert_eq!(self.tag, Tag::WindowUpdate); - self.cast() - } - - pub(crate) fn into_ping(self) -> Header { - debug_assert_eq!(self.tag, Tag::Ping); - self.cast() - } -} - -impl Header { - /// Set the [`SYN`] flag. - pub fn syn(&mut self) { - self.flags.0 |= SYN.0 - } -} - -impl Header { - /// Set the [`ACK`] flag. - pub fn ack(&mut self) { - self.flags.0 |= ACK.0 - } -} - -impl Header { - /// Set the [`FIN`] flag. - pub fn fin(&mut self) { - self.flags.0 |= FIN.0 - } -} - -impl Header { - /// Set the [`RST`] flag. - pub fn rst(&mut self) { - self.flags.0 |= RST.0 - } -} - -impl Header { - /// Create a new data frame header. - pub fn data(id: StreamId, len: u32) -> Self { - Header { - version: Version(0), - tag: Tag::Data, - flags: Flags(0), - stream_id: id, - length: Len(len), - _marker: std::marker::PhantomData, - } - } -} - -impl Header { - /// Create a new window update frame header. - pub fn window_update(id: StreamId, credit: u32) -> Self { - Header { - version: Version(0), - tag: Tag::WindowUpdate, - flags: Flags(0), - stream_id: id, - length: Len(credit), - _marker: std::marker::PhantomData, - } - } - - /// The credit this window update grants to the remote. - pub fn credit(&self) -> u32 { - self.length.0 - } -} - -impl Header { - /// Create a new ping frame header. - pub fn ping(nonce: u32) -> Self { - Header { - version: Version(0), - tag: Tag::Ping, - flags: Flags(0), - stream_id: StreamId(0), - length: Len(nonce), - _marker: std::marker::PhantomData, - } - } - - /// The nonce of this ping. - pub fn nonce(&self) -> u32 { - self.length.0 - } -} - -impl Header { - /// Terminate the session without indicating an error to the remote. - pub fn term() -> Self { - Self::go_away(0) - } - - /// Terminate the session indicating a protocol error to the remote. - pub fn protocol_error() -> Self { - Self::go_away(1) - } - - /// Terminate the session indicating an internal error to the remote. - pub fn internal_error() -> Self { - Self::go_away(2) - } - - fn go_away(code: u32) -> Self { - Header { - version: Version(0), - tag: Tag::GoAway, - flags: Flags(0), - stream_id: StreamId(0), - length: Len(code), - _marker: std::marker::PhantomData, - } - } -} - -/// Data message type. -#[derive(Clone, Debug)] -pub enum Data {} - -/// Window update message type. -#[derive(Clone, Debug)] -pub enum WindowUpdate {} - -/// Ping message type. -#[derive(Clone, Debug)] -pub enum Ping {} - -/// Go Away message type. -#[derive(Clone, Debug)] -pub enum GoAway {} - -/// Types which have a `syn` method. -pub trait HasSyn: private::Sealed {} -impl HasSyn for Data {} -impl HasSyn for WindowUpdate {} -impl HasSyn for Ping {} -impl HasSyn for Either {} - -/// Types which have an `ack` method. -pub trait HasAck: private::Sealed {} -impl HasAck for Data {} -impl HasAck for WindowUpdate {} -impl HasAck for Ping {} -impl HasAck for Either {} - -/// Types which have a `fin` method. -pub trait HasFin: private::Sealed {} -impl HasFin for Data {} -impl HasFin for WindowUpdate {} - -/// Types which have a `rst` method. -pub trait HasRst: private::Sealed {} -impl HasRst for Data {} -impl HasRst for WindowUpdate {} - -pub(super) mod private { - pub trait Sealed {} - - impl Sealed for super::Data {} - impl Sealed for super::WindowUpdate {} - impl Sealed for super::Ping {} - impl Sealed for super::GoAway {} - impl Sealed for super::Either {} -} - -/// A tag is the runtime representation of a message type. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub enum Tag { - Data, - WindowUpdate, - Ping, - GoAway, -} - -/// The protocol version a message corresponds to. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct Version(u8); - -/// The message length. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct Len(u32); - -impl Len { - pub fn val(self) -> u32 { - self.0 - } -} - -pub const CONNECTION_ID: StreamId = StreamId(0); - -/// The ID of a stream. -/// -/// The value 0 denotes no particular stream but the whole session. -#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] -pub struct StreamId(u32); - -impl StreamId { - pub(crate) fn new(val: u32) -> Self { - StreamId(val) - } - - pub fn is_server(self) -> bool { - self.0 % 2 == 0 - } - - pub fn is_client(self) -> bool { - !self.is_server() - } - - pub fn is_session(self) -> bool { - self == CONNECTION_ID - } - - pub fn val(self) -> u32 { - self.0 - } -} - -impl fmt::Display for StreamId { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - -impl nohash_hasher::IsEnabled for StreamId {} - -/// Possible flags set on a message. -#[derive(Copy, Clone, Debug, PartialEq, Eq)] -pub struct Flags(u16); - -impl Flags { - pub fn contains(self, other: Flags) -> bool { - self.0 & other.0 == other.0 - } - - pub fn val(self) -> u16 { - self.0 - } -} - -/// Indicates the start of a new stream. -pub const SYN: Flags = Flags(1); - -/// Acknowledges the start of a new stream. -pub const ACK: Flags = Flags(2); - -/// Indicates the half-closing of a stream. -pub const FIN: Flags = Flags(4); - -/// Indicates an immediate stream reset. -pub const RST: Flags = Flags(8); - -/// The serialised header size in bytes. -pub const HEADER_SIZE: usize = 12; - -/// Encode a [`Header`] value. -pub fn encode(hdr: &Header) -> [u8; HEADER_SIZE] { - let mut buf = [0; HEADER_SIZE]; - buf[0] = hdr.version.0; - buf[1] = hdr.tag as u8; - buf[2..4].copy_from_slice(&hdr.flags.0.to_be_bytes()); - buf[4..8].copy_from_slice(&hdr.stream_id.0.to_be_bytes()); - buf[8..HEADER_SIZE].copy_from_slice(&hdr.length.0.to_be_bytes()); - buf -} - -/// Decode a [`Header`] value. -pub fn decode(buf: &[u8; HEADER_SIZE]) -> Result, HeaderDecodeError> { - if buf[0] != 0 { - return Err(HeaderDecodeError::Version(buf[0])); - } - - let hdr = Header { - version: Version(buf[0]), - tag: match buf[1] { - 0 => Tag::Data, - 1 => Tag::WindowUpdate, - 2 => Tag::Ping, - 3 => Tag::GoAway, - t => return Err(HeaderDecodeError::Type(t)), - }, - flags: Flags(u16::from_be_bytes([buf[2], buf[3]])), - stream_id: StreamId(u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]])), - length: Len(u32::from_be_bytes([buf[8], buf[9], buf[10], buf[11]])), - _marker: std::marker::PhantomData, - }; - - Ok(hdr) -} - -/// Possible errors while decoding a message frame header. -#[non_exhaustive] -#[derive(Debug, PartialEq)] -pub enum HeaderDecodeError { - /// Unknown version. - Version(u8), - /// An unknown frame type. - Type(u8), -} - -impl std::fmt::Display for HeaderDecodeError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - HeaderDecodeError::Version(v) => write!(f, "unknown version: {}", v), - HeaderDecodeError::Type(t) => write!(f, "unknown frame type: {}", t), - } - } -} - -impl std::error::Error for HeaderDecodeError {} - -#[cfg(test)] -mod tests { - use super::*; - use quickcheck::{Arbitrary, Gen, QuickCheck}; - - impl Arbitrary for Header<()> { - fn arbitrary(g: &mut Gen) -> Self { - let tag = *g.choose(&[Tag::Data, Tag::WindowUpdate, Tag::Ping, Tag::GoAway]).unwrap(); - - Header { - version: Version(0), - tag, - flags: Flags(Arbitrary::arbitrary(g)), - stream_id: StreamId(Arbitrary::arbitrary(g)), - length: Len(Arbitrary::arbitrary(g)), - _marker: std::marker::PhantomData, - } - } - } - - #[test] - fn encode_decode_identity() { - fn property(hdr: Header<()>) -> bool { - match decode(&encode(&hdr)) { - Ok(x) => x == hdr, - Err(e) => { - eprintln!("decode error: {}", e); - false - } - } - } - QuickCheck::new().tests(10_000).quickcheck(property as fn(Header<()>) -> bool) - } -} diff --git a/src/yamux/frame/io.rs b/src/yamux/frame/io.rs deleted file mode 100644 index ba799016..00000000 --- a/src/yamux/frame/io.rs +++ /dev/null @@ -1,384 +0,0 @@ -// Copyright (c) 2019 Parity Technologies (UK) Ltd. -// -// Licensed under the Apache License, Version 2.0 or MIT license, at your option. -// -// A copy of the Apache License, Version 2.0 is included in the software as -// LICENSE-APACHE and a copy of the MIT license is included in the software -// as LICENSE-MIT. You may also obtain a copy of the Apache License, Version 2.0 -// at https://www.apache.org/licenses/LICENSE-2.0 and a copy of the MIT license -// at https://opensource.org/licenses/MIT. - -use super::{ - header::{self, HeaderDecodeError}, - Frame, -}; -use crate::yamux::connection::Id; -use futures::{prelude::*, ready}; -use std::{ - fmt, io, - pin::Pin, - task::{Context, Poll}, -}; - -/// Logging target for the file. -const LOG_TARGET: &str = "litep2p::yamux"; - -/// A [`Stream`] and writer of [`Frame`] values. -#[derive(Debug)] -pub(crate) struct Io { - id: Id, - io: T, - read_state: ReadState, - write_state: WriteState, - max_body_len: usize, -} - -impl Io { - pub(crate) fn new(id: Id, io: T, max_frame_body_len: usize) -> Self { - Io { - id, - io, - read_state: ReadState::Init, - write_state: WriteState::Init, - max_body_len: max_frame_body_len, - } - } -} - -/// The stages of writing a new `Frame`. -enum WriteState { - Init, - Header { - header: [u8; header::HEADER_SIZE], - buffer: Vec, - offset: usize, - }, - Body { - buffer: Vec, - offset: usize, - }, -} - -impl fmt::Debug for WriteState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - WriteState::Init => f.write_str("(WriteState::Init)"), - WriteState::Header { offset, .. } => { - write!(f, "(WriteState::Header (offset {}))", offset) - } - WriteState::Body { offset, buffer } => { - write!( - f, - "(WriteState::Body (offset {}) (buffer-len {}))", - offset, - buffer.len() - ) - } - } - } -} - -impl Sink> for Io { - type Error = io::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - loop { - tracing::trace!(target: LOG_TARGET, "{}: write: {:?}", this.id, this.write_state); - match &mut this.write_state { - WriteState::Init => return Poll::Ready(Ok(())), - WriteState::Header { - header, - buffer, - ref mut offset, - } => match Pin::new(&mut this.io).poll_write(cx, &header[*offset..]) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Ready(Ok(n)) => { - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); - } - *offset += n; - if *offset == header.len() { - if !buffer.is_empty() { - let buffer = std::mem::take(buffer); - this.write_state = WriteState::Body { buffer, offset: 0 }; - } else { - this.write_state = WriteState::Init; - } - } - } - }, - WriteState::Body { - buffer, - ref mut offset, - } => match Pin::new(&mut this.io).poll_write(cx, &buffer[*offset..]) { - Poll::Pending => return Poll::Pending, - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Ready(Ok(n)) => { - if n == 0 { - return Poll::Ready(Err(io::ErrorKind::WriteZero.into())); - } - *offset += n; - if *offset == buffer.len() { - this.write_state = WriteState::Init; - } - } - }, - } - } - } - - fn start_send(self: Pin<&mut Self>, f: Frame<()>) -> Result<(), Self::Error> { - let header = header::encode(&f.header); - let buffer = f.body; - self.get_mut().write_state = WriteState::Header { - header, - buffer, - offset: 0, - }; - Ok(()) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - ready!(this.poll_ready_unpin(cx))?; - Pin::new(&mut this.io).poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = Pin::into_inner(self); - ready!(this.poll_ready_unpin(cx))?; - Pin::new(&mut this.io).poll_close(cx) - } -} - -/// The stages of reading a new `Frame`. -enum ReadState { - /// Initial reading state. - Init, - /// Reading the frame header. - Header { - offset: usize, - buffer: [u8; header::HEADER_SIZE], - }, - /// Reading the frame body. - Body { - header: header::Header<()>, - offset: usize, - buffer: Vec, - }, -} - -impl Stream for Io { - type Item = Result, FrameDecodeError>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { - let this = &mut *self; - loop { - tracing::trace!(target: LOG_TARGET, "{}: read: {:?}", this.id, this.read_state); - match this.read_state { - ReadState::Init => { - this.read_state = ReadState::Header { - offset: 0, - buffer: [0; header::HEADER_SIZE], - }; - } - ReadState::Header { - ref mut offset, - ref mut buffer, - } => { - if *offset == header::HEADER_SIZE { - let header = match header::decode(buffer) { - Ok(hd) => hd, - Err(e) => return Poll::Ready(Some(Err(e.into()))), - }; - - tracing::trace!(target: LOG_TARGET, "{}: read: {}", this.id, header); - - if header.tag() != header::Tag::Data { - this.read_state = ReadState::Init; - return Poll::Ready(Some(Ok(Frame::new(header)))); - } - - let body_len = header.len().val() as usize; - - if body_len > this.max_body_len { - return Poll::Ready(Some(Err(FrameDecodeError::FrameTooLarge( - body_len, - )))); - } - - this.read_state = ReadState::Body { - header, - offset: 0, - buffer: vec![0; body_len], - }; - - continue; - } - - let buf = &mut buffer[*offset..header::HEADER_SIZE]; - match ready!(Pin::new(&mut this.io).poll_read(cx, buf))? { - 0 => { - if *offset == 0 { - return Poll::Ready(None); - } - let e = FrameDecodeError::Io(io::ErrorKind::UnexpectedEof.into()); - return Poll::Ready(Some(Err(e))); - } - n => *offset += n, - } - } - ReadState::Body { - ref header, - ref mut offset, - ref mut buffer, - } => { - let body_len = header.len().val() as usize; - - if *offset == body_len { - let h = header.clone(); - let v = std::mem::take(buffer); - this.read_state = ReadState::Init; - return Poll::Ready(Some(Ok(Frame { header: h, body: v }))); - } - - let buf = &mut buffer[*offset..body_len]; - match ready!(Pin::new(&mut this.io).poll_read(cx, buf))? { - 0 => { - let e = FrameDecodeError::Io(io::ErrorKind::UnexpectedEof.into()); - return Poll::Ready(Some(Err(e))); - } - n => *offset += n, - } - } - } - } - } -} - -impl fmt::Debug for ReadState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - ReadState::Init => f.write_str("(ReadState::Init)"), - ReadState::Header { offset, .. } => { - write!(f, "(ReadState::Header (offset {}))", offset) - } - ReadState::Body { - header, - offset, - buffer, - } => { - write!( - f, - "(ReadState::Body (header {}) (offset {}) (buffer-len {}))", - header, - offset, - buffer.len() - ) - } - } - } -} - -/// Possible errors while decoding a message frame. -#[non_exhaustive] -#[derive(Debug)] -pub enum FrameDecodeError { - /// An I/O error. - Io(io::Error), - /// Decoding the frame header failed. - Header(HeaderDecodeError), - /// A data frame body length is larger than the configured maximum. - FrameTooLarge(usize), -} - -impl PartialEq for FrameDecodeError { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (FrameDecodeError::Io(e1), FrameDecodeError::Io(e2)) => e1.kind() == e2.kind(), - (FrameDecodeError::Header(e1), FrameDecodeError::Header(e2)) => e1 == e2, - (FrameDecodeError::FrameTooLarge(n1), FrameDecodeError::FrameTooLarge(n2)) => n1 == n2, - _ => false, - } - } -} - -impl std::fmt::Display for FrameDecodeError { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - match self { - FrameDecodeError::Io(e) => write!(f, "i/o error: {}", e), - FrameDecodeError::Header(e) => write!(f, "decode error: {}", e), - FrameDecodeError::FrameTooLarge(n) => write!(f, "frame body is too large ({})", n), - } - } -} - -impl std::error::Error for FrameDecodeError { - fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { - match self { - FrameDecodeError::Io(e) => Some(e), - FrameDecodeError::Header(e) => Some(e), - FrameDecodeError::FrameTooLarge(_) => None, - } - } -} - -impl From for FrameDecodeError { - fn from(e: std::io::Error) -> Self { - FrameDecodeError::Io(e) - } -} - -impl From for FrameDecodeError { - fn from(e: HeaderDecodeError) -> Self { - FrameDecodeError::Header(e) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use quickcheck::{Arbitrary, Gen, QuickCheck}; - use rand::RngCore; - - impl Arbitrary for Frame<()> { - fn arbitrary(g: &mut Gen) -> Self { - let mut header: header::Header<()> = Arbitrary::arbitrary(g); - let body = if header.tag() == header::Tag::Data { - header.set_len(header.len().val() % 4096); - let mut b = vec![0; header.len().val() as usize]; - rand::thread_rng().fill_bytes(&mut b); - b - } else { - Vec::new() - }; - Frame { header, body } - } - } - - #[test] - fn encode_decode_identity() { - fn property(f: Frame<()>) -> bool { - futures::executor::block_on(async move { - let id = crate::yamux::connection::Id::random(); - let mut io = Io::new(id, futures::io::Cursor::new(Vec::new()), f.body.len()); - if io.send(f.clone()).await.is_err() { - return false; - } - if io.flush().await.is_err() { - return false; - } - io.io.set_position(0); - if let Ok(Some(x)) = io.try_next().await { - x == f - } else { - false - } - }) - } - - QuickCheck::new().tests(10_000).quickcheck(property as fn(Frame<()>) -> bool) - } -} diff --git a/src/yamux/tagged_stream.rs b/src/yamux/tagged_stream.rs deleted file mode 100644 index 5583a5b7..00000000 --- a/src/yamux/tagged_stream.rs +++ /dev/null @@ -1,54 +0,0 @@ -use futures::Stream; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; - -/// A stream that yields its tag with every item. -#[pin_project::pin_project] -pub struct TaggedStream { - key: K, - #[pin] - inner: S, - - reported_none: bool, -} - -impl TaggedStream { - pub fn new(key: K, inner: S) -> Self { - Self { - key, - inner, - reported_none: false, - } - } - - pub fn inner_mut(&mut self) -> &mut S { - &mut self.inner - } -} - -impl Stream for TaggedStream -where - K: Copy, - S: Stream, -{ - type Item = (K, Option); - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - if *this.reported_none { - return Poll::Ready(None); - } - - match futures::ready!(this.inner.poll_next(cx)) { - Some(item) => Poll::Ready(Some((*this.key, Some(item)))), - None => { - *this.reported_none = true; - - Poll::Ready(Some((*this.key, None))) - } - } - } -} From 16ef344ba9601a85e0363161ddcd58fcd987094f Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Thu, 23 Jan 2025 14:37:04 +0200 Subject: [PATCH 7/8] cargo: Remove unused dependencies Signed-off-by: Alexandru Vasile --- Cargo.lock | 2 -- Cargo.toml | 2 -- 2 files changed, 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d772aa0a..1ee40fa2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2886,7 +2886,6 @@ dependencies = [ "multiaddr", "multihash 0.17.0", "network-interface", - "nohash-hasher", "parking_lot 0.12.3", "pin-project", "prost 0.12.6", @@ -2907,7 +2906,6 @@ dependencies = [ "smallvec", "snow", "socket2 0.5.7", - "static_assertions", "str0m", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 2d7caac5..bf9749c0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,8 +52,6 @@ x25519-dalek = "2.0.0" x509-parser = "0.16.0" yasna = "0.5.0" zeroize = "1.8.1" -nohash-hasher = "0.2.0" -static_assertions = "1.1.0" libp2p-yamux = { version = "0.13.4", package = "yamux" } # Exposed dependencies. Breaking changes to these are breaking changes to us. From 5b91511aa876d5cf2d55d3cc08c3fabdfbbabdcb Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Wed, 29 Jan 2025 14:08:25 +0200 Subject: [PATCH 8/8] cargo: Rename libp2p-yamux to simply yamux Signed-off-by: Alexandru Vasile --- Cargo.toml | 2 +- src/yamux/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf9749c0..4866c340 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,7 +52,7 @@ x25519-dalek = "2.0.0" x509-parser = "0.16.0" yasna = "0.5.0" zeroize = "1.8.1" -libp2p-yamux = { version = "0.13.4", package = "yamux" } +yamux = "0.13.4" # Exposed dependencies. Breaking changes to these are breaking changes to us. [dependencies.rustls] diff --git a/src/yamux/mod.rs b/src/yamux/mod.rs index 76c60cb8..f2635193 100644 --- a/src/yamux/mod.rs +++ b/src/yamux/mod.rs @@ -25,7 +25,7 @@ mod control; -pub use libp2p_yamux::{ +pub use yamux::{ Config, Connection, ConnectionError, FrameDecodeError, HeaderDecodeError, Mode, Packet, Result, Stream, StreamId, };