Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Do not merge] Changes to Rayon in fork for rustc #569

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
[package]
name = "rayon"
name = "rustc-rayon"
# Reminder to update html_rool_url in lib.rs when updating version
version = "1.0.1"
version = "0.1.1"
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Simple work-stealing parallelism for Rust"
description = "Simple work-stealing parallelism for Rust - fork for rustc"
license = "Apache-2.0/MIT"
repository = "https://github.com/rayon-rs/rayon"
documentation = "https://docs.rs/rayon/"
repository = "https://github.com/Zoxc/rayon/tree/rustc"
documentation = "https://docs.rs/rustc-rayon/"
readme = "README.md"
keywords = ["parallel", "thread", "concurrency", "join", "performance"]
categories = ["concurrency"]

[workspace]
members = ["rayon-demo", "rayon-core", "rayon-futures"]
members = ["rayon-core"]
exclude = ["ci"]

[dependencies]
rayon-core = { version = "1.4", path = "rayon-core" }
rustc-rayon-core = { version = "0.1", path = "rayon-core" }

# This is a public dependency!
[dependencies.either]
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Note: This is an unstable fork made for use in rustc

# Rayon

[![Rayon crate](https://img.shields.io/crates/v/rayon.svg)](https://crates.io/crates/rayon)
Expand Down
15 changes: 9 additions & 6 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
[package]
name = "rayon-core"
version = "1.4.0" # reminder to update html_root_url attribute
name = "rustc-rayon-core"
version = "0.1.1" # reminder to update html_root_url attribute
authors = ["Niko Matsakis <niko@alum.mit.edu>",
"Josh Stone <cuviper@gmail.com>"]
description = "Core APIs for Rayon"
description = "Core APIs for Rayon - fork for rustc"
license = "Apache-2.0/MIT"
repository = "https://github.com/rayon-rs/rayon"
documentation = "https://docs.rs/rayon/"
links = "rayon-core"
repository = "https://github.com/Zoxc/rayon/tree/rustc"
documentation = "https://docs.rs/rustc-rayon-core/"
build = "build.rs"
readme = "README.md"
keywords = ["parallel", "thread", "concurrency", "join", "performance"]
categories = ["concurrency"]

[features]
default = ["tlv"]
tlv = []

[dependencies]
rand = ">= 0.3, < 0.5"
num_cpus = "1.2"
Expand Down
2 changes: 2 additions & 0 deletions rayon-core/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
Note: This is an unstable fork made for use in rustc

Rayon-core represents the "core, stable" APIs of Rayon: join, scope, and so forth, as well as the ability to create custom thread-pools with ThreadPool.

Maybe worth mentioning: users are not necessarily intended to directly access rayon-core; all its APIs are mirror in the rayon crate. To that end, the examples in the docs use rayon::join and so forth rather than rayon_core::join.
Expand Down
18 changes: 17 additions & 1 deletion rayon-core/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::any::Any;
use std::cell::UnsafeCell;
use std::mem;
use unwind;
#[cfg(feature = "tlv")]
use tlv;

pub enum JobResult<T> {
None,
Expand Down Expand Up @@ -73,6 +75,8 @@ pub struct StackJob<L, F, R>
pub latch: L,
func: UnsafeCell<Option<F>>,
result: UnsafeCell<JobResult<R>>,
#[cfg(feature = "tlv")]
tlv: usize,
}

impl<L, F, R> StackJob<L, F, R>
Expand All @@ -85,6 +89,8 @@ impl<L, F, R> StackJob<L, F, R>
latch: latch,
func: UnsafeCell::new(Some(func)),
result: UnsafeCell::new(JobResult::None),
#[cfg(feature = "tlv")]
tlv: tlv::get(),
}
}

