Skip to content

Commit

Permalink
completely refactor how we manage blocking and unblocking threads
Browse files Browse the repository at this point in the history
  • Loading branch information
RalfJung committed May 26, 2024
1 parent 5fa30f7 commit a131243
Show file tree
Hide file tree
Showing 18 changed files with 832 additions and 895 deletions.
4 changes: 2 additions & 2 deletions src/tools/miri/src/alloc_addresses/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
size,
align,
memory_kind,
ecx.get_active_thread(),
ecx.active_thread(),
) {
if let Some(clock) = clock {
ecx.acquire_clock(&clock);
Expand Down Expand Up @@ -367,7 +367,7 @@ impl<'mir, 'tcx> MiriMachine<'mir, 'tcx> {
// `alloc_id_from_addr` any more.
global_state.exposed.remove(&dead_id);
// Also remember this address for future reuse.
let thread = self.threads.get_active_thread_id();
let thread = self.threads.active_thread();
global_state.reuse.add_addr(rng, addr, size, align, kind, thread, || {
if let Some(data_race) = &self.data_race {
data_race.release_clock(&self.threads).clone()
Expand Down
20 changes: 11 additions & 9 deletions src/tools/miri/src/concurrency/data_race.rs
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: MiriInterpCxExt<'mir, 'tcx> {
fn acquire_clock(&self, clock: &VClock) {
let this = self.eval_context_ref();
if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(clock, this.get_active_thread());
data_race.acquire_clock(clock, &this.machine.threads);
}
}
}
Expand Down Expand Up @@ -1662,13 +1662,14 @@ impl GlobalState {
/// This should be called strictly before any calls to
/// `thread_joined`.
#[inline]
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>, current_span: Span) {
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
let current_index = self.active_thread_index(thread_mgr);

// Increment the clock to a unique termination timestamp.
let vector_clocks = self.vector_clocks.get_mut();
let current_clocks = &mut vector_clocks[current_index];
current_clocks.increment_clock(current_index, current_span);
current_clocks
.increment_clock(current_index, thread_mgr.active_thread_ref().current_span());

// Load the current thread id for the executing vector.
let vector_info = self.vector_info.get_mut();
Expand Down Expand Up @@ -1722,11 +1723,12 @@ impl GlobalState {
format!("thread `{thread_name}`")
}

/// Acquire the given clock into the given thread, establishing synchronization with
/// Acquire the given clock into the current thread, establishing synchronization with
/// the moment when that clock snapshot was taken via `release_clock`.
/// As this is an acquire operation, the thread timestamp is not
/// incremented.
pub fn acquire_clock(&self, clock: &VClock, thread: ThreadId) {
pub fn acquire_clock<'mir, 'tcx>(&self, clock: &VClock, threads: &ThreadManager<'mir, 'tcx>) {
let thread = threads.active_thread();
let (_, mut clocks) = self.thread_state_mut(thread);
clocks.clock.join(clock);
}
Expand All @@ -1738,7 +1740,7 @@ impl GlobalState {
&self,
threads: &ThreadManager<'mir, 'tcx>,
) -> Ref<'_, VClock> {
let thread = threads.get_active_thread_id();
let thread = threads.active_thread();
let span = threads.active_thread_ref().current_span();
// We increment the clock each time this happens, to ensure no two releases
// can be confused with each other.
Expand Down Expand Up @@ -1782,7 +1784,7 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, Ref<'_, ThreadClockSet>) {
self.thread_state(thread_mgr.get_active_thread_id())
self.thread_state(thread_mgr.active_thread())
}

/// Load the current vector clock in use and the current set of thread clocks
Expand All @@ -1792,14 +1794,14 @@ impl GlobalState {
&self,
thread_mgr: &ThreadManager<'_, '_>,
) -> (VectorIdx, RefMut<'_, ThreadClockSet>) {
self.thread_state_mut(thread_mgr.get_active_thread_id())
self.thread_state_mut(thread_mgr.active_thread())
}

/// Return the current thread, should be the same
/// as the data-race active thread.
#[inline]
fn active_thread_index(&self, thread_mgr: &ThreadManager<'_, '_>) -> VectorIdx {
let active_thread_id = thread_mgr.get_active_thread_id();
let active_thread_id = thread_mgr.active_thread();
self.thread_index(active_thread_id)
}

Expand Down
110 changes: 26 additions & 84 deletions src/tools/miri/src/concurrency/init_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,11 @@ use rustc_index::Idx;
use rustc_middle::ty::layout::TyAndLayout;

use super::sync::EvalContextExtPriv as _;
use super::thread::MachineCallback;
use super::vector_clock::VClock;
use crate::*;

declare_id!(InitOnceId);

/// A thread waiting on an InitOnce object.
struct InitOnceWaiter<'mir, 'tcx> {
/// The thread that is waiting.
thread: ThreadId,
/// The callback that should be executed, after the thread has been woken up.
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
}

impl<'mir, 'tcx> std::fmt::Debug for InitOnceWaiter<'mir, 'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("InitOnce")
.field("thread", &self.thread)
.field("callback", &"dyn MachineCallback")
.finish()
}
}

#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
/// The current status of a one time initialization.
pub enum InitOnceStatus {
Expand All @@ -38,68 +20,14 @@ pub enum InitOnceStatus {

/// The one time initialization state.
#[derive(Default, Debug)]
pub(super) struct InitOnce<'mir, 'tcx> {
pub(super) struct InitOnce {
status: InitOnceStatus,
waiters: VecDeque<InitOnceWaiter<'mir, 'tcx>>,
waiters: VecDeque<ThreadId>,
clock: VClock,
}

impl<'mir, 'tcx> VisitProvenance for InitOnce<'mir, 'tcx> {
fn visit_provenance(&self, visit: &mut VisitWith<'_>) {
for waiter in self.waiters.iter() {
waiter.callback.visit_provenance(visit);
}
}
}

impl<'mir, 'tcx: 'mir> EvalContextExtPriv<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
trait EvalContextExtPriv<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
/// Synchronize with the previous initialization attempt of an InitOnce.
#[inline]
fn init_once_observe_attempt(&mut self, id: InitOnceId) {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

if let Some(data_race) = &this.machine.data_race {
data_race.acquire_clock(&this.machine.sync.init_onces[id].clock, current_thread);
}
}

#[inline]
fn init_once_wake_waiter(
&mut self,
id: InitOnceId,
waiter: InitOnceWaiter<'mir, 'tcx>,
) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let current_thread = this.get_active_thread();

this.unblock_thread(waiter.thread, BlockReason::InitOnce(id));

// Call callback, with the woken-up thread as `current`.
this.set_active_thread(waiter.thread);
this.init_once_observe_attempt(id);
waiter.callback.call(this)?;
this.set_active_thread(current_thread);

Ok(())
}
}

impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_get_or_create_id(
&mut self,
lock_op: &OpTy<'tcx, Provenance>,
lock_layout: TyAndLayout<'tcx>,
offset: u64,
) -> InterpResult<'tcx, InitOnceId> {
let this = self.eval_context_mut();
this.init_once_get_or_create(|ecx, next_id| {
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
})
}

/// Provides the closure with the next InitOnceId. Creates that InitOnce if the closure returns None,
/// otherwise returns the value from the closure.
#[inline]
Expand All @@ -120,6 +48,21 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
Ok(new_index)
}
}
}

impl<'mir, 'tcx: 'mir> EvalContextExt<'mir, 'tcx> for crate::MiriInterpCx<'mir, 'tcx> {}
pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_get_or_create_id(
&mut self,
lock_op: &OpTy<'tcx, Provenance>,
lock_layout: TyAndLayout<'tcx>,
offset: u64,
) -> InterpResult<'tcx, InitOnceId> {
let this = self.eval_context_mut();
this.init_once_get_or_create(|ecx, next_id| {
ecx.get_or_create_id(next_id, lock_op, lock_layout, offset)
})
}

