Skip to content

Commit

Permalink
Remove last remnants of task mutex locking in task_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
fknorr committed Aug 30, 2022
1 parent d14f06e commit b0254fd
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 46 deletions.
32 changes: 15 additions & 17 deletions include/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,25 @@ namespace detail {

/// Submits a command group function to the task graph and returns the id of
/// the generated task.
///
/// The scraped diff interleaved the pre- and post-commit bodies (duplicate
/// `reserve_task_entry`, duplicate `delete_up_to`, a dangling lock scope and
/// the removed `tsk_ptr` indirection); this is the coherent post-commit
/// version, which no longer takes the task mutex.
///
/// @param cgf    The command group function; invoked once with a prepass
///               handler to record accesses and build the task.
/// @param hints  Currently unused scheduling hints.
/// @return The id of the newly created task.
template <typename CGF, typename... Hints>
task_id submit_command_group(CGF cgf, Hints... hints) {
    // Reserve a slot first; this may block (via the callback) until an
    // epoch frees older tasks.
    auto reservation = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
    const auto tid = reservation.get_tid();

    prepass_handler cgh(tid, std::make_unique<command_group_storage<CGF>>(cgf), m_num_collective_nodes);
    cgf(cgh);

    task& tsk = register_task_internal(std::move(reservation), std::move(cgh).into_task());
    compute_dependencies(tsk);
    if(m_queue) m_queue->require_collective_group(tsk.get_collective_group_id());

    // the following deletion is intentionally redundant with the one happening when waiting for free task slots
    // we want to free tasks earlier than just when running out of slots,
    // so that we can potentially reclaim additional resources such as buffers earlier
    m_task_buffer.delete_up_to(m_latest_epoch_reached.get());

    invoke_callbacks(&tsk);

    if(need_new_horizon()) { generate_horizon_task(); }

    return tid;
}

Expand Down
54 changes: 25 additions & 29 deletions src/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace detail {
}

void task_manager::notify_horizon_reached(task_id horizon_tid) {
// m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(m_task_buffer.get_task(horizon_tid)->get_type() == task_type::horizon);
assert(!m_latest_horizon_reached || *m_latest_horizon_reached < horizon_tid);
assert(m_latest_epoch_reached.get() < horizon_tid);
Expand All @@ -42,8 +44,7 @@ namespace detail {
}

void task_manager::notify_epoch_reached(task_id epoch_tid) {
// This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
// latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
// m_latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(get_task(epoch_tid)->get_type() == task_type::epoch);
assert(!m_latest_horizon_reached || *m_latest_horizon_reached < epoch_tid);
Expand Down Expand Up @@ -226,38 +227,33 @@ namespace detail {
}

/// Creates a new horizon task that reduces the current execution front and
/// returns its id.
///
/// The scraped diff interleaved the pre-commit locked-scope body (raw
/// `task*`, extra braces, duplicate `invoke_callbacks`) with the new
/// lock-free body; this is the coherent post-commit version.
///
/// @return The id of the newly created horizon task.
task_id task_manager::generate_horizon_task() {
    auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
    const auto tid = reserve.get_tid();

    // Reset the horizon trigger and remember the previous horizon: once a
    // newer horizon exists, the previous one becomes the epoch for new tasks.
    m_current_horizon_critical_path_length = m_max_pseudo_critical_path_length;
    const auto previous_horizon = m_current_horizon;
    m_current_horizon = tid;

    task& new_horizon = reduce_execution_front(std::move(reserve), task::make_horizon_task(*m_current_horizon));
    if(previous_horizon) { set_epoch_for_new_tasks(*previous_horizon); }

    invoke_callbacks(&new_horizon);
    return tid;
}

/// Creates a new epoch task that reduces the current execution front and
/// becomes the epoch for all subsequently submitted tasks.
///
/// The scraped diff interleaved the pre-commit locked-scope body (raw
/// `task*`, extra braces, duplicate `invoke_callbacks`) with the new
/// lock-free body; this is the coherent post-commit version.
///
/// @param action The action (none / barrier / shutdown) attached to the epoch.
/// @return The id of the newly created epoch task.
task_id task_manager::generate_epoch_task(epoch_action action) {
    auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
    const auto tid = reserve.get_tid();

    task& new_epoch = reduce_execution_front(std::move(reserve), task::make_epoch(tid, action));
    compute_dependencies(new_epoch);
    set_epoch_for_new_tasks(tid);

    m_current_horizon = std::nullopt; // this horizon is now behind the epoch_for_new_tasks, so it will never become an epoch itself
    m_current_horizon_critical_path_length = m_max_pseudo_critical_path_length; // the explicit epoch resets the need to create horizons

    invoke_callbacks(&new_epoch);
    return tid;
}

task_id task_manager::get_first_in_flight_epoch() const {
Expand Down

0 comments on commit b0254fd

Please sign in to comment.