Skip to content

Commit

Permalink
io: add unidirectional stream structure
Browse files Browse the repository at this point in the history
Up until this patch we had the `DuplexStream` which was backed by
two underlying pipes. This patch makes public this underlying structure
and renames it from the `Pipe` to the `SimplexStream` to provent
name confusion and keep simililiarity with the already existing duplex.
  • Loading branch information
wutchzone committed Aug 5, 2024
1 parent ab53bf0 commit 45fb332
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 24 deletions.
4 changes: 2 additions & 2 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ cfg_io_util! {
pub(crate) mod seek;
pub(crate) mod util;
pub use util::{
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
};
}

Expand Down
65 changes: 44 additions & 21 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! In-process memory IO types.

use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};
use crate::loom::sync::Mutex;

use bytes::{Buf, BytesMut};
Expand Down Expand Up @@ -47,15 +47,31 @@ use std::{
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
read: Arc<Mutex<SimplexStream>>,
write: Arc<Mutex<SimplexStream>>,
}

/// A unidirectional IO over a piece of memory.
/// A unidirectional pipe to read and write bytes in memory.
///
/// Data can be written to the pipe, and reading will return that data.
/// # Example
///
/// ```
/// # async fn ex() -> std::io::Result<()> {
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// let (mut receiver, mut sender) = tokio::io::simplex(64);
///
/// sender.write_all(b"ping").await?;
///
/// let mut buf = [0u8; 4];
/// receiver.read_exact(&mut buf).await?;
/// assert_eq!(&buf, b"ping");
///
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
struct Pipe {
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct SimplexStream {
/// The buffer storing the bytes written, also read from.
///
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
Expand Down Expand Up @@ -83,8 +99,10 @@ struct Pipe {
/// written to a side before the write returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let (reader, writer) = simplex(max_buf_size);
let one = Arc::new(Mutex::new(reader.unsplit(writer)));
let (reader, writer) = simplex(max_buf_size);
let two = Arc::new(Mutex::new(reader.unsplit(writer)));

(
DuplexStream {
Expand Down Expand Up @@ -161,19 +179,24 @@ impl Drop for DuplexStream {
}
}

// ===== impl Pipe =====
// ===== impl SimplexStream =====

impl Pipe {
fn new(max_buf_size: usize) -> Self {
Pipe {
buffer: BytesMut::new(),
is_closed: false,
max_buf_size,
read_waker: None,
write_waker: None,
}
}
/// Creates unidirectional buffer that acts like a pipe.
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a buffer before the it returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) {
split(SimplexStream {
buffer: BytesMut::new(),
is_closed: false,
max_buf_size,
read_waker: None,
write_waker: None,
})
}

impl SimplexStream {
fn close_write(&mut self) {
self.is_closed = true;
// needs to notify any readers that no more data will come
Expand Down Expand Up @@ -269,7 +292,7 @@ impl Pipe {
}
}

impl AsyncRead for Pipe {
impl AsyncRead for SimplexStream {
cfg_coop! {
fn poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -299,7 +322,7 @@ impl AsyncRead for Pipe {
}
}

impl AsyncWrite for Pipe {
impl AsyncWrite for SimplexStream {
cfg_coop! {
fn poll_write(
self: Pin<&mut Self>,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ cfg_io_util! {
pub use lines::Lines;

mod mem;
pub use mem::{duplex, DuplexStream};
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};

mod read;
mod read_buf;
Expand Down

0 comments on commit 45fb332

Please sign in to comment.