task_ring_buffer improvements
Based on reviews & discussion:
* now uses epoch monitor
* tracks in-flight horizons/epochs to be able to report deadlock
  scenarios
* unit tests
* document how member functions may be called
* explicit memory semantics on atomics
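For illustration, the interplay of the new in-flight horizon/epoch counter, the epoch monitor, and the ring buffer's wait callback can be condensed into the self-contained model below. It reuses the names from the diff that follows, but it is a simplified sketch under those assumptions, not the committed implementation (in particular, the real callback re-checks the reached epoch before reporting a stall).

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <stdexcept>

using task_id = unsigned long;

// Stand-in for celerity::detail::epoch_monitor (see include/task_manager.h below).
struct epoch_monitor_model {
    task_id this_epoch = 0;
    std::mutex mutex;
    std::condition_variable epoch_changed;

    task_id await(const task_id min_tid_reached) {
        std::unique_lock lock{mutex};
        epoch_changed.wait(lock, [&] { return this_epoch >= min_tid_reached; });
        return this_epoch;
    }
    void set(const task_id epoch) {
        { std::lock_guard lock{mutex}; this_epoch = epoch; }
        epoch_changed.notify_all();
    }
};

int main() {
    constexpr unsigned long ringbuffer_size = 4; // 1024 in the real buffer
    std::atomic<task_id> next_task_id{0};
    std::atomic<unsigned long> number_of_deleted_tasks{0};
    std::atomic<int> in_flight_horizons_and_epochs{0};
    epoch_monitor_model latest_epoch_reached;

    // Roughly what task_manager::await_free_task_slot_callback() returns: either report a
    // guaranteed stall (nothing in flight that could ever free a slot) or wait for an epoch
    // to be reached and then reclaim the slots of all tasks that precede it.
    auto wait_callback = [&](const task_id lowest_live_tid) {
        if(in_flight_horizons_and_epochs.load() == 0) {
            throw std::runtime_error("Exhausted task slots with no horizons or epochs in flight.");
        }
        const task_id reached = latest_epoch_reached.await(lowest_live_tid + 1);
        number_of_deleted_tasks.store(reached); // delete_up_to(reached) in the real code
    };

    // Roughly what reserve_task_entry() / wait_for_available_slot() do on the application thread.
    auto reserve = [&]() -> task_id {
        if(next_task_id.load() - number_of_deleted_tasks.load() >= ringbuffer_size) {
            wait_callback(number_of_deleted_tasks.load());
        }
        return next_task_id++;
    };

    for(task_id i = 0; i < ringbuffer_size; ++i) reserve(); // fill every slot
    try {
        reserve(); // nothing in flight could ever free a slot, so this reports the stall
    } catch(const std::runtime_error& e) {
        std::printf("%s\n", e.what());
    }
}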
PeterTh committed Jun 28, 2022
1 parent fec38b9 commit eefb8dc
Showing 10 changed files with 321 additions and 222 deletions.
172 changes: 86 additions & 86 deletions ci/perf/gpuc2_bench.csv

Large diffs are not rendered by default.

174 changes: 87 additions & 87 deletions ci/perf/gpuc2_bench.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions include/runtime.h
@@ -55,9 +55,9 @@ namespace detail {
*/
void startup();

void shutdown() noexcept;
void shutdown();

void sync() noexcept;
void sync();

bool is_master_node() const { return local_nid == 0; }

23 changes: 16 additions & 7 deletions include/task_manager.h
@@ -31,9 +31,10 @@ namespace detail {
return this_epoch;
}

void await(const task_id epoch) const {
task_id await(const task_id min_tid_reached) const {
std::unique_lock lock{mutex};
epoch_changed.wait(lock, [=] { return this_epoch >= epoch; });
epoch_changed.wait(lock, [=] { return this_epoch >= min_tid_reached; });
return this_epoch;
}

void set(const task_id epoch) {
@@ -67,15 +68,18 @@ namespace detail {
task_id tid;
{
std::lock_guard lock(task_mutex);
auto reservation = task_buffer.reserve_task_entry();
auto reservation = task_buffer.reserve_task_entry(await_free_task_slot_callback());
tid = reservation.get_tid();

prepass_handler cgh(tid, std::make_unique<command_group_storage<CGF>>(cgf), num_collective_nodes);
cgf(cgh);
task& task_ref = register_task_internal(reservation, std::move(cgh).into_task());
task& task_ref = register_task_internal(std::move(reservation), std::move(cgh).into_task());

compute_dependencies(tid);
if(queue) queue->require_collective_group(task_ref.get_collective_group_id());

// the following deletion is intentionally redundant with the one happening when waiting for free task slots
// we want to free tasks earlier than just when running out of slots
task_buffer.delete_up_to(latest_epoch_reached.get());
}
invoke_callbacks(tid);
@@ -168,7 +172,6 @@ namespace detail {

reduction_manager* reduction_mngr;

task_id next_task_id = 1;
task_ring_buffer task_buffer;

// The active epoch is used as the last writer for host-initialized buffers.
@@ -205,13 +208,16 @@ namespace detail {
// Only accessed in task_manager::notify_*, which are always called from the executor thread - no locking needed.
std::optional<task_id> latest_horizon_reached;

// The number of horizons and epochs in flight, used to detect stalling scenarios with very broad task graphs
std::atomic<int> number_of_in_flight_horizons_and_epochs = 0;

// The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
epoch_monitor latest_epoch_reached{initial_epoch_task};

// Set of tasks with no dependents
std::unordered_set<task*> execution_front;

task& register_task_internal(task_ring_buffer::reservation& reserve, std::unique_ptr<task> task);
task& register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> task);

void invoke_callbacks(task_id tid);

@@ -221,7 +227,7 @@

int get_max_pseudo_critical_path_length() const { return max_pseudo_critical_path_length; }

task_id reduce_execution_front(task_ring_buffer::reservation& reserve, std::unique_ptr<task> new_front);
task_id reduce_execution_front(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> new_front);

void set_epoch_for_new_tasks(task_id epoch);

Expand All @@ -230,6 +236,9 @@ namespace detail {
task_id generate_horizon_task();

void compute_dependencies(task_id tid);

// Returns a callback which blocks until any epoch task has executed, freeing new task slots
task_ring_buffer::wait_callback await_free_task_slot_callback();
};

} // namespace detail
55 changes: 34 additions & 21 deletions include/task_ring_buffer.h
@@ -3,7 +3,6 @@
#include <array>
#include <atomic>
#include <memory>
#include <thread>

#include "log.h"
#include "task.h"
@@ -14,6 +13,8 @@ namespace celerity::detail {
constexpr unsigned long task_ringbuffer_size = 1024;

class task_ring_buffer {
friend struct task_ring_buffer_testspy;

public:
// This is an RAII type for ensuring correct handling of task id reservations
// in the presence of exceptions (i.e. revoking the reservation on stack unwinding)
@@ -25,7 +26,7 @@
~reservation() {
if(!consumed) {
CELERITY_WARN("Revoking unconsumed reservation for tid {} in destructor", tid);
buffer.revoke_reservation(*this);
buffer.revoke_reservation(std::move(*this));
}
}
reservation(const reservation&) = delete; // non copyable
@@ -45,53 +46,62 @@ class task_ring_buffer {
task_ring_buffer& buffer;
};

size_t get_total_task_count() const { return next_active_tid.load(); }
size_t get_current_task_count() const { return next_active_tid.load() - number_of_deleted_tasks; }

bool has_task(task_id tid) const {
return tid >= number_of_deleted_tasks && tid < next_active_tid.load(); //
return tid >= number_of_deleted_tasks.load(std::memory_order_relaxed) // best effort, only reliable from application thread
&& tid < next_active_tid.load(std::memory_order_acquire); // synchronizes access to data with put(...)
}

size_t get_total_task_count() const { return next_active_tid.load(std::memory_order_relaxed); }

task* find_task(task_id tid) const { return has_task(tid) ? data[tid % task_ringbuffer_size].get() : nullptr; }

task* get_task(task_id tid) const {
assert(has_task(tid));
return data[tid % task_ringbuffer_size].get();
}

reservation reserve_task_entry() {
wait_for_available_slot();
// all member functions beyond this point may *only* be called by the main application thread

size_t get_current_task_count() const { //
return next_active_tid.load(std::memory_order_relaxed) - number_of_deleted_tasks.load(std::memory_order_relaxed);
}

// the task id passed to the wait callback identifies the lowest in-use TID that the ring buffer is aware of
using wait_callback = std::function<void(task_id)>;

reservation reserve_task_entry(const wait_callback& wc) {
wait_for_available_slot(wc);
reservation ret(next_task_id, *this);
next_task_id++;
return ret;
}

void revoke_reservation(reservation& reserve) {
void revoke_reservation(reservation&& reserve) {
reserve.consume();
assert(reserve.tid == next_task_id - 1); // this is the only allowed (and extant) pattern
next_task_id--;
}

void put(reservation& reserve, std::unique_ptr<task> task) {
void put(reservation&& reserve, std::unique_ptr<task> task) {
reserve.consume();
task_id expected_tid = reserve.tid;
[[maybe_unused]] bool successfully_updated = next_active_tid.compare_exchange_strong(expected_tid, next_active_tid.load() + 1);
assert(successfully_updated); // this is the only allowed (and extant) pattern
assert(next_active_tid.load(std::memory_order_relaxed) == reserve.tid);
data[reserve.tid % task_ringbuffer_size] = std::move(task);
next_active_tid.store(reserve.tid + 1, std::memory_order_release);
}

// may only be called by one thread
void delete_up_to(task_id target_tid) {
for(task_id tid = number_of_deleted_tasks.load(); tid < target_tid; ++tid) {
assert(target_tid >= number_of_deleted_tasks.load(std::memory_order_relaxed));
for(task_id tid = number_of_deleted_tasks.load(std::memory_order_relaxed); tid < target_tid; ++tid) {
data[tid % task_ringbuffer_size].reset();
}
number_of_deleted_tasks += target_tid - number_of_deleted_tasks.load();
number_of_deleted_tasks.store(target_tid, std::memory_order_relaxed);
}

void clear() {
for(auto&& d : data) {
d.reset();
}
number_of_deleted_tasks.store(next_task_id, std::memory_order_relaxed);
}

class task_buffer_iterator {
@@ -106,7 +116,9 @@ class task_ring_buffer {
bool operator!=(task_buffer_iterator other) { return &buffer != &other.buffer || id != other.id; }
};

task_buffer_iterator begin() const { return task_buffer_iterator(number_of_deleted_tasks, *this); }
task_buffer_iterator begin() const { //
return task_buffer_iterator(number_of_deleted_tasks.load(std::memory_order_relaxed), *this);
}
task_buffer_iterator end() const { return task_buffer_iterator(next_task_id, *this); }

private:
@@ -115,12 +127,13 @@
// the next task id that will actually be emplaced
std::atomic<task_id> next_active_tid = task_id(0);
// the number of deleted tasks (which is implicitly the start of the active range of the ringbuffer)
std::atomic<unsigned long> number_of_deleted_tasks = 0;
std::atomic<size_t> number_of_deleted_tasks = 0;
std::array<std::unique_ptr<task>, task_ringbuffer_size> data;

void wait_for_available_slot() const {
while(next_task_id - number_of_deleted_tasks >= task_ringbuffer_size)
std::this_thread::yield(); // busy wait until we have available slots
void wait_for_available_slot(const wait_callback& wc) const {
if(next_task_id - number_of_deleted_tasks.load(std::memory_order_relaxed) >= task_ringbuffer_size) {
wc(static_cast<task_id>(number_of_deleted_tasks.load(std::memory_order_relaxed)));
}
}
};
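For reference, a hypothetical usage sketch of the reworked interface as seen from the application thread. It assumes the member functions documented above plus task::make_epoch and epoch_action as used elsewhere in this commit, and that the included headers provide task_id; it is not part of the change itself.

#include <stdexcept>
#include <utility>

#include "task.h"
#include "task_ring_buffer.h"

using namespace celerity::detail;

void application_thread_example(task_ring_buffer& buffer, const task_id executed_epoch) {
    // Reserve a slot; if all 1024 slots are occupied, the callback decides whether to wait
    // for an epoch or to report the exhaustion (cf. task_manager::await_free_task_slot_callback).
    auto reservation = buffer.reserve_task_entry(
        [](task_id /* lowest in-use tid */) { throw std::runtime_error("no free task slots in this example"); });
    const task_id tid = reservation.get_tid();

    // Publish the task: the release store inside put() pairs with the acquire load in
    // has_task(), so a thread that observes the new count also sees the slot contents.
    buffer.put(std::move(reservation), task::make_epoch(tid, epoch_action::none));

    if(buffer.has_task(tid)) { (void)buffer.get_task(tid); }

    // Reclaim the slots of everything before an epoch the executor has already reached.
    buffer.delete_up_to(executed_epoch);
}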

4 changes: 2 additions & 2 deletions src/runtime.cc
@@ -185,7 +185,7 @@ namespace detail {
set_thread_name(get_current_thread_handle(), "cy-main");
}

void runtime::shutdown() noexcept {
void runtime::shutdown() {
assert(is_active);
is_shutting_down = true;

@@ -229,7 +229,7 @@ namespace detail {
maybe_destroy_runtime();
}

void runtime::sync() noexcept {
void runtime::sync() {
const auto epoch = task_mngr->generate_epoch_task(epoch_action::barrier);
task_mngr->await_epoch(epoch);
}
54 changes: 37 additions & 17 deletions src/task_manager.cc
@@ -9,8 +9,8 @@ namespace detail {
task_manager::task_manager(size_t num_collective_nodes, host_queue* queue, reduction_manager* reduction_mgr)
: num_collective_nodes(num_collective_nodes), queue(queue), reduction_mngr(reduction_mgr) {
// We manually generate the initial epoch task, which we treat as if it has been reached immediately.
auto reserve = task_buffer.reserve_task_entry();
task_buffer.put(reserve, task::make_epoch(initial_epoch_task, epoch_action::none));
auto reserve = task_buffer.reserve_task_entry(await_free_task_slot_callback());
task_buffer.put(std::move(reserve), task::make_epoch(initial_epoch_task, epoch_action::none));
}

void task_manager::add_buffer(buffer_id bid, const cl::sycl::range<3>& range, bool host_initialized) {
@@ -34,16 +34,14 @@
}

void task_manager::notify_horizon_reached(task_id horizon_tid) {
// This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
// latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(task_buffer.get_task(horizon_tid)->get_type() == task_type::HORIZON);
assert(!latest_horizon_reached || *latest_horizon_reached < horizon_tid);
assert(latest_epoch_reached.get() < horizon_tid);

if(latest_horizon_reached) {
latest_epoch_reached.set(*latest_horizon_reached); // The next call to submit_command_group() will prune all tasks before the epoch reached
}
assert(number_of_in_flight_horizons_and_epochs.load() > 0);
number_of_in_flight_horizons_and_epochs--;

if(latest_horizon_reached) { latest_epoch_reached.set(*latest_horizon_reached); }

latest_horizon_reached = horizon_tid;
}
@@ -56,7 +54,10 @@
assert(!latest_horizon_reached || *latest_horizon_reached < epoch_tid);
assert(latest_epoch_reached.get() < epoch_tid);

latest_epoch_reached.set(epoch_tid); // The next call to submit_command_group() will prune all tasks before the last epoch reached
assert(number_of_in_flight_horizons_and_epochs.load() > 0);
number_of_in_flight_horizons_and_epochs--;

latest_epoch_reached.set(epoch_tid);
latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
}

@@ -185,10 +186,15 @@
}
}

task& task_manager::register_task_internal(task_ring_buffer::reservation& reserve, std::unique_ptr<task> task) {
task& task_manager::register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> task) {
auto& task_ref = *task;
assert(task != nullptr);
task_buffer.put(reserve, std::move(task));

if(task_ref.get_type() == task_type::EPOCH || task_ref.get_type() == task_type::HORIZON) {
number_of_in_flight_horizons_and_epochs++; //
}

task_buffer.put(std::move(reserve), std::move(task));
execution_front.insert(&task_ref);
return task_ref;
}
@@ -207,14 +213,14 @@
max_pseudo_critical_path_length = std::max(max_pseudo_critical_path_length, depender->get_pseudo_critical_path_length());
}

task_id task_manager::reduce_execution_front(task_ring_buffer::reservation& reserve, std::unique_ptr<task> new_front) {
task_id task_manager::reduce_execution_front(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> new_front) {
// add dependencies from a copy of the front to this task
const auto current_front = execution_front;
for(task* front_task : current_front) {
add_dependency(new_front.get(), front_task, dependency_kind::TRUE_DEP, dependency_origin::execution_front);
}
assert(execution_front.empty());
return register_task_internal(reserve, std::move(new_front)).get_id();
return register_task_internal(std::move(reserve), std::move(new_front)).get_id();
}

void task_manager::set_epoch_for_new_tasks(const task_id epoch) {
@@ -239,12 +245,12 @@
// we are probably overzealous in locking here
task_id tid;
{
auto reserve = task_buffer.reserve_task_entry();
auto reserve = task_buffer.reserve_task_entry(await_free_task_slot_callback());
tid = reserve.get_tid();
std::lock_guard lock(task_mutex);
current_horizon_critical_path_length = max_pseudo_critical_path_length;
const auto previous_horizon = current_horizon;
current_horizon = reduce_execution_front(reserve, task::make_horizon_task(tid));
current_horizon = reduce_execution_front(std::move(reserve), task::make_horizon_task(tid));
if(previous_horizon) { set_epoch_for_new_tasks(*previous_horizon); }
}

@@ -257,10 +263,10 @@
// we are probably overzealous in locking here
task_id tid;
{
auto reserve = task_buffer.reserve_task_entry();
auto reserve = task_buffer.reserve_task_entry(await_free_task_slot_callback());
tid = reserve.get_tid();
std::lock_guard lock(task_mutex);
const auto new_epoch = reduce_execution_front(reserve, task::make_epoch(tid, action));
const auto new_epoch = reduce_execution_front(std::move(reserve), task::make_epoch(tid, action));
compute_dependencies(new_epoch);
set_epoch_for_new_tasks(new_epoch);
current_horizon = std::nullopt; // this horizon is now behind the epoch_for_new_tasks, so it will never become an epoch itself
Expand All @@ -272,5 +278,19 @@ namespace detail {
return tid;
}

task_ring_buffer::wait_callback task_manager::await_free_task_slot_callback() {
return [&](task_id previous_free_tid) {
if(number_of_in_flight_horizons_and_epochs == 0) {
// verify that the epoch didn't get reached between the invocation of the callback and the in flight check
if(latest_epoch_reached.get() < previous_free_tid + 1) {
throw std::runtime_error("Exhausted task slots with no horizons or epochs in flight."
"\nLikely due to generating a very large number of tasks with no dependencies.");
}
}
task_id reached_epoch = latest_epoch_reached.await(previous_free_tid + 1);
task_buffer.delete_up_to(reached_epoch);
};
}

} // namespace detail
} // namespace celerity
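As an aside on the "explicit memory semantics on atomics" bullet from the commit message: the core pattern is that put() writes the slot and then release-stores next_active_tid, while readers acquire-load next_active_tid before touching the slot. A reduced, standalone illustration of that publication pairing (an assumed simplification, not project code):

#include <atomic>
#include <cassert>
#include <memory>
#include <string>
#include <thread>

int main() {
    std::unique_ptr<std::string> slot;          // stands in for data[tid % task_ringbuffer_size]
    std::atomic<unsigned long> next_active{0};  // stands in for next_active_tid

    std::thread producer([&] {
        slot = std::make_unique<std::string>("task 0");   // write the slot first ...
        next_active.store(1, std::memory_order_release);  // ... then publish it
    });

    std::thread consumer([&] {
        // Spin until the producer publishes; the acquire load synchronizes with the release
        // store above, so the write to the slot is guaranteed to be visible afterwards.
        while(next_active.load(std::memory_order_acquire) == 0) {}
        assert(*slot == "task 0");
    });

    producer.join();
    consumer.join();
}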
1 change: 1 addition & 0 deletions test/CMakeLists.txt
@@ -37,6 +37,7 @@ set(TEST_TARGETS
runtime_deprecation_tests
sycl_tests
task_graph_tests
task_ring_buffer_tests
device_selection_tests
)
