Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SimplexStream structure #6589

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ cfg_io_util! {
pub(crate) mod seek;
pub(crate) mod util;
pub use util::{
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take,
copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream,
};
}

Expand Down
83 changes: 69 additions & 14 deletions tokio/src/io/util/mem.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! In-process memory IO types.

use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf};
use crate::loom::sync::Mutex;

use bytes::{Buf, BytesMut};
Expand Down Expand Up @@ -47,15 +47,34 @@ use std::{
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct DuplexStream {
read: Arc<Mutex<Pipe>>,
write: Arc<Mutex<Pipe>>,
read: Arc<Mutex<SimplexStream>>,
write: Arc<Mutex<SimplexStream>>,
}

/// A unidirectional IO over a piece of memory.
/// A unidirectional pipe to read and write bytes in memory.
///
/// Data can be written to the pipe, and reading will return that data.
/// It can be constructed by [`simplex`] function which will create a pair of
/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will
/// create a handle for both reading and writing.
///
/// # Example
///
/// ```
/// # async fn ex() -> std::io::Result<()> {
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// let (mut receiver, mut sender) = tokio::io::simplex(64);
///
/// sender.write_all(b"ping").await?;
///
/// let mut buf = [0u8; 4];
/// receiver.read_exact(&mut buf).await?;
/// assert_eq!(&buf, b"ping");
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
struct Pipe {
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct SimplexStream {
/// The buffer storing the bytes written, also read from.
///
/// Using a `BytesMut` because it has efficient `Buf` and `BufMut`
Expand Down Expand Up @@ -83,8 +102,8 @@ struct Pipe {
/// written to a side before the write returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) {
let one = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let two = Arc::new(Mutex::new(Pipe::new(max_buf_size)));
let one = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));
let two = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size)));

(
DuplexStream {
Expand Down Expand Up @@ -161,11 +180,47 @@ impl Drop for DuplexStream {
}
}

// ===== impl Pipe =====
// ===== impl SimplexStream =====

/// Creates unidirectional buffer that acts like in memory pipe.
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a buffer before the it returns `Poll::Pending`.
///
/// # Unify reader and writer
///
/// The reader and writer half can be unified into a single structure
/// of `SimplexStream` that supports both reading and writing or
/// the `SimplexStream` can be already created as unified structure
/// using [`SimplexStream::new_unsplit()`].
///
/// ```
/// # async fn ex() -> std::io::Result<()> {
/// # use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// let (writer, reader) = tokio::io::simplex(64);
/// let mut simplex_stream = writer.unsplit(reader);
/// simplex_stream.write_all(b"hello").await?;
///
/// let mut buf = [0u8; 5];
/// simplex_stream.read_exact(&mut buf).await?;
/// assert_eq!(&buf, b"hello");
/// # Ok(())
/// # }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn simplex(max_buf_size: usize) -> (ReadHalf<SimplexStream>, WriteHalf<SimplexStream>) {
split(SimplexStream::new_unsplit(max_buf_size))
}

impl Pipe {
fn new(max_buf_size: usize) -> Self {
Pipe {
impl SimplexStream {
/// Creates unidirectional buffer that acts like in memory pipe. To create split
/// version with separate reader and writer you can use [`simplex`] function.
///
/// The `max_buf_size` argument is the maximum amount of bytes that can be
/// written to a buffer before the it returns `Poll::Pending`.
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn new_unsplit(max_buf_size: usize) -> SimplexStream {
SimplexStream {
buffer: BytesMut::new(),
is_closed: false,
max_buf_size,
Expand Down Expand Up @@ -269,7 +324,7 @@ impl Pipe {
}
}

impl AsyncRead for Pipe {
impl AsyncRead for SimplexStream {
cfg_coop! {
fn poll_read(
self: Pin<&mut Self>,
Expand Down Expand Up @@ -299,7 +354,7 @@ impl AsyncRead for Pipe {
}
}

impl AsyncWrite for Pipe {
impl AsyncWrite for SimplexStream {
cfg_coop! {
fn poll_write(
self: Pin<&mut Self>,
Expand Down
2 changes: 1 addition & 1 deletion tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ cfg_io_util! {
pub use lines::Lines;

mod mem;
pub use mem::{duplex, DuplexStream};
pub use mem::{duplex, simplex, DuplexStream, SimplexStream};

mod read;
mod read_buf;
Expand Down
Loading