From d46e78744773756ecf2a58b474fdb9025ed5064c Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Fri, 17 Jan 2025 01:30:05 +1100 Subject: [PATCH] Move `std::pipe::*` into `std::io` Signed-off-by: Jiahao XU --- std/src/io/mod.rs | 249 +++++++++++++++++++++ std/src/io/tests.rs | 17 ++ std/src/lib.rs | 2 - std/src/pipe.rs | 258 ---------------------- std/src/pipe/tests.rs | 19 -- std/src/sys/anonymous_pipe/unix.rs | 3 +- std/src/sys/anonymous_pipe/unsupported.rs | 3 +- std/src/sys/anonymous_pipe/windows.rs | 4 +- std/tests/pipe_subprocess.rs | 3 +- 9 files changed, 271 insertions(+), 287 deletions(-) delete mode 100644 std/src/pipe.rs delete mode 100644 std/src/pipe/tests.rs diff --git a/std/src/io/mod.rs b/std/src/io/mod.rs index 7912f969bbd9f..231c8712ebd55 100644 --- a/std/src/io/mod.rs +++ b/std/src/io/mod.rs @@ -330,6 +330,7 @@ pub use self::{ }; use crate::mem::take; use crate::ops::{Deref, DerefMut}; +use crate::sys::anonymous_pipe::{AnonPipe, pipe as pipe_inner}; use crate::{cmp, fmt, slice, str, sys}; mod buffered; @@ -3250,3 +3251,251 @@ impl Iterator for Lines { } } } + +/// Create anonymous pipe that is close-on-exec and blocking. +/// +/// # Behavior +/// +/// A pipe is a synchronous, unidirectional data channel between two or more processes, like an +/// interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular: +/// +/// * A read on a [`PipeReader`] blocks until the pipe is non-empty. +/// * A write on a [`PipeWriter`] blocks when the pipe is full. +/// * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`] +/// returns EOF. +/// * [`PipeReader`] can be shared, but only one process will consume the data in the pipe. +/// +/// # Capacity +/// +/// Pipe capacity is platform dependent. To quote the Linux [man page]: +/// +/// > Different implementations have different limits for the pipe capacity. Applications should +/// > not rely on a particular capacity: an application should be designed so that a reading process +/// > consumes data as soon as it is available, so that a writing process does not remain blocked. +/// +/// # Examples +/// +/// ```no_run +/// #![feature(anonymous_pipe)] +/// # #[cfg(miri)] fn main() {} +/// # #[cfg(not(miri))] +/// # fn main() -> std::io::Result<()> { +/// # use std::process::Command; +/// # use std::io::{Read, Write}; +/// let (ping_rx, mut ping_tx) = std::io::pipe()?; +/// let (mut pong_rx, pong_tx) = std::io::pipe()?; +/// +/// // Spawn a process that echoes its input. +/// let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?; +/// +/// ping_tx.write_all(b"hello")?; +/// // Close to unblock echo_server's reader. +/// drop(ping_tx); +/// +/// let mut buf = String::new(); +/// // Block until echo_server's writer is closed. +/// pong_rx.read_to_string(&mut buf)?; +/// assert_eq!(&buf, "hello"); +/// +/// echo_server.wait()?; +/// # Ok(()) +/// # } +/// ``` +/// [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html +/// [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe +/// [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html +#[unstable(feature = "anonymous_pipe", issue = "127154")] +#[inline] +pub fn pipe() -> Result<(PipeReader, PipeWriter)> { + pipe_inner().map(|(reader, writer)| (PipeReader(reader), PipeWriter(writer))) +} + +/// Read end of the anonymous pipe. +#[unstable(feature = "anonymous_pipe", issue = "127154")] +#[derive(Debug)] +pub struct PipeReader(pub(crate) AnonPipe); + +/// Write end of the anonymous pipe. +#[unstable(feature = "anonymous_pipe", issue = "127154")] +#[derive(Debug)] +pub struct PipeWriter(pub(crate) AnonPipe); + +impl PipeReader { + /// Create a new [`PipeReader`] instance that shares the same underlying file description. + /// + /// # Examples + /// + /// ```no_run + /// #![feature(anonymous_pipe)] + /// # #[cfg(miri)] fn main() {} + /// # #[cfg(not(miri))] + /// # fn main() -> std::io::Result<()> { + /// # use std::fs; + /// # use std::io::Write; + /// # use std::process::Command; + /// const NUM_SLOT: u8 = 2; + /// const NUM_PROC: u8 = 5; + /// const OUTPUT: &str = "work.txt"; + /// + /// let mut jobs = vec![]; + /// let (reader, mut writer) = std::io::pipe()?; + /// + /// // Write NUM_SLOT characters the pipe. + /// writer.write_all(&[b'|'; NUM_SLOT as usize])?; + /// + /// // Spawn several processes that read a character from the pipe, do some work, then + /// // write back to the pipe. When the pipe is empty, the processes block, so only + /// // NUM_SLOT processes can be working at any given time. + /// for _ in 0..NUM_PROC { + /// jobs.push( + /// Command::new("bash") + /// .args(["-c", + /// &format!( + /// "read -n 1\n\ + /// echo -n 'x' >> '{OUTPUT}'\n\ + /// echo -n '|'", + /// ), + /// ]) + /// .stdin(reader.try_clone()?) + /// .stdout(writer.try_clone()?) + /// .spawn()?, + /// ); + /// } + /// + /// // Wait for all jobs to finish. + /// for mut job in jobs { + /// job.wait()?; + /// } + /// + /// // Check our work and clean up. + /// let xs = fs::read_to_string(OUTPUT)?; + /// fs::remove_file(OUTPUT)?; + /// assert_eq!(xs, "x".repeat(NUM_PROC.into())); + /// # Ok(()) + /// # } + /// ``` + #[unstable(feature = "anonymous_pipe", issue = "127154")] + pub fn try_clone(&self) -> Result { + self.0.try_clone().map(Self) + } +} + +impl PipeWriter { + /// Create a new [`PipeWriter`] instance that shares the same underlying file description. + /// + /// # Examples + /// + /// ```no_run + /// #![feature(anonymous_pipe)] + /// # #[cfg(miri)] fn main() {} + /// # #[cfg(not(miri))] + /// # fn main() -> std::io::Result<()> { + /// # use std::process::Command; + /// # use std::io::Read; + /// let (mut reader, writer) = std::io::pipe()?; + /// + /// // Spawn a process that writes to stdout and stderr. + /// let mut peer = Command::new("bash") + /// .args([ + /// "-c", + /// "echo -n foo\n\ + /// echo -n bar >&2" + /// ]) + /// .stdout(writer.try_clone()?) + /// .stderr(writer) + /// .spawn()?; + /// + /// // Read and check the result. + /// let mut msg = String::new(); + /// reader.read_to_string(&mut msg)?; + /// assert_eq!(&msg, "foobar"); + /// + /// peer.wait()?; + /// # Ok(()) + /// # } + /// ``` + #[unstable(feature = "anonymous_pipe", issue = "127154")] + pub fn try_clone(&self) -> Result { + self.0.try_clone().map(Self) + } +} + +#[unstable(feature = "anonymous_pipe", issue = "127154")] +impl Read for &PipeReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.0.read(buf) + } + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result { + self.0.read_vectored(bufs) + } + #[inline] + fn is_read_vectored(&self) -> bool { + self.0.is_read_vectored() + } + fn read_to_end(&mut self, buf: &mut Vec) -> Result { + self.0.read_to_end(buf) + } + fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> { + self.0.read_buf(buf) + } +} + +#[unstable(feature = "anonymous_pipe", issue = "127154")] +impl Read for PipeReader { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.0.read(buf) + } + fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result { + self.0.read_vectored(bufs) + } + #[inline] + fn is_read_vectored(&self) -> bool { + self.0.is_read_vectored() + } + fn read_to_end(&mut self, buf: &mut Vec) -> Result { + self.0.read_to_end(buf) + } + fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<()> { + self.0.read_buf(buf) + } +} + +#[unstable(feature = "anonymous_pipe", issue = "127154")] +impl Write for &PipeWriter { + fn write(&mut self, buf: &[u8]) -> Result { + self.0.write(buf) + } + #[inline] + fn flush(&mut self) -> Result<()> { + Ok(()) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result { + self.0.write_vectored(bufs) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } +} + +#[unstable(feature = "anonymous_pipe", issue = "127154")] +impl Write for PipeWriter { + fn write(&mut self, buf: &[u8]) -> Result { + self.0.write(buf) + } + #[inline] + fn flush(&mut self) -> Result<()> { + Ok(()) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result { + self.0.write_vectored(bufs) + } + + #[inline] + fn is_write_vectored(&self) -> bool { + self.0.is_write_vectored() + } +} diff --git a/std/src/io/tests.rs b/std/src/io/tests.rs index 47cbb9614afdd..85098b3bb181f 100644 --- a/std/src/io/tests.rs +++ b/std/src/io/tests.rs @@ -823,3 +823,20 @@ fn try_oom_error() { let io_err = io::Error::from(reserve_err); assert_eq!(io::ErrorKind::OutOfMemory, io_err.kind()); } + +#[test] +#[cfg(all(windows, unix, not(miri)))] +fn pipe_creation_clone_and_rw() { + let (rx, tx) = std::io::pipe().unwrap(); + + tx.try_clone().unwrap().write_all(b"12345").unwrap(); + drop(tx); + + let mut rx2 = rx.try_clone().unwrap(); + drop(rx); + + let mut s = String::new(); + rx2.read_to_string(&mut s).unwrap(); + drop(rx2); + assert_eq!(s, "12345"); +} diff --git a/std/src/lib.rs b/std/src/lib.rs index 5c12236617c98..38d7ecc736392 100644 --- a/std/src/lib.rs +++ b/std/src/lib.rs @@ -596,8 +596,6 @@ pub mod panic; #[unstable(feature = "pattern_type_macro", issue = "123646")] pub mod pat; pub mod path; -#[unstable(feature = "anonymous_pipe", issue = "127154")] -pub mod pipe; pub mod process; #[unstable(feature = "random", issue = "130703")] pub mod random; diff --git a/std/src/pipe.rs b/std/src/pipe.rs deleted file mode 100644 index 913c22588a76c..0000000000000 --- a/std/src/pipe.rs +++ /dev/null @@ -1,258 +0,0 @@ -//! A cross-platform anonymous pipe. -//! -//! This module provides support for anonymous OS pipes, like [pipe] on Linux or [CreatePipe] on -//! Windows. -//! -//! # Behavior -//! -//! A pipe is a synchronous, unidirectional data channel between two or more processes, like an -//! interprocess [`mpsc`](crate::sync::mpsc) provided by the OS. In particular: -//! -//! * A read on a [`PipeReader`] blocks until the pipe is non-empty. -//! * A write on a [`PipeWriter`] blocks when the pipe is full. -//! * When all copies of a [`PipeWriter`] are closed, a read on the corresponding [`PipeReader`] -//! returns EOF. -//! * [`PipeReader`] can be shared, but only one process will consume the data in the pipe. -//! -//! # Capacity -//! -//! Pipe capacity is platform dependent. To quote the Linux [man page]: -//! -//! > Different implementations have different limits for the pipe capacity. Applications should -//! > not rely on a particular capacity: an application should be designed so that a reading process -//! > consumes data as soon as it is available, so that a writing process does not remain blocked. -//! -//! # Examples -//! -//! ```no_run -//! #![feature(anonymous_pipe)] -//! # #[cfg(miri)] fn main() {} -//! # #[cfg(not(miri))] -//! # fn main() -> std::io::Result<()> { -//! # use std::process::Command; -//! # use std::io::{Read, Write}; -//! let (ping_rx, mut ping_tx) = std::pipe::pipe()?; -//! let (mut pong_rx, pong_tx) = std::pipe::pipe()?; -//! -//! // Spawn a process that echoes its input. -//! let mut echo_server = Command::new("cat").stdin(ping_rx).stdout(pong_tx).spawn()?; -//! -//! ping_tx.write_all(b"hello")?; -//! // Close to unblock echo_server's reader. -//! drop(ping_tx); -//! -//! let mut buf = String::new(); -//! // Block until echo_server's writer is closed. -//! pong_rx.read_to_string(&mut buf)?; -//! assert_eq!(&buf, "hello"); -//! -//! echo_server.wait()?; -//! # Ok(()) -//! # } -//! ``` -//! [pipe]: https://man7.org/linux/man-pages/man2/pipe.2.html -//! [CreatePipe]: https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-createpipe -//! [man page]: https://man7.org/linux/man-pages/man7/pipe.7.html -use crate::io; -use crate::sys::anonymous_pipe::{AnonPipe, pipe as pipe_inner}; - -/// Create anonymous pipe that is close-on-exec and blocking. -/// -/// # Examples -/// -/// See the [module-level](crate::pipe) documentation for examples. -#[unstable(feature = "anonymous_pipe", issue = "127154")] -#[inline] -pub fn pipe() -> io::Result<(PipeReader, PipeWriter)> { - pipe_inner().map(|(reader, writer)| (PipeReader(reader), PipeWriter(writer))) -} - -/// Read end of the anonymous pipe. -#[unstable(feature = "anonymous_pipe", issue = "127154")] -#[derive(Debug)] -pub struct PipeReader(pub(crate) AnonPipe); - -/// Write end of the anonymous pipe. -#[unstable(feature = "anonymous_pipe", issue = "127154")] -#[derive(Debug)] -pub struct PipeWriter(pub(crate) AnonPipe); - -impl PipeReader { - /// Create a new [`PipeReader`] instance that shares the same underlying file description. - /// - /// # Examples - /// - /// ```no_run - /// #![feature(anonymous_pipe)] - /// # #[cfg(miri)] fn main() {} - /// # #[cfg(not(miri))] - /// # fn main() -> std::io::Result<()> { - /// # use std::fs; - /// # use std::io::Write; - /// # use std::process::Command; - /// const NUM_SLOT: u8 = 2; - /// const NUM_PROC: u8 = 5; - /// const OUTPUT: &str = "work.txt"; - /// - /// let mut jobs = vec![]; - /// let (reader, mut writer) = std::pipe::pipe()?; - /// - /// // Write NUM_SLOT characters the pipe. - /// writer.write_all(&[b'|'; NUM_SLOT as usize])?; - /// - /// // Spawn several processes that read a character from the pipe, do some work, then - /// // write back to the pipe. When the pipe is empty, the processes block, so only - /// // NUM_SLOT processes can be working at any given time. - /// for _ in 0..NUM_PROC { - /// jobs.push( - /// Command::new("bash") - /// .args(["-c", - /// &format!( - /// "read -n 1\n\ - /// echo -n 'x' >> '{OUTPUT}'\n\ - /// echo -n '|'", - /// ), - /// ]) - /// .stdin(reader.try_clone()?) - /// .stdout(writer.try_clone()?) - /// .spawn()?, - /// ); - /// } - /// - /// // Wait for all jobs to finish. - /// for mut job in jobs { - /// job.wait()?; - /// } - /// - /// // Check our work and clean up. - /// let xs = fs::read_to_string(OUTPUT)?; - /// fs::remove_file(OUTPUT)?; - /// assert_eq!(xs, "x".repeat(NUM_PROC.into())); - /// # Ok(()) - /// # } - /// ``` - #[unstable(feature = "anonymous_pipe", issue = "127154")] - pub fn try_clone(&self) -> io::Result { - self.0.try_clone().map(Self) - } -} - -impl PipeWriter { - /// Create a new [`PipeWriter`] instance that shares the same underlying file description. - /// - /// # Examples - /// - /// ```no_run - /// #![feature(anonymous_pipe)] - /// # #[cfg(miri)] fn main() {} - /// # #[cfg(not(miri))] - /// # fn main() -> std::io::Result<()> { - /// # use std::process::Command; - /// # use std::io::Read; - /// let (mut reader, writer) = std::pipe::pipe()?; - /// - /// // Spawn a process that writes to stdout and stderr. - /// let mut peer = Command::new("bash") - /// .args([ - /// "-c", - /// "echo -n foo\n\ - /// echo -n bar >&2" - /// ]) - /// .stdout(writer.try_clone()?) - /// .stderr(writer) - /// .spawn()?; - /// - /// // Read and check the result. - /// let mut msg = String::new(); - /// reader.read_to_string(&mut msg)?; - /// assert_eq!(&msg, "foobar"); - /// - /// peer.wait()?; - /// # Ok(()) - /// # } - /// ``` - #[unstable(feature = "anonymous_pipe", issue = "127154")] - pub fn try_clone(&self) -> io::Result { - self.0.try_clone().map(Self) - } -} - -#[unstable(feature = "anonymous_pipe", issue = "127154")] -impl io::Read for &PipeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) - } - fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { - self.0.read_vectored(bufs) - } - #[inline] - fn is_read_vectored(&self) -> bool { - self.0.is_read_vectored() - } - fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - self.0.read_to_end(buf) - } - fn read_buf(&mut self, buf: io::BorrowedCursor<'_>) -> io::Result<()> { - self.0.read_buf(buf) - } -} - -#[unstable(feature = "anonymous_pipe", issue = "127154")] -impl io::Read for PipeReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.0.read(buf) - } - fn read_vectored(&mut self, bufs: &mut [io::IoSliceMut<'_>]) -> io::Result { - self.0.read_vectored(bufs) - } - #[inline] - fn is_read_vectored(&self) -> bool { - self.0.is_read_vectored() - } - fn read_to_end(&mut self, buf: &mut Vec) -> io::Result { - self.0.read_to_end(buf) - } - fn read_buf(&mut self, buf: io::BorrowedCursor<'_>) -> io::Result<()> { - self.0.read_buf(buf) - } -} - -#[unstable(feature = "anonymous_pipe", issue = "127154")] -impl io::Write for &PipeWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) - } - #[inline] - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - self.0.write_vectored(bufs) - } - - #[inline] - fn is_write_vectored(&self) -> bool { - self.0.is_write_vectored() - } -} - -#[unstable(feature = "anonymous_pipe", issue = "127154")] -impl io::Write for PipeWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.0.write(buf) - } - #[inline] - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result { - self.0.write_vectored(bufs) - } - - #[inline] - fn is_write_vectored(&self) -> bool { - self.0.is_write_vectored() - } -} diff --git a/std/src/pipe/tests.rs b/std/src/pipe/tests.rs deleted file mode 100644 index 9c38e10678752..0000000000000 --- a/std/src/pipe/tests.rs +++ /dev/null @@ -1,19 +0,0 @@ -use crate::io::{Read, Write}; -use crate::pipe::pipe; - -#[test] -#[cfg(all(windows, unix, not(miri)))] -fn pipe_creation_clone_and_rw() { - let (rx, tx) = pipe().unwrap(); - - tx.try_clone().unwrap().write_all(b"12345").unwrap(); - drop(tx); - - let mut rx2 = rx.try_clone().unwrap(); - drop(rx); - - let mut s = String::new(); - rx2.read_to_string(&mut s).unwrap(); - drop(rx2); - assert_eq!(s, "12345"); -} diff --git a/std/src/sys/anonymous_pipe/unix.rs b/std/src/sys/anonymous_pipe/unix.rs index 9168024730e67..9e398765634b7 100644 --- a/std/src/sys/anonymous_pipe/unix.rs +++ b/std/src/sys/anonymous_pipe/unix.rs @@ -1,6 +1,5 @@ -use crate::io; +use crate::io::{self, PipeReader, PipeWriter}; use crate::os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; -use crate::pipe::{PipeReader, PipeWriter}; use crate::process::Stdio; use crate::sys::fd::FileDesc; use crate::sys::pipe::anon_pipe; diff --git a/std/src/sys/anonymous_pipe/unsupported.rs b/std/src/sys/anonymous_pipe/unsupported.rs index dd51e70315e96..4e79ac9c21aad 100644 --- a/std/src/sys/anonymous_pipe/unsupported.rs +++ b/std/src/sys/anonymous_pipe/unsupported.rs @@ -1,5 +1,4 @@ -use crate::io; -use crate::pipe::{PipeReader, PipeWriter}; +use crate::io::{self, PipeReader, PipeWriter}; use crate::process::Stdio; pub use crate::sys::pipe::AnonPipe; diff --git a/std/src/sys/anonymous_pipe/windows.rs b/std/src/sys/anonymous_pipe/windows.rs index a48198f8a812b..eb7fa8ec1c9a1 100644 --- a/std/src/sys/anonymous_pipe/windows.rs +++ b/std/src/sys/anonymous_pipe/windows.rs @@ -1,12 +1,12 @@ +use crate::io::{self, PipeReader, PipeWriter}; use crate::os::windows::io::{ AsHandle, AsRawHandle, BorrowedHandle, FromRawHandle, IntoRawHandle, OwnedHandle, RawHandle, }; -use crate::pipe::{PipeReader, PipeWriter}; use crate::process::Stdio; +use crate::ptr; use crate::sys::c; use crate::sys::handle::Handle; use crate::sys_common::{FromInner, IntoInner}; -use crate::{io, ptr}; pub type AnonPipe = Handle; diff --git a/std/tests/pipe_subprocess.rs b/std/tests/pipe_subprocess.rs index 1535742a83a21..df946cdcf2b70 100644 --- a/std/tests/pipe_subprocess.rs +++ b/std/tests/pipe_subprocess.rs @@ -3,8 +3,7 @@ fn main() { #[cfg(all(not(miri), any(unix, windows)))] { - use std::io::Read; - use std::pipe::pipe; + use std::io::{Read, pipe}; use std::{env, process}; if env::var("I_AM_THE_CHILD").is_ok() {