Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace single-use MPI datatypes with binary serialization #114

Merged
merged 3 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ set(SOURCES
src/executor.cc
src/graph_generator.cc
src/graph_serializer.cc
src/mpi_support.cc
src/print_graph.cc
src/print_utils.cc
src/runtime.cc
Expand Down
18 changes: 11 additions & 7 deletions include/buffer_transfer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,25 @@ namespace detail {
void poll();

private:
struct data_header {
struct data_frame {
using payload_type = std::byte;

// variable-sized structure
data_frame() = default;
data_frame(const data_frame&) = delete;
data_frame& operator=(const data_frame&) = delete;

buffer_id bid;
reduction_id rid; // zero if this does not belong to a reduction
subrange<3> sr;
command_id push_cid;
alignas(std::max_align_t) payload_type data[]; // max_align to allow reinterpret_casting a pointer to this member to any buffer element pointer
};

struct transfer_in {
node_id source_nid;
MPI_Request request;
data_header header;
raw_buffer_data data;
mpi_support::single_use_data_type data_type;
unique_frame_ptr<data_frame> frame;
};

struct incoming_transfer_handle : transfer_handle {
Expand All @@ -54,9 +60,7 @@ namespace detail {
struct transfer_out {
std::shared_ptr<transfer_handle> handle;
MPI_Request request;
data_header header;
raw_buffer_data data;
mpi_support::single_use_data_type data_type;
unique_frame_ptr<data_frame> frame;
};

std::list<std::unique_ptr<transfer_in>> incoming_transfers;
Expand Down
17 changes: 17 additions & 0 deletions include/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <variant>

#include "intrusive_graph.h"
#include "mpi_support.h"
#include "ranges.h"
#include "task.h"
#include "types.h"
Expand Down Expand Up @@ -220,5 +221,21 @@ namespace detail {
}
};

struct command_frame {
using payload_type = command_id;

command_pkg pkg;
size_t num_dependencies = 0; // This information is duplicated from unique_frame_ptr::get_payload_count() so that we can still use a
// `const command_frame &` without its owning pointer
payload_type dependencies[];

// variable-sized structure
command_frame() = default;
command_frame(const command_frame&) = delete;
command_frame& operator=(const command_frame&) = delete;

iterable_range<const command_id*> iter_dependencies() const { return {dependencies, dependencies + num_dependencies}; }
};

} // namespace detail
} // namespace celerity
10 changes: 5 additions & 5 deletions include/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,22 @@ namespace detail {
bool first_command_received = false;

template <typename Job, typename... Args>
void create_job(const command_pkg& pkg, const std::vector<command_id>& dependencies, Args&&... args) {
void create_job(const command_frame& frame, Args&&... args) {
const auto& pkg = frame.pkg;
jobs[pkg.cid] = {std::make_unique<Job>(pkg, std::forward<Args>(args)...), pkg.get_command_type(), {}, 0};

// If job doesn't exist we assume it has already completed.
// This is true as long as we're respecting task-graph (anti-)dependencies when processing tasks.
for(const command_id& d : dependencies) {
const auto it = jobs.find(d);
if(it != jobs.end()) {
for(const auto dcid : frame.iter_dependencies()) {
if(const auto it = jobs.find(dcid); it != jobs.end()) {
it->second.dependents.push_back(pkg.cid);
jobs[pkg.cid].unsatisfied_dependencies++;
}
}
}

void run();
bool handle_command(const command_pkg& pkg, const std::vector<command_id>& dependencies);
bool handle_command(const command_frame& frame);

void update_metrics();
};
Expand Down
4 changes: 2 additions & 2 deletions include/graph_serializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
#include <functional>
#include <vector>

#include "command.h"
#include "types.h"

namespace celerity {
namespace detail {

struct command_pkg;
class abstract_command;
class task_command;
class command_graph;

class graph_serializer {
using flush_callback = std::function<void(node_id, command_pkg, const std::vector<command_id>&)>;
using flush_callback = std::function<void(node_id, unique_frame_ptr<command_frame>)>;

public:
/*
Expand Down
141 changes: 88 additions & 53 deletions include/mpi_support.h
Original file line number Diff line number Diff line change
@@ -1,56 +1,91 @@
#pragma once

#include <cassert>
#include <memory>
#include <utility>
#include <vector>

#include <mpi.h>

namespace celerity {
namespace detail {
namespace mpi_support {

constexpr int TAG_CMD = 0;
constexpr int TAG_DATA_TRANSFER = 1;
constexpr int TAG_TELEMETRY = 2;

class single_use_data_type {
public:
single_use_data_type() = default;
single_use_data_type(MPI_Datatype dt) : dt(dt){};

single_use_data_type(single_use_data_type&& other) noexcept { *this = std::move(other); }
single_use_data_type& operator=(single_use_data_type&& other) noexcept {
if(this != &other) {
dt = other.dt;
other.dt = MPI_DATATYPE_NULL;
}
return *this;
}

single_use_data_type(const single_use_data_type& other) = delete;
single_use_data_type& operator=(const single_use_data_type& other) = delete;

MPI_Datatype operator*() const { return dt; }

~single_use_data_type() {
if(dt != MPI_DATATYPE_NULL) { MPI_Type_free(&dt); }
}

private:
MPI_Datatype dt = MPI_DATATYPE_NULL;
};

/**
* @brief Constructs a new MPI data type for a particular list of blocks.
*
* The returned data type uses MPI_BYTE internally, with block displacements set to the given pointers, i.e. using the type
* operates directly on the objects pointed to. This is useful e.g. when transferring multiple objects that don't exist in a contiguous memory region.
*
* @param blocks A list pairs of an object size (in bytes) and a pointer to the object
* @returns A RAII-wrapped MPI data type
*/
single_use_data_type build_single_use_composite_type(const std::vector<std::pair<size_t, void*>>& blocks);

} // namespace mpi_support
} // namespace detail
} // namespace celerity

namespace celerity::detail {

namespace mpi_support {

constexpr int TAG_CMD = 0;
constexpr int TAG_DATA_TRANSFER = 1;
constexpr int TAG_TELEMETRY = 2;

} // namespace mpi_support

struct from_payload_count_tag {
} inline constexpr from_payload_count;

struct from_size_bytes_tag {
} inline constexpr from_size_bytes;

// unique_frame_ptr manually `operator new`s the underlying frame memory, placement-new-constructs the frame and casts it to a frame pointer.
// I'm convinced that I'm actually, technically allowed to use the resulting frame pointer in a delete-expression and therefore keep `std::default_delete` as
// the deleter type for `unique_frame_ptr::impl`: Following the standard, delete-expression requires its operand to originate from a new-expression,
// and placement-new is defined to be a new-expression. The following implicit call to operator delete is also legal, since memory was obtained from
// `operator new`. Despite the beauty of this standards loophole, @BlackMark29A and @PeterTh couldn't be convinced to let me merge it :( -- @fknorr
template <typename Frame>
struct unique_frame_delete {
void operator()(Frame* frame) const {
if(frame) {
frame->~Frame();
operator delete(frame);
}
}
};

/**
* Owning smart pointer for variable-sized structures with a 0-sized array of type Frame::payload_type as the last member.
*/
template <typename Frame>
class unique_frame_ptr : private std::unique_ptr<Frame, unique_frame_delete<Frame>> {
private:
using impl = std::unique_ptr<Frame, unique_frame_delete<Frame>>;

public:
using payload_type = typename Frame::payload_type;

unique_frame_ptr() = default;

unique_frame_ptr(from_payload_count_tag, size_t payload_count) : unique_frame_ptr(from_size_bytes, sizeof(Frame) + sizeof(payload_type) * payload_count) {}

unique_frame_ptr(from_size_bytes_tag, size_t size_bytes) : impl(make_frame(size_bytes)), size_bytes(size_bytes) {}

unique_frame_ptr(unique_frame_ptr&& other) noexcept : impl(static_cast<impl&&>(other)), size_bytes(other.size_bytes) { other.size_bytes = 0; }

unique_frame_ptr& operator=(unique_frame_ptr&& other) noexcept {
if(this == &other) return *this; // gracefully handle self-assignment
static_cast<impl&>(*this) = static_cast<impl&&>(other); // delegate to base class unique_ptr<Frame>::operator=() to delete previously held frame
size_bytes = other.size_bytes;
other.size_bytes = 0;
psalz marked this conversation as resolved.
Show resolved Hide resolved
return *this;
}

Frame* get_pointer() { return impl::get(); }
const Frame* get_pointer() const { return impl::get(); }
size_t get_size_bytes() const { return size_bytes; }
size_t get_payload_count() const { return (size_bytes - sizeof(Frame)) / sizeof(payload_type); }

using impl::operator bool;
using impl::operator*;
using impl::operator->;

private:
size_t size_bytes = 0;

static Frame* make_frame(const size_t size_bytes) {
assert(size_bytes >= sizeof(Frame));
assert((size_bytes - sizeof(Frame)) % sizeof(payload_type) == 0);
const auto mem = operator new(size_bytes);
try {
new(mem) Frame;
} catch(...) {
operator delete(mem);
throw;
}
return static_cast<Frame*>(mem);
}
};

} // namespace celerity::detail
6 changes: 2 additions & 4 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,8 @@ namespace detail {
std::unique_ptr<executor> exec;

struct flush_handle {
command_pkg pkg;
std::vector<command_id> dependencies;
unique_frame_ptr<command_frame> frame;
MPI_Request req;
mpi_support::single_use_data_type data_type;
};
std::deque<flush_handle> active_flushes;

Expand All @@ -129,7 +127,7 @@ namespace detail {
*/
void maybe_destroy_runtime() const;

void flush_command(node_id target, const command_pkg& pkg, const std::vector<command_id>& dependencies);
void flush_command(node_id target, unique_frame_ptr<command_frame> frame);

// ------------------------------------------ TESTING UTILS ------------------------------------------
// We have to jump through some hoops to be able to re-initialize the runtime for unit testing.
Expand Down
Loading