Skip to content

Commit

Permalink
Replace latest epoch tracking with on-demand lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterTh committed Jul 18, 2022
1 parent 283f330 commit 5139256
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 17 deletions.
10 changes: 6 additions & 4 deletions include/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ namespace detail {
if(queue) queue->require_collective_group(task_ref.get_collective_group_id());

// the following deletion is intentionally redundant with the one happening when waiting for free task slots
// we want to free tasks earlier than just when running out of slots
// we want to free tasks earlier than just when running out of slots,
// so that we can potentially reclaim additional resources such as buffers earlier
task_buffer.delete_up_to(latest_epoch_reached.get());
}
invoke_callbacks(tid);
Expand Down Expand Up @@ -208,9 +209,6 @@ namespace detail {
// Only accessed in task_manager::notify_*, which are always called from the executor thread - no locking needed.
std::optional<task_id> latest_horizon_reached;

// The number of horizons and epochs in flight, used to detect stalling scenarios with very broad task graphs
std::atomic<int> number_of_in_flight_horizons_and_epochs = 0;

// The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
epoch_monitor latest_epoch_reached{initial_epoch_task};

Expand All @@ -237,6 +235,10 @@ namespace detail {

void compute_dependencies(task_id tid);

// Finds the first in-flight epoch, or returns the currently reached one if there are none in-flight
// Used in await_free_task_slot_callback to check for hangs
task_id get_first_in_flight_epoch() const;

// Returns a callback which blocks until any epoch task has executed, freeing new task slots
task_ring_buffer::wait_callback await_free_task_slot_callback();
};
Expand Down
30 changes: 18 additions & 12 deletions src/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ namespace detail {
assert(!latest_horizon_reached || *latest_horizon_reached < horizon_tid);
assert(latest_epoch_reached.get() < horizon_tid);

assert(number_of_in_flight_horizons_and_epochs.load() > 0);
number_of_in_flight_horizons_and_epochs--;

if(latest_horizon_reached) { latest_epoch_reached.set(*latest_horizon_reached); }

latest_horizon_reached = horizon_tid;
Expand All @@ -54,9 +51,6 @@ namespace detail {
assert(!latest_horizon_reached || *latest_horizon_reached < epoch_tid);
assert(latest_epoch_reached.get() < epoch_tid);

assert(number_of_in_flight_horizons_and_epochs.load() > 0);
number_of_in_flight_horizons_and_epochs--;

latest_epoch_reached.set(epoch_tid);
latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
}
Expand Down Expand Up @@ -187,11 +181,6 @@ namespace detail {
task& task_manager::register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> task) {
auto& task_ref = *task;
assert(task != nullptr);

if(task_ref.get_type() == task_type::EPOCH || task_ref.get_type() == task_type::HORIZON) {
number_of_in_flight_horizons_and_epochs++; //
}

task_buffer.put(std::move(reserve), std::move(task));
execution_front.insert(&task_ref);
return task_ref;
Expand Down Expand Up @@ -276,9 +265,26 @@ namespace detail {
return tid;
}

task_id task_manager::get_first_in_flight_epoch() const {
task_id current_horizon = 0;
task_id latest_epoch = latest_epoch_reached.get();
// we need either one epoch or two horizons that have yet to be executed
// so that it is possible for task slots to be freed in the future
for(const auto& tsk : task_buffer) {
if(tsk->get_id() <= latest_epoch) continue;
if(tsk->get_type() == task_type::EPOCH) {
return tsk->get_id();
} else if(tsk->get_type() == task_type::HORIZON) {
if(current_horizon) return current_horizon;
current_horizon = tsk->get_id();
}
}
return latest_epoch;
}

task_ring_buffer::wait_callback task_manager::await_free_task_slot_callback() {
return [&](task_id previous_free_tid) {
if(number_of_in_flight_horizons_and_epochs == 0) {
if(get_first_in_flight_epoch() == latest_epoch_reached.get()) {
// verify that the epoch didn't get reached between the invocation of the callback and the in flight check
if(latest_epoch_reached.get() < previous_free_tid + 1) {
throw std::runtime_error("Exhausted task slots with no horizons or epochs in flight."
Expand Down
3 changes: 2 additions & 1 deletion test/task_ring_buffer_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ TEST_CASE_METHOD(test_utils::runtime_fixture, "freeing task ring buffer capacity
celerity::accessor acc{dependency, cgh, celerity::access::all{}, celerity::read_write_host_task};
cgh.host_task(celerity::on_master_node, [=, &reached_ringbuffer_capacity] {
while(!reached_ringbuffer_capacity.load())
;
; // we wait in all tasks so that we can make sure to fill the ring buffer completely
// and therefore test that execution re-starts correctly once an epoch is reached
});
});
}
Expand Down

0 comments on commit 5139256

Please sign in to comment.