Skip to content

Commit

Permalink
fix FuturesUnordered
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Feb 7, 2021
1 parent 1803948 commit ff9d015
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 1 deletion.
5 changes: 4 additions & 1 deletion futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures_core::task::{Context, Poll};
use futures_task::{FutureObj, LocalFutureObj, Spawn, LocalSpawn, SpawnError};
use crate::task::AtomicWaker;
use core::cell::UnsafeCell;
use core::cmp;
use core::fmt::{self, Debug};
use core::iter::FromIterator;
use core::marker::PhantomData;
Expand Down Expand Up @@ -414,6 +415,8 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>
{
let yield_every = cmp::min(self.len(), YIELD_EVERY);

// Keep track of how many child futures we have polled,
// in case we want to forcibly yield.
let mut polled = 0;
Expand Down Expand Up @@ -548,7 +551,7 @@ impl<Fut: Future> Stream for FuturesUnordered<Fut> {
let task = bomb.task.take().unwrap();
bomb.queue.link(task);

if polled == YIELD_EVERY {
if polled >= yield_every {
// We have polled a large number of futures in a row without yielding.
// To ensure we do not starve other tasks waiting on the executor,
// we yield here, but immediately wake ourselves up to continue.
Expand Down
38 changes: 38 additions & 0 deletions futures/tests/futures_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use futures::future::Future;
use futures::stream::{FuturesUnordered, StreamExt};
use futures::task::{Context, Poll};
use futures_test::task::noop_context;
use std::iter::FromIterator;
use std::pin::Pin;

#[test]
fn is_terminated() {
use futures::future;
Expand Down Expand Up @@ -326,3 +333,34 @@ fn len_valid_during_out_of_order_completion() {
assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
assert_eq!(stream.len(), 0);
}

#[test]
fn polled_only_once_at_most_per_iteration() {
#[derive(Debug, Clone, Copy, Default)]
struct F {
polled: bool,
}

impl Future for F {
type Output = ();

fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
if self.polled {
panic!("polled twice")
} else {
self.polled = true;
Poll::Pending
}
}
}

let cx = &mut noop_context();

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(10, tasks.iter().filter(|f| f.polled).count());

let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
assert!(tasks.poll_next_unpin(cx).is_pending());
assert_eq!(32, tasks.iter().filter(|f| f.polled).count());
}

0 comments on commit ff9d015

Please sign in to comment.