Skip to content

Commit

Permalink
PaRSEC: Allow reducer tasks to be deferred to avoid copying stream in…
Browse files Browse the repository at this point in the history
…puts

In the PaRSEC backend, reducer tasks will be treated as any other
task and be deferred if there are readers on their first input.
All other inputs are read-only and will be handled that way.
This avoids extra copies if there are no other writers on these copies.

Signed-off-by: Joseph Schuchart <schuchart@icl.utk.edu>
  • Loading branch information
devreal committed Jul 7, 2023
1 parent 717167a commit 97b3a20
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 100 deletions.
3 changes: 2 additions & 1 deletion tests/unit/streams.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,15 @@ TEST_CASE("streams", "[streams][core]") {
CHECK(b == VALUE);
reduce_ops++;
}, SLICE);

make_graph_executable(op);
ttg::ttg_fence(ttg::default_execution_context());
if (ttg::default_execution_context().rank() == 0) {
for (std::size_t i = 0; i < N; ++i) {
op->invoke(i, VALUE);
}
}
auto &sink_op_real = *sink_op;

ttg::ttg_fence(ttg::default_execution_context());
CHECK(reduce_ops == N);
}
Expand Down
4 changes: 2 additions & 2 deletions ttg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ configure_file(
set(ttg-util-headers
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/bug.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/cublas_helper.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/demangle.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/diagnose.h
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/dot.h
Expand Down Expand Up @@ -80,6 +79,7 @@ set(ttg-sources
${ttg-impl-headers}
${ttg-base-headers}
${ttg-util-headers}
${ttg_device_headers}
${ttg-external-headers}
${CMAKE_CURRENT_SOURCE_DIR}/ttg/device/cublas_helper.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.cpp
Expand Down Expand Up @@ -120,7 +120,7 @@ if (TARGET CUDA::cudart)
list(APPEND ttg-deps CUDA::cudart)
endif (TARGET CUDA::cudart)

set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers})
set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers};${ttg_device_headers})
if (NOT TTG_IGNORE_BUNDLED_EXTERNALS)
list(APPEND ttg-sources ${ttg-external-headers})
list(APPEND ttg-public-headers ${ttg-external-headers})
Expand Down
34 changes: 32 additions & 2 deletions ttg/ttg/parsec/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ namespace ttg_parsec {
: (num_streams+1);
TT* tt;
key_type key;
size_goal_t stream[num_streams] = {};
stream_info_t streams[num_streams] = {};
#ifdef TTG_HAS_COROUTINE
void* suspended_task_address = nullptr; // if not null the function is suspended
#endif
Expand Down Expand Up @@ -239,7 +239,7 @@ namespace ttg_parsec {
struct parsec_ttg_task_t<TT, true> : public parsec_ttg_task_base_t {
static constexpr size_t num_streams = TT::numins;
TT* tt;
size_goal_t stream[num_streams] = {};
stream_info_t streams[num_streams] = {};
#ifdef TTG_HAS_COROUTINE
void* suspended_task_address = nullptr; // if not null the function is suspended
#endif
Expand Down Expand Up @@ -282,6 +282,36 @@ namespace ttg_parsec {
};


/**
* Reducer task representing one or more stream reductions.
* A reducer task may be deferred on its first input (the object into which
* all other inputs are folded). Once that input becomes available the task
* is submitted and reduces all available inputs. Additional reducer tasks may
* be submitted until all required inputs have been processed.
*/
struct reducer_task_t : public parsec_ttg_task_base_t {
parsec_ttg_task_base_t *parent_task;
bool is_first;

reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool,
parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
int32_t priority, bool is_first)
: parsec_ttg_task_base_t(mempool, task_class, taskpool, priority,
0, nullptr, nullptr,
&release_task,
true /* deferred until other readers have completed */)
, parent_task(task)
, is_first(is_first)
{ }

static void release_task(parsec_ttg_task_base_t* task_base) {
/* reducer tasks have one mutable input so the task can be submitted on the first release */
parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task };
parsec_execution_stream_t *es = parsec_my_execution_stream();
__parsec_schedule_vp(es, vp_task_rings, 0);
}
};

} // namespace detail

} // namespace ttg_parsec
Expand Down
179 changes: 86 additions & 93 deletions ttg/ttg/parsec/ttg.h
Original file line number Diff line number Diff line change
Expand Up @@ -1550,93 +1550,90 @@ namespace ttg_parsec {
return PARSEC_HOOK_RETURN_DONE;
}

