diff --git a/include/command.h b/include/command.h
index 2bbd3ac2a..4e55c39ac 100644
--- a/include/command.h
+++ b/include/command.h
@@ -218,8 +218,6 @@ namespace detail {
             );
             // clang-format on
         }
-
-        // clang-format on
     };
 
 } // namespace detail
diff --git a/include/graph_generator.h b/include/graph_generator.h
index e5f0877f2..a9f73dae5 100644
--- a/include/graph_generator.h
+++ b/include/graph_generator.h
@@ -76,8 +76,6 @@ namespace detail {
             std::unordered_map<collective_group_id, command_id> last_collective_commands;
             // Side effects on the same host object create true dependencies between task commands, so we track the last effect per host object on each node.
             side_effect_map host_object_last_effects;
-
-            void set_epoch_for_new_commands(const command_id epoch);
         };
 
       public:
@@ -101,9 +99,9 @@ namespace detail {
         // After completing an epoch, we need to wait until it is flushed before pruning predecessors from the CDAG, otherwise dependencies will not be flushed.
         // We generate the initial epoch commands manually starting from cid 0, so initializing these to 0 is correct.
-        detail::command_id min_epoch_for_new_commands = 0;
+        command_id min_epoch_for_new_commands = 0;
 
         // Used to skip the pruning step if no new epoch has been completed.
-        detail::command_id min_epoch_last_pruned_before = 0;
+        command_id min_epoch_last_pruned_before = 0;
 
         // NOTE: We have several data structures that keep track of the "global state" of the distributed program, across all tasks and nodes.
         // While it might seem that this is problematic when the ordering of tasks can be chosen freely (by the scheduler),
@@ -124,6 +122,8 @@ namespace detail {
         // This mutex mainly serves to protect per-buffer data structures, as new buffers might be added at any time.
         std::mutex buffer_mutex;
 
+        void set_epoch_for_new_commands(per_node_data& node_data, const command_id epoch);
+
         void reduce_execution_front_to(abstract_command* new_front);
 
         void generate_anti_dependencies(task_id tid, buffer_id bid, const region_map<std::optional<command_id>>& last_writers_map,
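Aside on the two counters kept in this header: `min_epoch_for_new_commands` and `min_epoch_last_pruned_before` let the graph generator skip CDAG pruning entirely while no new epoch has completed, mirroring the task-side `last_pruned_before` guard later in this patch. The body of `prune_commands_before` is not shown here, so the following is only a minimal standalone sketch of that bookkeeping; `cdag_pruning_state`, `should_prune` and the `std::size_t` alias are illustrative assumptions, not celerity API.

```cpp
#include <cstddef>

using command_id = std::size_t; // assumed alias for this standalone sketch

struct cdag_pruning_state {
    command_id min_epoch_for_new_commands = 0;   // advanced whenever a horizon or epoch becomes the effective epoch
    command_id min_epoch_last_pruned_before = 0; // the epoch the last prune pass already ran against

    // Returns true only when a prune pass is worthwhile, i.e. a newer epoch has completed
    // since the previous pass; otherwise the (potentially expensive) CDAG sweep is skipped.
    bool should_prune() {
        if(min_epoch_for_new_commands <= min_epoch_last_pruned_before) return false;
        min_epoch_last_pruned_before = min_epoch_for_new_commands;
        return true;
    }
};
```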
diff --git a/include/task_manager.h b/include/task_manager.h
index 54a4ae4c7..4de6be4b4 100644
--- a/include/task_manager.h
+++ b/include/task_manager.h
@@ -75,7 +75,7 @@ namespace detail {
 
             compute_dependencies(tid);
             if(queue) queue->require_collective_group(task_ref.get_collective_group_id());
-            prune_tasks_before_last_epoch_reached();
+            prune_tasks_before_latest_epoch_reached();
         }
         invoke_callbacks(tid);
         if(need_new_horizon()) { generate_horizon_task(); }
@@ -171,8 +171,8 @@
         std::unordered_map> task_map;
 
         // The active epoch is used as the last writer for host-initialized buffers.
-        // To ensure correct ordering, all tasks that have no other true-dependencies depend on this task.
         // This is useful so we can correctly generate anti-dependencies onto tasks that read host-initialized buffers.
+        // To ensure correct ordering, all tasks that have no other true-dependencies depend on this task.
         task_id epoch_for_new_tasks{initial_epoch_task};
 
         // We store a map of which task last wrote to a certain region of a buffer.
@@ -207,7 +207,7 @@
         // The last epoch task that has been processed by the executor. Behind a monitor to allow awaiting this change from the main thread.
         epoch_monitor latest_epoch_reached{initial_epoch_task};
 
-        // The last epoch that was used in task pruning after being reached. This allows skipping the pruning step if no new was completed since.
+        // The last epoch that was used in task pruning after being reached. This allows skipping the pruning step if no new epoch was completed since.
         task_id last_pruned_before{initial_epoch_task};
 
         // Set of tasks with no dependents
@@ -234,7 +234,7 @@
         task_id generate_horizon_task();
 
         // Needs to be called while task map accesses are safe (ie. mutex is locked)
-        void prune_tasks_before_last_epoch_reached();
+        void prune_tasks_before_latest_epoch_reached();
 
         void compute_dependencies(task_id tid);
     };
diff --git a/src/graph_generator.cc b/src/graph_generator.cc
index 8765517c1..852c4c159 100644
--- a/src/graph_generator.cc
+++ b/src/graph_generator.cc
@@ -15,7 +15,8 @@ namespace detail {
     graph_generator::graph_generator(size_t num_nodes, task_manager& tm, reduction_manager& rm, command_graph& cdag)
         : task_mngr(tm), reduction_mngr(rm), num_nodes(num_nodes), cdag(cdag) {
         // Build initial epoch command for each node (these are required to properly handle anti-dependencies on host-initialized buffers).
-        // We manually generate the first set of commands; horizons are used later on (see generate_horizon).
+        // We manually generate the first set of commands, these will be replaced by applied horizons or explicit epochs down the line (see
+        // set_epoch_for_new_commands).
         for(node_id nid = 0; nid < num_nodes; ++nid) {
             const auto epoch_cmd = cdag.create<epoch_command>(nid, task_manager::initial_epoch_task, epoch_action::none);
             epoch_cmd->mark_as_flushed(); // there is no point in flushing the initial epoch command
@@ -37,25 +38,6 @@
         buffer_states.emplace(bid, distributed_state{{range, std::move(all_nodes)}});
     }
 
-    void graph_generator::per_node_data::set_epoch_for_new_commands(const command_id epoch) {
-        // update "buffer_last_writer" and "last_collective_commands" structures to subsume pre-epoch commands
-        for(auto& blw_pair : buffer_last_writer) {
-            // TODO this could be optimized to something like cdag.apply_horizon(node_id, horizon_cmd) with much fewer internal operations
-            blw_pair.second.apply_to_values([epoch](const std::optional<command_id> cid) -> std::optional<command_id> {
-                if(!cid) return cid;
-                return {std::max(epoch, *cid)};
-            });
-        }
-        for(auto& [cgid, cid] : last_collective_commands) {
-            cid = std::max(epoch, cid);
-        }
-        for(auto& [cgid, cid] : host_object_last_effects) {
-            cid = std::max(epoch, cid);
-        }
-
-        epoch_for_new_commands = epoch;
-    }
-
     void graph_generator::build_task(const task_id tid, const std::vector& transformers) {
         std::lock_guard lock(buffer_mutex);
         // TODO: Maybe assert that this task hasn't been processed before
@@ -110,6 +92,28 @@
         prune_commands_before(min_epoch_to_prune_before);
     }
 
+    void graph_generator::set_epoch_for_new_commands(per_node_data& node, const command_id epoch) { // NOLINT(readability-convert-member-functions-to-static)
+        // both an explicit epoch command and an applied horizon can be effective epochs
+        assert(isa<epoch_command>(cdag.get(epoch)) || isa<horizon_command>(cdag.get(epoch)));
+
+        // update "buffer_last_writer" and "last_collective_commands" structures to subsume pre-epoch commands
+        for(auto& blw_pair : node.buffer_last_writer) {
+            // TODO this could be optimized to something like cdag.apply_horizon(node_id, horizon_cmd) with much fewer internal operations
+            blw_pair.second.apply_to_values([epoch](const std::optional<command_id> cid) -> std::optional<command_id> {
+                if(!cid) return cid;
+                return {std::max(epoch, *cid)};
+            });
+        }
+        for(auto& [cgid, cid] : node.last_collective_commands) {
+            cid = std::max(epoch, cid);
+        }
+        for(auto& [cgid, cid] : node.host_object_last_effects) {
+            cid = std::max(epoch, cid);
+        }
+
+        node.epoch_for_new_commands = epoch;
+    }
+
     void graph_generator::reduce_execution_front_to(abstract_command* const new_front) {
         const auto nid = new_front->get_nid();
         const auto previous_execution_front = cdag.get_execution_front(nid);
@@ -130,10 +134,10 @@
         const auto cid = epoch->get_cid();
         if(nid == 0) { min_new_epoch = cid; }
 
-        node.set_epoch_for_new_commands(cid);
+        set_epoch_for_new_commands(node, cid);
         node.current_horizon = std::nullopt;
 
-        // Make the horizon or epoch command depend on the previous execution front
+        // Make the epoch depend on the previous execution front
         reduce_execution_front_to(epoch);
     }
 
@@ -156,11 +160,11 @@
             } else {
                 min_new_epoch = node.current_horizon;
             }
-            node.set_epoch_for_new_commands(*node.current_horizon);
+            set_epoch_for_new_commands(node, *node.current_horizon);
         }
         node.current_horizon = cid;
 
-        // Make the horizon or epoch command depend on the previous execution front
+        // Make the horizon depend on the previous execution front
         reduce_execution_front_to(horizon);
     }
 
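Aside: the subsumption step in `set_epoch_for_new_commands` boils down to clamping every tracked command id to at least the epoch's id, so that future dependencies are generated against the epoch instead of against commands that may be pruned. Below is a minimal standalone illustration of that clamp, with a plain `std::unordered_map` standing in for `region_map`/`buffer_last_writer`; the names and aliases are assumptions for the sketch, not celerity API.

```cpp
#include <algorithm>
#include <cstddef>
#include <optional>
#include <unordered_map>

using buffer_id = std::size_t;  // assumed aliases for this standalone sketch
using command_id = std::size_t;

// Subsume every recorded last writer under a new effective epoch, mirroring the
// max(epoch, cid) update performed in graph_generator::set_epoch_for_new_commands.
void subsume_under_epoch(std::unordered_map<buffer_id, std::optional<command_id>>& last_writers, const command_id epoch) {
    for(auto& [bid, cid] : last_writers) {
        if(cid.has_value()) { cid = std::max(epoch, *cid); }
    }
}
```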
diff --git a/src/task_manager.cc b/src/task_manager.cc
index b94c1ab0e..58157c5f8 100644
--- a/src/task_manager.cc
+++ b/src/task_manager.cc
@@ -45,7 +45,7 @@ namespace detail {
 
     void task_manager::notify_horizon_reached(task_id horizon_tid) {
         // This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
-        // last_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
+        // latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
 
         assert(get_task(horizon_tid)->get_type() == task_type::HORIZON);
         assert(!latest_horizon_reached || *latest_horizon_reached < horizon_tid);
@@ -59,14 +59,14 @@
 
     void task_manager::notify_epoch_reached(task_id epoch_tid) {
         // This method is called from the executor thread, but does not lock task_mutex to avoid lock-step execution with the main thread.
-        // last_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
+        // latest_horizon_reached does not need synchronization (see definition), all other accesses are implicitly synchronized.
 
         assert(get_task(epoch_tid)->get_type() == task_type::EPOCH);
         assert(!latest_horizon_reached || *latest_horizon_reached < epoch_tid);
         assert(latest_epoch_reached.get() < epoch_tid);
 
         latest_epoch_reached.set(epoch_tid); // The next call to submit_command_group() will prune all tasks before the last epoch reached
-        latest_horizon_reached = std::nullopt; // The last horizon reached is now behind the epoch and will therefore never become an epoch itself
+        latest_horizon_reached = std::nullopt; // Any non-applied horizon is now behind the epoch and will therefore never become an epoch itself
     }
 
     void task_manager::await_epoch(task_id epoch) { latest_epoch_reached.await(epoch); }
@@ -279,7 +279,7 @@
         return tid;
     }
 
-    void task_manager::prune_tasks_before_last_epoch_reached() {
+    void task_manager::prune_tasks_before_latest_epoch_reached() {
         const auto prune_before = latest_epoch_reached.get();
         if(prune_before > last_pruned_before) {
             for(auto iter = task_map.begin(); iter != task_map.end();) {
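Aside: `notify_epoch_reached`, `await_epoch` and the pruning guard above all go through the `epoch_monitor` member via `get`/`set`/`await`; its implementation is not part of this diff. The following is a plausible minimal sketch of such a monitor, assuming a mutex-plus-condition-variable design — illustrative only, not celerity's actual class.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>

using task_id = std::size_t; // assumed alias for this standalone sketch

// A monotonically advancing value that one thread sets and other threads can read or block on.
class epoch_monitor {
  public:
    explicit epoch_monitor(const task_id initial) : value(initial) {}

    task_id get() const {
        std::lock_guard lock(mutex);
        return value;
    }

    void set(const task_id new_value) {
        {
            std::lock_guard lock(mutex);
            value = new_value;
        }
        cv.notify_all();
    }

    // Blocks until the stored value has reached (or passed) the awaited target.
    void await(const task_id target) const {
        std::unique_lock lock(mutex);
        cv.wait(lock, [&] { return value >= target; });
    }

  private:
    mutable std::mutex mutex;
    mutable std::condition_variable cv;
    task_id value;
};
```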
diff --git a/test/graph_compaction_tests.cc b/test/graph_compaction_tests.cc
index 8820e3471..eaf25ce9d 100644
--- a/test/graph_compaction_tests.cc
+++ b/test/graph_compaction_tests.cc
@@ -137,7 +137,7 @@
         test_utils::cdag_test_context ctx(NUM_NODES);
 
         // For this test, we need to generate 2 horizons but still have the first one be relevant
-        // after the second is generated -> use 2 buffers A and B, with a longer task chan on A, and write to B later
+        // after the second is generated -> use 2 buffers A and B, with a longer task chain on A, and write to B later
         // step size is set to ensure expected horizons
         ctx.get_task_manager().set_horizon_step(2);
 
@@ -276,7 +276,7 @@
         const auto cmds = inspector.get_commands(tid, std::nullopt, std::nullopt);
         CHECK(cmds.size() == 2);
         std::transform(cmds.begin(), cmds.end(), initial_last_writer_ids.begin(), [&](auto cid) {
-            // (Implementation detail: We can't use the inspector here b/c EPOCH commands are not flushed)
+            // (Implementation detail: We can't use the inspector here b/c the initial epoch commands are not flushed)
             const auto deps = ctx.get_command_graph().get(cid)->get_dependencies();
             REQUIRE(std::distance(deps.begin(), deps.end()) == 1);
             return deps.begin()->node->get_cid();
@@ -293,9 +293,9 @@
             const auto tid = test_utils::build_and_flush(ctx, NUM_NODES,
                 test_utils::add_compute_task(
                     ctx.get_task_manager(), [&](handler& cgh) { buf.get_access(cgh, one_to_one{}); }, buf_range));
-            const auto current_horizion = task_manager_testspy::get_current_horizion(ctx.get_task_manager());
-            if(current_horizion && *current_horizion > last_horizon_reached) {
-                last_horizon_reached = *current_horizion;
+            const auto current_horizon = task_manager_testspy::get_current_horizon(ctx.get_task_manager());
+            if(current_horizon && *current_horizon > last_horizon_reached) {
+                last_horizon_reached = *current_horizon;
                 ctx.get_task_manager().notify_horizon_reached(last_horizon_reached);
             }
         }
@@ -323,9 +323,9 @@
         CHECK(isa(ctx.get_command_graph().get(new_last_writer_ids[0])));
         CHECK(isa(ctx.get_command_graph().get(new_last_writer_ids[1])));
 
-        const auto current_horizions = graph_generator_testspy::get_current_horizons(ctx.get_graph_generator());
+        const auto current_horizons = graph_generator_testspy::get_current_horizons(ctx.get_graph_generator());
         INFO("previous horizons are being used");
-        CHECK(std::none_of(current_horizions.cbegin(), current_horizions.cend(),
+        CHECK(std::none_of(current_horizons.cbegin(), current_horizons.cend(),
             [&](const command_id cid) { return cid == new_last_writer_ids[0] || cid == new_last_writer_ids[1]; }));
 
         test_utils::maybe_print_graphs(ctx);
@@ -409,10 +409,11 @@ namespace detail {
         auto& cdag = ctx.get_command_graph();
         const auto first_commands = inspector.get_commands(first_task, std::nullopt, std::nullopt);
         const auto second_commands = inspector.get_commands(second_task, std::nullopt, std::nullopt);
-        for(const auto second_cid : second_commands) {
-            for(const auto first_cid : first_commands) {
-                CHECK(!inspector.has_dependency(second_cid, first_cid));
-            }
+        CHECKED_IF(std::tuple{first_commands.size(), second_commands.size()} == std::tuple{1, 1}) {
+            const auto first_cid = *first_commands.begin();
+            const auto second_cid = *second_commands.begin();
+            CHECK(!inspector.has_dependency(second_cid, first_cid));
+
             const auto second_deps = cdag.get(second_cid)->get_dependencies();
             CHECK(std::distance(second_deps.begin(), second_deps.end()) == 1);
             for(const auto& dep : second_deps) {
@@ -424,7 +425,7 @@
         test_utils::maybe_print_graphs(ctx);
     }
 
-    TEST_CASE("finishing an epoch will prune all nodes of the preceding graph", "[task_manager][graph_generator][task-graph][command-graph][epoch]") {
+    TEST_CASE("reaching an epoch will prune all nodes of the preceding task graph", "[task_manager][task-graph][epoch]") {
         using namespace cl::sycl::access;
 
         constexpr int num_nodes = 2;
diff --git a/test/graph_generation_tests.cc b/test/graph_generation_tests.cc
index 37abe96bc..72f5e5ad1 100644
--- a/test/graph_generation_tests.cc
+++ b/test/graph_generation_tests.cc
@@ -743,7 +743,7 @@
             test_utils::add_compute_task(
                 tm,
                 [&](handler& cgh) {
-                    buf_2.get_access(cgh, all{});
+                    buf_2.get_access(cgh, one_to_one{});
                     test_utils::add_reduction(cgh, ctx.get_reduction_manager(), buf_3, false);
                 },
                 range<1>{num_nodes}));
diff --git a/test/task_graph_tests.cc b/test/task_graph_tests.cc
index b61b85225..0a7c1e96a 100644
--- a/test/task_graph_tests.cc
+++ b/test/task_graph_tests.cc
@@ -160,14 +160,14 @@
             host_init_buf.get_access(cgh, fixed<1>{{0, 128}});
             artificial_dependency_buf.get_access(cgh, all{});
         });
-        CHECK(has_dependency(tm, tid_a, 0)); // This task has a dependency on the initial epoch task (tid 0)
+        CHECK(has_dependency(tm, tid_a, task_manager::initial_epoch_task));
 
         const auto tid_b = test_utils::add_compute_task(tm, [&](handler& cgh) {
             non_host_init_buf.get_access(cgh, fixed<1>{{0, 128}});
             // introduce an arbitrary true-dependency to avoid the fallback epoch dependency that is generated for tasks without other true-dependencies
             artificial_dependency_buf.get_access(cgh, all{});
         });
-        CHECK_FALSE(has_dependency(tm, tid_b, 0));
+        CHECK_FALSE(has_dependency(tm, tid_b, task_manager::initial_epoch_task));
 
         const auto tid_c = test_utils::add_compute_task(tm, [&](handler& cgh) {
             host_init_buf.get_access(cgh, fixed<1>{{0, 128}});
@@ -329,17 +329,17 @@
         test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { buf_a.get_access(cgh, fixed<1>({0, 128})); });
 
-        auto current_horizion = task_manager_testspy::get_current_horizion(tm);
-        CHECK(!current_horizion.has_value());
+        auto current_horizon = task_manager_testspy::get_current_horizon(tm);
+        CHECK_FALSE(current_horizon.has_value());
 
         const auto tid_c = test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { buf_a.get_access(cgh, fixed<1>({0, 128})); });
 
-        current_horizion = task_manager_testspy::get_current_horizion(tm);
-        REQUIRE(current_horizion.has_value());
-        CHECK(*current_horizion == tid_c + 1);
+        current_horizon = task_manager_testspy::get_current_horizon(tm);
+        REQUIRE(current_horizon.has_value());
+        CHECK(*current_horizon == tid_c + 1);
         CHECK(task_manager_testspy::get_num_horizons(tm) == 1);
 
-        auto horizon_dependencies = tm.get_task(*current_horizion)->get_dependencies();
+        auto horizon_dependencies = tm.get_task(*current_horizon)->get_dependencies();
         CHECK(std::distance(horizon_dependencies.begin(), horizon_dependencies.end()) == 1);
         CHECK(horizon_dependencies.begin()->node->get_id() == tid_c);
@@ -347,7 +347,7 @@
         std::set expected_dependency_ids;
 
         // current horizon is always part of the active task front
-        expected_dependency_ids.insert(*current_horizion);
+        expected_dependency_ids.insert(*current_horizon);
         expected_dependency_ids.insert(test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) {}));
         expected_dependency_ids.insert(test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) {}));
         expected_dependency_ids.insert(test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) {}));
@@ -359,12 +359,12 @@
         });
         expected_dependency_ids.insert(tid_d);
 
-        current_horizion = task_manager_testspy::get_current_horizion(tm);
-        REQUIRE(current_horizion.has_value());
-        CHECK(*current_horizion == tid_d + 1);
+        current_horizon = task_manager_testspy::get_current_horizon(tm);
+        REQUIRE(current_horizon.has_value());
+        CHECK(*current_horizon == tid_d + 1);
         CHECK(task_manager_testspy::get_num_horizons(tm) == 2);
 
-        horizon_dependencies = tm.get_task(*current_horizion)->get_dependencies();
+        horizon_dependencies = tm.get_task(*current_horizon)->get_dependencies();
         CHECK(std::distance(horizon_dependencies.begin(), horizon_dependencies.end()) == 5);
 
         std::set actual_dependecy_ids;
@@ -398,7 +398,7 @@
             buf_a.get_access(cgh, fixed<1>({32, 64}));
         });
 
-        auto horizon = task_manager_testspy::get_current_horizion(tm);
+        auto horizon = task_manager_testspy::get_current_horizon(tm);
         CHECK(task_manager_testspy::get_num_horizons(tm) == 1);
         CHECK(horizon.has_value());
@@ -467,9 +467,9 @@
         // and another one that triggers the actual deferred deletion.
         for(int i = 0; i < 8; ++i) {
             const auto tid = test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, all{}); });
-            const auto current_horizion = task_manager_testspy::get_current_horizion(tm);
-            if(current_horizion && *current_horizion > last_executed_horizon) {
-                last_executed_horizon = *current_horizion;
+            const auto current_horizon = task_manager_testspy::get_current_horizon(tm);
+            if(current_horizon && *current_horizon > last_executed_horizon) {
+                last_executed_horizon = *current_horizon;
                 tm.notify_horizon_reached(last_executed_horizon);
             }
         }
@@ -485,10 +485,10 @@
         const auto* new_last_writer = deps.begin()->node;
         CHECK(new_last_writer->get_type() == task_type::HORIZON);
 
-        const auto current_horizion = task_manager_testspy::get_current_horizion(tm);
-        REQUIRE(current_horizion);
+        const auto current_horizon = task_manager_testspy::get_current_horizon(tm);
+        REQUIRE(current_horizon);
         INFO("previous horizon is being used");
-        CHECK(new_last_writer->get_id() < *current_horizion);
+        CHECK(new_last_writer->get_id() < *current_horizon);
 
         test_utils::maybe_print_graph(tm);
     }
@@ -643,12 +643,12 @@
         CHECK(has_dependency(tm, tid_epoch, tid_a));
         CHECK(has_dependency(tm, tid_epoch, tid_b));
         CHECK(has_dependency(tm, tid_c, tid_epoch));
-        CHECK(!has_any_dependency(tm, tid_c, tid_a));
-        CHECK(has_dependency(tm, tid_d, tid_epoch));
-        CHECK(!has_any_dependency(tm, tid_d, tid_b));
+        CHECK_FALSE(has_any_dependency(tm, tid_c, tid_a));
+        CHECK(has_dependency(tm, tid_d, tid_epoch)); // needs a TRUE_DEP on barrier since it only has ANTI_DEPs otherwise
+        CHECK_FALSE(has_any_dependency(tm, tid_d, tid_b));
         CHECK(has_dependency(tm, tid_e, tid_epoch));
         CHECK(has_dependency(tm, tid_f, tid_d));
-        CHECK(!has_any_dependency(tm, tid_f, tid_epoch));
+        CHECK_FALSE(has_any_dependency(tm, tid_f, tid_epoch));
         CHECK(has_dependency(tm, tid_g, tid_f, dependency_kind::ANTI_DEP));
         CHECK(has_dependency(tm, tid_g, tid_epoch)); // needs a TRUE_DEP on barrier since it only has ANTI_DEPs otherwise
 
diff --git a/test/test_utils.h b/test/test_utils.h
index d32e12f34..a46f2e4a0 100644
--- a/test/test_utils.h
+++ b/test/test_utils.h
@@ -41,7 +41,7 @@ namespace celerity {
 namespace detail {
 
     struct task_manager_testspy {
-        static std::optional get_current_horizion(task_manager& tm) { return tm.current_horizon; }
+        static std::optional get_current_horizon(task_manager& tm) { return tm.current_horizon; }
 
         static int get_num_horizons(task_manager& tm) {
             int horizon_counter = 0;
@@ -252,7 +252,7 @@
         detail::graph_serializer& get_graph_serializer() { return *gsrlzr; }
 
         detail::task_id build_task_horizons() {
-            const auto most_recently_generated_task_horizon = detail::task_manager_testspy::get_current_horizion(get_task_manager());
+            const auto most_recently_generated_task_horizon = detail::task_manager_testspy::get_current_horizon(get_task_manager());
             if(most_recently_generated_task_horizon != most_recently_built_task_horizon) {
                 most_recently_built_task_horizon = most_recently_generated_task_horizon;
                 if(most_recently_built_task_horizon) {
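Aside: `task_manager_testspy` above follows the usual test-spy pattern — a small struct, presumably befriended by `task_manager`, through which tests read private state such as `current_horizon` without widening the public API. A minimal standalone sketch of the pattern (`widget_manager` and its members are made-up names for illustration, not celerity code):

```cpp
#include <optional>

class widget_manager {
  public:
    void step() { current_step = current_step ? *current_step + 1 : 0; }

  private:
    // Tests may read (but not mutate) this through the spy below.
    std::optional<int> current_step;

    friend struct widget_manager_testspy;
};

// Lives next to the tests; keeps white-box access in one well-known place.
struct widget_manager_testspy {
    static std::optional<int> get_current_step(const widget_manager& wm) { return wm.current_step; }
};
```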