diff --git a/src/iter/collect/consumer.rs b/src/iter/collect/consumer.rs index f9637a4f7..04f172180 100644 --- a/src/iter/collect/consumer.rs +++ b/src/iter/collect/consumer.rs @@ -1,63 +1,93 @@ use super::super::noop::*; use super::super::plumbing::*; +use std::marker::PhantomData; use std::ptr; use std::slice; use std::sync::atomic::{AtomicUsize, Ordering}; -pub struct CollectConsumer<'c, T: Send + 'c> { +pub struct CollectConsumer<'c, T: Send + 'c, U, F: MapFolder, R> { /// Tracks how many items we successfully wrote. Used to guarantee /// safety in the face of panics or buggy parallel iterators. writes: &'c AtomicUsize, /// A slice covering the target memory, not yet initialized! target: &'c mut [T], + + marker: PhantomData, + map_folder: F, + reducer: R, } -pub struct CollectFolder<'c, T: Send + 'c> { +pub struct CollectFolder<'c, T: Send + 'c, U, F: MapFolder> { global_writes: &'c AtomicUsize, local_writes: usize, /// An iterator over the *uninitialized* target memory. target: slice::IterMut<'c, T>, + + marker: PhantomData, + map_folder: F, } -impl<'c, T: Send + 'c> CollectConsumer<'c, T> { +impl<'c, T, U, F, R> CollectConsumer<'c, T, U, F, R> +where + T: Send + 'c, + F: MapFolder, + R: Reducer, +{ /// The target memory is considered uninitialized, and will be /// overwritten without dropping anything. 
- pub fn new(writes: &'c AtomicUsize, target: &'c mut [T]) -> CollectConsumer<'c, T> { + pub fn new( + writes: &'c AtomicUsize, + target: &'c mut [T], + map_folder: F, + reducer: R, + ) -> CollectConsumer<'c, T, U, F, R> { CollectConsumer { writes: writes, target: target, + marker: PhantomData, + map_folder: map_folder, + reducer: reducer, } } } -impl<'c, T: Send + 'c> Consumer for CollectConsumer<'c, T> { - type Folder = CollectFolder<'c, T>; - type Reducer = NoopReducer; - type Result = (); - - fn split_at(self, index: usize) -> (Self, Self, NoopReducer) { +impl<'c, T, U, F, R> Consumer for CollectConsumer<'c, T, U, F, R> +where + T: Send + 'c, + U: Send, + F: Clone + MapFolder + Send, + F::Result: Send, + R: Clone + Reducer + Send, +{ + type Folder = CollectFolder<'c, T, U, F>; + type Reducer = R; + type Result = F::Result; + + fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) { // instances Read in the fields from `self` and then // forget `self`, since it has been legitimately consumed // (and not dropped during unwinding). - let CollectConsumer { writes, target } = self; + let CollectConsumer { writes, target, map_folder, reducer, .. } = self; // Produce new consumers. Normal slicing ensures that the // memory range given to each consumer is disjoint. 
let (left, right) = target.split_at_mut(index); ( - CollectConsumer::new(writes, left), - CollectConsumer::new(writes, right), - NoopReducer, + CollectConsumer::new(writes, left, map_folder.clone(), reducer.clone()), + CollectConsumer::new(writes, right, map_folder, reducer.clone()), + reducer, ) } - fn into_folder(self) -> CollectFolder<'c, T> { + fn into_folder(self) -> CollectFolder<'c, T, U, F> { CollectFolder { global_writes: self.writes, local_writes: 0, target: self.target.into_iter(), + marker: PhantomData, + map_folder: self.map_folder, } } @@ -66,10 +96,17 @@ impl<'c, T: Send + 'c> Consumer for CollectConsumer<'c, T> { } } -impl<'c, T: Send + 'c> Folder for CollectFolder<'c, T> { - type Result = (); +impl<'c, T, U, F> Folder for CollectFolder<'c, T, U, F> +where + T: Send + 'c, + F: MapFolder, +{ + type Result = F::Result; + + fn consume(mut self, item: U) -> Self { + let (map_folder, item) = self.map_folder.consume(item); + self.map_folder = map_folder; - fn consume(mut self, item: T) -> CollectFolder<'c, T> { // Compute target pointer and write to it. Safe because the iterator // does all the bounds checking; we're only avoiding the target drop. let head = self @@ -84,12 +121,14 @@ impl<'c, T: Send + 'c> Folder for CollectFolder<'c, T> { self } - fn complete(self) { + fn complete(self) -> Self::Result { assert!(self.target.len() == 0, "too few values pushed to consumer"); // track total values written self.global_writes .fetch_add(self.local_writes, Ordering::Relaxed); + + self.map_folder.complete() } fn full(&self) -> bool { @@ -99,10 +138,11 @@ impl<'c, T: Send + 'c> Folder for CollectFolder<'c, T> { /// Pretend to be unindexed for `special_collect_into_vec`, /// but we should never actually get used that way... 
-impl<'c, T: Send + 'c> UnindexedConsumer for CollectConsumer<'c, T> { +impl<'c, T: Send + 'c> UnindexedConsumer for CollectConsumer<'c, T, T, NoopConsumer, NoopReducer> { fn split_off_left(&self) -> Self { unreachable!("CollectConsumer must be indexed!") } + fn to_reducer(&self) -> Self::Reducer { NoopReducer } diff --git a/src/iter/collect/mod.rs b/src/iter/collect/mod.rs index 2a8b755ea..739d23365 100644 --- a/src/iter/collect/mod.rs +++ b/src/iter/collect/mod.rs @@ -1,4 +1,6 @@ use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator}; +use super::noop::*; +use super::plumbing::*; use std::collections::LinkedList; use std::slice; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -9,6 +11,30 @@ use super::unzip::unzip_indexed; mod test; +/// Map-folds the results of the exact iterator into the specified vector +/// and returns the final result of the map folder. +/// +/// This is not directly public, but called by +/// `IndexedParallelIterator::mapfold_collect_into_vec`. +pub fn mapfold_collect_into_vec<'c, I, T, F, R>( + pi: I, + v: &'c mut Vec, + map_folder: F, + reducer: R, +) -> F::Result +where + I: IndexedParallelIterator, + T: Send + 'c, + F: Clone + MapFolder + Send, + R: Clone + Reducer + Send, +{ + v.truncate(0); // clear any old data + let mut collect = Collect::new(v, pi.len()); + let result = pi.drive(collect.as_consumer(map_folder, reducer)); + collect.complete(); + result +} + /// Collects the results of the exact iterator into the specified vector. /// /// This is not directly public, but called by `IndexedParallelIterator::collect_into_vec`. @@ -17,10 +43,7 @@ where I: IndexedParallelIterator, T: Send, { - v.truncate(0); // clear any old data - let mut collect = Collect::new(v, pi.len()); - pi.drive(collect.as_consumer()); - collect.complete(); + mapfold_collect_into_vec(pi, v, NoopConsumer, NoopReducer) } /// Collects the results of the iterator into the specified vector. 
@@ -40,7 +63,7 @@ where T: Send, { let mut collect = Collect::new(v, len); - pi.drive_unindexed(collect.as_consumer()); + pi.drive_unindexed(collect.as_noop_consumer()); collect.complete(); } @@ -61,7 +84,7 @@ where let mut left = Collect::new(left, len); let mut right = Collect::new(right, len); - unzip_indexed(pi, left.as_consumer(), right.as_consumer()); + unzip_indexed(pi, left.as_noop_consumer(), right.as_noop_consumer()); left.complete(); right.complete(); @@ -83,8 +106,16 @@ impl<'c, T: Send + 'c> Collect<'c, T> { } } + fn as_noop_consumer(&mut self) -> CollectConsumer { + self.as_consumer(NoopConsumer, NoopReducer) + } + /// Create a consumer on a slice of our memory. - fn as_consumer(&mut self) -> CollectConsumer { + fn as_consumer<'a, U, F, R>(&'a mut self, map_folder: F, reducer: R) -> CollectConsumer<'a, T, U, F, R> + where + F: MapFolder, + R: Reducer, + { // Reserve the new space. self.vec.reserve(self.len); @@ -92,7 +123,7 @@ impl<'c, T: Send + 'c> Collect<'c, T> { let start = self.vec.len(); let mut slice = &mut self.vec[start..]; slice = unsafe { slice::from_raw_parts_mut(slice.as_mut_ptr(), self.len) }; - CollectConsumer::new(&self.writes, slice) + CollectConsumer::new(&self.writes, slice, map_folder, reducer) } /// Update the final vector length. 
diff --git a/src/iter/collect/test.rs b/src/iter/collect/test.rs index f2bbeb9f1..ca29c4f4d 100644 --- a/src/iter/collect/test.rs +++ b/src/iter/collect/test.rs @@ -15,7 +15,7 @@ use iter::plumbing::*; fn produce_too_many_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 2); - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); @@ -29,7 +29,7 @@ fn produce_too_many_items() { fn produce_fewer_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 5); - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); @@ -43,7 +43,7 @@ fn left_produces_items_with_no_complete() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -62,7 +62,7 @@ fn right_produces_items_with_no_complete() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -80,7 +80,7 @@ fn produces_items_with_no_complete() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 2); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let mut folder = consumer.into_folder(); folder = folder.consume(22); folder = folder.consume(23); @@ -96,7 +96,7 @@ fn left_produces_too_many_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer 
= collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -115,7 +115,7 @@ fn right_produces_too_many_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -134,7 +134,7 @@ fn left_produces_fewer_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); @@ -154,7 +154,7 @@ fn right_produces_fewer_items() { let mut v = vec![]; let mut collect = Collect::new(&mut v, 4); { - let consumer = collect.as_consumer(); + let consumer = collect.as_noop_consumer(); let (left_consumer, right_consumer, _) = consumer.split_at(2); let mut left_folder = left_consumer.into_folder(); let mut right_folder = right_consumer.into_folder(); diff --git a/src/iter/mod.rs b/src/iter/mod.rs index fce24e884..bf80da1de 100644 --- a/src/iter/mod.rs +++ b/src/iter/mod.rs @@ -1974,6 +1974,75 @@ pub trait IndexedParallelIterator: ParallelIterator { collect::collect_into_vec(self, target); } + /// Map-folds the results of this iterator into the specified vector. + /// The final folding result is returned from the function. The vector + /// is always truncated before execution begins. If possible, reusing + /// the vector across calls can lead to better performance since it + /// reuses the same backing buffer. 
+ /// + /// # Examples + /// + /// ``` + /// use rayon::prelude::*; + /// use rayon::iter::plumbing::{MapFolder, Reducer}; + /// + /// // any prior data will be truncated + /// let mut vec = vec![1, 2, 3]; + /// + /// #[derive(Clone)] + /// struct NegativeFolder(Vec); + /// + /// impl MapFolder for NegativeFolder { + /// type Output = usize; + /// type Result = Vec; + /// + /// fn consume(mut self, item: isize) -> (Self, Self::Output) { + /// if item >= 0 { + /// (self, item as usize) + /// } else { + /// self.0.push(item); + /// (self, 0) + /// } + /// } + /// + /// fn complete(self) -> Vec { + /// self.0 + /// } + /// } + /// + /// #[derive(Clone)] + /// struct NegativeReducer; + /// + /// impl Reducer> for NegativeReducer { + /// fn reduce(self, mut left: Vec, mut right: Vec) -> Vec { + /// left.append(&mut right); + /// left + /// } + /// } + /// + /// let negative_numbers = + /// [4, -7, 18, 25, -9].par_iter().cloned().mapfold_collect_into_vec( + /// &mut vec, + /// NegativeFolder(vec![]), + /// NegativeReducer + /// ); + /// + /// assert_eq!(vec, [4, 0, 18, 25, 0]); + /// assert_eq!(negative_numbers, [-7, -9]); + /// ``` + fn mapfold_collect_into_vec( + self, + target: &mut Vec, + map_folder: F, + reducer: R, + ) -> F::Result + where + F: Clone + MapFolder + Send, + R: Clone + Reducer + Send, + { + collect::mapfold_collect_into_vec(self, target, map_folder, reducer) + } + /// Unzips the results of the iterator into the specified /// vectors. The vectors are always truncated before execution /// begins. 
If possible, reusing the vectors across calls can lead diff --git a/src/iter/noop.rs b/src/iter/noop.rs index 9e8bbb5b7..feda22fa3 100644 --- a/src/iter/noop.rs +++ b/src/iter/noop.rs @@ -1,5 +1,6 @@ use super::plumbing::*; +#[derive(Clone)] pub struct NoopConsumer; impl NoopConsumer { @@ -26,6 +27,20 @@ impl Consumer for NoopConsumer { } } +impl MapFolder for NoopConsumer +where + T: Send, +{ + type Output = T; + type Result = (); + + fn consume(self, item: T) -> (Self, T) { + (self, item) + } + + fn complete(self) {} +} + impl Folder for NoopConsumer { type Result = (); @@ -58,6 +73,7 @@ impl UnindexedConsumer for NoopConsumer { } } +#[derive(Clone)] pub struct NoopReducer; impl Reducer<()> for NoopReducer { diff --git a/src/iter/plumbing/mod.rs b/src/iter/plumbing/mod.rs index 6056ff974..7573dfb38 100644 --- a/src/iter/plumbing/mod.rs +++ b/src/iter/plumbing/mod.rs @@ -192,6 +192,24 @@ pub trait Folder: Sized { fn full(&self) -> bool; } +/// The `MapFolder` trait represents a "map fold" operation, where the "map" +/// part is encoded by the `consume` method taking an `Item` and returning +/// `Self::Output`, and where the "fold" part is encoded by `consume` taking `self` and +/// returning the updated folder, with the `complete` method returning `Self::Result`. +pub trait MapFolder: Sized { + /// The output returned when consuming an item. + type Output: Send; + + /// The type of result that will ultimately be produced by the map folder. + type Result: Send; + + /// Consume an item. + fn consume(self, item: Item) -> (Self, Self::Output); + + /// Finish consuming items, produce final result. + fn complete(self) -> Self::Result; +} + /// The reducer is the final step of a `Consumer` -- after a consumer /// has been split into two parts, and each of those parts has been /// fully processed, we are left with two results. The reducer is then