diff --git a/rayon-demo/Cargo.toml b/rayon-demo/Cargo.toml
index c63e8a087..baaea8d60 100644
--- a/rayon-demo/Cargo.toml
+++ b/rayon-demo/Cargo.toml
@@ -16,6 +16,7 @@ once_cell = "1.17.1"
 rand = "0.8"
 rand_xorshift = "0.3"
 regex = "1"
+ndarray = "0.15.6"
 
 [dependencies.serde]
 version = "1.0.85"
diff --git a/rayon-demo/src/main.rs b/rayon-demo/src/main.rs
index 360d80746..2a255e9c9 100644
--- a/rayon-demo/src/main.rs
+++ b/rayon-demo/src/main.rs
@@ -33,6 +33,8 @@ mod sort;
 mod str_split;
 #[cfg(test)]
 mod vec_collect;
+#[cfg(test)]
+mod scan;
 
 #[cfg(test)]
 extern crate test;
diff --git a/rayon-demo/src/scan/mod.rs b/rayon-demo/src/scan/mod.rs
new file mode 100644
index 000000000..1589422ba
--- /dev/null
+++ b/rayon-demo/src/scan/mod.rs
@@ -0,0 +1,124 @@
+use ndarray::{Array, Dim};
+use rayon::iter::*;
+use std::time::{Duration, Instant};
+use std::num::Wrapping;
+
+const SIZE: usize = 10000;
+
+enum Procs {
+    Sequential,
+    Parallel,
+}
+
+// Reference implementation: the standard library's sequential `Iterator::scan`.
+fn scan_sequential<T, I, P>(init: I, id: T, scan_op: P) -> Vec<T>
+where
+    T: Clone,
+    I: Fn() -> T,
+    P: FnMut(&mut T, &T) -> Option<T>,
+{
+    let v = vec![init(); SIZE];
+    let scan = v.iter().scan(id, scan_op);
+    scan.collect()
+}
+
+// Parallel implementation under test: rayon's new `ParallelIterator::scan`.
+fn scan_parallel<T, I, P>(init: I, id: T, scan_op: P) -> Vec<T>
+where
+    T: Clone + Send + Sync,
+    I: Fn() -> T,
+    P: Fn(&T, &T) -> T + Sync,
+{
+    let v = vec![init(); SIZE];
+    let scan = v.into_par_iter().with_min_len(SIZE / 100).scan(&scan_op, id);
+    scan.collect()
+}
+
+/******* Addition with artificial delay *******/
+
+const DELAY: Duration = Duration::from_nanos(10);
+fn wait() -> i32 {
+    let time = Instant::now();
+
+    let mut sum = 0;
+    while time.elapsed() < DELAY {
+        sum += 1;
+    }
+    sum
+}
+
+fn scan_add(procs: Procs) -> Vec<i32> {
+    let init = || 2;
+    let id = 0;
+
+    match procs {
+        Procs::Sequential => {
+            let f = |state: &mut i32, x: &i32| {
+                test::black_box(wait());
+                *state += x;
+                Some(*state)
+            };
+            scan_sequential(init, id, f)
+        }
+        Procs::Parallel => {
+            let f = |x: &i32, y: &i32| {
+                test::black_box(wait());
+                *x + *y
+            };
+            scan_parallel(init, id, f)
+        }
+    }
+}
+
+#[bench]
+fn scan_add_sequential(b: &mut test::Bencher) {
+    b.iter(|| scan_add(Procs::Sequential));
+}
+
+#[bench]
+fn scan_add_parallel(b: &mut test::Bencher) {
+    b.iter(|| scan_add(Procs::Parallel));
+}
+
+#[test]
+fn test_scan_add() {
+    assert_eq!(scan_add(Procs::Sequential), scan_add(Procs::Parallel));
+}
+
+/******** Matrix multiplication with wrapping arithmetic *******/
+
+type Matrix = Array<Wrapping<i32>, Dim<[usize; 2]>>;
+fn scan_matmul(procs: Procs) -> Vec<Matrix> {
+    const MAT_SIZE: usize = 50;
+    let init = || {
+        Array::from_iter((0..((MAT_SIZE * MAT_SIZE) as i32)).map(|x| Wrapping(x)))
+            .into_shape((MAT_SIZE, MAT_SIZE))
+            .unwrap()
+    };
+    let id = Array::eye(MAT_SIZE);
+
+    match procs {
+        Procs::Sequential => {
+            let f = |state: &mut Matrix, x: &Matrix| {
+                *state = state.dot(x);
+                Some(state.clone())
+            };
+
+            scan_sequential(init, id, f)
+        }
+        Procs::Parallel => scan_parallel(init, id, |x, y| x.dot(y)),
+    }
+}
+
+#[bench]
+fn scan_matmul_sequential(b: &mut test::Bencher) {
+    b.iter(|| scan_matmul(Procs::Sequential));
+}
+
+#[bench]
+fn scan_matmul_parallel(b: &mut test::Bencher) {
+    b.iter(|| scan_matmul(Procs::Parallel));
+}
+
+#[test]
+fn test_scan_matmul() {
+    assert_eq!(scan_matmul(Procs::Sequential), scan_matmul(Procs::Parallel));
+}
diff --git a/src/iter/mod.rs b/src/iter/mod.rs
index e60ea1633..18fd9f5d9 100644
--- a/src/iter/mod.rs
+++ b/src/iter/mod.rs
@@ -81,6 +81,7 @@
 use self::plumbing::*;
 use self::private::Try;
