PaRSEC: Allow reducer tasks to be deferred to avoid copying stream inputs

In the PaRSEC backend, reducer tasks are now treated like any other
task and are deferred if there are readers on their first input.
All other inputs are read-only and are handled as such.
This avoids extra copies when there are no other writers on these copies.

Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
devreal committed Jul 7, 2023
1 parent 717167a commit ae73b3e
Showing 2 changed files with 85 additions and 91 deletions.
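
Before the diffs, here is the core idea reduced to a minimal standalone C++ sketch: a writer task (standing in for a reducer) checks for outstanding readers on its mutable input and defers itself until they drain, instead of taking a private copy. All names below are hypothetical illustrations, not the TTG/PaRSEC API.

#include <atomic>
#include <cstdio>
#include <functional>
#include <vector>

struct DataCopy {
  std::atomic<int> readers{0};  // tasks currently reading this copy
  int value = 0;
};

// Run the writer only if no readers hold the copy; otherwise signal "defer".
bool try_run_writer(DataCopy &copy, const std::function<void(DataCopy &)> &body) {
  if (copy.readers.load(std::memory_order_acquire) > 0)
    return false;  // readers present: defer instead of copying the input
  body(copy);      // exclusive access: reduce in place, no extra copy
  return true;
}

int main() {
  DataCopy c;
  c.readers.store(1);  // one reader is still active
  std::vector<std::function<void()>> deferred;
  auto writer = [](DataCopy &d) { d.value += 42; };
  if (!try_run_writer(c, writer))
    deferred.push_back([&] { try_run_writer(c, writer); });  // park the task
  c.readers.fetch_sub(1, std::memory_order_release);  // last reader finishes
  for (auto &t : deferred) t();  // the deferred writer now runs copy-free
  std::printf("value = %d\n", c.value);  // prints: value = 42
}

The changes below wire this deferral into the PaRSEC backend's reducer path.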
3 changes: 2 additions & 1 deletion tests/unit/streams.cc
@@ -60,14 +60,15 @@ TEST_CASE("streams", "[streams][core]") {
CHECK(b == VALUE);
reduce_ops++;
}, SLICE);

make_graph_executable(op);
ttg::ttg_fence(ttg::default_execution_context());
if (ttg::default_execution_context().rank() == 0) {
for (std::size_t i = 0; i < N; ++i) {
op->invoke(i, VALUE);
}
}
auto &sink_op_real = *sink_op;

ttg::ttg_fence(ttg::default_execution_context());
CHECK(reduce_ops == N);
}
173 changes: 83 additions & 90 deletions ttg/ttg/parsec/ttg.h
@@ -1550,61 +1550,56 @@ namespace ttg_parsec {
return PARSEC_HOOK_RETURN_DONE;
}

struct reducer_task_t {
parsec_task_t parsec_task;
task_t *task;

reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool,
parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
int32_t priority)
: task(task)
{
PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
parsec_task.mempool_owner = mempool;
parsec_task.task_class = task_class;
parsec_task.status = PARSEC_TASK_STATUS_HOOK;
parsec_task.taskpool = taskpool;
parsec_task.priority = priority;
parsec_task.chore_mask = 1<<0;
}
};

template <std::size_t i>
static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
using rtask_t = reducer_task_t;
using rtask_t = detail::reducer_task_t;
using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
rtask_t *rtask = (rtask_t*)parsec_task;
task_t *task = rtask->task;
ttT *baseobj = task->tt;
task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
ttT *baseobj = parent_task->tt;
derivedT *obj = static_cast<derivedT *>(baseobj);

auto& reducer = std::get<i>(baseobj->input_reducers);

assert(parsec_ttg_caller == NULL);
parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);

if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing");
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
}

/* the copy to reduce into */
detail::ttg_data_copy_t *target_copy;
target_copy = static_cast<detail::ttg_data_copy_t *>(task->parsec_task.data[i].data_in);
target_copy = static_cast<detail::ttg_data_copy_t *>(parent_task->parsec_task.data[i].data_in);
assert(val_is_void || nullptr != target_copy);
/* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
std::size_t c = 0;
std::size_t size = 0;
assert(task->streams[i].reduce_count > 0);
assert(parent_task->streams[i].reduce_count > 0);
if (rtask->is_first) {
if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
/* we were the first and there is nothing to be done */
if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
}

return PARSEC_HOOK_RETURN_DONE;
}
}

assert(parsec_ttg_caller == NULL);
parsec_ttg_caller = rtask->parent_task;

do {
if constexpr(!val_is_void) {
/* the copies to reduce out of */
detail::ttg_data_copy_t *source_copy;
parsec_list_item_t *item;
item = parsec_lifo_pop(&task->streams[i].reduce_copies);
item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
if (nullptr == item) {
// maybe someone is changing the goal right now
break;
@@ -1617,26 +1612,28 @@ namespace ttg_parsec {
reducer(); // invoke control reducer
}
// there is only one task working on this stream, so no need to be atomic here
size = ++task->streams[i].size;
//std::cout << "static_reducer_op key " << task->key << " size " << size << " of " << task->streams[i].goal << std::endl;
} while ((c = (task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
size = ++parent_task->streams[i].size;
//std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
} while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
//} while ((c = (--task->streams[i].reduce_count)) > 0);

/* finalize_argstream sets goal to 1, so size may be larger than goal */
bool complete = (size >= task->streams[i].goal);

bool complete = (size >= parent_task->streams[i].goal);

//std::cout << "static_reducer_op size " << size
// << " of " << parent_task->streams[i].goal << " complete " << complete
// << " c " << c << std::endl;
if (complete && c == 0) {
/* task is still in the hash table, have release_task remove it */
task->remove_from_hash = true;
task->release_task(task);
parent_task->remove_from_hash = true;
parent_task->release_task(parent_task);
}

parsec_ttg_caller = NULL;

if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
}
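
The atomic counter dance above is the heart of the scheduling protocol and easy to lose in the diff, so here is a minimal standalone sketch of the same handoff, with hypothetical names and a mutex-guarded vector standing in for the lock-free LIFO: each producer fetch_adds a pending count, the producer that observed zero owns scheduling the reducer, and the reducer drains items while fetch_subbing until the count returns to zero.

#include <atomic>
#include <cstddef>
#include <cstdio>
#include <mutex>
#include <vector>

struct Stream {
  std::atomic<std::size_t> reduce_count{0};
  std::mutex m;            // stand-in for the lock-free parsec_lifo in the diff
  std::vector<int> items;
  int result = 0;
};

void run_reducer(Stream &s) {
  std::size_t c;
  do {
    int v;
    {
      std::lock_guard<std::mutex> g(s.m);
      v = s.items.back();
      s.items.pop_back();
    }
    s.result += v;  // the reduction itself
    // stop the moment the count hits 0: another producer may observe the 0
    // and schedule a fresh reducer task for the items it pushes afterwards
  } while ((c = s.reduce_count.fetch_sub(1, std::memory_order_acq_rel) - 1) > 0);
}

void submit(Stream &s, int v) {
  {
    std::lock_guard<std::mutex> g(s.m);
    s.items.push_back(v);
  }
  if (0 == s.reduce_count.fetch_add(1, std::memory_order_release))
    run_reducer(s);  // we saw 0, so we own scheduling the reducer
}

int main() {
  Stream s;
  for (int i = 1; i <= 4; ++i) submit(s, i);
  std::printf("sum = %d\n", s.result);  // prints: sum = 10
}

Stopping exactly at zero is what lets a concurrent producer start the next reducer for the same stream without racing the one that is finishing.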
@@ -2028,12 +2025,12 @@ namespace ttg_parsec {


template <std::size_t i>
reducer_task_t *create_new_reducer_task(task_t *task) {
detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) {
/* make sure we can reuse the existing memory pool and don't have to create a new one */
static_assert(sizeof(task_t) >= sizeof(reducer_task_t));
static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
auto &world_impl = world.impl();
reducer_task_t *newtask;
detail::reducer_task_t *newtask;
parsec_thread_mempool_t *mempool = get_task_mempool();
char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
// use the priority of the task we stream into
@@ -2046,8 +2043,8 @@ namespace ttg_parsec {
ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
}
/* placement-new the task */
newtask = new (taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
world_impl.taskpool(), priority);
newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
world_impl.taskpool(), priority, is_first);

return newtask;
}
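
create_new_reducer_task relies on the static_assert above so that reducer tasks can be carved out of the same fixed-size mempool slots as regular tasks. A minimal sketch of that placement-new-into-a-pool-slot pattern, with hypothetical types rather than the parsec mempool API:

#include <cstddef>
#include <cstdio>
#include <new>

struct big_task {
  alignas(std::max_align_t) char payload[64];  // the regular task type
};

struct reducer_like_task {
  int parent_id;
  bool is_first;
  reducer_like_task(int id, bool first) : parent_id(id), is_first(first) {}
};

static_assert(sizeof(big_task) >= sizeof(reducer_like_task),
              "pool slots must be large enough to host the smaller task");

int main() {
  // one fixed-size "pool slot"; in the diff this comes from the thread mempool
  alignas(big_task) unsigned char slot[sizeof(big_task)];
  reducer_like_task *t = new (slot) reducer_like_task(7, /*is_first=*/true);
  std::printf("parent=%d first=%d\n", t->parent_id, (int)t->is_first);
  t->~reducer_like_task();  // slots are recycled, so destroy explicitly
}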
@@ -2138,30 +2135,30 @@ namespace ttg_parsec {
#endif
}

/* transfer ownership changes: readers, coherency state, ownership
* version of CPU copy must be changed when used inside a CPU task
* PTG increases version: complete_execution (if W)
* PTG calls transfer_ownership at beginning of CPU task (hook) if RW access
* DSL never modifies GPU data copy
* Pushout happens at the end of GPU task if successor can be CPU?
*/

//parsec_data_transfer_ownership_to_copy(data, 0, PARSEC_FLOW_ACCESS_RW);
auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
detail::ttg_data_copy_t *copy = copy_in;
if (nullptr != parsec_ttg_caller) {
copy = detail::find_copy_in_task(parsec_ttg_caller, &value);
}
if (nullptr != copy) {
/* retain the data copy */
copy = detail::register_data_copy<valueT>(copy, task, true);
} else {
/* create a new copy */
copy = detail::create_new_datacopy(std::forward<Value>(value));
}
return copy;
};

if (reducer) { // is this a streaming input? reduce the received value
auto submit_reducer_task = [&](auto *task){
if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
auto submit_reducer_task = [&](auto *parent_task){
/* check if we need to create a task */
std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release);
//std::size_t c = task->streams[i].reduce_count++;
std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release);
if (0 == c) {
//std::cout << "submit reducer task for task " << task->key
// << " size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl;
/* we are responsible for creating the reduction task */
reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(task);
parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task };
parsec_execution_stream_t *es = world_impl.execution_stream();
__parsec_schedule_vp(es, vp_task_rings, 0);
detail::reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(parent_task, false);
reduce_task->release_task(reduce_task); // release immediately
}
};

@@ -2170,29 +2167,34 @@ namespace ttg_parsec {
detail::ttg_data_copy_t *copy = nullptr;
if (nullptr == (copy = task->copies[i])) {
using decay_valueT = std::decay_t<valueT>;
/* For now, we always create a copy because we cannot rely on the task_release
* mechanism (it would release the task, not the reduction value). */
task->parsec_task.data[i].data_in = detail::create_new_datacopy(std::forward<Value>(value));
task->streams[i].size++;
if (task->streams[i].size == task->streams[i].goal) {
release = true;

/* first input value, create a task and bind it to the copy */
detail::reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(task, true);

/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);

/* put the copy into the task */
task->parsec_task.data[i].data_in = copy;

/* protected by the bucket lock */
task->streams[i].size = 1;
task->streams[i].reduce_count.store(1, std::memory_order_relaxed);

if (copy->push_task != &reduce_task->parsec_task) {
reduce_task->release_task(reduce_task);
}

/* now we can unlock the bucket */
parsec_hash_table_unlock_bucket(&tasks_table, hk);
} else {
detail::ttg_data_copy_t *copy = nullptr;
/* unlock the bucket, the lock is not needed anymore */
parsec_hash_table_unlock_bucket(&tasks_table, hk);
if (nullptr != parsec_ttg_caller) {
copy = detail::find_copy_in_task(parsec_ttg_caller, &value);
}
if (nullptr != copy) {
/* retain the data copy */
copy = detail::register_data_copy<valueT>(copy, task, true);
} else {
/* create a new copy */
copy = detail::create_new_datacopy(std::forward<Value>(value));
}

/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);

/* enqueue the data copy to be reduced */
parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
submit_reducer_task(task);
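
The new get_copy_fn lambda factors out a retain-or-create lookup: reuse (and retain) the copy the calling task already tracks, otherwise create a fresh one. A minimal standalone rendering of that pattern; the address-keyed map and the refcount are hypothetical stand-ins for find_copy_in_task, register_data_copy, and create_new_datacopy:

#include <cstdio>
#include <map>
#include <memory>

struct Copy {
  int value;
  int refcount = 1;
};

// the set of copies the calling task already tracks, keyed by data address
using CopyMap = std::map<const void *, std::shared_ptr<Copy>>;

std::shared_ptr<Copy> get_copy(CopyMap *caller, const int &value) {
  if (caller != nullptr) {
    auto it = caller->find(&value);
    if (it != caller->end()) {
      it->second->refcount++;  // reuse: retain the copy the caller holds
      return it->second;
    }
  }
  return std::make_shared<Copy>(Copy{value});  // miss: create a new copy
}

int main() {
  int v = 5;
  CopyMap caller;
  caller[&v] = std::make_shared<Copy>(Copy{v});
  auto hit = get_copy(&caller, v);   // found in the caller: refcount becomes 2
  int w = 9;
  auto miss = get_copy(&caller, w);  // not tracked: freshly created
  std::printf("%d %d %d\n", hit->refcount, hit->value, miss->value);  // 2 5 9
}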
@@ -2216,17 +2218,8 @@ namespace ttg_parsec {
throw std::logic_error("bad set arg");
}

detail::ttg_data_copy_t *copy = copy_in;
if (nullptr == copy_in && nullptr != detail::parsec_ttg_caller) {
copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
}

if (nullptr != copy) {
/* register_data_copy might provide us with a different copy if !input_is_const */
copy = detail::register_data_copy<valueT>(copy, task, input_is_const);
} else {
copy = detail::create_new_datacopy(std::forward<Value>(value));
}
/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);

/* if we registered as a writer and were the first to register with this copy
* we need to defer the release of this task to give other tasks a chance to
