Skip to content

Commit

Permalink
Merge pull request #618 from srijs/feat/futures-unordered-iter-mut
Browse files Browse the repository at this point in the history
Implement FuturesUnordered::iter_mut
  • Loading branch information
alexcrichton authored Nov 2, 2017
2 parents a16b5ab + d59b8a3 commit 5a4bfcb
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 26 deletions.
67 changes: 43 additions & 24 deletions src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//! An unbounded set of futures.
use std::cell::UnsafeCell;
use std::fmt::{self, Debug};
use std::iter::FromIterator;
Expand All @@ -9,7 +11,7 @@ use std::sync::atomic::{AtomicPtr, AtomicBool};
use std::sync::{Arc, Weak};
use std::usize;

use {task, Stream, Future, Poll, Async, IntoFuture};
use {task, Stream, Future, Poll, Async};
use executor::{Notify, UnsafeNotify, NotifyHandle};
use task_impl::{self, AtomicTask};

Expand Down Expand Up @@ -51,29 +53,6 @@ pub struct FuturesUnordered<F> {
unsafe impl<T: Send> Send for FuturesUnordered<T> {}
unsafe impl<T: Sync> Sync for FuturesUnordered<T> {}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where I: IntoIterator,
I::Item: IntoFuture
{
let mut set = FuturesUnordered::new();

for future in futures {
set.push(future.into_future());
}

return set
}

// FuturesUnordered is implemented using two linked lists. One which links all
// futures managed by a `FuturesUnordered` and one that tracks futures that have
// been scheduled for polling. The first linked list is not thread safe and is
Expand Down Expand Up @@ -207,6 +186,15 @@ impl<T> FuturesUnordered<T> {
self.inner.enqueue(ptr);
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> IterMut<T> {
IterMut {
node: self.head_all,
len: self.len,
_marker: PhantomData
}
}

fn release_node(&mut self, node: Arc<Node<T>>) {
// The future is done, try to reset the queued flag. This will prevent
// `notify` from doing any work in the future
Expand Down Expand Up @@ -440,6 +428,37 @@ impl<F: Future> FromIterator<F> for FuturesUnordered<F> {
}
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct IterMut<'a, F: 'a> {
node: *const Node<F>,
len: usize,
_marker: PhantomData<&'a mut FuturesUnordered<F>>
}

impl<'a, F> Iterator for IterMut<'a, F> {
type Item = &'a mut F;

fn next(&mut self) -> Option<&'a mut F> {
if self.node.is_null() {
return None;
}
unsafe {
let future = (*(*self.node).future.get()).as_mut().unwrap();
let next = *(*self.node).next_all.get();
self.node = next;
self.len -= 1;
return Some(future);
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<'a, F> ExactSizeIterator for IterMut<'a, F> {}

impl<T> Inner<T> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
fn enqueue(&self, node: *const Node<T>) {
Expand Down
28 changes: 26 additions & 2 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ if_std! {
mod wait;
mod channel;
mod split;
mod futures_unordered;
pub mod futures_unordered;
mod futures_ordered;
pub use self::buffered::Buffered;
pub use self::buffer_unordered::BufferUnordered;
Expand All @@ -112,7 +112,7 @@ if_std! {
pub use self::collect::Collect;
pub use self::wait::Wait;
pub use self::split::{SplitStream, SplitSink};
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
pub use self::futures_unordered::FuturesUnordered;
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};

#[doc(hidden)]
Expand Down Expand Up @@ -1102,3 +1102,27 @@ impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
(**self).poll()
}
}

/// Converts a list of futures into a `Stream` of results from the futures.
///
/// This function will take an list of futures (e.g. a vector, an iterator,
/// etc), and return a stream. The stream will yield items as they become
/// available on the futures internally, in the order that they become
/// available. This function is similar to `buffer_unordered` in that it may
/// return items in a different order than in the list specified.
///
/// Note that the returned set can also be used to dynamically push more
/// futures into the set as they become available.
#[cfg(feature = "use_std")]
pub fn futures_unordered<I>(futures: I) -> FuturesUnordered<<I::Item as IntoFuture>::Future>
where I: IntoIterator,
I::Item: IntoFuture
{
let mut set = FuturesUnordered::new();

for future in futures {
set.push(future.into_future());
}

return set
}
42 changes: 42 additions & 0 deletions tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,45 @@ fn finished_future_ok() {
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
}

#[test]
fn iter_mut_cancel() {
let (a_tx, a_rx) = oneshot::channel::<u32>();
let (b_tx, b_rx) = oneshot::channel::<u32>();
let (c_tx, c_rx) = oneshot::channel::<u32>();

let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);

for rx in stream.iter_mut() {
rx.close();
}

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

let mut spawn = futures::executor::spawn(stream);
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(None, spawn.wait_stream());
}

#[test]
fn iter_mut_len() {
let mut stream = futures_unordered(vec![
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>()
]);

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 2);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 1);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 0);
assert!(iter_mut.next().is_none());
}

0 comments on commit 5a4bfcb

Please sign in to comment.