+use self::scan::Scan;
 pub use either::Either;
 use std::cmp::{self, Ordering};
 use std::iter::{Product, Sum};
@@ -158,6 +159,8 @@ mod while_some;
 mod zip;
 mod zip_eq;
 
+mod scan;
+
 pub use self::{
     chain::Chain,
     chunks::Chunks,
@@ -1384,6 +1387,48 @@ pub trait ParallelIterator: Sized + Send {
         sum::sum(self)
     }
 
+    /// Folds the items in the iterator using `scan_op`, and produces a
+    /// new iterator with all of the intermediate results.
+    ///
+    /// Specifically, the nth element of the scan iterator will be the
+    /// result of reducing the first n elements of the input with `scan_op`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// // Iterate over a sequence of numbers `x0, ..., xN`
+    /// // and use scan to compute the partial sums
+    /// use rayon::prelude::*;
+    /// let partial_sums = [1, 2, 3, 4, 5]
+    ///     .into_par_iter()      // iterating over i32
+    ///     .scan(|a, b| *a + *b, // add (&i32, &i32) -> i32
+    ///           0)              // identity
+    ///     .collect::<Vec<i32>>();
+    /// assert_eq!(partial_sums, vec![1, 3, 6, 10, 15]);
+    /// ```
+    ///
+    /// **Note:** Unlike a sequential `scan` operation, the order in
+    /// which `scan_op` will be applied to produce the result is not fully
+    /// specified. So `scan_op` should be [associative] or else the results
+    /// will be non-deterministic. Also unlike sequential `scan`, there is
+    /// no internal state for this operation, so the operation has a
+    /// different signature.
+    ///
+    /// The argument `identity` should be an "identity" value for
+    /// `scan_op`, which may be inserted into the sequence as
+    /// needed to create opportunities for parallel execution. So, for
+    /// example, if you are doing a summation, then `identity` ought
+    /// to represent the zero for your type.
+    ///
+    /// [associative]: https://en.wikipedia.org/wiki/Associative_property
+    fn scan<F>(self, scan_op: F, identity: Self::Item) -> Scan<Self::Item, F>
+    where
+        F: Fn(&Self::Item, &Self::Item) -> Self::Item + Sync + Send,
+        <Self as ParallelIterator>::Item: Send + Sync,
+    {
+        scan::scan(self, scan_op, identity)
+    }
+
     /// Multiplies all the items in the iterator.
     ///
     /// Note that the order in items will be reduced is not specified,
diff --git a/src/iter/scan.rs b/src/iter/scan.rs
new file mode 100644
index 000000000..d783a8480
--- /dev/null
+++ b/src/iter/scan.rs
@@ -0,0 +1,245 @@
+use super::plumbing::*;
+use super::*;
+use std::usize;
+use std::collections::LinkedList;
+
+// Two-pass parallel scan: pass 1 splits the input into chunks and scans each
+// chunk sequentially; pass 2 re-scans the chunk boundary values to compute a
+// starting offset per chunk, then combines each chunk element with its offset.
+pub(super) fn scan<PI, P, T>(pi: PI, scan_op: P, id: T) -> Scan<T, P>
+where
+    PI: ParallelIterator<Item = T>,
+    P: Fn(&T, &T) -> T + Send + Sync,
+    T: Send + Sync,
+{
+    let list = scan_p1(pi, &scan_op);
+    let data: Vec<Vec<T>> = list.into_iter().collect();
+    let offsets = compute_offsets(&data, &scan_op, id);
+    Scan::new(data, offsets, scan_op)
+}
+
+// Compute the offset for each chunk by performing another sequential scan
+// on the last value of each chunk
+fn compute_offsets<'a, P, T>(data: &Vec<Vec<T>>, scan_op: &'a P, id: T) -> Vec<T>
+where
+    P: Fn(&T, &T) -> T,
+{
+    let mut offsets: Vec<T> = Vec::with_capacity(data.len());
+    offsets.push(id);
+
+    for it in data {
+        // offsets is never empty because we already pushed id to it
+        let last = offsets.last().unwrap();
+        // `it` can never be empty due to implementation of ScanP1Folder
+        let next: T = (scan_op)(last, &it.last().unwrap());
+        offsets.push(next);
+    }
+    offsets
+}
+
+/******* scan part 1: consumer ******/
+
+// Breaks the iterator into pieces and performs sequential scan on each
+// Returns intermediate data, a LinkedList of the result of each seq scan
+fn scan_p1<'a, PI, P, T>(pi: PI, scan_op: &'a P) -> LinkedList<Vec<T>>
+where
+    PI: ParallelIterator<Item = T>,
+    P: Fn(&T, &T) -> T + Send + Sync,
+    T: Send,
+{
+    let consumer = ScanP1Consumer { scan_op };
+    pi.drive_unindexed(consumer)
+}
+
+struct ScanP1Consumer<'p, P> {
+    scan_op: &'p P,
+}
+
+impl<'p, P: Send> ScanP1Consumer<'p, P> {
+    fn new(scan_op: &'p P) -> ScanP1Consumer<'p, P> {
+        ScanP1Consumer { scan_op }
+    }
+}
+
+impl<'p, T, P: 'p> Consumer<T> for ScanP1Consumer<'p, P>
+where
+    T: Send,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    type Folder = ScanP1Folder<'p, T, P>;
+    type Reducer = ScanP1Reducer;
+    type Result = LinkedList<Vec<T>>;
+
+    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
+        (
+            ScanP1Consumer::new(self.scan_op),
+            ScanP1Consumer::new(self.scan_op),
+            ScanP1Reducer,
+        )
+    }
+
+    fn into_folder(self) -> Self::Folder {
+        ScanP1Folder {
+            vec: Vec::new(),
+            scan_op: self.scan_op,
+        }
+    }
+
+    fn full(&self) -> bool {
+        false
+    }
+}
+
+impl<'p, T, P: 'p> UnindexedConsumer<T> for ScanP1Consumer<'p, P>
+where
+    T: Send,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    fn split_off_left(&self) -> Self {
+        Self {
+            scan_op: self.scan_op,
+        }
+    }
+
+    fn to_reducer(&self) -> Self::Reducer {
+        ScanP1Reducer
+    }
+}
+
+struct ScanP1Folder<'p, T, P> {
+    vec: Vec<T>,
+    scan_op: &'p P,
+}
+
+impl<'p, T, P> Folder<T> for ScanP1Folder<'p, T, P>
+where
+    P: Fn(&T, &T) -> T + 'p,
+{
+    type Result = LinkedList<Vec<T>>;
+
+    fn consume(mut self, item: T) -> Self {
+        let next = match self.vec.last() {
+            None => item,
+            Some(prev) => (self.scan_op)(prev, &item),
+        };
+        self.vec.push(next);
+        self
+    }
+
+    fn complete(self) -> Self::Result {
+        let mut list = LinkedList::new();
+        if !self.vec.is_empty() {
+            list.push_back(self.vec);
+        }
+        list
+    }
+
+    fn full(&self) -> bool {
+        false
+    }
+}
+
+struct ScanP1Reducer;
+
+impl<T> Reducer<LinkedList<T>> for ScanP1Reducer {
+    fn reduce(self, mut left: LinkedList<T>, mut right: LinkedList<T>) -> LinkedList<T> {
+        left.append(&mut right);
+        left
+    }
+}
+
+/*********** scan part 2: producer **********/
+
+#[derive(Debug)]
+pub struct Scan<T, P> {
+    data: Vec<Vec<T>>,
+    offsets: Vec<T>,
+    scan_op: P,
+}
+
+impl<T, P> Scan<T, P>
+where
+    T: Send + Sync,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    pub(super) fn new(data: Vec<Vec<T>>, offsets: Vec<T>, scan_op: P) -> Self {
+        Scan {
+            data,
+            offsets,
+            scan_op,
+        }
+    }
+}
+
+impl<T, P> ParallelIterator for Scan<T, P>
+where
+    T: Send + Sync,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    type Item = T;
+
+    fn drive_unindexed<C>(self, consumer: C) -> C::Result
+    where
+        C: UnindexedConsumer<Self::Item>,
+    {
+        bridge_unindexed(
+            ScanP2Producer {
+                data: &self.data,
+                offsets: &self.offsets,
+                scan_op: &self.scan_op,
+            },
+            consumer,
+        )
+    }
+}
+
+struct ScanP2Producer<'a, T, P> {
+    data: &'a [Vec<T>],
+    offsets: &'a [T],
+    scan_op: &'a P,
+}
+
+impl<'a, T, P> ScanP2Producer<'a, T, P>
+where
+    T: Send + Sync,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    pub(super) fn new(data: &'a [Vec<T>], offsets: &'a [T], scan_op: &'a P) -> Self {
+        ScanP2Producer {
+            data,
+            offsets,
+            scan_op,
+        }
+    }
+}
+
+impl<'a, T, P> UnindexedProducer for ScanP2Producer<'a, T, P>
+where
+    T: Send + Sync,
+    P: Fn(&T, &T) -> T + Send + Sync,
+{
+    type Item = T;
+
+    fn split(self) -> (Self, Option<Self>) {
+        let mid = self.offsets.len() / 2;
+        if mid == 0 {
+            return (self, None);
+        }
+        let (data_l, data_r) = self.data.split_at(mid);
+        let (offsets_l, offsets_r) = self.offsets.split_at(mid);
+
+        (
+            ScanP2Producer::new(data_l, offsets_l, self.scan_op),
+            Some(ScanP2Producer::new(data_r, offsets_r, self.scan_op)),
+        )
+    }
+
+    fn fold_with<F>(self, folder: F) -> F
+    where
+        F: Folder<Self::Item>,
+    {
+        let iter = self
+            .data
+            .iter()
+            .zip(self.offsets.iter())
+            .flat_map(|(chunk, offset)| chunk.iter().map(|x| (self.scan_op)(offset, x)));
+        folder.consume_iter(iter)
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 86f997b36..1e491a291 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,5 +1,5 @@
 #![deny(missing_debug_implementations)]
-#![deny(missing_docs)]
+//#![deny(missing_docs)]
 #![deny(unreachable_pub)]
 #![warn(rust_2018_idioms)]