From d0429ca59d802875e88a0ba9cbf66bee446505ed Mon Sep 17 00:00:00 2001 From: Joseph Schuchart Date: Thu, 6 Jul 2023 19:30:23 -0400 Subject: [PATCH] PaRSEC: Allow reducer tasks to be deferred to avoid copying stream inputs In the PaRSEC backend, reducer tasks will be treated as any other task and be deferred if there are readers on their first input. All other inputs are read-only and will be handled that way. This avoids extra copies if there are no other writers on these copies. Signed-off-by: Joseph Schuchart --- tests/unit/streams.cc | 3 +- ttg/CMakeLists.txt | 4 +- ttg/ttg/parsec/task.h | 34 ++++++- ttg/ttg/parsec/ttg.h | 177 ++++++++++++++++----------------- ttg/ttg/parsec/ttg_data_copy.h | 8 +- 5 files changed, 127 insertions(+), 99 deletions(-) diff --git a/tests/unit/streams.cc b/tests/unit/streams.cc index bef3a2837..108c8610b 100644 --- a/tests/unit/streams.cc +++ b/tests/unit/streams.cc @@ -60,6 +60,7 @@ TEST_CASE("streams", "[streams][core]") { CHECK(b == VALUE); reduce_ops++; }, SLICE); + make_graph_executable(op); ttg::ttg_fence(ttg::default_execution_context()); if (ttg::default_execution_context().rank() == 0) { @@ -67,7 +68,7 @@ TEST_CASE("streams", "[streams][core]") { op->invoke(i, VALUE); } } - auto &sink_op_real = *sink_op; + ttg::ttg_fence(ttg::default_execution_context()); CHECK(reduce_ops == N); } diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 67277a83a..12f1df749 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -10,7 +10,6 @@ configure_file( set(ttg-util-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/bug.h - ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/cublas_helper.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/demangle.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/diagnose.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/dot.h @@ -80,6 +79,7 @@ set(ttg-sources ${ttg-impl-headers} ${ttg-base-headers} ${ttg-util-headers} + ${ttg_device_headers} ${ttg-external-headers} ${CMAKE_CURRENT_SOURCE_DIR}/ttg/device/cublas_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.cpp @@ -120,7 +120,7 @@ if (TARGET CUDA::cudart) list(APPEND ttg-deps CUDA::cudart) endif (TARGET CUDA::cudart) -set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers}) +set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers};${ttg_device_headers}) if (NOT TTG_IGNORE_BUNDLED_EXTERNALS) list(APPEND ttg-sources ${ttg-external-headers}) list(APPEND ttg-public-headers ${ttg-external-headers}) diff --git a/ttg/ttg/parsec/task.h b/ttg/ttg/parsec/task.h index c1791d0ad..c5edc148f 100644 --- a/ttg/ttg/parsec/task.h +++ b/ttg/ttg/parsec/task.h @@ -184,7 +184,7 @@ namespace ttg_parsec { : (num_streams+1); TT* tt; key_type key; - size_goal_t stream[num_streams] = {}; + stream_info_t streams[num_streams] = {}; #ifdef TTG_HAS_COROUTINE void* suspended_task_address = nullptr; // if not null the function is suspended #endif @@ -239,7 +239,7 @@ namespace ttg_parsec { struct parsec_ttg_task_t : public parsec_ttg_task_base_t { static constexpr size_t num_streams = TT::numins; TT* tt; - size_goal_t stream[num_streams] = {}; + stream_info_t streams[num_streams] = {}; #ifdef TTG_HAS_COROUTINE void* suspended_task_address = nullptr; // if not null the function is suspended #endif @@ -282,6 +282,36 @@ namespace ttg_parsec { }; + /** + * Reducer task representing one or more stream reductions. + * A reducer task may be deferred on its first input (the object into which + * all other inputs are folded). Once that input becomes available the task + * is submitted and reduces all available inputs. Additional reducer tasks may + * be submitted until all required inputs have been processed. + */ + struct reducer_task_t : public parsec_ttg_task_base_t { + parsec_ttg_task_base_t *parent_task; + bool is_first; + + reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool, + parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, + int32_t priority, bool is_first) + : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, + 0, nullptr, nullptr, + &release_task, + true /* deferred until other readers have completed */) + , parent_task(task) + , is_first(is_first) + { } + + static void release_task(parsec_ttg_task_base_t* task_base) { + /* reducer tasks have one mutable input so the task can be submitted on the first release */ + parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task }; + parsec_execution_stream_t *es = parsec_my_execution_stream(); + __parsec_schedule_vp(es, vp_task_rings, 0); + } + }; + } // namespace detail } // namespace ttg_parsec diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index c718951ad..d29e3c110 100644 --- a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1550,93 +1550,90 @@ namespace ttg_parsec { return PARSEC_HOOK_RETURN_DONE; } - struct reducer_task_t { - parsec_task_t parsec_task; - task_t *task; - - reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool, - parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, - int32_t priority) - : task(task) - { - PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); - parsec_task.mempool_owner = mempool; - parsec_task.task_class = task_class; - parsec_task.status = PARSEC_TASK_STATUS_HOOK; - parsec_task.taskpool = taskpool; - parsec_task.priority = priority; - parsec_task.chore_mask = 1<<0; - } - }; - template static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) { - using rtask_t = reducer_task_t; + using rtask_t = detail::reducer_task_t; using value_t = std::tuple_element_t; constexpr const bool val_is_void = ttg::meta::is_void_v; rtask_t *rtask = (rtask_t*)parsec_task; - task_t *task = rtask->task; - ttT *baseobj = task->tt; + task_t *parent_task = static_cast(rtask->parent_task); + ttT *baseobj = parent_task->tt; derivedT *obj = static_cast(baseobj); auto& reducer = std::get(baseobj->input_reducers); - assert(parsec_ttg_caller == NULL); - parsec_ttg_caller = static_cast(task); - if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing"); } /* the copy to reduce into */ detail::ttg_data_copy_t *target_copy; - target_copy = static_cast(task->parsec_task.data[i].data_in); + target_copy = static_cast(parent_task->parsec_task.data[i].data_in); assert(val_is_void || nullptr != target_copy); /* once we hit 0 we have to stop since another thread might enqueue a new reduction task */ std::size_t c = 0; std::size_t size = 0; - assert(task->streams[i].reduce_count > 0); + assert(parent_task->streams[i].reduce_count > 0); + if (rtask->is_first) { + if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) { + /* we were the first and there is nothing to be done */ + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + } + + assert(parsec_ttg_caller == NULL); + detail::parsec_ttg_caller = rtask->parent_task; + do { if constexpr(!val_is_void) { /* the copies to reduce out of */ detail::ttg_data_copy_t *source_copy; parsec_list_item_t *item; - item = parsec_lifo_pop(&task->streams[i].reduce_copies); + item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies); if (nullptr == item) { // maybe someone is changing the goal right now break; } source_copy = ((detail::ttg_data_copy_self_t *)(item))->self; - reducer(*reinterpret_cast *>(target_copy->device_private), - *reinterpret_cast *>(source_copy->device_private)); + reducer(*reinterpret_cast *>(target_copy->get_ptr()), + *reinterpret_cast *>(source_copy->get_ptr())); detail::release_data_copy(source_copy); } else if constexpr(val_is_void) { reducer(); // invoke control reducer } // there is only one task working on this stream, so no need to be atomic here - size = ++task->streams[i].size; - //std::cout << "static_reducer_op key " << task->key << " size " << size << " of " << task->streams[i].goal << std::endl; - } while ((c = (task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); + size = ++parent_task->streams[i].size; + //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl; + } while ((c = (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); //} while ((c = (--task->streams[i].reduce_count)) > 0); /* finalize_argstream sets goal to 1, so size may be larger than goal */ - bool complete = (size >= task->streams[i].goal); - + bool complete = (size >= parent_task->streams[i].goal); + //std::cout << "static_reducer_op size " << size + // << " of " << parent_task->streams[i].goal << " complete " << complete + // << " c " << c << std::endl; if (complete && c == 0) { /* task is still in the hash table, have release_task remove it */ - task->remove_from_hash = true; - task->release_task(task); + parent_task->remove_from_hash = true; + parent_task->release_task(parent_task); } parsec_ttg_caller = NULL; if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); } @@ -2028,12 +2025,12 @@ namespace ttg_parsec { template - reducer_task_t *create_new_reducer_task(task_t *task) { + detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) { /* make sure we can reuse the existing memory pool and don't have to create a new one */ - static_assert(sizeof(task_t) >= sizeof(reducer_task_t)); + static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t)); constexpr const bool keyT_is_Void = ttg::meta::is_void_v; auto &world_impl = world.impl(); - reducer_task_t *newtask; + detail::reducer_task_t *newtask; parsec_thread_mempool_t *mempool = get_task_mempool(); char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); // use the priority of the task we stream into @@ -2046,8 +2043,8 @@ namespace ttg_parsec { ttg::trace(world.rank(), ":", get_name(), ": creating reducer task"); } /* placement-new the task */ - newtask = new (taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i], - world_impl.taskpool(), priority); + newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i], + world_impl.taskpool(), priority, is_first); return newtask; } @@ -2138,30 +2135,30 @@ namespace ttg_parsec { #endif } - /* transfer ownership changes: readers, coherency state, ownership - * version of CPU copy must be changed when used inside a CPU task - * PTG increases version: complete_execution (if W) - * PTG calls transfer_ownership at beginning of CPU task (hook) if RW access - * DSL never modifies GPU data copy - * Pushout happens at the end of GPU task if successor can be CPU? - */ - - //parsec_data_transfer_ownership_to_copy(data, 0, PARSEC_FLOW_ACCESS_RW); + auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){ + detail::ttg_data_copy_t *copy = copy_in; + if (nullptr != detail::parsec_ttg_caller) { + copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value); + } + if (nullptr != copy) { + /* retain the data copy */ + copy = detail::register_data_copy(copy, task, true); + } else { + /* create a new copy */ + copy = detail::create_new_datacopy(std::forward(value)); + } + return copy; + }; - if (reducer) { // is this a streaming input? reduce the received value - auto submit_reducer_task = [&](auto *task){ + if (reducer && 1 != task->streams[i].goal) { // is this a streaming input? reduce the received value + auto submit_reducer_task = [&](auto *parent_task){ /* check if we need to create a task */ - std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); - //std::size_t c = task->streams[i].reduce_count++; + std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); if (0 == c) { - //std::cout << "submit reducer task for task " << task->key - // << " size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; /* we are responsible for creating the reduction task */ - reducer_task_t *reduce_task; - reduce_task = create_new_reducer_task(task); - parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task }; - parsec_execution_stream_t *es = world_impl.execution_stream(); - __parsec_schedule_vp(es, vp_task_rings, 0); + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(parent_task, false); + reduce_task->release_task(reduce_task); // release immediately } }; @@ -2170,29 +2167,34 @@ namespace ttg_parsec { detail::ttg_data_copy_t *copy = nullptr; if (nullptr == (copy = task->copies[i])) { using decay_valueT = std::decay_t; - /* For now, we always create a copy because we cannot rely on the task_release - * mechanism (it would release the task, not the reduction value). */ - task->parsec_task.data[i].data_in = detail::create_new_datacopy(std::forward(value)); - task->streams[i].size++; - if (task->streams[i].size == task->streams[i].goal) { - release = true; + + /* first input value, create a task and bind it to the copy */ + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(task, true); + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward(value), false); + + /* put the copy into the task */ + task->parsec_task.data[i].data_in = copy; + + /* protected by the bucket lock */ + task->streams[i].size = 1; + task->streams[i].reduce_count.store(1, std::memory_order_relaxed); + + if (copy->get_next_task() != &reduce_task->parsec_task) { + reduce_task->release_task(reduce_task); } + /* now we can unlock the bucket */ parsec_hash_table_unlock_bucket(&tasks_table, hk); } else { - detail::ttg_data_copy_t *copy = nullptr; /* unlock the bucket, the lock is not needed anymore */ parsec_hash_table_unlock_bucket(&tasks_table, hk); - if (nullptr != parsec_ttg_caller) { - copy = detail::find_copy_in_task(parsec_ttg_caller, &value); - } - if (nullptr != copy) { - /* retain the data copy */ - copy = detail::register_data_copy(copy, task, true); - } else { - /* create a new copy */ - copy = detail::create_new_datacopy(std::forward(value)); - } + + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), true); + /* enqueue the data copy to be reduced */ parsec_lifo_push(&task->streams[i].reduce_copies, ©->super); submit_reducer_task(task); @@ -2216,17 +2218,8 @@ namespace ttg_parsec { throw std::logic_error("bad set arg"); } - detail::ttg_data_copy_t *copy = copy_in; - if (nullptr == copy_in && nullptr != detail::parsec_ttg_caller) { - copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value); - } - - if (nullptr != copy) { - /* register_data_copy might provide us with a different copy if !input_is_const */ - copy = detail::register_data_copy(copy, task, input_is_const); - } else { - copy = detail::create_new_datacopy(std::forward(value)); - } + /* get the copy to use as input for this task */ + detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), input_is_const); /* if we registered as a writer and were the first to register with this copy * we need to defer the release of this task to give other tasks a chance to diff --git a/ttg/ttg/parsec/ttg_data_copy.h b/ttg/ttg/parsec/ttg_data_copy.h index 98d952777..9aa06ae27 100644 --- a/ttg/ttg/parsec/ttg_data_copy.h +++ b/ttg/ttg/parsec/ttg_data_copy.h @@ -34,11 +34,14 @@ namespace ttg_parsec { * because ttg_data_copy_t has virtual functions so we cannot cast from parsec_data_copy_t * to ttg_data_copy_t (offsetof is not supported for virtual classes). * The self pointer is a back-pointer to the ttg_data_copy_t. */ - struct ttg_data_copy_self_t : public parsec_data_copy_t { + struct ttg_data_copy_self_t { + parsec_list_element_t super; ttg_data_copy_t *self; ttg_data_copy_self_t(ttg_data_copy_t* dc) : self(dc) - { } + { + PARSEC_OBJ_CONSTRUCT(&super); + } }; /* Non-owning copy-tracking wrapper, accounting for N readers or 1 writer. @@ -57,6 +60,7 @@ namespace ttg_parsec { { } ttg_data_copy_t(const ttg_data_copy_t& c) + : ttg_data_copy_self_t(this) { /* we allow copying but do not copy any data over from the original * device copies will have to be allocated again