diff --git a/tests/unit/streams.cc b/tests/unit/streams.cc index bef3a2837..108c8610b 100644 --- a/tests/unit/streams.cc +++ b/tests/unit/streams.cc @@ -60,6 +60,7 @@ TEST_CASE("streams", "[streams][core]") { CHECK(b == VALUE); reduce_ops++; }, SLICE); + make_graph_executable(op); ttg::ttg_fence(ttg::default_execution_context()); if (ttg::default_execution_context().rank() == 0) { @@ -67,7 +68,7 @@ TEST_CASE("streams", "[streams][core]") { op->invoke(i, VALUE); } } - auto &sink_op_real = *sink_op; + ttg::ttg_fence(ttg::default_execution_context()); CHECK(reduce_ops == N); } diff --git a/ttg/CMakeLists.txt b/ttg/CMakeLists.txt index 67277a83a..12f1df749 100644 --- a/ttg/CMakeLists.txt +++ b/ttg/CMakeLists.txt @@ -10,7 +10,6 @@ configure_file( set(ttg-util-headers ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/bug.h - ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/cublas_helper.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/demangle.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/diagnose.h ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/dot.h @@ -80,6 +79,7 @@ set(ttg-sources ${ttg-impl-headers} ${ttg-base-headers} ${ttg-util-headers} + ${ttg_device_headers} ${ttg-external-headers} ${CMAKE_CURRENT_SOURCE_DIR}/ttg/device/cublas_helper.cpp ${CMAKE_CURRENT_SOURCE_DIR}/ttg/util/backtrace.cpp @@ -120,7 +120,7 @@ if (TARGET CUDA::cudart) list(APPEND ttg-deps CUDA::cudart) endif (TARGET CUDA::cudart) -set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers}) +set(ttg-public-headers ${ttg-headers};${ttg-impl-headers};${ttg-base-headers};${ttg-util-headers};${ttg_device_headers}) if (NOT TTG_IGNORE_BUNDLED_EXTERNALS) list(APPEND ttg-sources ${ttg-external-headers}) list(APPEND ttg-public-headers ${ttg-external-headers}) diff --git a/ttg/ttg/parsec/task.h b/ttg/ttg/parsec/task.h index c1791d0ad..c5edc148f 100644 --- a/ttg/ttg/parsec/task.h +++ b/ttg/ttg/parsec/task.h @@ -184,7 +184,7 @@ namespace ttg_parsec { : (num_streams+1); TT* 
tt; key_type key; - size_goal_t stream[num_streams] = {}; + stream_info_t streams[num_streams] = {}; #ifdef TTG_HAS_COROUTINE void* suspended_task_address = nullptr; // if not null the function is suspended #endif @@ -239,7 +239,7 @@ namespace ttg_parsec { struct parsec_ttg_task_t : public parsec_ttg_task_base_t { static constexpr size_t num_streams = TT::numins; TT* tt; - size_goal_t stream[num_streams] = {}; + stream_info_t streams[num_streams] = {}; #ifdef TTG_HAS_COROUTINE void* suspended_task_address = nullptr; // if not null the function is suspended #endif @@ -282,6 +282,36 @@ namespace ttg_parsec { }; + /** + * Reducer task representing one or more stream reductions. + * A reducer task may be deferred on its first input (the object into which + * all other inputs are folded). Once that input becomes available the task + * is submitted and reduces all available inputs. Additional reducer tasks may + * be submitted until all required inputs have been processed. + */ + struct reducer_task_t : public parsec_ttg_task_base_t { + parsec_ttg_task_base_t *parent_task; + bool is_first; + + reducer_task_t(parsec_ttg_task_base_t* task, parsec_thread_mempool_t *mempool, + parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, + int32_t priority, bool is_first) + : parsec_ttg_task_base_t(mempool, task_class, taskpool, priority, + 0, nullptr, nullptr, + &release_task, + true /* deferred until other readers have completed */) + , parent_task(task) + , is_first(is_first) + { } + + static void release_task(parsec_ttg_task_base_t* task_base) { + /* reducer tasks have one mutable input so the task can be submitted on the first release */ + parsec_task_t *vp_task_rings[1] = { &task_base->parsec_task }; + parsec_execution_stream_t *es = parsec_my_execution_stream(); + __parsec_schedule_vp(es, vp_task_rings, 0); + } + }; + } // namespace detail } // namespace ttg_parsec diff --git a/ttg/ttg/parsec/ttg.h b/ttg/ttg/parsec/ttg.h index c718951ad..d29e3c110 100644 --- 
a/ttg/ttg/parsec/ttg.h +++ b/ttg/ttg/parsec/ttg.h @@ -1550,93 +1550,90 @@ namespace ttg_parsec { return PARSEC_HOOK_RETURN_DONE; } - struct reducer_task_t { - parsec_task_t parsec_task; - task_t *task; - - reducer_task_t(task_t* task, parsec_thread_mempool_t *mempool, - parsec_task_class_t *task_class, parsec_taskpool_t *taskpool, - int32_t priority) - : task(task) - { - PARSEC_LIST_ITEM_SINGLETON(&parsec_task.super); - parsec_task.mempool_owner = mempool; - parsec_task.task_class = task_class; - parsec_task.status = PARSEC_TASK_STATUS_HOOK; - parsec_task.taskpool = taskpool; - parsec_task.priority = priority; - parsec_task.chore_mask = 1<<0; - } - }; - template static parsec_hook_return_t static_reducer_op(parsec_execution_stream_s *es, parsec_task_t *parsec_task) { - using rtask_t = reducer_task_t; + using rtask_t = detail::reducer_task_t; using value_t = std::tuple_element_t; constexpr const bool val_is_void = ttg::meta::is_void_v; rtask_t *rtask = (rtask_t*)parsec_task; - task_t *task = rtask->task; - ttT *baseobj = task->tt; + task_t *parent_task = static_cast(rtask->parent_task); + ttT *baseobj = parent_task->tt; derivedT *obj = static_cast(baseobj); auto& reducer = std::get(baseobj->input_reducers); - assert(parsec_ttg_caller == NULL); - parsec_ttg_caller = static_cast(task); - if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": reducer executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": reducer executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : reducer executing"); } /* the copy to reduce into */ detail::ttg_data_copy_t *target_copy; - target_copy = static_cast(task->parsec_task.data[i].data_in); + target_copy = static_cast(parent_task->parsec_task.data[i].data_in); assert(val_is_void || nullptr != target_copy); /* once we hit 0 we have to stop since another thread might enqueue a new reduction 
task */ std::size_t c = 0; std::size_t size = 0; - assert(task->streams[i].reduce_count > 0); + assert(parent_task->streams[i].reduce_count > 0); + if (rtask->is_first) { + if (0 == (parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) { + /* we were the first and there is nothing to be done */ + if (obj->tracing()) { + if constexpr (!ttg::meta::is_void_v) + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": first reducer empty"); + else + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : first reducer empty"); + } + + return PARSEC_HOOK_RETURN_DONE; + } + } + + assert(parsec_ttg_caller == NULL); + detail::parsec_ttg_caller = rtask->parent_task; + do { if constexpr(!val_is_void) { /* the copies to reduce out of */ detail::ttg_data_copy_t *source_copy; parsec_list_item_t *item; - item = parsec_lifo_pop(&task->streams[i].reduce_copies); + item = parsec_lifo_pop(&parent_task->streams[i].reduce_copies); if (nullptr == item) { // maybe someone is changing the goal right now break; } source_copy = ((detail::ttg_data_copy_self_t *)(item))->self; - reducer(*reinterpret_cast *>(target_copy->device_private), - *reinterpret_cast *>(source_copy->device_private)); + reducer(*reinterpret_cast *>(target_copy->get_ptr()), + *reinterpret_cast *>(source_copy->get_ptr())); detail::release_data_copy(source_copy); } else if constexpr(val_is_void) { reducer(); // invoke control reducer } // there is only one task working on this stream, so no need to be atomic here - size = ++task->streams[i].size; - //std::cout << "static_reducer_op key " << task->key << " size " << size << " of " << task->streams[i].goal << std::endl; - } while ((c = (task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); + size = ++parent_task->streams[i].size; + //std::cout << "static_reducer_op size " << size << " of " << parent_task->streams[i].goal << std::endl; + } while ((c = 
(parent_task->streams[i].reduce_count.fetch_sub(1, std::memory_order_acq_rel)-1)) > 0); //} while ((c = (--task->streams[i].reduce_count)) > 0); /* finalize_argstream sets goal to 1, so size may be larger than goal */ - bool complete = (size >= task->streams[i].goal); - + bool complete = (size >= parent_task->streams[i].goal); + //std::cout << "static_reducer_op size " << size + // << " of " << parent_task->streams[i].goal << " complete " << complete + // << " c " << c << std::endl; if (complete && c == 0) { /* task is still in the hash table, have release_task remove it */ - task->remove_from_hash = true; - task->release_task(task); + parent_task->remove_from_hash = true; + parent_task->release_task(parent_task); } parsec_ttg_caller = NULL; if (obj->tracing()) { if constexpr (!ttg::meta::is_void_v) - ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", task->key, ": done executing"); + ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : ", parent_task->key, ": done executing"); else ttg::trace(obj->get_world().rank(), ":", obj->get_name(), " : done executing"); } @@ -2028,12 +2025,12 @@ namespace ttg_parsec { template - reducer_task_t *create_new_reducer_task(task_t *task) { + detail::reducer_task_t *create_new_reducer_task(task_t *task, bool is_first) { /* make sure we can reuse the existing memory pool and don't have to create a new one */ - static_assert(sizeof(task_t) >= sizeof(reducer_task_t)); + static_assert(sizeof(task_t) >= sizeof(detail::reducer_task_t)); constexpr const bool keyT_is_Void = ttg::meta::is_void_v; auto &world_impl = world.impl(); - reducer_task_t *newtask; + detail::reducer_task_t *newtask; parsec_thread_mempool_t *mempool = get_task_mempool(); char *taskobj = (char *)parsec_thread_mempool_allocate(mempool); // use the priority of the task we stream into @@ -2046,8 +2043,8 @@ namespace ttg_parsec { ttg::trace(world.rank(), ":", get_name(), ": creating reducer task"); } /* placement-new the task */ - newtask = new 
(taskobj) reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
-                                               world_impl.taskpool(), priority);
+      newtask = new (taskobj) detail::reducer_task_t(task, mempool, inpute_reducers_taskclass[i],
+                                                     world_impl.taskpool(), priority, is_first);
 
       return newtask;
     }
@@ -2138,30 +2135,30 @@ namespace ttg_parsec {
 #endif
       }
 
-      /* transfer ownership changes: readers, coherency state, ownership
-       * version of CPU copy must be changed when used inside a CPU task
-       * PTG increases version: complete_execution (if W)
-       * PTG calls transfer_ownership at beginning of CPU task (hook) if RW access
-       * DSL never modifies GPU data copy
-       * Pushout happens at the end of GPU task if successor can be CPU?
-       */
-
-      //parsec_data_transfer_ownership_to_copy(data, 0, PARSEC_FLOW_ACCESS_RW);
+      auto get_copy_fn = [&](detail::parsec_ttg_task_base_t *task, auto&& value, bool is_const){ /* pick the data copy that backs value for task */
+        detail::ttg_data_copy_t *copy = copy_in; /* a copy handed to the enclosing function takes precedence */
+        if (nullptr == copy && nullptr != detail::parsec_ttg_caller) { /* otherwise look value up in the currently running task */
+          copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
+        }
+        if (nullptr != copy) {
+          /* retain the data copy, honoring the constness requested by the call site */
+          copy = detail::register_data_copy(copy, task, is_const);
+        } else {
+          /* create a new copy */
+          copy = detail::create_new_datacopy(std::forward(value));
+        }
+        return copy;
+      };
 
-      if (reducer) { // is this a streaming input? reduce the received value
-        auto submit_reducer_task = [&](auto *task){
+      if (reducer && 1 != task->streams[i].goal) { // is this a streaming input?
reduce the received value + auto submit_reducer_task = [&](auto *parent_task){ /* check if we need to create a task */ - std::size_t c = task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); - //std::size_t c = task->streams[i].reduce_count++; + std::size_t c = parent_task->streams[i].reduce_count.fetch_add(1, std::memory_order_release); if (0 == c) { - //std::cout << "submit reducer task for task " << task->key - // << " size " << task->streams[i].size << " of " << task->streams[i].goal << std::endl; /* we are responsible for creating the reduction task */ - reducer_task_t *reduce_task; - reduce_task = create_new_reducer_task(task); - parsec_task_t *vp_task_rings[1] = { &reduce_task->parsec_task }; - parsec_execution_stream_t *es = world_impl.execution_stream(); - __parsec_schedule_vp(es, vp_task_rings, 0); + detail::reducer_task_t *reduce_task; + reduce_task = create_new_reducer_task(parent_task, false); + reduce_task->release_task(reduce_task); // release immediately } }; @@ -2170,29 +2167,34 @@ namespace ttg_parsec { detail::ttg_data_copy_t *copy = nullptr; if (nullptr == (copy = task->copies[i])) { using decay_valueT = std::decay_t; - /* For now, we always create a copy because we cannot rely on the task_release - * mechanism (it would release the task, not the reduction value). 
*/
-          task->parsec_task.data[i].data_in = detail::create_new_datacopy(std::forward(value));
-          task->streams[i].size++;
-          if (task->streams[i].size == task->streams[i].goal) {
-            release = true;
+
+          /* first input value, create a task and bind it to the copy */
+          detail::reducer_task_t *reduce_task;
+          reduce_task = create_new_reducer_task(task, true);
+
+          /* get the copy to use as input for this task */
+          detail::ttg_data_copy_t *copy = get_copy_fn(reduce_task, std::forward(value), false);
+
+          /* put the copy into the task */
+          task->parsec_task.data[i].data_in = copy;
+
+          /* protected by the bucket lock */
+          task->streams[i].size = 1;
+          task->streams[i].reduce_count.store(1, std::memory_order_relaxed);
+
+          if (copy->get_next_task() != &reduce_task->parsec_task) {
+            reduce_task->release_task(reduce_task);
           }
+
           /* now we can unlock the bucket */
           parsec_hash_table_unlock_bucket(&tasks_table, hk);
         } else {
-          detail::ttg_data_copy_t *copy = nullptr;
           /* unlock the bucket, the lock is not needed anymore */
           parsec_hash_table_unlock_bucket(&tasks_table, hk);
-          if (nullptr != parsec_ttg_caller) {
-            copy = detail::find_copy_in_task(parsec_ttg_caller, &value);
-          }
-          if (nullptr != copy) {
-            /* retain the data copy */
-            copy = detail::register_data_copy(copy, task, true);
-          } else {
-            /* create a new copy */
-            copy = detail::create_new_datacopy(std::forward(value));
-          }
+
+          /* get the copy to use as input for this task */
+          detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), true);
+
           /* enqueue the data copy to be reduced */
           parsec_lifo_push(&task->streams[i].reduce_copies, &copy->super);
           submit_reducer_task(task);
@@ -2216,17 +2218,8 @@ namespace ttg_parsec {
         throw std::logic_error("bad set arg");
       }
 
-      detail::ttg_data_copy_t *copy = copy_in;
-      if (nullptr == copy_in && nullptr != detail::parsec_ttg_caller) {
-        copy = detail::find_copy_in_task(detail::parsec_ttg_caller, &value);
-      }
-
-      if (nullptr != copy) {
-        /* register_data_copy might provide us with a different
copy if !input_is_const */
-        copy = detail::register_data_copy(copy, task, input_is_const);
-      } else {
-        copy = detail::create_new_datacopy(std::forward(value));
-      }
+      /* get the copy to use as input for this task */
+      detail::ttg_data_copy_t *copy = get_copy_fn(task, std::forward(value), input_is_const);
 
       /* if we registered as a writer and were the first to register with this copy
        * we need to defer the release of this task to give other tasks a chance to
diff --git a/ttg/ttg/parsec/ttg_data_copy.h b/ttg/ttg/parsec/ttg_data_copy.h
index 98d952777..9aa06ae27 100644
--- a/ttg/ttg/parsec/ttg_data_copy.h
+++ b/ttg/ttg/parsec/ttg_data_copy.h
@@ -34,11 +34,14 @@ namespace ttg_parsec {
    * because ttg_data_copy_t has virtual functions so we cannot cast from parsec_data_copy_t
    * to ttg_data_copy_t (offsetof is not supported for virtual classes).
    * The self pointer is a back-pointer to the ttg_data_copy_t. */
-  struct ttg_data_copy_self_t : public parsec_data_copy_t {
+  struct ttg_data_copy_self_t {
+    parsec_list_item_t super; /* list linkage; kept as the first member because list items popped from a lifo are cast back to this struct */
     ttg_data_copy_t *self;
     ttg_data_copy_self_t(ttg_data_copy_t* dc)
     : self(dc)
-    { }
+    {
+      PARSEC_OBJ_CONSTRUCT(&super, parsec_list_item_t); /* the macro needs the object and its class type */
+    }
   };
 
   /* Non-owning copy-tracking wrapper, accounting for N readers or 1 writer.
@@ -57,6 +60,7 @@ namespace ttg_parsec {
     { }
 
     ttg_data_copy_t(const ttg_data_copy_t& c)
+    : ttg_data_copy_self_t(this)
     {
       /* we allow copying but do not copy any data over from the original
        * device copies will have to be allocated again