From 72ad57cf05d908404433b6866070f74b92b3a856 Mon Sep 17 00:00:00 2001 From: Peter Thoman Date: Mon, 18 Jul 2022 11:23:01 +0200 Subject: [PATCH] Replace latest epoch tracking with on-demand lookup --- include/task_manager.h | 10 ++++++---- src/task_manager.cc | 30 ++++++++++++++++++------------ test/task_ring_buffer_tests.cc | 3 ++- 3 files changed, 26 insertions(+), 17 deletions(-) diff --git a/include/task_manager.h b/include/task_manager.h index 0f894466b..33c2ee0c8 100644 --- a/include/task_manager.h +++ b/include/task_manager.h @@ -79,7 +79,8 @@ namespace detail { if(queue) queue->require_collective_group(task_ref.get_collective_group_id()); // the following deletion is intentionally redundant with the one happening when waiting for free task slots - // we want to free tasks earlier than just when running out of slots + // we want to free tasks earlier than just when running out of slots, + // so that we can potentially reclaim additional resources such as buffers earlier task_buffer.delete_up_to(latest_epoch_reached.get()); } invoke_callbacks(tid); @@ -208,9 +209,6 @@ namespace detail { // Only accessed in task_manager::notify_*, which are always called from the executor thread - no locking needed. std::optional latest_horizon_reached; - // The number of horizons and epochs in flight, used to detect stalling scenarios with very broad task graphs - std::atomic number_of_in_flight_horizons_and_epochs = 0; - // The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread. epoch_monitor latest_epoch_reached{initial_epoch_task}; @@ -237,6 +235,10 @@ namespace detail { void compute_dependencies(task_id tid); + // Finds the first in-flight epoch, or returns the currently reached one if there are none in-flight + // Used in await_free_task_slot_callback to check for hangs + task_id get_first_in_flight_epoch() const; + // Returns a callback which blocks until any epoch task has executed, freeing new task slots task_ring_buffer::wait_callback await_free_task_slot_callback(); }; diff --git a/src/task_manager.cc b/src/task_manager.cc index 08bb743d1..7bcb1de86 100644 --- a/src/task_manager.cc +++ b/src/task_manager.cc @@ -38,9 +38,6 @@ namespace detail { assert(!latest_horizon_reached || *latest_horizon_reached < horizon_tid); assert(latest_epoch_reached.get() < horizon_tid); - assert(number_of_in_flight_horizons_and_epochs.load() > 0); - number_of_in_flight_horizons_and_epochs--; - if(latest_horizon_reached) { latest_epoch_reached.set(*latest_horizon_reached); } latest_horizon_reached = horizon_tid; @@ -54,9 +51,6 @@ namespace detail { assert(!latest_horizon_reached || *latest_horizon_reached < epoch_tid); assert(latest_epoch_reached.get() < epoch_tid); - assert(number_of_in_flight_horizons_and_epochs.load() > 0); - number_of_in_flight_horizons_and_epochs--; - latest_epoch_reached.set(epoch_tid); latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself } @@ -187,11 +181,6 @@ namespace detail { task& task_manager::register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr task) { auto& task_ref = *task; assert(task != nullptr); - - if(task_ref.get_type() == task_type::EPOCH || task_ref.get_type() == task_type::HORIZON) { - number_of_in_flight_horizons_and_epochs++; // - } - task_buffer.put(std::move(reserve), std::move(task)); execution_front.insert(&task_ref); return task_ref; @@ -276,9 +265,26 @@ namespace detail { return tid; } + task_id task_manager::get_first_in_flight_epoch() const { + task_id current_horizon = 0; + task_id latest_epoch = latest_epoch_reached.get(); + // we need either one epoch or two horizons that have yet to be executed + // so that it is possible for task slots to be freed in the future + for(const auto& tsk : task_buffer) { + if(tsk->get_id() <= latest_epoch) continue; + if(tsk->get_type() == task_type::EPOCH) { + return tsk->get_id(); + } else if(tsk->get_type() == task_type::HORIZON) { + if(current_horizon) return current_horizon; + current_horizon = tsk->get_id(); + } + } + return latest_epoch; + } + task_ring_buffer::wait_callback task_manager::await_free_task_slot_callback() { return [&](task_id previous_free_tid) { - if(number_of_in_flight_horizons_and_epochs == 0) { + if(get_first_in_flight_epoch() == latest_epoch_reached.get()) { // verify that the epoch didn't get reached between the invocation of the callback and the in flight check if(latest_epoch_reached.get() < previous_free_tid + 1) { throw std::runtime_error("Exhausted task slots with no horizons or epochs in flight." diff --git a/test/task_ring_buffer_tests.cc b/test/task_ring_buffer_tests.cc index bb5b22db7..6ed569090 100644 --- a/test/task_ring_buffer_tests.cc +++ b/test/task_ring_buffer_tests.cc @@ -28,7 +28,8 @@ TEST_CASE_METHOD(test_utils::runtime_fixture, "freeing task ring buffer capacity celerity::accessor acc{dependency, cgh, celerity::access::all{}, celerity::read_write_host_task}; cgh.host_task(celerity::on_master_node, [=, &reached_ringbuffer_capacity] { while(!reached_ringbuffer_capacity.load()) - ; + ; // we wait in all tasks so that we can make sure to fill the ring buffer completely + // and therefore test that execution re-starts correctly once an epoch is reached }); }); }