From d022efb6e3a5e09eb73b81beb0ac08baa7655333 Mon Sep 17 00:00:00 2001 From: Sam Rijs Date: Sun, 22 Oct 2017 14:32:49 +1100 Subject: [PATCH] Implement FuturesUnordered::iter_mut --- src/stream/futures_unordered.rs | 40 +++++++++++++++++++++++++++++++ src/stream/mod.rs | 2 +- tests/futures_unordered.rs | 42 +++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 1 deletion(-) diff --git a/src/stream/futures_unordered.rs b/src/stream/futures_unordered.rs index 845c21ab94..8863581eae 100644 --- a/src/stream/futures_unordered.rs +++ b/src/stream/futures_unordered.rs @@ -206,6 +206,15 @@ impl FuturesUnordered { self.inner.enqueue(ptr); } + /// Returns an iterator that allows modifying each future in the set. + pub fn iter_mut(&mut self) -> FuturesUnorderedIterMut { + FuturesUnorderedIterMut { + node: self.head_all, + len: self.len, + set: self + } + } + fn release_node(&mut self, node: Arc>) { // The future is done, try to reset the queued flag. This will prevent // `notify` from doing any work in the future @@ -427,6 +436,37 @@ impl Drop for FuturesUnordered { } } +#[derive(Debug)] +/// Mutable iterator over all futures in the unordered set. +pub struct FuturesUnorderedIterMut<'a, F: 'a> { + set: &'a mut FuturesUnordered, + len: usize, + node: *const Node +} + +impl<'a, F> Iterator for FuturesUnorderedIterMut<'a, F> { + type Item = &'a mut F; + + fn next(&mut self) -> Option<&'a mut F> { + if self.node.is_null() { + return None; + } + unsafe { + let future = (*(*self.node).future.get()).as_mut().unwrap(); + let next = *(*self.node).next_all.get(); + self.node = next; + self.len -= 1; + return Some(future); + } + } + + fn size_hint(&self) -> (usize, Option) { + (self.len, Some(self.len)) + } +} + +impl<'a, F> ExactSizeIterator for FuturesUnorderedIterMut<'a, F> {} + impl Inner { /// The enqueue function from the 1024cores intrusive MPSC queue algorithm. fn enqueue(&self, node: *const Node) { diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 53e866a3b6..fcaf841b82 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -112,7 +112,7 @@ if_std! { pub use self::collect::Collect; pub use self::wait::Wait; pub use self::split::{SplitStream, SplitSink}; - pub use self::futures_unordered::{futures_unordered, FuturesUnordered}; + pub use self::futures_unordered::{futures_unordered, FuturesUnordered, FuturesUnorderedIterMut}; pub use self::futures_ordered::{futures_ordered, FuturesOrdered}; #[doc(hidden)] diff --git a/tests/futures_unordered.rs b/tests/futures_unordered.rs index 9f1fb95c5b..8558121a3e 100644 --- a/tests/futures_unordered.rs +++ b/tests/futures_unordered.rs @@ -69,3 +69,45 @@ fn finished_future_ok() { assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); assert!(spawn.poll_stream_notify(&support::notify_noop(), 0).unwrap().is_not_ready()); } + +#[test] +fn iter_mut_cancel() { + let (a_tx, a_rx) = oneshot::channel::(); + let (b_tx, b_rx) = oneshot::channel::(); + let (c_tx, c_rx) = oneshot::channel::(); + + let mut stream = futures_unordered(vec![a_rx, b_rx, c_rx]); + + for rx in stream.iter_mut() { + rx.close(); + } + + assert!(a_tx.is_canceled()); + assert!(b_tx.is_canceled()); + assert!(c_tx.is_canceled()); + + let mut spawn = futures::executor::spawn(stream); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(Some(Err(futures::sync::oneshot::Canceled)), spawn.wait_stream()); + assert_eq!(None, spawn.wait_stream()); +} + +#[test] +fn iter_mut_len() { + let mut stream = futures_unordered(vec![ + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>(), + futures::future::empty::<(),()>() + ]); + + let mut iter_mut = stream.iter_mut(); + assert_eq!(iter_mut.len(), 3); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 2); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 1); + assert!(iter_mut.next().is_some()); + assert_eq!(iter_mut.len(), 0); + assert!(iter_mut.next().is_none()); +}