Pass task pointers around instead of ids
psalz committed Jul 25, 2022
1 parent 9aadf9e commit 9a6a200
Showing 12 changed files with 289 additions and 292 deletions.
172 changes: 86 additions & 86 deletions ci/perf/gpuc2_bench.csv

Large diffs are not rendered by default.

178 changes: 89 additions & 89 deletions ci/perf/gpuc2_bench.md

Large diffs are not rendered by default.

18 changes: 8 additions & 10 deletions include/graph_generator.h
@@ -81,18 +81,16 @@ namespace detail {
public:
/**
* @param num_nodes Number of CELERITY nodes, including the master node.
* @param tm
* @param cdag The command graph this generator should operate on.
*/
graph_generator(size_t num_nodes, task_manager& tm, reduction_manager& rm, command_graph& cdag);
graph_generator(size_t num_nodes, reduction_manager& rm, command_graph& cdag);

void add_buffer(buffer_id bid, const cl::sycl::range<3>& range);

// Build the commands for a single task
void build_task(task_id tid, const std::vector<graph_transformer*>& transformers);
void build_task(const task& tsk, const std::vector<graph_transformer*>& transformers);

private:
task_manager& task_mngr;
reduction_manager& reduction_mngr;
const size_t num_nodes;
command_graph& cdag;
@@ -129,19 +127,19 @@ namespace detail {
void generate_anti_dependencies(task_id tid, buffer_id bid, const region_map<std::optional<command_id>>& last_writers_map,
const GridRegion<3>& write_req, abstract_command* write_cmd);

void process_task_data_requirements(task_id tid);
void process_task_data_requirements(const task& tsk);

void process_task_side_effect_requirements(task_id tid);
void process_task_side_effect_requirements(const task& tsk);

void generate_epoch_dependencies(abstract_command* cmd);

void generate_epoch_commands(const task* tsk);
void generate_epoch_commands(const task& tsk);

void generate_horizon_commands(const task* tsk);
void generate_horizon_commands(const task& tsk);

void generate_collective_execution_commands(const task* tsk);
void generate_collective_execution_commands(const task& tsk);

void generate_independent_execution_commands(const task* tsk);
void generate_independent_execution_commands(const task& tsk);

void prune_commands_before(const command_id min_epoch);
};
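The header change above drops the task_manager dependency entirely: callers now hand build_task the task object itself, and the generator derives the id on demand. Below is a minimal, hedged sketch of that calling convention; the task and graph_transformer types are illustrative stand-ins, not the real celerity::detail classes.

#include <vector>

// Stand-in types for illustration only.
struct graph_transformer {};
struct task {
    explicit task(int id) : tid(id) {}
    int get_id() const { return tid; }
  private:
    int tid;
};

struct graph_generator_sketch {
    // After this commit: the task is passed by const reference, so the generator
    // needs neither a task_manager member nor a get_task() lookup.
    void build_task(const task& tsk, const std::vector<graph_transformer*>& transformers) {
        const int tid = tsk.get_id(); // the id is still available whenever commands need it
        (void)tid;
        (void)transformers;
    }
};

int main() {
    graph_generator_sketch ggen;
    const task t{42};
    ggen.build_task(t, {});
}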
9 changes: 4 additions & 5 deletions include/scheduler.h
@@ -5,19 +5,18 @@
#include <queue>
#include <thread>

#include "types.h"

namespace celerity {
namespace detail {

class graph_generator;
class graph_serializer;
class task;

enum class scheduler_event_type { TASK_AVAILABLE, SHUTDOWN };

struct scheduler_event {
scheduler_event_type type;
size_t data;
const task* tsk;
};

// Abstract base class to allow different threading implementation in tests
@@ -34,7 +33,7 @@ namespace detail {
/**
* @brief Notifies the scheduler that a new task has been created and is ready for scheduling.
*/
void notify_task_created(task_id tid) { notify(scheduler_event_type::TASK_AVAILABLE, tid); }
void notify_task_created(const task* tsk) { notify(scheduler_event_type::TASK_AVAILABLE, tsk); }

protected:
/**
@@ -54,7 +53,7 @@

const size_t num_nodes;

void notify(scheduler_event_type type, size_t data);
void notify(scheduler_event_type type, const task* tsk);
};

class scheduler final : public abstract_scheduler {
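In scheduler.h, the queued event now carries a const task* instead of an opaque size_t, so the scheduler thread can pass the task straight to graph_generator::build_task without consulting the task_manager. A hedged sketch of that event shape follows, with stand-in types; the real abstract_scheduler also manages a worker thread, and whether SHUTDOWN carries a null pointer is an assumption of this sketch.

#include <cstddef>
#include <queue>

struct task { std::size_t id; }; // stand-in for celerity::detail::task

enum class scheduler_event_type { TASK_AVAILABLE, SHUTDOWN };

struct scheduler_event {
    scheduler_event_type type;
    const task* tsk; // previously: size_t data (a task_id)
};

class scheduler_sketch {
  public:
    void notify_task_created(const task* tsk) { notify(scheduler_event_type::TASK_AVAILABLE, tsk); }
    // Presumably a shutdown notification carries no task; nullptr marks its absence.
    void shutdown() { notify(scheduler_event_type::SHUTDOWN, nullptr); }

  private:
    std::queue<scheduler_event> events;
    void notify(const scheduler_event_type type, const task* tsk) { events.push({type, tsk}); }
};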
16 changes: 9 additions & 7 deletions include/task_manager.h
@@ -18,7 +18,7 @@ namespace celerity {
namespace detail {

class reduction_manager;
using task_callback = std::function<void(task_id)>;
using task_callback = std::function<void(const task*)>;

// Allows other threads to await an epoch change in the task manager.
// This is worth a separate class to encapsulate the synchronization behavior.
@@ -66,6 +66,7 @@ namespace detail {
template <typename CGF, typename... Hints>
task_id submit_command_group(CGF cgf, Hints... hints) {
task_id tid;
const task* tsk_ptr = nullptr;
{
std::lock_guard lock(task_mutex);
auto reservation = task_buffer.reserve_task_entry(await_free_task_slot_callback());
@@ -74,16 +75,17 @@
prepass_handler cgh(tid, std::make_unique<command_group_storage<CGF>>(cgf), num_collective_nodes);
cgf(cgh);
task& task_ref = register_task_internal(std::move(reservation), std::move(cgh).into_task());
tsk_ptr = &task_ref;

compute_dependencies(tid);
compute_dependencies(task_ref);
if(queue) queue->require_collective_group(task_ref.get_collective_group_id());

// the following deletion is intentionally redundant with the one happening when waiting for free task slots
// we want to free tasks earlier than just when running out of slots,
// so that we can potentially reclaim additional resources such as buffers earlier
task_buffer.delete_up_to(latest_epoch_reached.get());
}
invoke_callbacks(tid);
invoke_callbacks(tsk_ptr);
if(need_new_horizon()) { generate_horizon_task(); }
return tid;
}
@@ -217,23 +219,23 @@ namespace detail {

task& register_task_internal(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> task);

void invoke_callbacks(task_id tid);
void invoke_callbacks(const task* tsk) const;

void add_dependency(task* depender, task* dependee, dependency_kind kind, dependency_origin origin);
void add_dependency(task& depender, task& dependee, dependency_kind kind, dependency_origin origin);

inline bool need_new_horizon() const { return max_pseudo_critical_path_length - current_horizon_critical_path_length >= task_horizon_step_size; }

int get_max_pseudo_critical_path_length() const { return max_pseudo_critical_path_length; }

task_id reduce_execution_front(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> new_front);
task& reduce_execution_front(task_ring_buffer::reservation&& reserve, std::unique_ptr<task> new_front);

void set_epoch_for_new_tasks(task_id epoch);

const std::unordered_set<task*>& get_execution_front() { return execution_front; }

task_id generate_horizon_task();

void compute_dependencies(task_id tid);
void compute_dependencies(task& tsk);

// Finds the first in-flight epoch, or returns the currently reached one if there are none in-flight
// Used in await_free_task_slot_callback to check for hangs
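The task_manager-side counterpart: task_callback now receives the const task* of the freshly registered task (the pointer is captured while the task_mutex is held, then passed to invoke_callbacks after the lock is released), rather than only its id. A small sketch of how callbacks might be stored and invoked under the new signature; the types are stand-ins.

#include <functional>
#include <utility>
#include <vector>

struct task { int id; }; // stand-in; the real task carries the full command group description

using task_callback = std::function<void(const task*)>; // was: std::function<void(task_id)>

class task_manager_sketch {
  public:
    void register_task_callback(task_callback cb) { callbacks.push_back(std::move(cb)); }

    // Called once per submitted command group, after the task has been registered.
    void invoke_callbacks(const task* tsk) const {
        for(const auto& cb : callbacks) { cb(tsk); }
    }

  private:
    std::vector<task_callback> callbacks;
};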
73 changes: 36 additions & 37 deletions src/graph_generator.cc
@@ -12,8 +12,7 @@
namespace celerity {
namespace detail {

graph_generator::graph_generator(size_t num_nodes, task_manager& tm, reduction_manager& rm, command_graph& cdag)
: task_mngr(tm), reduction_mngr(rm), num_nodes(num_nodes), cdag(cdag) {
graph_generator::graph_generator(size_t num_nodes, reduction_manager& rm, command_graph& cdag) : reduction_mngr(rm), num_nodes(num_nodes), cdag(cdag) {
// Build initial epoch command for each node (these are required to properly handle anti-dependencies on host-initialized buffers).
// We manually generate the first set of commands, these will be replaced by applied horizons or explicit epochs down the line (see
// set_epoch_for_new_commands).
@@ -38,14 +37,14 @@ namespace detail {
buffer_states.emplace(bid, distributed_state{{range, std::move(all_nodes)}});
}

void graph_generator::build_task(const task_id tid, const std::vector<graph_transformer*>& transformers) {
void graph_generator::build_task(const task& tsk, const std::vector<graph_transformer*>& transformers) {
std::lock_guard<std::mutex> lock(buffer_mutex);
// TODO: Maybe assert that this task hasn't been processed before

auto tsk = task_mngr.get_task(tid);
const auto tid = tsk.get_id();
const auto min_epoch_to_prune_before = min_epoch_for_new_commands;

switch(tsk->get_type()) {
switch(tsk.get_type()) {
case task_type::EPOCH: generate_epoch_commands(tsk); break;
case task_type::HORIZON: generate_horizon_commands(tsk); break;
case task_type::COLLECTIVE: generate_collective_execution_commands(tsk); break;
@@ -55,11 +54,11 @@ namespace detail {
}

for(auto& t : transformers) {
t->transform_task(*tsk, cdag);
t->transform_task(tsk, cdag);
}

// Only execution tasks can have data requirements or reductions
if(tsk->get_execution_target() != execution_target::NONE) {
if(tsk.get_execution_target() != execution_target::NONE) {
#ifndef NDEBUG
// It is currently undefined to split reduction-producer tasks into multiple chunks on the same node:
// - Per-node reduction intermediate results are stored with fixed access to a single SYCL buffer, so multiple chunks on the same node will race
@@ -68,7 +67,7 @@ namespace detail {
// - Inputs to the final reduction command are ordered by origin node ids to guarantee bit-identical results. It is not possible to distinguish
// more than one chunk per node in the serialized commands, so different nodes can produce different final reduction results for non-associative
// or non-commutative operations
if(!tsk->get_reductions().empty()) {
if(!tsk.get_reductions().empty()) {
std::unordered_set<node_id> producer_nids;
for(auto& cmd : cdag.task_commands(tid)) {
assert(producer_nids.insert(cmd->get_nid()).second);
@@ -78,8 +77,8 @@ namespace detail {

// TODO: At some point we might want to do this also before calling transformers
// --> So that more advanced transformations can also take data transfers into account
process_task_data_requirements(tid);
process_task_side_effect_requirements(tid);
process_task_data_requirements(tsk);
process_task_side_effect_requirements(tsk);
}

// Commands without any other true-dependency must depend on the active epoch command to ensure they cannot be re-ordered before the epoch
@@ -123,14 +122,14 @@ namespace detail {
assert(cdag.get_execution_front(nid).size() == 1 && *cdag.get_execution_front(nid).begin() == new_front);
}

void graph_generator::generate_epoch_commands(const task* const tsk) {
assert(tsk->get_type() == task_type::EPOCH);
void graph_generator::generate_epoch_commands(const task& tsk) {
assert(tsk.get_type() == task_type::EPOCH);

command_id min_new_epoch;
for(node_id nid = 0; nid < num_nodes; ++nid) {
auto& node = node_data.at(nid);

const auto epoch = cdag.create<epoch_command>(nid, tsk->get_id(), tsk->get_epoch_action());
const auto epoch = cdag.create<epoch_command>(nid, tsk.get_id(), tsk.get_epoch_action());
const auto cid = epoch->get_cid();
if(nid == 0) { min_new_epoch = cid; }

@@ -144,14 +143,14 @@ namespace detail {
min_epoch_for_new_commands = min_new_epoch;
}

void graph_generator::generate_horizon_commands(const task* const tsk) {
assert(tsk->get_type() == task_type::HORIZON);
void graph_generator::generate_horizon_commands(const task& tsk) {
assert(tsk.get_type() == task_type::HORIZON);

std::optional<command_id> min_new_epoch;
for(node_id nid = 0; nid < num_nodes; ++nid) {
auto& node = node_data.at(nid);

const auto horizon = cdag.create<horizon_command>(nid, tsk->get_id());
const auto horizon = cdag.create<horizon_command>(nid, tsk.get_id());
const auto cid = horizon->get_cid();

if(node.current_horizon) {
@@ -174,18 +173,18 @@
}
}

void graph_generator::generate_collective_execution_commands(const task* const tsk) {
assert(tsk->get_type() == task_type::COLLECTIVE);
void graph_generator::generate_collective_execution_commands(const task& tsk) {
assert(tsk.get_type() == task_type::COLLECTIVE);

for(size_t nid = 0; nid < num_nodes; ++nid) {
auto offset = cl::sycl::id<1>{nid};
auto range = cl::sycl::range<1>{1};
const auto sr = subrange_cast<3>(subrange<1>{offset, range});
auto* cmd = cdag.create<execution_command>(nid, tsk->get_id(), sr);
auto* cmd = cdag.create<execution_command>(nid, tsk.get_id(), sr);

// Collective host tasks have an implicit dependency on the previous task in the same collective group, which is required in order to guarantee
// they are executed in the same order on every node.
auto cgid = tsk->get_collective_group_id();
auto cgid = tsk.get_collective_group_id();
auto& last_collective_commands = node_data.at(nid).last_collective_commands;
if(auto prev = last_collective_commands.find(cgid); prev != last_collective_commands.end()) {
cdag.add_dependency(cmd, cdag.get(prev->second), dependency_kind::TRUE_DEP, dependency_origin::collective_group_serialization);
@@ -195,23 +194,23 @@ namespace detail {
}
}

void graph_generator::generate_independent_execution_commands(const task* const tsk) {
assert(tsk->get_type() == task_type::HOST_COMPUTE || tsk->get_type() == task_type::DEVICE_COMPUTE || tsk->get_type() == task_type::MASTER_NODE);
void graph_generator::generate_independent_execution_commands(const task& tsk) {
assert(tsk.get_type() == task_type::HOST_COMPUTE || tsk.get_type() == task_type::DEVICE_COMPUTE || tsk.get_type() == task_type::MASTER_NODE);

const auto sr = subrange<3>{tsk->get_global_offset(), tsk->get_global_size()};
cdag.create<execution_command>(0, tsk->get_id(), sr);
const auto sr = subrange<3>{tsk.get_global_offset(), tsk.get_global_size()};
cdag.create<execution_command>(0, tsk.get_id(), sr);
}

using buffer_requirements_map = std::unordered_map<buffer_id, std::unordered_map<cl::sycl::access::mode, GridRegion<3>>>;

buffer_requirements_map get_buffer_requirements_for_mapped_access(const task* tsk, subrange<3> sr, const cl::sycl::range<3> global_size) {
buffer_requirements_map get_buffer_requirements_for_mapped_access(const task& tsk, subrange<3> sr, const cl::sycl::range<3> global_size) {
buffer_requirements_map result;
const auto& access_map = tsk->get_buffer_access_map();
const auto& access_map = tsk.get_buffer_access_map();
const auto buffers = access_map.get_accessed_buffers();
for(const buffer_id bid : buffers) {
const auto modes = access_map.get_access_modes(bid);
for(auto m : modes) {
result[bid][m] = access_map.get_requirements_for_access(bid, m, tsk->get_dimensions(), sr, global_size);
result[bid][m] = access_map.get_requirements_for_access(bid, m, tsk.get_dimensions(), sr, global_size);
}
}
return result;
@@ -272,8 +271,8 @@ namespace detail {
}
} // namespace

void graph_generator::process_task_data_requirements(task_id tid) {
const auto tsk = task_mngr.get_task(tid);
void graph_generator::process_task_data_requirements(const task& tsk) {
const task_id tid = tsk.get_id();

// Copy the list of task commands so we can safely modify the command graph in the loop below
// NOTE: We assume that none of these commands are deleted
@@ -287,7 +286,7 @@ namespace detail {
command_id reduction_initializer_cid = 0;
{
std::unordered_set<command_id> optimal_reduction_initializer_cids;
for(auto rid : tsk->get_reductions()) {
for(auto rid : tsk.get_reductions()) {
auto reduction = reduction_mngr.get_reduction(rid);
if(reduction.initialize_from_buffer) {
if(optimal_reduction_initializer_cids.empty()) {
@@ -343,14 +342,14 @@ namespace detail {

ecmd->set_is_reduction_initializer(cid == reduction_initializer_cid);

auto requirements = get_buffer_requirements_for_mapped_access(tsk, ecmd->get_execution_range(), tsk->get_global_size());
auto requirements = get_buffer_requirements_for_mapped_access(tsk, ecmd->get_execution_range(), tsk.get_global_size());

// Any reduction that includes the value previously found in the buffer (i.e. the absence of sycl::property::reduction::initialize_to_identity)
// must read that original value in the eventual reduction_command generated by a future buffer requirement. Since whenever a buffer is used as
// a reduction output, we replace its state with a pending_reduction_state, that original value would be lost. To avoid duplicating the buffer,
// we simply include it in the pre-reduced state of a single execution_command.
std::unordered_map<buffer_id, reduction_id> buffer_reduction_map;
for(auto rid : tsk->get_reductions()) {
for(auto rid : tsk.get_reductions()) {
auto reduction = reduction_mngr.get_reduction(rid);

auto rmode = cl::sycl::access::mode::discard_write;
@@ -526,7 +525,7 @@ namespace detail {
// If there is only one chunk/command, it already implicitly generates the final reduced value and the buffer does not need to be flagged as
// a pending reduction.
if(task_commands.size() > 1) {
for(auto rid : tsk->get_reductions()) {
for(auto rid : tsk.get_reductions()) {
auto bid = reduction_mngr.get_reduction(rid).output_buffer_id;
buffer_states.at(bid) = pending_reduction_state{rid, {}};
}
@@ -584,14 +583,14 @@ namespace detail {
}
}

void graph_generator::process_task_side_effect_requirements(const task_id tid) {
const auto tsk = task_mngr.get_task(tid);
if(tsk->get_side_effect_map().empty()) return; // skip the loop in the common case
void graph_generator::process_task_side_effect_requirements(const task& tsk) {
const task_id tid = tsk.get_id();
if(tsk.get_side_effect_map().empty()) return; // skip the loop in the common case

for(const auto cmd : cdag.task_commands(tid)) {
auto& nd = node_data.at(cmd->get_nid());

for(const auto& side_effect : tsk->get_side_effect_map()) {
for(const auto& side_effect : tsk.get_side_effect_map()) {
const auto [hoid, order] = side_effect;
if(const auto last_effect = nd.host_object_last_effects.find(hoid); last_effect != nd.host_object_last_effects.end()) {
// TODO once we have different side_effect_orders, their interaction will determine the dependency kind
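Throughout graph_generator.cc the pattern of these hunks is the same: const task* survives only at API boundaries where the absence of a task is representable, while internal helpers take const task& and recover the task_id via get_id() instead of fetching the task from a manager. A hedged, stand-alone illustration of that convention (stand-in types; the "no task" boundary case is an assumption of this sketch):

// Stand-in type; illustrates the pointer-at-the-boundary / reference-internally convention.
struct task {
    int get_id() const { return tid; }
    int tid = 0;
};

// Internal helpers take a reference: the task is guaranteed to exist.
int process_sketch(const task& tsk) {
    return tsk.get_id(); // id recovered from the object, no task_manager round trip
}

// Boundary code takes a pointer: an event without a task stays representable.
int handle_event_sketch(const task* tsk) {
    if(tsk == nullptr) return -1; // hypothetical "no task" sentinel, for this sketch only
    return process_sketch(*tsk);
}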
4 changes: 2 additions & 2 deletions src/runtime.cc
@@ -136,11 +136,11 @@ namespace detail {
exec = std::make_unique<executor>(local_nid, *h_queue, *d_queue, *task_mngr, *buffer_mngr, *reduction_mngr);
if(is_master_node()) {
cdag = std::make_unique<command_graph>();
ggen = std::make_shared<graph_generator>(num_nodes, *task_mngr, *reduction_mngr, *cdag);
ggen = std::make_shared<graph_generator>(num_nodes, *reduction_mngr, *cdag);
gsrlzr = std::make_unique<graph_serializer>(
*cdag, [this](node_id target, unique_frame_ptr<command_frame> frame) { flush_command(target, std::move(frame)); });
schdlr = std::make_unique<scheduler>(*ggen, *gsrlzr, num_nodes);
task_mngr->register_task_callback([this](task_id tid) { schdlr->notify_task_created(tid); });
task_mngr->register_task_callback([this](const task* tsk) { schdlr->notify_task_created(tsk); });
}

CELERITY_INFO(
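Putting the pieces together, the runtime.cc hunk rewires the callback so the pointer flows end to end: the task_manager invokes the callback with the new task, the lambda forwards it to scheduler::notify_task_created, and the scheduler later hands the same object to graph_generator::build_task. A compact, hedged sketch of that wiring with stand-in types:

#include <functional>
#include <utility>

struct task { int id; }; // stand-in

struct scheduler_sketch {
    void notify_task_created(const task* tsk) {
        // In the real scheduler this enqueues a TASK_AVAILABLE event carrying tsk.
        (void)tsk;
    }
};

struct task_manager_sketch {
    void register_task_callback(std::function<void(const task*)> cb) { callback = std::move(cb); }
    void submit(const task& tsk) {
        if(callback) callback(&tsk); // forwards the pointer; no id lookup on the receiving side
    }
    std::function<void(const task*)> callback;
};

int main() {
    scheduler_sketch schdlr;
    task_manager_sketch task_mngr;
    // Mirrors the changed line in runtime.cc, with the real types swapped for stand-ins.
    task_mngr.register_task_callback([&](const task* tsk) { schdlr.notify_task_created(tsk); });
    task_mngr.submit(task{1});
}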