Skip to content

Commit

Permalink
Named threads for better debugging (#98)
Browse files Browse the repository at this point in the history
* Implement named threads for windows

* Add thread names to host_task thread-pool worker

* static_assert native thread handle type

* Add missing header

* Implement named threads for unix

* Fix unused variable warning in release builds

* Replace string with vector

* Implement proper string/wstring conversion

* Simplify named threads API

* Remove dependency to std::vector and use normal array instead

* Use overloading and accept minor code duplication

* Add comment about possible memory leak in case of exception

* Make use of catch2 string matcher

* Add prefix to thread names, and additionally label worker threads with id

* Fix wrong cmake if-block for platform specific translation units
  • Loading branch information
BlackMark authored Mar 8, 2022
1 parent 3c303bb commit 25d769d
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 7 deletions.
8 changes: 6 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,12 @@ set(SOURCES

if(WIN32)
set(SOURCES ${SOURCES} src/platform_specific/affinity.win.cc)
elseif(UNIX AND NOT APPLE)
set(SOURCES ${SOURCES} src/platform_specific/affinity.unix.cc)
set(SOURCES ${SOURCES} src/platform_specific/named_threads.win.cc)
elseif(UNIX)
if(NOT APPLE)
set(SOURCES ${SOURCES} src/platform_specific/affinity.unix.cc)
endif()
set(SOURCES ${SOURCES} src/platform_specific/named_threads.unix.cc)
endif()

add_library(
Expand Down
2 changes: 2 additions & 0 deletions include/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ namespace detail {
};

class executor {
friend struct executor_testspy;

public:
// TODO: Try to decouple this more.
executor(
Expand Down
13 changes: 10 additions & 3 deletions include/host_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <mpi.h>

#include "config.h"
#include "named_threads.h"
#include "types.h"

namespace celerity {
Expand Down Expand Up @@ -113,15 +114,15 @@ namespace detail {

host_queue() {
// TODO what is a good thread count for the non-collective thread pool?
threads.emplace(std::piecewise_construct, std::tuple{0}, std::tuple{MPI_COMM_NULL, 4});
threads.emplace(std::piecewise_construct, std::tuple{0}, std::tuple{MPI_COMM_NULL, 4, id++});
}

void require_collective_group(collective_group_id cgid) {
if(threads.find(cgid) != threads.end()) return;
assert(cgid != 0);
MPI_Comm comm;
MPI_Comm_dup(MPI_COMM_WORLD, &comm);
threads.emplace(std::piecewise_construct, std::tuple{cgid}, std::tuple{comm, 1});
threads.emplace(std::piecewise_construct, std::tuple{cgid}, std::tuple{comm, 1, id++});
}

template <typename Fn>
Expand Down Expand Up @@ -159,10 +160,16 @@ namespace detail {
MPI_Comm comm;
ctpl::thread_pool thread;

comm_thread(MPI_Comm comm, size_t n_threads) : comm(comm), thread(n_threads) {}
comm_thread(MPI_Comm comm, size_t n_threads, size_t id) : comm(comm), thread(n_threads) {
for(size_t i = 0; i < n_threads; ++i) {
auto& worker = thread.get_thread(i);
set_thread_name(worker.native_handle(), fmt::format("cy-worker-{}.{}", id, i));
}
}
};

std::unordered_map<collective_group_id, comm_thread> threads;
size_t id = 0;
};

} // namespace detail
Expand Down
14 changes: 14 additions & 0 deletions include/named_threads.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <string>
#include <thread>

namespace celerity::detail {

std::thread::native_handle_type get_current_thread_handle();

void set_thread_name(const std::thread::native_handle_type thread_handle, const std::string& name);

std::string get_thread_name(const std::thread::native_handle_type thread_handle);

} // namespace celerity::detail
2 changes: 2 additions & 0 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ namespace detail {
};

class runtime {
friend struct runtime_testspy;

public:
/**
* @param user_device This optional device can be provided by the user, overriding any other device selection strategy.
Expand Down
2 changes: 2 additions & 0 deletions include/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ namespace detail {
};

class scheduler {
friend struct scheduler_testspy;

public:
scheduler(graph_generator& ggen, graph_serializer& gsrlzr, size_t num_nodes);

Expand Down
6 changes: 5 additions & 1 deletion src/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "distr_queue.h"
#include "log.h"
#include "mpi_support.h"
#include "named_threads.h"

// TODO: Get rid of this. (This could potentialy even cause deadlocks on large clusters)
constexpr size_t MAX_CONCURRENT_JOBS = 20;
Expand All @@ -30,7 +31,10 @@ namespace detail {
metrics.initial_idle.resume();
}

void executor::startup() { exec_thrd = std::thread(&executor::run, this); }
void executor::startup() {
exec_thrd = std::thread(&executor::run, this);
set_thread_name(exec_thrd.native_handle(), "cy-executor");
}

void executor::shutdown() {
if(exec_thrd.joinable()) { exec_thrd.join(); }
Expand Down
32 changes: 32 additions & 0 deletions src/platform_specific/named_threads.unix.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#include "named_threads.h"

#include <cassert>
#include <type_traits>

#include <pthread.h>

namespace celerity::detail {

static_assert(std::is_same_v<std::thread::native_handle_type, pthread_t>, "Unexpected native thread handle type");

constexpr auto PTHREAD_MAX_THREAD_NAME_LEN = 16;

std::thread::native_handle_type get_current_thread_handle() {
return pthread_self();
}

void set_thread_name(const std::thread::native_handle_type thread_handle, const std::string& name) {
auto truncated_name = name;
truncated_name.resize(PTHREAD_MAX_THREAD_NAME_LEN - 1); // -1 because of null terminator
[[maybe_unused]] const auto res = pthread_setname_np(thread_handle, truncated_name.c_str());
assert(res == 0 && "Failed to set thread name");
}

std::string get_thread_name(const std::thread::native_handle_type thread_handle) {
char name[PTHREAD_MAX_THREAD_NAME_LEN] = {};
[[maybe_unused]] const auto res = pthread_getname_np(thread_handle, name, PTHREAD_MAX_THREAD_NAME_LEN);
assert(res == 0 && "Failed to get thread name");
return name; // Automatically strips null terminator
}

} // namespace celerity::detail
53 changes: 53 additions & 0 deletions src/platform_specific/named_threads.win.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#include "named_threads.h"

#include <cassert>
#include <cwchar>
#include <type_traits>

#include <windows.h>

namespace celerity::detail {

static_assert(std::is_same_v<std::thread::native_handle_type, HANDLE>, "Unexpected native thread handle type");

static inline std::string convert_string(const std::wstring& str) {
const auto* src = str.c_str();
auto mbstate = std::mbstate_t{};
const auto len = std::wcsrtombs(nullptr, &src, 0, &mbstate);
auto dst = std::string(len, L'\0'); // Automatically includes space for the null terminator
std::wcsrtombs(dst.data(), &src, dst.size(), &mbstate);
return dst;
}

static inline std::wstring convert_string(const std::string& str) {
const auto* src = str.c_str();
auto mbstate = std::mbstate_t{};
const auto len = std::mbsrtowcs(nullptr, &src, 0, &mbstate);
auto dst = std::wstring(len, L'\0'); // Automatically includes space for the null terminator
std::mbsrtowcs(dst.data(), &src, dst.size(), &mbstate);
return dst;
}

std::thread::native_handle_type get_current_thread_handle() {
return GetCurrentThread();
}

void set_thread_name(const std::thread::native_handle_type thread_handle, const std::string& name) {
const auto wname = convert_string(name);
[[maybe_unused]] const auto res = SetThreadDescription(thread_handle, wname.c_str());
assert(SUCCEEDED(res) && "Failed to set thread name");
}

std::string get_thread_name(const std::thread::native_handle_type thread_handle) {
PWSTR wname = nullptr;
const auto res = GetThreadDescription(thread_handle, &wname);
assert(SUCCEEDED(res) && "Failed to get thread name");
std::string name;
if(SUCCEEDED(res)) {
name = convert_string(wname);
LocalFree(wname); // Will leak if convert_string throws
}
return name;
}

} // namespace celerity::detail
2 changes: 2 additions & 0 deletions src/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "host_object.h"
#include "log.h"
#include "mpi_support.h"
#include "named_threads.h"
#include "scheduler.h"
#include "task_manager.h"
#include "user_bench.h"
Expand Down Expand Up @@ -170,6 +171,7 @@ namespace detail {
is_active = true;
if(is_master_node()) { schdlr->startup(); }
exec->startup();
set_thread_name(get_current_thread_handle(), "cy-main");
}

void runtime::shutdown() noexcept {
Expand Down
6 changes: 5 additions & 1 deletion src/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@

#include "graph_generator.h"
#include "graph_serializer.h"
#include "named_threads.h"
#include "transformers/naive_split.h"

namespace celerity {
namespace detail {

scheduler::scheduler(graph_generator& ggen, graph_serializer& gsrlzr, size_t num_nodes) : ggen(ggen), gsrlzr(gsrlzr), num_nodes(num_nodes) {}

void scheduler::startup() { worker_thread = std::thread(&scheduler::schedule, this); }
void scheduler::startup() {
worker_thread = std::thread(&scheduler::schedule, this);
set_thread_name(worker_thread.native_handle(), "cy-scheduler");
}

void scheduler::shutdown() {
notify(scheduler_event_type::SHUTDOWN, 0);
Expand Down
43 changes: 43 additions & 0 deletions test/runtime_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <celerity.h>

#include "affinity.h"
#include "executor.h"
#include "named_threads.h"
#include "ranges.h"
#include "region_map.h"

Expand All @@ -42,6 +44,20 @@ namespace detail {
}
};

struct runtime_testspy {
static scheduler& get_schdlr(runtime& rt) { return *rt.schdlr; }

static executor& get_exec(runtime& rt) { return *rt.exec; }
};

struct scheduler_testspy {
static std::thread& get_worker_thread(scheduler& schdlr) { return schdlr.worker_thread; }
};

struct executor_testspy {
static std::thread& get_exec_thrd(executor& exec) { return exec.exec_thrd; }
};

TEST_CASE("only a single distr_queue can be created", "[distr_queue][lifetime][dx]") {
distr_queue q1;
auto q2{q1}; // Copying is allowed
Expand Down Expand Up @@ -968,5 +984,32 @@ namespace detail {
CHECK(exterior == std::vector{1, 2});
}

TEST_CASE("thread names are set", "[threads]") {
distr_queue q;

auto& rt = runtime::get_instance();
auto& schdlr = runtime_testspy::get_schdlr(rt);
auto& exec = runtime_testspy::get_exec(rt);

if(rt.is_master_node()) {
const auto scheduler_thread_name = get_thread_name(scheduler_testspy::get_worker_thread(schdlr).native_handle());
CHECK(scheduler_thread_name == "cy-scheduler");
}

const auto executor_thread_name = get_thread_name(executor_testspy::get_exec_thrd(exec).native_handle());
CHECK(executor_thread_name == "cy-executor");

const auto main_thread_name = get_thread_name(get_current_thread_handle());
CHECK(main_thread_name == "cy-main");

q.submit([](handler& cgh) {
cgh.host_task(experimental::collective, [&](experimental::collective_partition) {
const auto base_name = std::string("cy-worker-");
const auto worker_thread_name = get_thread_name(get_current_thread_handle());
CHECK_THAT(worker_thread_name, Catch::Matchers::StartsWith(base_name));
});
});
}

} // namespace detail
} // namespace celerity

0 comments on commit 25d769d

Please sign in to comment.