diff --git a/examples/wave_sim/wave_sim.cc b/examples/wave_sim/wave_sim.cc
index d75b1cd3c..6be99ab1a 100644
--- a/examples/wave_sim/wave_sim.cc
+++ b/examples/wave_sim/wave_sim.cc
@@ -63,23 +63,32 @@ void update(celerity::distr_queue& queue, celerity::buffer<float, 2> up, celerit
 	step(queue, up, u, dt, delta);
 }
 
+void stream_open(celerity::distr_queue& queue, size_t N, size_t num_samples, celerity::experimental::host_object<std::ofstream> os) {
+	queue.submit([=](celerity::handler& cgh) {
+		celerity::experimental::side_effect os_eff{os, cgh};
+		cgh.host_task(celerity::on_master_node, [=] {
+			os_eff->open("wave_sim_result.bin", std::ios_base::out | std::ios_base::binary);
+			const struct { uint64_t n, t; } header{N, num_samples};
+			os_eff->write(reinterpret_cast<const char*>(&header), sizeof(header));
+		});
+	});
+}
+
 template <typename T>
-void store(celerity::distr_queue& queue, celerity::buffer<T, 2> up, celerity::buffer<T, 1> sampled_frames, size_t frame_idx) {
+void stream_append(celerity::distr_queue& queue, celerity::buffer<T, 2> up, celerity::experimental::host_object<std::ofstream> os) {
 	const auto range = up.get_range();
 	queue.submit([=](celerity::handler& cgh) {
 		celerity::accessor up_r{up, cgh, celerity::access::all{}, celerity::read_only_host_task};
-		// Use `all` range mapper to avoid unnecessary lazy resizing of the backing buffer as new frames come in
-		celerity::accessor sf_w{sampled_frames, cgh, celerity::access::all{}, celerity::write_only_host_task};
-		cgh.host_task(celerity::on_master_node, [=] { memcpy(sf_w.get_pointer() + frame_idx * range.size(), up_r.get_pointer(), range.size() * sizeof(T)); });
+		celerity::experimental::side_effect os_eff{os, cgh};
+		cgh.host_task(celerity::on_master_node, [=] { os_eff->write(reinterpret_cast<const char*>(up_r.get_pointer()), range.size() * sizeof(T)); });
 	});
 }
 
-template <typename T>
-void write_bin(size_t N, size_t num_samples, const T* sampled_frames) {
-	std::ofstream os("wave_sim_result.bin", std::ios_base::out | std::ios_base::binary);
-	const struct { uint64_t n, t; } header{N, num_samples};
-	os.write(reinterpret_cast<const char*>(&header), sizeof(header));
-	os.write(reinterpret_cast<const char*>(sampled_frames), num_samples * N * N * sizeof(T));
+void stream_close(celerity::distr_queue& queue, celerity::experimental::host_object<std::ofstream> os) {
+	queue.submit([=](celerity::handler& cgh) {
+		celerity::experimental::side_effect os_eff{os, cgh};
+		cgh.host_task(celerity::on_master_node, [=] { os_eff->close(); });
+	});
 }
 
 struct wave_sim_config {
@@ -134,32 +143,28 @@ int main(int argc, char* argv[]) {
 	celerity::buffer<float, 2> up{celerity::range<2>(cfg.N, cfg.N)}; // next
 	celerity::buffer<float, 2> u{celerity::range<2>(cfg.N, cfg.N)};  // current
 
-	// Create buffer for storing sampled frames.
-	// As we only need some form of contiguous storage for dumping the result in the end, we can simply use a 1D buffer here.
-	celerity::buffer<float, 1> sampled_frames{celerity::range<1>{num_samples * up.get_range().size()}};
-
 	setup_wave(queue, u, {cfg.N / 4.f, cfg.N / 4.f}, 1, {cfg.N / 8.f, cfg.N / 8.f});
 	zero(queue, up);
 	initialize(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});
 
-	// Store initial state
-	if(cfg.output_sample_rate > 0) { store(queue, u, sampled_frames, 0); }
+	const celerity::experimental::host_object<std::ofstream> os;
+	if(cfg.output_sample_rate > 0) {
+		stream_open(queue, cfg.N, num_samples, os);
+		stream_append(queue, u, os); // Store initial state
+	}
 
 	auto t = 0.0;
 	size_t i = 0;
 	while(t < cfg.T) {
 		update(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});
-		if(cfg.output_sample_rate != 0 && ++i % cfg.output_sample_rate == 0) { store(queue, u, sampled_frames, i / cfg.output_sample_rate); }
+		if(cfg.output_sample_rate > 0) {
+			if(++i % cfg.output_sample_rate == 0) { stream_append(queue, u, os); }
+		}
 		std::swap(u, up);
 		t += cfg.dt;
 	}
 
-	if(cfg.output_sample_rate > 0) {
-		queue.submit([=](celerity::handler& cgh) {
-			celerity::accessor sf{sampled_frames, cgh, celerity::access::all{}, celerity::read_only_host_task};
-			cgh.host_task(celerity::on_master_node, [=]() { write_bin(cfg.N, num_samples, sf.get_pointer()); });
-		});
-	}
+	if(cfg.output_sample_rate > 0) { stream_close(queue, os); }
 
 	return EXIT_SUCCESS;
 }
diff --git a/include/celerity.h b/include/celerity.h
index 63583ac7f..8e92236fd 100644
--- a/include/celerity.h
+++ b/include/celerity.h
@@ -6,6 +6,7 @@
 #include "accessor.h"
 #include "buffer.h"
 #include "distr_queue.h"
+#include "side_effect.h"
 #include "user_bench.h"
 #include "version.h"
 
diff --git a/include/graph_generator.h b/include/graph_generator.h
index 5e021d2be..5ee935fb0 100644
--- a/include/graph_generator.h
+++ b/include/graph_generator.h
@@ -58,6 +58,7 @@ namespace detail {
 		using buffer_state_map = std::unordered_map<buffer_id, buffer_state>;
 		using buffer_read_map = std::unordered_map<buffer_id, GridRegion<3>>;
 		using buffer_writer_map = std::unordered_map<buffer_id, region_map<std::optional<command_id>>>;
+		using side_effect_map = std::unordered_map<host_object_id, command_id>;
 
 		struct per_node_data {
 			// An "init command" is used as the last writer for host-initialized buffers.
@@ -69,6 +70,8 @@
 			// Collective host tasks have an implicit dependency on the previous task in the same collective group, which is required in order to guarantee
 			// they are executed in the same order on every node.
 			std::unordered_map<collective_group_id, command_id> last_collective_commands;
+			// Side effects on the same host object create true dependencies between task commands, so we track the last effect per host object on each node.
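+			// (E.g., successive task commands C1 -> C2 -> C3 affecting the same host object on one node form a chain of true dependencies.)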
+			side_effect_map host_object_last_effects;
 		};
 
 	  public:
@@ -119,6 +122,8 @@
 
 		void process_task_data_requirements(task_id tid);
 
+		void process_task_side_effect_requirements(task_id tid);
+
 		void generate_horizon(task_id tid);
 	};
 
diff --git a/include/handler.h b/include/handler.h
index 41adbfe58..d99991ae9 100644
--- a/include/handler.h
+++ b/include/handler.h
@@ -287,42 +287,46 @@ namespace detail {
 			access_map.add_access(bid, std::move(rm));
 		}
 
+		void add_requirement(const host_object_id hoid, const experimental::side_effect_order order) {
+			assert(task == nullptr);
+			side_effect_map.add_side_effect(hoid, order);
+		}
+
 		template
 		void add_reduction(reduction_id rid) { reductions.push_back(rid); }
 
-		void create_host_compute_task(int dimensions, range<3> global_range, id<3> global_offset, range<3> granularity) {
+		void create_host_compute_task(task_geometry geometry) {
 			assert(task == nullptr);
-			if(global_range.size() == 0) {
+			if(geometry.global_size.size() == 0) {
 				// TODO this can be easily supported by not creating a task in case the execution range is empty
 				throw std::runtime_error{"The execution range of distributed host tasks must have at least one item"};
 			}
-			task = detail::task::make_host_compute(
-			    tid, dimensions, global_range, global_offset, granularity, std::move(cgf), std::move(access_map), std::move(reductions));
+			task = detail::task::make_host_compute(tid, geometry, std::move(cgf), std::move(access_map), std::move(side_effect_map), std::move(reductions));
 		}
 
-		void create_device_compute_task(int dimensions, range<3> global_range, id<3> global_offset, range<3> granularity, std::string debug_name) {
+		void create_device_compute_task(task_geometry geometry, std::string debug_name) {
 			assert(task == nullptr);
-			if(global_range.size() == 0) {
+			if(geometry.global_size.size() == 0) {
 				// TODO unless reductions are involved, this can be easily supported by not creating a task in case the execution range is empty.
 				// Edge case: If the task includes reductions that specify property::reduction::initialize_to_identity, we need to create a task that sets
 				// the buffer state to an empty pending_reduction_state in the graph_generator. This will cause a trivial reduction_command to be generated on
 				// each node that reads from the reduction output buffer, initializing it to the identity value locally.
 				throw std::runtime_error{"The execution range of device tasks must have at least one item"};
 			}
-			task = detail::task::make_device_compute(
-			    tid, dimensions, global_range, global_offset, granularity, std::move(cgf), std::move(access_map), std::move(reductions), std::move(debug_name));
+			if(!side_effect_map.empty()) { throw std::runtime_error{"Side effects cannot be used in device kernels"}; }
+			task = detail::task::make_device_compute(tid, geometry, std::move(cgf), std::move(access_map), std::move(reductions), std::move(debug_name));
 		}
 
 		void create_collective_task(collective_group_id cgid) {
 			assert(task == nullptr);
-			task = detail::task::make_collective(tid, cgid, num_collective_nodes, std::move(cgf), std::move(access_map));
+			task = detail::task::make_collective(tid, cgid, num_collective_nodes, std::move(cgf), std::move(access_map), std::move(side_effect_map));
 		}
 
 		void create_master_node_task() {
 			assert(task == nullptr);
-			task = detail::task::make_master_node(tid, std::move(cgf), std::move(access_map));
+			task = detail::task::make_master_node(tid, std::move(cgf), std::move(access_map), std::move(side_effect_map));
 		}
 
 		std::unique_ptr<class task> into_task() && { return std::move(task); }
@@ -339,6 +343,7 @@ namespace detail {
 		task_id tid;
 		std::unique_ptr<command_group_storage_base> cgf;
 		buffer_access_map access_map;
+		side_effect_map side_effect_map;
 		std::vector<reduction_id> reductions;
 		std::unique_ptr<class task> task = nullptr;
 		size_t num_collective_nodes;
@@ -595,8 +600,8 @@ void handler::parallel_for_kernel_and_reductions(range<Dims> global_range, id<D
 	if(is_prepass()) {
-		return dynamic_cast<detail::prepass_handler&>(*this).create_device_compute_task(
-		    Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), granularity, detail::kernel_debug_name<KernelName>());
+		const detail::task_geometry geometry{Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), granularity};
+		return dynamic_cast<detail::prepass_handler&>(*this).create_device_compute_task(geometry, detail::kernel_debug_name<KernelName>());
 	}
 
 	auto& device_handler = dynamic_cast<detail::live_pass_device_handler&>(*this);
@@ -642,8 +647,8 @@ void handler::host_task(experimental::collective_tag tag, Functor kernel) {
 
 template <int Dims, typename Functor>
 void handler::host_task(range<Dims> global_range, id<Dims> global_offset, Functor kernel) {
 	if(is_prepass()) {
-		dynamic_cast<detail::prepass_handler&>(*this).create_host_compute_task(
-		    Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), {1, 1, 1});
+		const detail::task_geometry geometry{Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), {1, 1, 1}};
+		dynamic_cast<detail::prepass_handler&>(*this).create_host_compute_task(geometry);
 	} else {
 		dynamic_cast<detail::live_pass_host_handler&>(*this).schedule<Dims>(kernel);
 	}
diff --git a/include/host_object.h b/include/host_object.h
new file mode 100644
index 000000000..0e5669367
--- /dev/null
+++ b/include/host_object.h
@@ -0,0 +1,175 @@
+#pragma once
+
+#include <memory>
+#include <mutex>
+#include <type_traits>
+#include <unordered_set>
+#include <utility>
+
+#include "runtime.h"
+
+
+namespace celerity::experimental {
+
+template <typename T>
+class host_object;
+
+} // namespace celerity::experimental
+
+namespace celerity::detail {
+
+class host_object_manager {
+  public:
+	host_object_id create_host_object() {
+		const std::lock_guard lock{mutex};
+		const auto id = next_id++;
+		objects.emplace(id);
+		return id;
+	}
+
+	void destroy_host_object(const host_object_id id) {
+		const std::lock_guard lock{mutex};
+		objects.erase(id);
+	}
+
+	// true-result only reliable if no calls to create_host_object() are pending
+	bool has_active_objects() const {
+		const std::lock_guard lock{mutex};
+		return !objects.empty();
+	}
+
+  private:
+	mutable std::mutex mutex;
+	host_object_id next_id = 0;
+	std::unordered_set<host_object_id> objects;
+};
+
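+// Note: like buffers, live host objects keep the runtime alive. The shutdown path in src/runtime.cc (see below) only destroys the runtime instance once
+// has_active_objects() returns false.
+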
+// Base for `state` structs in all host_object specializations: registers and unregisters host_objects with the host_object_manager.
+struct host_object_tracker {
+	detail::host_object_id id{};
+
+	host_object_tracker() {
+		if(!detail::runtime::is_initialized()) { detail::runtime::init(nullptr, nullptr); }
+		id = detail::runtime::get_instance().get_host_object_manager().create_host_object();
+	}
+
+	host_object_tracker(host_object_tracker&&) = delete;
+	host_object_tracker& operator=(host_object_tracker&&) = delete;
+
+	~host_object_tracker() { detail::runtime::get_instance().get_host_object_manager().destroy_host_object(id); }
+};
+
+// see host_object deduction guides
+template <typename T>
+struct assert_host_object_ctor_param_is_rvalue {
+	static_assert(std::is_rvalue_reference_v<T&&>,
+	    "Either pass the constructor parameter as T&& or std::reference_wrapper<T>, or add explicit template arguments to host_object");
+	using type = T;
+};
+
+template <typename T>
+using assert_host_object_ctor_param_is_rvalue_t = typename assert_host_object_ctor_param_is_rvalue<T>::type;
+
+} // namespace celerity::detail
+
+namespace celerity::experimental {
+
+/**
+ * A `host_object` wraps state that exists separately on each worker node and can be referenced in host tasks through `side_effect`s. Celerity ensures that
+ * access to the object state is properly synchronized and ordered. An example usage of a host object might be a file stream that is written to from multiple
+ * host tasks sequentially.
+ *
+ * - The generic `host_object<T>` keeps ownership of the state at any time and is the safest way to achieve side effects on the host.
+ * - The `host_object<T&>` specialization attaches Celerity's tracking and synchronization mechanism to user-managed state. The user guarantees that the
+ *   referenced object is not accessed in any way other than through a `side_effect` while the `host_object` is live.
+ * - `host_object<void>` does not carry internal state and can be used to track access to global variables or functions like `printf()`.
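+ *
+ * For illustration only (a sketch of the three flavors; all names below are hypothetical):
+ *
+ *     celerity::experimental::host_object<std::ofstream> log_file{std::in_place, "worker.log"}; // owning
+ *     std::ofstream existing_stream;
+ *     celerity::experimental::host_object<std::ofstream&> log_ref{std::ref(existing_stream)};   // user-managed state
+ *     celerity::experimental::host_object<void> printf_token;                                   // state-less marker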
+ */
+template <typename T>
+class host_object {
+	static_assert(std::is_object_v<T>); // disallow host_object<T&&> and host_object<function-type>
+
+  public:
+	using object_type = T;
+
+	host_object() : shared_state{std::make_shared<state>(std::in_place)} {}
+
+	explicit host_object(const T& obj) : shared_state{std::make_shared<state>(std::in_place, obj)} {}
+
+	explicit host_object(T&& obj) : shared_state{std::make_shared<state>(std::in_place, std::move(obj))} {}
+
+	/// Constructs the object in-place with the given constructor arguments.
+	template <typename... CtorParams>
+	explicit host_object(const std::in_place_t, CtorParams&&... ctor_args) // requiring std::in_place avoids overriding copy and move constructors
+	    : shared_state{std::make_shared<state>(std::in_place, std::forward<CtorParams>(ctor_args)...)} {}
+
+  private:
+	template <typename, side_effect_order>
+	friend class side_effect;
+
+	struct state : detail::host_object_tracker {
+		T object;
+
+		template <typename... CtorParams>
+		explicit state(const std::in_place_t, CtorParams&&... ctor_args) : object{std::forward<CtorParams>(ctor_args)...} {}
+	};
+
+	detail::host_object_id get_id() const { return shared_state->id; }
+	T* get_object() const { return &shared_state->object; }
+
+	std::shared_ptr<state> shared_state;
+};
+
+template <typename T>
+class host_object<T&> {
+  public:
+	using object_type = T;
+
+	explicit host_object(T& obj) : shared_state{std::make_shared<state>(obj)} {}
+
+	explicit host_object(const std::reference_wrapper<T> ref) : shared_state{std::make_shared<state>(ref.get())} {}
+
+  private:
+	template <typename, side_effect_order>
+	friend class side_effect;
+
+	struct state : detail::host_object_tracker {
+		T& object;
+
+		explicit state(T& object) : object{object} {}
+	};
+
+	detail::host_object_id get_id() const { return shared_state->id; }
+	T* get_object() const { return &shared_state->object; }
+
+	std::shared_ptr<state> shared_state;
+};
+
+template <>
+class host_object<void> {
+  public:
+	using object_type = void;
+
+	explicit host_object() : shared_state{std::make_shared<state>()} {}
+
+  private:
+	template <typename, side_effect_order>
+	friend class side_effect;
+
+	struct state : detail::host_object_tracker {};
+
+	detail::host_object_id get_id() const { return shared_state->id; }
+
+	std::shared_ptr<state> shared_state;
+};
+
+// The universal reference parameter T&& matches U& as well as U&& for object types U, but we don't want to implicitly invoke a copy constructor: the user
+// might have intended to either create a host_object<T&> (which requires a std::reference_wrapper parameter) or move-construct the interior.
+template <typename T>
+explicit host_object(T&&) -> host_object<detail::assert_host_object_ctor_param_is_rvalue_t<T>>;
+
+template <typename T>
+explicit host_object(std::reference_wrapper<T>) -> host_object<T&>;
+
+explicit host_object()->host_object<void>;
+
+} // namespace celerity::experimental
diff --git a/include/runtime.h b/include/runtime.h
index 978aab36e..060fe437e 100644
--- a/include/runtime.h
+++ b/include/runtime.h
@@ -30,6 +30,7 @@ namespace detail {
 	class scheduler;
 	class executor;
 	class task_manager;
+	class host_object_manager;
 
 	class runtime_already_started_error : public std::runtime_error {
 	  public:
@@ -72,6 +73,8 @@ namespace detail {
 
 		reduction_manager& get_reduction_manager() const;
 
+		host_object_manager& get_host_object_manager() const;
+
 		std::shared_ptr<logger> get_logger() const { return default_logger; }
 
 		/**
@@ -110,6 +113,7 @@
 
 		std::unique_ptr<buffer_manager> buffer_mngr;
 		std::unique_ptr<reduction_manager> reduction_mngr;
+		std::unique_ptr<host_object_manager> host_object_mngr;
 		std::unique_ptr<task_manager> task_mngr;
 		std::unique_ptr<executor> exec;
 
diff --git a/include/side_effect.h b/include/side_effect.h
new file mode 100644
index 000000000..24f35bc0c
--- /dev/null
+++ b/include/side_effect.h
@@ -0,0 +1,45 @@
+#pragma once
+
+#include <type_traits>
+
+#include "handler.h"
+#include "host_object.h"
+
+
+namespace celerity::experimental {
+
+/**
+ * Provides access to a `host_object` through capture in a `host_task`. Inside the host task kernel, the internal state of the host object can be accessed
+ * through the `*` or `->` operators. This behavior is similar to accessors on buffers.
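+ *
+ * A minimal usage sketch (illustrative; `q` and `my_vec` are placeholder names, mirroring the runtime tests in this change):
+ *
+ *     celerity::experimental::host_object<std::vector<int>> my_vec;
+ *     q.submit([=](celerity::handler& cgh) {
+ *         celerity::experimental::side_effect vec_eff{my_vec, cgh};
+ *         cgh.host_task(celerity::on_master_node, [=] { vec_eff->push_back(42); });
+ *     });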
+ */ +template +class side_effect { + public: + using object_type = typename host_object::object_type; + constexpr static inline side_effect_order order = Order; + + explicit side_effect(const host_object& object, handler& cgh) : object{object} { + if(detail::is_prepass_handler(cgh)) { + auto& prepass_cgh = static_cast(cgh); + prepass_cgh.add_requirement(object.get_id(), order); + } + } + + template + std::enable_if_t, object_type>& operator*() const { + return *object.get_object(); + } + + template + std::enable_if_t, object_type>* operator->() const { + return object.get_object(); + } + + private: + host_object object; +}; + +template +side_effect(const host_object&, handler&) -> side_effect; + +} // namespace celerity::experimental \ No newline at end of file diff --git a/include/task.h b/include/task.h index 0aec9ee83..29d191a89 100644 --- a/include/task.h +++ b/include/task.h @@ -68,6 +68,33 @@ namespace detail { std::unordered_multimap> map; }; + class side_effect_map : private std::unordered_map { + private: + using map_base = std::unordered_map; + + public: + using typename map_base::const_iterator, map_base::value_type, map_base::key_type, map_base::mapped_type, map_base::const_reference, + map_base::const_pointer; + using iterator = const_iterator; + using reference = const_reference; + using pointer = const_pointer; + + using map_base::size, map_base::count, map_base::empty, map_base::cbegin, map_base::cend, map_base::at; + + iterator begin() const { return cbegin(); } + iterator end() const { return cend(); } + iterator find(host_object_id key) const { return map_base::find(key); } + + void add_side_effect(host_object_id hoid, experimental::side_effect_order order); + }; + + struct task_geometry { + int dimensions = 0; + cl::sycl::range<3> global_size{0, 0, 0}; + cl::sycl::id<3> global_offset{}; + cl::sycl::range<3> granularity{1, 1, 1}; + }; + class task : public intrusive_graph_node { public: task_type get_type() const { return type; } @@ -78,15 +105,19 @@ namespace detail { const buffer_access_map& get_buffer_access_map() const { return access_map; } + const side_effect_map& get_side_effect_map() const { return side_effect_map; } + const command_group_storage_base& get_command_group() const { return *cgf; } - int get_dimensions() const { return dimensions; } + const task_geometry& get_geometry() const { return geometry; } + + int get_dimensions() const { return geometry.dimensions; } - cl::sycl::range<3> get_global_size() const { return global_size; } + cl::sycl::range<3> get_global_size() const { return geometry.global_size; } - cl::sycl::id<3> get_global_offset() const { return global_offset; } + cl::sycl::id<3> get_global_offset() const { return geometry.global_offset; } - cl::sycl::range<3> get_granularity() const { return granularity; } + cl::sycl::range<3> get_granularity() const { return geometry.granularity; } const std::string& get_debug_name() const { return debug_name; } @@ -107,56 +138,55 @@ namespace detail { const std::vector& get_reductions() const { return reductions; } static std::unique_ptr make_nop(task_id tid) { - return std::unique_ptr(new task(tid, task_type::NOP, {}, 0, {0, 0, 0}, {}, {1, 1, 1}, nullptr, {}, {}, {})); + return std::unique_ptr(new task(tid, task_type::NOP, collective_group_id{}, task_geometry{}, nullptr, {}, {}, {}, {})); } - static std::unique_ptr make_host_compute(task_id tid, int dimensions, cl::sycl::range<3> global_size, cl::sycl::id<3> global_offset, - cl::sycl::range<3> granularity, std::unique_ptr cgf, buffer_access_map 
-		    std::vector<reduction_id> reductions) {
-			return std::unique_ptr<task>(new task(tid, task_type::HOST_COMPUTE, {}, dimensions, global_size, global_offset, granularity, std::move(cgf),
-			    std::move(access_map), std::move(reductions), {}));
+		static std::unique_ptr<task> make_host_compute(task_id tid, task_geometry geometry, std::unique_ptr<command_group_storage_base> cgf,
+		    buffer_access_map access_map, side_effect_map side_effect_map, std::vector<reduction_id> reductions) {
+			return std::unique_ptr<task>(new task(tid, task_type::HOST_COMPUTE, collective_group_id{}, geometry, std::move(cgf), std::move(access_map),
+			    std::move(side_effect_map), std::move(reductions), {}));
 		}
 
-		static std::unique_ptr<task> make_device_compute(task_id tid, int dimensions, cl::sycl::range<3> global_size, cl::sycl::id<3> global_offset,
-		    cl::sycl::range<3> granularity, std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map, std::vector<reduction_id> reductions,
-		    std::string debug_name) {
-			return std::unique_ptr<task>(new task(tid, task_type::DEVICE_COMPUTE, {}, dimensions, global_size, global_offset, granularity, std::move(cgf),
-			    std::move(access_map), std::move(reductions), std::move(debug_name)));
+		static std::unique_ptr<task> make_device_compute(task_id tid, task_geometry geometry, std::unique_ptr<command_group_storage_base> cgf,
+		    buffer_access_map access_map, std::vector<reduction_id> reductions, std::string debug_name) {
+			return std::unique_ptr<task>(new task(tid, task_type::DEVICE_COMPUTE, collective_group_id{}, geometry, std::move(cgf), std::move(access_map), {},
+			    std::move(reductions), std::move(debug_name)));
 		}
 
-		static std::unique_ptr<task> make_collective(
-		    task_id tid, collective_group_id cgid, size_t num_collective_nodes, std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map) {
-			return std::unique_ptr<task>(new task(tid, task_type::COLLECTIVE, cgid, 1, detail::range_cast<3>(cl::sycl::range<1>{num_collective_nodes}), {},
-			    {1, 1, 1}, std::move(cgf), std::move(access_map), {}, {}));
+		static std::unique_ptr<task> make_collective(task_id tid, collective_group_id cgid, size_t num_collective_nodes,
+		    std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map, side_effect_map side_effect_map) {
+			const task_geometry geometry{1, detail::range_cast<3>(cl::sycl::range<1>{num_collective_nodes}), {}, {1, 1, 1}};
+			return std::unique_ptr<task>(
+			    new task(tid, task_type::COLLECTIVE, cgid, geometry, std::move(cgf), std::move(access_map), std::move(side_effect_map), {}, {}));
 		}
 
-		static std::unique_ptr<task> make_master_node(task_id tid, std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map) {
-			return std::unique_ptr<task>(new task(tid, task_type::MASTER_NODE, {}, 0, {0, 0, 0}, {}, {1, 1, 1}, std::move(cgf), std::move(access_map), {}, {}));
+		static std::unique_ptr<task> make_master_node(
+		    task_id tid, std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map, side_effect_map side_effect_map) {
+			return std::unique_ptr<task>(new task(tid, task_type::MASTER_NODE, collective_group_id{}, task_geometry{}, std::move(cgf), std::move(access_map),
+			    std::move(side_effect_map), {}, {}));
 		}
 
 		static std::unique_ptr<task> make_horizon_task(task_id tid) {
-			return std::unique_ptr<task>(new task(tid, task_type::HORIZON, {}, 0, {0, 0, 0}, {}, {1, 1, 1}, nullptr, {}, {}, {}));
+			return std::unique_ptr<task>(new task(tid, task_type::HORIZON, collective_group_id{}, task_geometry{}, nullptr, {}, {}, {}, {}));
 		}
 
 	  private:
 		task_id tid;
 		task_type type;
 		collective_group_id cgid;
-		int dimensions;
-		cl::sycl::range<3> global_size;
-		cl::sycl::id<3> global_offset;
-		cl::sycl::range<3> granularity;
+		task_geometry geometry;
 		std::unique_ptr<command_group_storage_base> cgf;
 		buffer_access_map access_map;
+		detail::side_effect_map side_effect_map;
 		std::vector<reduction_id> reductions;
 		std::string debug_name;
 
-		task(task_id tid, task_type type, collective_group_id cgid, int dimensions, cl::sycl::range<3> global_size, cl::sycl::id<3> global_offset,
-		    cl::sycl::range<3> granularity, std::unique_ptr<command_group_storage_base> cgf, buffer_access_map access_map, std::vector<reduction_id> reductions,
-		    std::string debug_name)
-		    : tid(tid), type(type), cgid(cgid), dimensions(dimensions), global_size(global_size), global_offset(global_offset), granularity(granularity),
-		      cgf(std::move(cgf)), access_map(std::move(access_map)), reductions(std::move(reductions)), debug_name(std::move(debug_name)) {
-			assert(type == task_type::HOST_COMPUTE || type == task_type::DEVICE_COMPUTE || granularity.size() == 1);
+		task(task_id tid, task_type type, collective_group_id cgid, task_geometry geometry, std::unique_ptr<command_group_storage_base> cgf,
+		    buffer_access_map access_map, detail::side_effect_map side_effect_map, std::vector<reduction_id> reductions, std::string debug_name)
+		    : tid(tid), type(type), cgid(cgid), geometry(geometry), cgf(std::move(cgf)), access_map(std::move(access_map)),
+		      side_effect_map(std::move(side_effect_map)), reductions(std::move(reductions)), debug_name(std::move(debug_name)) {
+			assert(type == task_type::HOST_COMPUTE || type == task_type::DEVICE_COMPUTE || get_granularity().size() == 1);
+			assert(type == task_type::HOST_COMPUTE || type == task_type::COLLECTIVE || type == task_type::MASTER_NODE || side_effect_map.empty());
 		}
 	};
 
diff --git a/include/task_manager.h b/include/task_manager.h
index 6a7561a9a..81d2df280 100644
--- a/include/task_manager.h
+++ b/include/task_manager.h
@@ -118,6 +118,9 @@ namespace detail {
 
 		std::unordered_map<collective_group_id, task_id> last_collective_tasks;
 
+		// Stores which host object was last affected by which task.
+		std::unordered_map<host_object_id, task_id> host_object_last_effects;
+
 		// For simplicity we use a single mutex to control access to all task-related (i.e. the task graph, task_map, ...) data structures.
 		mutable std::mutex task_mutex;
 
diff --git a/include/types.h b/include/types.h
index e3ed8b28c..1a3612bf2 100644
--- a/include/types.h
+++ b/include/types.h
@@ -54,3 +54,12 @@ MAKE_PHANTOM_TYPE(node_id, size_t)
 MAKE_PHANTOM_TYPE(command_id, size_t)
 MAKE_PHANTOM_TYPE(collective_group_id, size_t)
 MAKE_PHANTOM_TYPE(reduction_id, size_t)
+MAKE_PHANTOM_TYPE(host_object_id, size_t)
+
+
+// declared in this header for include-dependency reasons
+namespace celerity::experimental {
+
+enum class side_effect_order { sequential };
+
+}
diff --git a/src/graph_generator.cc b/src/graph_generator.cc
index 739a48cf6..755ed1ceb 100644
--- a/src/graph_generator.cc
+++ b/src/graph_generator.cc
@@ -92,6 +92,7 @@ namespace detail {
 			// TODO: At some point we might want to do this also before calling transformers
 			// --> So that more advanced transformations can also take data transfers into account
 			process_task_data_requirements(tid);
+			process_task_side_effect_requirements(tid);
 		}
 
 	using buffer_requirements_map = std::unordered_map<buffer_id, std::unordered_map<cl::sycl::access::mode, GridRegion<3>>>;
@@ -475,6 +476,29 @@ namespace detail {
 		}
 	}
 
+	void graph_generator::process_task_side_effect_requirements(const task_id tid) {
+		const auto tsk = task_mngr.get_task(tid);
+		if(tsk->get_side_effect_map().empty()) return; // skip the loop in the common case
+
+		for(const auto cmd : cdag.task_commands(tid)) {
+			auto& nd = node_data.at(cmd->get_nid());
+
+			for(const auto& side_effect : tsk->get_side_effect_map()) {
+				const auto [hoid, order] = side_effect;
+				if(const auto last_effect = nd.host_object_last_effects.find(hoid); last_effect != nd.host_object_last_effects.end()) {
+					// TODO once we have different side_effect_orders, their interaction will determine the dependency kind
+					cdag.add_dependency(cmd, cdag.get(last_effect->second), dependency_kind::TRUE_DEP);
+				}
+
+				// Simplification: If there are multiple chunks per node, we generate true-dependencies between them in an arbitrary order, when all we really
+				// need is mutual exclusion (i.e. a bi-directional pseudo-dependency).
+				nd.host_object_last_effects.insert_or_assign(hoid, cmd->get_cid());
+
+				cmd->debug_label += fmt::format("affect host-object {}\n", hoid);
+			}
+		}
+	}
+
 	void graph_generator::generate_horizon(task_id tid) {
 		detail::command_id lowest_prev_hid = 0;
 		for(node_id node = 0; node < num_nodes; ++node) {
@@ -501,6 +525,9 @@
 			for(auto& [cgid, cid] : this_node_data.last_collective_commands) {
 				cid = std::max(prev_hid, cid);
 			}
+			for(auto& [hoid, cid] : this_node_data.host_object_last_effects) {
+				cid = std::max(prev_hid, cid);
+			}
 
 			// update lowest previous horizon id (for later command deletion)
 			if(lowest_prev_hid == 0) {
 				lowest_prev_hid = prev_hid;
diff --git a/src/runtime.cc b/src/runtime.cc
index 084f9ee96..45a045a91 100644
--- a/src/runtime.cc
+++ b/src/runtime.cc
@@ -19,6 +19,7 @@
 #include "executor.h"
 #include "graph_generator.h"
 #include "graph_serializer.h"
+#include "host_object.h"
 #include "logger.h"
 #include "mpi_support.h"
 #include "scheduler.h"
@@ -122,6 +123,7 @@ namespace detail {
 		});
 
 		reduction_mngr = std::make_unique<reduction_manager>();
+		host_object_mngr = std::make_unique<host_object_manager>();
 		task_mngr = std::make_unique<task_manager>(num_nodes, h_queue.get(), reduction_mngr.get());
 		exec = std::make_unique<executor>(local_nid, *h_queue, *d_queue, *task_mngr, *buffer_mngr, *reduction_mngr, default_logger);
 		if(is_master_node()) {
@@ -149,6 +151,7 @@ namespace detail {
 		exec.reset();
 		task_mngr.reset();
 		reduction_mngr.reset();
+		host_object_mngr.reset();
 
 		// All buffers should have unregistered themselves by now.
 		assert(!buffer_mngr->has_active_buffers());
 		buffer_mngr.reset();
@@ -223,6 +226,8 @@ namespace detail {
 
 	reduction_manager& runtime::get_reduction_manager() const { return *reduction_mngr; }
 
+	host_object_manager& runtime::get_host_object_manager() const { return *host_object_mngr; }
+
 	void runtime::broadcast_control_command(command_type cmd, const command_data& data) {
 		assert_true(is_master_node()) << "Control commands should only be broadcast from the master";
 		for(auto n = 0u; n < num_nodes; ++n) {
@@ -243,6 +248,7 @@ namespace detail {
 		if(is_active) return;
 		if(is_shutting_down) return;
 		if(buffer_mngr->has_active_buffers()) return;
+		if(host_object_mngr->has_active_objects()) return;
 		instance.reset();
 	}
 
diff --git a/src/task.cc b/src/task.cc
index b2ebc991f..911fb15cb 100644
--- a/src/task.cc
+++ b/src/task.cc
@@ -59,5 +59,9 @@ namespace detail {
 		return result;
 	}
 
+	void side_effect_map::add_side_effect(const host_object_id hoid, const experimental::side_effect_order order) {
+		// TODO for multiple side effects on the same hoid, find the weakest order satisfying all of them
+		emplace(hoid, order);
+	}
 } // namespace detail
 } // namespace celerity
diff --git a/src/task_manager.cc b/src/task_manager.cc
index 8e292b78b..2f3378bc3 100644
--- a/src/task_manager.cc
+++ b/src/task_manager.cc
@@ -160,6 +160,14 @@ namespace detail {
 			}
 		}
 
+		for(const auto& side_effect : tsk->get_side_effect_map()) {
+			const auto [hoid, order] = side_effect;
+			if(const auto last_effect = host_object_last_effects.find(hoid); last_effect != host_object_last_effects.end()) {
+				add_dependency(tsk.get(), task_map.at(last_effect->second).get(), dependency_kind::TRUE_DEP);
+			}
+			host_object_last_effects.insert_or_assign(hoid, tid);
+		}
+
 		if(auto cgid = tsk->get_collective_group_id(); cgid != 0) {
 			if(auto prev = last_collective_tasks.find(cgid); prev != last_collective_tasks.end()) {
 				add_dependency(tsk.get(), task_map.at(prev->second).get(), dependency_kind::ORDER_DEP);
@@ -218,6 +226,9 @@ namespace detail {
 		for(auto& [cgid, tid] : last_collective_tasks) {
 			tid = std::max(prev_hid, tid);
 		}
+		for(auto& [hoid, tid] : host_object_last_effects) {
+			tid = std::max(prev_hid, tid);
+		}
 
 		// We also use the previous horizon as the new init task for host-initialized buffers
 		current_init_task_id = prev_hid;
diff --git a/test/graph_compaction_tests.cc b/test/graph_compaction_tests.cc
index 2367f9778..64367df45 100644
--- a/test/graph_compaction_tests.cc
+++ b/test/graph_compaction_tests.cc
@@ -333,6 +333,25 @@ namespace detail {
 		maybe_print_graphs(ctx);
 	}
 
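+	// Expects every command of `task` to carry exactly one dependency of `kind` on a horizon command, and no direct dependency on the commands of
+	// `dependency_without_horizon` (whose effect has been subsumed by the horizon).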
"[graph_generator][command-graph][horizon]") { // Regression test: the order-dependencies between host tasks in the same collective group are built by tracking the last task command in each @@ -357,22 +376,37 @@ namespace detail { // This must depend on the first horizon, not first_collective const auto second_collective = test_utils::build_and_flush(ctx, test_utils::add_host_task(tm, experimental::collective, [&](handler& cgh) {})); - const auto& inspector = ctx.get_inspector(); - auto& cdag = ctx.get_command_graph(); - const auto first_commands = inspector.get_commands(first_collective, std::nullopt, std::nullopt); - const auto second_commands = inspector.get_commands(second_collective, std::nullopt, std::nullopt); - for(const auto second_cid : second_commands) { - for(const auto first_cid : first_commands) { - CHECK(!inspector.has_dependency(second_cid, first_cid)); - } - const auto second_deps = cdag.get(second_cid)->get_dependencies(); - CHECK(std::distance(second_deps.begin(), second_deps.end()) == 1); - for(const auto& dep : second_deps) { - CHECK(dep.kind == dependency_kind::ORDER_DEP); - CHECK(dynamic_cast(dep.node)); - } + check_task_commands_depend_on_horizon_only(first_collective, second_collective, dependency_kind::ORDER_DEP, ctx); + + maybe_print_graphs(ctx); + } + + TEST_CASE("side-effect dependencies are correctly subsumed by horizons", "[graph_generator][command-graph][horizon]") { + const size_t num_nodes = 1; + test_utils::cdag_test_context ctx(num_nodes); + auto& tm = ctx.get_task_manager(); + tm.set_horizon_step(2); + + test_utils::mock_host_object_factory mhof; + auto ho = mhof.create_host_object(); + const auto first_task = test_utils::build_and_flush( + ctx, test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { ho.add_side_effect(cgh, experimental::side_effect_order::sequential); })); + + // generate exactly two horizons + auto& ggen = ctx.get_graph_generator(); + test_utils::mock_buffer_factory mbf(&tm, &ggen); + auto buf = mbf.create_buffer(range<1>(1)); + for(int i = 0; i < 5; ++i) { + test_utils::build_and_flush( + ctx, test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { buf.get_access(cgh, all{}); })); } + // This must depend on the first horizon, not first_task + const auto second_task = test_utils::build_and_flush( + ctx, test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { ho.add_side_effect(cgh, experimental::side_effect_order::sequential); })); + + check_task_commands_depend_on_horizon_only(first_task, second_task, dependency_kind::TRUE_DEP, ctx); + maybe_print_graphs(ctx); } diff --git a/test/graph_generation_tests.cc b/test/graph_generation_tests.cc index 0feb3b960..15f2d7617 100644 --- a/test/graph_generation_tests.cc +++ b/test/graph_generation_tests.cc @@ -1523,5 +1523,65 @@ namespace detail { } } + TEST_CASE("side effects generate appropriate command-dependencies", "[graph_generator][command-graph][side-effect]") { + using order = experimental::side_effect_order; + + // Must be static for Catch2 GENERATE, which implicitly generates sections for each value and therefore cannot depend on runtime values + static constexpr auto side_effect_orders = {order::sequential}; + + constexpr size_t num_nodes = 2; + const range<1> node_range{num_nodes}; + + // TODO placeholder: complete with dependency types for other side effect orders + const auto expected_dependencies = std::unordered_map, std::optional, pair_hash>{ + {{order::sequential, order::sequential}, dependency_kind::TRUE_DEP}, + }; + + const auto order_a = 
+		const auto order_a = GENERATE(values(side_effect_orders));
+		const auto order_b = GENERATE(values(side_effect_orders));
+		CAPTURE(order_a);
+		CAPTURE(order_b);
+
+		test_utils::cdag_test_context ctx(num_nodes);
+		auto& tm = ctx.get_task_manager();
+		auto& ggen = ctx.get_graph_generator();
+		test_utils::mock_host_object_factory mhof;
+
+		auto ho_common = mhof.create_host_object(); // should generate dependencies
+		auto ho_a = mhof.create_host_object();      // should NOT generate dependencies
+		auto ho_b = mhof.create_host_object();      // -"-
+		const auto tid_0 = test_utils::build_and_flush(ctx, num_nodes, test_utils::add_host_task(tm, node_range, [&](handler& cgh) { //
+			ho_a.add_side_effect(cgh, order_a);
+		}));
+		const auto tid_1 = test_utils::build_and_flush(ctx, num_nodes, test_utils::add_host_task(tm, node_range, [&](handler& cgh) { //
+			ho_common.add_side_effect(cgh, order_a);
+			ho_b.add_side_effect(cgh, order_b);
+		}));
+		const auto tid_2 = test_utils::build_and_flush(ctx, num_nodes, test_utils::add_host_task(tm, node_range, [&](handler& cgh) { //
+			ho_common.add_side_effect(cgh, order_b);
+		}));
+
+		auto& inspector = ctx.get_inspector();
+		auto& cdag = ctx.get_command_graph();
+
+		for(auto tid : {tid_0, tid_1}) {
+			for(auto cid : inspector.get_commands(tid, std::nullopt, std::nullopt)) {
+				const auto deps = cdag.get(cid)->get_dependencies();
+				CHECK(deps.empty());
+			}
+		}
+
+		const auto expected_2 = expected_dependencies.at({order_a, order_b});
+		for(auto cid_2 : inspector.get_commands(tid_2, std::nullopt, std::nullopt)) {
+			const auto deps_2 = cdag.get(cid_2)->get_dependencies();
+			// This assumes no oversubscription in the split, adjust if necessary:
+			CHECK(std::distance(deps_2.begin(), deps_2.end()) == expected_2.has_value());
+			if(expected_2) {
+				const auto& dep_tcmd = dynamic_cast<const task_command&>(*deps_2.front().node);
+				CHECK(dep_tcmd.get_tid() == tid_1);
+			}
+		}
+	}
+
 } // namespace detail
 } // namespace celerity
diff --git a/test/runtime_tests.cc b/test/runtime_tests.cc
index 8e877810b..074bbb19d 100644
--- a/test/runtime_tests.cc
+++ b/test/runtime_tests.cc
@@ -2425,5 +2425,43 @@ namespace detail {
 		}
 	}
 
+	TEST_CASE("side_effect API works as expected on a single node", "[side-effect]") {
+		distr_queue q;
+
+		experimental::host_object owned_ho{std::vector<int>{}};
+		std::vector<int> exterior;
+		experimental::host_object ref_ho{std::ref(exterior)};
+		experimental::host_object void_ho;
+
+		q.submit([=](handler& cgh) {
+			experimental::side_effect append_owned{owned_ho, cgh};
+			experimental::side_effect append_ref{ref_ho, cgh};
+			experimental::side_effect track_void{void_ho, cgh};
+			cgh.host_task(on_master_node, [=] {
+				(*append_owned).push_back(1);
+				(*append_ref).push_back(1);
+			});
+		});
+
+		q.submit([=](handler& cgh) {
+			experimental::side_effect append_owned{owned_ho, cgh};
+			experimental::side_effect append_ref{ref_ho, cgh};
+			experimental::side_effect track_void{void_ho, cgh};
+			cgh.host_task(on_master_node, [=] {
+				append_owned->push_back(2);
+				append_ref->push_back(2);
+			});
+		});
+
+		q.submit([=](handler& cgh) {
+			experimental::side_effect check_owned{owned_ho, cgh};
+			cgh.host_task(on_master_node, [=] { CHECK(*check_owned == std::vector<int>{1, 2}); });
+		});
+
+		q.slow_full_sync();
+
+		CHECK(exterior == std::vector<int>{1, 2});
+	}
+
 } // namespace detail
 } // namespace celerity
diff --git a/test/task_graph_tests.cc b/test/task_graph_tests.cc
index e95dffbf5..c71fc82d2 100644
--- a/test/task_graph_tests.cc
+++ b/test/task_graph_tests.cc
@@ -554,5 +554,76 @@ namespace detail {
 		CHECK(has_any_dependency(tm, read_tid, write_tid) == (!write_empty && !read_empty));
 	}
 
+	TEST_CASE("side effects generate appropriate task-dependencies", "[task_manager][task-graph][side-effect]") {
+		using order = experimental::side_effect_order;
+		static constexpr auto side_effect_orders = {order::sequential};
+
+		// TODO placeholder: complete with dependency types for other side effect orders
+		const auto expected_dependencies = std::unordered_map<std::pair<order, order>, std::optional<dependency_kind>, pair_hash>{
+		    {{order::sequential, order::sequential}, dependency_kind::TRUE_DEP}};
+
+		const auto order_a = GENERATE(values(side_effect_orders));
+		const auto order_b = GENERATE(values(side_effect_orders));
+
+		CAPTURE(order_a);
+		CAPTURE(order_b);
+
+		task_manager tm{1, nullptr, nullptr};
+		test_utils::mock_host_object_factory mhof;
+
+		auto ho_common = mhof.create_host_object(); // should generate dependencies
+		auto ho_a = mhof.create_host_object();      // should NOT generate dependencies
+		auto ho_b = mhof.create_host_object();      // -"-
+		const auto tid_a = test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) {
+			ho_common.add_side_effect(cgh, order_a);
+			ho_a.add_side_effect(cgh, order_a);
+		});
+		const auto tid_b = test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) {
+			ho_common.add_side_effect(cgh, order_b);
+			ho_b.add_side_effect(cgh, order_b);
+		});
+
+		const auto deps_a = tm.get_task(tid_a)->get_dependencies();
+		CHECK(deps_a.empty());
+
+		const auto deps_b = tm.get_task(tid_b)->get_dependencies();
+		const auto expected_b = expected_dependencies.at({order_a, order_b});
+		CHECK(std::distance(deps_b.begin(), deps_b.end()) == expected_b.has_value());
+		if(expected_b) {
+			CHECK(deps_b.front().node == tm.get_task(tid_a));
+			CHECK(deps_b.front().kind == *expected_b);
+		}
+	}
+
+	TEST_CASE("side-effect dependencies are correctly subsumed by horizons", "[task_manager][task-graph][task-horizon]") {
+		task_manager tm{1, nullptr, nullptr};
+		tm.set_horizon_step(2);
+
+		test_utils::mock_host_object_factory mhof;
+		auto ho = mhof.create_host_object();
+		const auto first_task =
+		    test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { ho.add_side_effect(cgh, experimental::side_effect_order::sequential); });
+
+		// generate exactly two horizons
+		test_utils::mock_buffer_factory mbf(&tm);
+		auto buf = mbf.create_buffer<int>(range<1>(1));
+		for(int i = 0; i < 5; ++i) {
+			test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { buf.get_access<cl::sycl::access::mode::read_write>(cgh, all{}); });
+		}
+
+		// This must depend on the first horizon, not first_task
+		const auto second_task =
+		    test_utils::add_host_task(tm, on_master_node, [&](handler& cgh) { ho.add_side_effect(cgh, experimental::side_effect_order::sequential); });
+
+		const auto& second_deps = tm.get_task(second_task)->get_dependencies();
+		CHECK(std::distance(second_deps.begin(), second_deps.end()) == 1);
+		for(const auto& dep : second_deps) {
+			const auto type = dep.node->get_type();
+			CHECK(type == task_type::HORIZON);
+			CHECK(dep.kind == dependency_kind::TRUE_DEP);
+		}
+
+		maybe_print_graph(tm);
+	}
+
 } // namespace detail
 } // namespace celerity
\ No newline at end of file
diff --git a/test/test_utils.h b/test/test_utils.h
index e3f1e42b1..eb73e55bc 100644
--- a/test/test_utils.h
+++ b/test/test_utils.h
@@ -69,6 +69,7 @@ namespace test_utils {
 	}
 
 	class mock_buffer_factory;
+	class mock_host_object_factory;
 
 	template <typename DataT, int Dims>
 	class mock_buffer {
@@ -92,6 +93,24 @@ namespace test_utils {
 		mock_buffer(detail::buffer_id id, cl::sycl::range<Dims> size) : id(id), size(size) {}
 	};
 
+	class mock_host_object {
+	  public:
+		void add_side_effect(handler& cgh, const experimental::side_effect_order order) {
+			if(detail::is_prepass_handler(cgh)) {
+				auto& prepass_cgh = static_cast<detail::prepass_handler&>(cgh);
+				prepass_cgh.add_requirement(id, order);
+			}
+		}
+
+	  private:
+		friend class mock_host_object_factory;
+
+		detail::host_object_id id;
+
+	  public:
+		explicit mock_host_object(detail::host_object_id id) : id(id) {}
+	};
+
 	class cdag_inspector {
 	  public:
 		auto get_cb() {
@@ -207,8 +226,8 @@ namespace test_utils {
 
 	class mock_buffer_factory {
 	  public:
-		mock_buffer_factory(detail::task_manager* tm = nullptr, detail::graph_generator* ggen = nullptr) : task_mngr(tm), ggen(ggen) {}
-		mock_buffer_factory(cdag_test_context& ctx) : task_mngr(&ctx.get_task_manager()), ggen(&ctx.get_graph_generator()) {}
+		explicit mock_buffer_factory(detail::task_manager* tm = nullptr, detail::graph_generator* ggen = nullptr) : task_mngr(tm), ggen(ggen) {}
+		explicit mock_buffer_factory(cdag_test_context& ctx) : task_mngr(&ctx.get_task_manager()), ggen(&ctx.get_graph_generator()) {}
 
 		template <typename DataT, int Dims>
 		mock_buffer<DataT, Dims> create_buffer(cl::sycl::range<Dims> size, bool mark_as_host_initialized = false) {
@@ -225,6 +244,14 @@ namespace test_utils {
 		detail::buffer_id next_buffer_id = 0;
 	};
 
+	class mock_host_object_factory {
+	  public:
+		mock_host_object create_host_object() { return mock_host_object{next_id++}; }
+
+	  private:
+		detail::host_object_id next_id = 0;
+	};
+
 	template <typename CGF>
 	detail::task_id add_compute_task(
 	    detail::task_manager& tm, CGF cgf, cl::sycl::range<2> global_size = {1, 1}, cl::sycl::id<2> global_offset = {}) {