diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index b35a20dd35b..99cabde0ab8 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -271,8 +271,8 @@ cfg_io_util! { pub(crate) mod seek; pub(crate) mod util; pub use util::{ - copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, - BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, + copy, copy_bidirectional, copy_bidirectional_with_sizes, copy_buf, duplex, empty, repeat, sink, simplex, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, SimplexStream, }; } diff --git a/tokio/src/io/util/mem.rs b/tokio/src/io/util/mem.rs index 96676e64cff..15916cc0052 100644 --- a/tokio/src/io/util/mem.rs +++ b/tokio/src/io/util/mem.rs @@ -1,6 +1,6 @@ //! In-process memory IO types. -use crate::io::{AsyncRead, AsyncWrite, ReadBuf}; +use crate::io::{split, AsyncRead, AsyncWrite, ReadBuf, ReadHalf, WriteHalf}; use crate::loom::sync::Mutex; use bytes::{Buf, BytesMut}; @@ -47,15 +47,34 @@ use std::{ #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct DuplexStream { - read: Arc>, - write: Arc>, + read: Arc>, + write: Arc>, } -/// A unidirectional IO over a piece of memory. +/// A unidirectional pipe to read and write bytes in memory. /// -/// Data can be written to the pipe, and reading will return that data. +/// It can be constructed by [`simplex`] function which will create a pair of +/// reader and writer or by calling [`SimplexStream::new_unsplit`] that will +/// create a handle for both reading and writing. +/// +/// # Example +/// +/// ``` +/// # async fn ex() -> std::io::Result<()> { +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// let (mut receiver, mut sender) = tokio::io::simplex(64); +/// +/// sender.write_all(b"ping").await?; +/// +/// let mut buf = [0u8; 4]; +/// receiver.read_exact(&mut buf).await?; +/// assert_eq!(&buf, b"ping"); +/// # Ok(()) +/// # } +/// ``` #[derive(Debug)] -struct Pipe { +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] +pub struct SimplexStream { /// The buffer storing the bytes written, also read from. /// /// Using a `BytesMut` because it has efficient `Buf` and `BufMut` @@ -83,8 +102,8 @@ struct Pipe { /// written to a side before the write returns `Poll::Pending`. #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub fn duplex(max_buf_size: usize) -> (DuplexStream, DuplexStream) { - let one = Arc::new(Mutex::new(Pipe::new(max_buf_size))); - let two = Arc::new(Mutex::new(Pipe::new(max_buf_size))); + let one = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size))); + let two = Arc::new(Mutex::new(SimplexStream::new_unsplit(max_buf_size))); ( DuplexStream { @@ -161,11 +180,47 @@ impl Drop for DuplexStream { } } -// ===== impl Pipe ===== +// ===== impl SimplexStream ===== + +/// Creates unidirectional buffer that acts like in memory pipe. +/// +/// The `max_buf_size` argument is the maximum amount of bytes that can be +/// written to a buffer before the it returns `Poll::Pending`. +/// +/// # Unify reader and writer +/// +/// The reader and writer half can be unified into a single structure +/// of `SimplexStream` that supports both reading and writing or +/// the `SimplexStream` can be already created as unified structure +/// using [`SimplexStream::new_unsplit()`]. +/// +/// ``` +/// # async fn ex() -> std::io::Result<()> { +/// # use tokio::io::{AsyncReadExt, AsyncWriteExt}; +/// let (writer, reader) = tokio::io::simplex(64); +/// let mut simplex_stream = writer.unsplit(reader); +/// simplex_stream.write_all(b"hello").await?; +/// +/// let mut buf = [0u8; 5]; +/// simplex_stream.read_exact(&mut buf).await?; +/// assert_eq!(&buf, b"hello"); +/// # Ok(()) +/// # } +/// ``` +#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] +pub fn simplex(max_buf_size: usize) -> (ReadHalf, WriteHalf) { + split(SimplexStream::new_unsplit(max_buf_size)) +} -impl Pipe { - fn new(max_buf_size: usize) -> Self { - Pipe { +impl SimplexStream { + /// Creates unidirectional buffer that acts like in memory pipe. To create split + /// version with separate reader and writer you can use [`simplex`] function. + /// + /// The `max_buf_size` argument is the maximum amount of bytes that can be + /// written to a buffer before the it returns `Poll::Pending`. + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub fn new_unsplit(max_buf_size: usize) -> SimplexStream { + SimplexStream { buffer: BytesMut::new(), is_closed: false, max_buf_size, @@ -269,7 +324,7 @@ impl Pipe { } } -impl AsyncRead for Pipe { +impl AsyncRead for SimplexStream { cfg_coop! { fn poll_read( self: Pin<&mut Self>, @@ -299,7 +354,7 @@ impl AsyncRead for Pipe { } } -impl AsyncWrite for Pipe { +impl AsyncWrite for SimplexStream { cfg_coop! { fn poll_write( self: Pin<&mut Self>, diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 5010fc01d29..b2f8618c7ee 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -42,7 +42,7 @@ cfg_io_util! { pub use lines::Lines; mod mem; - pub use mem::{duplex, DuplexStream}; + pub use mem::{duplex, simplex, DuplexStream, SimplexStream}; mod read; mod read_buf;