Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncBufRead trait #1552

Merged
merged 1 commit into from
Apr 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions futures-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,63 @@ mod if_std {
-> Poll<Result<u64>>;
}

/// Read bytes asynchronously.
///
/// This trait is analogous to the `std::io::BufRead` trait, but integrates
/// with the asynchronous task system. In particular, the `poll_fill_buf`
/// method, unlike `BufRead::fill_buf`, will automatically queue the current task
/// for wakeup and return if data is not yet available, rather than blocking
/// the calling thread.
pub trait AsyncBufRead: AsyncRead {
/// Attempt to return the contents of the internal buffer, filling it with more data
/// from the inner reader if it is empty.
///
/// On success, returns `Poll::Ready(Ok(buf))`.
///
/// If no data is available for reading, the method returns
/// `Poll::Pending` and arranges for the current task (via
/// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes
/// readable or is closed.
///
/// This function is a lower-level call. It needs to be paired with the
/// [`consume`] method to function properly. When calling this
/// method, none of the contents will be "read" in the sense that later
/// calling [`poll_read`] may return the same contents. As such, [`consume`] must
/// be called with the number of bytes that are consumed from this buffer to
/// ensure that the bytes are never returned twice.
///
/// [`poll_read`]: AsyncRead::poll_read
/// [`consume`]: AsyncBufRead::consume
///
/// An empty buffer returned indicates that the stream has reached EOF.
///
/// # Implementation
///
/// This function may not return errors of kind `WouldBlock` or
/// `Interrupted`. Implementations must convert `WouldBlock` into
/// `Poll::Pending` and either internally retry or convert
/// `Interrupted` into another error kind.
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
-> Poll<Result<&'a [u8]>>;

/// Tells this buffer that `amt` bytes have been consumed from the buffer,
/// so they should no longer be returned in calls to [`poll_read`].
///
/// This function is a lower-level call. It needs to be paired with the
/// [`poll_fill_buf`] method to function properly. This function does
/// not perform any I/O, it simply informs this object that some amount of
/// its buffer, returned from [`poll_fill_buf`], has been consumed and should
/// no longer be returned. As such, this function may do odd things if
/// [`poll_fill_buf`] isn't called before calling it.
///
/// The `amt` must be `<=` the number of bytes in the buffer returned by
/// [`poll_fill_buf`].
///
/// [`poll_read`]: AsyncRead::poll_read
/// [`poll_fill_buf`]: AsyncBufRead::poll_fill_buf
fn consume(self: Pin<&mut Self>, amt: usize);
}

macro_rules! deref_async_read {
() => {
unsafe fn initializer(&self) -> Initializer {
Expand Down Expand Up @@ -340,6 +397,10 @@ mod if_std {
unsafe_delegate_async_read_to_stdio!();
}

impl AsyncRead for StdIo::Empty {
unsafe_delegate_async_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncRead for StdIo::Cursor<T> {
unsafe_delegate_async_read_to_stdio!();
}
Expand Down Expand Up @@ -499,6 +560,70 @@ mod if_std {
impl<T: AsRef<[u8]> + Unpin> AsyncSeek for StdIo::Cursor<T> {
delegate_async_seek_to_stdio!();
}

macro_rules! deref_async_buf_read {
() => {
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
-> Poll<Result<&'a [u8]>>
{
Pin::new(&mut **self.get_mut()).poll_fill_buf(cx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
Pin::new(&mut **self.get_mut()).consume(amt)
}
}
}

impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for Box<T> {
deref_async_buf_read!();
}

impl<T: ?Sized + AsyncBufRead + Unpin> AsyncBufRead for &mut T {
deref_async_buf_read!();
}

impl<P> AsyncBufRead for Pin<P>
where
P: DerefMut + Unpin,
P::Target: AsyncBufRead,
{
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>)
-> Poll<Result<&'a [u8]>>
{
self.get_mut().as_mut().poll_fill_buf(cx)
}

fn consume(self: Pin<&mut Self>, amt: usize) {
self.get_mut().as_mut().consume(amt)
}
}

macro_rules! delegate_async_buf_read_to_stdio {
() => {
fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>)
-> Poll<Result<&'a [u8]>>
{
Poll::Ready(StdIo::BufRead::fill_buf(self.get_mut()))
}

fn consume(self: Pin<&mut Self>, amt: usize) {
StdIo::BufRead::consume(self.get_mut(), amt)
}
}
}

impl AsyncBufRead for &[u8] {
delegate_async_buf_read_to_stdio!();
}

impl AsyncBufRead for StdIo::Empty {
delegate_async_buf_read_to_stdio!();
}

impl<T: AsRef<[u8]> + Unpin> AsyncBufRead for StdIo::Cursor<T> {
delegate_async_buf_read_to_stdio!();
}
}

#[cfg(feature = "std")]
Expand Down
3 changes: 2 additions & 1 deletion futures-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ Common utilities and extension traits for the futures-rs library.
name = "futures_util"

[features]
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab"]
std = ["alloc", "futures-core-preview/std", "futures-io-preview/std", "futures-sink-preview/std", "futures-select-macro-preview/std", "rand", "rand_core", "slab", "memchr"]
default = ["std"]
compat = ["std", "futures_01"]
io-compat = ["compat", "tokio-io"]
Expand All @@ -36,6 +36,7 @@ proc-macro-nested = "0.1.2"
rand = { version = "0.6.4", optional = true }
rand_core = { version = ">=0.2.2, <0.4", optional = true } # See https://github.com/rust-random/rand/issues/645
slab = { version = "0.4", optional = true }
memchr = { version = "2.2", optional = true }
futures_01 = { version = "0.1.25", optional = true, package = "futures" }
tokio-io = { version = "0.1.9", optional = true }
pin-utils = "0.1.0-alpha.4"
Expand Down
26 changes: 23 additions & 3 deletions futures-util/src/io/allow_std.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use futures_core::task::{Context, Poll};
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek};
use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
use std::{fmt, io};
use std::pin::Pin;
use std::string::String;
use std::vec::Vec;

/// A simple wrapper type which allows types which implement only
/// implement `std::io::Read` or `std::io::Write`
Expand Down Expand Up @@ -130,3 +128,25 @@ impl<T> AsyncSeek for AllowStdIo<T> where T: io::Seek {
Poll::Ready(Ok(try_with_interrupt!(self.0.seek(pos))))
}
}

impl<T> io::BufRead for AllowStdIo<T> where T: io::BufRead {
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.0.fill_buf()
}
fn consume(&mut self, amt: usize) {
self.0.consume(amt)
}
}

