Skip to content

Commit

Permalink
Merge #746
Browse files Browse the repository at this point in the history
746: new scheduler from RFC 5 r=cuviper a=nikomatsakis

Implementation of the scheduler described in rayon-rs/rfcs#5 -- modulo the fact that the RFC is mildly out of date. There is a [walkthrough video available](https://youtu.be/HvmQsE5M4cY).

To Do List:

* [x] Fix the cargo lock
* [x] Address use of `AtomicU64`
* [x] Document the handling of rollover and wakeups and convince ourselves it's sound
* [ ] Adopt and document the [proposed scheme for the job event counter](#746 (comment))
* [ ] Review RFC and list out the places where it differs from the branch

Co-authored-by: Niko Matsakis <niko@alum.mit.edu>
Co-authored-by: Josh Stone <cuviper@gmail.com>
  • Loading branch information
3 people authored Aug 13, 2020
2 parents 97b7e34 + ed6a5f7 commit 09428ec
Show file tree
Hide file tree
Showing 11 changed files with 1,713 additions and 1,022 deletions.
307 changes: 159 additions & 148 deletions ci/compat-Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions rayon-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ categories = ["concurrency"]
[dependencies]
num_cpus = "1.2"
lazy_static = "1"
crossbeam-channel = "0.4.2"
crossbeam-deque = "0.7.2"
crossbeam-queue = "0.2"
crossbeam-utils = "0.7"
Expand Down
20 changes: 3 additions & 17 deletions rayon-core/src/join/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::job::StackJob;
use crate::latch::{LatchProbe, SpinLatch};
use crate::log::Event::*;
use crate::latch::SpinLatch;
use crate::registry::{self, WorkerThread};
use crate::unwind;
use std::any::Any;
Expand Down Expand Up @@ -131,14 +130,10 @@ where
}

registry::in_worker(|worker_thread, injected| unsafe {
log!(Join {
worker: worker_thread.index()
});

// Create virtual wrapper for task b; this all has to be
// done here so that the stack frame can keep it all live
// long enough.
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new());
let job_b = StackJob::new(call_b(oper_b), SpinLatch::new(worker_thread));
let job_b_ref = job_b.as_job_ref();
worker_thread.push(job_b_ref);

Expand All @@ -160,23 +155,14 @@ where
// Found it! Let's run it.
//
// Note that this could panic, but it's ok if we unwind here.
log!(PoppedRhs {
worker: worker_thread.index()
});
let result_b = job_b.run_inline(injected);
return (result_a, result_b);
} else {
log!(PoppedJob {
worker: worker_thread.index()
});
worker_thread.execute(job);
}
} else {
// Local deque is empty. Time to steal from other
// threads.
log!(LostJob {
worker: worker_thread.index()
});
worker_thread.wait_until(&job_b.latch);
debug_assert!(job_b.latch.probe());
break;
Expand All @@ -193,7 +179,7 @@ where
#[cold] // cold path
unsafe fn join_recover_from_panic(
worker_thread: &WorkerThread,
job_b_latch: &SpinLatch,
job_b_latch: &SpinLatch<'_>,
err: Box<dyn Any + Send>,
) -> ! {
worker_thread.wait_until(job_b_latch);
Expand Down
Loading

0 comments on commit 09428ec

Please sign in to comment.