Skip to content

Commit

Permalink
Detect uninitialized reads and overlapping writes
Browse files Browse the repository at this point in the history
task_manager and distributed_graph_generator by default throw when
detecting an uninitialized read or overlapping writes in order to catch
invalid test code or internal bugs. In `runtime`, these conditions are
reported through warning / error logs instead.
  • Loading branch information
fknorr committed Dec 5, 2023
1 parent 0b7cfcb commit 96fda82
Show file tree
Hide file tree
Showing 24 changed files with 457 additions and 136 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html).
- Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199)
- Add new `experimental::constrain_split` API to limit how a kernel can be split (#?)
- `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225)
- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224)

### Changed

Expand Down
12 changes: 6 additions & 6 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,18 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

if (CMAKE_BUILD_TYPE STREQUAL "Debug")
set(ENABLE_ACC_CHECK ON)
set(DEFAULT_ENABLE_DEBUG_CHECKS ON)
else()
set(ENABLE_ACC_CHECK OFF)
set(DEFAULT_ENABLE_DEBUG_CHECKS OFF)
endif()

option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${ENABLE_ACC_CHECK})
option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS})
option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS})

if(CELERITY_ACCESSOR_BOUNDARY_CHECK)
if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug"))
message(STATUS "Accessor boundary check enabled - this will impact kernel performance")
endif()

unset(ENABLE_ACC_CHECK)

set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake")
set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}")
find_package(MPI 2.0 REQUIRED)
Expand Down Expand Up @@ -288,6 +287,7 @@ target_compile_definitions(celerity_runtime PUBLIC
CELERITY_FEATURE_UNNAMED_KERNELS=$<BOOL:${CELERITY_FEATURE_UNNAMED_KERNELS}>
CELERITY_DETAIL_HAS_NAMED_THREADS=$<BOOL:${CELERITY_DETAIL_HAS_NAMED_THREADS}>
CELERITY_ACCESSOR_BOUNDARY_CHECK=$<BOOL:${CELERITY_ACCESSOR_BOUNDARY_CHECK}>
CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$<BOOL:${CELERITY_ACCESS_PATTERN_DIAGNOSTICS}>
)

# Collect version information from git in src/version.cc. This target is always out of date, but the timestamp
Expand Down
3 changes: 3 additions & 0 deletions cmake/celerity-config.cmake.in
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ set(CELERITY_FEATURE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SCALAR_REDUCTIONS@")
set(CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS@")
set(CELERITY_FEATURE_LOCAL_ACCESSOR "@CELERITY_FEATURE_LOCAL_ACCESSOR@")
set(CELERITY_FEATURE_UNNAMED_KERNELS "@CELERITY_FEATURE_UNNAMED_KERNELS@")
set(CELERITY_DETAIL_HAS_NAMED_THREADS "@CELERITY_DETAIL_HAS_NAMED_THREADS@")
set(CELERITY_ACCESSOR_BOUNDARY_CHECK "@CELERITY_ACCESSOR_BOUNDARY_CHECK@")
set(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "@CELERITY_ACCESS_PATTERN_DIAGNOSTICS@")

include("${CMAKE_CURRENT_LIST_DIR}/celerity-targets.cmake")

Expand Down
20 changes: 16 additions & 4 deletions include/distributed_graph_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include <bitset>
#include <unordered_map>
#include <variant>

#include "command_graph.h"
#include "ranges.h"
Expand Down Expand Up @@ -65,6 +64,7 @@ class distributed_graph_generator {
buffer_state(region_map<write_command_state> lw, region_map<std::bitset<max_num_nodes>> rr)
: local_last_writer(std::move(lw)), replicated_regions(std::move(rr)), pending_reduction(std::nullopt) {}

region<3> initialized_region; // for detecting uninitialized reads (only if policies.uninitialized_read != error_policy::ignore)
region_map<write_command_state> local_last_writer;
region_map<node_bitset> replicated_regions;

Expand All @@ -75,10 +75,15 @@ class distributed_graph_generator {
};

public:
distributed_graph_generator(
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder);
struct policy_set {
error_policy uninitialized_read_error = error_policy::throw_exception;
error_policy overlapping_write_error = error_policy::throw_exception;
};

distributed_graph_generator(const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm,
detail::command_recorder* recorder, const policy_set& policy = default_policy_set());

void add_buffer(const buffer_id bid, const int dims, const range<3>& range);
void add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized);

std::unordered_set<abstract_command*> build_task(const task& tsk);

Expand Down Expand Up @@ -116,12 +121,19 @@ class distributed_graph_generator {

void prune_commands_before(const command_id epoch);

void report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const;

private:
using buffer_read_map = std::unordered_map<buffer_id, region<3>>;
using side_effect_map = std::unordered_map<host_object_id, command_id>;

// default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member
// initializers, within its surrounding class (Clang)
constexpr static policy_set default_policy_set() { return {}; }

size_t m_num_nodes;
node_id m_local_nid;
policy_set m_policy;
command_graph& m_cdag;
const task_manager& m_task_mngr;
std::unordered_map<buffer_id, buffer_state> m_buffer_states;
Expand Down
5 changes: 4 additions & 1 deletion include/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ namespace detail {
*/
void notify_task_created(const task* const tsk) { notify(event_task_available{tsk}); }

void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range) { notify(event_buffer_registered{bid, dims, range}); }
void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
notify(event_buffer_registered{bid, dims, range, host_initialized});
}

protected:
/**
Expand All @@ -53,6 +55,7 @@ namespace detail {
buffer_id bid;
int dims;
celerity::range<3> range;
bool host_initialized;
};
using event = std::variant<event_shutdown, event_task_available, event_buffer_registered>;

Expand Down
3 changes: 3 additions & 0 deletions include/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,5 +273,8 @@ namespace detail {

[[nodiscard]] std::string print_task_debug_label(const task& tsk, bool title_case = false);

/// Determines which overlapping regions appear between write accesses when the iteration space of `tsk` is split into `chunks`.
std::unordered_map<buffer_id, region<3>> detect_overlapping_writes(const task& tsk, const box_vector<3>& chunks);

} // namespace detail
} // namespace celerity
11 changes: 10 additions & 1 deletion include/task_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,13 @@ namespace detail {
using buffer_writers_map = std::unordered_map<buffer_id, region_map<std::optional<task_id>>>;

public:
struct policy_set {
error_policy uninitialized_read_error = error_policy::throw_exception;
};

constexpr inline static task_id initial_epoch_task = 0;

task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder);
task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder, const policy_set& policy = default_policy_set());

virtual ~task_manager() = default;

Expand Down Expand Up @@ -179,8 +183,13 @@ namespace detail {
size_t get_current_task_count() const { return m_task_buffer.get_current_task_count(); }

private:
// default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member
// initializers, within its surrounding class (Clang)
constexpr static policy_set default_policy_set() { return {}; }

const size_t m_num_collective_nodes;
host_queue* m_queue;
policy_set m_policy;

task_ring_buffer m_task_buffer;

Expand Down
8 changes: 8 additions & 0 deletions include/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,12 @@ struct reduction_info {
};

constexpr node_id master_node_id = 0;

enum class error_policy {
ignore,
log_warning,
log_error,
throw_exception,
};

} // namespace celerity::detail
15 changes: 15 additions & 0 deletions include/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,19 @@ std::string get_simplified_type_name() {
/// Escapes "<", ">", and "&" with their corresponding HTML escape sequences
std::string escape_for_dot_label(std::string str);

template <typename... FmtParams>
[[noreturn]] void throw_error(FmtParams&&... fmt_args) {
throw std::runtime_error(fmt::format(std::forward<FmtParams>(fmt_args)...));
}

template <typename... FmtParams>
void report_error(const error_policy policy, FmtParams&&... fmt_args) {
switch(policy) {
case error_policy::ignore: break;
case error_policy::log_warning: CELERITY_WARN(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::log_error: CELERITY_ERROR(std::forward<FmtParams>(fmt_args)...); break;
case error_policy::throw_exception: throw_error(std::forward<FmtParams>(fmt_args)...); break;
}
}

} // namespace celerity::detail::utils
59 changes: 55 additions & 4 deletions src/distributed_graph_generator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
namespace celerity::detail {

distributed_graph_generator::distributed_graph_generator(
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder)
: m_num_nodes(num_nodes), m_local_nid(local_nid), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy)
: m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) {
if(m_num_nodes > max_num_nodes) {
throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes));
}
Expand All @@ -25,9 +25,10 @@ distributed_graph_generator::distributed_graph_generator(
m_epoch_for_new_commands = epoch_cmd->get_cid();
}

void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range) {
void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) {
m_buffer_states.emplace(
std::piecewise_construct, std::tuple{bid}, std::tuple{region_map<write_command_state>{range, dims}, region_map<node_bitset>{range, dims}});
if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffer_states.at(bid).initialized_region = box(subrange({}, range)); }
// Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands).
// This is required when tasks access host-initialized or uninitialized buffers.
m_buffer_states.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands);
Expand Down Expand Up @@ -145,8 +146,30 @@ std::unordered_set<abstract_command*> distributed_graph_generator::build_task(co
return std::move(m_current_cmd_batch);
}

void distributed_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const {
const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};

// Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the
// region-union of remote chunks. This way, every conflict will be reported by at least one node.
const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range));
auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes();

// detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter)
auto distributed_chunks = std::move(remote_chunks);
distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end());

if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) {
auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */));
for(const auto& [bid, overlap] : overlapping_writes) {
fmt::format_to(std::back_inserter(error), " B{} {}", bid, overlap);
}
error += ". Choose a non-overlapping range mapper for the write access or constrain the split to make the access non-overlapping.";
utils::report_error(m_policy.overlapping_write_error, "{}", error);
}
}

void distributed_graph_generator::generate_distributed_commands(const task& tsk) {
chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()};
const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable
const auto chunks = ([&] {
if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) {
Expand Down Expand Up @@ -178,6 +201,9 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
// Remember all generated pushes for determining intra-task anti-dependencies.
std::vector<push_command*> generated_pushes;

// Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner.
box_vector<3> local_chunks;

// In the master/worker model, we used to try and find the node best suited for initializing multiple
// reductions that do not initialize_to_identity based on current data distribution.
// This is more difficult in a distributed setting, so for now we just hard code it to node 0.
Expand Down Expand Up @@ -249,6 +275,8 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
m_last_collective_commands.emplace(cgid, cmd->get_cid());
}

local_chunks.push_back(subrange(chunks[i].offset, chunks[i].range));
}

// We use the task id, together with the "chunk id" and the buffer id (stored separately) to match pushes against their corresponding await pushes
Expand Down Expand Up @@ -283,9 +311,15 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
// TODO the per-node reduction result is discarded - warn user about dead store
}

region<3> uninitialized_reads;
for(const auto mode : required_modes) {
const auto& req = reqs_by_mode.at(mode);
if(detail::access::mode_traits::is_consumer(mode)) {
if(is_local_chunk && m_policy.uninitialized_read_error != error_policy::ignore
&& !bounding_box(buffer_state.initialized_region).covers(bounding_box(req.get_boxes()))) {
uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer_state.initialized_region));
}

if(is_local_chunk) {
// Store the read access for determining anti-dependencies later on
m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], req);
Expand Down Expand Up @@ -353,7 +387,17 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
}

if(!uninitialized_reads.empty()) {
utils::report_error(m_policy.uninitialized_read_error,
"Command C{} on N{}, which executes {} of {}, reads B{} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid,
box(subrange(chunks[i].offset, chunks[i].range)), print_task_debug_label(tsk), bid, detail::region(std::move(uninitialized_reads)));
}

if(generate_reduction) {
if(m_policy.uninitialized_read_error != error_policy::ignore) {
post_reduction_buffer_states.at(bid).initialized_region = scalar_reduction_box;
}

const auto& reduction = *buffer_state.pending_reduction;

const auto local_last_writer = buffer_state.local_last_writer.get_region_values(scalar_reduction_box);
Expand Down Expand Up @@ -402,6 +446,9 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
}
}

// Check for and report overlapping writes between local chunks, and between local and remote chunks.
if(m_policy.overlapping_write_error != error_policy::ignore) { report_overlapping_writes(tsk, local_chunks); }

// For buffers that were in a pending reduction state and a reduction was generated
// (i.e., the result was not discarded), set their new state.
for(auto& [bid, new_state] : post_reduction_buffer_states) {
Expand Down Expand Up @@ -502,6 +549,10 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk)
const auto remote_writes = region_difference(global_writes, local_writes);
auto& buffer_state = m_buffer_states.at(bid);

if(m_policy.uninitialized_read_error != error_policy::ignore) {
buffer_state.initialized_region = region_union(buffer_state.initialized_region, global_writes);
}

// TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback)
auto boxes_and_cids = buffer_state.local_last_writer.get_region_values(remote_writes);
for(auto& [box, wcs] : boxes_and_cids) {
Expand Down
6 changes: 3 additions & 3 deletions src/recorders.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ task_dependency_list build_task_dependency_list(const task& tsk) {
}

task_record::task_record(const task& from, const buffer_manager* buff_mngr)
: tid(from.get_id()), debug_name(from.get_debug_name()), cgid(from.get_collective_group_id()), type(from.get_type()),
geometry(from.get_geometry()), reductions(build_reduction_list(from, buff_mngr)), accesses(build_access_list(from, buff_mngr)),
side_effect_map(from.get_side_effect_map()), dependencies(build_task_dependency_list(from)) {}
: tid(from.get_id()), debug_name(from.get_debug_name()), cgid(from.get_collective_group_id()), type(from.get_type()), geometry(from.get_geometry()),
reductions(build_reduction_list(from, buff_mngr)), accesses(build_access_list(from, buff_mngr)), side_effect_map(from.get_side_effect_map()),
dependencies(build_task_dependency_list(from)) {}

void task_recorder::record_task(const task& tsk) { //
m_recorded_tasks.emplace_back(tsk, m_buff_mngr);
Expand Down
Loading

0 comments on commit 96fda82

Please sign in to comment.