Skip to content

Commit

Permalink
Address reviewer commands on Frames
Browse files Browse the repository at this point in the history
Duplicates dependency count in command_frame
  • Loading branch information
fknorr committed May 18, 2022
1 parent b75bc3e commit c60527f
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 57 deletions.
2 changes: 1 addition & 1 deletion include/buffer_transfer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ namespace detail {
reduction_id rid; // zero if this does not belong to a reduction
subrange<3> sr;
command_id push_cid;
alignas(std::max_align_t) payload_type data[0];
alignas(std::max_align_t) payload_type data[]; // max_align to allow reinterpret_casting a pointer to this member to any buffer element pointer
};

struct transfer_in {
Expand Down
8 changes: 6 additions & 2 deletions include/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,17 @@ namespace detail {
struct command_frame {
using payload_type = command_id;

command_pkg pkg;
size_t num_dependencies = 0; // This information is duplicated from unique_frame_ptr::get_payload_count() so that we can still use a
// `const command_frame &` without its owning pointer
payload_type dependencies[];

// variable-sized structure
command_frame() = default;
command_frame(const command_frame&) = delete;
command_frame& operator=(const command_frame&) = delete;

command_pkg pkg;
payload_type dependencies[0];
iterable_range<const command_id*> iter_dependencies() const { return {dependencies, dependencies + num_dependencies}; }
};

} // namespace detail
Expand Down
11 changes: 5 additions & 6 deletions include/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,22 @@ namespace detail {
bool first_command_received = false;

template <typename Job, typename... Args>
void create_job(const unique_frame_ptr<command_frame>& frame, Args&&... args) {
const auto& pkg = frame->pkg;
void create_job(const command_frame& frame, Args&&... args) {
const auto& pkg = frame.pkg;
jobs[pkg.cid] = {std::make_unique<Job>(pkg, std::forward<Args>(args)...), pkg.get_command_type(), {}, 0};

// If job doesn't exist we assume it has already completed.
// This is true as long as we're respecting task-graph (anti-)dependencies when processing tasks.
for(size_t i = 0; i < frame.get_payload_size(); ++i) {
const auto it = jobs.find(frame->dependencies[i]);
if(it != jobs.end()) {
for(const auto dcid : frame.iter_dependencies()) {
if(const auto it = jobs.find(dcid); it != jobs.end()) {
it->second.dependents.push_back(pkg.cid);
jobs[pkg.cid].unsatisfied_dependencies++;
}
}
}

void run();
bool handle_command(const unique_frame_ptr<command_frame>& frame);
bool handle_command(const command_frame& frame);

void update_metrics();
};
Expand Down
72 changes: 43 additions & 29 deletions include/mpi_support.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,63 +14,77 @@ namespace mpi_support {

} // namespace mpi_support

struct from_payload_size_tag {
} inline constexpr from_payload_size;
struct from_payload_count_tag {
} inline constexpr from_payload_count;

struct from_frame_bytes_tag {
} inline constexpr from_frame_bytes;
struct from_size_bytes_tag {
} inline constexpr from_size_bytes;

// unique_frame_ptr manually `operator new`s the underlying frame memory, placement-new-constructs the frame and casts it to a frame pointer.
// I'm convinced that I'm actually, technically allowed to use the resulting frame pointer in a delete-expression and therefore keep `std::default_delete` as
// the deleter type for `unique_frame_ptr::impl`: Following the standard, delete-expression requires its operand to originate from a new-expression,
// and placement-new is defined to be a new-expression. The following implicit call to operator delete is also legal, since memory was obtained from
// `operator new`. Despite the beauty of this standards loophole, @BlackMark29A and @PeterTh couldn't be convinced to let me merge it :( -- @fknorr
template <typename Frame>
struct unique_frame_delete {
void operator()(Frame* frame) const {
if(frame) {
frame->~Frame();
operator delete(frame);
}
}
};

/**
* Owning smart pointer for variable-sized structures with a 0-sized array of type Frame::payload_type as the last member.
*/
template <typename Frame>
class unique_frame_ptr : private std::unique_ptr<Frame> {
class unique_frame_ptr : private std::unique_ptr<Frame, unique_frame_delete<Frame>> {
private:
using impl = std::unique_ptr<Frame>;
using impl = std::unique_ptr<Frame, unique_frame_delete<Frame>>;

public:
using payload_type = typename Frame::payload_type;

unique_frame_ptr() = default;

unique_frame_ptr(from_payload_size_tag, size_t payload_size)
: impl(static_cast<Frame*>(operator new(frame_bytes_from_payload_size(payload_size)))), payload_size(payload_size) {
new(impl::get()) Frame; // permits later deletion through std::default_deleter
}
unique_frame_ptr(from_payload_count_tag, size_t payload_count) : unique_frame_ptr(from_size_bytes, sizeof(Frame) + sizeof(payload_type) * payload_count) {}

unique_frame_ptr(from_frame_bytes_tag, size_t frame_bytes)
: impl(static_cast<Frame*>(operator new(frame_bytes))), payload_size(payload_size_from_frame_bytes(frame_bytes)) {
new(impl::get()) Frame; // permits later deletion through std::default_deleter
}
unique_frame_ptr(from_size_bytes_tag, size_t size_bytes) : impl(make_frame(size_bytes)), size_bytes(size_bytes) {}

unique_frame_ptr(unique_frame_ptr&& other) noexcept : impl(static_cast<impl&&>(other)), payload_size(other.payload_size) { other.payload_size = 0; }
unique_frame_ptr(unique_frame_ptr&& other) noexcept : impl(static_cast<impl&&>(other)), size_bytes(other.size_bytes) { other.size_bytes = 0; }

unique_frame_ptr& operator=(unique_frame_ptr&& other) noexcept {
static_cast<impl&>(*this) = static_cast<impl&&>(other);
payload_size = other.payload_size;
other.payload_size = 0;
if(this == &other) return *this; // gracefully handle self-assignment
static_cast<impl&>(*this) = static_cast<impl&&>(other); // delegate to base class unique_ptr<Frame>::operator=() to delete previously held frame
size_bytes = other.size_bytes;
other.size_bytes = 0;
return *this;
}

Frame* get_pointer() { return impl::get(); }
const Frame* get_pointer() const { return impl::get(); }
size_t get_payload_size() const { return payload_size; }
size_t get_frame_size_bytes() const { return frame_bytes_from_payload_size(payload_size); }
size_t get_size_bytes() const { return size_bytes; }
size_t get_payload_count() const { return (size_bytes - sizeof(Frame)) / sizeof(payload_type); }

using impl::operator bool;
using impl::operator*;
using impl::operator->;

private:
size_t payload_size = 0;

static size_t frame_bytes_from_payload_size(size_t payload_size) { //
return sizeof(Frame) + sizeof(payload_type) * payload_size;
}

static size_t payload_size_from_frame_bytes(size_t frame_bytes) {
assert(frame_bytes >= sizeof(Frame) && (frame_bytes - sizeof(Frame)) % sizeof(payload_type) == 0);
return (frame_bytes - sizeof(Frame)) / sizeof(payload_type);
size_t size_bytes = 0;

static Frame* make_frame(const size_t size_bytes) {
assert(size_bytes >= sizeof(Frame));
assert((size_bytes - sizeof(Frame)) % sizeof(payload_type) == 0);
const auto mem = operator new(size_bytes);
try {
new(mem) Frame;
} catch(...) {
operator delete(mem);
throw;
}
return static_cast<Frame*>(mem);
}
};

Expand Down
11 changes: 6 additions & 5 deletions src/buffer_transfer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ namespace detail {
runtime::get_instance().get_buffer_manager().get_buffer_data(data.bid, cl::sycl::range<3>(data.sr.offset[0], data.sr.offset[1], data.sr.offset[2]),
cl::sycl::range<3>(data.sr.range[0], data.sr.range[1], data.sr.range[2]));

unique_frame_ptr<data_frame> frame(from_payload_size, raw_data.get_size());
unique_frame_ptr<data_frame> frame(from_payload_count, raw_data.get_size());
frame->sr = data.sr;
frame->bid = data.bid;
frame->rid = data.rid;
frame->push_cid = pkg.cid;
memcpy(frame->data, raw_data.get_pointer(), raw_data.get_size());

CELERITY_TRACE("Ready to send {} of buffer {} ({} B) to {}", data.sr, data.bid, frame.get_payload_size(), data.target);
CELERITY_TRACE("Ready to send {} of buffer {} ({} B) to {}", data.sr, data.bid, frame.get_size_bytes(), data.target);

// Start transmitting data
MPI_Request req;
MPI_Isend(frame.get_pointer(), static_cast<int>(frame.get_frame_size_bytes()), MPI_BYTE, static_cast<int>(data.target), mpi_support::TAG_DATA_TRANSFER,
assert(frame.get_size_bytes() <= static_cast<size_t>(std::numeric_limits<int>::max()));
MPI_Isend(frame.get_pointer(), static_cast<int>(frame.get_size_bytes()), MPI_BYTE, static_cast<int>(data.target), mpi_support::TAG_DATA_TRANSFER,
MPI_COMM_WORLD, &req);

auto transfer = std::make_unique<transfer_out>();
Expand Down Expand Up @@ -91,7 +92,7 @@ namespace detail {

auto transfer = std::make_unique<transfer_in>();
transfer->source_nid = static_cast<node_id>(status.MPI_SOURCE);
transfer->frame = unique_frame_ptr<data_frame>(from_frame_bytes, static_cast<size_t>(frame_bytes));
transfer->frame = unique_frame_ptr<data_frame>(from_size_bytes, static_cast<size_t>(frame_bytes));

// Start receiving data
MPI_Imrecv(transfer->frame.get_pointer(), frame_bytes, MPI_BYTE, &msg, &transfer->request);
Expand Down Expand Up @@ -145,7 +146,7 @@ namespace detail {

void buffer_transfer_manager::commit_transfer(transfer_in& transfer) {
const auto& frame = *transfer.frame;
const size_t elem_size = transfer.frame.get_payload_size() / (frame.sr.range[0] * frame.sr.range[1] * frame.sr.range[2]);
const size_t elem_size = transfer.frame.get_payload_count() / (frame.sr.range[0] * frame.sr.range[1] * frame.sr.range[2]);
raw_buffer_data raw_data{elem_size, frame.sr.range};
memcpy(raw_data.get_pointer(), frame.data, raw_data.get_size());
if(frame.rid) {
Expand Down
13 changes: 7 additions & 6 deletions src/executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,9 @@ namespace detail {
if(flag == 1) {
int frame_bytes;
MPI_Get_count(&status, MPI_BYTE, &frame_bytes);
unique_frame_ptr<command_frame> frame(from_frame_bytes, static_cast<size_t>(frame_bytes));
unique_frame_ptr<command_frame> frame(from_size_bytes, static_cast<size_t>(frame_bytes));
MPI_Mrecv(frame.get_pointer(), frame_bytes, MPI_BYTE, &msg, &status);
assert(frame->num_dependencies == frame.get_payload_count());
command_queue.push(std::move(frame));

if(!first_command_received) {
Expand All @@ -126,7 +127,7 @@ namespace detail {
}

if(jobs.size() < MAX_CONCURRENT_JOBS && !command_queue.empty()) {
if(!handle_command(command_queue.front())) {
if(!handle_command(*command_queue.front())) {
// In case the command couldn't be handled, don't pop it from the queue.
continue;
}
Expand All @@ -139,20 +140,20 @@ namespace detail {
assert(running_device_compute_jobs == 0);
}

bool executor::handle_command(const unique_frame_ptr<command_frame>& frame) {
bool executor::handle_command(const command_frame& frame) {
// A worker might receive a task command before creating the corresponding task graph node
if(const auto tid = frame->pkg.get_tid()) {
if(const auto tid = frame.pkg.get_tid()) {
if(!task_mngr.has_task(*tid)) { return false; }
}

switch(frame->pkg.get_command_type()) {
switch(frame.pkg.get_command_type()) {
case command_type::HORIZON: create_job<horizon_job>(frame, task_mngr); break;
case command_type::EPOCH: create_job<epoch_job>(frame, task_mngr); break;
case command_type::PUSH: create_job<push_job>(frame, *btm, buffer_mngr); break;
case command_type::AWAIT_PUSH: create_job<await_push_job>(frame, *btm); break;
case command_type::REDUCTION: create_job<reduction_job>(frame, reduction_mngr); break;
case command_type::EXECUTION:
if(task_mngr.get_task(frame->pkg.get_tid().value())->get_execution_target() == execution_target::HOST) {
if(task_mngr.get_task(frame.pkg.get_tid().value())->get_execution_target() == execution_target::HOST) {
create_job<host_execute_job>(frame, h_queue, task_mngr, buffer_mngr);
} else {
create_job<device_execute_job>(frame, d_queue, task_mngr, buffer_mngr, reduction_mngr, local_nid);
Expand Down
3 changes: 2 additions & 1 deletion src/graph_serializer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ namespace detail {
void graph_serializer::serialize_and_flush(abstract_command* cmd, const std::vector<command_id>& dependencies) const {
assert(!cmd->is_flushed() && "Command has already been flushed.");

unique_frame_ptr<command_frame> frame(from_payload_size, dependencies.size());
unique_frame_ptr<command_frame> frame(from_payload_count, dependencies.size());

frame->pkg.cid = cmd->get_cid();
if(const auto* ecmd = dynamic_cast<epoch_command*>(cmd)) {
Expand All @@ -108,6 +108,7 @@ namespace detail {
assert(false && "Unknown command");
}

frame->num_dependencies = dependencies.size();
std::copy(dependencies.begin(), dependencies.end(), frame->dependencies);

flush_cb(cmd->get_nid(), std::move(frame));
Expand Down
4 changes: 2 additions & 2 deletions src/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,8 @@ namespace detail {
// Even though command packages are small enough to use a blocking send we want to be able to send to the master node as well,
// which is why we have to use Isend after all. We also have to make sure that the buffer stays around until the send is complete.
MPI_Request req;
MPI_Isend(frame.get_pointer(), static_cast<int>(frame.get_frame_size_bytes()), MPI_BYTE, static_cast<int>(target), mpi_support::TAG_CMD, MPI_COMM_WORLD,
&req);
MPI_Isend(
frame.get_pointer(), static_cast<int>(frame.get_size_bytes()), MPI_BYTE, static_cast<int>(target), mpi_support::TAG_CMD, MPI_COMM_WORLD, &req);
active_flushes.push_back(flush_handle{std::move(frame), req});

// Cleanup finished transfers.
Expand Down
10 changes: 5 additions & 5 deletions test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,15 @@ namespace test_utils {
public:
auto get_cb() {
return [this](detail::node_id nid, detail::unique_frame_ptr<detail::command_frame> frame) {
for(size_t i = 0; i < frame.get_payload_size(); ++i) {
#ifndef NDEBUG
for(const auto dcid : frame->iter_dependencies()) {
// Sanity check: All dependencies must have already been flushed
const auto& dep = frame->dependencies[i];
(void)dep;
assert(commands.count(dep) == 1);
assert(commands.count(dcid) == 1);
}
#endif

const detail::command_id cid = frame->pkg.cid;
commands[cid] = {nid, frame->pkg, std::vector(frame->dependencies, frame->dependencies + frame.get_payload_size())};
commands[cid] = {nid, frame->pkg, std::vector(frame->iter_dependencies().begin(), frame->iter_dependencies().end())};
if(const auto tid = frame->pkg.get_tid()) { by_task[*tid].insert(cid); }
by_node[nid].insert(cid);
};
Expand Down

0 comments on commit c60527f

Please sign in to comment.