Skip to content

Commit

Permalink
Detect uninitialized reads and overlapping writes
Browse files Browse the repository at this point in the history
task_manager and distributed_graph_generator by default throw when
detecting an uninitialized read or overlapping writes in order to catch
invalid test code or internal bugs. In `runtime`, these conditions are
reported through warning / error logs instead.
  • Loading branch information
fknorr committed Nov 2, 2023
1 parent 7dcffd3 commit 1bc771c
Show file tree
Hide file tree
Showing 20 changed files with 383 additions and 115 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Introduce new experimental `for_each_item` utility to iterate over a celerity range (#199)
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)

## Changed

Expand Down
9 changes: 7 additions & 2 deletions include/distributed_graph_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <bitset>
#include <unordered_map>
#include <variant>

#include "command_graph.h"
#include "ranges.h"
Expand Down Expand Up @@ -65,6 +64,7 @@ class distributed_graph_generator {
buffer_state(region_map<write_command_state> lw, region_map<std::bitset<max_num_nodes>> rr)
: local_last_writer(std::move(lw)), replicated_regions(std::move(rr)), pending_reduction(std::nullopt) {}

region<3> initialized_region; // for detecting uninitialized reads
region_map<write_command_state> local_last_writer;
region_map<node_bitset> replicated_regions;

Expand All @@ -78,7 +78,10 @@ class distributed_graph_generator {
distributed_graph_generator(
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder);

void add_buffer(const buffer_id bid, const int dims, const range<3>& range);
void set_uninitialized_read_policy(const error_policy policy) { m_uninitialized_read_policy = policy; }
void set_overlapping_write_policy(const error_policy policy) { m_overlapping_write_policy = policy; }

void add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized);

std::unordered_set<abstract_command*> build_task(const task& tsk);

Expand Down Expand Up @@ -124,6 +127,8 @@ class distributed_graph_generator {
node_id m_local_nid;
command_graph& m_cdag;
const task_manager& m_task_mngr;
error_policy m_uninitialized_read_policy = error_policy::throw_exception;
error_policy m_overlapping_write_policy = error_policy::throw_exception;
std::unordered_map<buffer_id, buffer_state> m_buffer_states;
command_id m_epoch_for_new_commands = 0;
command_id m_epoch_last_pruned_before = 0;
Expand Down
5 changes: 4 additions & 1 deletion include/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ namespace detail {
*/
void notify_task_created(const task* const tsk) { notify(event_task_available{tsk}); }

void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range) { notify(event_buffer_registered{bid, dims, range}); }
void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
notify(event_buffer_registered{bid, dims, range, host_initialized});
}

protected:
/**
Expand All @@ -53,6 +55,7 @@ namespace detail {
buffer_id bid;
int dims;
celerity::range<3> range;
bool host_initialized;
};
using event = std::variant<event_shutdown, event_task_available, event_buffer_registered>;

Expand Down
2 changes: 2 additions & 0 deletions include/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,5 +271,7 @@ namespace detail {
}
};

std::unordered_map<buffer_id, region<3>> detect_overlapping_writes(const task& tsk, const std::vector<chunk<3>>& chunks);

} // namespace detail
} // namespace celerity
4 changes: 4 additions & 0 deletions include/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ namespace detail {

virtual ~task_manager() = default;

void set_uninitialized_read_policy(const error_policy policy) { m_uninitialized_read_policy = policy; }

template <typename CGF, typename... Hints>
task_id submit_command_group(CGF cgf, Hints... hints) {
auto reservation = m_task_buffer.reserve_task_entry(await_free_task_slot_callback());
Expand Down Expand Up @@ -182,6 +184,8 @@ namespace detail {
const size_t m_num_collective_nodes;
host_queue* m_queue;

error_policy m_uninitialized_read_policy = error_policy::throw_exception;

task_ring_buffer m_task_buffer;

// The active epoch is used as the last writer for host-initialized buffers.
Expand Down
8 changes: 8 additions & 0 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,12 @@ struct reduction_info {
};

constexpr node_id master_node_id = 0;

enum class error_policy {
ignore,
log_warning,
log_error,
throw_exception,
};

} // namespace celerity::detail
21 changes: 21 additions & 0 deletions include/utils.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#pragma once

#include "types.h"

#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <variant>

#include "spdlog/fmt/fmt.h"


namespace celerity::detail::utils {

template <typename T, typename P>
Expand Down Expand Up @@ -90,4 +96,19 @@ std::string simplify_task_name(const std::string& demangled_type_name);
// escapes "<", ">", and "&" with their corresponding HTML escape sequences
std::string escape_for_dot_label(std::string str);

template <typename... FmtParams>
[[noreturn]] void throw_error(FmtParams&&... fmt_args) {
throw std::runtime_error(fmt::format(std::forward<FmtParams>(fmt_args)...));
}

template <typename... FmtParams>
void report_error(const error_policy policy, FmtParams&&... fmt_args) {
switch(policy) {
case error_policy::ignore: break;
case error_policy::log_warning: CELERITY_WARN(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::log_error: CELERITY_ERROR(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::throw_exception: throw_error(std::forward<FmtParams>(fmt_args)...); break;
}
}

} // namespace celerity::detail::utils
35 changes: 34 additions & 1 deletion src/distributed_graph_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ distributed_graph_generator::distributed_graph_generator(
m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range) {
void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
m_buffer_states.emplace(
std::piecewise_construct, std::tuple{bid}, std::tuple{region_map<write_command_state>{range, dims}, region_map<node_bitset>{range, dims}});
if(host_initialized) { m_buffer_states.at(bid).initialized_region = box(subrange({}, range)); }
// Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
// This is required when tasks access host-initialized or uninitialized buffers.
m_buffer_states.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
Expand Down Expand Up @@ -162,6 +163,19 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
assert(chunks.size() <= num_chunks); // We may have created less than requested
assert(!chunks.empty());

if(m_overlapping_write_policy != error_policy::ignore) {
if(const auto overlapping_writes = detect_overlapping_writes(tsk, chunks); !overlapping_writes.empty()) {
auto error = fmt::format("Task T{}", tsk.get_id());
if(!tsk.get_debug_name().empty()) { fmt::format_to(std::back_inserter(error), " \"{}\"", tsk.get_debug_name()); }
error += " has overlapping writes between multiple nodes in";
for(const auto& [bid, overlap] : overlapping_writes) {
fmt::format_to(std::back_inserter(error), " B{} {}", bid, overlap);
}
error += ". Choose a non-overlapping range mapper for the write access or constrain the split to make the access non-overlapping.";
utils::report_error(m_overlapping_write_policy, "{}", error);
}
}

// Assign each chunk to a node
// We assign chunks next to each other to the same worker (if there is more chunks than workers), as this is likely to produce less
// transfers between tasks than a round-robin assignment (for typical stencil codes).
Expand Down Expand Up @@ -283,6 +297,7 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
// TODO the per-node reduction result is discarded - warn user about dead store
}

region<3> uninitialized_reads;
for(const auto mode : required_modes) {
const auto& req = reqs_by_mode.at(mode);
if(detail::access::mode_traits::is_consumer(mode)) {
Expand Down Expand Up @@ -340,6 +355,10 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
}
}

if(is_local_chunk && m_uninitialized_read_policy != error_policy::ignore) {
uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer_state.initialized_region));
}
}

if(is_local_chunk && detail::access::mode_traits::is_producer(mode)) {
Expand All @@ -353,7 +372,14 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
}

if(!uninitialized_reads.empty()) {
utils::report_error(m_uninitialized_read_policy, "Command C{} on N{} reads B{} {}, which has not been written by any node.", cmd->get_cid(),
m_local_nid, bid, detail::region(std::move(uninitialized_reads)));
}

if(generate_reduction) {
post_reduction_buffer_states.at(bid).initialized_region = scalar_box;

const auto& reduction = *buffer_state.pending_reduction;

const auto local_last_writer = buffer_state.local_last_writer.get_region_values(scalar_box);
Expand Down Expand Up @@ -478,6 +504,11 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)

// Determine which local data is fresh/stale based on task-level writes.
auto requirements = get_buffer_requirements_for_mapped_access(tsk, subrange<3>(tsk.get_global_offset(), tsk.get_global_size()), tsk.get_global_size());
// Add requirements for reductions
for(const auto& reduction : tsk.get_reductions()) {
// the actual mode is irrelevant as long as it's a producer - TODO have a better query API for task buffer requirements
requirements[reduction.bid][access_mode::write] = scalar_box;
}
for(auto& [bid, reqs_by_mode] : requirements) {
box_vector<3> global_write_boxes;
for(const auto mode : access::producer_modes) {
Expand All @@ -491,6 +522,8 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
const auto remote_writes = region_difference(global_writes, local_writes);
auto& buffer_state = m_buffer_states.at(bid);

buffer_state.initialized_region = region_union(buffer_state.initialized_region, global_writes);

// TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
auto boxes_and_cids = buffer_state.local_last_writer.get_region_values(remote_writes);
for(auto& [box, wcs] : boxes_and_cids) {
Expand Down
13 changes: 12 additions & 1 deletion src/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,25 @@ namespace detail {

m_reduction_mngr = std::make_unique<reduction_manager>();
m_host_object_mngr = std::make_unique<host_object_manager>();

if(m_cfg->is_recording()) m_task_recorder = std::make_unique<task_recorder>(m_buffer_mngr.get());
m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_h_queue.get(), m_task_recorder.get());
if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());
// Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
// first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
// https://github.com/celerity/meta/issues/74).
m_task_mngr->set_uninitialized_read_policy(error_policy::log_warning);

m_exec = std::make_unique<executor>(m_num_nodes, m_local_nid, *m_h_queue, *m_d_queue, *m_task_mngr, *m_buffer_mngr, *m_reduction_mngr);

m_cdag = std::make_unique<command_graph>();
if(m_cfg->is_recording()) m_command_recorder = std::make_unique<command_recorder>(m_task_mngr.get(), m_buffer_mngr.get());
auto dggen = std::make_unique<distributed_graph_generator>(m_num_nodes, m_local_nid, *m_cdag, *m_task_mngr, m_command_recorder.get());
// Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
dggen->set_uninitialized_read_policy(error_policy::ignore);
dggen->set_overlapping_write_policy(error_policy::log_error);

m_schdlr = std::make_unique<scheduler>(is_dry_run(), std::move(dggen), *m_exec);
m_task_mngr->register_task_callback([this](const task* tsk) { m_schdlr->notify_task_created(tsk); });

Expand Down Expand Up @@ -274,7 +285,7 @@ namespace detail {
void runtime::handle_buffer_registered(buffer_id bid) {
const auto& info = m_buffer_mngr->get_buffer_info(bid);
m_task_mngr->add_buffer(bid, info.dimensions, info.range, info.is_host_initialized);
m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range);
m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range, info.is_host_initialized);
}

void runtime::handle_buffer_unregistered(buffer_id bid) { maybe_destroy_runtime(); }
Expand Down
2 changes: 1 addition & 1 deletion src/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace detail {
serializer.flush(cmds);
},
[&](const event_buffer_registered& e) { //
m_dggen->add_buffer(e.bid, e.dims, e.range);
m_dggen->add_buffer(e.bid, e.dims, e.range, e.host_initialized);
},
[&](const event_shutdown&) {
assert(in_flight_events.empty());
Expand Down
42 changes: 41 additions & 1 deletion src/task.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#include "task.h"
#include "access_modes.h"

#include <algorithm>

namespace celerity {
namespace detail {
Expand Down Expand Up @@ -62,5 +62,45 @@ namespace detail {
// TODO for multiple side effects on the same hoid, find the weakest order satisfying all of them
emplace(hoid, order);
}

std::unordered_map<buffer_id, region<3>> detect_overlapping_writes(const task& tsk, const std::vector<chunk<3>>& chunks) {
const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1});

auto& bam = tsk.get_buffer_access_map();

std::unordered_map<buffer_id, region<3>> buffer_write_accumulators;
std::unordered_map<buffer_id, region<3>> overlapping_writes;
for(const auto bid : bam.get_accessed_buffers()) {
for(const auto& ck : chunks) {
region<3> writes;
for(const auto mode : bam.get_access_modes(bid)) {
if(access::mode_traits::is_producer(mode)) {
const auto req = bam.get_mode_requirements(bid, mode, tsk.get_dimensions(), subrange(ck.offset, ck.range), tsk.get_global_size());
writes = region_union(writes, req);
}
}
if(!writes.empty()) {
auto& write_accumulator = buffer_write_accumulators[bid]; // allow default-insert
if(const auto overlap = region_intersection(write_accumulator, writes); !overlap.empty()) {
auto& full_overlap = overlapping_writes[bid]; // allow default-insert
full_overlap = region_union(full_overlap, overlap);
}
write_accumulator = region_union(write_accumulator, writes);
}
}
}

for(const auto& rinfo : tsk.get_reductions()) {
auto& write_accumulator = buffer_write_accumulators[rinfo.bid]; // allow default-insert
if(const auto overlap = region_intersection(write_accumulator, scalar_reduction_box); !overlap.empty()) {
auto& full_overlap = overlapping_writes[rinfo.bid]; // allow default-insert
full_overlap = region_union(full_overlap, overlap);
}
write_accumulator = region_union(write_accumulator, scalar_reduction_box);
}

return overlapping_writes;
}

} // namespace detail
} // namespace celerity
20 changes: 14 additions & 6 deletions src/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,20 @@ namespace detail {
if(reduction.has_value()) { read_requirements = region_union(read_requirements, scalar_box); }
const auto last_writers = m_buffers_last_writers.at(bid).get_region_values(read_requirements);

for(auto& p : last_writers) {
// This indicates that the buffer is being used for the first time by this task, or all previous tasks also only read from it.
// A valid use case (i.e., not reading garbage) for this is when the buffer has been initialized using a host pointer.
if(p.second == std::nullopt) continue;
const task_id last_writer = *p.second;
add_dependency(tsk, *m_task_buffer.get_task(last_writer), dependency_kind::true_dep, dependency_origin::dataflow);
box_vector<3> uninitialized_reads;
for(const auto& [box, writer] : last_writers) {
// host-initialized buffers are last-written by the current epoch
if(writer.has_value()) {
add_dependency(tsk, *m_task_buffer.get_task(*writer), dependency_kind::true_dep, dependency_origin::dataflow);
} else {
uninitialized_reads.push_back(box);
}
}
if(!uninitialized_reads.empty()) {
utils::report_error(m_uninitialized_read_policy,
"Task T{}{} declares a reading access on uninitialized B{} {}. Make sure to construct the accessor with no_init if possible.",
tsk.get_id(), !tsk.get_debug_name().empty() ? fmt::format(" \"{}\"", utils::simplify_task_name(tsk.get_debug_name())) : "", bid,
region(std::move(uninitialized_reads)));
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/distributed_graph_generator_test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ class dist_cdag_test_context {
const auto buf = test_utils::mock_buffer<Dims>(bid, size);
m_tm.add_buffer(bid, Dims, range_cast<3>(size), mark_as_host_initialized);
for(auto& dggen : m_dggens) {
dggen->add_buffer(bid, Dims, range_cast<3>(size));
dggen->add_buffer(bid, Dims, range_cast<3>(size), mark_as_host_initialized);
}
return buf;
}
Expand Down
Loading

0 comments on commit 1bc771c

Please sign in to comment.