Skip to content

Commit

Permalink
Implement FuturesUnordered::iter_mut
Browse files Browse the repository at this point in the history
  • Loading branch information
srijs committed Oct 22, 2017
1 parent 2b3f0f0 commit d022efb
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 1 deletion.
40 changes: 40 additions & 0 deletions src/stream/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,15 @@ impl<T> FuturesUnordered<T> {
self.inner.enqueue(ptr);
}

/// Returns an iterator that allows modifying each future in the set.
pub fn iter_mut(&mut self) -> FuturesUnorderedIterMut<T> {
FuturesUnorderedIterMut {
node: self.head_all,
len: self.len,
set: self
}
}

fn release_node(&mut self, node: Arc<Node<T>>) {
// The future is done, try to reset the queued flag. This will prevent
// `notify` from doing any work in the future
Expand Down Expand Up @@ -427,6 +436,37 @@ impl<T> Drop for FuturesUnordered<T> {
}
}

#[derive(Debug)]
/// Mutable iterator over all futures in the unordered set.
pub struct FuturesUnorderedIterMut<'a, F: 'a> {
set: &'a mut FuturesUnordered<F>,
len: usize,
node: *const Node<F>
}

impl<'a, F> Iterator for FuturesUnorderedIterMut<'a, F> {
type Item = &'a mut F;

fn next(&mut self) -> Option<&'a mut F> {
if self.node.is_null() {
return None;
}
unsafe {
let future = (*(*self.node).future.get()).as_mut().unwrap();
let next = *(*self.node).next_all.get();
self.node = next;
self.len -= 1;
return Some(future);
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.len, Some(self.len))
}
}

impl<'a, F> ExactSizeIterator for FuturesUnorderedIterMut<'a, F> {}

impl<T> Inner<T> {
/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
fn enqueue(&self, node: *const Node<T>) {
Expand Down
2 changes: 1 addition & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ if_std! {
pub use self::collect::Collect;
pub use self::wait::Wait;
pub use self::split::{SplitStream, SplitSink};
pub use self::futures_unordered::{futures_unordered, FuturesUnordered};
pub use self::futures_unordered::{futures_unordered, FuturesUnordered, FuturesUnorderedIterMut};
pub use self::futures_ordered::{futures_ordered, FuturesOrdered};

#[doc(hidden)]
Expand Down
42 changes: 42 additions & 0 deletions tests/futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,45 @@ fn finished_future_ok() {
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready());
}

#[test]
fn iter_mut_cancel() {
let (a_tx, a_rx) = oneshot::channel::<u32>();
let (b_tx, b_rx) = oneshot::channel::<u32>();
let (c_tx, c_rx) = oneshot::channel::<u32>();

let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]);

for rx in stream.iter_mut() {
rx.close();
}

assert!(a_tx.is_canceled());
assert!(b_tx.is_canceled());
assert!(c_tx.is_canceled());

let mut spawn = futures::executor::spawn(stream);
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream());
assert_eq!(None, spawn.wait_stream());
}

#[test]
fn iter_mut_len() {
let mut stream = futures_unordered(vec![
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>(),
futures::future::empty::<(),()>()
]);

let mut iter_mut = stream.iter_mut();
assert_eq!(iter_mut.len(), 3);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 2);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 1);
assert!(iter_mut.next().is_some());
assert_eq!(iter_mut.len(), 0);
assert!(iter_mut.next().is_none());
}

0 comments on commit d022efb

Please sign in to comment.