Skip to content

Commit

Permalink
Switch to distributed scheduling model
Browse files Browse the repository at this point in the history
This replaces Celerity's master/worker scheduling model with a fully
distributed approach: Each node still processes all chunks, but commands
are only generated for locally executed chunks, as well as to push data
to other nodes as needed. Instead of tracking where the newest data
resides within the distributed system at all times, each node now only
determines which parts of the data are currently up to date (based on
task-level write requirements) as well as which parts it "owns" (based
on writes by locally executed chunks). Read requirements on stale data
by local chunks generate await push commands, which can now be satisfied
by any number of incoming transfers (as nodes do not know who currently
owns the data). Conversely, reads on locally owned data by remote chunks
generate push commands.

Co-authored-by: Fabian Knorr <fabian.knorr@dps.uibk.ac.at>
  • Loading branch information
psalz and fknorr committed Jul 13, 2023
1 parent 2e77c6b commit 0970bff
Show file tree
Hide file tree
Showing 41 changed files with 3,197 additions and 3,729 deletions.
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -228,15 +228,14 @@ set(SOURCES
src/config.cc
src/device_queue.cc
src/executor.cc
src/graph_generator.cc
src/distributed_graph_generator.cc
src/graph_serializer.cc
src/print_graph.cc
src/print_utils.cc
src/runtime.cc
src/scheduler.cc
src/task.cc
src/task_manager.cc
src/transformers/naive_split.cc
src/user_bench.cc
src/worker_job.cc
"${CMAKE_CURRENT_BINARY_DIR}/src/version.cc"
Expand Down
21 changes: 16 additions & 5 deletions examples/matmul/matmul.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,24 @@

#include <celerity.h>

#if !defined(NDEBUG)
const size_t MAT_SIZE = 128;
#else
const size_t MAT_SIZE = 1024;
#endif

template <typename T>
void set_identity(celerity::distr_queue queue, celerity::buffer<T, 2> mat) {
void set_identity(celerity::distr_queue queue, celerity::buffer<T, 2> mat, bool reverse) {
queue.submit([&](celerity::handler& cgh) {
celerity::accessor dw{mat, cgh, celerity::access::one_to_one{}, celerity::write_only, celerity::no_init};
cgh.parallel_for<class set_identity_kernel>(mat.get_range(), [=](celerity::item<2> item) { dw[item] = item[0] == item[1]; });
const auto range = mat.get_range();
cgh.parallel_for<class set_identity_kernel>(range, [=](celerity::item<2> item) {
if(!reverse) {
dw[item] = item[0] == item[1];
} else {
dw[item] = item[0] == (range[1] - item[1] - 1);
}
});
});
}

Expand Down Expand Up @@ -81,15 +92,15 @@ int main() {
celerity::debug::set_buffer_name(mat_a_buf, "mat_a");
celerity::debug::set_buffer_name(mat_b_buf, "mat_b");

set_identity(queue, mat_a_buf);
set_identity(queue, mat_b_buf);
set_identity(queue, mat_a_buf, false);
set_identity(queue, mat_b_buf, true);

multiply(queue, mat_a_buf, mat_b_buf, mat_c_buf);
multiply(queue, mat_b_buf, mat_c_buf, mat_a_buf);

// each node verifies part of the result, so we pass per-node verification results through a host object
celerity::experimental::host_object<bool> passed_obj(false);
verify(queue, mat_c_buf, passed_obj);
verify(queue, mat_a_buf, passed_obj);

// The value of `passed` can differ between hosts if only part of the verification failed.
const bool passed = celerity::experimental::fence(queue, passed_obj).get();
Expand Down
50 changes: 46 additions & 4 deletions include/buffer_transfer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ namespace detail {
bool complete = false;
};

buffer_transfer_manager();
buffer_transfer_manager(const size_t num_nodes);

// TODO: BTM should have no notion of command_pkg - decouple
std::shared_ptr<const transfer_handle> push(const command_pkg& pkg);
std::shared_ptr<const transfer_handle> await_push(const command_pkg& pkg);

Expand All @@ -46,7 +47,7 @@ namespace detail {
buffer_id bid;
reduction_id rid; // zero if this does not belong to a reduction
subrange<3> sr;
command_id push_cid;
transfer_id trid;
alignas(std::max_align_t) payload_type data[]; // max_align to allow reinterpret_casting a pointer to this member to any buffer element pointer
};

Expand All @@ -60,7 +61,46 @@ namespace detail {
};

struct incoming_transfer_handle : transfer_handle {
std::unique_ptr<transfer_in> transfer;
incoming_transfer_handle(const size_t num_nodes) : m_num_nodes(num_nodes) {}

void set_expected_region(GridRegion<3> region) { m_expected_region = std::move(region); }

void add_transfer(std::unique_ptr<transfer_in>&& t) {
assert(!complete);
assert(t->frame->rid == 0 || m_is_reduction || m_transfers.empty()); // Either all or none
m_is_reduction = t->frame->rid != 0;
const auto box = subrange_to_grid_box(t->frame->sr);
assert(GridRegion<3>::intersect(m_received_region, box).empty() || m_is_reduction);
assert(!m_expected_region.has_value() || GridRegion<3>::difference(box, *m_expected_region).empty());
m_received_region = GridRegion<3>::merge(m_received_region, box);
m_transfers.push_back(std::move(t));
}

bool received_full_region() const {
if(!m_expected_region.has_value()) return false;
if(m_is_reduction) {
assert(m_expected_region->area() == 1);
// For reductions we're waiting to receive one message per peer
return m_transfers.size() == m_num_nodes - 1;
}
return (m_received_region == *m_expected_region);
}

template <typename Callback>
void drain_transfers(Callback&& cb) {
assert(received_full_region());
for(auto& t : m_transfers) {
cb(std::move(t));
}
m_transfers.clear();
}

private:
size_t m_num_nodes; // Number of nodes in the system, required for reductions
bool m_is_reduction = false;
std::vector<std::unique_ptr<transfer_in>> m_transfers;
std::optional<GridRegion<3>> m_expected_region; // This will only be set once the await push job has started
GridRegion<3> m_received_region;
};

struct transfer_out {
Expand All @@ -69,13 +109,15 @@ namespace detail {
unique_frame_ptr<data_frame> frame;
};

size_t m_num_nodes;

std::list<std::unique_ptr<transfer_in>> m_incoming_transfers;
std::list<std::unique_ptr<transfer_out>> m_outgoing_transfers;

// Here we store two types of handles:
// - Incoming pushes that have not yet been requested through ::await_push
// - Still outstanding pushes that have been requested through ::await_push
std::unordered_map<command_id, std::shared_ptr<incoming_transfer_handle>> m_push_blackboard;
std::unordered_map<std::pair<buffer_id, transfer_id>, std::shared_ptr<incoming_transfer_handle>, utils::pair_hash> m_push_blackboard;

mpi_support::data_type m_send_recv_unit;

Expand Down
71 changes: 31 additions & 40 deletions include/command.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@ namespace detail {
friend class command_graph;

protected:
abstract_command(command_id cid, node_id nid) : m_cid(cid), m_nid(nid) {}
abstract_command(command_id cid) : m_cid(cid) {}

public:
virtual ~abstract_command() = 0;

command_id get_cid() const { return m_cid; }

node_id get_nid() const { return m_nid; }

void mark_as_flushed() {
assert(!m_flushed);
m_flushed = true;
Expand All @@ -52,43 +50,53 @@ namespace detail {
using parent_type::remove_dependency;

command_id m_cid;
node_id m_nid;
bool m_flushed = false;
};
inline abstract_command::~abstract_command() {}

/// Command to push a subrange of a buffer to another node. The transfer ID allows the
/// receiver to match the incoming data against its corresponding await push command.
class push_command final : public abstract_command {
	friend class command_graph;
	push_command(command_id cid, buffer_id bid, reduction_id rid, node_id target, transfer_id trid, subrange<3> push_range)
	    : abstract_command(cid), m_bid(bid), m_rid(rid), m_target(target), m_trid(trid), m_push_range(push_range) {}

  public:
	buffer_id get_bid() const { return m_bid; }
	reduction_id get_reduction_id() const { return m_rid; } // Zero if this push is not part of a reduction
	node_id get_target() const { return m_target; }
	transfer_id get_transfer_id() const { return m_trid; }
	const subrange<3>& get_range() const { return m_push_range; }

  private:
	buffer_id m_bid;
	reduction_id m_rid;
	node_id m_target;
	transfer_id m_trid;
	subrange<3> m_push_range;
};

/// Command to await incoming pushes for a buffer region. The region may be satisfied by
/// any number of incoming transfers, matched via the (buffer, transfer ID) pair.
class await_push_command final : public abstract_command {
	friend class command_graph;
	await_push_command(command_id cid, buffer_id bid, reduction_id rid, transfer_id trid, GridRegion<3> region)
	    : abstract_command(cid), m_bid(bid), m_rid(rid), m_trid(trid), m_region(std::move(region)) {}

  public:
	buffer_id get_bid() const { return m_bid; }
	reduction_id get_reduction_id() const { return m_rid; }
	transfer_id get_transfer_id() const { return m_trid; }
	GridRegion<3> get_region() const { return m_region; }

  private:
	buffer_id m_bid;
	// Having the reduction ID here isn't strictly required for matching against incoming pushes,
	// but it allows us to sanity check that they match as well as include the ID during graph printing.
	reduction_id m_rid;
	transfer_id m_trid;
	GridRegion<3> m_region;
};

class reduction_command final : public abstract_command {
friend class command_graph;
reduction_command(command_id cid, node_id nid, const reduction_info& info) : abstract_command(cid, nid), m_info(info) {}
reduction_command(command_id cid, const reduction_info& info) : abstract_command(cid), m_info(info) {}

public:
const reduction_info& get_reduction_info() const { return m_info; }
Expand All @@ -99,7 +107,7 @@ namespace detail {

class task_command : public abstract_command {
protected:
task_command(command_id cid, node_id nid, task_id tid) : abstract_command(cid, nid), m_tid(tid) {}
task_command(command_id cid, task_id tid) : abstract_command(cid), m_tid(tid) {}

public:
task_id get_tid() const { return m_tid; }
Expand All @@ -110,7 +118,7 @@ namespace detail {

class epoch_command final : public task_command {
friend class command_graph;
epoch_command(const command_id& cid, const node_id& nid, const task_id& tid, epoch_action action) : task_command(cid, nid, tid), m_action(action) {}
epoch_command(const command_id& cid, const task_id& tid, epoch_action action) : task_command(cid, tid), m_action(action) {}

public:
epoch_action get_epoch_action() const { return m_action; }
Expand All @@ -128,8 +136,7 @@ namespace detail {
friend class command_graph;

protected:
execution_command(command_id cid, node_id nid, task_id tid, subrange<3> execution_range)
: task_command(cid, nid, tid), m_execution_range(execution_range) {}
execution_command(command_id cid, task_id tid, subrange<3> execution_range) : task_command(cid, tid), m_execution_range(execution_range) {}

public:
const subrange<3>& get_execution_range() const { return m_execution_range; }
Expand All @@ -152,6 +159,10 @@ namespace detail {
// -------------------------------------------- SERIALIZED COMMANDS -----------------------------------------------
// ----------------------------------------------------------------------------------------------------------------

// TODO: These are a holdover from the master/worker scheduling model. Remove at some point.
// The only reason we keep them around for now is that they allow us to persist commands beyond graph pruning.
// They no longer have to be network-serializable though.

// Serialized payload of a horizon command; carries the task ID the horizon was generated for.
struct horizon_data {
task_id tid;
};
Expand All @@ -171,15 +182,15 @@ namespace detail {
buffer_id bid;
reduction_id rid;
node_id target;
transfer_id trid;
subrange<3> sr;
};

// Serialized payload of an await push command. Incoming transfers are matched by
// (buffer, transfer ID); the region describes the data still expected.
struct await_push_data {
	buffer_id bid;
	reduction_id rid; // Zero if not part of a reduction
	transfer_id trid;
	GridRegion<3> region;
};

struct reduction_data {
Expand All @@ -192,12 +203,10 @@ namespace detail {

using command_data = std::variant<std::monostate, horizon_data, epoch_data, execution_data, push_data, await_push_data, reduction_data, fence_data>;

/**
* A command package is what is actually transferred between nodes.
*/
struct command_pkg {
command_id cid{};
command_data data;
command_data data{};
std::vector<command_id> dependencies;

std::optional<task_id> get_tid() const {
// clang-format off
Expand Down Expand Up @@ -230,23 +239,5 @@ namespace detail {
}
};

// Owning frame for a serialized command: a command_pkg followed by a variable-length
// trailing array of dependency command IDs (flexible array member).
struct command_frame {
using payload_type = command_id;

command_pkg pkg;
size_t num_dependencies = 0;
payload_type dependencies[];

// variable-sized structure
command_frame() = default;
command_frame(const command_frame&) = delete;
command_frame& operator=(const command_frame&) = delete;

// Iterates over the num_dependencies trailing dependency IDs.
iterable_range<const command_id*> iter_dependencies() const { return {dependencies, dependencies + num_dependencies}; }
};

// unique_frame_ptr assumes that the flexible payload member begins at exactly sizeof(Frame) bytes
static_assert(offsetof(command_frame, dependencies) == sizeof(command_frame));

} // namespace detail
} // namespace celerity
31 changes: 22 additions & 9 deletions include/command_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ namespace detail {
const auto cmd = unique_cmd.get();
m_commands.emplace(std::pair{cmd->get_cid(), std::move(unique_cmd)});
if constexpr(std::is_base_of_v<task_command, T>) { m_by_task[cmd->get_tid()].emplace_back(cmd); }
m_execution_fronts[cmd->get_nid()].insert(cmd);
m_execution_front.insert(cmd);
return cmd;
}

Expand All @@ -120,7 +120,10 @@ namespace detail {
}

// Total number of commands currently stored in the graph.
size_t command_count() const { return m_commands.size(); }

// Number of commands recorded for the given task; zero if no commands exist for it.
size_t task_command_count(task_id tid) const {
	if(m_by_task.count(tid) == 0) return 0;
	return m_by_task.at(tid).size();
}

auto all_commands() const {
const auto transform = [](auto& uptr) { return uptr.second.get(); };
Expand All @@ -129,28 +132,38 @@ namespace detail {

auto& task_commands(task_id tid) { return m_by_task.at(tid); }

std::optional<std::string> print_graph(size_t max_nodes, const task_manager& tm, const buffer_manager* bm) const;
std::optional<std::string> print_graph(const node_id local_nid, const size_t max_nodes, const task_manager& tm, const buffer_manager* bm) const;

// TODO unify dependency terminology to this
// Adds a dependency edge from depender to dependee and maintains the execution front
// (the set of commands without dependents).
void add_dependency(abstract_command* depender, abstract_command* dependee, dependency_kind kind, dependency_origin origin) {
	assert(dependee != depender);
	depender->add_dependency({dependee, kind, origin});
	// The dependee now has a dependent and thus leaves the execution front.
	m_execution_front.erase(dependee);

	// Sanity check: For non-dataflow dependencies the commands can only be of specific types
	if(origin == dependency_origin::execution_front) { assert(isa<epoch_command>(depender) || isa<horizon_command>(depender)); }
	if(origin == dependency_origin::collective_group_serialization) {
		assert(isa<execution_command>(depender));
		// The original execution command may have been subsumed by a horizon / epoch
		assert(isa<execution_command>(dependee) || isa<epoch_command>(dependee) || isa<horizon_command>(dependee));
	}
	if(origin == dependency_origin::last_epoch) { assert(isa<epoch_command>(dependee) || isa<horizon_command>(dependee)); }

	// Sanity check for unit tests, where we may have multiple CDAGS
	assert(m_commands.at(depender->get_cid()).get() == depender);
	assert(m_commands.at(dependee->get_cid()).get() == dependee);
}

void remove_dependency(abstract_command* depender, abstract_command* dependee) { depender->remove_dependency(dependee); }

// Returns the set of commands that currently have no dependents (the execution front).
const std::unordered_set<abstract_command*>& get_execution_front() const { return m_execution_front; }

private:
command_id m_next_cmd_id = 0;
// TODO: Consider storing commands in a contiguous memory data structure instead
std::unordered_map<command_id, std::unique_ptr<abstract_command>> m_commands;
std::unordered_map<task_id, std::vector<task_command*>> m_by_task;

// Set of per-node commands with no dependents
std::unordered_map<node_id, std::unordered_set<abstract_command*>> m_execution_fronts;
std::unordered_set<abstract_command*> m_execution_front;
};

} // namespace detail
Expand Down
Loading

0 comments on commit 0970bff

Please sign in to comment.