diff --git a/src/tools/miri/src/alloc_addresses/mod.rs b/src/tools/miri/src/alloc_addresses/mod.rs index 58241538795e3..9ec9ae317f4fb 100644 --- a/src/tools/miri/src/alloc_addresses/mod.rs +++ b/src/tools/miri/src/alloc_addresses/mod.rs @@ -169,7 +169,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { size, align, memory_kind, - ecx.get_active_thread(), + ecx.active_thread(), ) { if let Some(clock) = clock { ecx.acquire_clock(&clock); @@ -367,7 +367,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> { // `alloc_id_from_addr` any more. global_state.exposed.remove(&dead_id); // Also remember this address for future reuse. - let thread = self.threads.get_active_thread_id(); + let thread = self.threads.active_thread(); global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || { if let Some(data_race) = &self.data_race { data_race.release_clock(&self.threads).clone() diff --git a/src/tools/miri/src/concurrency/data_race.rs b/src/tools/miri/src/concurrency/data_race.rs index da6fa4f3405c5..f22dbf49e8b1e 100644 --- a/src/tools/miri/src/concurrency/data_race.rs +++ b/src/tools/miri/src/concurrency/data_race.rs @@ -839,7 +839,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> { fn acquire_clock(&self, clock: &VClock) { let this = self.eval_context_ref(); if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(clock, this.get_active_thread()); + data_race.acquire_clock(clock, &this.machine.threads); } } } @@ -1662,13 +1662,14 @@ impl GlobalState { /// This should be called strictly before any calls to /// `thread_joined`. #[inline] - pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) { + pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) { let current_index = self.active_thread_index(thread_mgr); // Increment the clock to a unique termination timestamp. let vector_clocks = self.vector_clocks.get_mut(); let current_clocks = &mut vector_clocks[current_index]; - current_clocks.increment_clock(current_index, current_span); + current_clocks + .increment_clock(current_index, thread_mgr.active_thread_ref().current_span()); // Load the current thread id for the executing vector. let vector_info = self.vector_info.get_mut(); @@ -1722,11 +1723,12 @@ impl GlobalState { format!("thread `{thread_name}`") } - /// Acquire the given clock into the given thread, establishing synchronization with + /// Acquire the given clock into the current thread, establishing synchronization with /// the moment when that clock snapshot was taken via `release_clock`. /// As this is an acquire operation, the thread timestamp is not /// incremented. - pub fn acquire_clock(&self, clock: &VClock, thread: ThreadId) { + pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) { + let thread = threads.active_thread(); let (_, mut clocks) = self.thread_state_mut(thread); clocks.clock.join(clock); } @@ -1738,7 +1740,7 @@ impl GlobalState { &self, threads: &ThreadManager<'mir, 'tcx>, ) -> Ref<'_, VClock> { - let thread = threads.get_active_thread_id(); + let thread = threads.active_thread(); let span = threads.active_thread_ref().current_span(); // We increment the clock each time this happens, to ensure no two releases // can be confused with each other. @@ -1782,7 +1784,7 @@ impl GlobalState { &self, thread_mgr: &ThreadManager<'_, '_>, ) -> (VectorIdx, Ref<'_, ThreadClockSet>) { - self.thread_state(thread_mgr.get_active_thread_id()) + self.thread_state(thread_mgr.active_thread()) } /// Load the current vector clock in use and the current set of thread clocks @@ -1792,14 +1794,14 @@ impl GlobalState { &self, thread_mgr: &ThreadManager<'_, '_>, ) -> (VectorIdx, RefMut<'_, ThreadClockSet>) { - self.thread_state_mut(thread_mgr.get_active_thread_id()) + self.thread_state_mut(thread_mgr.active_thread()) } /// Return the current thread, should be the same /// as the data-race active thread. #[inline] fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx { - let active_thread_id = thread_mgr.get_active_thread_id(); + let active_thread_id = thread_mgr.active_thread(); self.thread_index(active_thread_id) } diff --git a/src/tools/miri/src/concurrency/init_once.rs b/src/tools/miri/src/concurrency/init_once.rs index 6469c90c69325..1ee84273b58c7 100644 --- a/src/tools/miri/src/concurrency/init_once.rs +++ b/src/tools/miri/src/concurrency/init_once.rs @@ -4,29 +4,11 @@ use rustc_index::Idx; use rustc_middle::ty::layout::TyAndLayout; use super::sync::EvalContextExtPriv as _; -use super::thread::MachineCallback; use super::vector_clock::VClock; use crate::*; declare_id!(InitOnceId); -/// A thread waiting on an InitOnce object. -struct InitOnceWaiter<'mir, 'tcx> { - /// The thread that is waiting. - thread: ThreadId, - /// The callback that should be executed, after the thread has been woken up. - callback: Box + 'tcx>, -} - -impl<'mir, 'tcx> std::fmt::Debug for InitOnceWaiter<'mir, 'tcx> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InitOnce") - .field("thread", &self.thread) - .field("callback", &"dyn MachineCallback") - .finish() - } -} - #[derive(Default, Debug, Copy, Clone, PartialEq, Eq)] /// The current status of a one time initialization. pub enum InitOnceStatus { @@ -38,68 +20,14 @@ pub enum InitOnceStatus { /// The one time initialization state. #[derive(Default, Debug)] -pub(super) struct InitOnce<'mir, 'tcx> { +pub(super) struct InitOnce { status: InitOnceStatus, - waiters: VecDeque>, + waiters: VecDeque, clock: VClock, } -impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - for waiter in self.waiters.iter() { - waiter.callback.visit_provenance(visit); - } - } -} - impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { - /// Synchronize with the previous initialization attempt of an InitOnce. - #[inline] - fn init_once_observe_attempt(&mut self, id: InitOnceId) { - let this = self.eval_context_mut(); - let current_thread = this.get_active_thread(); - - if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(&this.machine.sync.init_onces[id].clock, current_thread); - } - } - - #[inline] - fn init_once_wake_waiter( - &mut self, - id: InitOnceId, - waiter: InitOnceWaiter<'mir, 'tcx>, - ) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - let current_thread = this.get_active_thread(); - - this.unblock_thread(waiter.thread, BlockReason::InitOnce(id)); - - // Call callback, with the woken-up thread as `current`. - this.set_active_thread(waiter.thread); - this.init_once_observe_attempt(id); - waiter.callback.call(this)?; - this.set_active_thread(current_thread); - - Ok(()) - } -} - -impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} -pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { - fn init_once_get_or_create_id( - &mut self, - lock_op: &OpTy<'tcx, Provenance>, - lock_layout: TyAndLayout<'tcx>, - offset: u64, - ) -> InterpResult<'tcx, InitOnceId> { - let this = self.eval_context_mut(); - this.init_once_get_or_create(|ecx, next_id| { - ecx.get_or_create_id(next_id, lock_op, lock_layout, offset) - }) - } - /// Provides the closure with the next InitOnceId. Creates that InitOnce if the closure returns None, /// otherwise returns the value from the closure. #[inline] @@ -120,6 +48,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { Ok(new_index) } } +} + +impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} +pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { + fn init_once_get_or_create_id( + &mut self, + lock_op: &OpTy<'tcx, Provenance>, + lock_layout: TyAndLayout<'tcx>, + offset: u64, + ) -> InterpResult<'tcx, InitOnceId> { + let this = self.eval_context_mut(); + this.init_once_get_or_create(|ecx, next_id| { + ecx.get_or_create_id(next_id, lock_op, lock_layout, offset) + }) + } #[inline] fn init_once_status(&mut self, id: InitOnceId) -> InitOnceStatus { @@ -132,14 +75,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn init_once_enqueue_and_block( &mut self, id: InitOnceId, - thread: ThreadId, - callback: Box + 'tcx>, + callback: impl UnblockCallback<'mir, 'tcx> + 'tcx, ) { let this = self.eval_context_mut(); + let thread = this.active_thread(); let init_once = &mut this.machine.sync.init_onces[id]; assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once"); - init_once.waiters.push_back(InitOnceWaiter { thread, callback }); - this.block_thread(thread, BlockReason::InitOnce(id)); + init_once.waiters.push_back(thread); + this.block_thread(BlockReason::InitOnce(id), None, callback); } /// Begin initializing this InitOnce. Must only be called after checking that it is currently @@ -177,7 +120,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // Wake up everyone. // need to take the queue to avoid having `this` be borrowed multiple times for waiter in std::mem::take(&mut init_once.waiters) { - this.init_once_wake_waiter(id, waiter)?; + this.unblock_thread(waiter, BlockReason::InitOnce(id))?; } Ok(()) @@ -192,6 +135,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { InitOnceStatus::Begun, "failing already completed or uninit init once" ); + // This is again uninitialized. + init_once.status = InitOnceStatus::Uninitialized; // Each complete happens-before the end of the wait if let Some(data_race) = &this.machine.data_race { @@ -200,10 +145,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // Wake up one waiting thread, so they can go ahead and try to init this. if let Some(waiter) = init_once.waiters.pop_front() { - this.init_once_wake_waiter(id, waiter)?; - } else { - // Nobody there to take this, so go back to 'uninit' - init_once.status = InitOnceStatus::Uninitialized; + this.unblock_thread(waiter, BlockReason::InitOnce(id))?; } Ok(()) @@ -221,6 +163,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { "observing the completion of incomplete init once" ); - this.init_once_observe_attempt(id); + this.acquire_clock(&this.machine.sync.init_onces[id].clock); } } diff --git a/src/tools/miri/src/concurrency/sync.rs b/src/tools/miri/src/concurrency/sync.rs index a498d9e0a5230..df10e15fe1015 100644 --- a/src/tools/miri/src/concurrency/sync.rs +++ b/src/tools/miri/src/concurrency/sync.rs @@ -111,19 +111,10 @@ struct RwLock { declare_id!(CondvarId); -/// A thread waiting on a conditional variable. -#[derive(Debug)] -struct CondvarWaiter { - /// The thread that is waiting on this variable. - thread: ThreadId, - /// The mutex on which the thread is waiting. - lock: MutexId, -} - /// The conditional variable state. #[derive(Default, Debug)] struct Condvar { - waiters: VecDeque, + waiters: VecDeque, /// Tracks the happens-before relationship /// between a cond-var signal and a cond-var /// wait during a non-spurious signal event. @@ -155,20 +146,12 @@ struct FutexWaiter { /// The state of all synchronization objects. #[derive(Default, Debug)] -pub struct SynchronizationObjects<'mir, 'tcx> { +pub struct SynchronizationObjects { mutexes: IndexVec, rwlocks: IndexVec, condvars: IndexVec, futexes: FxHashMap, - pub(super) init_onces: IndexVec>, -} - -impl<'mir, 'tcx> VisitProvenance for SynchronizationObjects<'mir, 'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - for init_once in self.init_onces.iter() { - init_once.visit_provenance(visit); - } - } + pub(super) init_onces: IndexVec, } // Private extension trait for local helper methods @@ -210,45 +193,69 @@ pub(super) trait EvalContextExtPriv<'mir, 'tcx: 'mir>: }) } - /// Take a reader out of the queue waiting for the lock. - /// Returns `true` if some thread got the rwlock. + /// Provides the closure with the next MutexId. Creates that mutex if the closure returns None, + /// otherwise returns the value from the closure. #[inline] - fn rwlock_dequeue_and_lock_reader(&mut self, id: RwLockId) -> bool { + fn mutex_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, MutexId> + where + F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option>, + { let this = self.eval_context_mut(); - if let Some(reader) = this.machine.sync.rwlocks[id].reader_queue.pop_front() { - this.unblock_thread(reader, BlockReason::RwLock(id)); - this.rwlock_reader_lock(id, reader); - true + let next_index = this.machine.sync.mutexes.next_index(); + if let Some(old) = existing(this, next_index)? { + if this.machine.sync.mutexes.get(old).is_none() { + throw_ub_format!("mutex has invalid ID"); + } + Ok(old) } else { - false + let new_index = this.machine.sync.mutexes.push(Default::default()); + assert_eq!(next_index, new_index); + Ok(new_index) } } - /// Take the writer out of the queue waiting for the lock. - /// Returns `true` if some thread got the rwlock. + /// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None, + /// otherwise returns the value from the closure. #[inline] - fn rwlock_dequeue_and_lock_writer(&mut self, id: RwLockId) -> bool { + fn rwlock_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, RwLockId> + where + F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option>, + { let this = self.eval_context_mut(); - if let Some(writer) = this.machine.sync.rwlocks[id].writer_queue.pop_front() { - this.unblock_thread(writer, BlockReason::RwLock(id)); - this.rwlock_writer_lock(id, writer); - true + let next_index = this.machine.sync.rwlocks.next_index(); + if let Some(old) = existing(this, next_index)? { + if this.machine.sync.rwlocks.get(old).is_none() { + throw_ub_format!("rwlock has invalid ID"); + } + Ok(old) } else { - false + let new_index = this.machine.sync.rwlocks.push(Default::default()); + assert_eq!(next_index, new_index); + Ok(new_index) } } - /// Take a thread out of the queue waiting for the mutex, and lock - /// the mutex for it. Returns `true` if some thread has the mutex now. + /// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None, + /// otherwise returns the value from the closure. #[inline] - fn mutex_dequeue_and_lock(&mut self, id: MutexId) -> bool { + fn condvar_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, CondvarId> + where + F: FnOnce( + &mut MiriInterpCx<'mir, 'tcx>, + CondvarId, + ) -> InterpResult<'tcx, Option>, + { let this = self.eval_context_mut(); - if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() { - this.unblock_thread(thread, BlockReason::Mutex(id)); - this.mutex_lock(id, thread); - true + let next_index = this.machine.sync.condvars.next_index(); + if let Some(old) = existing(this, next_index)? { + if this.machine.sync.condvars.get(old).is_none() { + throw_ub_format!("condvar has invalid ID"); + } + Ok(old) } else { - false + let new_index = this.machine.sync.condvars.push(Default::default()); + assert_eq!(next_index, new_index); + Ok(new_index) } } } @@ -295,27 +302,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { }) } - #[inline] - /// Provides the closure with the next MutexId. Creates that mutex if the closure returns None, - /// otherwise returns the value from the closure - fn mutex_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, MutexId> - where - F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, MutexId) -> InterpResult<'tcx, Option>, - { - let this = self.eval_context_mut(); - let next_index = this.machine.sync.mutexes.next_index(); - if let Some(old) = existing(this, next_index)? { - if this.machine.sync.mutexes.get(old).is_none() { - throw_ub_format!("mutex has invalid ID"); - } - Ok(old) - } else { - let new_index = this.machine.sync.mutexes.push(Default::default()); - assert_eq!(next_index, new_index); - Ok(new_index) - } - } - #[inline] /// Get the id of the thread that currently owns this lock. fn mutex_get_owner(&mut self, id: MutexId) -> ThreadId { @@ -331,8 +317,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } /// Lock by setting the mutex owner and increasing the lock count. - fn mutex_lock(&mut self, id: MutexId, thread: ThreadId) { + fn mutex_lock(&mut self, id: MutexId) { let this = self.eval_context_mut(); + let thread = this.active_thread(); let mutex = &mut this.machine.sync.mutexes[id]; if let Some(current_owner) = mutex.owner { assert_eq!(thread, current_owner, "mutex already locked by another thread"); @@ -345,7 +332,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } mutex.lock_count = mutex.lock_count.checked_add(1).unwrap(); if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(&mutex.clock, thread); + data_race.acquire_clock(&mutex.clock, &this.machine.threads); } } @@ -353,14 +340,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { /// count. If the lock count reaches 0, release the lock and potentially /// give to a new owner. If the lock was not locked by the current thread, /// return `None`. - fn mutex_unlock(&mut self, id: MutexId) -> Option { + fn mutex_unlock(&mut self, id: MutexId) -> InterpResult<'tcx, Option> { let this = self.eval_context_mut(); let mutex = &mut this.machine.sync.mutexes[id]; - if let Some(current_owner) = mutex.owner { + Ok(if let Some(current_owner) = mutex.owner { // Mutex is locked. - if current_owner != this.machine.threads.get_active_thread_id() { + if current_owner != this.machine.threads.active_thread() { // Only the owner can unlock the mutex. - return None; + return Ok(None); } let old_lock_count = mutex.lock_count; mutex.lock_count = old_lock_count @@ -373,42 +360,52 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { if let Some(data_race) = &this.machine.data_race { mutex.clock.clone_from(&data_race.release_clock(&this.machine.threads)); } - this.mutex_dequeue_and_lock(id); + if let Some(thread) = this.machine.sync.mutexes[id].queue.pop_front() { + this.unblock_thread(thread, BlockReason::Mutex(id))?; + } } Some(old_lock_count) } else { // Mutex is not locked. None - } + }) } /// Put the thread into the queue waiting for the mutex. + /// Once the Mutex becomes available, `retval` will be written to `dest`. #[inline] - fn mutex_enqueue_and_block(&mut self, id: MutexId, thread: ThreadId) { + fn mutex_enqueue_and_block( + &mut self, + id: MutexId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + ) { let this = self.eval_context_mut(); assert!(this.mutex_is_locked(id), "queing on unlocked mutex"); + let thread = this.active_thread(); this.machine.sync.mutexes[id].queue.push_back(thread); - this.block_thread(thread, BlockReason::Mutex(id)); - } + this.block_thread(BlockReason::Mutex(id), None, Callback { id, retval, dest }); - /// Provides the closure with the next RwLockId. Creates that RwLock if the closure returns None, - /// otherwise returns the value from the closure - #[inline] - fn rwlock_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, RwLockId> - where - F: FnOnce(&mut MiriInterpCx<'mir, 'tcx>, RwLockId) -> InterpResult<'tcx, Option>, - { - let this = self.eval_context_mut(); - let next_index = this.machine.sync.rwlocks.next_index(); - if let Some(old) = existing(this, next_index)? { - if this.machine.sync.rwlocks.get(old).is_none() { - throw_ub_format!("rwlock has invalid ID"); + struct Callback<'tcx> { + id: MutexId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { id: _, retval, dest } = self; + retval.visit_provenance(visit); + dest.visit_provenance(visit); + } + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + assert!(!this.mutex_is_locked(self.id)); + this.mutex_lock(self.id); + + this.write_scalar(self.retval, &self.dest)?; + Ok(()) } - Ok(old) - } else { - let new_index = this.machine.sync.rwlocks.push(Default::default()); - assert_eq!(next_index, new_index); - Ok(new_index) } } @@ -437,23 +434,24 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { /// Read-lock the lock by adding the `reader` the list of threads that own /// this lock. - fn rwlock_reader_lock(&mut self, id: RwLockId, reader: ThreadId) { + fn rwlock_reader_lock(&mut self, id: RwLockId) { let this = self.eval_context_mut(); + let thread = this.active_thread(); assert!(!this.rwlock_is_write_locked(id), "the lock is write locked"); - trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, reader); + trace!("rwlock_reader_lock: {:?} now also held (one more time) by {:?}", id, thread); let rwlock = &mut this.machine.sync.rwlocks[id]; - let count = rwlock.readers.entry(reader).or_insert(0); + let count = rwlock.readers.entry(thread).or_insert(0); *count = count.checked_add(1).expect("the reader counter overflowed"); if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(&rwlock.clock_unlocked, reader); + data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads); } } /// Try read-unlock the lock for the current threads and potentially give the lock to a new owner. /// Returns `true` if succeeded, `false` if this `reader` did not hold the lock. - fn rwlock_reader_unlock(&mut self, id: RwLockId) -> bool { + fn rwlock_reader_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> { let this = self.eval_context_mut(); - let thread = this.get_active_thread(); + let thread = this.active_thread(); let rwlock = &mut this.machine.sync.rwlocks[id]; match rwlock.readers.entry(thread) { Entry::Occupied(mut entry) => { @@ -467,7 +465,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { trace!("rwlock_reader_unlock: {:?} held one less time by {:?}", id, thread); } } - Entry::Vacant(_) => return false, // we did not even own this lock + Entry::Vacant(_) => return Ok(false), // we did not even own this lock } if let Some(data_race) = &this.machine.data_race { // Add this to the shared-release clock of all concurrent readers. @@ -481,48 +479,79 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // happen-before the writers let rwlock = &mut this.machine.sync.rwlocks[id]; rwlock.clock_unlocked.clone_from(&rwlock.clock_current_readers); - this.rwlock_dequeue_and_lock_writer(id); + // See if there is a thread to unblock. + if let Some(writer) = rwlock.writer_queue.pop_front() { + this.unblock_thread(writer, BlockReason::RwLock(id))?; + } } - true + Ok(true) } /// Put the reader in the queue waiting for the lock and block it. + /// Once the lock becomes available, `retval` will be written to `dest`. #[inline] - fn rwlock_enqueue_and_block_reader(&mut self, id: RwLockId, reader: ThreadId) { + fn rwlock_enqueue_and_block_reader( + &mut self, + id: RwLockId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + ) { let this = self.eval_context_mut(); + let thread = this.active_thread(); assert!(this.rwlock_is_write_locked(id), "read-queueing on not write locked rwlock"); - this.machine.sync.rwlocks[id].reader_queue.push_back(reader); - this.block_thread(reader, BlockReason::RwLock(id)); + this.machine.sync.rwlocks[id].reader_queue.push_back(thread); + this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest }); + + struct Callback<'tcx> { + id: RwLockId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { id: _, retval, dest } = self; + retval.visit_provenance(visit); + dest.visit_provenance(visit); + } + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + this.rwlock_reader_lock(self.id); + this.write_scalar(self.retval, &self.dest)?; + Ok(()) + } + } } /// Lock by setting the writer that owns the lock. #[inline] - fn rwlock_writer_lock(&mut self, id: RwLockId, writer: ThreadId) { + fn rwlock_writer_lock(&mut self, id: RwLockId) { let this = self.eval_context_mut(); + let thread = this.active_thread(); assert!(!this.rwlock_is_locked(id), "the rwlock is already locked"); - trace!("rwlock_writer_lock: {:?} now held by {:?}", id, writer); + trace!("rwlock_writer_lock: {:?} now held by {:?}", id, thread); let rwlock = &mut this.machine.sync.rwlocks[id]; - rwlock.writer = Some(writer); + rwlock.writer = Some(thread); if let Some(data_race) = &this.machine.data_race { - data_race.acquire_clock(&rwlock.clock_unlocked, writer); + data_race.acquire_clock(&rwlock.clock_unlocked, &this.machine.threads); } } /// Try to unlock an rwlock held by the current thread. /// Return `false` if it is held by another thread. #[inline] - fn rwlock_writer_unlock(&mut self, id: RwLockId) -> bool { + fn rwlock_writer_unlock(&mut self, id: RwLockId) -> InterpResult<'tcx, bool> { let this = self.eval_context_mut(); - let thread = this.get_active_thread(); + let thread = this.active_thread(); let rwlock = &mut this.machine.sync.rwlocks[id]; - if let Some(current_writer) = rwlock.writer { + Ok(if let Some(current_writer) = rwlock.writer { if current_writer != thread { // Only the owner can unlock the rwlock. - return false; + return Ok(false); } rwlock.writer = None; trace!("rwlock_writer_unlock: {:?} unlocked by {:?}", id, thread); - // Release memory to next lock holder. + // Record release clock for next lock holder. if let Some(data_race) = &this.machine.data_race { rwlock.clock_unlocked.clone_from(&*data_race.release_clock(&this.machine.threads)); } @@ -531,50 +560,54 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // We are prioritizing writers here against the readers. As a // result, not only readers can starve writers, but also writers can // starve readers. - if this.rwlock_dequeue_and_lock_writer(id) { - // Someone got the write lock, nice. + if let Some(writer) = rwlock.writer_queue.pop_front() { + this.unblock_thread(writer, BlockReason::RwLock(id))?; } else { - // Give the lock to all readers. - while this.rwlock_dequeue_and_lock_reader(id) { - // Rinse and repeat. + // Take the entire read queue and wake them all up. + let readers = std::mem::take(&mut rwlock.reader_queue); + for reader in readers { + this.unblock_thread(reader, BlockReason::RwLock(id))?; } } true } else { false - } + }) } /// Put the writer in the queue waiting for the lock. + /// Once the lock becomes available, `retval` will be written to `dest`. #[inline] - fn rwlock_enqueue_and_block_writer(&mut self, id: RwLockId, writer: ThreadId) { + fn rwlock_enqueue_and_block_writer( + &mut self, + id: RwLockId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + ) { let this = self.eval_context_mut(); assert!(this.rwlock_is_locked(id), "write-queueing on unlocked rwlock"); - this.machine.sync.rwlocks[id].writer_queue.push_back(writer); - this.block_thread(writer, BlockReason::RwLock(id)); - } - - /// Provides the closure with the next CondvarId. Creates that Condvar if the closure returns None, - /// otherwise returns the value from the closure - #[inline] - fn condvar_get_or_create(&mut self, existing: F) -> InterpResult<'tcx, CondvarId> - where - F: FnOnce( - &mut MiriInterpCx<'mir, 'tcx>, - CondvarId, - ) -> InterpResult<'tcx, Option>, - { - let this = self.eval_context_mut(); - let next_index = this.machine.sync.condvars.next_index(); - if let Some(old) = existing(this, next_index)? { - if this.machine.sync.condvars.get(old).is_none() { - throw_ub_format!("condvar has invalid ID"); + let thread = this.active_thread(); + this.machine.sync.rwlocks[id].writer_queue.push_back(thread); + this.block_thread(BlockReason::RwLock(id), None, Callback { id, retval, dest }); + + struct Callback<'tcx> { + id: RwLockId, + retval: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { id: _, retval, dest } = self; + retval.visit_provenance(visit); + dest.visit_provenance(visit); + } + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + this.rwlock_writer_lock(self.id); + this.write_scalar(self.retval, &self.dest)?; + Ok(()) } - Ok(old) - } else { - let new_index = this.machine.sync.condvars.push(Default::default()); - assert_eq!(next_index, new_index); - Ok(new_index) } } @@ -585,17 +618,106 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { !this.machine.sync.condvars[id].waiters.is_empty() } - /// Mark that the thread is waiting on the conditional variable. - fn condvar_wait(&mut self, id: CondvarId, thread: ThreadId, lock: MutexId) { + /// Release the mutex and let the current thread wait on the given condition variable. + /// Once it is signaled, the mutex will be acquired and `retval_succ` will be written to `dest`. + /// If the timeout happens first, `retval_timeout` will be written to `dest`. + fn condvar_wait( + &mut self, + condvar: CondvarId, + mutex: MutexId, + timeout: Option, + retval_succ: Scalar, + retval_timeout: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let waiters = &mut this.machine.sync.condvars[id].waiters; - assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); - waiters.push_back(CondvarWaiter { thread, lock }); + if let Some(old_locked_count) = this.mutex_unlock(mutex)? { + if old_locked_count != 1 { + throw_unsup_format!( + "awaiting a condvar on a mutex acquired multiple times is not supported" + ); + } + } else { + throw_ub_format!( + "awaiting a condvar on a mutex that is unlocked or owned by a different thread" + ); + } + let thread = this.active_thread(); + let waiters = &mut this.machine.sync.condvars[condvar].waiters; + waiters.push_back(thread); + this.block_thread( + BlockReason::Condvar(condvar), + timeout, + Callback { condvar, mutex, retval_succ, retval_timeout, dest }, + ); + return Ok(()); + + struct Callback<'tcx> { + condvar: CondvarId, + mutex: MutexId, + retval_succ: Scalar, + retval_timeout: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { condvar: _, mutex: _, retval_succ, retval_timeout, dest } = self; + retval_succ.visit_provenance(visit); + retval_timeout.visit_provenance(visit); + dest.visit_provenance(visit); + } + } + impl<'tcx, 'mir> Callback<'tcx> { + #[allow(clippy::boxed_local)] + fn reacquire_mutex( + self: Box, + this: &mut MiriInterpCx<'mir, 'tcx>, + retval: Scalar, + ) -> InterpResult<'tcx> { + if this.mutex_is_locked(self.mutex) { + assert_ne!(this.mutex_get_owner(self.mutex), this.active_thread()); + this.mutex_enqueue_and_block(self.mutex, retval, self.dest); + } else { + // We can have it right now! + this.mutex_lock(self.mutex); + // Don't forget to write the return value. + this.write_scalar(retval, &self.dest)?; + } + Ok(()) + } + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + // The condvar was signaled. Make sure we get the clock for that. + if let Some(data_race) = &this.machine.data_race { + data_race.acquire_clock( + &this.machine.sync.condvars[self.condvar].clock, + &this.machine.threads, + ); + } + // Try to acquire the mutex. + // The timeout only applies to the first wait (until the signal), not for mutex acquisition. + let retval = self.retval_succ; + self.reacquire_mutex(this, retval) + } + fn timeout( + self: Box, + this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>, + ) -> InterpResult<'tcx> { + // We have to remove the waiter from the queue again. + let thread = this.active_thread(); + let waiters = &mut this.machine.sync.condvars[self.condvar].waiters; + waiters.retain(|waiter| *waiter != thread); + // Now get back the lock. + let retval = self.retval_timeout; + self.reacquire_mutex(this, retval) + } + } } /// Wake up some thread (if there is any) sleeping on the conditional - /// variable. - fn condvar_signal(&mut self, id: CondvarId) -> Option<(ThreadId, MutexId)> { + /// variable. Returns `true` iff any thread was woken up. + fn condvar_signal(&mut self, id: CondvarId) -> InterpResult<'tcx, bool> { let this = self.eval_context_mut(); let condvar = &mut this.machine.sync.condvars[id]; let data_race = &this.machine.data_race; @@ -604,32 +726,87 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { if let Some(data_race) = data_race { condvar.clock.clone_from(&*data_race.release_clock(&this.machine.threads)); } - condvar.waiters.pop_front().map(|waiter| { - if let Some(data_race) = data_race { - data_race.acquire_clock(&condvar.clock, waiter.thread); - } - (waiter.thread, waiter.lock) - }) - } - - #[inline] - /// Remove the thread from the queue of threads waiting on this conditional variable. - fn condvar_remove_waiter(&mut self, id: CondvarId, thread: ThreadId) { - let this = self.eval_context_mut(); - this.machine.sync.condvars[id].waiters.retain(|waiter| waiter.thread != thread); + let Some(waiter) = condvar.waiters.pop_front() else { + return Ok(false); + }; + this.unblock_thread(waiter, BlockReason::Condvar(id))?; + Ok(true) } - fn futex_wait(&mut self, addr: u64, thread: ThreadId, bitset: u32) { + /// Wait for the futex to be signaled, or a timeout. + /// On a signal, `retval_succ` is written to `dest`. + /// On a timeout, `retval_timeout` is written to `dest` and `errno_timeout` is set as the last error. + fn futex_wait( + &mut self, + addr: u64, + bitset: u32, + timeout: Option, + retval_succ: Scalar, + retval_timeout: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + errno_timeout: Scalar, + ) { let this = self.eval_context_mut(); + let thread = this.active_thread(); let futex = &mut this.machine.sync.futexes.entry(addr).or_default(); let waiters = &mut futex.waiters; assert!(waiters.iter().all(|waiter| waiter.thread != thread), "thread is already waiting"); waiters.push_back(FutexWaiter { thread, bitset }); + this.block_thread( + BlockReason::Futex { addr }, + timeout, + Callback { addr, retval_succ, retval_timeout, dest, errno_timeout }, + ); + + struct Callback<'tcx> { + addr: u64, + retval_succ: Scalar, + retval_timeout: Scalar, + dest: MPlaceTy<'tcx, Provenance>, + errno_timeout: Scalar, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { addr: _, retval_succ, retval_timeout, dest, errno_timeout } = self; + retval_succ.visit_provenance(visit); + retval_timeout.visit_provenance(visit); + dest.visit_provenance(visit); + errno_timeout.visit_provenance(visit); + } + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + let futex = this.machine.sync.futexes.get(&self.addr).unwrap(); + // Acquire the clock of the futex. + if let Some(data_race) = &this.machine.data_race { + data_race.acquire_clock(&futex.clock, &this.machine.threads); + } + // Write the return value. + this.write_scalar(self.retval_succ, &self.dest)?; + Ok(()) + } + fn timeout( + self: Box, + this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>, + ) -> InterpResult<'tcx> { + // Remove the waiter from the futex. + let thread = this.active_thread(); + let futex = this.machine.sync.futexes.get_mut(&self.addr).unwrap(); + futex.waiters.retain(|waiter| waiter.thread != thread); + // Set errno and write return value. + this.set_last_error(self.errno_timeout)?; + this.write_scalar(self.retval_timeout, &self.dest)?; + Ok(()) + } + } } - fn futex_wake(&mut self, addr: u64, bitset: u32) -> Option { + /// Returns whether anything was woken. + fn futex_wake(&mut self, addr: u64, bitset: u32) -> InterpResult<'tcx, bool> { let this = self.eval_context_mut(); - let futex = &mut this.machine.sync.futexes.get_mut(&addr)?; + let Some(futex) = this.machine.sync.futexes.get_mut(&addr) else { + return Ok(false); + }; let data_race = &this.machine.data_race; // Each futex-wake happens-before the end of the futex wait @@ -638,19 +815,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } // Wake up the first thread in the queue that matches any of the bits in the bitset. - futex.waiters.iter().position(|w| w.bitset & bitset != 0).map(|i| { - let waiter = futex.waiters.remove(i).unwrap(); - if let Some(data_race) = data_race { - data_race.acquire_clock(&futex.clock, waiter.thread); - } - waiter.thread - }) - } - - fn futex_remove_waiter(&mut self, addr: u64, thread: ThreadId) { - let this = self.eval_context_mut(); - if let Some(futex) = this.machine.sync.futexes.get_mut(&addr) { - futex.waiters.retain(|waiter| waiter.thread != thread); - } + let Some(i) = futex.waiters.iter().position(|w| w.bitset & bitset != 0) else { + return Ok(false); + }; + let waiter = futex.waiters.remove(i).unwrap(); + this.unblock_thread(waiter.thread, BlockReason::Futex { addr })?; + Ok(true) } } diff --git a/src/tools/miri/src/concurrency/thread.rs b/src/tools/miri/src/concurrency/thread.rs index cdc9cba664636..43aac8a3243f7 100644 --- a/src/tools/miri/src/concurrency/thread.rs +++ b/src/tools/miri/src/concurrency/thread.rs @@ -1,7 +1,6 @@ //! Implements threads. -use std::cell::RefCell; -use std::collections::hash_map::Entry; +use std::mem; use std::num::TryFromIntError; use std::sync::atomic::Ordering::Relaxed; use std::task::Poll; @@ -41,12 +40,23 @@ pub enum TlsAllocAction { Leak, } -/// Trait for callbacks that can be executed when some event happens, such as after a timeout. -pub trait MachineCallback<'mir, 'tcx>: VisitProvenance { - fn call(&self, ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>) -> InterpResult<'tcx>; -} +/// Trait for callbacks that are executed when a thread gets unblocked. +pub trait UnblockCallback<'mir, 'tcx>: VisitProvenance { + fn unblock( + self: Box, + ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>, + ) -> InterpResult<'tcx>; -type TimeoutCallback<'mir, 'tcx> = Box + 'tcx>; + fn timeout( + self: Box, + _ecx: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>, + ) -> InterpResult<'tcx> { + unreachable!( + "timeout on a thread that was blocked without a timeout (or someone forgot to overwrite this method)" + ) + } +} +type DynUnblockCallback<'mir, 'tcx> = Box + 'tcx>; /// A thread identifier. #[derive(Clone, Copy, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] @@ -117,17 +127,45 @@ pub enum BlockReason { } /// The state of a thread. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -pub enum ThreadState { +enum ThreadState<'mir, 'tcx> { /// The thread is enabled and can be executed. Enabled, /// The thread is blocked on something. - Blocked(BlockReason), + Blocked { + reason: BlockReason, + timeout: Option, + callback: DynUnblockCallback<'mir, 'tcx>, + }, /// The thread has terminated its execution. We do not delete terminated /// threads (FIXME: why?). Terminated, } +impl<'mir, 'tcx> std::fmt::Debug for ThreadState<'mir, 'tcx> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Enabled => write!(f, "Enabled"), + Self::Blocked { reason, timeout, .. } => + f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(), + Self::Terminated => write!(f, "Terminated"), + } + } +} + +impl<'mir, 'tcx> ThreadState<'mir, 'tcx> { + fn is_enabled(&self) -> bool { + matches!(self, ThreadState::Enabled) + } + + fn is_terminated(&self) -> bool { + matches!(self, ThreadState::Terminated) + } + + fn is_blocked_on(&self, reason: BlockReason) -> bool { + matches!(*self, ThreadState::Blocked { reason: actual_reason, .. } if actual_reason == reason) + } +} + /// The join status of a thread. #[derive(Debug, Copy, Clone, PartialEq, Eq)] enum ThreadJoinStatus { @@ -142,7 +180,7 @@ enum ThreadJoinStatus { /// A thread. pub struct Thread<'mir, 'tcx> { - state: ThreadState, + state: ThreadState<'mir, 'tcx>, /// Name of the thread. thread_name: Option>, @@ -323,41 +361,24 @@ impl VisitProvenance for Frame<'_, '_, Provenance, FrameExtra<'_>> { } } -/// A specific moment in time. +/// The moment in time when a blocked thread should be woken up. #[derive(Debug)] -pub enum CallbackTime { +pub enum Timeout { Monotonic(Instant), RealTime(SystemTime), } -impl CallbackTime { +impl Timeout { /// How long do we have to wait from now until the specified time? fn get_wait_time(&self, clock: &Clock) -> Duration { match self { - CallbackTime::Monotonic(instant) => instant.duration_since(clock.now()), - CallbackTime::RealTime(time) => - time.duration_since(SystemTime::now()).unwrap_or(Duration::new(0, 0)), + Timeout::Monotonic(instant) => instant.duration_since(clock.now()), + Timeout::RealTime(time) => + time.duration_since(SystemTime::now()).unwrap_or(Duration::ZERO), } } } -/// Callbacks are used to implement timeouts. For example, waiting on a -/// conditional variable with a timeout creates a callback that is called after -/// the specified time and unblocks the thread. If another thread signals on the -/// conditional variable, the signal handler deletes the callback. -struct TimeoutCallbackInfo<'mir, 'tcx> { - /// The callback should be called no earlier than this time. - call_time: CallbackTime, - /// The called function. - callback: TimeoutCallback<'mir, 'tcx>, -} - -impl<'mir, 'tcx> std::fmt::Debug for TimeoutCallbackInfo<'mir, 'tcx> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "TimeoutCallback({:?})", self.call_time) - } -} - /// A set of threads. #[derive(Debug)] pub struct ThreadManager<'mir, 'tcx> { @@ -369,11 +390,9 @@ pub struct ThreadManager<'mir, 'tcx> { threads: IndexVec>, /// A mapping from a thread-local static to an allocation id of a thread /// specific allocation. - thread_local_alloc_ids: RefCell>>, + thread_local_alloc_ids: FxHashMap<(DefId, ThreadId), Pointer>, /// A flag that indicates that we should change the active thread. yield_active_thread: bool, - /// Callbacks that are called once the specified time passes. - timeout_callbacks: FxHashMap>, } impl VisitProvenance for ThreadManager<'_, '_> { @@ -381,7 +400,6 @@ impl VisitProvenance for ThreadManager<'_, '_> { let ThreadManager { threads, thread_local_alloc_ids, - timeout_callbacks, active_thread: _, yield_active_thread: _, } = self; @@ -389,12 +407,9 @@ impl VisitProvenance for ThreadManager<'_, '_> { for thread in threads { thread.visit_provenance(visit); } - for ptr in thread_local_alloc_ids.borrow().values() { + for ptr in thread_local_alloc_ids.values() { ptr.visit_provenance(visit); } - for callback in timeout_callbacks.values() { - callback.callback.visit_provenance(visit); - } } } @@ -408,7 +423,6 @@ impl<'mir, 'tcx> Default for ThreadManager<'mir, 'tcx> { threads, thread_local_alloc_ids: Default::default(), yield_active_thread: false, - timeout_callbacks: FxHashMap::default(), } } } @@ -430,18 +444,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { /// Check if we have an allocation for the given thread local static for the /// active thread. fn get_thread_local_alloc_id(&self, def_id: DefId) -> Option> { - self.thread_local_alloc_ids.borrow().get(&(def_id, self.active_thread)).cloned() + self.thread_local_alloc_ids.get(&(def_id, self.active_thread)).cloned() } /// Set the pointer for the allocation of the given thread local /// static for the active thread. /// /// Panics if a thread local is initialized twice for the same thread. - fn set_thread_local_alloc(&self, def_id: DefId, ptr: Pointer) { - self.thread_local_alloc_ids - .borrow_mut() - .try_insert((def_id, self.active_thread), ptr) - .unwrap(); + fn set_thread_local_alloc(&mut self, def_id: DefId, ptr: Pointer) { + self.thread_local_alloc_ids.try_insert((def_id, self.active_thread), ptr).unwrap(); } /// Borrow the stack of the active thread. @@ -480,7 +491,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } /// Get the id of the currently active thread. - pub fn get_active_thread_id(&self) -> ThreadId { + pub fn active_thread(&self) -> ThreadId { self.active_thread } @@ -492,17 +503,17 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { /// Get the total of threads that are currently live, i.e., not yet terminated. /// (They might be blocked.) pub fn get_live_thread_count(&self) -> usize { - self.threads.iter().filter(|t| !matches!(t.state, ThreadState::Terminated)).count() + self.threads.iter().filter(|t| !t.state.is_terminated()).count() } /// Has the given thread terminated? fn has_terminated(&self, thread_id: ThreadId) -> bool { - self.threads[thread_id].state == ThreadState::Terminated + self.threads[thread_id].state.is_terminated() } /// Have all threads terminated? fn have_all_terminated(&self) -> bool { - self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) + self.threads.iter().all(|thread| thread.state.is_terminated()) } /// Enable the thread for execution. The thread must be terminated. @@ -532,8 +543,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { fn detach_thread(&mut self, id: ThreadId, allow_terminated_joined: bool) -> InterpResult<'tcx> { trace!("detaching {:?}", id); - let is_ub = if allow_terminated_joined && self.threads[id].state == ThreadState::Terminated - { + let is_ub = if allow_terminated_joined && self.threads[id].state.is_terminated() { // "Detached" in particular means "not yet joined". Redundant detaching is still UB. self.threads[id].join_status == ThreadJoinStatus::Detached } else { @@ -561,15 +571,41 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { // Mark the joined thread as being joined so that we detect if other // threads try to join it. self.threads[joined_thread_id].join_status = ThreadJoinStatus::Joined; - if self.threads[joined_thread_id].state != ThreadState::Terminated { - // The joined thread is still running, we need to wait for it. - self.active_thread_mut().state = - ThreadState::Blocked(BlockReason::Join(joined_thread_id)); + if !self.threads[joined_thread_id].state.is_terminated() { trace!( "{:?} blocked on {:?} when trying to join", self.active_thread, joined_thread_id ); + // The joined thread is still running, we need to wait for it. + // Unce we get unblocked, perform the appropriate synchronization. + self.block_thread( + BlockReason::Join(joined_thread_id), + None, + Callback { joined_thread_id }, + ); + + struct Callback { + joined_thread_id: ThreadId, + } + impl VisitProvenance for Callback { + fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {} + } + impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for Callback { + fn unblock( + self: Box, + this: &mut MiriInterpCx<'mir, 'tcx>, + ) -> InterpResult<'tcx> { + if let Some(data_race) = &mut this.machine.data_race { + data_race.thread_joined( + &this.machine.threads, + this.machine.threads.active_thread(), + self.joined_thread_id, + ); + } + Ok(()) + } + } } else { // The thread has already terminated - mark join happens-before if let Some(data_race) = data_race { @@ -596,9 +632,9 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { // Sanity check `join_status`. assert!( - self.threads.iter().all(|thread| { - thread.state != ThreadState::Blocked(BlockReason::Join(joined_thread_id)) - }), + self.threads + .iter() + .all(|thread| { !thread.state.is_blocked_on(BlockReason::Join(joined_thread_id)) }), "this thread already has threads waiting for its termination" ); @@ -620,18 +656,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } /// Put the thread into the blocked state. - fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) { - let state = &mut self.threads[thread].state; - assert_eq!(*state, ThreadState::Enabled); - *state = ThreadState::Blocked(reason); - } - - /// Put the blocked thread into the enabled state. - /// Sanity-checks that the thread previously was blocked for the right reason. - fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) { - let state = &mut self.threads[thread].state; - assert_eq!(*state, ThreadState::Blocked(reason)); - *state = ThreadState::Enabled; + fn block_thread( + &mut self, + reason: BlockReason, + timeout: Option, + callback: impl UnblockCallback<'mir, 'tcx> + 'tcx, + ) { + let state = &mut self.threads[self.active_thread].state; + assert!(state.is_enabled()); + *state = ThreadState::Blocked { reason, timeout, callback: Box::new(callback) } } /// Change the active thread to some enabled thread. @@ -642,87 +675,18 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { self.yield_active_thread = true; } - /// Register the given `callback` to be called once the `call_time` passes. - /// - /// The callback will be called with `thread` being the active thread, and - /// the callback may not change the active thread. - fn register_timeout_callback( - &mut self, - thread: ThreadId, - call_time: CallbackTime, - callback: TimeoutCallback<'mir, 'tcx>, - ) { - self.timeout_callbacks - .try_insert(thread, TimeoutCallbackInfo { call_time, callback }) - .unwrap(); - } - - /// Unregister the callback for the `thread`. - fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { - self.timeout_callbacks.remove(&thread); - } - - /// Get a callback that is ready to be called. - fn get_ready_callback( - &mut self, - clock: &Clock, - ) -> Option<(ThreadId, TimeoutCallback<'mir, 'tcx>)> { - // We iterate over all threads in the order of their indices because - // this allows us to have a deterministic scheduler. - for thread in self.threads.indices() { - match self.timeout_callbacks.entry(thread) { - Entry::Occupied(entry) => { - if entry.get().call_time.get_wait_time(clock) == Duration::new(0, 0) { - return Some((thread, entry.remove().callback)); - } - } - Entry::Vacant(_) => {} - } - } - None - } - - /// Wakes up threads joining on the active one and deallocates thread-local statics. - /// The `AllocId` that can now be freed are returned. - fn thread_terminated( - &mut self, - mut data_race: Option<&mut data_race::GlobalState>, - current_span: Span, - ) -> Vec> { - let mut free_tls_statics = Vec::new(); - { - let mut thread_local_statics = self.thread_local_alloc_ids.borrow_mut(); - thread_local_statics.retain(|&(_def_id, thread), &mut alloc_id| { - if thread != self.active_thread { - // Keep this static around. - return true; - } - // Delete this static from the map and from memory. - // We cannot free directly here as we cannot use `?` in this context. - free_tls_statics.push(alloc_id); - false - }); - } - // Set the thread into a terminated state in the data-race detector. - if let Some(ref mut data_race) = data_race { - data_race.thread_terminated(self, current_span); - } - // Check if we need to unblock any threads. - let mut joined_threads = vec![]; // store which threads joined, we'll need it - for (i, thread) in self.threads.iter_enumerated_mut() { - if thread.state == ThreadState::Blocked(BlockReason::Join(self.active_thread)) { - // The thread has terminated, mark happens-before edge to joining thread - if data_race.is_some() { - joined_threads.push(i); + /// Get the wait time for the next timeout, or `None` if no timeout is pending. + fn next_callback_wait_time(&self, clock: &Clock) -> Option { + self.threads + .iter() + .filter_map(|t| { + match &t.state { + ThreadState::Blocked { timeout: Some(timeout), .. } => + Some(timeout.get_wait_time(clock)), + _ => None, } - trace!("unblocking {:?} because {:?} terminated", i, self.active_thread); - thread.state = ThreadState::Enabled; - } - } - for &i in &joined_threads { - data_race.as_mut().unwrap().thread_joined(self, i, self.active_thread); - } - free_tls_statics + }) + .min() } /// Decide which action to take next and on which thread. @@ -733,9 +697,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { /// blocked, terminated, or has explicitly asked to be preempted). fn schedule(&mut self, clock: &Clock) -> InterpResult<'tcx, SchedulingAction> { // This thread and the program can keep going. - if self.threads[self.active_thread].state == ThreadState::Enabled - && !self.yield_active_thread - { + if self.threads[self.active_thread].state.is_enabled() && !self.yield_active_thread { // The currently active thread is still enabled, just continue with it. return Ok(SchedulingAction::ExecuteStep); } @@ -745,9 +707,8 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by // abstime has already been passed at the time of the call". // - let potential_sleep_time = - self.timeout_callbacks.values().map(|info| info.call_time.get_wait_time(clock)).min(); - if potential_sleep_time == Some(Duration::new(0, 0)) { + let potential_sleep_time = self.next_callback_wait_time(clock); + if potential_sleep_time == Some(Duration::ZERO) { return Ok(SchedulingAction::ExecuteTimeoutCallback); } // No callbacks immediately scheduled, pick a regular thread to execute. @@ -765,7 +726,7 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { .chain(self.threads.iter_enumerated().take(self.active_thread.index())); for (id, thread) in threads { debug_assert_ne!(self.active_thread, id); - if thread.state == ThreadState::Enabled { + if thread.state.is_enabled() { info!( "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------", self.get_thread_display_name(id), @@ -776,11 +737,11 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> { } } self.yield_active_thread = false; - if self.threads[self.active_thread].state == ThreadState::Enabled { + if self.threads[self.active_thread].state.is_enabled() { return Ok(SchedulingAction::ExecuteStep); } // We have not found a thread to execute. - if self.threads.iter().all(|thread| thread.state == ThreadState::Terminated) { + if self.threads.iter().all(|thread| thread.state.is_terminated()) { unreachable!("all threads terminated without the main thread terminating?!"); } else if let Some(sleep_time) = potential_sleep_time { // All threads are currently blocked, but we have unexecuted @@ -799,29 +760,40 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> { #[inline] fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let (thread, callback) = if let Some((thread, callback)) = - this.machine.threads.get_ready_callback(&this.machine.clock) - { - (thread, callback) - } else { - // get_ready_callback can return None if the computer's clock - // was shifted after calling the scheduler and before the call - // to get_ready_callback (see issue - // https://github.com/rust-lang/miri/issues/1763). In this case, - // just do nothing, which effectively just returns to the - // scheduler. - return Ok(()); - }; - // This back-and-forth with `set_active_thread` is here because of two - // design decisions: - // 1. Make the caller and not the callback responsible for changing - // thread. - // 2. Make the scheduler the only place that can change the active - // thread. - let old_thread = this.set_active_thread(thread); - callback.call(this)?; - this.set_active_thread(old_thread); - Ok(()) + let mut found_callback = None; + // Find a blocked thread that has timed out. + for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() { + match &thread.state { + ThreadState::Blocked { timeout: Some(timeout), .. } + if timeout.get_wait_time(&this.machine.clock) == Duration::ZERO => + { + let old_state = mem::replace(&mut thread.state, ThreadState::Enabled); + let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() }; + found_callback = Some((id, callback)); + // Run the fallback (after the loop because borrow-checking). + break; + } + _ => {} + } + } + if let Some((thread, callback)) = found_callback { + // This back-and-forth with `set_active_thread` is here because of two + // design decisions: + // 1. Make the caller and not the callback responsible for changing + // thread. + // 2. Make the scheduler the only place that can change the active + // thread. + let old_thread = this.machine.threads.set_active_thread_id(thread); + callback.timeout(this)?; + this.machine.threads.set_active_thread_id(old_thread); + } + // found_callback can remain None if the computer's clock + // was shifted after calling the scheduler and before the call + // to get_ready_callback (see issue + // https://github.com/rust-lang/miri/issues/1763). In this case, + // just do nothing, which effectively just returns to the + // scheduler. + return Ok(()); } #[inline] @@ -904,7 +876,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // Finally switch to new thread so that we can push the first stackframe. // After this all accesses will be treated as occurring in the new thread. - let old_thread_id = this.set_active_thread(new_thread_id); + let old_thread_id = this.machine.threads.set_active_thread_id(new_thread_id); // Perform the function pointer load in the new thread frame. let instance = this.get_ptr_fn(start_routine)?.as_instance()?; @@ -923,11 +895,110 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { )?; // Restore the old active thread frame. - this.set_active_thread(old_thread_id); + this.machine.threads.set_active_thread_id(old_thread_id); Ok(new_thread_id) } + /// Handles thread termination of the active thread: wakes up threads joining on this one, + /// and deals with the thread's thread-local statics according to `tls_alloc_action`. + /// + /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`. + fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + // Mark thread as terminated. + let thread = this.active_thread_mut(); + assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated"); + thread.state = ThreadState::Terminated; + if let Some(ref mut data_race) = this.machine.data_race { + data_race.thread_terminated(&this.machine.threads); + } + // Deallocate TLS. + let gone_thread = this.active_thread(); + { + let mut free_tls_statics = Vec::new(); + this.machine.threads.thread_local_alloc_ids.retain( + |&(_def_id, thread), &mut alloc_id| { + if thread != gone_thread { + // A different thread, keep this static around. + return true; + } + // Delete this static from the map and from memory. + // We cannot free directly here as we cannot use `?` in this context. + free_tls_statics.push(alloc_id); + false + }, + ); + // Now free the TLS statics. + for ptr in free_tls_statics { + match tls_alloc_action { + TlsAllocAction::Deallocate => + this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?, + TlsAllocAction::Leak => + if let Some(alloc) = ptr.provenance.get_alloc_id() { + trace!( + "Thread-local static leaked and stored as static root: {:?}", + alloc + ); + this.machine.static_roots.push(alloc); + }, + } + } + } + // Unblock joining threads. + let unblock_reason = BlockReason::Join(gone_thread); + let threads = &this.machine.threads.threads; + let joining_threads = threads + .iter_enumerated() + .filter(|(_, thread)| thread.state.is_blocked_on(unblock_reason)) + .map(|(id, _)| id) + .collect::>(); + for thread in joining_threads { + this.unblock_thread(thread, unblock_reason)?; + } + + Ok(()) + } + + /// Block the current thread, with an optional timeout. + /// The callback will be invoked when the thread gets unblocked. + #[inline] + fn block_thread( + &mut self, + reason: BlockReason, + timeout: Option, + callback: impl UnblockCallback<'mir, 'tcx> + 'tcx, + ) { + let this = self.eval_context_mut(); + if !this.machine.communicate() && matches!(timeout, Some(Timeout::RealTime(..))) { + panic!("cannot have `RealTime` callback with isolation enabled!") + } + this.machine.threads.block_thread(reason, timeout, callback); + } + + /// Put the blocked thread into the enabled state. + /// Sanity-checks that the thread previously was blocked for the right reason. + fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> { + let this = self.eval_context_mut(); + let old_state = + mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled); + let callback = match old_state { + ThreadState::Blocked { reason: actual_reason, callback, .. } => { + assert_eq!( + reason, actual_reason, + "unblock_thread: thread was blocked for the wrong reason" + ); + callback + } + _ => panic!("unblock_thread: thread was not blocked"), + }; + // The callback must be executed in the previously blocked thread. + let old_thread = this.machine.threads.set_active_thread_id(thread); + callback.unblock(this)?; + this.machine.threads.set_active_thread_id(old_thread); + Ok(()) + } + #[inline] fn detach_thread( &mut self, @@ -955,15 +1026,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } #[inline] - fn set_active_thread(&mut self, thread_id: ThreadId) -> ThreadId { - let this = self.eval_context_mut(); - this.machine.threads.set_active_thread_id(thread_id) - } - - #[inline] - fn get_active_thread(&self) -> ThreadId { + fn active_thread(&self) -> ThreadId { let this = self.eval_context_ref(); - this.machine.threads.get_active_thread_id() + this.machine.threads.active_thread() } #[inline] @@ -1025,16 +1090,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { self.eval_context_ref().machine.threads.get_thread_name(thread) } - #[inline] - fn block_thread(&mut self, thread: ThreadId, reason: BlockReason) { - self.eval_context_mut().machine.threads.block_thread(thread, reason); - } - - #[inline] - fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) { - self.eval_context_mut().machine.threads.unblock_thread(thread, reason); - } - #[inline] fn yield_active_thread(&mut self) { self.eval_context_mut().machine.threads.yield_active_thread(); @@ -1050,26 +1105,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } } - #[inline] - fn register_timeout_callback( - &mut self, - thread: ThreadId, - call_time: CallbackTime, - callback: TimeoutCallback<'mir, 'tcx>, - ) { - let this = self.eval_context_mut(); - if !this.machine.communicate() && matches!(call_time, CallbackTime::RealTime(..)) { - panic!("cannot have `RealTime` callback with isolation enabled!") - } - this.machine.threads.register_timeout_callback(thread, call_time, callback); - } - - #[inline] - fn unregister_timeout_callback_if_exists(&mut self, thread: ThreadId) { - let this = self.eval_context_mut(); - this.machine.threads.unregister_timeout_callback_if_exists(thread); - } - /// Run the core interpreter loop. Returns only when an interrupt occurs (an error or program /// termination). fn run_threads(&mut self) -> InterpResult<'tcx, !> { @@ -1099,32 +1134,4 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } } } - - /// Handles thread termination of the active thread: wakes up threads joining on this one, - /// and deals with the thread's thread-local statics according to `tls_alloc_action`. - /// - /// This is called by the eval loop when a thread's on_stack_empty returns `Ready`. - #[inline] - fn terminate_active_thread(&mut self, tls_alloc_action: TlsAllocAction) -> InterpResult<'tcx> { - let this = self.eval_context_mut(); - let thread = this.active_thread_mut(); - assert!(thread.stack.is_empty(), "only threads with an empty stack can be terminated"); - thread.state = ThreadState::Terminated; - - let current_span = this.machine.current_span(); - let thread_local_allocations = - this.machine.threads.thread_terminated(this.machine.data_race.as_mut(), current_span); - for ptr in thread_local_allocations { - match tls_alloc_action { - TlsAllocAction::Deallocate => - this.deallocate_ptr(ptr.into(), None, MiriMemoryKind::Tls.into())?, - TlsAllocAction::Leak => - if let Some(alloc) = ptr.provenance.get_alloc_id() { - trace!("Thread-local static leaked and stored as static root: {:?}", alloc); - this.machine.static_roots.push(alloc); - }, - } - } - Ok(()) - } } diff --git a/src/tools/miri/src/diagnostics.rs b/src/tools/miri/src/diagnostics.rs index 189c4a20bf6cf..468bbb85dddf8 100644 --- a/src/tools/miri/src/diagnostics.rs +++ b/src/tools/miri/src/diagnostics.rs @@ -411,7 +411,7 @@ pub fn report_error<'tcx, 'mir>( vec![], helps, &stacktrace, - Some(ecx.get_active_thread()), + Some(ecx.active_thread()), &ecx.machine, ); @@ -419,7 +419,7 @@ pub fn report_error<'tcx, 'mir>( if show_all_threads { for (thread, stack) in ecx.machine.threads.all_stacks() { - if thread != ecx.get_active_thread() { + if thread != ecx.active_thread() { let stacktrace = Frame::generate_stacktrace_from_stack(stack); let (stacktrace, was_pruned) = prune_stacktrace(stacktrace, &ecx.machine); any_pruned |= was_pruned; @@ -684,7 +684,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> { notes, helps, &stacktrace, - Some(self.threads.get_active_thread_id()), + Some(self.threads.active_thread()), self, ); } @@ -712,7 +712,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { vec![], vec![], &stacktrace, - Some(this.get_active_thread()), + Some(this.active_thread()), &this.machine, ); } diff --git a/src/tools/miri/src/lib.rs b/src/tools/miri/src/lib.rs index 1a663ba0704b0..2fa88b6a24619 100644 --- a/src/tools/miri/src/lib.rs +++ b/src/tools/miri/src/lib.rs @@ -123,7 +123,8 @@ pub use crate::concurrency::{ init_once::{EvalContextExt as _, InitOnceId}, sync::{CondvarId, EvalContextExt as _, MutexId, RwLockId, SynchronizationObjects}, thread::{ - BlockReason, CallbackTime, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, + BlockReason, EvalContextExt as _, StackEmptyCallback, ThreadId, ThreadManager, Timeout, + UnblockCallback, }, }; pub use crate::diagnostics::{ diff --git a/src/tools/miri/src/machine.rs b/src/tools/miri/src/machine.rs index cd9ab03dc687e..e09fd2122f8ca 100644 --- a/src/tools/miri/src/machine.rs +++ b/src/tools/miri/src/machine.rs @@ -473,7 +473,7 @@ pub struct MiriMachine<'mir, 'tcx> { /// The set of threads. pub(crate) threads: ThreadManager<'mir, 'tcx>, /// The state of the primitive synchronization objects. - pub(crate) sync: SynchronizationObjects<'mir, 'tcx>, + pub(crate) sync: SynchronizationObjects, /// Precomputed `TyLayout`s for primitive data types that are commonly used inside Miri. pub(crate) layouts: PrimitiveLayouts<'tcx>, @@ -770,7 +770,7 @@ impl VisitProvenance for MiriMachine<'_, '_> { #[rustfmt::skip] let MiriMachine { threads, - sync, + sync: _, tls, env_vars, main_fn_ret_place, @@ -819,7 +819,6 @@ impl VisitProvenance for MiriMachine<'_, '_> { } = self; threads.visit_provenance(visit); - sync.visit_provenance(visit); tls.visit_provenance(visit); env_vars.visit_provenance(visit); dirs.visit_provenance(visit); @@ -1371,7 +1370,7 @@ impl<'mir, 'tcx> Machine<'mir, 'tcx> for MiriMachine<'mir, 'tcx> { Some(profiler.start_recording_interval_event_detached( *name, measureme::EventId::from_label(*name), - ecx.get_active_thread().to_u32(), + ecx.active_thread().to_u32(), )) } else { None diff --git a/src/tools/miri/src/shims/time.rs b/src/tools/miri/src/shims/time.rs index 941c61caa53ab..a99006f397090 100644 --- a/src/tools/miri/src/shims/time.rs +++ b/src/tools/miri/src/shims/time.rs @@ -6,7 +6,6 @@ use std::time::{Duration, SystemTime}; use chrono::{DateTime, Datelike, Offset, Timelike, Utc}; use chrono_tz::Tz; -use crate::concurrency::thread::MachineCallback; use crate::*; /// Returns the time elapsed between the provided time and the unix epoch as a `Duration`. @@ -336,16 +335,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let timeout_time = now .checked_add(duration) .unwrap_or_else(|| now.checked_add(Duration::from_secs(3600)).unwrap()); + let timeout_time = Timeout::Monotonic(timeout_time); - let active_thread = this.get_active_thread(); - this.block_thread(active_thread, BlockReason::Sleep); - - this.register_timeout_callback( - active_thread, - CallbackTime::Monotonic(timeout_time), - Box::new(UnblockCallback { thread_to_unblock: active_thread }), - ); - + this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback); Ok(0) } @@ -359,31 +351,25 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let duration = Duration::from_millis(timeout_ms.into()); let timeout_time = this.machine.clock.now().checked_add(duration).unwrap(); + let timeout_time = Timeout::Monotonic(timeout_time); - let active_thread = this.get_active_thread(); - this.block_thread(active_thread, BlockReason::Sleep); - - this.register_timeout_callback( - active_thread, - CallbackTime::Monotonic(timeout_time), - Box::new(UnblockCallback { thread_to_unblock: active_thread }), - ); - + this.block_thread(BlockReason::Sleep, Some(timeout_time), SleepCallback); Ok(()) } } -struct UnblockCallback { - thread_to_unblock: ThreadId, -} - -impl VisitProvenance for UnblockCallback { +struct SleepCallback; +impl VisitProvenance for SleepCallback { fn visit_provenance(&self, _visit: &mut VisitWith<'_>) {} } - -impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for UnblockCallback { - fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { - ecx.unblock_thread(self.thread_to_unblock, BlockReason::Sleep); +impl<'mir, 'tcx: 'mir> UnblockCallback<'mir, 'tcx> for SleepCallback { + fn timeout(self: Box, _this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { Ok(()) } + fn unblock( + self: Box, + _this: &mut InterpCx<'mir, 'tcx, MiriMachine<'mir, 'tcx>>, + ) -> InterpResult<'tcx> { + panic!("a sleeping thread should only ever be woken up via the timeout") + } } diff --git a/src/tools/miri/src/shims/tls.rs b/src/tools/miri/src/shims/tls.rs index 0dec12f0b65f1..3dc85cc70be89 100644 --- a/src/tools/miri/src/shims/tls.rs +++ b/src/tools/miri/src/shims/tls.rs @@ -282,7 +282,7 @@ impl<'tcx> TlsDtorsState<'tcx> { } } Done => { - this.machine.tls.delete_all_thread_tls(this.get_active_thread()); + this.machine.tls.delete_all_thread_tls(this.active_thread()); return Ok(Poll::Ready(())); } } @@ -332,7 +332,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { /// executed. fn schedule_macos_tls_dtor(&mut self) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let thread_id = this.get_active_thread(); + let thread_id = this.active_thread(); if let Some((instance, data)) = this.machine.tls.macos_thread_dtors.remove(&thread_id) { trace!("Running macos dtor {:?} on {:?} at {:?}", instance, data, thread_id); @@ -354,7 +354,7 @@ trait EvalContextPrivExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { state: &mut RunningDtorState, ) -> InterpResult<'tcx, Poll<()>> { let this = self.eval_context_mut(); - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); // Fetch next dtor after `key`. let dtor = match this.machine.tls.fetch_tls_dtor(state.last_key, active_thread) { diff --git a/src/tools/miri/src/shims/unix/foreign_items.rs b/src/tools/miri/src/shims/unix/foreign_items.rs index 86dd23f31c7d6..e75df88876b53 100644 --- a/src/tools/miri/src/shims/unix/foreign_items.rs +++ b/src/tools/miri/src/shims/unix/foreign_items.rs @@ -388,14 +388,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { "pthread_getspecific" => { let [key] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; let key = this.read_scalar(key)?.to_bits(key.layout.size)?; - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); let ptr = this.machine.tls.load_tls(key, active_thread, this)?; this.write_scalar(ptr, dest)?; } "pthread_setspecific" => { let [key, new_ptr] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; let key = this.read_scalar(key)?.to_bits(key.layout.size)?; - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); let new_data = this.read_scalar(new_ptr)?; this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?; @@ -426,8 +426,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } "pthread_mutex_lock" => { let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.pthread_mutex_lock(mutex)?; - this.write_scalar(Scalar::from_i32(result), dest)?; + this.pthread_mutex_lock(mutex, dest)?; } "pthread_mutex_trylock" => { let [mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; @@ -446,8 +445,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } "pthread_rwlock_rdlock" => { let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.pthread_rwlock_rdlock(rwlock)?; - this.write_scalar(Scalar::from_i32(result), dest)?; + this.pthread_rwlock_rdlock(rwlock, dest)?; } "pthread_rwlock_tryrdlock" => { let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; @@ -456,8 +454,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } "pthread_rwlock_wrlock" => { let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.pthread_rwlock_wrlock(rwlock)?; - this.write_scalar(Scalar::from_i32(result), dest)?; + this.pthread_rwlock_wrlock(rwlock, dest)?; } "pthread_rwlock_trywrlock" => { let [rwlock] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; @@ -513,8 +510,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } "pthread_cond_wait" => { let [cond, mutex] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.pthread_cond_wait(cond, mutex)?; - this.write_scalar(Scalar::from_i32(result), dest)?; + this.pthread_cond_wait(cond, mutex, dest)?; } "pthread_cond_timedwait" => { let [cond, mutex, abstime] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; diff --git a/src/tools/miri/src/shims/unix/linux/sync.rs b/src/tools/miri/src/shims/unix/linux/sync.rs index d4a6cd96f48df..2c7c4dd65cef7 100644 --- a/src/tools/miri/src/shims/unix/linux/sync.rs +++ b/src/tools/miri/src/shims/unix/linux/sync.rs @@ -1,6 +1,5 @@ use std::time::SystemTime; -use crate::concurrency::thread::MachineCallback; use crate::*; /// Implementation of the SYS_futex syscall. @@ -32,7 +31,6 @@ pub fn futex<'tcx>( let op = this.read_scalar(&args[1])?.to_i32()?; let val = this.read_scalar(&args[2])?.to_i32()?; - let thread = this.get_active_thread(); // This is a vararg function so we have to bring our own type for this pointer. let addr = this.ptr_to_mplace(addr, this.machine.layouts.i32); let addr_usize = addr.ptr().addr().bytes(); @@ -107,22 +105,18 @@ pub fn futex<'tcx>( Some(if wait_bitset { // FUTEX_WAIT_BITSET uses an absolute timestamp. if realtime { - CallbackTime::RealTime( - SystemTime::UNIX_EPOCH.checked_add(duration).unwrap(), - ) + Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap()) } else { - CallbackTime::Monotonic( + Timeout::Monotonic( this.machine.clock.anchor().checked_add(duration).unwrap(), ) } } else { // FUTEX_WAIT uses a relative timestamp. if realtime { - CallbackTime::RealTime(SystemTime::now().checked_add(duration).unwrap()) + Timeout::RealTime(SystemTime::now().checked_add(duration).unwrap()) } else { - CallbackTime::Monotonic( - this.machine.clock.now().checked_add(duration).unwrap(), - ) + Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap()) } }) }; @@ -158,7 +152,7 @@ pub fn futex<'tcx>( // to see an up-to-date value. // // The above case distinction is valid since both FUTEX_WAIT and FUTEX_WAKE - // contain a SeqCst fence, therefore inducting a total order between the operations. + // contain a SeqCst fence, therefore inducing a total order between the operations. // It is also critical that the fence, the atomic load, and the comparison in FUTEX_WAIT // altogether happen atomically. If the other thread's fence in FUTEX_WAKE // gets interleaved after our fence, then we lose the guarantee on the @@ -174,48 +168,16 @@ pub fn futex<'tcx>( // It's not uncommon for `addr` to be passed as another type than `*mut i32`, such as `*const AtomicI32`. let futex_val = this.read_scalar_atomic(&addr, AtomicReadOrd::Relaxed)?.to_i32()?; if val == futex_val { - // The value still matches, so we block the thread make it wait for FUTEX_WAKE. - this.block_thread(thread, BlockReason::Futex { addr: addr_usize }); - this.futex_wait(addr_usize, thread, bitset); - // Succesfully waking up from FUTEX_WAIT always returns zero. - this.write_scalar(Scalar::from_target_isize(0, this), dest)?; - // Register a timeout callback if a timeout was specified. - // This callback will override the return value when the timeout triggers. - if let Some(timeout_time) = timeout_time { - struct Callback<'tcx> { - thread: ThreadId, - addr_usize: u64, - dest: MPlaceTy<'tcx, Provenance>, - } - - impl<'tcx> VisitProvenance for Callback<'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - let Callback { thread: _, addr_usize: _, dest } = self; - dest.visit_provenance(visit); - } - } - - impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> { - fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { - this.unblock_thread( - self.thread, - BlockReason::Futex { addr: self.addr_usize }, - ); - this.futex_remove_waiter(self.addr_usize, self.thread); - let etimedout = this.eval_libc("ETIMEDOUT"); - this.set_last_error(etimedout)?; - this.write_scalar(Scalar::from_target_isize(-1, this), &self.dest)?; - - Ok(()) - } - } - - this.register_timeout_callback( - thread, - timeout_time, - Box::new(Callback { thread, addr_usize, dest: dest.clone() }), - ); - } + // The value still matches, so we block the thread and make it wait for FUTEX_WAKE. + this.futex_wait( + addr_usize, + bitset, + timeout_time, + Scalar::from_target_isize(0, this), // retval_succ + Scalar::from_target_isize(-1, this), // retval_timeout + dest.clone(), + this.eval_libc("ETIMEDOUT"), + ); } else { // The futex value doesn't match the expected value, so we return failure // right away without sleeping: -1 and errno set to EAGAIN. @@ -257,9 +219,7 @@ pub fn futex<'tcx>( let mut n = 0; #[allow(clippy::arithmetic_side_effects)] for _ in 0..val { - if let Some(thread) = this.futex_wake(addr_usize, bitset) { - this.unblock_thread(thread, BlockReason::Futex { addr: addr_usize }); - this.unregister_timeout_callback_if_exists(thread); + if this.futex_wake(addr_usize, bitset)? { n += 1; } else { break; diff --git a/src/tools/miri/src/shims/unix/macos/foreign_items.rs b/src/tools/miri/src/shims/unix/macos/foreign_items.rs index 2b9ce746a5653..7c489ec4e3c86 100644 --- a/src/tools/miri/src/shims/unix/macos/foreign_items.rs +++ b/src/tools/miri/src/shims/unix/macos/foreign_items.rs @@ -131,7 +131,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let dtor = this.read_pointer(dtor)?; let dtor = this.get_ptr_fn(dtor)?.as_instance()?; let data = this.read_scalar(data)?; - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); this.machine.tls.set_macos_thread_dtor(active_thread, dtor, data)?; } diff --git a/src/tools/miri/src/shims/unix/sync.rs b/src/tools/miri/src/shims/unix/sync.rs index 79358da0894ab..b2864269b2ee0 100644 --- a/src/tools/miri/src/shims/unix/sync.rs +++ b/src/tools/miri/src/shims/unix/sync.rs @@ -3,7 +3,6 @@ use std::time::SystemTime; use rustc_target::abi::Size; -use crate::concurrency::thread::MachineCallback; use crate::*; // pthread_mutexattr_t is either 4 or 8 bytes, depending on the platform. @@ -373,59 +372,6 @@ fn cond_set_clock_id<'mir, 'tcx: 'mir>( ) } -/// Try to reacquire the mutex associated with the condition variable after we -/// were signaled. -fn reacquire_cond_mutex<'mir, 'tcx: 'mir>( - ecx: &mut MiriInterpCx<'mir, 'tcx>, - thread: ThreadId, - condvar: CondvarId, - mutex: MutexId, -) -> InterpResult<'tcx> { - ecx.unblock_thread(thread, BlockReason::Condvar(condvar)); - if ecx.mutex_is_locked(mutex) { - ecx.mutex_enqueue_and_block(mutex, thread); - } else { - ecx.mutex_lock(mutex, thread); - } - Ok(()) -} - -/// After a thread waiting on a condvar was signaled: -/// Reacquire the conditional variable and remove the timeout callback if any -/// was registered. -fn post_cond_signal<'mir, 'tcx: 'mir>( - ecx: &mut MiriInterpCx<'mir, 'tcx>, - thread: ThreadId, - condvar: CondvarId, - mutex: MutexId, -) -> InterpResult<'tcx> { - reacquire_cond_mutex(ecx, thread, condvar, mutex)?; - // Waiting for the mutex is not included in the waiting time because we need - // to acquire the mutex always even if we get a timeout. - ecx.unregister_timeout_callback_if_exists(thread); - Ok(()) -} - -/// Release the mutex associated with the condition variable because we are -/// entering the waiting state. -fn release_cond_mutex_and_block<'mir, 'tcx: 'mir>( - ecx: &mut MiriInterpCx<'mir, 'tcx>, - thread: ThreadId, - condvar: CondvarId, - mutex: MutexId, -) -> InterpResult<'tcx> { - assert_eq!(ecx.get_active_thread(), thread); - if let Some(old_locked_count) = ecx.mutex_unlock(mutex) { - if old_locked_count != 1 { - throw_unsup_format!("awaiting on a lock acquired multiple times is not supported"); - } - } else { - throw_ub_format!("awaiting on unlocked or owned by a different thread mutex"); - } - ecx.block_thread(thread, BlockReason::Condvar(condvar)); - Ok(()) -} - impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn pthread_mutexattr_init( @@ -531,19 +477,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { Ok(0) } - fn pthread_mutex_lock(&mut self, mutex_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> { + fn pthread_mutex_lock( + &mut self, + mutex_op: &OpTy<'tcx, Provenance>, + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); let kind = mutex_get_kind(this, mutex_op)?; let id = mutex_get_id(this, mutex_op)?; - let active_thread = this.get_active_thread(); - if this.mutex_is_locked(id) { + let ret = if this.mutex_is_locked(id) { let owner_thread = this.mutex_get_owner(id); - if owner_thread != active_thread { - // Enqueue the active thread. - this.mutex_enqueue_and_block(id, active_thread); - Ok(0) + if owner_thread != this.active_thread() { + this.mutex_enqueue_and_block(id, Scalar::from_i32(0), dest.clone()); + return Ok(()); } else { // Trying to acquire the same mutex again. if is_mutex_kind_default(this, kind)? { @@ -551,10 +499,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } else if is_mutex_kind_normal(this, kind)? { throw_machine_stop!(TerminationInfo::Deadlock); } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_ERRORCHECK") { - Ok(this.eval_libc_i32("EDEADLK")) + this.eval_libc_i32("EDEADLK") } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") { - this.mutex_lock(id, active_thread); - Ok(0) + this.mutex_lock(id); + 0 } else { throw_unsup_format!( "called pthread_mutex_lock on an unsupported type of mutex" @@ -563,9 +511,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } } else { // The mutex is unlocked. Let's lock it. - this.mutex_lock(id, active_thread); - Ok(0) - } + this.mutex_lock(id); + 0 + }; + this.write_scalar(Scalar::from_i32(ret), dest)?; + Ok(()) } fn pthread_mutex_trylock( @@ -576,11 +526,10 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let kind = mutex_get_kind(this, mutex_op)?; let id = mutex_get_id(this, mutex_op)?; - let active_thread = this.get_active_thread(); if this.mutex_is_locked(id) { let owner_thread = this.mutex_get_owner(id); - if owner_thread != active_thread { + if owner_thread != this.active_thread() { Ok(this.eval_libc_i32("EBUSY")) } else { if is_mutex_kind_default(this, kind)? @@ -589,7 +538,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { { Ok(this.eval_libc_i32("EBUSY")) } else if kind == this.eval_libc_i32("PTHREAD_MUTEX_RECURSIVE") { - this.mutex_lock(id, active_thread); + this.mutex_lock(id); Ok(0) } else { throw_unsup_format!( @@ -599,7 +548,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { } } else { // The mutex is unlocked. Let's lock it. - this.mutex_lock(id, active_thread); + this.mutex_lock(id); Ok(0) } } @@ -613,7 +562,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let kind = mutex_get_kind(this, mutex_op)?; let id = mutex_get_id(this, mutex_op)?; - if let Some(_old_locked_count) = this.mutex_unlock(id) { + if let Some(_old_locked_count) = this.mutex_unlock(id)? { // The mutex was locked by the current thread. Ok(0) } else { @@ -666,19 +615,20 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn pthread_rwlock_rdlock( &mut self, rwlock_op: &OpTy<'tcx, Provenance>, - ) -> InterpResult<'tcx, i32> { + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); let id = rwlock_get_id(this, rwlock_op)?; - let active_thread = this.get_active_thread(); if this.rwlock_is_write_locked(id) { - this.rwlock_enqueue_and_block_reader(id, active_thread); - Ok(0) + this.rwlock_enqueue_and_block_reader(id, Scalar::from_i32(0), dest.clone()); } else { - this.rwlock_reader_lock(id, active_thread); - Ok(0) + this.rwlock_reader_lock(id); + this.write_null(dest)?; } + + Ok(()) } fn pthread_rwlock_tryrdlock( @@ -688,12 +638,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let this = self.eval_context_mut(); let id = rwlock_get_id(this, rwlock_op)?; - let active_thread = this.get_active_thread(); if this.rwlock_is_write_locked(id) { Ok(this.eval_libc_i32("EBUSY")) } else { - this.rwlock_reader_lock(id, active_thread); + this.rwlock_reader_lock(id); Ok(0) } } @@ -701,11 +650,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn pthread_rwlock_wrlock( &mut self, rwlock_op: &OpTy<'tcx, Provenance>, - ) -> InterpResult<'tcx, i32> { + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); let id = rwlock_get_id(this, rwlock_op)?; - let active_thread = this.get_active_thread(); if this.rwlock_is_locked(id) { // Note: this will deadlock if the lock is already locked by this @@ -720,12 +669,13 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { // report the deadlock only when no thread can continue execution, // but we could detect that this lock is already locked and report // an error.) - this.rwlock_enqueue_and_block_writer(id, active_thread); + this.rwlock_enqueue_and_block_writer(id, Scalar::from_i32(0), dest.clone()); } else { - this.rwlock_writer_lock(id, active_thread); + this.rwlock_writer_lock(id); + this.write_null(dest)?; } - Ok(0) + Ok(()) } fn pthread_rwlock_trywrlock( @@ -735,12 +685,11 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let this = self.eval_context_mut(); let id = rwlock_get_id(this, rwlock_op)?; - let active_thread = this.get_active_thread(); if this.rwlock_is_locked(id) { Ok(this.eval_libc_i32("EBUSY")) } else { - this.rwlock_writer_lock(id, active_thread); + this.rwlock_writer_lock(id); Ok(0) } } @@ -754,9 +703,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let id = rwlock_get_id(this, rwlock_op)?; #[allow(clippy::if_same_then_else)] - if this.rwlock_reader_unlock(id) { + if this.rwlock_reader_unlock(id)? { Ok(0) - } else if this.rwlock_writer_unlock(id) { + } else if this.rwlock_writer_unlock(id)? { Ok(0) } else { throw_ub_format!("unlocked an rwlock that was not locked by the active thread"); @@ -885,10 +834,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn pthread_cond_signal(&mut self, cond_op: &OpTy<'tcx, Provenance>) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); let id = cond_get_id(this, cond_op)?; - if let Some((thread, mutex)) = this.condvar_signal(id) { - post_cond_signal(this, thread, id, mutex)?; - } - + this.condvar_signal(id)?; Ok(0) } @@ -898,11 +844,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { ) -> InterpResult<'tcx, i32> { let this = self.eval_context_mut(); let id = cond_get_id(this, cond_op)?; - - while let Some((thread, mutex)) = this.condvar_signal(id) { - post_cond_signal(this, thread, id, mutex)?; - } - + while this.condvar_signal(id)? {} Ok(0) } @@ -910,17 +852,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { &mut self, cond_op: &OpTy<'tcx, Provenance>, mutex_op: &OpTy<'tcx, Provenance>, - ) -> InterpResult<'tcx, i32> { + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); let id = cond_get_id(this, cond_op)?; let mutex_id = mutex_get_id(this, mutex_op)?; - let active_thread = this.get_active_thread(); - release_cond_mutex_and_block(this, active_thread, id, mutex_id)?; - this.condvar_wait(id, active_thread, mutex_id); + this.condvar_wait( + id, + mutex_id, + None, // no timeout + Scalar::from_i32(0), + Scalar::from_i32(0), // retval_timeout -- unused + dest.clone(), + )?; - Ok(0) + Ok(()) } fn pthread_cond_timedwait( @@ -934,7 +882,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let id = cond_get_id(this, cond_op)?; let mutex_id = mutex_get_id(this, mutex_op)?; - let active_thread = this.get_active_thread(); // Extract the timeout. let clock_id = cond_get_clock_id(this, cond_op)?; @@ -948,61 +895,23 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { return Ok(()); } }; - let timeout_time = if is_cond_clock_realtime(this, clock_id) { this.check_no_isolation("`pthread_cond_timedwait` with `CLOCK_REALTIME`")?; - CallbackTime::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap()) + Timeout::RealTime(SystemTime::UNIX_EPOCH.checked_add(duration).unwrap()) } else if clock_id == this.eval_libc_i32("CLOCK_MONOTONIC") { - CallbackTime::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap()) + Timeout::Monotonic(this.machine.clock.anchor().checked_add(duration).unwrap()) } else { throw_unsup_format!("unsupported clock id: {}", clock_id); }; - release_cond_mutex_and_block(this, active_thread, id, mutex_id)?; - this.condvar_wait(id, active_thread, mutex_id); - - // We return success for now and override it in the timeout callback. - this.write_scalar(Scalar::from_i32(0), dest)?; - - struct Callback<'tcx> { - active_thread: ThreadId, - mutex_id: MutexId, - id: CondvarId, - dest: MPlaceTy<'tcx, Provenance>, - } - - impl<'tcx> VisitProvenance for Callback<'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - let Callback { active_thread: _, mutex_id: _, id: _, dest } = self; - dest.visit_provenance(visit); - } - } - - impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> { - fn call(&self, ecx: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { - assert_eq!(self.active_thread, ecx.get_active_thread()); - // We are not waiting for the condvar any more, wait for the - // mutex instead. - reacquire_cond_mutex(ecx, self.active_thread, self.id, self.mutex_id)?; - - // Remove the thread from the conditional variable. - ecx.condvar_remove_waiter(self.id, self.active_thread); - - // Set the return value: we timed out. - let etimedout = ecx.eval_libc("ETIMEDOUT"); - ecx.write_scalar(etimedout, &self.dest)?; - - Ok(()) - } - } - - // Register the timeout callback. - let dest = dest.clone(); - this.register_timeout_callback( - active_thread, - timeout_time, - Box::new(Callback { active_thread, mutex_id, id, dest }), - ); + this.condvar_wait( + id, + mutex_id, + Some(timeout_time), + Scalar::from_i32(0), + this.eval_libc("ETIMEDOUT"), // retval_timeout + dest.clone(), + )?; Ok(()) } diff --git a/src/tools/miri/src/shims/unix/thread.rs b/src/tools/miri/src/shims/unix/thread.rs index 9e09401a81584..f8787ad90e0c3 100644 --- a/src/tools/miri/src/shims/unix/thread.rs +++ b/src/tools/miri/src/shims/unix/thread.rs @@ -63,7 +63,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { fn pthread_self(&mut self) -> InterpResult<'tcx, Scalar> { let this = self.eval_context_mut(); - let thread_id = this.get_active_thread(); + let thread_id = this.active_thread(); Ok(Scalar::from_uint(thread_id.to_u32(), this.libc_ty_layout("pthread_t").size)) } diff --git a/src/tools/miri/src/shims/windows/foreign_items.rs b/src/tools/miri/src/shims/windows/foreign_items.rs index 91def80227db8..462c7ffcdccf1 100644 --- a/src/tools/miri/src/shims/windows/foreign_items.rs +++ b/src/tools/miri/src/shims/windows/foreign_items.rs @@ -354,7 +354,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { "TlsGetValue" => { let [key] = this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?; let key = u128::from(this.read_scalar(key)?.to_u32()?); - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); let ptr = this.machine.tls.load_tls(key, active_thread, this)?; this.write_scalar(ptr, dest)?; } @@ -362,7 +362,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let [key, new_ptr] = this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?; let key = u128::from(this.read_scalar(key)?.to_u32()?); - let active_thread = this.get_active_thread(); + let active_thread = this.active_thread(); let new_data = this.read_scalar(new_ptr)?; this.machine.tls.store_tls(key, active_thread, new_data, &*this.tcx)?; @@ -423,8 +423,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { "InitOnceBeginInitialize" => { let [ptr, flags, pending, context] = this.check_shim(abi, Abi::System { unwind: false }, link_name, args)?; - let result = this.InitOnceBeginInitialize(ptr, flags, pending, context)?; - this.write_scalar(result, dest)?; + this.InitOnceBeginInitialize(ptr, flags, pending, context, dest)?; } "InitOnceComplete" => { let [ptr, flags, context] = @@ -502,7 +501,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let thread = match Handle::from_scalar(handle, this)? { Some(Handle::Thread(thread)) => thread, - Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(), + Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(), _ => this.invalid_handle("SetThreadDescription")?, }; @@ -520,7 +519,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let thread = match Handle::from_scalar(handle, this)? { Some(Handle::Thread(thread)) => thread, - Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(), + Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(), _ => this.invalid_handle("SetThreadDescription")?, }; // Looks like the default thread name is empty. diff --git a/src/tools/miri/src/shims/windows/sync.rs b/src/tools/miri/src/shims/windows/sync.rs index 836b9e9259591..1e71fc92400d5 100644 --- a/src/tools/miri/src/shims/windows/sync.rs +++ b/src/tools/miri/src/shims/windows/sync.rs @@ -3,7 +3,6 @@ use std::time::Duration; use rustc_target::abi::Size; use crate::concurrency::init_once::InitOnceStatus; -use crate::concurrency::thread::MachineCallback; use crate::*; impl<'mir, 'tcx> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} @@ -18,6 +17,31 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let this = self.eval_context_mut(); this.init_once_get_or_create_id(init_once_op, this.windows_ty_layout("INIT_ONCE"), 0) } + + /// Returns `true` if we were succssful, `false` if we would block. + fn init_once_try_begin( + &mut self, + id: InitOnceId, + pending_place: &MPlaceTy<'tcx, Provenance>, + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx, bool> { + let this = self.eval_context_mut(); + Ok(match this.init_once_status(id) { + InitOnceStatus::Uninitialized => { + this.init_once_begin(id); + this.write_scalar(this.eval_windows("c", "TRUE"), pending_place)?; + this.write_scalar(this.eval_windows("c", "TRUE"), dest)?; + true + } + InitOnceStatus::Complete => { + this.init_once_observe_completed(id); + this.write_scalar(this.eval_windows("c", "FALSE"), pending_place)?; + this.write_scalar(this.eval_windows("c", "TRUE"), dest)?; + true + } + InitOnceStatus::Begun => false, + }) + } } impl<'mir, 'tcx> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {} @@ -29,9 +53,9 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { flags_op: &OpTy<'tcx, Provenance>, pending_op: &OpTy<'tcx, Provenance>, context_op: &OpTy<'tcx, Provenance>, - ) -> InterpResult<'tcx, Scalar> { + dest: &MPlaceTy<'tcx, Provenance>, + ) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let active_thread = this.get_active_thread(); let id = this.init_once_get_id(init_once_op)?; let flags = this.read_scalar(flags_op)?.to_u32()?; @@ -46,58 +70,34 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { throw_unsup_format!("non-null `lpContext` in `InitOnceBeginInitialize`"); } - match this.init_once_status(id) { - InitOnceStatus::Uninitialized => { - this.init_once_begin(id); - this.write_scalar(this.eval_windows("c", "TRUE"), &pending_place)?; - } - InitOnceStatus::Begun => { - // Someone else is already on it. - // Block this thread until they are done. - // When we are woken up, set the `pending` flag accordingly. - struct Callback<'tcx> { - init_once_id: InitOnceId, - pending_place: MPlaceTy<'tcx, Provenance>, - } - - impl<'tcx> VisitProvenance for Callback<'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - let Callback { init_once_id: _, pending_place } = self; - pending_place.visit_provenance(visit); - } - } - - impl<'mir, 'tcx> MachineCallback<'mir, 'tcx> for Callback<'tcx> { - fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { - let pending = match this.init_once_status(self.init_once_id) { - InitOnceStatus::Uninitialized => - unreachable!( - "status should have either been set to begun or complete" - ), - InitOnceStatus::Begun => this.eval_windows("c", "TRUE"), - InitOnceStatus::Complete => this.eval_windows("c", "FALSE"), - }; - - this.write_scalar(pending, &self.pending_place)?; - - Ok(()) - } - } - - this.init_once_enqueue_and_block( - id, - active_thread, - Box::new(Callback { init_once_id: id, pending_place }), - ) + if this.init_once_try_begin(id, &pending_place, dest)? { + // Done! + return Ok(()); + } + + // We have to block, and then try again when we are woken up. + this.init_once_enqueue_and_block(id, Callback { id, pending_place, dest: dest.clone() }); + return Ok(()); + + struct Callback<'tcx> { + id: InitOnceId, + pending_place: MPlaceTy<'tcx, Provenance>, + dest: MPlaceTy<'tcx, Provenance>, + } + impl<'tcx> VisitProvenance for Callback<'tcx> { + fn visit_provenance(&self, visit: &mut VisitWith<'_>) { + let Callback { id: _, dest, pending_place } = self; + pending_place.visit_provenance(visit); + dest.visit_provenance(visit); } - InitOnceStatus::Complete => { - this.init_once_observe_completed(id); - this.write_scalar(this.eval_windows("c", "FALSE"), &pending_place)?; + } + impl<'mir, 'tcx> UnblockCallback<'mir, 'tcx> for Callback<'tcx> { + fn unblock(self: Box, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { + let ret = this.init_once_try_begin(self.id, &self.pending_place, &self.dest)?; + assert!(ret, "we were woken up but init_once_try_begin still failed"); + Ok(()) } } - - // This always succeeds (even if the thread is blocked, we will succeed if we ever unblock). - Ok(this.eval_windows("c", "TRUE")) } fn InitOnceComplete( @@ -155,7 +155,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { let size = this.read_target_usize(size_op)?; let timeout_ms = this.read_scalar(timeout_op)?.to_u32()?; - let thread = this.get_active_thread(); let addr = ptr.addr().bytes(); if size > 8 || !size.is_power_of_two() { @@ -170,7 +169,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { None } else { let duration = Duration::from_millis(timeout_ms.into()); - Some(CallbackTime::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())) + Some(Timeout::Monotonic(this.machine.clock.now().checked_add(duration).unwrap())) }; // See the Linux futex implementation for why this fence exists. @@ -183,41 +182,15 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { if futex_val == compare_val { // If the values are the same, we have to block. - this.block_thread(thread, BlockReason::Futex { addr }); - this.futex_wait(addr, thread, u32::MAX); - - if let Some(timeout_time) = timeout_time { - struct Callback<'tcx> { - thread: ThreadId, - addr: u64, - dest: MPlaceTy<'tcx, Provenance>, - } - - impl<'tcx> VisitProvenance for Callback<'tcx> { - fn visit_provenance(&self, visit: &mut VisitWith<'_>) { - let Callback { thread: _, addr: _, dest } = self; - dest.visit_provenance(visit); - } - } - - impl<'mir, 'tcx: 'mir> MachineCallback<'mir, 'tcx> for Callback<'tcx> { - fn call(&self, this: &mut MiriInterpCx<'mir, 'tcx>) -> InterpResult<'tcx> { - this.unblock_thread(self.thread, BlockReason::Futex { addr: self.addr }); - this.futex_remove_waiter(self.addr, self.thread); - let error_timeout = this.eval_windows("c", "ERROR_TIMEOUT"); - this.set_last_error(error_timeout)?; - this.write_scalar(Scalar::from_i32(0), &self.dest)?; - - Ok(()) - } - } - - this.register_timeout_callback( - thread, - timeout_time, - Box::new(Callback { thread, addr, dest: dest.clone() }), - ); - } + this.futex_wait( + addr, + u32::MAX, // bitset + timeout_time, + Scalar::from_i32(1), // retval_succ + Scalar::from_i32(0), // retval_timeout + dest.clone(), + this.eval_windows("c", "ERROR_TIMEOUT"), + ); } this.write_scalar(Scalar::from_i32(1), dest)?; @@ -234,10 +207,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { this.atomic_fence(AtomicFenceOrd::SeqCst)?; let addr = ptr.addr().bytes(); - if let Some(thread) = this.futex_wake(addr, u32::MAX) { - this.unblock_thread(thread, BlockReason::Futex { addr }); - this.unregister_timeout_callback_if_exists(thread); - } + this.futex_wake(addr, u32::MAX)?; Ok(()) } @@ -250,10 +220,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { this.atomic_fence(AtomicFenceOrd::SeqCst)?; let addr = ptr.addr().bytes(); - while let Some(thread) = this.futex_wake(addr, u32::MAX) { - this.unblock_thread(thread, BlockReason::Futex { addr }); - this.unregister_timeout_callback_if_exists(thread); - } + while this.futex_wake(addr, u32::MAX)? {} Ok(()) } diff --git a/src/tools/miri/src/shims/windows/thread.rs b/src/tools/miri/src/shims/windows/thread.rs index 3953a881a720e..047f52f50be29 100644 --- a/src/tools/miri/src/shims/windows/thread.rs +++ b/src/tools/miri/src/shims/windows/thread.rs @@ -69,7 +69,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> { Some(Handle::Thread(thread)) => thread, // Unlike on posix, the outcome of joining the current thread is not documented. // On current Windows, it just deadlocks. - Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.get_active_thread(), + Some(Handle::Pseudo(PseudoHandle::CurrentThread)) => this.active_thread(), _ => this.invalid_handle("WaitForSingleObject")?, };