Skip to content

Commit

Permalink
Do not store items field in ReadyChunks (#2656)
Browse files Browse the repository at this point in the history
`items` is always empty when `poll_ready` exists, so there's no
reason to store it inside `ReadyChunks`.

This makes code a bit more efficient, but also makes code easier
to understand.
  • Loading branch information
stepancheg committed Oct 22, 2022
1 parent 3c657e5 commit 8cfc085
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 33 deletions.
44 changes: 13 additions & 31 deletions futures-util/src/stream/stream/ready_chunks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::stream::Fuse;
use alloc::vec::Vec;
use core::mem;
use core::pin::Pin;
use futures_core::stream::{FusedStream, Stream};
use futures_core::task::{Context, Poll};
Expand All @@ -15,7 +14,6 @@ pin_project! {
pub struct ReadyChunks<St: Stream> {
#[pin]
stream: Fuse<St>,
items: Vec<St::Item>,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
}
}
Expand All @@ -24,11 +22,7 @@ impl<St: Stream> ReadyChunks<St> {
pub(super) fn new(stream: St, capacity: usize) -> Self {
assert!(capacity > 0);

Self {
stream: super::Fuse::new(stream),
items: Vec::with_capacity(capacity),
cap: capacity,
}
Self { stream: super::Fuse::new(stream), cap: capacity }
}

delegate_access_inner!(stream, St, (.));
Expand All @@ -40,40 +34,33 @@ impl<St: Stream> Stream for ReadyChunks<St> {
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();

let mut items: Vec<St::Item> = Vec::new();

loop {
match this.stream.as_mut().poll_next(cx) {
// Flush all collected data if underlying stream doesn't contain
// more ready values
Poll::Pending => {
return if this.items.is_empty() {
Poll::Pending
} else {
Poll::Ready(Some(mem::replace(this.items, Vec::with_capacity(*this.cap))))
}
return if items.is_empty() { Poll::Pending } else { Poll::Ready(Some(items)) }
}

// Push the ready item into the buffer and check whether it is full.
// If so, replace our buffer with a new and empty one and return
// the full one.
Poll::Ready(Some(item)) => {
this.items.push(item);
if this.items.len() >= *this.cap {
return Poll::Ready(Some(mem::replace(
this.items,
Vec::with_capacity(*this.cap),
)));
if items.is_empty() {
items.reserve(*this.cap);
}
items.push(item);
if items.len() >= *this.cap {
return Poll::Ready(Some(items));
}
}

// Since the underlying stream ran out of values, return what we
// have buffered, if we have anything.
Poll::Ready(None) => {
let last = if this.items.is_empty() {
None
} else {
let full_buf = mem::take(this.items);
Some(full_buf)
};
let last = if items.is_empty() { None } else { Some(items) };

return Poll::Ready(last);
}
Expand All @@ -82,20 +69,15 @@ impl<St: Stream> Stream for ReadyChunks<St> {
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = usize::from(!self.items.is_empty());
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = match upper {
Some(x) => x.checked_add(chunk_len),
None => None,
};
let lower = lower / self.cap;
(lower, upper)
}
}

impl<St: FusedStream> FusedStream for ReadyChunks<St> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated() && self.items.is_empty()
self.stream.is_terminated()
}
}

Expand Down
4 changes: 2 additions & 2 deletions futures/tests/auto_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1492,10 +1492,10 @@ pub mod stream {
assert_not_impl!(PollImmediate<PinnedStream>: Unpin);

assert_impl!(ReadyChunks<SendStream<()>>: Send);
assert_not_impl!(ReadyChunks<SendStream>: Send);
assert_impl!(ReadyChunks<SendStream>: Send);
assert_not_impl!(ReadyChunks<LocalStream>: Send);
assert_impl!(ReadyChunks<SyncStream<()>>: Sync);
assert_not_impl!(ReadyChunks<SyncStream>: Sync);
assert_impl!(ReadyChunks<SyncStream>: Sync);
assert_not_impl!(ReadyChunks<LocalStream>: Sync);
assert_impl!(ReadyChunks<UnpinStream>: Unpin);
assert_not_impl!(ReadyChunks<PinnedStream>: Unpin);
Expand Down

0 comments on commit 8cfc085

Please sign in to comment.