Expand All @@ -108,6 +114,8 @@ impl<L, F, R> Job for StackJob<L, F, R>
{
unsafe fn execute(this: *const Self) {
let this = &*this;
#[cfg(feature = "tlv")]
tlv::set(this.tlv);
let abort = unwind::AbortIfPanic;
let func = (*this.func.get()).take().unwrap();
(*this.result.get()) = match unwind::halt_unwinding(|| func(true)) {
Expand All @@ -129,13 +137,19 @@ pub struct HeapJob<BODY>
where BODY: FnOnce() + Send
{
job: UnsafeCell<Option<BODY>>,
#[cfg(feature = "tlv")]
tlv: usize,
}

impl<BODY> HeapJob<BODY>
where BODY: FnOnce() + Send
{
pub fn new(func: BODY) -> Self {
HeapJob { job: UnsafeCell::new(Some(func)) }
HeapJob {
job: UnsafeCell::new(Some(func)),
#[cfg(feature = "tlv")]
tlv: tlv::get(),
}
}

/// Creates a `JobRef` from this job -- note that this hides all
Expand All @@ -152,6 +166,8 @@ impl<BODY> Job for HeapJob<BODY>
{
unsafe fn execute(this: *const Self) {
let this: Box<Self> = mem::transmute(this);
#[cfg(feature = "tlv")]
tlv::set(this.tlv);
let job = (*this.job.get()).take().unwrap();
job();
}
Expand Down
60 changes: 57 additions & 3 deletions rayon-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
//! succeed.

#![doc(html_root_url = "https://docs.rs/rayon-core/1.4")]
#![deny(missing_debug_implementations)]
#![deny(missing_docs)]
#![cfg_attr(test, feature(conservative_impl_trait))]

use std::any::Any;
Expand Down Expand Up @@ -50,18 +48,24 @@ mod scope;
mod sleep;
mod spawn;
mod test;
mod worker_local;
mod thread_pool;
mod unwind;
mod util;

#[cfg(feature = "tlv")]
pub mod tlv;

#[cfg(rayon_unstable)]
pub mod internal;
pub use thread_pool::ThreadPool;
pub use thread_pool::current_thread_index;
pub use thread_pool::current_thread_has_pending_tasks;
pub use join::{join, join_context};
pub use scope::{scope, Scope};
pub use registry::{Registry, mark_blocked, mark_unblocked};
pub use spawn::spawn;
pub use worker_local::WorkerLocal;

/// Returns the number of threads in the current registry. If this
/// code is executing within a Rayon thread-pool, then this will be
Expand Down Expand Up @@ -132,12 +136,18 @@ pub struct ThreadPoolBuilder {
/// The stack size for the created worker threads
stack_size: Option<usize>,

/// Closure invoked on deadlock.
deadlock_handler: Option<Box<DeadlockHandler>>,

/// Closure invoked on worker thread start.
start_handler: Option<Box<StartHandler>>,

/// Closure invoked on worker thread exit.
exit_handler: Option<Box<ExitHandler>>,

/// Closure invoked on worker thread start.
main_handler: Option<Box<MainHandler>>,

/// If false, worker threads will execute spawned jobs in a
/// "depth-first" fashion. If true, they will do a "breadth-first"
/// fashion. Depth-first is the default.
Expand All @@ -157,6 +167,9 @@ pub struct Configuration {
/// may be invoked multiple times in parallel.
type PanicHandler = Fn(Box<Any + Send>) + Send + Sync;

/// The type for a closure that gets invoked when the Rayon thread pool deadlocks
type DeadlockHandler = Fn() + Send + Sync;

/// The type for a closure that gets invoked when a thread starts. The
/// closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
Expand All @@ -167,6 +180,12 @@ type StartHandler = Fn(usize) + Send + Sync;
/// Note that this same closure may be invoked multiple times in parallel.
type ExitHandler = Fn(usize) + Send + Sync;

/// The type for a closure that gets invoked with a
/// function which runs rayon tasks.
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this distinct from the StartHandler?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see, it kind of "takes over" the thread

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It allows up to run code both before and after the Rayon worker, so it is more powerful than StartHandler and ExitHandler. You can see it used here.

type MainHandler = Fn(usize, &mut FnMut()) + Send + Sync;

impl ThreadPoolBuilder {
/// Creates and returns a valid rayon thread pool builder, but does not initialize it.
pub fn new() -> ThreadPoolBuilder {
Expand Down Expand Up @@ -331,6 +350,19 @@ impl ThreadPoolBuilder {
self.breadth_first
}

/// Takes the current deadlock callback, leaving `None`.
fn take_deadlock_handler(&mut self) -> Option<Box<DeadlockHandler>> {
self.deadlock_handler.take()
}

/// Set a callback to be invoked on current deadlock.
pub fn deadlock_handler<H>(mut self, deadlock_handler: H) -> ThreadPoolBuilder
where H: Fn() + Send + Sync + 'static
{
self.deadlock_handler = Some(Box::new(deadlock_handler));
self
}

/// Takes the current thread start callback, leaving `None`.
fn take_start_handler(&mut self) -> Option<Box<StartHandler>> {
self.start_handler.take()
Expand Down Expand Up @@ -366,6 +398,23 @@ impl ThreadPoolBuilder {
self.exit_handler = Some(Box::new(exit_handler));
self
}

/// Takes the current thread main callback, leaving `None`.
fn take_main_handler(&mut self) -> Option<Box<MainHandler>> {
self.main_handler.take()
}

/// Set a callback to be invoked on thread main.
///
/// The closure is passed the index of the thread on which it is invoked.
/// Note that this same closure may be invoked multiple times in parallel.
/// If this closure panics, the panic will be passed to the panic handler.
pub fn main_handler<H>(mut self, main_handler: H) -> ThreadPoolBuilder
where H: Fn(usize, &mut FnMut()) + Send + Sync + 'static
{
self.main_handler = Some(Box::new(main_handler));
self
}
}

#[allow(deprecated)]
Expand Down Expand Up @@ -470,7 +519,8 @@ impl fmt::Debug for ThreadPoolBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
let ThreadPoolBuilder { ref num_threads, ref get_thread_name,
ref panic_handler, ref stack_size,
ref start_handler, ref exit_handler,
ref deadlock_handler, ref start_handler,
ref exit_handler, ref main_handler,
ref breadth_first } = *self;

// Just print `Some(<closure>)` or `None` to the debug
Expand All @@ -483,16 +533,20 @@ impl fmt::Debug for ThreadPoolBuilder {
}
let get_thread_name = get_thread_name.as_ref().map(|_| ClosurePlaceholder);
let panic_handler = panic_handler.as_ref().map(|_| ClosurePlaceholder);
let deadlock_handler = deadlock_handler.as_ref().map(|_| ClosurePlaceholder);
let start_handler = start_handler.as_ref().map(|_| ClosurePlaceholder);
let exit_handler = exit_handler.as_ref().map(|_| ClosurePlaceholder);
let main_handler = main_handler.as_ref().map(|_| ClosurePlaceholder);

f.debug_struct("ThreadPoolBuilder")
.field("num_threads", num_threads)
.field("get_thread_name", &get_thread_name)
.field("panic_handler", &panic_handler)
.field("stack_size", &stack_size)
.field("deadlock_handler", &deadlock_handler)
.field("start_handler", &start_handler)
.field("exit_handler", &exit_handler)
.field("main_handler", &main_handler)
.field("breadth_first", &breadth_first)
.finish()
}
Expand Down
Loading