diff --git a/CHANGELOG.md b/CHANGELOG.md
index 88df0505a..246b71ae1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -26,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
 - In edge cases, command graph generation would fail to generate await-push commands when re-distributing reduction results (#223)
 - Command graph generation was missing an anti-dependency between push-commands of partial reduction results and the final reduction command (#223)
+- In some rare situations, multiple smaller push commands would be generated instead of a single larger one (#229)
 
 ## [0.4.1] - 2023-09-08

diff --git a/src/distributed_graph_generator.cc b/src/distributed_graph_generator.cc
index 626023db7..d40dcd093 100644
--- a/src/distributed_graph_generator.cc
+++ b/src/distributed_graph_generator.cc
@@ -309,27 +309,33 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
 				buffer_state.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
 			}
 		} else if(!is_pending_reduction) {
+			// For now we generate a separate push command for each last writer command, possibly even multiple for partially already-replicated data.
+			// TODO: Can and/or should we consolidate?
 			const auto local_sources = buffer_state.local_last_writer.get_region_values(req);
 			for(const auto& [local_box, wcs] : local_sources) {
 				if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }
 
-				// Check if we've already pushed this box
-				const auto replicated_boxes = buffer_state.replicated_regions.get_region_values(local_box);
-				for(const auto& [replicated_box, nodes] : replicated_boxes) {
+				// Make sure we don't push anything we've already pushed to this node before
+				box_vector<3> non_replicated_boxes;
+				for(const auto& [replicated_box, nodes] : buffer_state.replicated_regions.get_region_values(local_box)) {
 					if(nodes.test(nid)) continue;
+					non_replicated_boxes.push_back(replicated_box);
+				}
 
-					// Generate separate push command for each last writer command for now,
-					// possibly even multiple for partially already-replicated data.
-					// TODO: Can and/or should we consolidate?
-					auto* const push_cmd = create_command<push_command>(bid, 0, nid, trid, replicated_box.get_subrange());
+				// Merge all connected boxes to determine the final set of pushes
+				const auto push_boxes = region<3>(std::move(non_replicated_boxes));
+				for(auto& push_box : push_boxes.get_boxes()) {
+					auto* const push_cmd = create_command<push_command>(bid, 0, nid, trid, push_box.get_subrange());
 					assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
 					m_cdag.add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
 					generated_pushes.push_back(push_cmd);
 
 					// Store the read access for determining anti-dependencies later on
-					m_command_buffer_reads[push_cmd->get_cid()][bid] = replicated_box;
+					m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
+				}
 
-					// Remember that we've replicated this region
+				// Remember that we've replicated this region
+				for(const auto& [replicated_box, nodes] : buffer_state.replicated_regions.get_region_values(push_boxes)) {
 					buffer_state.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
 				}
 			}
diff --git a/test/graph_gen_transfer_tests.cc b/test/graph_gen_transfer_tests.cc
index d769e6e26..6018459de 100644
--- a/test/graph_gen_transfer_tests.cc
+++ b/test/graph_gen_transfer_tests.cc
@@ -134,6 +134,35 @@ TEST_CASE("distributed_graph_generator consolidates push commands for adjacent s
 	CHECK(dctx.query(tid_b).have_successors(dctx.query(command_type::push)));
 }
 
+// Regression test: While we generate separate pushes for each last writer (see above), unless a last writer box gets fragmented
+// further by subsequent writes, we should only ever generate a single push command. This was not the case, because we additionally
+// have to check whether the data in question has already been (partially) replicated to the target node. Without a subsequent
+// merging step, the number of pushes was effectively dictated by the replication map, NOT by the last writer.
+TEST_CASE(
+    "distributed_graph_generator does not unnecessarily divide push commands due to partial replication", "[distributed_graph_generator][command-graph]") {
+	dist_cdag_test_context dctx(3);
+
+	const range<1> test_range = {96};
+	auto buf = dctx.create_buffer(test_range);
+
+	dctx.device_compute(test_range).discard_write(buf, acc::one_to_one{}).submit();
+	// Assuming a standard 1D split
+	CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(0)[0]).get_execution_range()) == subrange<1>{0, 32});
+	CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(1)[0]).get_execution_range()) == subrange<1>{32, 32});
+	CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(2)[0]).get_execution_range()) == subrange<1>{64, 32});
+
+	// Require partial data from nodes 1 and 2
+	dctx.master_node_host_task().read(buf, acc::fixed{subrange<1>{48, 32}}).submit();
+	const auto pushes1 = dctx.query(command_type::push);
+	CHECK(pushes1.count() == 2);
+
+	// Now exchange data between nodes 1 and 2. Node 0 doesn't read anything.
+	auto rm = [](const chunk<1>& chnk) {
+		if(chnk.offset[0] + chnk.range[0] >= 64) return subrange<1>{32, 64};
+		return subrange<1>{0, 0};
+	};
+	dctx.device_compute(test_range).read(buf, rm).submit();
+	const auto pushes2 = dctx.query(command_type::push) - pushes1;
+	CHECK(pushes2.count() == 2);
+}
+
 TEST_CASE("distributed_graph_generator generates dependencies for push commands", "[distributed_graph_generator][command-graph]") {
 	dist_cdag_test_context dctx(2);
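
Aside (not part of the patch): below is a minimal, self-contained sketch of the merging idea, using hypothetical simplified types (a made-up box_1d struct and merge_connected() helper rather than celerity's box_vector<3>/region<3> API). It illustrates why coalescing the non-replicated boxes before emitting pushes turns several fragment-sized pushes into a single contiguous push, which is exactly what the regression test above checks.

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical 1D stand-in for a box: the half-open interval [offset, offset + size).
struct box_1d {
	std::size_t offset;
	std::size_t size;
};

// Coalesce adjacent or overlapping boxes into the smallest set of contiguous boxes,
// analogous to constructing a region from the collected non-replicated boxes in the patch.
std::vector<box_1d> merge_connected(std::vector<box_1d> boxes) {
	std::sort(boxes.begin(), boxes.end(), [](const box_1d& a, const box_1d& b) { return a.offset < b.offset; });
	std::vector<box_1d> merged;
	for(const auto& b : boxes) {
		if(!merged.empty() && merged.back().offset + merged.back().size >= b.offset) {
			// b touches or overlaps the previous box: extend it instead of emitting a new one
			const auto end = std::max(merged.back().offset + merged.back().size, b.offset + b.size);
			merged.back().size = end - merged.back().offset;
		} else {
			merged.push_back(b);
		}
	}
	return merged;
}

int main() {
	// The replication map has split the locally-owned range [32, 96) into three fragments,
	// e.g. because parts of it were already pushed to other nodes earlier.
	const std::vector<box_1d> non_replicated = {{32, 16}, {48, 32}, {80, 16}};

	// Without merging, one push would be generated per fragment (three pushes);
	// after merging, a single contiguous push covering [32, 96) suffices.
	for(const auto& b : merge_connected(non_replicated)) {
		std::cout << "push [" << b.offset << ", " << b.offset + b.size << ")\n";
	}
	return 0;
}

Run as-is this prints a single "push [32, 96)" line; emitting one push per input fragment instead would have produced three, mirroring how the push count was previously dictated by the replication map rather than the last writer.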