Skip to content

Commit

Permalink
Introduce IndexedParallelIterator::mapfold_collect_into_vec
Browse files Browse the repository at this point in the history
  • Loading branch information
nox committed Apr 4, 2019
1 parent 003b5e6 commit 81b39a7
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 37 deletions.
80 changes: 60 additions & 20 deletions src/iter/collect/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,63 +1,93 @@
use super::super::noop::*;
use super::super::plumbing::*;
use std::marker::PhantomData;
use std::ptr;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};

pub struct CollectConsumer<'c, T: Send + 'c> {
pub struct CollectConsumer<'c, T: Send + 'c, U, F: MapFolder<U>, R> {
/// Tracks how many items we successfully wrote. Used to guarantee
/// safety in the face of panics or buggy parallel iterators.
writes: &'c AtomicUsize,

/// A slice covering the target memory, not yet initialized!
target: &'c mut [T],

marker: PhantomData<U>,
map_folder: F,
reducer: R,
}

pub struct CollectFolder<'c, T: Send + 'c> {
pub struct CollectFolder<'c, T: Send + 'c, U, F: MapFolder<U>> {
global_writes: &'c AtomicUsize,
local_writes: usize,

/// An iterator over the *uninitialized* target memory.
target: slice::IterMut<'c, T>,

marker: PhantomData<U>,
map_folder: F,
}

impl<'c, T: Send + 'c> CollectConsumer<'c, T> {
impl<'c, T, U, F, R> CollectConsumer<'c, T, U, F, R>
where
T: Send + 'c,
F: MapFolder<U>,
R: Reducer<F::Result>,
{
/// The target memory is considered uninitialized, and will be
/// overwritten without dropping anything.
pub fn new(writes: &'c AtomicUsize, target: &'c mut [T]) -> CollectConsumer<'c, T> {
pub fn new(
writes: &'c AtomicUsize,
target: &'c mut [T],
map_folder: F,
reducer: R,
) -> CollectConsumer<'c, T, U, F, R> {
CollectConsumer {
writes: writes,
target: target,
marker: PhantomData,
map_folder: map_folder,
reducer: reducer,
}
}
}

impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
type Folder = CollectFolder<'c, T>;
type Reducer = NoopReducer;
type Result = ();

fn split_at(self, index: usize) -> (Self, Self, NoopReducer) {
impl<'c, T, U, F, R> Consumer<U> for CollectConsumer<'c, T, U, F, R>
where
T: Send + 'c,
U: Send,
F: Clone + MapFolder<U, Output = T> + Send,
F::Result: Send,
R: Clone + Reducer<F::Result> + Send,
{
type Folder = CollectFolder<'c, T, U, F>;
type Reducer = R;
type Result = F::Result;

fn split_at(self, index: usize) -> (Self, Self, Self::Reducer) {
// instances Read in the fields from `self` and then
// forget `self`, since it has been legitimately consumed
// (and not dropped during unwinding).
let CollectConsumer { writes, target } = self;
let CollectConsumer { writes, target, map_folder, reducer, .. } = self;

// Produce new consumers. Normal slicing ensures that the
// memory range given to each consumer is disjoint.
let (left, right) = target.split_at_mut(index);
(
CollectConsumer::new(writes, left),
CollectConsumer::new(writes, right),
NoopReducer,
CollectConsumer::new(writes, left, map_folder.clone(), reducer.clone()),
CollectConsumer::new(writes, right, map_folder, reducer.clone()),
reducer,
)
}

fn into_folder(self) -> CollectFolder<'c, T> {
fn into_folder(self) -> CollectFolder<'c, T, U, F> {
CollectFolder {
global_writes: self.writes,
local_writes: 0,
target: self.target.into_iter(),
marker: PhantomData,
map_folder: self.map_folder,
}
}

Expand All @@ -66,10 +96,17 @@ impl<'c, T: Send + 'c> Consumer<T> for CollectConsumer<'c, T> {
}
}

impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
type Result = ();
impl<'c, T, U, F> Folder<U> for CollectFolder<'c, T, U, F>
where
T: Send + 'c,
F: MapFolder<U, Output = T>,
{
type Result = F::Result;

fn consume(mut self, item: U) -> Self {
let (map_folder, item) = self.map_folder.consume(item);
self.map_folder = map_folder;

fn consume(mut self, item: T) -> CollectFolder<'c, T> {
// Compute target pointer and write to it. Safe because the iterator
// does all the bounds checking; we're only avoiding the target drop.
let head = self
Expand All @@ -84,12 +121,14 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {
self
}

fn complete(self) {
fn complete(self) -> Self::Result {
assert!(self.target.len() == 0, "too few values pushed to consumer");

// track total values written
self.global_writes
.fetch_add(self.local_writes, Ordering::Relaxed);

self.map_folder.complete()
}

fn full(&self) -> bool {
Expand All @@ -99,10 +138,11 @@ impl<'c, T: Send + 'c> Folder<T> for CollectFolder<'c, T> {

/// Pretend to be unindexed for `special_collect_into_vec`,
/// but we should never actually get used that way...
impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T> {
impl<'c, T: Send + 'c> UnindexedConsumer<T> for CollectConsumer<'c, T, T, NoopConsumer, NoopReducer> {
fn split_off_left(&self) -> Self {
unreachable!("CollectConsumer must be indexed!")
}

fn to_reducer(&self) -> Self::Reducer {
NoopReducer
}
Expand Down
47 changes: 39 additions & 8 deletions src/iter/collect/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use super::{IndexedParallelIterator, IntoParallelIterator, ParallelExtend, ParallelIterator};
use super::noop::*;
use super::plumbing::*;
use std::collections::LinkedList;
use std::slice;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand All @@ -9,6 +11,30 @@ use super::unzip::unzip_indexed;

mod test;

/// Collects the results of the exact iterator into the specified vector
/// and fold-reduce over references to the items of that vector.
///
/// This is not directly public, but called by
/// `IndexedParallelIterator::collect_into_vec_and_fold_reduce`.
pub fn mapfold_collect_into_vec<'c, I, T, F, R>(
pi: I,
v: &'c mut Vec<T>,
map_folder: F,
reducer: R,
) -> F::Result
where
I: IndexedParallelIterator,
T: Send + 'c,
F: Clone + MapFolder<I::Item, Output = T> + Send,
R: Clone + Reducer<F::Result> + Send,
{
v.truncate(0); // clear any old data
let mut collect = Collect::new(v, pi.len());
let result = pi.drive(collect.as_consumer(map_folder, reducer));
collect.complete();
result
}

/// Collects the results of the exact iterator into the specified vector.
///
/// This is not directly public, but called by `IndexedParallelIterator::collect_into_vec`.
Expand All @@ -17,10 +43,7 @@ where
I: IndexedParallelIterator<Item = T>,
T: Send,
{
v.truncate(0); // clear any old data
let mut collect = Collect::new(v, pi.len());
pi.drive(collect.as_consumer());
collect.complete();
mapfold_collect_into_vec(pi, v, NoopConsumer, NoopReducer)
}

/// Collects the results of the iterator into the specified vector.
Expand All @@ -40,7 +63,7 @@ where
T: Send,
{
let mut collect = Collect::new(v, len);
pi.drive_unindexed(collect.as_consumer());
pi.drive_unindexed(collect.as_noop_consumer());
collect.complete();
}

Expand All @@ -61,7 +84,7 @@ where
let mut left = Collect::new(left, len);
let mut right = Collect::new(right, len);

unzip_indexed(pi, left.as_consumer(), right.as_consumer());
unzip_indexed(pi, left.as_noop_consumer(), right.as_noop_consumer());

left.complete();
right.complete();
Expand All @@ -83,16 +106,24 @@ impl<'c, T: Send + 'c> Collect<'c, T> {
}
}

fn as_noop_consumer(&mut self) -> CollectConsumer<T, T, NoopConsumer, NoopReducer> {
self.as_consumer(NoopConsumer, NoopReducer)
}

/// Create a consumer on a slice of our memory.
fn as_consumer(&mut self) -> CollectConsumer<T> {
fn as_consumer<'a, U, F, R>(&'a mut self, map_folder: F, reducer: R) -> CollectConsumer<'a, T, U, F, R>
where
F: MapFolder<U, Output = T>,
R: Reducer<F::Result>,
{
// Reserve the new space.
self.vec.reserve(self.len);

// Get a correct borrow, then extend it for the newly added length.
let start = self.vec.len();
let mut slice = &mut self.vec[start..];
slice = unsafe { slice::from_raw_parts_mut(slice.as_mut_ptr(), self.len) };
CollectConsumer::new(&self.writes, slice)
CollectConsumer::new(&self.writes, slice, map_folder, reducer)
}

/// Update the final vector length.
Expand Down
18 changes: 9 additions & 9 deletions src/iter/collect/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use iter::plumbing::*;
fn produce_too_many_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 2);
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
Expand All @@ -29,7 +29,7 @@ fn produce_too_many_items() {
fn produce_fewer_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 5);
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
Expand All @@ -43,7 +43,7 @@ fn left_produces_items_with_no_complete() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand All @@ -62,7 +62,7 @@ fn right_produces_items_with_no_complete() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand All @@ -80,7 +80,7 @@ fn produces_items_with_no_complete() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 2);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let mut folder = consumer.into_folder();
folder = folder.consume(22);
folder = folder.consume(23);
Expand All @@ -96,7 +96,7 @@ fn left_produces_too_many_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand All @@ -115,7 +115,7 @@ fn right_produces_too_many_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand All @@ -134,7 +134,7 @@ fn left_produces_fewer_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand All @@ -154,7 +154,7 @@ fn right_produces_fewer_items() {
let mut v = vec![];
let mut collect = Collect::new(&mut v, 4);
{
let consumer = collect.as_consumer();
let consumer = collect.as_noop_consumer();
let (left_consumer, right_consumer, _) = consumer.split_at(2);
let mut left_folder = left_consumer.into_folder();
let mut right_folder = right_consumer.into_folder();
Expand Down
Loading

0 comments on commit 81b39a7

Please sign in to comment.