Commit

Address reviewer comments on epochs

fknorr committed Apr 27, 2022
1 parent 85bdfa6 commit 61dd07e
Showing 9 changed files with 80 additions and 77 deletions.
2 changes: 0 additions & 2 deletions include/command.h
@@ -218,8 +218,6 @@ namespace detail {
);
// clang-format on
}

// clang-format on
};

} // namespace detail
8 changes: 4 additions & 4 deletions include/graph_generator.h
@@ -76,8 +76,6 @@ namespace detail {
std::unordered_map<collective_group_id, command_id> last_collective_commands;
// Side effects on the same host object create true dependencies between task commands, so we track the last effect per host object on each node.
side_effect_map host_object_last_effects;

void set_epoch_for_new_commands(const command_id epoch);
};

public:
@@ -101,9 +99,9 @@ namespace detail {

// After completing an epoch, we need to wait until it is flushed before pruning predecessors from the CDAG, otherwise dependencies will not be flushed.
// We generate the initial epoch commands manually starting from cid 0, so initializing these to 0 is correct.
detail::command_id min_epoch_for_new_commands = 0;
command_id min_epoch_for_new_commands = 0;
// Used to skip the pruning step if no new epoch has been completed.
detail::command_id min_epoch_last_pruned_before = 0;
command_id min_epoch_last_pruned_before = 0;

// NOTE: We have several data structures that keep track of the "global state" of the distributed program, across all tasks and nodes.
// While it might seem that this is problematic when the ordering of tasks can be chosen freely (by the scheduler),
@@ -124,6 +122,8 @@ namespace detail {
// This mutex mainly serves to protect per-buffer data structures, as new buffers might be added at any time.
std::mutex buffer_mutex;

void set_epoch_for_new_commands(per_node_data& node_data, const command_id epoch);

void reduce_execution_front_to(abstract_command* new_front);

void generate_anti_dependencies(task_id tid, buffer_id bid, const region_map<std::optional<command_id>>& last_writers_map,
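The two command ids added to graph_generator above drive a simple guard: predecessor commands are only pruned from the CDAG once an epoch newer than the one used for the last pruning pass has become effective. A minimal sketch of that guard, assuming command_id is an integer type (hypothetical helper struct, not the actual graph_generator code):

```cpp
// Hypothetical sketch of the pruning guard described above; not the real code.
#include <cstdint>

using command_id = uint64_t;

struct pruning_guard {
	// Smallest epoch that new commands on any node may depend on.
	command_id min_epoch_for_new_commands = 0;
	// Epoch used in the most recent pruning pass.
	command_id min_epoch_last_pruned_before = 0;

	// A pruning pass is only warranted if a newer epoch became effective since.
	bool should_prune() const { return min_epoch_for_new_commands > min_epoch_last_pruned_before; }
};
```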
8 changes: 4 additions & 4 deletions include/task_manager.h
@@ -75,7 +75,7 @@ namespace detail {

compute_dependencies(tid);
if(queue) queue->require_collective_group(task_ref.get_collective_group_id());
prune_tasks_before_last_epoch_reached();
prune_tasks_before_latest_epoch_reached();
}
invoke_callbacks(tid);
if(need_new_horizon()) { generate_horizon_task(); }
@@ -171,8 +171,8 @@ namespace detail {
std::unordered_map<task_id, std::unique_ptr<task>> task_map;

// The active epoch is used as the last writer for host-initialized buffers.
// To ensure correct ordering, all tasks that have no other true-dependencies depend on this task.
// This is useful so we can correctly generate anti-dependencies onto tasks that read host-initialized buffers.
// To ensure correct ordering, all tasks that have no other true-dependencies depend on this task.
task_id epoch_for_new_tasks{initial_epoch_task};

// We store a map of which task last wrote to a certain region of a buffer.
@@ -207,7 +207,7 @@ namespace detail {
// The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
epoch_monitor latest_epoch_reached{initial_epoch_task};

// The last epoch that was used in task pruning after being reached. This allows skipping the pruning step if no new was completed since.
// The last epoch that was used in task pruning after being reached. This allows skipping the pruning step if no new epoch was completed since.
task_id last_pruned_before{initial_epoch_task};

// Set of tasks with no dependents
@@ -234,7 +234,7 @@ namespace detail {
task_id generate_horizon_task();

// Needs to be called while task map accesses are safe (ie. mutex is locked)
void prune_tasks_before_last_epoch_reached();
void prune_tasks_before_latest_epoch_reached();

void compute_dependencies(task_id tid);
};
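latest_epoch_reached above lives behind an epoch_monitor so the main thread can block in await_epoch while the executor thread publishes progress. The diff only shows the get()/set()/await() call sites; a plausible sketch of such a monitor, inferred from those calls rather than taken from the repository, could look like this:

```cpp
// Sketch of an epoch_monitor-like type; interface inferred from the get()/set()/await()
// calls in this diff, the real implementation may differ.
#include <condition_variable>
#include <cstdint>
#include <mutex>

using task_id = uint64_t;

class epoch_monitor {
  public:
	explicit epoch_monitor(const task_id initial) : value(initial) {}

	task_id get() const {
		std::lock_guard lock(mutex);
		return value;
	}

	void set(const task_id new_value) {
		{
			std::lock_guard lock(mutex);
			value = new_value;
		}
		changed.notify_all();
	}

	// Blocks the calling thread until at least `awaited` has been reached.
	void await(const task_id awaited) const {
		std::unique_lock lock(mutex);
		changed.wait(lock, [&] { return value >= awaited; });
	}

  private:
	mutable std::mutex mutex;
	mutable std::condition_variable changed;
	task_id value;
};
```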
52 changes: 28 additions & 24 deletions src/graph_generator.cc
@@ -15,7 +15,8 @@ namespace detail {
graph_generator::graph_generator(size_t num_nodes, task_manager& tm, reduction_manager& rm, command_graph& cdag)
: task_mngr(tm), reduction_mngr(rm), num_nodes(num_nodes), cdag(cdag) {
// Build initial epoch command for each node (these are required to properly handle anti-dependencies on host-initialized buffers).
// We manually generate the first set of commands; horizons are used later on (see generate_horizon).
// We manually generate the first set of commands, these will be replaced by applied horizons or explicit epochs down the line (see
// set_epoch_for_new_commands).
for(node_id nid = 0; nid < num_nodes; ++nid) {
const auto epoch_cmd = cdag.create<epoch_command>(nid, task_manager::initial_epoch_task, epoch_action::none);
epoch_cmd->mark_as_flushed(); // there is no point in flushing the initial epoch command
@@ -37,25 +38,6 @@ namespace detail {
buffer_states.emplace(bid, distributed_state{{range, std::move(all_nodes)}});
}

void graph_generator::per_node_data::set_epoch_for_new_commands(const command_id epoch) {
// update "buffer_last_writer" and "last_collective_commands" structures to subsume pre-epoch commands
for(auto& blw_pair : buffer_last_writer) {
// TODO this could be optimized to something like cdag.apply_horizon(node_id, horizon_cmd) with much fewer internal operations
blw_pair.second.apply_to_values([epoch](const std::optional<command_id> cid) -> std::optional<command_id> {
if(!cid) return cid;
return {std::max(epoch, *cid)};
});
}
for(auto& [cgid, cid] : last_collective_commands) {
cid = std::max(epoch, cid);
}
for(auto& [cgid, cid] : host_object_last_effects) {
cid = std::max(epoch, cid);
}

epoch_for_new_commands = epoch;
}

void graph_generator::build_task(const task_id tid, const std::vector<graph_transformer*>& transformers) {
std::lock_guard<std::mutex> lock(buffer_mutex);
// TODO: Maybe assert that this task hasn't been processed before
@@ -110,6 +92,28 @@ namespace detail {
prune_commands_before(min_epoch_to_prune_before);
}

void graph_generator::set_epoch_for_new_commands(per_node_data& node, const command_id epoch) { // NOLINT(readability-convert-member-functions-to-static)
// both an explicit epoch command and an applied horizon can be effective epochs
assert(isa<epoch_command>(cdag.get(epoch)) || isa<horizon_command>(cdag.get(epoch)));

// update "buffer_last_writer" and "last_collective_commands" structures to subsume pre-epoch commands
for(auto& blw_pair : node.buffer_last_writer) {
// TODO this could be optimized to something like cdag.apply_horizon(node_id, horizon_cmd) with much fewer internal operations
blw_pair.second.apply_to_values([epoch](const std::optional<command_id> cid) -> std::optional<command_id> {
if(!cid) return cid;
return {std::max(epoch, *cid)};
});
}
for(auto& [cgid, cid] : node.last_collective_commands) {
cid = std::max(epoch, cid);
}
for(auto& [cgid, cid] : node.host_object_last_effects) {
cid = std::max(epoch, cid);
}

node.epoch_for_new_commands = epoch;
}

void graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
const auto nid = new_front->get_nid();
const auto previous_execution_front = cdag.get_execution_front(nid);
@@ -130,10 +134,10 @@ namespace detail {
const auto cid = epoch->get_cid();
if(nid == 0) { min_new_epoch = cid; }

node.set_epoch_for_new_commands(cid);
set_epoch_for_new_commands(node, cid);
node.current_horizon = std::nullopt;

// Make the horizon or epoch command depend on the previous execution front
// Make the epoch depend on the previous execution front
reduce_execution_front_to(epoch);
}

@@ -156,11 +160,11 @@ namespace detail {
} else {
min_new_epoch = node.current_horizon;
}
node.set_epoch_for_new_commands(*node.current_horizon);
set_epoch_for_new_commands(node, *node.current_horizon);
}
node.current_horizon = cid;

// Make the horizon or epoch command depend on the previous execution front
// Make the horizon depend on the previous execution front
reduce_execution_front_to(horizon);
}

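The heart of set_epoch_for_new_commands above is the std::max(epoch, cid) clamp: every recorded command id older than the newly effective epoch is replaced by the epoch itself, so later dependency generation never reaches behind it. A stripped-down illustration of the same clamp on a plain std::map (hypothetical data; the real code applies it per buffer via a region_map):

```cpp
// Illustration of subsuming pre-epoch command ids; not the real graph_generator code.
#include <algorithm>
#include <cstdint>
#include <map>

using command_id = uint64_t;

int main() {
	// Hypothetical last-writer records, keyed by some region index.
	std::map<int, command_id> last_writers{{0, 3}, {1, 7}, {2, 12}};

	const command_id epoch = 10; // the newly effective epoch command

	// Clamp all references older than the epoch onto the epoch itself.
	for(auto& [region, cid] : last_writers) {
		cid = std::max(epoch, cid);
	}
	// last_writers is now {{0, 10}, {1, 10}, {2, 12}}:
	// commands 3 and 7 are subsumed by the epoch, command 12 (newer) is kept.
}
```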
8 changes: 4 additions & 4 deletions src/task_manager.cc
@@ -45,7 +45,7 @@ namespace detail {

void task_manager::notify_horizon_reached(task_id horizon_tid) {
// This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
// last_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
// latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(get_task(horizon_tid)->get_type() == task_type::HORIZON);
assert(!latest_horizon_reached || *latest_horizon_reached < horizon_tid);
@@ -59,14 +59,14 @@ namespace detail {

void task_manager::notify_epoch_reached(task_id epoch_tid) {
// This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
// last_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
// latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.

assert(get_task(epoch_tid)->get_type() == task_type::EPOCH);
assert(!latest_horizon_reached || *latest_horizon_reached < epoch_tid);
assert(latest_epoch_reached.get() < epoch_tid);

latest_epoch_reached.set(epoch_tid); // The next call to submit_command_group() will prune all tasks before the last epoch reached
latest_horizon_reached = std::nullopt; // The last horizon reached is now behind the epoch and will therefore never become an epoch itself
latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
}

void task_manager::await_epoch(task_id epoch) { latest_epoch_reached.await(epoch); }
@@ -279,7 +279,7 @@ namespace detail {
return tid;
}

void task_manager::prune_tasks_before_last_epoch_reached() {
void task_manager::prune_tasks_before_latest_epoch_reached() {
const auto prune_before = latest_epoch_reached.get();
if(prune_before > last_pruned_before) {
for(auto iter = task_map.begin(); iter != task_map.end();) {
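prune_tasks_before_latest_epoch_reached (truncated in the hunk above) relies on std::unordered_map::erase returning the iterator past the removed element, so stale tasks can be dropped in a single pass. A sketch of that loop, assuming task ids are the map keys as in task_map above:

```cpp
// Sketch of epoch-based task pruning; simplified stand-in for the real task_manager code.
#include <cstdint>
#include <memory>
#include <unordered_map>

using task_id = uint64_t;
struct task {}; // stand-in for detail::task

void prune_tasks_before(std::unordered_map<task_id, std::unique_ptr<task>>& task_map, const task_id prune_before) {
	for(auto iter = task_map.begin(); iter != task_map.end();) {
		if(iter->first < prune_before) {
			iter = task_map.erase(iter); // erase returns the iterator past the removed element
		} else {
			++iter;
		}
	}
}
```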
25 changes: 13 additions & 12 deletions test/graph_compaction_tests.cc
@@ -137,7 +137,7 @@ namespace detail {
test_utils::cdag_test_context ctx(NUM_NODES);

// For this test, we need to generate 2 horizons but still have the first one be relevant
// after the second is generated -> use 2 buffers A and B, with a longer task chan on A, and write to B later
// after the second is generated -> use 2 buffers A and B, with a longer task chain on A, and write to B later
// step size is set to ensure expected horizons
ctx.get_task_manager().set_horizon_step(2);

@@ -276,7 +276,7 @@ namespace detail {
const auto cmds = inspector.get_commands(tid, std::nullopt, std::nullopt);
CHECK(cmds.size() == 2);
std::transform(cmds.begin(), cmds.end(), initial_last_writer_ids.begin(), [&](auto cid) {
// (Implementation detail: We can't use the inspector here b/c EPOCH commands are not flushed)
// (Implementation detail: We can't use the inspector here b/c the initial epoch commands are not flushed)
const auto deps = ctx.get_command_graph().get(cid)->get_dependencies();
REQUIRE(std::distance(deps.begin(), deps.end()) == 1);
return deps.begin()->node->get_cid();
@@ -293,9 +293,9 @@ namespace detail {
const auto tid = test_utils::build_and_flush(ctx, NUM_NODES,
test_utils::add_compute_task<class UKN(generate_horizons)>(
ctx.get_task_manager(), [&](handler& cgh) { buf.get_access<mode::discard_write>(cgh, one_to_one{}); }, buf_range));
const auto current_horizion = task_manager_testspy::get_current_horizion(ctx.get_task_manager());
if(current_horizion && *current_horizion > last_horizon_reached) {
last_horizon_reached = *current_horizion;
const auto current_horizon = task_manager_testspy::get_current_horizon(ctx.get_task_manager());
if(current_horizon && *current_horizon > last_horizon_reached) {
last_horizon_reached = *current_horizon;
ctx.get_task_manager().notify_horizon_reached(last_horizon_reached);
}
}
@@ -323,9 +323,9 @@ namespace detail {
CHECK(isa<horizon_command>(ctx.get_command_graph().get(new_last_writer_ids[0])));
CHECK(isa<horizon_command>(ctx.get_command_graph().get(new_last_writer_ids[1])));

const auto current_horizions = graph_generator_testspy::get_current_horizons(ctx.get_graph_generator());
const auto current_horizons = graph_generator_testspy::get_current_horizons(ctx.get_graph_generator());
INFO("previous horizons are being used");
CHECK(std::none_of(current_horizions.cbegin(), current_horizions.cend(),
CHECK(std::none_of(current_horizons.cbegin(), current_horizons.cend(),
[&](const command_id cid) { return cid == new_last_writer_ids[0] || cid == new_last_writer_ids[1]; }));

test_utils::maybe_print_graphs(ctx);
@@ -409,10 +409,11 @@ namespace detail {
auto& cdag = ctx.get_command_graph();
const auto first_commands = inspector.get_commands(first_task, std::nullopt, std::nullopt);
const auto second_commands = inspector.get_commands(second_task, std::nullopt, std::nullopt);
for(const auto second_cid : second_commands) {
for(const auto first_cid : first_commands) {
CHECK(!inspector.has_dependency(second_cid, first_cid));
}
CHECKED_IF(std::tuple{first_commands.size(), second_commands.size()} == std::tuple{1, 1}) {
const auto first_cid = *first_commands.begin();
const auto second_cid = *second_commands.begin();
CHECK(!inspector.has_dependency(second_cid, first_cid));

const auto second_deps = cdag.get(second_cid)->get_dependencies();
CHECK(std::distance(second_deps.begin(), second_deps.end()) == 1);
for(const auto& dep : second_deps) {
Expand All @@ -424,7 +425,7 @@ namespace detail {
test_utils::maybe_print_graphs(ctx);
}

TEST_CASE("finishing an epoch will prune all nodes of the preceding graph", "[task_manager][graph_generator][task-graph][command-graph][epoch]") {
TEST_CASE("reaching an epoch will prune all nodes of the preceding task graph", "[task_manager][task-graph][epoch]") {
using namespace cl::sycl::access;

constexpr int num_nodes = 2;
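The reworked test above replaces a nested loop over command sets with Catch2's CHECKED_IF, which records the condition like a CHECK and only executes the following block when it holds, keeping the single-element dereferences safe. A tiny standalone example of the pattern (hypothetical test, unrelated to the Celerity fixtures):

```cpp
// Minimal CHECKED_IF example (Catch2 v2 single-header style, as used by these tests).
#define CATCH_CONFIG_MAIN
#include <catch2/catch.hpp>

#include <set>

TEST_CASE("guarded single-element access") {
	const std::set<int> commands{42};
	// The block only runs when the size check passes, so begin() is safe to dereference.
	CHECKED_IF(commands.size() == 1) { CHECK(*commands.begin() == 42); }
}
```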
2 changes: 1 addition & 1 deletion test/graph_generation_tests.cc
@@ -743,7 +743,7 @@ namespace detail {
test_utils::add_compute_task<class UKN(produce)>(
tm,
[&](handler& cgh) {
buf_2.get_access<access_mode::discard_write>(cgh, all{});
buf_2.get_access<access_mode::discard_write>(cgh, one_to_one{});
test_utils::add_reduction(cgh, ctx.get_reduction_manager(), buf_3, false);
},
range<1>{num_nodes}));