From 96fda827c4c86275b876adc56b9090b02051855b Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Wed, 29 Nov 2023 14:16:40 +0100 Subject: [PATCH] Detect uninitialized reads and overlapping writes task_manager and distributed_graph_generator by default throw when detecting an uninitialized read or overlapping writes in order to catch invalid test code or internal bugs. In `runtime`, these conditions are reported through warning / error logs instead. --- CHANGELOG.md | 1 + CMakeLists.txt | 12 +-- cmake/celerity-config.cmake.in | 3 + include/distributed_graph_generator.h | 20 ++++- include/scheduler.h | 5 +- include/task.h | 3 + include/task_manager.h | 11 ++- include/types.h | 8 ++ include/utils.h | 15 ++++ src/distributed_graph_generator.cc | 59 ++++++++++++- src/recorders.cc | 6 +- src/runtime.cc | 23 ++++- src/scheduler.cc | 2 +- src/task.cc | 43 ++++++++++ src/task_manager.cc | 24 ++++-- test/dag_benchmarks.cc | 33 +++++--- test/distributed_graph_generator_test_utils.h | 12 ++- test/graph_gen_transfer_tests.cc | 23 ++--- test/graph_generation_tests.cc | 70 +++++++++++----- test/runtime_deprecation_tests.cc | 2 +- test/runtime_tests.cc | 84 ++++++++++++++----- test/system/distr_tests.cc | 54 ++++++++++-- test/task_graph_tests.cc | 74 +++++++++++----- test/test_utils.h | 6 +- 24 files changed, 457 insertions(+), 136 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41264ea7a..33fa0f25b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ Versioning](http://semver.org/spec/v2.0.0.html). - Add new environment variables `CELERITY_HORIZON_STEP` and `CELERITY_HORIZON_MAX_PARALLELISM` to control Horizon generation (#199) - Add new `experimental::constrain_split` API to limit how a kernel can be split (#?) - `distr_queue::fence` and `buffer_snapshot` are now stable, subsuming the `experimental::` APIs of the same name (#225) +- Celerity now warns at runtime when a task declares reads from uninitialized buffers or writes with overlapping ranges between nodes (#224) ### Changed diff --git a/CMakeLists.txt b/CMakeLists.txt index a7335e206..feb74d0a8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,19 +16,18 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) if (CMAKE_BUILD_TYPE STREQUAL "Debug") - set(ENABLE_ACC_CHECK ON) + set(DEFAULT_ENABLE_DEBUG_CHECKS ON) else() - set(ENABLE_ACC_CHECK OFF) + set(DEFAULT_ENABLE_DEBUG_CHECKS OFF) endif() -option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${ENABLE_ACC_CHECK}) +option(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "Diagnose uninitialized reads and overlapping writes" ${DEFAULT_ENABLE_DEBUG_CHECKS}) +option(CELERITY_ACCESSOR_BOUNDARY_CHECK "Enable accessor boundary check" ${DEFAULT_ENABLE_DEBUG_CHECKS}) -if(CELERITY_ACCESSOR_BOUNDARY_CHECK) +if(CELERITY_ACCESSOR_BOUNDARY_CHECK AND NOT (CMAKE_BUILD_TYPE STREQUAL "Debug")) message(STATUS "Accessor boundary check enabled - this will impact kernel performance") endif() -unset(ENABLE_ACC_CHECK) - set(CELERITY_CMAKE_DIR "${PROJECT_SOURCE_DIR}/cmake") set(CMAKE_MODULE_PATH "${CMAKE_MODULE_PATH}" "${CELERITY_CMAKE_DIR}") find_package(MPI 2.0 REQUIRED) @@ -288,6 +287,7 @@ target_compile_definitions(celerity_runtime PUBLIC CELERITY_FEATURE_UNNAMED_KERNELS=$ CELERITY_DETAIL_HAS_NAMED_THREADS=$ CELERITY_ACCESSOR_BOUNDARY_CHECK=$ + CELERITY_ACCESS_PATTERN_DIAGNOSTICS=$ ) # Collect version information from git in src/version.cc. This target is always out of date, but the timestamp diff --git a/cmake/celerity-config.cmake.in b/cmake/celerity-config.cmake.in index 9d7f6d9e1..326ad8154 100644 --- a/cmake/celerity-config.cmake.in +++ b/cmake/celerity-config.cmake.in @@ -55,6 +55,9 @@ set(CELERITY_FEATURE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SCALAR_REDUCTIONS@") set(CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS "@CELERITY_FEATURE_SIMPLE_SCALAR_REDUCTIONS@") set(CELERITY_FEATURE_LOCAL_ACCESSOR "@CELERITY_FEATURE_LOCAL_ACCESSOR@") set(CELERITY_FEATURE_UNNAMED_KERNELS "@CELERITY_FEATURE_UNNAMED_KERNELS@") +set(CELERITY_DETAIL_HAS_NAMED_THREADS "@CELERITY_DETAIL_HAS_NAMED_THREADS@") +set(CELERITY_ACCESSOR_BOUNDARY_CHECK "@CELERITY_ACCESSOR_BOUNDARY_CHECK@") +set(CELERITY_ACCESS_PATTERN_DIAGNOSTICS "@CELERITY_ACCESS_PATTERN_DIAGNOSTICS@") include("${CMAKE_CURRENT_LIST_DIR}/celerity-targets.cmake") diff --git a/include/distributed_graph_generator.h b/include/distributed_graph_generator.h index e0b4c878c..6cb87020e 100644 --- a/include/distributed_graph_generator.h +++ b/include/distributed_graph_generator.h @@ -2,7 +2,6 @@ #include #include -#include #include "command_graph.h" #include "ranges.h" @@ -65,6 +64,7 @@ class distributed_graph_generator { buffer_state(region_map lw, region_map> rr) : local_last_writer(std::move(lw)), replicated_regions(std::move(rr)), pending_reduction(std::nullopt) {} + region<3> initialized_region; // for detecting uninitialized reads (only if policies.uninitialized_read != error_policy::ignore) region_map local_last_writer; region_map replicated_regions; @@ -75,10 +75,15 @@ class distributed_graph_generator { }; public: - distributed_graph_generator( - const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder); + struct policy_set { + error_policy uninitialized_read_error = error_policy::throw_exception; + error_policy overlapping_write_error = error_policy::throw_exception; + }; + + distributed_graph_generator(const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, + detail::command_recorder* recorder, const policy_set& policy = default_policy_set()); - void add_buffer(const buffer_id bid, const int dims, const range<3>& range); + void add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized); std::unordered_set build_task(const task& tsk); @@ -116,12 +121,19 @@ class distributed_graph_generator { void prune_commands_before(const command_id epoch); + void report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const; + private: using buffer_read_map = std::unordered_map>; using side_effect_map = std::unordered_map; + // default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member + // initializers, within its surrounding class (Clang) + constexpr static policy_set default_policy_set() { return {}; } + size_t m_num_nodes; node_id m_local_nid; + policy_set m_policy; command_graph& m_cdag; const task_manager& m_task_mngr; std::unordered_map m_buffer_states; diff --git a/include/scheduler.h b/include/scheduler.h index 77723b3a4..60f73e0bb 100644 --- a/include/scheduler.h +++ b/include/scheduler.h @@ -32,7 +32,9 @@ namespace detail { */ void notify_task_created(const task* const tsk) { notify(event_task_available{tsk}); } - void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range) { notify(event_buffer_registered{bid, dims, range}); } + void notify_buffer_registered(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) { + notify(event_buffer_registered{bid, dims, range, host_initialized}); + } protected: /** @@ -53,6 +55,7 @@ namespace detail { buffer_id bid; int dims; celerity::range<3> range; + bool host_initialized; }; using event = std::variant; diff --git a/include/task.h b/include/task.h index 1140f5145..fba4d0d76 100644 --- a/include/task.h +++ b/include/task.h @@ -273,5 +273,8 @@ namespace detail { [[nodiscard]] std::string print_task_debug_label(const task& tsk, bool title_case = false); + /// Determines which overlapping regions appear between write accesses when the iteration space of `tsk` is split into `chunks`. + std::unordered_map> detect_overlapping_writes(const task& tsk, const box_vector<3>& chunks); + } // namespace detail } // namespace celerity diff --git a/include/task_manager.h b/include/task_manager.h index 6fed0240e..787bd27d8 100644 --- a/include/task_manager.h +++ b/include/task_manager.h @@ -58,9 +58,13 @@ namespace detail { using buffer_writers_map = std::unordered_map>>; public: + struct policy_set { + error_policy uninitialized_read_error = error_policy::throw_exception; + }; + constexpr inline static task_id initial_epoch_task = 0; - task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder); + task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder, const policy_set& policy = default_policy_set()); virtual ~task_manager() = default; @@ -179,8 +183,13 @@ namespace detail { size_t get_current_task_count() const { return m_task_buffer.get_current_task_count(); } private: + // default-constructs a policy_set - this must be a function because we can't use the implicit default constructor of policy_set, which has member + // initializers, within its surrounding class (Clang) + constexpr static policy_set default_policy_set() { return {}; } + const size_t m_num_collective_nodes; host_queue* m_queue; + policy_set m_policy; task_ring_buffer m_task_buffer; diff --git a/include/types.h b/include/types.h index ddbd363df..44cc7f65e 100644 --- a/include/types.h +++ b/include/types.h @@ -75,4 +75,12 @@ struct reduction_info { }; constexpr node_id master_node_id = 0; + +enum class error_policy { + ignore, + log_warning, + log_error, + throw_exception, +}; + } // namespace celerity::detail diff --git a/include/utils.h b/include/utils.h index 0b2f991ed..2c14c87d7 100644 --- a/include/utils.h +++ b/include/utils.h @@ -105,4 +105,19 @@ std::string get_simplified_type_name() { /// Escapes "<", ">", and "&" with their corresponding HTML escape sequences std::string escape_for_dot_label(std::string str); +template +[[noreturn]] void throw_error(FmtParams&&... fmt_args) { + throw std::runtime_error(fmt::format(std::forward(fmt_args)...)); +} + +template +void report_error(const error_policy policy, FmtParams&&... fmt_args) { + switch(policy) { + case error_policy::ignore: break; + case error_policy::log_warning: CELERITY_WARN(std::forward(fmt_args)...); break; + case error_policy::log_error: CELERITY_ERROR(std::forward(fmt_args)...); break; + case error_policy::throw_exception: throw_error(std::forward(fmt_args)...); break; + } +} + } // namespace celerity::detail::utils diff --git a/src/distributed_graph_generator.cc b/src/distributed_graph_generator.cc index 683099b11..49c777667 100644 --- a/src/distributed_graph_generator.cc +++ b/src/distributed_graph_generator.cc @@ -10,8 +10,8 @@ namespace celerity::detail { distributed_graph_generator::distributed_graph_generator( - const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder) - : m_num_nodes(num_nodes), m_local_nid(local_nid), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) { + const size_t num_nodes, const node_id local_nid, command_graph& cdag, const task_manager& tm, detail::command_recorder* recorder, const policy_set& policy) + : m_num_nodes(num_nodes), m_local_nid(local_nid), m_policy(policy), m_cdag(cdag), m_task_mngr(tm), m_recorder(recorder) { if(m_num_nodes > max_num_nodes) { throw std::runtime_error(fmt::format("Number of nodes requested ({}) exceeds compile-time maximum of {}", m_num_nodes, max_num_nodes)); } @@ -25,9 +25,10 @@ distributed_graph_generator::distributed_graph_generator( m_epoch_for_new_commands = epoch_cmd->get_cid(); } -void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range) { +void distributed_graph_generator::add_buffer(const buffer_id bid, const int dims, const range<3>& range, bool host_initialized) { m_buffer_states.emplace( std::piecewise_construct, std::tuple{bid}, std::tuple{region_map{range, dims}, region_map{range, dims}}); + if(host_initialized && m_policy.uninitialized_read_error != error_policy::ignore) { m_buffer_states.at(bid).initialized_region = box(subrange({}, range)); } // Mark contents as available locally (= don't generate await push commands) and fully replicated (= don't generate push commands). // This is required when tasks access host-initialized or uninitialized buffers. m_buffer_states.at(bid).local_last_writer.update_region(subrange<3>({}, range), m_epoch_for_new_commands); @@ -145,8 +146,30 @@ std::unordered_set distributed_graph_generator::build_task(co return std::move(m_current_cmd_batch); } +void distributed_graph_generator::report_overlapping_writes(const task& tsk, const box_vector<3>& local_chunks) const { + const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; + + // Since this check is run distributed on every node, we avoid quadratic behavior by only checking for conflicts between all local chunks and the + // region-union of remote chunks. This way, every conflict will be reported by at least one node. + const box<3> global_chunk(subrange(full_chunk.offset, full_chunk.range)); + auto remote_chunks = region_difference(global_chunk, region(box_vector<3>(local_chunks))).into_boxes(); + + // detect_overlapping_writes takes a single box_vector, so we concatenate local and global chunks (the order does not matter) + auto distributed_chunks = std::move(remote_chunks); + distributed_chunks.insert(distributed_chunks.end(), local_chunks.begin(), local_chunks.end()); + + if(const auto overlapping_writes = detect_overlapping_writes(tsk, distributed_chunks); !overlapping_writes.empty()) { + auto error = fmt::format("{} has overlapping writes between multiple nodes in", print_task_debug_label(tsk, true /* title case */)); + for(const auto& [bid, overlap] : overlapping_writes) { + fmt::format_to(std::back_inserter(error), " B{} {}", bid, overlap); + } + error += ". Choose a non-overlapping range mapper for the write access or constrain the split to make the access non-overlapping."; + utils::report_error(m_policy.overlapping_write_error, "{}", error); + } +} + void distributed_graph_generator::generate_distributed_commands(const task& tsk) { - chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; + const chunk<3> full_chunk{tsk.get_global_offset(), tsk.get_global_size(), tsk.get_global_size()}; const size_t num_chunks = m_num_nodes * 1; // TODO Make configurable const auto chunks = ([&] { if(tsk.get_type() == task_type::collective || tsk.get_type() == task_type::fence) { @@ -178,6 +201,9 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) // Remember all generated pushes for determining intra-task anti-dependencies. std::vector generated_pushes; + // Collect all local chunks for detecting overlapping writes between all local chunks and the union of remote chunks in a distributed manner. + box_vector<3> local_chunks; + // In the master/worker model, we used to try and find the node best suited for initializing multiple // reductions that do not initialize_to_identity based on current data distribution. // This is more difficult in a distributed setting, so for now we just hard code it to node 0. @@ -249,6 +275,8 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) } m_last_collective_commands.emplace(cgid, cmd->get_cid()); } + + local_chunks.push_back(subrange(chunks[i].offset, chunks[i].range)); } // We use the task id, together with the "chunk id" and the buffer id (stored separately) to match pushes against their corresponding await pushes @@ -283,9 +311,15 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) // TODO the per-node reduction result is discarded - warn user about dead store } + region<3> uninitialized_reads; for(const auto mode : required_modes) { const auto& req = reqs_by_mode.at(mode); if(detail::access::mode_traits::is_consumer(mode)) { + if(is_local_chunk && m_policy.uninitialized_read_error != error_policy::ignore + && !bounding_box(buffer_state.initialized_region).covers(bounding_box(req.get_boxes()))) { + uninitialized_reads = region_union(uninitialized_reads, region_difference(req, buffer_state.initialized_region)); + } + if(is_local_chunk) { // Store the read access for determining anti-dependencies later on m_command_buffer_reads[cmd->get_cid()][bid] = region_union(m_command_buffer_reads[cmd->get_cid()][bid], req); @@ -353,7 +387,17 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) } } + if(!uninitialized_reads.empty()) { + utils::report_error(m_policy.uninitialized_read_error, + "Command C{} on N{}, which executes {} of {}, reads B{} {}, which has not been written by any node.", cmd->get_cid(), m_local_nid, + box(subrange(chunks[i].offset, chunks[i].range)), print_task_debug_label(tsk), bid, detail::region(std::move(uninitialized_reads))); + } + if(generate_reduction) { + if(m_policy.uninitialized_read_error != error_policy::ignore) { + post_reduction_buffer_states.at(bid).initialized_region = scalar_reduction_box; + } + const auto& reduction = *buffer_state.pending_reduction; const auto local_last_writer = buffer_state.local_last_writer.get_region_values(scalar_reduction_box); @@ -402,6 +446,9 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) } } + // Check for and report overlapping writes between local chunks, and between local and remote chunks. + if(m_policy.overlapping_write_error != error_policy::ignore) { report_overlapping_writes(tsk, local_chunks); } + // For buffers that were in a pending reduction state and a reduction was generated // (i.e., the result was not discarded), set their new state. for(auto& [bid, new_state] : post_reduction_buffer_states) { @@ -502,6 +549,10 @@ void distributed_graph_generator::generate_distributed_commands(const task& tsk) const auto remote_writes = region_difference(global_writes, local_writes); auto& buffer_state = m_buffer_states.at(bid); + if(m_policy.uninitialized_read_error != error_policy::ignore) { + buffer_state.initialized_region = region_union(buffer_state.initialized_region, global_writes); + } + // TODO: We need a way of updating regions in place! E.g. apply_to_values(box, callback) auto boxes_and_cids = buffer_state.local_last_writer.get_region_values(remote_writes); for(auto& [box, wcs] : boxes_and_cids) { diff --git a/src/recorders.cc b/src/recorders.cc index ff0f37293..187201cb0 100644 --- a/src/recorders.cc +++ b/src/recorders.cc @@ -44,9 +44,9 @@ task_dependency_list build_task_dependency_list(const task& tsk) { } task_record::task_record(const task& from, const buffer_manager* buff_mngr) - : tid(from.get_id()), debug_name(from.get_debug_name()), cgid(from.get_collective_group_id()), type(from.get_type()), - geometry(from.get_geometry()), reductions(build_reduction_list(from, buff_mngr)), accesses(build_access_list(from, buff_mngr)), - side_effect_map(from.get_side_effect_map()), dependencies(build_task_dependency_list(from)) {} + : tid(from.get_id()), debug_name(from.get_debug_name()), cgid(from.get_collective_group_id()), type(from.get_type()), geometry(from.get_geometry()), + reductions(build_reduction_list(from, buff_mngr)), accesses(build_access_list(from, buff_mngr)), side_effect_map(from.get_side_effect_map()), + dependencies(build_task_dependency_list(from)) {} void task_recorder::record_task(const task& tsk) { // m_recorded_tasks.emplace_back(tsk, m_buff_mngr); diff --git a/src/runtime.cc b/src/runtime.cc index a73ed3b6e..8eb65aba0 100644 --- a/src/runtime.cc +++ b/src/runtime.cc @@ -148,14 +148,31 @@ namespace detail { m_reduction_mngr = std::make_unique(); m_host_object_mngr = std::make_unique(); + if(m_cfg->is_recording()) m_task_recorder = std::make_unique(m_buffer_mngr.get()); - m_task_mngr = std::make_unique(m_num_nodes, m_h_queue.get(), m_task_recorder.get()); + + task_manager::policy_set task_mngr_policy; + // Merely _declaring_ an uninitialized read is legitimate as long as the kernel does not actually perform the read at runtime - this might happen in the + // first iteration of a submit-loop. We could get rid of this case by making access-modes a runtime property of accessors (cf + // https://github.com/celerity/meta/issues/74). + task_mngr_policy.uninitialized_read_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_warning : error_policy::ignore; + + m_task_mngr = std::make_unique(m_num_nodes, m_h_queue.get(), m_task_recorder.get(), task_mngr_policy); if(m_cfg->get_horizon_step()) m_task_mngr->set_horizon_step(m_cfg->get_horizon_step().value()); if(m_cfg->get_horizon_max_parallelism()) m_task_mngr->set_horizon_max_parallelism(m_cfg->get_horizon_max_parallelism().value()); + m_exec = std::make_unique(m_num_nodes, m_local_nid, *m_h_queue, *m_d_queue, *m_task_mngr, *m_buffer_mngr, *m_reduction_mngr); + m_cdag = std::make_unique(); if(m_cfg->is_recording()) m_command_recorder = std::make_unique(m_task_mngr.get(), m_buffer_mngr.get()); - auto dggen = std::make_unique(m_num_nodes, m_local_nid, *m_cdag, *m_task_mngr, m_command_recorder.get()); + + distributed_graph_generator::policy_set dggen_policy; + // Any uninitialized read that is observed on CDAG generation was already logged on task generation, unless we have a bug. + dggen_policy.uninitialized_read_error = error_policy::ignore; + dggen_policy.overlapping_write_error = CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::log_error : error_policy::ignore; + + auto dggen = std::make_unique(m_num_nodes, m_local_nid, *m_cdag, *m_task_mngr, m_command_recorder.get(), dggen_policy); + m_schdlr = std::make_unique(is_dry_run(), std::move(dggen), *m_exec); m_task_mngr->register_task_callback([this](const task* tsk) { m_schdlr->notify_task_created(tsk); }); @@ -274,7 +291,7 @@ namespace detail { void runtime::handle_buffer_registered(buffer_id bid) { const auto& info = m_buffer_mngr->get_buffer_info(bid); m_task_mngr->add_buffer(bid, info.dimensions, info.range, info.is_host_initialized); - m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range); + m_schdlr->notify_buffer_registered(bid, info.dimensions, info.range, info.is_host_initialized); } void runtime::handle_buffer_unregistered(buffer_id bid) { maybe_destroy_runtime(); } diff --git a/src/scheduler.cc b/src/scheduler.cc index 19980d20e..1122f736a 100644 --- a/src/scheduler.cc +++ b/src/scheduler.cc @@ -53,7 +53,7 @@ namespace detail { serializer.flush(cmds); }, [&](const event_buffer_registered& e) { // - m_dggen->add_buffer(e.bid, e.dims, e.range); + m_dggen->add_buffer(e.bid, e.dims, e.range, e.host_initialized); }, [&](const event_shutdown&) { assert(in_flight_events.empty()); diff --git a/src/task.cc b/src/task.cc index eff54c55e..403b7e5cf 100644 --- a/src/task.cc +++ b/src/task.cc @@ -85,5 +85,48 @@ namespace detail { return label; } + std::unordered_map> detect_overlapping_writes(const task& tsk, const box_vector<3>& chunks) { + const box<3> scalar_reduction_box({0, 0, 0}, {1, 1, 1}); + + auto& bam = tsk.get_buffer_access_map(); + + // track the union of writes we have checked so far in order to detect an overlap between that union and the next write + std::unordered_map> buffer_write_accumulators; + // collect overlapping writes in order to report all of them before throwing + std::unordered_map> overlapping_writes; + + for(const auto bid : bam.get_accessed_buffers()) { + for(const auto& ck : chunks) { + region<3> writes; + for(const auto mode : bam.get_access_modes(bid)) { + if(access::mode_traits::is_producer(mode)) { + const auto req = bam.get_mode_requirements(bid, mode, tsk.get_dimensions(), ck.get_subrange(), tsk.get_global_size()); + writes = region_union(writes, req); + } + } + if(!writes.empty()) { + auto& write_accumulator = buffer_write_accumulators[bid]; // allow default-insert + if(const auto overlap = region_intersection(write_accumulator, writes); !overlap.empty()) { + auto& full_overlap = overlapping_writes[bid]; // allow default-insert + full_overlap = region_union(full_overlap, overlap); + } + write_accumulator = region_union(write_accumulator, writes); + } + } + } + + // we already check for accessor-reduction overlaps on task generation, but we still repeat the sanity-check here + for(const auto& rinfo : tsk.get_reductions()) { + auto& write_accumulator = buffer_write_accumulators[rinfo.bid]; // allow default-insert + if(const auto overlap = region_intersection(write_accumulator, scalar_reduction_box); !overlap.empty()) { + auto& full_overlap = overlapping_writes[rinfo.bid]; // allow default-insert + full_overlap = region_union(full_overlap, overlap); + } + write_accumulator = region_union(write_accumulator, scalar_reduction_box); + } + + return overlapping_writes; + } + } // namespace detail } // namespace celerity diff --git a/src/task_manager.cc b/src/task_manager.cc index 77450ec7f..b0d79abe8 100644 --- a/src/task_manager.cc +++ b/src/task_manager.cc @@ -6,8 +6,8 @@ namespace celerity { namespace detail { - task_manager::task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder) // - : m_num_collective_nodes(num_collective_nodes), m_queue(queue), m_task_recorder(recorder) { + task_manager::task_manager(size_t num_collective_nodes, host_queue* queue, detail::task_recorder* recorder, const policy_set& error_policy) + : m_num_collective_nodes(num_collective_nodes), m_queue(queue), m_policy(error_policy), m_task_recorder(recorder) { // We manually generate the initial epoch task, which we treat as if it has been reached immediately. auto reserve = m_task_buffer.reserve_task_entry(await_free_task_slot_callback()); auto initial_epoch = task::make_epoch(initial_epoch_task, epoch_action::none); @@ -98,12 +98,20 @@ namespace detail { if(reduction.has_value()) { read_requirements = region_union(read_requirements, scalar_box); } const auto last_writers = m_buffers_last_writers.at(bid).get_region_values(read_requirements); - for(auto& p : last_writers) { - // This indicates that the buffer is being used for the first time by this task, or all previous tasks also only read from it. - // A valid use case (i.e., not reading garbage) for this is when the buffer has been initialized using a host pointer. - if(p.second == std::nullopt) continue; - const task_id last_writer = *p.second; - add_dependency(tsk, *m_task_buffer.get_task(last_writer), dependency_kind::true_dep, dependency_origin::dataflow); + box_vector<3> uninitialized_reads; + for(const auto& [box, writer] : last_writers) { + // host-initialized buffers are last-written by the current epoch + if(writer.has_value()) { + add_dependency(tsk, *m_task_buffer.get_task(*writer), dependency_kind::true_dep, dependency_origin::dataflow); + } else if(m_policy.uninitialized_read_error != error_policy::ignore) { + uninitialized_reads.push_back(box); + } + } + if(!uninitialized_reads.empty()) { + // TODO this should output the full buffer debug label, but we don't want to pull in buffer_manager for this because its days are numbered + utils::report_error(m_policy.uninitialized_read_error, + "{} declares a reading access on uninitialized B{} {}. Make sure to construct the accessor with no_init if possible.", + print_task_debug_label(tsk, true /* title case */), bid, region(std::move(uninitialized_reads))); } } diff --git a/test/dag_benchmarks.cc b/test/dag_benchmarks.cc index 1fb80a72d..b7d3b5684 100644 --- a/test/dag_benchmarks.cc +++ b/test/dag_benchmarks.cc @@ -135,10 +135,20 @@ TEST_CASE("benchmark task handling", "[benchmark][group:task-graph]") { } +// these policies are equivalent to the ones used by `runtime` (except that we throw exceptions here for benchmark-debugging purposes) +static constexpr task_manager::policy_set benchmark_task_manager_policy = { + /* uninitialized_read_error */ CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::throw_exception : error_policy::ignore, +}; +static constexpr distributed_graph_generator::policy_set benchmark_command_graph_generator_policy{ + /* uninitialized_read_error */ error_policy::ignore, // uninitialized reads already detected by task manager + /* overlapping_write_error */ CELERITY_ACCESS_PATTERN_DIAGNOSTICS ? error_policy::throw_exception : error_policy::ignore, +}; + + struct task_manager_benchmark_context { const size_t num_nodes = 1; task_recorder trec; - task_manager tm{1, nullptr, test_utils::print_graphs ? &trec : nullptr}; + task_manager tm{1, nullptr, test_utils::print_graphs ? &trec : nullptr, benchmark_task_manager_policy}; test_utils::mock_buffer_factory mbf{tm}; ~task_manager_benchmark_context() { tm.generate_epoch_task(celerity::detail::epoch_action::shutdown); } @@ -152,18 +162,19 @@ struct task_manager_benchmark_context { } }; + struct graph_generator_benchmark_context { const size_t num_nodes; command_graph cdag; graph_serializer gser{[](command_pkg&&) {}}; task_recorder trec; - task_manager tm{num_nodes, nullptr, test_utils::print_graphs ? &trec : nullptr}; - command_recorder crec; - distributed_graph_generator dggen; - test_utils::mock_buffer_factory mbf; + task_manager tm{num_nodes, nullptr, test_utils::print_graphs ? &trec : nullptr, benchmark_task_manager_policy}; + command_recorder crec{&tm}; + distributed_graph_generator dggen{ + num_nodes, 0 /* local_nid */, cdag, tm, test_utils::print_graphs ? &crec : nullptr, benchmark_command_graph_generator_policy}; + test_utils::mock_buffer_factory mbf{tm, dggen}; - explicit graph_generator_benchmark_context(size_t num_nodes) - : num_nodes{num_nodes}, crec(&tm), dggen{num_nodes, 0 /* local_nid */, cdag, tm, test_utils::print_graphs ? &crec : nullptr}, mbf{tm, dggen} { + explicit graph_generator_benchmark_context(size_t num_nodes) : num_nodes{num_nodes} { tm.register_task_callback([this](const task* tsk) { const auto cmds = dggen.build_task(*tsk); gser.flush(cmds); @@ -258,13 +269,13 @@ class benchmark_scheduler final : public abstract_scheduler { struct scheduler_benchmark_context { const size_t num_nodes; command_graph cdag; - task_manager tm{num_nodes, nullptr, {}}; + task_manager tm{num_nodes, nullptr, {}, benchmark_task_manager_policy}; benchmark_scheduler schdlr; test_utils::mock_buffer_factory mbf; explicit scheduler_benchmark_context(restartable_thread& thrd, size_t num_nodes) - : num_nodes{num_nodes}, // - schdlr{thrd, std::make_unique(num_nodes, 0 /* local_nid */, cdag, tm, nullptr)}, // + : num_nodes{num_nodes}, schdlr{thrd, std::make_unique( + num_nodes, 0 /* local_nid */, cdag, tm, nullptr, benchmark_command_graph_generator_policy)}, mbf{tm, schdlr} { tm.register_task_callback([this](const task* tsk) { schdlr.notify_task_created(tsk); }); schdlr.startup(); @@ -325,7 +336,7 @@ template template [[gnu::noinline]] BenchmarkContext&& generate_chain_graph(BenchmarkContext&& ctx, const size_t num_tasks) { const range<2> global_range{ctx.num_nodes, ctx.num_nodes}; - test_utils::mock_buffer<2> buf = ctx.mbf.create_buffer(global_range); + test_utils::mock_buffer<2> buf = ctx.mbf.create_buffer(global_range, true /* host initialized */); for(size_t t = 0; t < num_tasks; ++t) { ctx.create_task(global_range, [&](handler& cgh) { buf.get_access(cgh, [=](chunk<2> ck) { return subrange<2>{{ck.offset[1], ck.offset[0]}, {ck.range[1], ck.range[0]}}; }); diff --git a/test/distributed_graph_generator_test_utils.h b/test/distributed_graph_generator_test_utils.h index 08fe48900..d3add7c79 100644 --- a/test/distributed_graph_generator_test_utils.h +++ b/test/distributed_graph_generator_test_utils.h @@ -477,11 +477,17 @@ class dist_cdag_test_context { friend class task_builder; public: - dist_cdag_test_context(size_t num_nodes) : m_num_nodes(num_nodes), m_tm(num_nodes, nullptr /* host_queue */, &m_task_recorder) { + struct policy_set { + task_manager::policy_set tm; + distributed_graph_generator::policy_set dggen; + }; + + dist_cdag_test_context(const size_t num_nodes, const policy_set& policy = {}) + : m_num_nodes(num_nodes), m_tm(num_nodes, nullptr /* host_queue */, &m_task_recorder, policy.tm) { for(node_id nid = 0; nid < num_nodes; ++nid) { m_cdags.emplace_back(std::make_unique()); m_cmd_recorders.emplace_back(std::make_unique(&m_tm, nullptr)); - m_dggens.emplace_back(std::make_unique(num_nodes, nid, *m_cdags[nid], m_tm, m_cmd_recorders[nid].get())); + m_dggens.emplace_back(std::make_unique(num_nodes, nid, *m_cdags[nid], m_tm, m_cmd_recorders[nid].get(), policy.dggen)); } } @@ -498,7 +504,7 @@ class dist_cdag_test_context { const auto buf = test_utils::mock_buffer(bid, size); m_tm.add_buffer(bid, Dims, range_cast<3>(size), mark_as_host_initialized); for(auto& dggen : m_dggens) { - dggen->add_buffer(bid, Dims, range_cast<3>(size)); + dggen->add_buffer(bid, Dims, range_cast<3>(size), mark_as_host_initialized); } return buf; } diff --git a/test/graph_gen_transfer_tests.cc b/test/graph_gen_transfer_tests.cc index 73ff785b6..d769e6e26 100644 --- a/test/graph_gen_transfer_tests.cc +++ b/test/graph_gen_transfer_tests.cc @@ -2,8 +2,6 @@ #include "distributed_graph_generator_test_utils.h" -#include "distributed_graph_generator.h" - using namespace celerity; using namespace celerity::detail; using namespace celerity::test_utils; @@ -16,26 +14,15 @@ TEST_CASE("distributed_graph_generator generates required data transfer commands const range<1> test_range = {256}; auto buf = dctx.create_buffer(test_range); - auto rm = [](chunk<1> chnk) -> subrange<1> { - switch(chnk.offset[0]) { - case 0: return chnk; - case 64: return {128, 64}; - case 128: return {64, 64}; - case 192: return chnk; - default: FAIL("Unexpected offset"); - } - return {}; - }; + const auto rm = [&](const chunk<1>& chnk) { return subrange(id(test_range[0] - chnk.offset[0] - chnk.range[0]), chnk.range); }; const auto tid_a = dctx.device_compute(test_range).discard_write(buf, rm).submit(); CHECK(dctx.query(tid_a, command_type::execution).count_per_node() == 1); dctx.device_compute(test_range).read(buf, acc::one_to_one{}).submit(); - CHECK(dctx.query(command_type::push).count() == 2); - CHECK(dctx.query(command_type::push, node_id(1)).count() == 1); - CHECK(dctx.query(command_type::push, node_id(2)).count() == 1); - CHECK(dctx.query(command_type::await_push).count() == 2); - CHECK(dctx.query(command_type::await_push, node_id(1)).count() == 1); - CHECK(dctx.query(command_type::await_push, node_id(2)).count() == 1); + CHECK(dctx.query(command_type::push).count() == 4); + CHECK(dctx.query(command_type::push).count_per_node() == 1); + CHECK(dctx.query(command_type::await_push).count() == 4); + CHECK(dctx.query(command_type::await_push).count_per_node() == 1); } TEST_CASE("distributed_graph_generator doesn't generate data transfer commands for the same buffer and range more than once", diff --git a/test/graph_generation_tests.cc b/test/graph_generation_tests.cc index 65e58b31b..a05fa24cf 100644 --- a/test/graph_generation_tests.cc +++ b/test/graph_generation_tests.cc @@ -1,5 +1,6 @@ #include #include +#include #include "distributed_graph_generator_test_utils.h" @@ -123,27 +124,6 @@ TEST_CASE("distributed_graph_generator builds dependencies to all local commands CHECK(dctx.query(tid_c).have_successors(dctx.query(tid_d))); } -// This test case currently fails and exists for documentation purposes: -// - Having fixed write access to a buffer results in unclear semantics when it comes to splitting the task into chunks. -// - We could check for write access when using the built-in fixed range mapper and warn / throw. -// - But of course this is the easy case; the user could just as well write the same by hand. -// -// Really the most sensible thing to do might be to check whether chunks write to overlapping regions and abort if so. -TEST_CASE("distributed_graph_generator handles fixed write access", "[distributed_graph_generator][command-graph][!shouldfail]") { - dist_cdag_test_context dctx(2); - - const range<1> test_range = {128}; - auto buf0 = dctx.create_buffer(test_range); - - const auto tid_a = dctx.device_compute(test_range).discard_write(buf0, acc::all{}).submit(); - // Another solution could be to not split the task at all - CHECK(dctx.query(tid_a).count() == 1); - - dctx.device_compute(test_range).read(buf0, acc::all{}).submit(); - // Right now this generates push commands, which also doesn't make much sense - CHECK(dctx.query(command_type::push).empty()); -} - // This is a highly constructed and unrealistic example, but we'd still like the behavior to be clearly defined. TEST_CASE("distributed_graph_generator generates anti-dependencies for execution commands that have a task-level true dependency", "[distributed_graph_generator][command-graph]") { @@ -499,3 +479,51 @@ TEST_CASE("fences introduce dependencies on buffers", "[distributed_graph_genera CHECK(dctx.query(tid_fence, nid).have_successors(dctx.query(tid_b, nid))); } } + +TEST_CASE("distributed_graph_generator throws in tests if it detects an uninitialized read", "[distributed_graph_generator]") { + const size_t num_nodes = 2; + const range<1> node_range{num_nodes}; + + dist_cdag_test_context::policy_set policy; + policy.tm.uninitialized_read_error = error_policy::ignore; // otherwise we get task-level errors first + + dist_cdag_test_context dctx(num_nodes, policy); + + SECTION("on a fully uninitialized buffer") { + auto buf = dctx.create_buffer<1>({1}); + CHECK_THROWS_WITH((dctx.device_compute(node_range).name("uninitialized").read(buf, acc::all()).submit()), + "Command C1 on N0, which executes [0,0,0] - [1,1,1] of device kernel T1 \"uninitialized\", reads B0 {[0,0,0] - [1,1,1]}, which has not been " + "written by any node."); + } + + SECTION("on a partially, locally initialized buffer") { + auto buf = dctx.create_buffer<1>(node_range); + dctx.device_compute(range(1)).discard_write(buf, acc::one_to_one()).submit(); + CHECK_THROWS_WITH((dctx.device_compute(node_range).read(buf, acc::all()).submit()), + "Command C2 on N0, which executes [0,0,0] - [1,1,1] of device kernel T2, reads B0 {[1,0,0] - [2,1,1]}, which has not been written by any node."); + } + + SECTION("on a partially, remotely initialized buffer") { + auto buf = dctx.create_buffer<1>(node_range); + dctx.device_compute(range(1)).discard_write(buf, acc::one_to_one()).submit(); + CHECK_THROWS_WITH((dctx.device_compute(node_range).read(buf, acc::one_to_one()).submit()), + "Command C1 on N1, which executes [1,0,0] - [2,1,1] of device kernel T2, reads B0 {[1,0,0] - [2,1,1]}, which has not been written by any node."); + } +} + +TEST_CASE("distributed_graph_generator throws in tests if it detects overlapping writes", "[distributed_graph_generator]") { + dist_cdag_test_context dctx(2); + auto buf = dctx.create_buffer<2>({20, 20}); + + SECTION("on all-write") { + CHECK_THROWS_WITH((dctx.device_compute(buf.get_range()).discard_write(buf, acc::all()).submit()), + "Device kernel T1 has overlapping writes between multiple nodes in B0 {[0,0,0] - [20,20,1]}. Choose a non-overlapping " + "range mapper for the write access or constrain the split to make the access non-overlapping."); + } + + SECTION("on neighborhood-write") { + CHECK_THROWS_WITH((dctx.host_task(buf.get_range()).name("host neighborhood").discard_write(buf, acc::neighborhood(1, 1)).submit()), + "Host-compute task T1 \"host neighborhood\" has overlapping writes between multiple nodes in B0 {[9,0,0] - [11,20,1]}. Choose a non-overlapping " + "range mapper for the write access or constrain the split to make the access non-overlapping."); + } +} diff --git a/test/runtime_deprecation_tests.cc b/test/runtime_deprecation_tests.cc index 34900c807..623291362 100644 --- a/test/runtime_deprecation_tests.cc +++ b/test/runtime_deprecation_tests.cc @@ -27,7 +27,7 @@ namespace detail { experimental::host_object ho; int my_int = 33; q.submit(allow_by_ref, [= /* capture buffer/host-object by value */, &my_int](handler& cgh) { - accessor acc{buf, cgh, celerity::access::all{}, celerity::write_only_host_task}; + accessor acc{buf, cgh, celerity::access::all{}, celerity::write_only_host_task, celerity::no_init}; experimental::side_effect se{ho, cgh}; cgh.host_task(on_master_node, [=, &my_int] { (void)acc; diff --git a/test/runtime_tests.cc b/test/runtime_tests.cc index 35e808ff4..27ce385ef 100644 --- a/test/runtime_tests.cc +++ b/test/runtime_tests.cc @@ -1,7 +1,5 @@ #include "sycl_wrappers.h" -#include - #ifdef _WIN32 #define WIN32_LEAN_AND_MEAN #define NOMINMAX @@ -24,6 +22,7 @@ #include "named_threads.h" #include "ranges.h" +#include "log_test_utils.h" #include "test_utils.h" namespace celerity { @@ -89,10 +88,12 @@ namespace detail { } TEST_CASE_METHOD(test_utils::runtime_fixture, "get_access can be called on const buffer", "[buffer]") { - buffer buf_a{range<2>{32, 64}}; + const range<2> range{32, 64}; + std::vector init(range.size()); + buffer buf_a{init.data(), range}; auto& tm = runtime::get_instance().get_task_manager(); const auto tid = test_utils::add_compute_task( - tm, [&](handler& cgh) { buf_a.get_access(cgh, one_to_one{}); }, buf_a.get_range()); + tm, [&](handler& cgh) { buf_a.get_access(cgh, one_to_one{}); }, range); const auto tsk = tm.get_task(tid); const auto bufs = tsk->get_buffer_access_map().get_accessed_buffers(); REQUIRE(bufs.size() == 1); @@ -244,7 +245,7 @@ namespace detail { TEST_CASE("task_manager correctly records compute task information", "[task_manager][task][device_compute_task]") { task_manager tm{1, nullptr, nullptr}; test_utils::mock_buffer_factory mbf(tm); - auto buf_a = mbf.create_buffer(range<2>(64, 152)); + auto buf_a = mbf.create_buffer(range<2>(64, 152), true /* host_initialized */); auto buf_b = mbf.create_buffer(range<3>(7, 21, 99)); const auto tid = test_utils::add_compute_task( tm, @@ -557,19 +558,19 @@ namespace detail { buffer buf{{10, 10}}; CHECK_THROWS_WITH(q.submit([&](handler& cgh) { - auto acc = buf.get_access(cgh, one_to_one{}); + auto acc = buf.get_access(cgh, one_to_one{}); cgh.parallel_for(range<1>{10}, [=](celerity::item<1>) { (void)acc; }); }), "Invalid range mapper dimensionality: 1-dimensional kernel submitted with a requirement whose range mapper is neither invocable for chunk<1> nor " "(chunk<1>, range<2>) to produce subrange<2>"); CHECK_NOTHROW(q.submit([&](handler& cgh) { - auto acc = buf.get_access(cgh, one_to_one{}); + auto acc = buf.get_access(cgh, one_to_one{}); cgh.parallel_for(range<2>{10, 10}, [=](celerity::item<2>) { (void)acc; }); })); CHECK_THROWS_WITH(q.submit([&](handler& cgh) { - auto acc = buf.get_access(cgh, one_to_one{}); + auto acc = buf.get_access(cgh, one_to_one{}); cgh.parallel_for(range<3>{10, 10, 10}, [=](celerity::item<3>) { (void)acc; }); }), "Invalid range mapper dimensionality: 3-dimensional kernel submitted with a requirement whose range mapper is neither invocable for chunk<3> nor " @@ -631,32 +632,38 @@ namespace detail { buffer buf_1{range<1>{2}}; CHECK_THROWS(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<1>{1}, reduction(buf_1, cgh, cl::sycl::plus{}), [=](celerity::item<1>, auto&) {}); + cgh.parallel_for( + range<1>{1}, reduction(buf_1, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<1>, auto&) {}); })); buffer buf_4{range<1>{1}}; CHECK_NOTHROW(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<1>{1}, reduction(buf_4, cgh, cl::sycl::plus{}), [=](celerity::item<1>, auto&) {}); + cgh.parallel_for( + range<1>{1}, reduction(buf_4, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<1>, auto&) {}); })); buffer buf_2{range<2>{1, 2}}; CHECK_THROWS(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<2>{1, 1}, reduction(buf_2, cgh, cl::sycl::plus{}), [=](celerity::item<2>, auto&) {}); + cgh.parallel_for(range<2>{1, 1}, + reduction(buf_2, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<2>, auto&) {}); })); buffer buf_3{range<3>{1, 2, 1}}; CHECK_THROWS(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<3>{1, 1, 1}, reduction(buf_3, cgh, cl::sycl::plus{}), [=](celerity::item<3>, auto&) {}); + cgh.parallel_for(range<3>{1, 1, 1}, + reduction(buf_3, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<3>, auto&) {}); })); buffer buf_5{range<2>{1, 1}}; CHECK_NOTHROW(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<2>{1, 1}, reduction(buf_5, cgh, cl::sycl::plus{}), [=](celerity::item<2>, auto&) {}); + cgh.parallel_for(range<2>{1, 1}, + reduction(buf_5, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<2>, auto&) {}); })); buffer buf_6{range<3>{1, 1, 1}}; CHECK_NOTHROW(tm.submit_command_group([&](handler& cgh) { // - cgh.parallel_for(range<3>{1, 1, 1}, reduction(buf_6, cgh, cl::sycl::plus{}), [=](celerity::item<3>, auto&) {}); + cgh.parallel_for(range<3>{1, 1, 1}, + reduction(buf_6, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), [=](celerity::item<3>, auto&) {}); })); #else SKIP_BECAUSE_NO_SCALAR_REDUCTIONS @@ -707,7 +714,7 @@ namespace detail { q.submit([&](handler& cgh) { local_accessor la{32, cgh}; - accessor ga{out, cgh, celerity::access::one_to_one{}, write_only}; + accessor ga{out, cgh, celerity::access::one_to_one{}, write_only, no_init}; cgh.parallel_for(celerity::nd_range<1>{64, 32}, [=](nd_item<1> item) { la[item.get_local_id()] = static_cast(item.get_global_linear_id()); group_barrier(item.get_group()); @@ -731,8 +738,9 @@ namespace detail { buffer b{range<1>{1}}; distr_queue{}.submit([&](handler& cgh) { - cgh.parallel_for(celerity::nd_range{range<2>{8, 8}, range<2>{4, 4}}, reduction(b, cgh, cl::sycl::plus<>{}), - [](nd_item<2> item, auto& sum) { sum += item.get_global_linear_id(); }); + cgh.parallel_for(celerity::nd_range{range<2>{8, 8}, range<2>{4, 4}}, + reduction(b, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), + [](nd_item<2> item, auto& sum) { sum += static_cast(item.get_global_linear_id()); }); }); #else SKIP_BECAUSE_NO_SCALAR_REDUCTIONS @@ -752,8 +760,8 @@ namespace detail { #if CELERITY_FEATURE_SCALAR_REDUCTIONS buffer b{{1}}; q.submit([&](handler& cgh) { - cgh.parallel_for( - range<1>{64}, reduction(b, cgh, cl::sycl::plus{}), [=](item<1> item, auto& r) { r += static_cast(item.get_linear_id()); }); + cgh.parallel_for(range<1>{64}, reduction(b, cgh, cl::sycl::plus{}, property::reduction::initialize_to_identity()), + [=](item<1> item, auto& r) { r += static_cast(item.get_linear_id()); }); }); q.submit([&](handler& cgh) { cgh.parallel_for(celerity::nd_range<1>{64, 32}, reduction(b, cgh, cl::sycl::plus{}), @@ -1383,7 +1391,7 @@ namespace detail { distr_queue q; q.submit([&](handler& cgh) { - accessor acc(buf, cgh, all{}, write_only, no_init); + accessor acc(buf, cgh, one_to_one(), write_only, no_init); cgh.parallel_for(buf.get_range(), [=](celerity::item<2> item) { acc[item] = static_cast(item.get_linear_id()); }); }); @@ -1451,5 +1459,41 @@ namespace detail { }); } + TEST_CASE_METHOD( + test_utils::runtime_fixture, "runtime warns on uninitialized reads iff access pattern diagnostics are enabled", "[runtime][diagnostics]") // + { + buffer buf(1); + + std::unique_ptr lc; + { + distr_queue q; + lc = std::make_unique(); + + SECTION("in device kernels") { + q.submit([&](handler& cgh) { + accessor acc(buf, cgh, celerity::access::all(), celerity::read_only); + cgh.parallel_for(range(1), [=](item<1>) { (void)acc; }); + }); + } + + SECTION("in host tasks") { + q.submit([&](handler& cgh) { + accessor acc(buf, cgh, celerity::access::all(), celerity::read_only_host_task); + cgh.host_task(on_master_node, [=] { (void)acc; }); + }); + } + + q.slow_full_sync(); + } + + const auto error_message = + "declares a reading access on uninitialized B0 {[0,0,0] - [1,1,1]}. Make sure to construct the accessor with no_init if possible."; +#if CELERITY_ACCESS_PATTERN_DIAGNOSTICS + CHECK_THAT(lc->get_log(), Catch::Matchers::ContainsSubstring(error_message)); +#else + CHECK_THAT(lc->get_log(), !Catch::Matchers::ContainsSubstring(error_message)); +#endif + } + } // namespace detail } // namespace celerity diff --git a/test/system/distr_tests.cc b/test/system/distr_tests.cc index 29b56aa2e..bed2ba300 100644 --- a/test/system/distr_tests.cc +++ b/test/system/distr_tests.cc @@ -388,12 +388,15 @@ namespace detail { auto& tm = celerity::detail::runtime::get_instance().get_task_manager(); tm.set_horizon_step(1); - for(int i = 0; i < 2; ++i) { - q.submit([&](handler& cgh) { - celerity::accessor acc_a{buff_a, cgh, celerity::access::one_to_one{}, celerity::read_write}; - cgh.parallel_for(range, [=](item<2> item) { (void)acc_a; }); - }); - } + q.submit([&](handler& cgh) { + celerity::accessor acc_a{buff_a, cgh, celerity::access::one_to_one{}, celerity::write_only, celerity::no_init}; + cgh.parallel_for(range, [=](item<2> item) { (void)acc_a; }); + }); + + q.submit([&](handler& cgh) { + celerity::accessor acc_a{buff_a, cgh, celerity::access::one_to_one{}, celerity::read_write}; + cgh.parallel_for(range, [=](item<2> item) { (void)acc_a; }); + }); q.slow_full_sync(); @@ -434,5 +437,44 @@ namespace detail { } } + TEST_CASE_METHOD(test_utils::runtime_fixture, "runtime logs errors on overlapping writes between commands iff access pattern diagnostics are enabled", + "[runtime][diagnostics]") // + { + std::unique_ptr lc; + { + distr_queue q; + const auto num_nodes = runtime::get_instance().get_num_nodes(); + if(num_nodes < 2) { SKIP("Test needs at least 2 participating nodes"); } + + lc = std::make_unique(); + + buffer buf(1); + + SECTION("in distributed device kernels") { + q.submit([&](handler& cgh) { + accessor acc(buf, cgh, celerity::access::all(), write_only, no_init); + cgh.parallel_for(range(num_nodes), [=](item<1>) { (void)acc; }); + }); + } + + SECTION("in collective host tasks") { + q.submit([&](handler& cgh) { + accessor acc(buf, cgh, celerity::access::all(), write_only_host_task, no_init); + cgh.host_task(celerity::experimental::collective, [=](experimental::collective_partition) { (void)acc; }); + }); + } + + q.slow_full_sync(); + } + + const auto error_message = "has overlapping writes between multiple nodes in B0 {[0,0,0] - [1,1,1]}. Choose a non-overlapping range mapper for the " + "write access or constrain the split to make the access non-overlapping."; +#if CELERITY_ACCESS_PATTERN_DIAGNOSTICS + CHECK_THAT(lc->get_log(), Catch::Matchers::ContainsSubstring(error_message)); +#else + CHECK_THAT(lc->get_log(), !Catch::Matchers::ContainsSubstring(error_message)); +#endif + } + } // namespace detail } // namespace celerity diff --git a/test/task_graph_tests.cc b/test/task_graph_tests.cc index 0168c0052..b561dcc42 100644 --- a/test/task_graph_tests.cc +++ b/test/task_graph_tests.cc @@ -4,6 +4,7 @@ #include #include +#include #include #include @@ -94,16 +95,18 @@ namespace detail { using namespace cl::sycl::access; auto tt = test_utils::task_test_context{}; - auto buf = tt.mbf.create_buffer(range<1>(128)); + auto buf = tt.mbf.create_buffer(range<1>(128), true /* mark_as_host_initialized */); const auto tid_a = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { buf.get_access(cgh, fixed<1>{{0, 64}}); }); const auto tid_b = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { buf.get_access(cgh, fixed<1>{{0, 128}}); }); - REQUIRE(has_dependency(tt.tm, tid_b, tid_a)); + CHECK(has_dependency(tt.tm, tid_b, tid_a)); + CHECK(has_dependency(tt.tm, tid_b, task_manager::initial_epoch_task)); // for read of the host-initialized part const auto tid_c = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { buf.get_access(cgh, fixed<1>{{64, 128}}); }); - REQUIRE_FALSE(has_dependency(tt.tm, tid_c, tid_a)); + CHECK_FALSE(has_dependency(tt.tm, tid_c, tid_a)); + CHECK(has_dependency(tt.tm, tid_c, task_manager::initial_epoch_task)); // for read of the host-initialized part } TEST_CASE("task_manager correctly generates anti-dependencies", "[task_manager][task-graph]") { @@ -136,10 +139,14 @@ namespace detail { TEST_CASE("task_manager correctly handles host-initialized buffers", "[task_manager][task-graph]") { using namespace cl::sycl::access; - auto tt = test_utils::task_test_context{}; - auto host_init_buf = tt.mbf.create_buffer(range<1>(128), true); - auto non_host_init_buf = tt.mbf.create_buffer(range<1>(128), false); - auto artificial_dependency_buf = tt.mbf.create_buffer(range<1>(1), false); + // we explicitly test reading from non_host_init_buf + task_manager::policy_set tm_policy; + tm_policy.uninitialized_read_error = error_policy::ignore; + + auto tt = test_utils::task_test_context(tm_policy); + auto host_init_buf = tt.mbf.create_buffer(range<1>(128), true /* mark_as_host_initialized */); + auto non_host_init_buf = tt.mbf.create_buffer(range<1>(128), false /* mark_as_host_initialized */); + auto artificial_dependency_buf = tt.mbf.create_buffer(range<1>(1), false /* mark_as_host_initialized */); const auto tid_a = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { host_init_buf.get_access(cgh, fixed<1>{{0, 128}}); @@ -208,23 +215,20 @@ namespace detail { CAPTURE(producer_mode); auto tt = test_utils::task_test_context{}; - auto buf = tt.mbf.create_buffer(range<1>(128), false); + auto buf = tt.mbf.create_buffer(range<1>(128), true /* mark_as_host_initialized */); - const task_id tid_a = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { - dispatch_get_access(buf, cgh, producer_mode, fixed<1>{{0, 128}}); - }); + const task_id tid_a = + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { dispatch_get_access(buf, cgh, producer_mode, all()); }); - const task_id tid_b = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { - dispatch_get_access(buf, cgh, consumer_mode, fixed<1>{{0, 128}}); - }); - REQUIRE(has_dependency(tt.tm, tid_b, tid_a)); + const task_id tid_b = + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { dispatch_get_access(buf, cgh, consumer_mode, all()); }); + CHECK(has_dependency(tt.tm, tid_b, tid_a)); - const task_id tid_c = test_utils::add_compute_task(tt.tm, [&](handler& cgh) { - dispatch_get_access(buf, cgh, producer_mode, fixed<1>{{0, 128}}); - }); + const task_id tid_c = + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { dispatch_get_access(buf, cgh, producer_mode, all()); }); const bool pure_consumer = consumer_mode == mode::read; const bool pure_producer = producer_mode == mode::discard_read_write || producer_mode == mode::discard_write; - REQUIRE(has_dependency(tt.tm, tid_c, tid_b, pure_consumer || pure_producer ? dependency_kind::anti_dep : dependency_kind::true_dep)); + CHECK(has_dependency(tt.tm, tid_c, tid_b, pure_consumer || pure_producer ? dependency_kind::anti_dep : dependency_kind::true_dep)); } } } @@ -305,7 +309,7 @@ namespace detail { auto tt = test_utils::task_test_context{}; tt.tm.set_horizon_step(2); - auto buf_a = tt.mbf.create_buffer(range<1>(128)); + auto buf_a = tt.mbf.create_buffer(range<1>(128), true /* mark_as_host_initialized */); test_utils::add_host_task(tt.tm, on_master_node, [&](handler& cgh) { buf_a.get_access(cgh, fixed<1>({0, 128})); }); @@ -368,7 +372,7 @@ namespace detail { const auto buff_size = 128; const auto num_tasks = 9; const auto buff_elem_per_task = buff_size / num_tasks; - auto buf_a = tt.mbf.create_buffer(range<1>(buff_size)); + auto buf_a = tt.mbf.create_buffer(range<1>(buff_size), true /* mark_as_host_initialized */); auto current_horizon = task_manager_testspy::get_current_horizon(tt.tm); CHECK_FALSE(current_horizon.has_value()); @@ -538,7 +542,7 @@ namespace detail { TEST_CASE("buffer accesses with empty ranges do not generate data-flow dependencies", "[task_manager][task-graph]") { auto tt = test_utils::task_test_context{}; - auto buf = tt.mbf.create_buffer(range<2>(32, 32)); + auto buf = tt.mbf.create_buffer(range<2>(32, 32), true /* mark_as_host_initialized */); const auto write_sr = GENERATE(values({subrange<2>{{16, 16}, {0, 0}}, subrange<2>{{16, 16}, {8, 8}}})); const auto read_sr = GENERATE(values({subrange<2>{{1, 1}, {0, 0}}, subrange<2>{{8, 8}, {16, 16}}})); @@ -713,5 +717,31 @@ namespace detail { CHECK(has_dependency(tt.tm, tid_b, tid_fence, dependency_kind::anti_dep)); } + TEST_CASE("task_manager throws in tests if it detects an uninitialized read", "[task_manager]") { + test_utils::task_test_context tt; + + SECTION("on a fully uninitialized buffer") { + auto buf = tt.mbf.create_buffer<1>({1}); + + CHECK_THROWS_WITH((test_utils::add_compute_task( + tt.tm, [&](handler& cgh) { debug::set_task_name(cgh, "uninit_read"), buf.get_access(cgh, all{}); })), + "Device kernel T1 \"uninit_read\" declares a reading access on uninitialized B0 {[0,0,0] - [1,1,1]}. Make sure to construct the accessor with " + "no_init if possible."); + } + + SECTION("on a partially initialized buffer") { + auto buf = tt.mbf.create_buffer<2>({64, 64}); + test_utils::add_compute_task(tt.tm, [&](handler& cgh) { + buf.get_access(cgh, fixed<2>({{0, 0}, {32, 32}})); + }); + + CHECK_THROWS_WITH((test_utils::add_compute_task( + tt.tm, [&](handler& cgh) { debug::set_task_name(cgh, "uninit_read"), buf.get_access(cgh, all{}); })), + "Device kernel T2 \"uninit_read\" declares a reading access on uninitialized B0 {[0,32,0] - [32,64,1], [32,0,0] - [64,64,1]}. Make sure to " + "construct " + "the accessor with no_init if possible."); + } + } + } // namespace detail } // namespace celerity diff --git a/test/test_utils.h b/test/test_utils.h index 4939b3486..f966abf02 100644 --- a/test/test_utils.h +++ b/test/test_utils.h @@ -210,8 +210,8 @@ namespace test_utils { const detail::buffer_id bid = m_next_buffer_id++; const auto buf = mock_buffer(bid, size); if(m_task_mngr != nullptr) { m_task_mngr->add_buffer(bid, Dims, detail::range_cast<3>(size), mark_as_host_initialized); } - if(m_schdlr != nullptr) { m_schdlr->notify_buffer_registered(bid, Dims, detail::range_cast<3>(size)); } - if(m_dggen != nullptr) { m_dggen->add_buffer(bid, Dims, detail::range_cast<3>(size)); } + if(m_schdlr != nullptr) { m_schdlr->notify_buffer_registered(bid, Dims, detail::range_cast<3>(size), mark_as_host_initialized); } + if(m_dggen != nullptr) { m_dggen->add_buffer(bid, Dims, detail::range_cast<3>(size), mark_as_host_initialized); } return buf; } @@ -357,7 +357,7 @@ namespace test_utils { mock_host_object_factory mhof; mock_reduction_factory mrf; - task_test_context() : tm(1, nullptr, &trec), mbf(tm) {} + explicit task_test_context(const detail::task_manager::policy_set& policy = {}) : tm(1, nullptr, &trec, policy), mbf(tm) {} ~task_test_context() { maybe_print_task_graph(trec); } };