impl<T> AsyncBufRead for AllowStdIo<T> where T: io::BufRead {
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
-> Poll<io::Result<&'a [u8]>>
{
let this: *mut Self = &mut *self as *mut _;
Poll::Ready(Ok(try_with_interrupt!(unsafe { &mut *this }.0.fill_buf())))
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.0.consume(amt)
}
}
70 changes: 0 additions & 70 deletions futures-util/src/io/disabled/read_until.rs

This file was deleted.

81 changes: 72 additions & 9 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,12 @@
//! `AsyncReadExt` and `AsyncWriteExt` traits which add methods
//! to the `AsyncRead` and `AsyncWrite` types.

use std::vec::Vec;

pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, IoVec, SeekFrom};
pub use futures_io::{
AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead, IoVec, SeekFrom,
};

#[cfg(feature = "io-compat")] use crate::compat::Compat;

// Temporarily removed until AsyncBufRead is implemented
// pub use io::lines::{lines, Lines};
// pub use io::read_until::{read_until, ReadUntil};
// mod lines;
// mod read_until;

mod allow_std;
pub use self::allow_std::AllowStdIo;

Expand All @@ -26,15 +20,26 @@ pub use self::copy_into::CopyInto;
mod flush;
pub use self::flush::Flush;

// TODO
// mod lines;
// pub use self::lines::Lines;

mod read;
pub use self::read::Read;

mod read_exact;
pub use self::read_exact::ReadExact;

// TODO
// mod read_line;
// pub use self::read_line::ReadLine;

mod read_to_end;
pub use self::read_to_end::ReadToEnd;

mod read_until;
pub use self::read_until::ReadUntil;

mod close;
pub use self::close::Close;

Expand Down Expand Up @@ -343,3 +348,61 @@ pub trait AsyncSeekExt: AsyncSeek {
}

impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}

/// An extension trait which adds utility methods to `AsyncBufRead` types.
pub trait AsyncBufReadExt: AsyncBufRead {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine we'll also want read_line, lines etc. but those can wait for a followup

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until the delimiter `byte` or EOF is reached.
/// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
///
/// This function will read bytes from the underlying stream until the
/// delimiter or EOF is found. Once found, all bytes up to, and including,
/// the delimiter (if found) will be appended to `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
/// # Examples
///
/// ```
/// #![feature(async_await, await_macro)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncBufReadExt;
/// use std::io::Cursor;
///
/// let mut cursor = Cursor::new(b"lorem-ipsum");
/// let mut buf = vec![];
///
/// // cursor is at 'l'
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
/// assert_eq!(num_bytes, 6);
/// assert_eq!(buf, b"lorem-");
/// buf.clear();
///
/// // cursor is at 'i'
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
/// assert_eq!(num_bytes, 5);
/// assert_eq!(buf, b"ipsum");
/// buf.clear();
///
/// // cursor is at EOF
/// let num_bytes = await!(cursor.read_until(b'-', &mut buf))?;
/// assert_eq!(num_bytes, 0);
/// assert_eq!(buf, b"");
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
/// ```
fn read_until<'a>(
&'a mut self,
byte: u8,
buf: &'a mut Vec<u8>,
) -> ReadUntil<'a, Self>
where Self: Unpin,
{
ReadUntil::new(self, byte, buf)
}
}

impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
Loading