Skip to content

Commit

Permalink
data_race: vector indices can be reused immediately when the thread i…
Browse files Browse the repository at this point in the history
…s gone
  • Loading branch information
RalfJung committed May 26, 2024
1 parent a131243 commit e6bb468
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 82 deletions.
109 changes: 34 additions & 75 deletions src/tools/miri/src/concurrency/data_race.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use std::{
};

use rustc_ast::Mutability;
use rustc_data_structures::fx::{FxHashMap, FxHashSet};
use rustc_data_structures::fx::FxHashSet;
use rustc_index::{Idx, IndexVec};
use rustc_middle::{mir, ty::Ty};
use rustc_span::Span;
Expand Down Expand Up @@ -1432,13 +1432,6 @@ pub struct GlobalState {
/// active vector-clocks catch up with the threads timestamp.
reuse_candidates: RefCell<FxHashSet<VectorIdx>>,

/// This contains threads that have terminated, but not yet joined
/// and so cannot become re-use candidates until a join operation
/// occurs.
/// The associated vector index will be moved into re-use candidates
/// after the join operation occurs.
terminated_threads: RefCell<FxHashMap<ThreadId, VectorIdx>>,

/// The timestamp of last SC fence performed by each thread
last_sc_fence: RefCell<VClock>,

Expand Down Expand Up @@ -1466,7 +1459,6 @@ impl GlobalState {
vector_info: RefCell::new(IndexVec::new()),
thread_info: RefCell::new(IndexVec::new()),
reuse_candidates: RefCell::new(FxHashSet::default()),
terminated_threads: RefCell::new(FxHashMap::default()),
last_sc_fence: RefCell::new(VClock::default()),
last_sc_write: RefCell::new(VClock::default()),
track_outdated_loads: config.track_outdated_loads,
Expand Down Expand Up @@ -1500,8 +1492,6 @@ impl GlobalState {
fn find_vector_index_reuse_candidate(&self) -> Option<VectorIdx> {
let mut reuse = self.reuse_candidates.borrow_mut();
let vector_clocks = self.vector_clocks.borrow();
let vector_info = self.vector_info.borrow();
let terminated_threads = self.terminated_threads.borrow();
for &candidate in reuse.iter() {
let target_timestamp = vector_clocks[candidate].clock[candidate];
if vector_clocks.iter_enumerated().all(|(clock_idx, clock)| {
Expand All @@ -1511,9 +1501,7 @@ impl GlobalState {

// The vector represents a thread that has terminated and hence cannot
// report a data-race with the candidate index.
let thread_id = vector_info[clock_idx];
let vector_terminated =
reuse.contains(&clock_idx) || terminated_threads.contains_key(&thread_id);
let vector_terminated = reuse.contains(&clock_idx);

// The vector index cannot report a race with the candidate index
// and hence allows the candidate index to be re-used.
Expand Down Expand Up @@ -1603,55 +1591,38 @@ impl GlobalState {
/// thread (the joinee, the thread that someone waited on) and the current thread (the joiner,
/// the thread who was waiting).
#[inline]
pub fn thread_joined(
&mut self,
thread_mgr: &ThreadManager<'_, '_>,
joiner: ThreadId,
joinee: ThreadId,
) {
let clocks_vec = self.vector_clocks.get_mut();
let thread_info = self.thread_info.get_mut();

// Load the vector clock of the current thread.
let current_index = thread_info[joiner]
.vector_index
.expect("Performed thread join on thread with no assigned vector");
let current = &mut clocks_vec[current_index];
pub fn thread_joined(&mut self, threads: &ThreadManager<'_, '_>, joinee: ThreadId) {
let thread_info = self.thread_info.borrow();
let thread_info = &thread_info[joinee];

// Load the associated vector clock for the terminated thread.
let join_clock = thread_info[joinee]
let join_clock = thread_info
.termination_vector_clock
.as_ref()
.expect("Joined with thread but thread has not terminated");

// The join thread happens-before the current thread
// so update the current vector clock.
// Is not a release operation so the clock is not incremented.
current.clock.join(join_clock);
.expect("joined with thread but thread has not terminated");
// Acquire that into the current thread.
self.acquire_clock(join_clock, threads);

// Check the number of live threads, if the value is 1
// then test for potentially disabling multi-threaded execution.
if thread_mgr.get_live_thread_count() == 1 {
// May potentially be able to disable multi-threaded execution.
let current_clock = &clocks_vec[current_index];
if clocks_vec
.iter_enumerated()
.all(|(idx, clocks)| clocks.clock[idx] <= current_clock.clock[idx])
{
// All thread terminations happen-before the current clock
// therefore no data-races can be reported until a new thread
// is created, so disable multi-threaded execution.
self.multi_threaded.set(false);
// This has to happen after `acquire_clock`, otherwise there'll always
// be some thread that has not synchronized yet.
if let Some(current_index) = thread_info.vector_index {
if threads.get_live_thread_count() == 1 {
let vector_clocks = self.vector_clocks.get_mut();
// May potentially be able to disable multi-threaded execution.
let current_clock = &vector_clocks[current_index];
if vector_clocks
.iter_enumerated()
.all(|(idx, clocks)| clocks.clock[idx] <= current_clock.clock[idx])
{
// All thread terminations happen-before the current clock
// therefore no data-races can be reported until a new thread
// is created, so disable multi-threaded execution.
self.multi_threaded.set(false);
}
}
}

// If the thread is marked as terminated but not joined
// then move the thread to the re-use set.
let termination = self.terminated_threads.get_mut();
if let Some(index) = termination.remove(&joinee) {
let reuse = self.reuse_candidates.get_mut();
reuse.insert(index);
}
}

/// On thread termination, the vector-clock may re-used
Expand All @@ -1663,29 +1634,17 @@ impl GlobalState {
/// `thread_joined`.
#[inline]
pub fn thread_terminated(&mut self, thread_mgr: &ThreadManager<'_, '_>) {
let current_thread = thread_mgr.active_thread();
let current_index = self.active_thread_index(thread_mgr);

// Increment the clock to a unique termination timestamp.
let vector_clocks = self.vector_clocks.get_mut();
let current_clocks = &mut vector_clocks[current_index];
current_clocks
.increment_clock(current_index, thread_mgr.active_thread_ref().current_span());

// Load the current thread id for the executing vector.
let vector_info = self.vector_info.get_mut();
let current_thread = vector_info[current_index];

// Load the current thread metadata, and move to a terminated
// vector state. Setting up the vector clock all join operations
// will use.
let thread_info = self.thread_info.get_mut();
let current = &mut thread_info[current_thread];
current.termination_vector_clock = Some(current_clocks.clock.clone());

// Add this thread as a candidate for re-use after a thread join
// occurs.
let termination = self.terminated_threads.get_mut();
termination.insert(current_thread, current_index);
// Store the terminaion clock.
let terminaion_clock = self.release_clock(thread_mgr).clone();
self.thread_info.get_mut()[current_thread].termination_vector_clock =
Some(terminaion_clock);

// Add this thread's clock index as a candidate for re-use.
let reuse = self.reuse_candidates.get_mut();
reuse.insert(current_index);
}

/// Attempt to perform a synchronized operation, this
Expand Down
10 changes: 3 additions & 7 deletions src/tools/miri/src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,19 +597,15 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
this: &mut MiriInterpCx<'mir, 'tcx>,
) -> InterpResult<'tcx> {
if let Some(data_race) = &mut this.machine.data_race {
data_race.thread_joined(
&this.machine.threads,
this.machine.threads.active_thread(),
self.joined_thread_id,
);
data_race.thread_joined(&this.machine.threads, self.joined_thread_id);
}
Ok(())
}
}
} else {
// The thread has already terminated - mark join happens-before
// The thread has already terminated - establish happens-before
if let Some(data_race) = data_race {
data_race.thread_joined(self, self.active_thread, joined_thread_id);
data_race.thread_joined(self, joined_thread_id);
}
}
Ok(())
Expand Down

0 comments on commit e6bb468

Please sign in to comment.