
Commit

celerity#229 Don't let partial replication unnecessarily divide push commands
psalz authored and fknorr committed Dec 5, 2023
1 parent 5cb9198 commit 5409777
Showing 3 changed files with 45 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -26,6 +26,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).

 - In edge cases, command graph generation would fail to generate await-push commands when re-distributing reduction results (#223)
 - Command graph generation was missing an anti-dependency between push-commands of partial reduction results and the final reduction command (#223)
+- Don't create multiple smaller push-commands instead of a single large one in some rare situations (#229)
 
 ## [0.4.1] - 2023-09-08

24 changes: 15 additions & 9 deletions src/distributed_graph_generator.cc
@@ -309,27 +309,33 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
                 buffer_state.local_last_writer.update_region(missing_parts, {ap_cmd->get_cid(), true /* is_replicated */});
             }
         } else if(!is_pending_reduction) {
+            // We generate separate push command for each last writer command for now, possibly even multiple for partially already-replicated data.
+            // TODO: Can and/or should we consolidate?
             const auto local_sources = buffer_state.local_last_writer.get_region_values(req);
             for(const auto& [local_box, wcs] : local_sources) {
                 if(!wcs.is_fresh() || wcs.is_replicated()) { continue; }

-                // Check if we've already pushed this box
-                const auto replicated_boxes = buffer_state.replicated_regions.get_region_values(local_box);
-                for(const auto& [replicated_box, nodes] : replicated_boxes) {
+                // Make sure we don't push anything we've already pushed to this node before
+                box_vector<3> non_replicated_boxes;
+                for(const auto& [replicated_box, nodes] : buffer_state.replicated_regions.get_region_values(local_box)) {
                     if(nodes.test(nid)) continue;
+                    non_replicated_boxes.push_back(replicated_box);
+                }

-                    // Generate separate push command for each last writer command for now,
-                    // possibly even multiple for partially already-replicated data.
-                    // TODO: Can and/or should we consolidate?
-                    auto* const push_cmd = create_command<push_command>(bid, 0, nid, trid, replicated_box.get_subrange());
+                // Merge all connected boxes to determine final set of pushes
+                const auto push_boxes = region<3>(std::move(non_replicated_boxes));
+                for(auto& push_box : push_boxes.get_boxes()) {
+                    auto* const push_cmd = create_command<push_command>(bid, 0, nid, trid, push_box.get_subrange());
                     assert(!utils::isa<await_push_command>(m_cdag.get(wcs)) && "Attempting to push non-owned data?!");
                     m_cdag.add_dependency(push_cmd, m_cdag.get(wcs), dependency_kind::true_dep, dependency_origin::dataflow);
                     generated_pushes.push_back(push_cmd);

                     // Store the read access for determining anti-dependencies later on
-                    m_command_buffer_reads[push_cmd->get_cid()][bid] = replicated_box;
+                    m_command_buffer_reads[push_cmd->get_cid()][bid] = push_box;
+                }

-                    // Remember that we've replicated this region
+                // Remember that we've replicated this region
+                for(const auto& [replicated_box, nodes] : buffer_state.replicated_regions.get_region_values(push_boxes)) {
                     buffer_state.replicated_regions.update_box(replicated_box, node_bitset{nodes}.set(nid));
                 }
             }
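To illustrate what the merging step buys us, here is a minimal, self-contained sketch. The box1d type and merge_connected function are hypothetical 1D stand-ins, not celerity's actual box_vector/region types; merging overlapping or adjacent intervals is analogous to what constructing a region<3> from the collected boxes achieves in the patch.

// Minimal standalone sketch (not celerity code): shows how merging connected
// boxes collapses pushes that the replication map would otherwise split.
#include <algorithm>
#include <cstdio>
#include <vector>

struct box1d {
    int min, max; // half-open interval [min, max)
};

// Merge all overlapping or adjacent boxes into the smallest set of boxes.
std::vector<box1d> merge_connected(std::vector<box1d> boxes) {
    std::sort(boxes.begin(), boxes.end(), [](const box1d& a, const box1d& b) { return a.min < b.min; });
    std::vector<box1d> merged;
    for(const auto& b : boxes) {
        if(!merged.empty() && b.min <= merged.back().max) {
            merged.back().max = std::max(merged.back().max, b.max);
        } else {
            merged.push_back(b);
        }
    }
    return merged;
}

int main() {
    // A last writer produced [32, 64). The replication map splits it into
    // [32, 48) and [48, 64) because [48, 64) was previously pushed to some other
    // node, but neither part has been pushed to the current target node yet.
    const std::vector<box1d> non_replicated = {{32, 48}, {48, 64}};

    // Old behavior: one push per replication-map entry -> 2 pushes.
    std::printf("without merging: %zu push(es)\n", non_replicated.size());

    // New behavior: merge connected boxes first -> 1 push covering [32, 64).
    std::printf("with merging:    %zu push(es)\n", merge_connected(non_replicated).size());
}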
29 changes: 29 additions & 0 deletions test/graph_gen_transfer_tests.cc
@@ -134,6 +134,35 @@ TEST_CASE("distributed_graph_generator consolidates push commands for adjacent s
     CHECK(dctx.query(tid_b).have_successors(dctx.query(command_type::push)));
 }
 
+// Regression test: While we generate separate pushes for each last writer (see above), unless a last writer box gets fragmented
+// further by subsequent writes, we should only ever generate a single push command. This was not the case, because we additionally have
+// to check whether the data in question has already been (partially) replicated to the target node. Without a subsequent merging step,
+// the number of pushes was effectively being dictated by the replication map, NOT the last writer.
+TEST_CASE(
+    "distributed_graph_generator does not unnecessarily divide push commands due to partial replication", "[distributed_graph_generator][command-graph]") {
+    dist_cdag_test_context dctx(3);
+
+    const range<1> test_range = {96};
+    auto buf = dctx.create_buffer(test_range);
+    dctx.device_compute<class UKN(task_a)>(test_range).discard_write(buf, acc::one_to_one{}).submit();
+    // Assuming standard 1D split
+    CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(0)[0]).get_execution_range()) == subrange<1>{0, 32});
+    CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(1)[0]).get_execution_range()) == subrange<1>{32, 32});
+    CHECK(subrange_cast<1>(dynamic_cast<const execution_command&>(*dctx.query().get_raw(2)[0]).get_execution_range()) == subrange<1>{64, 32});
+    // Require partial data from nodes 1 and 2
+    dctx.master_node_host_task().read(buf, acc::fixed{subrange<1>{48, 32}}).submit();
+    const auto pushes1 = dctx.query(command_type::push);
+    CHECK(pushes1.count() == 2);
+    // Now exchange data between nodes 1 and 2. Node 0 doesn't read anything.
+    auto rm = [](const chunk<1>& chnk) {
+        if(chnk.offset[0] + chnk.range[0] >= 64) return subrange<1>{32, 64};
+        return subrange<1>{0, 0};
+    };
+    dctx.device_compute<class UKN(task_c)>(test_range).read(buf, rm).submit();
+    const auto pushes2 = dctx.query(command_type::push) - pushes1;
+    CHECK(pushes2.count() == 2);
+}
+
 TEST_CASE("distributed_graph_generator generates dependencies for push commands", "[distributed_graph_generator][command-graph]") {
     dist_cdag_test_context dctx(2);

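For reference, a hypothetical walk-through of the expected data movement in the new test, with plain integers standing in for 1D subranges under the standard split the test itself asserts. This is not part of the celerity test suite; the push counts for the "without merging" case are an inference from the replication map described above.

#include <cassert>

int main() {
    // task_a (discard_write, one_to_one): node 0 owns [0, 32), node 1 owns [32, 64), node 2 owns [64, 96).

    // Master-node (node 0) read of [48, 80): node 1 pushes [48, 64), node 2 pushes [64, 80).
    const int pushes_after_master_read = 2; // matches CHECK(pushes1.count() == 2)

    // task_c: nodes 1 and 2 each read [32, 96), node 0 reads nothing.
    //  - Node 2 pushes [64, 96) to node 1. The replication map splits it into
    //    [64, 80) (already sent to node 0) and [80, 96), but merging yields one push.
    //  - Node 1 pushes [32, 64) to node 2, likewise split into [32, 48) and [48, 64)
    //    by the replication map and merged back into one push.
    const int pushes_for_task_c = 1 + 1; // matches CHECK(pushes2.count() == 2)
    // Without the merging step, both transfers would have been split, yielding 4 pushes.

    assert(pushes_after_master_read == 2);
    assert(pushes_for_task_c == 2);
}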

0 comments on commit 5409777
