From ae73b3e1bcfcadeb81fdc8de849411b2a73cd1b8 Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 6 Jul 2023 19:30:23 -0400 Subject: [PATCH] PaRSEC: Allow reducer tasks to be deferred to avoid copying stream inputs In the PaRSEC backend, reducer tasks will be treated as any other task and be deferred if there are readers on their first input. All other inputs are read-only and will be handled that way. This avoids extra copies if there are no other writers on these copies. Signed-off-by: Joseph Schuchart --- tests/unit/streams.cc | 3 +- ttg/ttg/parsec/ttg.h | 173 ++++++++++++++++++++---------------------- 2 files changed, 85 insertions(+), 91 deletions(-) diff --git a/tests/unit/streams.cc b/tests/unit/streams.cc index bef3a2837..108c8610b 100644 --- a/tests/unit/streams.cc +++ b/tests/unit/streams.cc @@ -60,6 +60,7 @@ TEST_CASE("streams", "[streams][core]") { CHECK(b == VALUE); reduce_ops++; }, SLICE); + make_graph_executable(op); ttg::ttg_fence(ttg::default_execution_context()); if (ttg::default_execution_context().rank() == 0) { @@ -67,7 +68,7 @@ TEST_CASE("streams", "[streams][core]") { op->invoke(i, VALUE); } } - auto &sink_op_real = *sink_op; + ttg::ttg_fence(ttg::default_execution_context()); CHECK(reduce_ops == N); } diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index c718951ad..e3b4edc65 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1550,61 +1550,56 @@ namespace ttg_parsec { return PARSEC_HOOK_RETURN_DONE; } - struct reducer_task_t { - parsec_task_t parsec_task; - task_t *task; - - reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool, - parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, - int32_t priority) - : task(task) - { - PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); - parsec_task.mempool_owner = mempool; - parsec_task.task_class = task_class; - parsec_task.status = PARSEC_TASK_STATUS_HOOK; - parsec_task.taskpool = taskpool; - parsec_task.priority = priority; - parsec_task.chore_mask = 1<<0; - } - }; - template static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) { - using rtask_t = reducer_task_t; + using rtask_t = detail::reducer_task_t; using value_t = std::tuple_element_t; constexpr const bool val_is_void = ttg::meta::is_void_v; rtask_t *rtask = (rtask_t*)parsec_task; - task_t *task = rtask->task; - ttT *baseobj = task->tt; + task_t *parent_task = static_cast(rtask->parent_task); + ttT *baseobj = parent_task->tt; derivedT *obj = static_cast(baseobj); auto& reducer = std::get(baseobj->input_reducers); - assert(parsec_ttg_caller == NULL); - parsec_ttg_caller = static_cast(task); - if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing"); } /* the copy to reduce into */ detail::ttg_data_copy_t *target_copy; - target_copy = static_cast(task->parsec_task.data[i].data_in); + target_copy = static_cast(parent_task->parsec_task.data[i].data_in); assert(val_is_void || nullptr != target_copy); /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */ std::size_t c = 0; std::size_t size = 0; - assert(task->streams[i].reduce_count > 0); + assert(parent_task->streams[i].reduce_count > 0); + if (rtask->is_first) { + if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) { + /* we were the first and there is nothing to be done */ + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + } + + assert(parsec_ttg_caller == NULL); + parsec_ttg_caller = rtask->parent_task; + do { if constexpr(!val_is_void) { /* the copies to reduce out of */ detail::ttg_data_copy_t *source_copy; parsec_list_item_t *item; - item = parsec_lifo_pop(&task->streams[i].reduce_copies); + item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies); if (nullptr == item) { // maybe someone is changing the goal right now break; @@ -1617,26 +1612,28 @@ namespace ttg_parsec { reducer(); // invoke control reducer } // there is only one task working on this stream, so no need to be atomic here - size = ++task->streams[i].size; - //std::cout << "static_reducer_op key " << task->key << " size " << size << " of " << task->streams[i].goal << std::endl; - } while ((c = (task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); + size = ++parent_task->streams[i].size; + //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl; + } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); //} while ((c = (--task->streams[i].reduce_count)) > 0); /* finalize_argstream sets goal to 1, so size may be larger than goal */ - bool complete = (size >= task->streams[i].goal); - + bool complete = (size >= parent_task->streams[i].goal); + //std::cout << "static_reducer_op size " << size + // << " of " << parent_task->streams[i].goal << " complete " << complete + // << " c " << c << std::endl; if (complete && c == 0) { /* task is still in the hash table, have release_task remove it */ - task->remove_from_hash = true; - task->release_task(task); + parent_task->remove_from_hash = true; + parent_task->release_task(parent_task); } parsec_ttg_caller = NULL; if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); } @@ -2028,12 +2025,12 @@ namespace ttg_parsec { template - reducer_task_t *create_new_reducer_task(task_t *task) { + detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) { /* make sure we can reuse the existing memory pool and don't have to create a new one */ - static_assert(sizeof(task_t) >= sizeof(reducer_task_t)); + static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t)); constexpr const bool keyT_is_Void = ttg::meta::is_void_v; auto &world_impl = world.impl(); - reducer_task_t *newtask; + detail::reducer_task_t *newtask; parsec_thread_mempool_t *mempool = get_task_mempool(); char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); // use the priority of the task we stream into @@ -2046,8 +2043,8 @@ namespace ttg_parsec { ttg::trace(world.rank(), ":", get_name(), ": creating reducer task"); } /* placement-new the task */ - newtask = new (taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i], - world_impl.taskpool(), priority); + newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i], + world_impl.taskpool(), priority, is_first); return newtask; } @@ -2138,30 +2135,30 @@ namespace ttg_parsec { #endif } - /* transfer ownership changes: readers, coherency state, ownership - * version of CPU copy must be changed when used inside a CPU task - * PTG increases version: complete_execution (if W) - * PTG calls transfer_ownership at beginning of CPU task (hook) if RW access - * DSL never modifies GPU data copy - * Pushout happens at the end of GPU task if successor can be CPU? - */ - - //parsec_data_transfer_ownership_to_copy(data, 0, PARSEC_FLOW_ACCESS_RW); + auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){ + detail::ttg_data_copy_t *copy = copy_in; + if (nullptr != parsec_ttg_caller) { + copy = detail::find_copy_in_task(parsec_ttg_caller, &value); + } + if (nullptr != copy) { + /* retain the data copy */ + copy = detail::register_data_copy(copy, task, true); + } else { + /* create a new copy */ + copy = detail::create_new_datacopy(std::forward(value)); + } + return copy; + }; - if (reducer) { // is this a streaming input? reduce the received value - auto submit_reducer_task = [&](auto *task){ + if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value + auto submit_reducer_task = [&](auto *parent_task){ /* check if we need to create a task */ - std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); - //std::size_t c = task->streams[i].reduce_count++; + std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); if (0 == c) { - //std::cout << "submit reducer task for task " << task->key - // << " size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; /* we are responsible for creating the reduction task */ - reducer_task_t *reduce_task; - reduce_task = create_new_reducer_task(task); - parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task }; - parsec_execution_stream_t *es = world_impl.execution_stream(); - __parsec_schedule_vp(es, vp_task_rings, 0); + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(parent_task, false); + reduce_task->release_task(reduce_task); // release immediately } }; @@ -2170,29 +2167,34 @@ namespace ttg_parsec { detail::ttg_data_copy_t *copy = nullptr; if (nullptr == (copy = task->copies[i])) { using decay_valueT = std::decay_t; - /* For now, we always create a copy because we cannot rely on the task_release - * mechanism (it would release the task, not the reduction value). */ - task->parsec_task.data[i].data_in = detail::create_new_datacopy(std::forward(value)); - task->streams[i].size++; - if (task->streams[i].size == task->streams[i].goal) { - release = true; + + /* first input value, create a task and bind it to the copy */ + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(task, true); + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward(value), false); + + /* put the copy into the task */ + task->parsec_task.data[i].data_in = copy; + + /* protected by the bucket lock */ + task->streams[i].size = 1; + task->streams[i].reduce_count.store(1, std::memory_order_relaxed); + + if (copy->push_task != &reduce_task->parsec_task) { + reduce_task->release_task(reduce_task); } + /* now we can unlock the bucket */ parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { - detail::ttg_data_copy_t *copy = nullptr; /* unlock the bucket, the lock is not needed anymore */ parsec_hash_table_unlock_bucket(&tasks_table, hk); - if (nullptr != parsec_ttg_caller) { - copy = detail::find_copy_in_task(parsec_ttg_caller, &value); - } - if (nullptr != copy) { - /* retain the data copy */ - copy = detail::register_data_copy(copy, task, true); - } else { - /* create a new copy */ - copy = detail::create_new_datacopy(std::forward(value)); - } + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), true); + /* enqueue the data copy to be reduced */ parsec_lifo_push(&task->streams[i].reduce_copies, ©->super); submit_reducer_task(task); @@ -2216,17 +2218,8 @@ namespace ttg_parsec { throw std::logic_error("bad set arg"); } - detail::ttg_data_copy_t *copy = copy_in; - if (nullptr == copy_in && nullptr != detail::parsec_ttg_caller) { - copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value); - } - - if (nullptr != copy) { - /* register_data_copy might provide us with a different copy if !input_is_const */ - copy = detail::register_data_copy(copy, task, input_is_const); - } else { - copy = detail::create_new_datacopy(std::forward(value)); - } + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), input_is_const); /* if we registered as a writer and were the first to register with this copy * we need to defer the release of this task to give other tasks a chance to