From 0a4fca5831ad0dfe4691368cace8f96c300ab9f2 Mon Sep 17 00:00:00 2001
From: Fabian Knorr
Date: Mon, 7 Nov 2022 18:44:35 +0100
Subject: [PATCH] Fix race between creating collective groups and submitting host tasks

---
 include/host_queue.h   | 22 +++++++++++++---------
 include/task_manager.h | 10 ++++++++--
 2 files changed, 21 insertions(+), 11 deletions(-)

diff --git a/include/host_queue.h b/include/host_queue.h
index c22293357..d385ddd04 100644
--- a/include/host_queue.h
+++ b/include/host_queue.h
@@ -119,7 +119,9 @@ namespace detail {
 		}
 
 		void require_collective_group(collective_group_id cgid) {
-			if(m_threads.find(cgid) != m_threads.end()) return;
+			const std::lock_guard lock(m_mutex); // called by main thread
+			if(m_threads.count(cgid) > 0) return;
+			assert(cgid != 0);
 			MPI_Comm comm;
 			MPI_Comm_dup(MPI_COMM_WORLD, &comm);
 
@@ -133,9 +135,9 @@ namespace detail {
 
 		template <typename Fn>
 		std::future<execution_info> submit(collective_group_id cgid, Fn&& fn) {
-			auto it = m_threads.find(cgid);
-			assert(it != m_threads.end());
-			return it->second.thread.push([fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = it->second.comm](int) {
+			const std::lock_guard lock(m_mutex); // called by executor thread
+			auto& [comm, pool] = m_threads.at(cgid);
+			return pool.push([fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = comm](int) {
 				auto start_time = std::chrono::steady_clock::now();
 				try {
 					fn(comm);
@@ -151,24 +153,26 @@ namespace detail {
 		 * @brief Waits until all currently submitted operations have completed.
 		 */
 		void wait() {
-			for(auto& ct : m_threads) {
-				ct.second.thread.stop(true /* isWait */);
+			const std::lock_guard lock(m_mutex); // called by main thread - never contended because the executor is shut down at this point
+			for(auto& [_, ct] : m_threads) {
+				ct.pool.stop(true /* isWait */);
 			}
 		}
 
 	  private:
 		struct comm_thread {
 			MPI_Comm comm;
-			ctpl::thread_pool thread;
+			ctpl::thread_pool pool;
 
-			comm_thread(MPI_Comm comm, size_t n_threads, size_t id) : comm(comm), thread(n_threads) {
+			comm_thread(MPI_Comm comm, size_t n_threads, size_t id) : comm(comm), pool(n_threads) {
 				for(size_t i = 0; i < n_threads; ++i) {
-					auto& worker = thread.get_thread(i);
+					auto& worker = pool.get_thread(i);
 					set_thread_name(worker.native_handle(), fmt::format("cy-worker-{}.{}", id, i));
 				}
 			}
 		};
 
+		std::mutex m_mutex;
 		std::unordered_map<collective_group_id, comm_thread> m_threads;
 		size_t m_id = 0;
 	};
diff --git a/include/task_manager.h b/include/task_manager.h
index 414e43818..8985f2ad2 100644
--- a/include/task_manager.h
+++ b/include/task_manager.h
@@ -70,9 +70,15 @@ namespace detail {
 
 			prepass_handler cgh(tid, std::make_unique<command_group_storage<CGF>>(cgf), m_num_collective_nodes);
 			cgf(cgh);
-			task& tsk = register_task_internal(std::move(reservation), std::move(cgh).into_task());
+			auto unique_tsk = std::move(cgh).into_task();
+
+			// Require the collective group before inserting the task into the ring buffer, otherwise the executor will try to schedule the collective host
+			// task on a collective-group thread that does not yet exist.
+			// The queue pointer will be null in non-runtime tests.
+			if(m_queue) m_queue->require_collective_group(unique_tsk->get_collective_group_id());
+
+			auto& tsk = register_task_internal(std::move(reservation), std::move(unique_tsk));
 			compute_dependencies(tsk);
-			if(m_queue) m_queue->require_collective_group(tsk.get_collective_group_id());
 
 			// the following deletion is intentionally redundant with the one happening when waiting for free task slots
 			// we want to free tasks earlier than just when running out of slots,
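
For context, the synchronization pattern this patch adopts reduces to two ingredients: a mutex guarding the map of per-group workers (so the main thread's require_collective_group and the executor thread's submit no longer race on it), and a create-before-publish ordering (the group is registered before the task becomes visible to the executor). The standalone C++17 sketch below illustrates only that pattern; group_registry, worker and group_id are hypothetical stand-ins, not Celerity's host_queue, ctpl::thread_pool or MPI communicators.

// Sketch of the create-before-publish pattern; all names here are
// hypothetical, not Celerity API.
#include <condition_variable>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <unordered_map>

using group_id = std::size_t;

// One worker thread per group, fed through a small mutex/condvar task queue.
class worker {
  public:
	worker() : m_thread([this] { run(); }) {}

	~worker() {
		{
			const std::lock_guard lock(m_mutex);
			m_stop = true;
		}
		m_cv.notify_one();
		m_thread.join();
	}

	void push(std::function<void()> fn) {
		{
			const std::lock_guard lock(m_mutex);
			m_queue.push(std::move(fn));
		}
		m_cv.notify_one();
	}

  private:
	void run() {
		for(;;) {
			std::unique_lock lock(m_mutex);
			m_cv.wait(lock, [this] { return m_stop || !m_queue.empty(); });
			if(m_queue.empty()) return; // stopped and fully drained
			auto fn = std::move(m_queue.front());
			m_queue.pop();
			lock.unlock();
			fn(); // run the task outside the lock
		}
	}

	std::mutex m_mutex;
	std::condition_variable m_cv;
	std::queue<std::function<void()>> m_queue;
	bool m_stop = false;
	std::thread m_thread; // declared last so run() sees initialized members
};

class group_registry {
  public:
	// Producer (main thread). As in the patched task_manager, this must run
	// *before* any task referencing gid becomes visible to the executor; the
	// mutex makes the map update itself safe against concurrent submits.
	void require_group(group_id gid) {
		const std::lock_guard lock(m_mutex);
		m_groups.try_emplace(gid); // no-op if the group already exists
	}

	// Consumer (executor thread). The shared map is only touched under the
	// same mutex, mirroring the lock added to host_queue::submit.
	void submit(group_id gid, std::function<void()> fn) {
		const std::lock_guard lock(m_mutex);
		m_groups.at(gid).push(std::move(fn)); // at() would throw without require_group
	}

  private:
	std::mutex m_mutex;
	std::unordered_map<group_id, worker> m_groups;
};

int main() {
	group_registry registry;
	registry.require_group(1); // create the group before publishing work for it

	std::thread executor([&] { registry.submit(1, [] { std::puts("collective task ran"); }); });
	executor.join();
} // ~group_registry joins each worker after it drains its queue

Note how the sketch mirrors the two halves of the patch: the mutex corresponds to the m_mutex added to host_queue, and the require_group-before-submit ordering corresponds to moving require_collective_group ahead of register_task_internal in task_manager.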