From c60527f67736505f2d9293ed983103e231d69991 Mon Sep 17 00:00:00 2001 From: Fabian Knorr Date: Thu, 5 May 2022 14:48:04 +0200 Subject: [PATCH] Address reviewer commands on Frames Duplicates dependency count in command_frame --- include/buffer_transfer_manager.h | 2 +- include/command.h | 8 +++- include/executor.h | 11 +++-- include/mpi_support.h | 72 ++++++++++++++++++------------- src/buffer_transfer_manager.cc | 11 ++--- src/executor.cc | 13 +++--- src/graph_serializer.cc | 3 +- src/runtime.cc | 4 +- test/test_utils.h | 10 ++--- 9 files changed, 77 insertions(+), 57 deletions(-) diff --git a/include/buffer_transfer_manager.h b/include/buffer_transfer_manager.h index 580a2db88..7e3abd504 100644 --- a/include/buffer_transfer_manager.h +++ b/include/buffer_transfer_manager.h @@ -44,7 +44,7 @@ namespace detail { reduction_id rid; // zero if this does not belong to a reduction subrange<3> sr; command_id push_cid; - alignas(std::max_align_t) payload_type data[0]; + alignas(std::max_align_t) payload_type data[]; // max_align to allow reinterpret_casting a pointer to this member to any buffer element pointer }; struct transfer_in { diff --git a/include/command.h b/include/command.h index b59f6c86c..e4256ad97 100644 --- a/include/command.h +++ b/include/command.h @@ -224,13 +224,17 @@ namespace detail { struct command_frame { using payload_type = command_id; + command_pkg pkg; + size_t num_dependencies = 0; // This information is duplicated from unique_frame_ptr::get_payload_count() so that we can still use a + // `const command_frame &` without its owning pointer + payload_type dependencies[]; + // variable-sized structure command_frame() = default; command_frame(const command_frame&) = delete; command_frame& operator=(const command_frame&) = delete; - command_pkg pkg; - payload_type dependencies[0]; + iterable_range iter_dependencies() const { return {dependencies, dependencies + num_dependencies}; } }; } // namespace detail diff --git a/include/executor.h b/include/executor.h index b19c48293..8e6681fd7 100644 --- a/include/executor.h +++ b/include/executor.h @@ -80,15 +80,14 @@ namespace detail { bool first_command_received = false; template - void create_job(const unique_frame_ptr& frame, Args&&... args) { - const auto& pkg = frame->pkg; + void create_job(const command_frame& frame, Args&&... args) { + const auto& pkg = frame.pkg; jobs[pkg.cid] = {std::make_unique(pkg, std::forward(args)...), pkg.get_command_type(), {}, 0}; // If job doesn't exist we assume it has already completed. // This is true as long as we're respecting task-graph (anti-)dependencies when processing tasks. - for(size_t i = 0; i < frame.get_payload_size(); ++i) { - const auto it = jobs.find(frame->dependencies[i]); - if(it != jobs.end()) { + for(const auto dcid : frame.iter_dependencies()) { + if(const auto it = jobs.find(dcid); it != jobs.end()) { it->second.dependents.push_back(pkg.cid); jobs[pkg.cid].unsatisfied_dependencies++; } @@ -96,7 +95,7 @@ namespace detail { } void run(); - bool handle_command(const unique_frame_ptr& frame); + bool handle_command(const command_frame& frame); void update_metrics(); }; diff --git a/include/mpi_support.h b/include/mpi_support.h index 81a6b946c..5c40f0a71 100644 --- a/include/mpi_support.h +++ b/include/mpi_support.h @@ -14,63 +14,77 @@ namespace mpi_support { } // namespace mpi_support -struct from_payload_size_tag { -} inline constexpr from_payload_size; +struct from_payload_count_tag { +} inline constexpr from_payload_count; -struct from_frame_bytes_tag { -} inline constexpr from_frame_bytes; +struct from_size_bytes_tag { +} inline constexpr from_size_bytes; + +// unique_frame_ptr manually `operator new`s the underlying frame memory, placement-new-constructs the frame and casts it to a frame pointer. +// I'm convinced that I'm actually, technically allowed to use the resulting frame pointer in a delete-expression and therefore keep `std::default_delete` as +// the deleter type for `unique_frame_ptr::impl`: Following the standard, delete-expression requires its operand to originate from a new-expression, +// and placement-new is defined to be a new-expression. The following implicit call to operator delete is also legal, since memory was obtained from +// `operator new`. Despite the beauty of this standards loophole, @BlackMark29A and @PeterTh couldn't be convinced to let me merge it :( -- @fknorr +template +struct unique_frame_delete { + void operator()(Frame* frame) const { + if(frame) { + frame->~Frame(); + operator delete(frame); + } + } +}; /** * Owning smart pointer for variable-sized structures with a 0-sized array of type Frame::payload_type as the last member. */ template -class unique_frame_ptr : private std::unique_ptr { +class unique_frame_ptr : private std::unique_ptr> { private: - using impl = std::unique_ptr; + using impl = std::unique_ptr>; public: using payload_type = typename Frame::payload_type; unique_frame_ptr() = default; - unique_frame_ptr(from_payload_size_tag, size_t payload_size) - : impl(static_cast(operator new(frame_bytes_from_payload_size(payload_size)))), payload_size(payload_size) { - new(impl::get()) Frame; // permits later deletion through std::default_deleter - } + unique_frame_ptr(from_payload_count_tag, size_t payload_count) : unique_frame_ptr(from_size_bytes, sizeof(Frame) + sizeof(payload_type) * payload_count) {} - unique_frame_ptr(from_frame_bytes_tag, size_t frame_bytes) - : impl(static_cast(operator new(frame_bytes))), payload_size(payload_size_from_frame_bytes(frame_bytes)) { - new(impl::get()) Frame; // permits later deletion through std::default_deleter - } + unique_frame_ptr(from_size_bytes_tag, size_t size_bytes) : impl(make_frame(size_bytes)), size_bytes(size_bytes) {} - unique_frame_ptr(unique_frame_ptr&& other) noexcept : impl(static_cast(other)), payload_size(other.payload_size) { other.payload_size = 0; } + unique_frame_ptr(unique_frame_ptr&& other) noexcept : impl(static_cast(other)), size_bytes(other.size_bytes) { other.size_bytes = 0; } unique_frame_ptr& operator=(unique_frame_ptr&& other) noexcept { - static_cast(*this) = static_cast(other); - payload_size = other.payload_size; - other.payload_size = 0; + if(this == &other) return *this; // gracefully handle self-assignment + static_cast(*this) = static_cast(other); // delegate to base class unique_ptr::operator=() to delete previously held frame + size_bytes = other.size_bytes; + other.size_bytes = 0; return *this; } Frame* get_pointer() { return impl::get(); } const Frame* get_pointer() const { return impl::get(); } - size_t get_payload_size() const { return payload_size; } - size_t get_frame_size_bytes() const { return frame_bytes_from_payload_size(payload_size); } + size_t get_size_bytes() const { return size_bytes; } + size_t get_payload_count() const { return (size_bytes - sizeof(Frame)) / sizeof(payload_type); } using impl::operator bool; using impl::operator*; using impl::operator->; private: - size_t payload_size = 0; - - static size_t frame_bytes_from_payload_size(size_t payload_size) { // - return sizeof(Frame) + sizeof(payload_type) * payload_size; - } - - static size_t payload_size_from_frame_bytes(size_t frame_bytes) { - assert(frame_bytes >= sizeof(Frame) && (frame_bytes - sizeof(Frame)) % sizeof(payload_type) == 0); - return (frame_bytes - sizeof(Frame)) / sizeof(payload_type); + size_t size_bytes = 0; + + static Frame* make_frame(const size_t size_bytes) { + assert(size_bytes >= sizeof(Frame)); + assert((size_bytes - sizeof(Frame)) % sizeof(payload_type) == 0); + const auto mem = operator new(size_bytes); + try { + new(mem) Frame; + } catch(...) { + operator delete(mem); + throw; + } + return static_cast(mem); } }; diff --git a/src/buffer_transfer_manager.cc b/src/buffer_transfer_manager.cc index 60ffcce96..113f8f038 100644 --- a/src/buffer_transfer_manager.cc +++ b/src/buffer_transfer_manager.cc @@ -24,18 +24,19 @@ namespace detail { runtime::get_instance().get_buffer_manager().get_buffer_data(data.bid, cl::sycl::range<3>(data.sr.offset[0], data.sr.offset[1], data.sr.offset[2]), cl::sycl::range<3>(data.sr.range[0], data.sr.range[1], data.sr.range[2])); - unique_frame_ptr frame(from_payload_size, raw_data.get_size()); + unique_frame_ptr frame(from_payload_count, raw_data.get_size()); frame->sr = data.sr; frame->bid = data.bid; frame->rid = data.rid; frame->push_cid = pkg.cid; memcpy(frame->data, raw_data.get_pointer(), raw_data.get_size()); - CELERITY_TRACE("Ready to send {} of buffer {} ({} B) to {}", data.sr, data.bid, frame.get_payload_size(), data.target); + CELERITY_TRACE("Ready to send {} of buffer {} ({} B) to {}", data.sr, data.bid, frame.get_size_bytes(), data.target); // Start transmitting data MPI_Request req; - MPI_Isend(frame.get_pointer(), static_cast(frame.get_frame_size_bytes()), MPI_BYTE, static_cast(data.target), mpi_support::TAG_DATA_TRANSFER, + assert(frame.get_size_bytes() <= static_cast(std::numeric_limits::max())); + MPI_Isend(frame.get_pointer(), static_cast(frame.get_size_bytes()), MPI_BYTE, static_cast(data.target), mpi_support::TAG_DATA_TRANSFER, MPI_COMM_WORLD, &req); auto transfer = std::make_unique(); @@ -91,7 +92,7 @@ namespace detail { auto transfer = std::make_unique(); transfer->source_nid = static_cast(status.MPI_SOURCE); - transfer->frame = unique_frame_ptr(from_frame_bytes, static_cast(frame_bytes)); + transfer->frame = unique_frame_ptr(from_size_bytes, static_cast(frame_bytes)); // Start receiving data MPI_Imrecv(transfer->frame.get_pointer(), frame_bytes, MPI_BYTE, &msg, &transfer->request); @@ -145,7 +146,7 @@ namespace detail { void buffer_transfer_manager::commit_transfer(transfer_in& transfer) { const auto& frame = *transfer.frame; - const size_t elem_size = transfer.frame.get_payload_size() / (frame.sr.range[0] * frame.sr.range[1] * frame.sr.range[2]); + const size_t elem_size = transfer.frame.get_payload_count() / (frame.sr.range[0] * frame.sr.range[1] * frame.sr.range[2]); raw_buffer_data raw_data{elem_size, frame.sr.range}; memcpy(raw_data.get_pointer(), frame.data, raw_data.get_size()); if(frame.rid) { diff --git a/src/executor.cc b/src/executor.cc index a507d9bf2..f37904427 100644 --- a/src/executor.cc +++ b/src/executor.cc @@ -114,8 +114,9 @@ namespace detail { if(flag == 1) { int frame_bytes; MPI_Get_count(&status, MPI_BYTE, &frame_bytes); - unique_frame_ptr frame(from_frame_bytes, static_cast(frame_bytes)); + unique_frame_ptr frame(from_size_bytes, static_cast(frame_bytes)); MPI_Mrecv(frame.get_pointer(), frame_bytes, MPI_BYTE, &msg, &status); + assert(frame->num_dependencies == frame.get_payload_count()); command_queue.push(std::move(frame)); if(!first_command_received) { @@ -126,7 +127,7 @@ namespace detail { } if(jobs.size() < MAX_CONCURRENT_JOBS && !command_queue.empty()) { - if(!handle_command(command_queue.front())) { + if(!handle_command(*command_queue.front())) { // In case the command couldn't be handled, don't pop it from the queue. continue; } @@ -139,20 +140,20 @@ namespace detail { assert(running_device_compute_jobs == 0); } - bool executor::handle_command(const unique_frame_ptr& frame) { + bool executor::handle_command(const command_frame& frame) { // A worker might receive a task command before creating the corresponding task graph node - if(const auto tid = frame->pkg.get_tid()) { + if(const auto tid = frame.pkg.get_tid()) { if(!task_mngr.has_task(*tid)) { return false; } } - switch(frame->pkg.get_command_type()) { + switch(frame.pkg.get_command_type()) { case command_type::HORIZON: create_job(frame, task_mngr); break; case command_type::EPOCH: create_job(frame, task_mngr); break; case command_type::PUSH: create_job(frame, *btm, buffer_mngr); break; case command_type::AWAIT_PUSH: create_job(frame, *btm); break; case command_type::REDUCTION: create_job(frame, reduction_mngr); break; case command_type::EXECUTION: - if(task_mngr.get_task(frame->pkg.get_tid().value())->get_execution_target() == execution_target::HOST) { + if(task_mngr.get_task(frame.pkg.get_tid().value())->get_execution_target() == execution_target::HOST) { create_job(frame, h_queue, task_mngr, buffer_mngr); } else { create_job(frame, d_queue, task_mngr, buffer_mngr, reduction_mngr, local_nid); diff --git a/src/graph_serializer.cc b/src/graph_serializer.cc index 47a460548..6bae91efa 100644 --- a/src/graph_serializer.cc +++ b/src/graph_serializer.cc @@ -88,7 +88,7 @@ namespace detail { void graph_serializer::serialize_and_flush(abstract_command* cmd, const std::vector& dependencies) const { assert(!cmd->is_flushed() && "Command has already been flushed."); - unique_frame_ptr frame(from_payload_size, dependencies.size()); + unique_frame_ptr frame(from_payload_count, dependencies.size()); frame->pkg.cid = cmd->get_cid(); if(const auto* ecmd = dynamic_cast(cmd)) { @@ -108,6 +108,7 @@ namespace detail { assert(false && "Unknown command"); } + frame->num_dependencies = dependencies.size(); std::copy(dependencies.begin(), dependencies.end(), frame->dependencies); flush_cb(cmd->get_nid(), std::move(frame)); diff --git a/src/runtime.cc b/src/runtime.cc index 1242418ab..b3dac4ec5 100644 --- a/src/runtime.cc +++ b/src/runtime.cc @@ -263,8 +263,8 @@ namespace detail { // Even though command packages are small enough to use a blocking send we want to be able to send to the master node as well, // which is why we have to use Isend after all. We also have to make sure that the buffer stays around until the send is complete. MPI_Request req; - MPI_Isend(frame.get_pointer(), static_cast(frame.get_frame_size_bytes()), MPI_BYTE, static_cast(target), mpi_support::TAG_CMD, MPI_COMM_WORLD, - &req); + MPI_Isend( + frame.get_pointer(), static_cast(frame.get_size_bytes()), MPI_BYTE, static_cast(target), mpi_support::TAG_CMD, MPI_COMM_WORLD, &req); active_flushes.push_back(flush_handle{std::move(frame), req}); // Cleanup finished transfers. diff --git a/test/test_utils.h b/test/test_utils.h index 0a6ac472c..6e7a70fad 100644 --- a/test/test_utils.h +++ b/test/test_utils.h @@ -167,15 +167,15 @@ namespace test_utils { public: auto get_cb() { return [this](detail::node_id nid, detail::unique_frame_ptr frame) { - for(size_t i = 0; i < frame.get_payload_size(); ++i) { +#ifndef NDEBUG + for(const auto dcid : frame->iter_dependencies()) { // Sanity check: All dependencies must have already been flushed - const auto& dep = frame->dependencies[i]; - (void)dep; - assert(commands.count(dep) == 1); + assert(commands.count(dcid) == 1); } +#endif const detail::command_id cid = frame->pkg.cid; - commands[cid] = {nid, frame->pkg, std::vector(frame->dependencies, frame->dependencies + frame.get_payload_size())}; + commands[cid] = {nid, frame->pkg, std::vector(frame->iter_dependencies().begin(), frame->iter_dependencies().end())}; if(const auto tid = frame->pkg.get_tid()) { by_task[*tid].insert(cid); } by_node[nid].insert(cid); };