celerity#224 Detect uninitialized reads and overlapping writes
`task_manager` and `distributed_graph_generator` throw by default when they detect
an uninitialized read or overlapping writes, in order to catch invalid test code
and internal bugs. In `runtime`, these conditions are instead reported through
warning and error logs.
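As a minimal sketch of the new constructor surface (hypothetical wiring, assuming Celerity's internal headers are available; the signatures and policy members follow the declarations in this commit, and it mirrors what `runtime.cc` does further down):

```cpp
#include "command_graph.h"
#include "distributed_graph_generator.h"
#include "task_manager.h"

using namespace celerity::detail;

void wire_up(const size_t num_nodes, const node_id local_nid) {
    // A default-constructed policy_set throws on violations - the behavior tests want.
    task_manager::policy_set tm_policy;
    tm_policy.uninitialized_read_error = error_policy::log_warning; // soften, as `runtime` does
    task_manager tm(num_nodes, nullptr /* host_queue */, nullptr /* task_recorder */, tm_policy);

    distributed_graph_generator::policy_set dg_policy;
    dg_policy.uninitialized_read_error = error_policy::ignore; // already reported per task
    dg_policy.overlapping_write_error = error_policy::log_error;

    command_graph cdag;
    distributed_graph_generator dggen(num_nodes, local_nid, cdag, tm, nullptr /* command_recorder */, dg_policy);
}
```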
fknorr committed Nov 18, 2023
1 parent 66be449 commit a17063e
Showing 26 changed files with 796 additions and 466 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)

### Changed

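To illustrate the diagnostics this changelog entry describes, a hedged user-side sketch (standard Celerity public API as I understand it, not taken from this commit; kernel name and sizes are made up) that would trigger both diagnostics when run on more than one node:

```cpp
#include <celerity.h>

int main() {
    celerity::distr_queue q;
    celerity::buffer<int, 1> in(celerity::range<1>(1024)); // no host pointer -> uninitialized
    celerity::buffer<int, 1> out(celerity::range<1>(1024));

    q.submit([=](celerity::handler& cgh) {
        // Reading `in` before any task has written it -> uninitialized-read warning.
        celerity::accessor src(in, cgh, celerity::access::one_to_one(), celerity::read_only);
        // all() maps every chunk to the whole buffer, so on >1 node the written
        // regions overlap between nodes -> overlapping-write diagnostic.
        celerity::accessor dst(out, cgh, celerity::access::all(), celerity::write_only, celerity::no_init);
        cgh.parallel_for<class overlap_demo>(celerity::range<1>(1024), [=](celerity::item<1> it) { dst[it] = src[it]; });
    });
    return 0;
}
```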
12 changes: 6 additions & 6 deletions CMakeLists.txt
@@ -16,19 +16,18 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(ENABLE_ACC_CHECK ON)
set(DEFAULT_ENABLE_DEBUG_CHECKS ON)
else()
set(ENABLE_ACC_CHECK OFF)
set(DEFAULT_ENABLE_DEBUG_CHECKS OFF)
endif()

option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${ENABLE_ACC_CHECK})
option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK)
if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

unset(ENABLE_ACC_CHECK)

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
@@ -288,6 +287,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

# Collect version information from git in src/version.cc. This target is always out of date, but the timestamp
292 changes: 146 additions & 146 deletions ci/perf/gpuc2_bench.csv

Large diffs are not rendered by default.

290 changes: 145 additions & 145 deletions ci/perf/gpuc2_bench.md

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions cmake/celerity-config.cmake.in
@@ -55,6 +55,9 @@ set(CELERITY_FEATURE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SCALAR_REDUCTIONS@")
set(CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS@")
set(CELERITY_FEATURE_LOCAL_ACCESSOR "@CELERITY_FEATURE_LOCAL_ACCESSOR@")
set(CELERITY_FEATURE_UNNAMED_KERNELS "@CELERITY_FEATURE_UNNAMED_KERNELS@")
set(CELERITY_DETAIL_HAS_NAMED_THREADS "@CELERITY_DETAIL_HAS_NAMED_THREADS@")
set(CELERITY_ACCESSOR_BOUNDARY_CHECK "@CELERITY_ACCESSOR_BOUNDARY_CHECK@")
set(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "@CELERITY_ACCESS_PATTERN_DIAGNOSTICS@")

include("${CMAKE_CURRENT_LIST_DIR}/celerity-targets.cmake")

18 changes: 14 additions & 4 deletions include/distributed_graph_generator.h
@@ -2,7 +2,6 @@

#include <bitset>
#include <unordered_map>
#include <variant>

#include "command_graph.h"
#include "ranges.h"
@@ -65,6 +64,7 @@ class distributed_graph_generator {
buffer_state(region_map<write_command_state> lw, region_map<std::bitset<max_num_nodes>> rr)
: local_last_writer(std::move(lw)), replicated_regions(std::move(rr)), pending_reduction(std::nullopt) {}

region<3> initialized_region; // for detecting uninitialized reads (only maintained if m_policy.uninitialized_read_error != error_policy::ignore)
region_map<write_command_state> local_last_writer;
region_map<node_bitset> replicated_regions;

@@ -75,10 +75,15 @@
};

public:
distributed_graph_generator(
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder);
struct policy_set {
error_policy uninitialized_read_error = error_policy::throw_exception;
error_policy overlapping_write_error = error_policy::throw_exception;
};

distributed_graph_generator(const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm,
detail::command_recorder* recorder, const policy_set& policy = default_policy_set());

void add_buffer(const buffer_id bid, const int dims, const range<3>& range);
void add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized);

std::unordered_set<abstract_command*> build_task(const task& tsk);

@@ -120,8 +125,13 @@ class distributed_graph_generator {
using buffer_read_map = std::unordered_map<buffer_id, region<3>>;
using side_effect_map = std::unordered_map<host_object_id, command_id>;

// default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member
// initializers, within its surrounding class (Clang)
constexpr static policy_set default_policy_set() { return {}; }

size_t m_num_nodes;
node_id m_local_nid;
policy_set m_policy;
command_graph& m_cdag;
const task_manager& m_task_mngr;
std::unordered_map<buffer_id, buffer_state> m_buffer_states;
5 changes: 4 additions & 1 deletion include/scheduler.h
@@ -32,7 +32,9 @@ namespace detail {
*/
void notify_task_created(const task* const tsk) { notify(event_task_available{tsk}); }

void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range) { notify(event_buffer_registered{bid, dims, range}); }
void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
notify(event_buffer_registered{bid, dims, range, host_initialized});
}

protected:
/**
@@ -53,6 +55,7 @@
buffer_id bid;
int dims;
celerity::range<3> range;
bool host_initialized;
};
using event = std::variant<event_shutdown, event_task_available, event_buffer_registered>;

2 changes: 2 additions & 0 deletions include/task.h
@@ -271,5 +271,7 @@ namespace detail {
}
};

std::unordered_map<buffer_id, region<3>> detect_overlapping_writes(const task& tsk, const box_vector<3>& chunks);

} // namespace detail
} // namespace celerity
11 changes: 10 additions & 1 deletion include/task_manager.h
@@ -58,9 +58,13 @@ namespace detail {
using buffer_writers_map = std::unordered_map<buffer_id, region_map<std::optional<task_id>>>;

public:
struct policy_set {
error_policy uninitialized_read_error = error_policy::throw_exception;
};

constexpr inline static task_id initial_epoch_task = 0;

task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder);
task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder, const policy_set& policy = default_policy_set());

virtual ~task_manager() = default;

@@ -179,8 +183,13 @@ namespace detail {
size_t get_current_task_count() const { return m_task_buffer.get_current_task_count(); }

private:
// default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member
// initializers, within its surrounding class (Clang)
constexpr static policy_set default_policy_set() { return {}; }

const size_t m_num_collective_nodes;
host_queue* m_queue;
policy_set m_policy;

task_ring_buffer m_task_buffer;

8 changes: 8 additions & 0 deletions include/types.h
@@ -75,4 +75,12 @@ struct reduction_info {
};

constexpr node_id master_node_id = 0;

enum class error_policy {
ignore,
log_warning,
log_error,
throw_exception,
};

} // namespace celerity::detail
21 changes: 21 additions & 0 deletions include/utils.h
@@ -2,10 +2,16 @@

#include <cstdint>
#include <functional>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <variant>

#include "spdlog/fmt/fmt.h"

#include "types.h"


namespace celerity::detail::utils {

template <typename T, typename P>
@@ -90,4 +96,19 @@ std::string simplify_task_name(const std::string& demangled_type_name);
// escapes "<", ">", and "&" with their corresponding HTML escape sequences
std::string escape_for_dot_label(std::string str);

template <typename... FmtParams>
[[noreturn]] void throw_error(FmtParams&&... fmt_args) {
throw std::runtime_error(fmt::format(std::forward<FmtParams>(fmt_args)...));
}

template <typename... FmtParams>
void report_error(const error_policy policy, FmtParams&&... fmt_args) {
switch(policy) {
case error_policy::ignore: break;
case error_policy::log_warning: CELERITY_WARN(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::log_error: CELERITY_ERROR(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::throw_exception: throw_error(std::forward<FmtParams>(fmt_args)...); break;
}
}

} // namespace celerity::detail::utils
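A small usage sketch for the new helper (the surrounding function and its arguments are hypothetical; compare the real call sites in src/distributed_graph_generator.cc below):

```cpp
#include "utils.h"

using namespace celerity::detail;

void diagnose_read(const error_policy policy, const int cid, const int bid) {
    // ignore: no-op; log_warning / log_error: emit a log entry;
    // throw_exception: throws std::runtime_error with the formatted message.
    utils::report_error(policy, "Command C{} reads B{}, which has not been written by any node.", cid, bid);
}
```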
54 changes: 51 additions & 3 deletions src/distributed_graph_generator.cc
@@ -10,8 +10,8 @@
namespace celerity::detail {

distributed_graph_generator::distributed_graph_generator(
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder)
: m_num_nodes(num_nodes), m_local_nid(local_nid), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
: m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
if(m_num_nodes > max_num_nodes) {
throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
}
@@ -25,9 +25,10 @@ distributed_graph_generator::distributed_graph_generator(
m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range) {
void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
m_buffer_states.emplace(
std::piecewise_construct, std::tuple{bid}, std::tuple{region_map<write_command_state>{range, dims}, region_map<node_bitset>{range, dims}});
if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffer_states.at(bid).initialized_region = box(subrange({}, range)); }
// Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
// This is required when tasks access host-initialized or uninitialized buffers.
m_buffer_states.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
@@ -178,6 +179,9 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
// Remember all generated pushes for determining intra-task anti-dependencies.
std::vector<push_command*> generated_pushes;

// Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner.
box_vector<3> local_chunks;

// In the master/worker model, we used to try and find the node best suited for initializing multiple
// reductions that do not initialize_to_identity based on current data distribution.
// This is more difficult in a distributed setting, so for now we just hard code it to node 0.
@@ -249,6 +253,8 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
m_last_collective_commands.emplace(cgid, cmd->get_cid());
}

local_chunks.push_back(subrange(chunks[i].offset, chunks[i].range));
}

// We use the task id, together with the "chunk id" and the buffer id (stored separately) to match pushes against their corresponding await pushes
@@ -283,9 +289,15 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
// TODO the per-node reduction result is discarded - warn user about dead store
}

region<3> uninitialized_reads;
for(const auto mode : required_modes) {
const auto& req = reqs_by_mode.at(mode);
if(detail::access::mode_traits::is_consumer(mode)) {
if(is_local_chunk && m_policy.uninitialized_read_error != error_policy::ignore
&& !bounding_box(buffer_state.initialized_region).covers(bounding_box(req.get_boxes()))) {
uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer_state.initialized_region));
}

if(is_local_chunk) {
// Store the read access for determining anti-dependencies later on
m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], req);
@@ -353,7 +365,16 @@
}
}

if(!uninitialized_reads.empty()) {
utils::report_error(m_policy.uninitialized_read_error, "Command C{} on N{} reads B{} {}, which has not been written by any node.",
cmd->get_cid(), m_local_nid, bid, detail::region(std::move(uninitialized_reads)));
}

if(generate_reduction) {
if(m_policy.uninitialized_read_error != error_policy::ignore) {
post_reduction_buffer_states.at(bid).initialized_region = scalar_reduction_box;
}

const auto& reduction = *buffer_state.pending_reduction;

const auto local_last_writer = buffer_state.local_last_writer.get_region_values(scalar_reduction_box);
@@ -402,6 +423,29 @@
}
}

// Check for and report overlapping writes between local chunks, and between local and remote chunks.
if(m_policy.overlapping_write_error != error_policy::ignore) {
// Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
// region-union of remote chunks. This way, every conflict will be reported by at least one node.
const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

// detect_overlapping_writes takes a single box_vector, so we concatenate local and remote chunks (the order does not matter)
auto distributed_chunks = std::move(remote_chunks);
distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
auto error = fmt::format("Task T{}", tsk.get_id());
if(!tsk.get_debug_name().empty()) { fmt::format_to(std::back_inserter(error), " \"{}\"", tsk.get_debug_name()); }
error += " has overlapping writes between multiple nodes in";
for(const auto& [bid, overlap] : overlapping_writes) {
fmt::format_to(std::back_inserter(error), " B{} {}", bid, overlap);
}
error += ". Choose a non-overlapping range mapper for the write access or constrain the split to make the access non-overlapping.";
utils::report_error(m_policy.overlapping_write_error, "{}", error);
}
}

// For buffers that were in a pending reduction state and a reduction was generated
// (i.e., the result was not discarded), set their new state.
for(auto& [bid, new_state] : post_reduction_buffer_states) {
@@ -502,6 +546,10 @@
const auto remote_writes = region_difference(global_writes, local_writes);
auto& buffer_state = m_buffer_states.at(bid);

if(m_policy.uninitialized_read_error != error_policy::ignore) {
buffer_state.initialized_region = region_union(buffer_state.initialized_region, global_writes);
}

// TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
auto boxes_and_cids = buffer_state.local_last_writer.get_region_values(remote_writes);
for(auto& [box, wcs] : boxes_and_cids) {
23 changes: 20 additions & 3 deletions src/runtime.cc
@@ -148,14 +148,31 @@ namespace detail {

m_reduction_mngr = std::make_unique<reduction_manager>();
m_host_object_mngr = std::make_unique<host_object_manager>();

if(m_cfg->is_recording()) m_task_recorder = std::make_unique<task_recorder>(m_buffer_mngr.get());
m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_h_queue.get(), m_task_recorder.get());

task_manager::policy_set task_mngr_policy;
// Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the
// first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf
// https://github.com/celerity/meta/issues/74).
task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore;

m_task_mngr = std::make_unique<task_manager>(m_num_nodes, m_h_queue.get(), m_task_recorder.get(), task_mngr_policy);
if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value());
if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value());

m_exec = std::make_unique<executor>(m_num_nodes, m_local_nid, *m_h_queue, *m_d_queue, *m_task_mngr, *m_buffer_mngr, *m_reduction_mngr);

m_cdag = std::make_unique<command_graph>();
if(m_cfg->is_recording()) m_command_recorder = std::make_unique<command_recorder>(m_task_mngr.get(), m_buffer_mngr.get());
auto dggen = std::make_unique<distributed_graph_generator>(m_num_nodes, m_local_nid, *m_cdag, *m_task_mngr, m_command_recorder.get());

distributed_graph_generator::policy_set dggen_policy;
// Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug.
dggen_policy.uninitialized_read_error = error_policy::ignore;
dggen_policy.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore;

auto dggen = std::make_unique<distributed_graph_generator>(m_num_nodes, m_local_nid, *m_cdag, *m_task_mngr, m_command_recorder.get(), dggen_policy);

m_schdlr = std::make_unique<scheduler>(is_dry_run(), std::move(dggen), *m_exec);
m_task_mngr->register_task_callback([this](const task* tsk) { m_schdlr->notify_task_created(tsk); });

@@ -274,7 +291,7 @@ namespace detail {
void runtime::handle_buffer_registered(buffer_id bid) {
const auto& info = m_buffer_mngr->get_buffer_info(bid);
m_task_mngr->add_buffer(bid, info.dimensions, info.range, info.is_host_initialized);
m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range);
m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range, info.is_host_initialized);
}

void runtime::handle_buffer_unregistered(buffer_id bid) { maybe_destroy_runtime(); }
2 changes: 1 addition & 1 deletion src/scheduler.cc
@@ -53,7 +53,7 @@ namespace detail {
serializer.flush(cmds);
},
[&](const event_buffer_registered& e) { //
m_dggen->add_buffer(e.bid, e.dims, e.range);
m_dggen->add_buffer(e.bid, e.dims, e.range, e.host_initialized);
},
[&](const event_shutdown&) {
assert(in_flight_events.empty());
Expand Down