From fba8d10dc59213d173d93d3a2fda10807b4e6a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Sat, 16 Nov 2019 15:05:15 +0100 Subject: [PATCH] Implement test without relying on std Mutex/Condvar --- core/src/parking_lot.rs | 51 ++++++++++++++++++----------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/core/src/parking_lot.rs b/core/src/parking_lot.rs index fb7df0f5..15baa554 100644 --- a/core/src/parking_lot.rs +++ b/core/src/parking_lot.rs @@ -1396,8 +1396,8 @@ mod tests { use std::{ ptr, sync::{ - atomic::{AtomicIsize, AtomicPtr, Ordering}, - Arc, Condvar, Mutex, + atomic::{AtomicIsize, AtomicPtr, AtomicUsize, Ordering}, + Arc, }, thread, time::Duration, @@ -1488,8 +1488,7 @@ mod tests { struct SingleLatchTest { semaphore: AtomicIsize, - lock: Mutex, - condition: Condvar, + num_awake: AtomicUsize, /// Holds the pointer to the last *unprocessed* woken up thread. last_awoken: AtomicPtr, /// Total number of threads participating in this test. @@ -1501,8 +1500,7 @@ mod tests { Self { // This implements a fair (FIFO) semaphore, and it starts out unavailable. semaphore: AtomicIsize::new(0), - lock: Mutex::new(0), - condition: Condvar::new(), + num_awake: AtomicUsize::new(0), last_awoken: AtomicPtr::new(ptr::null_mut()), num_threads, } @@ -1513,12 +1511,9 @@ mod tests { self.down(); // Report back to the test verification code that this thread woke up - let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 1"); - *num_awake += 1; let this_thread_ptr = super::with_thread_data(|t| t as *const _ as *mut _); self.last_awoken.store(this_thread_ptr, Ordering::SeqCst); - std::mem::drop(num_awake); - self.condition.notify_one(); + self.num_awake.fetch_add(1, Ordering::SeqCst); } pub fn unpark_one(&self, single_unpark_index: usize) { @@ -1532,21 +1527,17 @@ mod tests { }); assert!(queue.len() <= self.num_threads - single_unpark_index); - // Lock before up() in order to guarantee we will reach condition.wait() below *before* - // the test thread reaches condition.notify_one()? - let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 3"); + let num_awake_before_up = self.num_awake.load(Ordering::SeqCst); self.up(); // Wait for a parked thread to wake up and update num_awake + last_awoken. - num_awake = self - .condition - .wait(num_awake) - .expect("Condvar::wait got poisoned lock 2"); - assert_eq!(*num_awake, single_unpark_index + 1); + while self.num_awake.load(Ordering::SeqCst) != num_awake_before_up + 1 { + thread::yield_now(); + } - let last_awoken = self.last_awoken.load(Ordering::SeqCst); // At this point the other thread should have set last_awoken inside the run() method + let last_awoken = self.last_awoken.load(Ordering::SeqCst); assert!(!last_awoken.is_null()); if !queue.is_empty() && queue[0] != last_awoken { panic!( @@ -1559,7 +1550,7 @@ mod tests { pub fn finish(&self, num_single_unparks: usize) { // The amount of threads not unparked via unpark_one - let mut num_threads_left = self.num_threads - num_single_unparks; + let mut num_threads_left = self.num_threads.checked_sub(num_single_unparks).unwrap(); // Wake remaining threads up with unpark_all. Has to be in a loop, because there might // still be threads that has not yet parked. @@ -1570,24 +1561,26 @@ mod tests { }); assert!(num_waiting_on_address <= num_threads_left); - let mut num_awake = self.lock.lock().expect("Test thread poisoned a lock 2"); - let num_awake_before_unpark = *num_awake; + let num_awake_before_unpark = self.num_awake.load(Ordering::SeqCst); let num_unparked = unsafe { super::unpark_all(self.semaphore_addr(), DEFAULT_UNPARK_TOKEN) }; assert!(num_unparked >= num_waiting_on_address); + assert!(num_unparked <= num_threads_left); - // Wait for all unparked parked thread to wake up and update num_awake + last_awoken. - while *num_awake < num_awake_before_unpark + num_unparked { - num_awake = self - .condition - .wait(num_awake) - .expect("Condvar::wait got poisoned lock 1"); + // Wait for all unparked threads to wake up and update num_awake + last_awoken. + while self.num_awake.load(Ordering::SeqCst) + != num_awake_before_unpark + num_unparked + { + thread::yield_now() } - num_threads_left -= num_unparked; + num_threads_left = num_threads_left.checked_sub(num_unparked).unwrap(); } + // By now, all threads should have been woken up + assert_eq!(self.num_awake.load(Ordering::SeqCst), self.num_threads); + // Make sure no thread is parked on our semaphore address let mut num_waiting_on_address = 0; for_each(self.semaphore_addr(), |_thread_data| { num_waiting_on_address += 1;