#[inline]
fn init_once_status(&mut self, id: InitOnceId) -> InitOnceStatus {
Expand All @@ -132,14 +75,14 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
fn init_once_enqueue_and_block(
&mut self,
id: InitOnceId,
thread: ThreadId,
callback: Box<dyn MachineCallback<'mir, 'tcx> + 'tcx>,
callback: impl UnblockCallback<'mir, 'tcx> + 'tcx,
) {
let this = self.eval_context_mut();
let thread = this.active_thread();
let init_once = &mut this.machine.sync.init_onces[id];
assert_ne!(init_once.status, InitOnceStatus::Complete, "queueing on complete init once");
init_once.waiters.push_back(InitOnceWaiter { thread, callback });
this.block_thread(thread, BlockReason::InitOnce(id));
init_once.waiters.push_back(thread);
this.block_thread(BlockReason::InitOnce(id), None, callback);
}

/// Begin initializing this InitOnce. Must only be called after checking that it is currently
Expand Down Expand Up @@ -177,7 +120,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
// Wake up everyone.
// need to take the queue to avoid having `this` be borrowed multiple times
for waiter in std::mem::take(&mut init_once.waiters) {
this.init_once_wake_waiter(id, waiter)?;
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
}

Ok(())
Expand All @@ -192,6 +135,8 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
InitOnceStatus::Begun,
"failing already completed or uninit init once"
);
// This is again uninitialized.
init_once.status = InitOnceStatus::Uninitialized;

// Each complete happens-before the end of the wait
if let Some(data_race) = &this.machine.data_race {
Expand All @@ -200,10 +145,7 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {

// Wake up one waiting thread, so they can go ahead and try to init this.
if let Some(waiter) = init_once.waiters.pop_front() {
this.init_once_wake_waiter(id, waiter)?;
} else {
// Nobody there to take this, so go back to 'uninit'
init_once.status = InitOnceStatus::Uninitialized;
this.unblock_thread(waiter, BlockReason::InitOnce(id))?;
}

Ok(())
Expand All @@ -221,6 +163,6 @@ pub trait EvalContextExt<'mir, 'tcx: 'mir>: crate::MiriInterpCxExt<'mir, 'tcx> {
"observing the completion of incomplete init once"
);

this.init_once_observe_attempt(id);
this.acquire_clock(&this.machine.sync.init_onces[id].clock);
}
}
Loading

0 comments on commit a131243

Please sign in to comment.