Skip to content

Commit

Permalink
Implement test without relying on std Mutex/Condvar
Browse files Browse the repository at this point in the history
  • Loading branch information
faern committed Nov 16, 2019
1 parent 1f75375 commit fba8d10
Showing 1 changed file with 22 additions and 29 deletions.
51 changes: 22 additions & 29 deletions core/src/parking_lot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1396,8 +1396,8 @@ mod tests {
use std::{
ptr,
sync::{
atomic::{AtomicIsize, AtomicPtr, Ordering},
Arc, Condvar, Mutex,
atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering},
Arc,
},
thread,
time::Duration,
Expand Down Expand Up @@ -1488,8 +1488,7 @@ mod tests {

struct SingleLatchTest {
semaphore: AtomicIsize,
lock: Mutex<usize>,
condition: Condvar,
num_awake: AtomicUsize,
/// Holds the pointer to the last *unprocessed* woken up thread.
last_awoken: AtomicPtr<ThreadData>,
/// Total number of threads participating in this test.
Expand All @@ -1501,8 +1500,7 @@ mod tests {
Self {
// This implements a fair (FIFO) semaphore, and it starts out unavailable.
semaphore: AtomicIsize::new(0),
lock: Mutex::new(0),
condition: Condvar::new(),
num_awake: AtomicUsize::new(0),
last_awoken: AtomicPtr::new(ptr::null_mut()),
num_threads,
}
Expand All @@ -1513,12 +1511,9 @@ mod tests {
self.down();

// Report back to the test verification code that this thread woke up
let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 1");
*num_awake += 1;
let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _);
self.last_awoken.store(this_thread_ptr, Ordering::SeqCst);
std::mem::drop(num_awake);
self.condition.notify_one();
self.num_awake.fetch_add(1, Ordering::SeqCst);
}

pub fn unpark_one(&self, single_unpark_index: usize) {
Expand All @@ -1532,21 +1527,17 @@ mod tests {
});
assert!(queue.len() <= self.num_threads - single_unpark_index);

// Lock before up() in order to guarantee we will reach condition.wait() below *before*
// the test thread reaches condition.notify_one()?
let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 3");
let num_awake_before_up = self.num_awake.load(Ordering::SeqCst);

self.up();

// Wait for a parked thread to wake up and update num_awake + last_awoken.
num_awake = self
.condition
.wait(num_awake)
.expect("Condvar::wait got poisoned lock 2");
assert_eq!(*num_awake, single_unpark_index + 1);
while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 {
thread::yield_now();
}

let last_awoken = self.last_awoken.load(Ordering::SeqCst);
// At this point the other thread should have set last_awoken inside the run() method
let last_awoken = self.last_awoken.load(Ordering::SeqCst);
assert!(!last_awoken.is_null());
if !queue.is_empty() && queue[0] != last_awoken {
panic!(
Expand All @@ -1559,7 +1550,7 @@ mod tests {

pub fn finish(&self, num_single_unparks: usize) {
// The amount of threads not unparked via unpark_one
let mut num_threads_left = self.num_threads - num_single_unparks;
let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap();

// Wake remaining threads up with unpark_all. Has to be in a loop, because there might
// still be threads that has not yet parked.
Expand All @@ -1570,24 +1561,26 @@ mod tests {
});
assert!(num_waiting_on_address <= num_threads_left);

let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 2");
let num_awake_before_unpark = *num_awake;
let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst);

let num_unparked =
unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) };
assert!(num_unparked >= num_waiting_on_address);
assert!(num_unparked <= num_threads_left);

// Wait for all unparked parked thread to wake up and update num_awake + last_awoken.
while *num_awake < num_awake_before_unpark + num_unparked {
num_awake = self
.condition
.wait(num_awake)
.expect("Condvar::wait got poisoned lock 1");
// Wait for all unparked threads to wake up and update num_awake + last_awoken.
while self.num_awake.load(Ordering::SeqCst)
!= num_awake_before_unpark + num_unparked
{
thread::yield_now()
}

num_threads_left -= num_unparked;
num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap();
}
// By now, all threads should have been woken up
assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads);

// Make sure no thread is parked on our semaphore address
let mut num_waiting_on_address = 0;
for_each(self.semaphore_addr(), |_thread_data| {
num_waiting_on_address += 1;
Expand Down

0 comments on commit fba8d10

Please sign in to comment.