Skip to content

Commit

Permalink
Use crossbeam to get safety & stability.
Browse files Browse the repository at this point in the history
This converts `map`, `unordered_map`, `Pool::map` and
`Pool::unordered_map` to take a `crossbeam::Scope` argument, instead of
using the API of the old `thread::scoped` to return a guard (where the
destructor was required to run to ensure safety, something not
guaranteed). See rust-lang/rust#24292 for more
discussion of the issues with that API.
  • Loading branch information
huonw committed Oct 4, 2015
1 parent cdff00b commit fc64d38
Show file tree
Hide file tree
Showing 7 changed files with 207 additions and 126 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ operations, including parallel maps, for loops and thread pools.
strided = "*"
num = "*"
num_cpus = "0.2"
[dependencies]
crossbeam = "0.1"

[features]
unstable = []
32 changes: 13 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,26 +211,19 @@
//! contains more intricate example(s), such as a parallel fast
//! Fourier transform implementation (it really works, and the
//! parallelism does buy something... when tuned).
#![cfg_attr(feature = "unstable", feature(scoped))]
#[cfg(feature = "unstable")]
use std::thread;
#[cfg(feature = "unstable")]
use std::iter::IntoIterator;
extern crate crossbeam;

#[cfg(feature = "unstable")]
mod maps;

mod fnbox;

pub mod pool;

pub mod one_to_one {
#[cfg(feature = "unstable")]
pub use maps::{unordered_map, UnorderedParMap, map, ParMap};
}

#[cfg(feature = "unstable")]
pub use one_to_one::{map, unordered_map};

pub use pool::Pool;
Expand All @@ -241,15 +234,14 @@ pub use pool::Pool;
/// If `f` panics, so does `for_`. If this occurs, the number of
/// elements of `iter` that have had `f` called on them is
/// unspecified.
#[cfg(feature = "unstable")]
pub fn for_<I: IntoIterator, F>(iter: I, ref f: F)
where I::Item: Send, F: Fn(I::Item) + Sync
{
let _guards: Vec<_> = iter.into_iter().map(|elem| {
thread::scoped(move || {
f(elem)
})
}).collect();
crossbeam::scope(|scope| {
for elem in iter {
scope.spawn(move || f(elem));
}
});
}

/// Execute `f` on both `x` and `y`, in parallel, returning the
Expand All @@ -258,14 +250,16 @@ pub fn for_<I: IntoIterator, F>(iter: I, ref f: F)
/// This is the same (including panic semantics) as `(f(x), f(y))`, up
/// to ordering. It is designed to be used for divide-and-conquer
/// algorithms.
#[cfg(feature = "unstable")]
pub fn both<T, U, F>(x: T, y: T, ref f: F) -> (U, U)
where T: Send,
U: Send,
F: Sync + Fn(T) -> U
{
let guard = thread::scoped(move || f(y));
let a = f(x);
let b = guard.join();
(a, b)
crossbeam::scope(|scope| {
let guard = scope.spawn(move || f(y));
let a = f(x);
let b = guard.join();
(a, b)

})
}
29 changes: 15 additions & 14 deletions src/maps.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::sync::mpsc::{self, Sender, Receiver};
use std::thread::{self,JoinGuard};
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::iter::IntoIterator;

use crossbeam::{Scope, ScopedJoinHandle};

struct Packet<T> {
// this should be unique for a given instance of `*ParMap`
idx: usize,
Expand All @@ -25,12 +26,12 @@ impl<T> Eq for Packet<T> {}

/// A parallel-mapping iterator that doesn't care about the order in
/// which elements come out.
pub struct UnorderedParMap<'a, T: 'a + Send> {
pub struct UnorderedParMap<T: Send> {
rx: Receiver<Packet<T>>,
_guards: Vec<JoinGuard<'a, ()>>
_guards: Vec<ScopedJoinHandle<()>>
}

impl<'a,T: 'static + Send> Iterator for UnorderedParMap<'a, T> {
impl<T: 'static + Send> Iterator for UnorderedParMap<T> {
type Item = (usize, T);

fn next(&mut self) -> Option<(usize, T)> {
Expand Down Expand Up @@ -62,9 +63,9 @@ impl<T: Send + 'static> Drop for Panicker<T> {
/// This behaves like `simple_parallel::map`, but does not make
/// efforts to ensure that the elements are returned in the order of
/// `iter`, hence this is cheaper.
pub fn unordered_map<'a, I: IntoIterator, F, T>(iter: I, f: &'a F) -> UnorderedParMap<'a, T>
pub fn unordered_map<'a, I: IntoIterator, F, T>(scope: &Scope<'a>, iter: I, f: &'a F) -> UnorderedParMap<T>
where I::Item: Send + 'a,
F: 'a + Sync + Fn(I::Item) -> T,
F: Sync + Fn(I::Item) -> T,
T: Send + 'static
{
let (tx, rx) = mpsc::channel();
Expand All @@ -73,7 +74,7 @@ pub fn unordered_map<'a, I: IntoIterator, F, T>(iter: I, f: &'a F) -> UnorderedP
let tx = tx.clone();
let f = f.clone();

thread::scoped(move || {
scope.spawn(move || {
let mut p = Panicker { tx: tx, idx: idx, all_ok: false };
let val = f(elem);
let _ = p.tx.send(Packet { idx: idx, data: Some(val) });
Expand All @@ -88,13 +89,13 @@ pub fn unordered_map<'a, I: IntoIterator, F, T>(iter: I, f: &'a F) -> UnorderedP
}

/// A parallel-mapping iterator.
pub struct ParMap<'a, T: 'a + Send> {
unordered: UnorderedParMap<'a, T>,
pub struct ParMap<T: Send> {
unordered: UnorderedParMap<T>,
looking_for: usize,
queue: BinaryHeap<Packet<T>>
}

impl<'a, T: Send + 'static> Iterator for ParMap<'a, T> {
impl<T: Send + 'static> Iterator for ParMap<T> {
type Item = T;

fn next(&mut self) -> Option<T> {
Expand Down Expand Up @@ -127,13 +128,13 @@ impl<'a, T: Send + 'static> Iterator for ParMap<'a, T> {
/// This is a drop-in replacement for `iter.map(f)`, that runs in
/// parallel, and eagerly consumes `iter` spawning a thread for each
/// element.
pub fn map<'a, I: IntoIterator, F, T>(iter: I, f: &'a F) -> ParMap<'a, T>
where I::Item: Send + 'a,
F: 'a + Sync + Fn(I::Item) -> T,
pub fn map<'a, I: IntoIterator, F, T>(scope: &Scope<'a>, iter: I, f: &'a F) -> ParMap<T>
where I::Item: 'a + Send,
F: Sync + Fn(I::Item) -> T,
T: Send + 'static
{
ParMap {
unordered: unordered_map(iter, f),
unordered: unordered_map(scope, iter, f),
looking_for: 0,
queue: BinaryHeap::new(),
}
Expand Down
Loading

0 comments on commit fc64d38

Please sign in to comment.