Commit 1e863f4

Changes used by rustc

Zoxc committed Apr 27, 2018
1 parent df14a3c commit 1e863f4

Showing 9 changed files with 252 additions and 28 deletions.
5 changes: 5 additions & 0 deletions rayon-core/Cargo.toml
@@ -13,11 +13,16 @@ readme = "README.md"
keywords = ["parallel", "thread", "concurrency", "join", "performance"]
categories = ["concurrency"]

[features]
default = ["tlv"]
tlv = []

[dependencies]
rand = ">= 0.3, < 0.5"
num_cpus = "1.2"
libc = "0.2.16"
lazy_static = "1"
scoped-tls = "0.1.1"

# This is deliberately not the latest version, because we want
# to support older rustc than crossbeam-deque 0.3+ does.
18 changes: 17 additions & 1 deletion rayon-core/src/job.rs
@@ -3,6 +3,8 @@ use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
use unwind;
#[cfg(feature = "tlv")]
use tlv;

pub enum JobResult<T> {
None,
@@ -73,6 +75,8 @@ pub struct StackJob<L, F, R>
pub latch: L,
func: UnsafeCell<Option<F>>,
result: UnsafeCell<JobResult<R>>,
#[cfg(feature = "tlv")]
tlv: usize,
}

impl<L, F, R> StackJob<L, F, R>
@@ -85,6 +89,8 @@ impl<L, F, R> StackJob<L, F, R>
latch: latch,
func: UnsafeCell::new(Some(func)),
result: UnsafeCell::new(JobResult::None),
#[cfg(feature = "tlv")]
tlv: tlv::get(),
}
}

@@ -108,6 +114,8 @@ impl<L, F, R> Job for StackJob<L, F, R>
{
unsafe fn execute(this: *const Self) {
let this = &*this;
#[cfg(feature = "tlv")]
tlv::set(this.tlv);
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
(*this.result.get()) = match unwind::halt_unwinding(|| func(true)) {
@@ -129,13 +137,19 @@ pub struct HeapJob<BODY>
where BODY: FnOnce() + Send
{
job: UnsafeCell<Option<BODY>>,
#[cfg(feature = "tlv")]
tlv: usize,
}

impl<BODY> HeapJob<BODY>
where BODY: FnOnce() + Send
{
pub fn new(func: BODY) -> Self {
HeapJob { job: UnsafeCell::new(Some(func)) }
HeapJob {
job: UnsafeCell::new(Some(func)),
#[cfg(feature = "tlv")]
tlv: tlv::get(),
}
}

/// Creates a `JobRef` from this job -- note that this hides all
@@ -152,6 +166,8 @@ impl<BODY> Job for HeapJob<BODY>
{
unsafe fn execute(this: *const Self) {
let this: Box<Self> = mem::transmute(this);
#[cfg(feature = "tlv")]
tlv::set(this.tlv);
let job = (*this.job.get()).take().unwrap();
job();
}
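Note: the new `tlv` module that these call sites rely on is elsewhere in this commit and not shown in this excerpt. As a rough sketch only, assuming the module simply keeps one `usize` per thread behind the `tlv::get()` / `tlv::set(..)` API used above, it could look like:

use std::cell::Cell;

thread_local! {
    // The per-thread value that jobs capture when they are created and
    // restore just before they execute, possibly on another worker thread.
    static TLV: Cell<usize> = Cell::new(0);
}

/// Sets the thread-local value for the current thread.
pub fn set(value: usize) {
    TLV.with(|tlv| tlv.set(value));
}

/// Returns the thread-local value of the current thread.
pub fn get() -> usize {
    TLV.with(|tlv| tlv.get())
}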
30 changes: 13 additions & 17 deletions rayon-core/src/join/mod.rs
@@ -1,10 +1,9 @@
use latch::{LatchProbe, SpinLatch};
use log::Event::*;
use job::StackJob;
use registry::{self, WorkerThread};
use std::any::Any;
use unwind;

use registry;
use PoisonedJob;
use FnContext;

#[cfg(test)]
@@ -128,7 +127,17 @@ pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
let status_a = unwind::halt_unwinding(move || oper_a(FnContext::new(injected)));
let result_a = match status_a {
Ok(v) => v,
Err(err) => join_recover_from_panic(worker_thread, &job_b.latch, err),
Err(err) => {
// If job A panics, we still cannot return until we are sure that job
// B is complete. This is because it may contain references into the
// enclosing stack frame(s).
worker_thread.wait_until(&job_b.latch);
if err.is::<PoisonedJob>() {
// Job A was poisoned, so unwind the panic of Job B if it exists
job_b.into_result();
}
unwind::resume_unwinding(err)
},
};

// Now that task A has finished, try to pop job B from the
@@ -162,16 +171,3 @@ pub fn join_context<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
return (result_a, job_b.into_result());
})
}

/// If job A panics, we still cannot return until we are sure that job
/// B is complete. This is because it may contain references into the
/// enclosing stack frame(s).
#[cold] // cold path
unsafe fn join_recover_from_panic(worker_thread: &WorkerThread,
job_b_latch: &SpinLatch,
err: Box<Any + Send>)
-> !
{
worker_thread.wait_until(job_b_latch);
unwind::resume_unwinding(err)
}
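
With the recovery path now inlined above, a panic in job A still waits for job B to finish, and a `PoisonedJob` payload defers to job B's outcome. A hypothetical illustration, with closures and messages invented for this sketch rather than taken from the commit:

extern crate rayon_core;

use std::panic;
use rayon_core::{join, PoisonedJob};

fn demo() {
    let result = panic::catch_unwind(|| {
        join(
            // Job A aborts itself with the low-priority PoisonedJob payload.
            || -> () { panic::resume_unwind(Box::new(PoisonedJob)) },
            // Job B raises an ordinary panic.
            || -> () { panic!("real failure in job B") },
        )
    });
    // Because of the `err.is::<PoisonedJob>()` check, job B's panic is the
    // payload expected to reach the caller, not the PoisonedJob placeholder.
    assert!(result.is_err());
}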
42 changes: 41 additions & 1 deletion rayon-core/src/lib.rs
@@ -35,6 +35,8 @@ use std::fmt;
extern crate crossbeam_deque;
#[macro_use]
extern crate lazy_static;
#[macro_use]
extern crate scoped_tls;
extern crate libc;
extern crate num_cpus;
extern crate rand;
@@ -50,10 +52,14 @@ mod scope;
mod sleep;
mod spawn;
mod test;
mod thread_local;
mod thread_pool;
mod unwind;
mod util;

#[cfg(feature = "tlv")]
pub mod tlv;

#[cfg(rayon_unstable)]
pub mod internal;
pub use thread_pool::ThreadPool;
@@ -62,6 +68,7 @@ pub use thread_pool::current_thread_has_pending_tasks;
pub use join::{join, join_context};
pub use scope::{scope, Scope};
pub use spawn::spawn;
pub use thread_local::ThreadLocal;

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
@@ -85,6 +92,11 @@ pub fn current_num_threads() -> usize {
::registry::Registry::current_num_threads()
}

/// A value which can be thrown which will give priority to other panics
/// for `join` and `scope`
#[derive(Debug)]
pub struct PoisonedJob;

/// Error when initializing a thread pool.
#[derive(Debug)]
pub struct ThreadPoolBuildError {
@@ -138,6 +150,9 @@ pub struct ThreadPoolBuilder {
/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// Closure invoked on worker thread start.
main_handler: Option<Box<MainHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
@@ -167,6 +182,12 @@ type StartHandler = Fn(usize) + Send + Sync;
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;

/// The type for a closure that gets invoked with a
/// function which runs rayon tasks.
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;

impl ThreadPoolBuilder {
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
pub fn new() -> ThreadPoolBuilder {
@@ -366,6 +387,23 @@ impl ThreadPoolBuilder {
self.exit_handler = Some(Box::new(exit_handler));
self
}

/// Takes the current thread main callback, leaving `None`.
fn take_main_handler(&mut self) -> Option<Box<MainHandler>> {
self.main_handler.take()
}

/// Set a callback to be invoked on thread main.
///
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
pub fn main_handler<H>(mut self, main_handler: H) -> ThreadPoolBuilder
where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static
{
self.main_handler = Some(Box::new(main_handler));
self
}
}

#[allow(deprecated)]
@@ -471,7 +509,7 @@ impl fmt::Debug for ThreadPoolBuilder {
let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler,
ref breadth_first } = *self;
ref main_handler, ref breadth_first } = *self;

// Just print `Some(<closure>)` or `None` to the debug
// output.
@@ -485,6 +523,7 @@ impl fmt::Debug for ThreadPoolBuilder {
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);

f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
Expand All @@ -493,6 +532,7 @@ impl fmt::Debug for ThreadPoolBuilder {
.field("stack_size", &stack_size)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("main_handler", &main_handler)
.field("breadth_first", &breadth_first)
.finish()
}
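The new `main_handler` gives an embedder such as rustc a hook that wraps each worker thread's main loop, for example to install per-thread state before any rayon tasks run. A hypothetical builder call, with names and bodies invented for illustration:

extern crate rayon_core;

fn build_pool() -> rayon_core::ThreadPool {
    rayon_core::ThreadPoolBuilder::new()
        .main_handler(|index: usize, run: &mut FnMut()| {
            // Install any per-thread state here, before rayon tasks run...
            println!("worker {} starting", index);
            // ...then hand control to the worker's main loop. `run` returns
            // only once the pool is terminating.
            run();
            println!("worker {} exiting", index);
        })
        .build()
        .unwrap()
}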
28 changes: 22 additions & 6 deletions rayon-core/src/registry.rs
@@ -1,4 +1,5 @@
use ::{ExitHandler, PanicHandler, StartHandler, ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
use ::{ExitHandler, PanicHandler, StartHandler, MainHandler,
ThreadPoolBuilder, ThreadPoolBuildError, ErrorKind};
use crossbeam_deque::{Deque, Steal, Stealer};
use job::{JobRef, StackJob};
#[cfg(rayon_unstable)]
@@ -27,6 +28,7 @@ pub struct Registry {
panic_handler: Option<Box<PanicHandler>>,
start_handler: Option<Box<StartHandler>>,
exit_handler: Option<Box<ExitHandler>>,
main_handler: Option<Box<MainHandler>>,

// When this latch reaches 0, it means that all work on this
// registry must be complete. This is ensured in the following ways:
@@ -116,6 +118,7 @@ impl Registry {
terminate_latch: CountLatch::new(),
panic_handler: builder.take_panic_handler(),
start_handler: builder.take_start_handler(),
main_handler: builder.take_main_handler(),
exit_handler: builder.take_exit_handler(),
});

@@ -212,8 +215,7 @@ impl Registry {

/// Waits for the worker threads to stop. This is used for testing
/// -- so we can check that termination actually works.
#[cfg(test)]
pub fn wait_until_stopped(&self) {
pub(crate) fn wait_until_stopped(&self) {
for info in &self.thread_infos {
info.stopped.wait();
}
@@ -454,15 +456,15 @@ pub struct WorkerThread {
/// the "worker" half of our local deque
worker: Deque<JobRef>,

index: usize,
pub(crate) index: usize,

/// are these workers configured to steal breadth-first or not?
breadth_first: bool,

/// A weak random number generator.
rng: UnsafeCell<rand::XorShiftRng>,

registry: Arc<Registry>,
pub(crate) registry: Arc<Registry>,
}

// This is a bit sketchy, but basically: the WorkerThread is
@@ -671,7 +673,21 @@ unsafe fn main_loop(worker: Deque<JobRef>,
}
}

worker_thread.wait_until(&registry.terminate_latch);
let mut work = || {
worker_thread.wait_until(&registry.terminate_latch);
};

if let Some(ref handler) = registry.main_handler {
match unwind::halt_unwinding(|| handler(index, &mut work)) {
Ok(()) => {
}
Err(err) => {
registry.handle_panic(err);
}
}
} else {
work();
}

// Should not be any work left in our queue.
debug_assert!(worker_thread.take_local_job().is_none());
19 changes: 16 additions & 3 deletions rayon-core/src/scope/mod.rs
@@ -16,6 +16,7 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicPtr, Ordering};
use registry::{in_worker, WorkerThread, Registry};
use unwind;
use PoisonedJob;

#[cfg(test)]
mod test;
@@ -37,6 +38,10 @@ pub struct Scope<'scope> {
/// propagated to the one who created the scope
panic: AtomicPtr<Box<Any + Send + 'static>>,

/// if some job panicked with PoisonedJob, the error is stored here; it will be
/// propagated to the one who created the scope unless there is a proper panic
poisoned_panic: AtomicPtr<Box<Any + Send + 'static>>,

/// latch to set when the counter drops to zero (and hence this scope is complete)
job_completed_latch: CountLatch,

@@ -265,6 +270,7 @@ pub fn scope<'scope, OP, R>(op: OP) -> R
owner_thread_index: owner_thread.index(),
registry: owner_thread.registry().clone(),
panic: AtomicPtr::new(ptr::null_mut()),
poisoned_panic: AtomicPtr::new(ptr::null_mut()),
job_completed_latch: CountLatch::new(),
marker: PhantomData,
};
@@ -371,14 +377,18 @@ impl<'scope> Scope<'scope> {
// capture the first error we see, free the rest
let nil = ptr::null_mut();
let mut err = Box::new(err); // box up the fat ptr
if self.panic.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed).is_ok() {
let field = if err.is::<PoisonedJob>() {
&self.poisoned_panic
} else {
&self.panic
};
if field.compare_exchange(nil, &mut *err, Ordering::Release, Ordering::Relaxed).is_ok() {
log!(JobPanickedErrorStored { owner_thread: self.owner_thread_index });
mem::forget(err); // ownership now transferred into self.panic
} else {
log!(JobPanickedErrorNotStored { owner_thread: self.owner_thread_index });
}


self.job_completed_latch.set();
}

@@ -394,7 +404,10 @@ impl<'scope> Scope<'scope> {
// propagate panic, if any occurred; at this point, all
// outstanding jobs have completed, so we can use a relaxed
// ordering:
let panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
let mut panic = self.panic.swap(ptr::null_mut(), Ordering::Relaxed);
if panic.is_null() {
panic = self.poisoned_panic.swap(ptr::null_mut(), Ordering::Relaxed);
}
if !panic.is_null() {
log!(ScopeCompletePanicked { owner_thread: owner_thread.index() });
let value: Box<Box<Any + Send + 'static>> = mem::transmute(panic);
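Keeping `PoisonedJob` panics in a separate `poisoned_panic` slot means they are only re-thrown from a scope when no ordinary panic was recorded. A hypothetical illustration, with spawned closures invented for this sketch:

extern crate rayon_core;

use std::panic;
use rayon_core::{scope, PoisonedJob};

fn demo() {
    let result = panic::catch_unwind(|| {
        scope(|s| {
            // This job aborts with the low-priority PoisonedJob payload.
            s.spawn(|_| -> () { panic::resume_unwind(Box::new(PoisonedJob)) });
            // This job raises an ordinary panic.
            s.spawn(|_| -> () { panic!("real failure") });
        })
    });
    // `panic` is consulted before `poisoned_panic`, so the "real failure"
    // payload is the one expected to propagate to the caller.
    assert!(result.is_err());
}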
(Remaining changed files not loaded in this view.)