struct reducer_task_t {
parsec_task_t parsec_task;
task_t *task;

reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool,
parsec_task_class_t *task_class, parsec_taskpool_t *taskpool,
int32_t priority)
: task(task)
{
PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super);
parsec_task.mempool_owner = mempool;
parsec_task.task_class = task_class;
parsec_task.status = PARSEC_TASK_STATUS_HOOK;
parsec_task.taskpool = taskpool;
parsec_task.priority = priority;
parsec_task.chore_mask = 1<<0;
}
};

template <std::size_t i>
static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) {
using rtask_t = reducer_task_t;
using rtask_t = detail::reducer_task_t;
using value_t = std::tuple_element_t<i, actual_input_tuple_type>;
constexpr const bool val_is_void = ttg::meta::is_void_v<value_t>;
rtask_t *rtask = (rtask_t*)parsec_task;
task_t *task = rtask->task;
ttT *baseobj = task->tt;
task_t *parent_task = static_cast<task_t*>(rtask->parent_task);
ttT *baseobj = parent_task->tt;
derivedT *obj = static_cast<derivedT *>(baseobj);

auto& reducer = std::get<i>(baseobj->input_reducers);

assert(parsec_ttg_caller == NULL);
parsec_ttg_caller = static_cast<detail::parsec_ttg_task_base_t*>(task);

if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing");
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing");
}

/* the copy to reduce into */
detail::ttg_data_copy_t *target_copy;
target_copy = static_cast<detail::ttg_data_copy_t *>(task->parsec_task.data[i].data_in);
target_copy = static_cast<detail::ttg_data_copy_t *>(parent_task->parsec_task.data[i].data_in);
assert(val_is_void || nullptr != target_copy);
/* once we hit 0 we have to stop since another thread might enqueue a new reduction task */
std::size_t c = 0;
std::size_t size = 0;
assert(task->streams[i].reduce_count > 0);
assert(parent_task->streams[i].reduce_count > 0);
if (rtask->is_first) {
if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) {
/* we were the first and there is nothing to be done */
if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty");
}

return PARSEC_HOOK_RETURN_DONE;
}
}

assert(parsec_ttg_caller == NULL);
detail::parsec_ttg_caller = rtask->parent_task;

do {
if constexpr(!val_is_void) {
/* the copies to reduce out of */
detail::ttg_data_copy_t *source_copy;
parsec_list_item_t *item;
item = parsec_lifo_pop(&task->streams[i].reduce_copies);
item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies);
if (nullptr == item) {
// maybe someone is changing the goal right now
break;
}
source_copy = ((detail::ttg_data_copy_self_t *)(item))->self;
reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->device_private),
*reinterpret_cast<std::decay_t<value_t> *>(source_copy->device_private));
reducer(*reinterpret_cast<std::decay_t<value_t> *>(target_copy->get_ptr()),
*reinterpret_cast<std::decay_t<value_t> *>(source_copy->get_ptr()));
detail::release_data_copy(source_copy);
} else if constexpr(val_is_void) {
reducer(); // invoke control reducer
}
// there is only one task working on this stream, so no need to be atomic here
size = ++task->streams[i].size;
//std::cout << "static_reducer_op key " << task->key << " size " << size << " of " << task->streams[i].goal << std::endl;
} while ((c = (task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
size = ++parent_task->streams[i].size;
//std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl;
} while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0);
//} while ((c = (--task->streams[i].reduce_count)) > 0);

/* finalize_argstream sets goal to 1, so size may be larger than goal */
bool complete = (size >= task->streams[i].goal);

bool complete = (size >= parent_task->streams[i].goal);

//std::cout << "static_reducer_op size " << size
// << " of " << parent_task->streams[i].goal << " complete " << complete
// << " c " << c << std::endl;
if (complete && c == 0) {
/* task is still in the hash table, have release_task remove it */
task->remove_from_hash = true;
task->release_task(task);
parent_task->remove_from_hash = true;
parent_task->release_task(parent_task);
}

parsec_ttg_caller = NULL;
detail::parsec_ttg_caller = NULL;

if (obj->tracing()) {
if constexpr (!ttg::meta::is_void_v<keyT>)
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing");
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing");
else
ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing");
}
Expand Down Expand Up @@ -2028,12 +2025,12 @@ namespace ttg_parsec {


template <std::size_t i>
reducer_task_t *create_new_reducer_task(task_t *task) {
detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) {
/* make sure we can reuse the existing memory pool and don't have to create a new one */
static_assert(sizeof(task_t) >= sizeof(reducer_task_t));
static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t));
constexpr const bool keyT_is_Void = ttg::meta::is_void_v<keyT>;
auto &world_impl = world.impl();
reducer_task_t *newtask;
detail::reducer_task_t *newtask;
parsec_thread_mempool_t *mempool = get_task_mempool();
char *taskobj = (char *)parsec_thread_mempool_allocate(mempool);
// use the priority of the task we stream into
Expand All @@ -2046,8 +2043,8 @@ namespace ttg_parsec {
ttg::trace(world.rank(), ":", get_name(), ": creating reducer task");
}
/* placement-new the task */
newtask = new (taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
world_impl.taskpool(), priority);
newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
world_impl.taskpool(), priority, is_first);

