Commit 0a4fca5
Fix race between creating collective groups and submitting host tasks
fknorr committed Nov 8, 2022
1 parent 59e7d61 commit 0a4fca5
Showing 2 changed files with 21 additions and 11 deletions.
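The race being fixed: require_collective_group() runs on the application's main thread while submit() runs on the executor thread, and both access the same std::unordered_map of collective-group worker threads. Concurrent insertion and lookup on an unordered_map is a data race, so both paths now take a common mutex. A minimal self-contained sketch of the fixed access pattern (group_registry and its members are hypothetical names, not Celerity code):

    #include <mutex>
    #include <thread>
    #include <unordered_map>

    // Hypothetical stand-in for host_queue's map of comm_threads: every access,
    // from either thread, takes the same mutex, mirroring the fix in this commit.
    struct group_registry {
        std::mutex mutex;
        std::unordered_map<int, int> groups; // collective group id -> per-group state

        void create(int id) { // main thread, like require_collective_group()
            const std::lock_guard lock(mutex);
            groups.emplace(id, 0);
        }

        bool contains(int id) { // executor thread, like submit()
            const std::lock_guard lock(mutex);
            return groups.count(id) > 0;
        }
    };

    int main() {
        group_registry registry;
        std::thread main_like([&] {
            for(int i = 0; i < 10000; ++i) registry.create(i);
        });
        std::thread executor_like([&] {
            for(int i = 0; i < 10000; ++i) (void)registry.contains(i);
        });
        main_like.join();
        executor_like.join();
    }

Without the mutex, a rehash triggered by create() could invalidate buckets while contains() walks them; with it, the interleaving of the two threads is well-defined.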
22 changes: 13 additions & 9 deletions include/host_queue.h

@@ -119,7 +119,9 @@ namespace detail {
         }
 
         void require_collective_group(collective_group_id cgid) {
-            if(m_threads.find(cgid) != m_threads.end()) return;
+            const std::lock_guard lock(m_mutex); // called by main thread
+            if(m_threads.count(cgid) > 0) return;
+
             assert(cgid != 0);
             MPI_Comm comm;
             MPI_Comm_dup(MPI_COMM_WORLD, &comm);
@@ -133,9 +135,9 @@ namespace detail {
 
         template <typename Fn>
         std::future<execution_info> submit(collective_group_id cgid, Fn&& fn) {
-            auto it = m_threads.find(cgid);
-            assert(it != m_threads.end());
-            return it->second.thread.push([fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = it->second.comm](int) {
+            const std::lock_guard lock(m_mutex); // called by executor thread
+            auto& [comm, pool] = m_threads.at(cgid);
+            return pool.push([fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = comm](int) {
                 auto start_time = std::chrono::steady_clock::now();
                 try {
                     fn(comm);
@@ -151,24 +153,26 @@ namespace detail {
          * @brief Waits until all currently submitted operations have completed.
          */
         void wait() {
-            for(auto& ct : m_threads) {
-                ct.second.thread.stop(true /* isWait */);
+            const std::lock_guard lock(m_mutex); // called by main thread - never contended because the executor is shut down at this point
+            for(auto& [_, ct] : m_threads) {
+                ct.pool.stop(true /* isWait */);
             }
         }
 
       private:
         struct comm_thread {
             MPI_Comm comm;
-            ctpl::thread_pool thread;
+            ctpl::thread_pool pool;
 
-            comm_thread(MPI_Comm comm, size_t n_threads, size_t id) : comm(comm), thread(n_threads) {
+            comm_thread(MPI_Comm comm, size_t n_threads, size_t id) : comm(comm), pool(n_threads) {
                 for(size_t i = 0; i < n_threads; ++i) {
-                    auto& worker = thread.get_thread(i);
+                    auto& worker = pool.get_thread(i);
                     set_thread_name(worker.native_handle(), fmt::format("cy-worker-{}.{}", id, i));
                 }
             }
         };
 
+        std::mutex m_mutex;
         std::unordered_map<collective_group_id, comm_thread> m_threads;
         size_t m_id = 0;
     };
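Note that submit() holds m_mutex only while looking up the comm_thread entry and enqueueing the closure: the MPI_Comm handle is copied into the lambda, so the user-provided function later runs on the pool without the lock held. Reduced to the standard library, the wrapper that submit() builds looks roughly like the sketch below (std::async stands in for the ctpl thread pool, and this execution_info is a simplified assumption, not the real struct):

    #include <chrono>
    #include <future>
    #include <utility>

    // Simplified assumption of host_queue's execution_info; field names are made up.
    struct execution_info {
        std::chrono::steady_clock::duration queue_latency;
        std::chrono::steady_clock::duration run_time;
    };

    template <typename Fn>
    std::future<execution_info> submit(Fn&& fn) {
        // submit_time is captured on the enqueueing thread, start_time on the
        // worker, so their difference is the time the task spent queued.
        return std::async(std::launch::async,
            [fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now()] {
                const auto start_time = std::chrono::steady_clock::now();
                fn();
                const auto end_time = std::chrono::steady_clock::now();
                return execution_info{start_time - submit_time, end_time - start_time};
            });
    }

    int main() {
        auto future = submit([] { /* host task body */ });
        (void)future.get();
    }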
10 changes: 8 additions & 2 deletions include/task_manager.h

@@ -70,9 +70,15 @@ namespace detail {
             prepass_handler cgh(tid, std::make_unique<command_group_storage<CGF>>(cgf), m_num_collective_nodes);
             cgf(cgh);
 
-            task& tsk = register_task_internal(std::move(reservation), std::move(cgh).into_task());
+            auto unique_tsk = std::move(cgh).into_task();
+
+            // Require the collective group before inserting the task into the ring buffer, otherwise the executor will try to schedule the collective host
+            // task on a collective-group thread that does not yet exist.
+            // The queue pointer will be null in non-runtime tests.
+            if(m_queue) m_queue->require_collective_group(unique_tsk->get_collective_group_id());
+
+            auto& tsk = register_task_internal(std::move(reservation), std::move(unique_tsk));
             compute_dependencies(tsk);
-            if(m_queue) m_queue->require_collective_group(tsk.get_collective_group_id());
 
             // the following deletion is intentionally redundant with the one happening when waiting for free task slots
             // we want to free tasks earlier than just when running out of slots,
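The task_manager half of the fix is an ordering change rather than a lock: once register_task_internal() publishes the task, the executor may schedule it immediately, so the collective group has to exist before that point. A toy model of the resulting invariant (all names hypothetical, not Celerity code):

    #include <cassert>
    #include <condition_variable>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <unordered_set>

    // Toy producer/consumer model of the fixed ordering: the group becomes
    // visible *before* the task is enqueued, so the consumer is guaranteed to
    // find it for any task it pops.
    struct ordering_model {
        std::mutex m;
        std::condition_variable cv;
        std::unordered_set<int> groups; // stands in for the comm_thread map
        std::queue<int> tasks;          // stands in for the task ring buffer

        void submit(int cgid) { // main thread
            const std::lock_guard lock(m);
            groups.insert(cgid); // step 1: require_collective_group()
            tasks.push(cgid);    // step 2: register_task_internal()
            cv.notify_one();
        }

        void execute_one() { // executor thread
            std::unique_lock lock(m);
            cv.wait(lock, [&] { return !tasks.empty(); });
            const int cgid = tasks.front();
            tasks.pop();
            assert(groups.count(cgid) == 1); // holds because step 1 precedes step 2
        }
    };

    int main() {
        ordering_model model;
        std::thread executor([&] { model.execute_one(); });
        model.submit(42);
        executor.join();
    }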