return newtask;
}
Expand Down Expand Up @@ -2138,30 +2135,30 @@ namespace ttg_parsec {
#endif
}

/* transfer ownership changes: readers, coherency state, ownership
* version of CPU copy must be changed when used inside a CPU task
* PTG increases version: complete_execution (if W)
* PTG calls transfer_ownership at beginning of CPU task (hook) if RW access
* DSL never modifies GPU data copy
* Pushout happens at the end of GPU task if successor can be CPU?
*/

//parsec_data_transfer_ownership_to_copy(data, 0, PARSEC_FLOW_ACCESS_RW);
auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){
detail::ttg_data_copy_t *copy = copy_in;
if (nullptr != detail::parsec_ttg_caller) {
copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
}
if (nullptr != copy) {
/* retain the data copy */
copy = detail::register_data_copy<valueT>(copy, task, true);
} else {
/* create a new copy */
copy = detail::create_new_datacopy(std::forward<Value>(value));
}
return copy;
};

if (reducer) { // is this a streaming input? reduce the received value
auto submit_reducer_task = [&](auto *task){
if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value
auto submit_reducer_task = [&](auto *parent_task){
/* check if we need to create a task */
std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release);
//std::size_t c = task->streams[i].reduce_count++;
std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release);
if (0 == c) {
//std::cout << "submit reducer task for task " << task->key
// << " size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl;
/* we are responsible for creating the reduction task */
reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(task);
parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task };
parsec_execution_stream_t *es = world_impl.execution_stream();
__parsec_schedule_vp(es, vp_task_rings, 0);
detail::reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(parent_task, false);
reduce_task->release_task(reduce_task); // release immediately
}
};

Expand All @@ -2170,29 +2167,34 @@ namespace ttg_parsec {
detail::ttg_data_copy_t *copy = nullptr;
if (nullptr == (copy = task->copies[i])) {
using decay_valueT = std::decay_t<valueT>;
/* For now, we always create a copy because we cannot rely on the task_release
* mechanism (it would release the task, not the reduction value). */
task->parsec_task.data[i].data_in = detail::create_new_datacopy(std::forward<Value>(value));
task->streams[i].size++;
if (task->streams[i].size == task->streams[i].goal) {
release = true;

/* first input value, create a task and bind it to the copy */
detail::reducer_task_t *reduce_task;
reduce_task = create_new_reducer_task<i>(task, true);

/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward<Value>(value), false);

/* put the copy into the task */
task->parsec_task.data[i].data_in = copy;

/* protected by the bucket lock */
task->streams[i].size = 1;
task->streams[i].reduce_count.store(1, std::memory_order_relaxed);

if (copy->get_next_task() != &reduce_task->parsec_task) {
reduce_task->release_task(reduce_task);
}

/* now we can unlock the bucket */
parsec_hash_table_unlock_bucket(&tasks_table, hk);
} else {
detail::ttg_data_copy_t *copy = nullptr;
/* unlock the bucket, the lock is not needed anymore */
parsec_hash_table_unlock_bucket(&tasks_table, hk);
if (nullptr != parsec_ttg_caller) {
copy = detail::find_copy_in_task(parsec_ttg_caller, &value);
}
if (nullptr != copy) {
/* retain the data copy */
copy = detail::register_data_copy<valueT>(copy, task, true);
} else {
/* create a new copy */
copy = detail::create_new_datacopy(std::forward<Value>(value));
}

/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), true);

/* enqueue the data copy to be reduced */
parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
submit_reducer_task(task);
Expand All @@ -2216,17 +2218,8 @@ namespace ttg_parsec {
throw std::logic_error("bad set arg");
}

detail::ttg_data_copy_t *copy = copy_in;
if (nullptr == copy_in && nullptr != detail::parsec_ttg_caller) {
copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
}

if (nullptr != copy) {
/* register_data_copy might provide us with a different copy if !input_is_const */
copy = detail::register_data_copy<valueT>(copy, task, input_is_const);
} else {
copy = detail::create_new_datacopy(std::forward<Value>(value));
}
/* get the copy to use as input for this task */
detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward<Value>(value), input_is_const);

/* if we registered as a writer and were the first to register with this copy
* we need to defer the release of this task to give other tasks a chance to
Expand Down
Loading

0 comments on commit 97b3a20

Please sign in to comment.