From 88a7c489fea8e3fa0f392669b12e86062b8f6310 Mon Sep 17 00:00:00 2001
From: "K. S. Ernest (iFire) Lee"
Date: Sun, 23 Feb 2020 11:46:04 -0800
Subject: [PATCH] Queue attempt 2.

---
 core/os/SCsub                                 |   3 +
 core/os/threaded_array_processor.h            |  78 +---
 thirdparty/FEMFXAsync/FEMFXAsyncThreading.h   | 197 +++++++++
 thirdparty/FEMFXAsync/FEMFXCommon.h           | 134 ++++++
 thirdparty/FEMFXAsync/FEMFXParallelFor.cpp    | 261 ++++++++++++
 thirdparty/FEMFXAsync/FEMFXParallelFor.h      | 106 +++++
 thirdparty/FEMFXAsync/FEMFXTaskGraph.h        | 394 ++++++++++++++++++
 .../FEMFXAsync/FEMFXTaskSystemInterface.h     | 164 ++++++++
 thirdparty/misc/wsq.hpp                       | 284 -------------
 9 files changed, 1289 insertions(+), 332 deletions(-)
 create mode 100644 thirdparty/FEMFXAsync/FEMFXAsyncThreading.h
 create mode 100644 thirdparty/FEMFXAsync/FEMFXCommon.h
 create mode 100644 thirdparty/FEMFXAsync/FEMFXParallelFor.cpp
 create mode 100644 thirdparty/FEMFXAsync/FEMFXParallelFor.h
 create mode 100644 thirdparty/FEMFXAsync/FEMFXTaskGraph.h
 create mode 100644 thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h
 delete mode 100644 thirdparty/misc/wsq.hpp

diff --git a/core/os/SCsub b/core/os/SCsub
index 1c5f954470dc..1877aec6dc0d 100644
--- a/core/os/SCsub
+++ b/core/os/SCsub
@@ -2,4 +2,7 @@
 
 Import('env')
 
+env.Prepend(CPPPATH=['#thirdparty/FEMFXAsync/Vectormath'])
+env.Prepend(CPPPATH=['#thirdparty/FEMFXAsync'])
+
 env.add_source_files(env.core_sources, "*.cpp")
diff --git a/core/os/threaded_array_processor.h b/core/os/threaded_array_processor.h
index de545976d519..055603247b0e 100644
--- a/core/os/threaded_array_processor.h
+++ b/core/os/threaded_array_processor.h
@@ -31,51 +31,42 @@
 #ifndef THREADED_ARRAY_PROCESSOR_H
 #define THREADED_ARRAY_PROCESSOR_H
 
+#include "core/os/memory.h"
 #include "core/os/mutex.h"
 #include "core/os/os.h"
 #include "core/os/thread.h"
 #include "core/os/thread_safe.h"
 #include "core/safe_refcount.h"
-#include "thirdparty/misc/wsq.hpp"
+#include "thirdparty/FEMFXAsync/FEMFXAsyncThreading.h"
+#include "thirdparty/FEMFXAsync/FEMFXCommon.h"
+#include "thirdparty/FEMFXAsync/FEMFXParallelFor.h"
+#include "thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h"
+
+using namespace AMD;
 
 template <class C, class U>
 struct ThreadArrayProcessData {
-	C *instance = nullptr;
+	uint32_t elements;
+	C *instance;
 	U userdata;
-	void (C::*method)(uint32_t, U) = nullptr;
-	tf::WorkStealingQueue<uint32_t> *queue = nullptr;
-
+	void (C::*method)(uint32_t, U);
 	void process(uint32_t p_index) {
 		(instance->*method)(p_index, userdata);
 	}
 };
 
-template <class T>
-void process_array_thread(void *ud) {
-
-	T &data = *(T *)ud;
-	while (!data.queue->empty()) {
-		std::optional<uint32_t> work = data.queue->steal();
-		if (work.has_value()) {
-			data.process(work.value());
-		}
-	}
-}
-
-template <class C, class M, class U>
-void process_array_single(C *p_instance, M p_method, U p_userdata) {
-
-	ThreadArrayProcessData<C, U> data;
-	data.method = p_method;
-	data.instance = p_instance;
-	data.userdata = p_userdata;
-	while (!data.queue->empty()) {
-		std::optional<uint32_t> work = data.queue->steal();
-		if (work.has_value()) {
-			data.process(work.value());
-		}
+// Task callback for FmParallelForAsync(). inTaskBeginIndex is used as a batch index; the batch
+// size is the processor count, and the range is clamped to taskData->elements, so task indices
+// past the last batch get an empty range and do nothing. inTaskEndIndex is not used.
+template <class C, class U>
+void FmTaskProcessArray(void *inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex) {
+	ThreadArrayProcessData<C, U> *taskData = (ThreadArrayProcessData<C, U> *)inTaskData;
+	uint startIdx, endIdx;
+	FmGetIndexRange(&startIdx, &endIdx, (uint)inTaskBeginIndex, OS::get_singleton()->get_processor_count(), taskData->elements);
+	for (uint i = startIdx; i < endIdx; i++) {
+		taskData->process(i);
 	}
 }
 
 template <class C, class M, class U>
 void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) {
@@ -84,27 +75,18 @@ void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_us
 	data.method = p_method;
 	data.instance = p_instance;
 	data.userdata = p_userdata;
-	data.queue = memnew(tf::WorkStealingQueue<uint32_t>(next_power_of_2(p_elements)));
-
-	for (uint32_t i = 0; i < p_elements; i++) {
-		data.queue->push(i);
-	}
-
+	data.elements = p_elements;
+	// Stay single-threaded unless thread support is compiled in.
+	int32_t num_threads = 1;
 #ifndef NO_THREADS
-	Vector<Thread *> threads;
-	threads.resize(OS::get_singleton()->get_processor_count());
-
-	for (int i = 0; i < threads.size(); i++) {
-		threads.write[i] = Thread::create(process_array_thread<ThreadArrayProcessData<C, U> >, &data);
-	}
-
-	for (int i = 0; i < threads.size(); i++) {
-		Thread::wait_to_finish(threads[i]);
-		memdelete(threads[i]);
-	}
-#else
-	process_array_single(p_elements, p_instance, p_method, p_userdata);
+	num_threads = OS::get_singleton()->get_processor_count();
 #endif
+	// NOTE(review): `data` lives on this stack frame and nothing here waits for the submitted tasks,
+	// unlike the Thread::wait_to_finish() loop this replaces - confirm completion is awaited somewhere.
+	// NOTE(review): `callbacks` is default-constructed; verify that SubmitAsyncTask is actually set.
+	AMD::FmTaskSystemCallbacks callbacks;
+	// runLoop must be true here because this call is not made from inside FmExecuteTask().
+	AMD::FmParallelForAsync("thread_process_array", FmTaskProcessArray<C, U>, FmTaskProcessArray<C, U>, NULL, &data, p_elements, callbacks.SubmitAsyncTask, num_threads, true);
 }
 
 #endif // THREADED_ARRAY_PROCESSOR_H
diff --git a/thirdparty/FEMFXAsync/FEMFXAsyncThreading.h b/thirdparty/FEMFXAsync/FEMFXAsyncThreading.h
new file mode 100644
index 000000000000..d319d179a3ef
--- /dev/null
+++ b/thirdparty/FEMFXAsync/FEMFXAsyncThreading.h
@@ -0,0 +1,197 @@
+/*
+MIT License
+
+Copyright (c) 2019 Advanced Micro Devices, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+//---------------------------------------------------------------------------------------
+// Support for asynchronous approach for task scheduling, where dispatched tasks
+// track progress and issue follow-up tasks after work complete.
+//---------------------------------------------------------------------------------------
+
+#pragma once
+
+#include "FEMFXTaskSystemInterface.h"
+#include "stdint.h"
+#include <atomic>
+
+namespace AMD
+{
+    // Creates a task function and wrapper function to run task inside FmExecuteTask
+#define FM_WRAPPED_TASK_FUNC(taskFunc) \
+    void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex); \
+    void taskFunc##Wrapped(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex) { FmExecuteTask(taskFunc, inTaskData, inTaskBeginIndex, inTaskEndIndex); } \
+    void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex)
+
+#define FM_WRAPPED_TASK_DECLARATION(taskFunc) \
+    void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex); \
+    void taskFunc##Wrapped(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex);
+
+#define FM_TASK_AND_WRAPPED_TASK_ARGS(taskFunc) taskFunc, taskFunc##Wrapped
+
+    struct FmTask
+    {
+        FmTaskFuncCallback func;
+        void* data;
+        int32_t beginIndex;
+        int32_t endIndex;
+
+        FmTask() : func(NULL), data(NULL), beginIndex(0), endIndex(0) {}
+    };
+
+    // Async tasks need to be executed using this function, which starts a loop that will pick up subsequent tasks set by FmSetNextTask()
+    void FmExecuteTask(FmTaskFuncCallback taskFunc, void* taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
+
+    // Set next task for thread to execute in FmExecuteTask() loop
+    void FmSetNextTask(FmTaskFuncCallback taskFunc, void* taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
+    void FmSetNextTask(const FmTask& inTask);
+
+    // Holds an atomic count of incomplete tasks, and the follow-up task to execute when all tasks are complete.
+    class FmAsyncTasksProgress
+    {
+        std::atomic_uint32_t nextIndex;          // Atomic index used to start processing work items in sorted order
+        std::atomic_uint32_t numTasksIncomplete; // Atomic count of remaining tasks to detect completion
+        FmTask followTask;
+
+    public:
+
+        FmAsyncTasksProgress()
+        {
+            nextIndex.store(0);
+            numTasksIncomplete.store(0);
+            followTask.func = NULL;
+            followTask.data = NULL;
+            followTask.beginIndex = 0;
+            followTask.endIndex = 1;
+        }
+
+        void Init(uint32_t inNumTasksToRun, FmTaskFuncCallback inFollowTask, void* inFollowTaskData)
+        {
+            nextIndex.store(0);
+            numTasksIncomplete.store(inNumTasksToRun);
+            followTask.func = inFollowTask;
+            followTask.data = inFollowTaskData;
+        }
+
+        void Init(uint32_t inNumTasksToRun, FmTaskFuncCallback inFollowTask, void* inFollowTaskData, int32_t inFollowTaskBeginIndex, int32_t inFollowTaskEndIndex)
+        {
+            nextIndex.store(0);
+            numTasksIncomplete.store(inNumTasksToRun);
+            followTask.func = inFollowTask;
+            followTask.data = inFollowTaskData;
+            followTask.beginIndex = inFollowTaskBeginIndex;
+            followTask.endIndex = inFollowTaskEndIndex;
+        }
+
+        void ResetNextIndex()
+        {
+            nextIndex.store(0);
+        }
+
+        uint32_t GetNextIndex()
+        {
+            return nextIndex++; // post-increment yields the value before the increment, so indices start at 0
+        }
+
+        // Increment number of incomplete tasks.
+        void TaskIsStarting()
+        {
+            numTasksIncomplete++;
+        }
+
+        // Add to number of incomplete tasks.
+        void TasksAreStarting(uint32_t numTasks)
+        {
+            numTasksIncomplete += numTasks;
+        }
+
+        // Decrement number of incomplete tasks by numFinishedTasks.
+        // Returns true for the caller whose decrement brings the count to zero; when
+        // FM_ASYNC_THREADING is enabled that caller also queues the follow task, if one is set.
+        // taskDataToDelete is not used by this version: nothing is deleted here.
+        // NOTE: This sets the next task function (see FmSetNextTask) rather than making a call, so
+        // must be executed within the FmExecuteTask() loop and no other async calls or
+        // waits can occur before returning to the loop.
+        template<class T>
+        bool TasksAreFinished(uint32_t numFinishedTasks, T* taskDataToDelete)
+        {
+#if FM_ASYNC_THREADING
+            // Take the count from the atomic subtraction itself so that exactly one caller observes zero
+            uint32_t numTasks = (numTasksIncomplete -= numFinishedTasks);
+
+            if (numTasks == 0)
+            {
+                if (followTask.func)
+                {
+                    FmSetNextTask(followTask);
+                }
+
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+#else
+            uint32_t numTasks = (numTasksIncomplete -= numFinishedTasks);
+
+            if (numTasks == 0)
+            {
+                return true;
+            }
+            else
+            {
+                return false;
+            }
+#endif
+        }
+
+        template<class T>
+        bool TaskIsFinished(T* taskDataToDelete)
+        {
+            return TasksAreFinished(1, taskDataToDelete);
+        }
+
+        bool TaskIsFinished()
+        {
+            return TaskIsFinished<void>(NULL);
+        }
+
+        bool TasksAreFinished(uint32_t numTasks)
+        {
+            return TasksAreFinished<void>(numTasks, NULL);
+        }
+    };
+
+    // Task data base class
+    class FmAsyncTaskData
+    {
+    public:
+
+        FmAsyncTasksProgress progress;       // Progress of a parallel operation plus a task to follow
+        FmTask followTask;                   // Task to call following scope of this task data
+        FmAsyncTaskData* parentTaskData;     // Pointer to parent task data for nested parallel operations
+
+        FmAsyncTaskData() : parentTaskData(NULL) { }
+
+        virtual ~FmAsyncTaskData() {}
+    };
+}
diff --git a/thirdparty/FEMFXAsync/FEMFXCommon.h b/thirdparty/FEMFXAsync/FEMFXCommon.h
new file mode 100644
index 000000000000..11f903623f1a
--- /dev/null
+++ b/thirdparty/FEMFXAsync/FEMFXCommon.h
@@ -0,0 +1,134 @@
+/*
+MIT License
+
+Copyright (c) 2019 Advanced Micro Devices, Inc.
+ +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+*/ + +//--------------------------------------------------------------------------------------- +// Common constants, compiler attributes, and configuration options for library +//--------------------------------------------------------------------------------------- + +#pragma once + +#include "stdint.h" +#include +#include + +// Library constants +#define FM_INVALID_ID 0xffffffff // Indicates uninitialized or invalid id +#define FM_RB_FLAG 0x80000000 // Bit set in object ids as flag for rigid body +#define FM_DEFAULT_MAX_CG_ITERATIONS 60 // Default initialized number for maximum CG iterations +#define FM_DEFAULT_FRICTION_COEFF 0.4f // Default initialized value for friction +#define FM_MAX_VERT_INCIDENT_TETS 64 // Maximum tets allowed to be incident on a vertex; bounds some array sizes +#define FM_MAX_TET_ASPECT_RATIO 25.0f // Maximum aspect ratio for warnings; lower than this is preferred; higher aspect ratios harm stability + +// Enable a few checks and warnings +#define FM_DEBUG_CHECKS 0 +#if FM_DEBUG_CHECKS +extern int FmDebugPrint(const char *format, ...); // Must be defined in application +#define FM_PRINT(x) FmDebugPrint x +int FmAssertBreak(); +#ifdef FM_ASSERT +#undef FM_ASSERT +#endif +#define FM_ASSERT(x) \ + if (!(x)) { \ + FM_PRINT(("FEMFX assert failed: " #x "\n")); \ + FmAssertBreak(); \ + } +#else +#define FM_PRINT(x) +#endif + +namespace AMD { +// TODO: more angle constraint types +enum FmRigidBodyAngleConstraintTypes { + FM_RB_JOINT_TYPE_HINGE +}; +} // namespace AMD + +// Save per phase and total step timings each step +#define FM_TIMINGS 0 + +#if FM_TIMINGS +namespace AMD { +extern double gFEMFXStartTime; +extern double gFEMFXMeshComponentsTime; +extern double gFEMFXImplicitStepTime; +extern double gFEMFXBroadPhaseTime; +extern double gFEMFXMeshContactsTime; +extern double gFEMFXConstraintIslandsTime; +extern double gFEMFXConstraintSolveTime; +extern double gFEMFXTotalStepTime; +} // namespace AMD +#endif + +// Code path options to configure 
library + +// Run a separate stabilization solve to correct constraint error. +#define FM_CONSTRAINT_STABILIZATION_SOLVE 1 + +// Option to solve for delta velocity in implicit solve +#define FM_SOLVE_DELTAV 0 + +// Option to assemble system matrix by iterating over tets to compute and add stiffness submats into matrix. +// Otherwise will iterate over verts, applying submats from incident tets list. +#define FM_MATRIX_ASSEMBLY_BY_TETS 1 + +// Compute a relative rotation for plastically deformed rest positions, in order to compute an "elastic-only" tet rotation for computing elastic stress. +// In practice this seems to have a small effect, so may be possible to disable for CPU and mem savings. +#define FM_COMPUTE_PLASTIC_REL_ROTATION 0 + +// Option to sort adjacent vertices by id in each row of implicit solve system matrix. +#define FM_SORT_MATRIX_ROW_VERTS 0 + +// Use structure-of-arrays style SIMD to run multiple triangle-pair intersections in parallel. +#define FM_SOA_TRI_INTERSECTION 1 + +// Use SoA implementations of CCD functions. +#define FM_SOA_TRI_CCD (1 && FM_SOA_TRI_INTERSECTION) + +// Use SoA implementation to compute tet matrices. +#define FM_SOA_TET_MATH 1 + +// Create distance contacts on intersection of tet mesh surfaces. +// Contacts are placed at intersecting triangles, and normals are derived from the volume contact gradients. +// This gives better quality contact for intersecting meshes, but with added cost. +// Using a limit of contacts per object pair (see FEMFXCollisionPairData.h) +#define FM_SURFACE_INTERSECTION_CONTACTS 1 + +// Use task dependency graph within a constraint island to allow better scheduling. +// This gives a significant benefit in some cases, but seems neutral in other cases. +// There is some overhead in constructing the graph, but it reduces barrier syncs and allows some of the +// solver tasks to be scheduled earlier reducing idle time. 
+#define FM_CONSTRAINT_ISLAND_DEPENDENCY_GRAPH 1 + +// Maintains a limited set of distance contacts for each tet mesh vertex based on time of impact or distance, +// and culls a contact if no vertex references it. +#define FM_CONTACT_REDUCTION 1 + +// Option to project friction to circle in PGS (technique described by Erwin Coumans) +// Otherwise friction forces are projected to a box, aligned to relative velocity on the contact plane. +#define FM_PROJECT_FRICTION_TO_CIRCLE 0 + +// Compute norms for convergence test (not yet supported) +// Reference: Eberlen, "Physics Based Animation" +#define FM_CONSTRAINT_SOLVER_CONVERGENCE_TEST 0 diff --git a/thirdparty/FEMFXAsync/FEMFXParallelFor.cpp b/thirdparty/FEMFXAsync/FEMFXParallelFor.cpp new file mode 100644 index 000000000000..d940767f61eb --- /dev/null +++ b/thirdparty/FEMFXAsync/FEMFXParallelFor.cpp @@ -0,0 +1,261 @@ +/* +MIT License + +Copyright (c) 2019 Advanced Micro Devices, Inc. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. 
+*/
+
+#include "FEMFXParallelFor.h"
+#include "FEMFXAsyncThreading.h"
+
+namespace AMD
+{
+    // To control stack usage when using async tasks (especially if simulating with a single thread),
+    // instead of making tail calls to follow-up tasks, each thread will execute a loop on a global
+    // task function until it's NULL, and each task will overwrite the global task function with a
+    // follow-up task rather than calling it directly.
+    struct FmTaskFuncLoopData
+    {
+        FmTaskFuncCallback func;
+        void* data;
+        int32_t beginIndex;
+        int32_t endIndex;
+
+        FmTaskFuncLoopData() : func(NULL), data(NULL), beginIndex(0), endIndex(1) {}
+
+        void SetNextTask(FmTaskFuncCallback inFunc, void* inData)
+        {
+            func = inFunc;
+            data = inData;
+            beginIndex = 0;
+            endIndex = 1;
+        }
+
+        void SetNextTask(FmTaskFuncCallback inFunc, void* inData, int32_t inBeginIndex, int32_t inEndIndex)
+        {
+            FM_ASSERT(func == NULL); // should have run and cleared this
+            func = inFunc;
+            data = inData;
+            beginIndex = inBeginIndex;
+            endIndex = inEndIndex;
+        }
+    };
+
+    FM_THREAD_LOCAL_STORAGE FmTaskFuncLoopData gFEMFXTaskFuncLoopData;
+
+    // Loop while there is a non-NULL task function set in gFEMFXTaskFuncLoopData.
+    // Reduces use of stack with Async code.
+    // Instead of tail calls, task functions should call FmSetNextTask and return.
+    void FmExecuteTask(FmTaskFuncCallback inTaskFunc, void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex)
+    {
+        gFEMFXTaskFuncLoopData.SetNextTask(inTaskFunc, inTaskData, inTaskBeginIndex, inTaskEndIndex);
+
+        while (gFEMFXTaskFuncLoopData.func)
+        {
+            FmTaskFuncCallback func = gFEMFXTaskFuncLoopData.func;
+            void* data = gFEMFXTaskFuncLoopData.data;
+            int32_t beginIndex = gFEMFXTaskFuncLoopData.beginIndex;
+            int32_t endIndex = gFEMFXTaskFuncLoopData.endIndex;
+
+            // Clear func to exit loop, unless called function adds new task
+            gFEMFXTaskFuncLoopData.func = NULL;
+
+            func(data, beginIndex, endIndex);
+        }
+    }
+
+    void FmSetNextTask(FmTaskFuncCallback inTaskFunc, void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex)
+    {
+        gFEMFXTaskFuncLoopData.SetNextTask(inTaskFunc, inTaskData, inTaskBeginIndex, inTaskEndIndex);
+    }
+
+    void FmSetNextTask(const FmTask& inTask)
+    {
+        gFEMFXTaskFuncLoopData.SetNextTask(inTask.func, inTask.data, inTask.beginIndex, inTask.endIndex);
+    }
+
+    // Data needed for a thread to dispatch a portion of parallel-for tasks
+    class FmParallelForDispatcherData
+    {
+    public:
+        FM_CLASS_NEW_DELETE(FmParallelForDispatcherData)
+
+        FmAtomicUint dispatcherIndexAtomic;
+        FmAtomicUint numDispatchersIncomplete;
+
+        FmSubmitAsyncTaskCallback SubmitAsyncTask;
+        const char* taskName;
+        FmTaskFuncCallback TaskFunc;
+        FmTaskFuncCallback TaskFuncWrapped;
+        FmBatchingFuncCallback BatchingFunc;
+        void* taskData;
+        uint problemSize;
+        uint numDispatchers;
+
+        FmParallelForDispatcherData(
+            FmSubmitAsyncTaskCallback inSubmitAsyncTask,
+            const char* inTaskName,
+            FmTaskFuncCallback inTaskFunc,
+            FmTaskFuncCallback inTaskFuncWrapped,
+            FmBatchingFuncCallback inBatchingFunc,
+            void* inTaskData,
+            uint inProblemSize,
+            uint inNumDispatchers)
+        {
+            SubmitAsyncTask = inSubmitAsyncTask;
+            taskName = inTaskName;
+            TaskFunc = inTaskFunc;
+            TaskFuncWrapped = inTaskFuncWrapped;
+            BatchingFunc = inBatchingFunc;
+            taskData = inTaskData;
+            problemSize = inProblemSize;
+            numDispatchers = inNumDispatchers;
+            FmAtomicWrite(&dispatcherIndexAtomic.val, 0);
+            FmAtomicWrite(&numDispatchersIncomplete.val, numDispatchers);
+        }
+    };
+
+    // Dispatch a portion of parallel-for tasks.
+    // TODO: Include batching of tasks based on weights, using the begin and end indices
+    FM_WRAPPED_TASK_FUNC(FmTaskFuncParallelForDispatcher)
+    {
+        (void)inTaskEndIndex;
+
+        FmParallelForDispatcherData* dispatcherData = (FmParallelForDispatcherData *)inTaskData;
+
+        FmBatchingFuncCallback BatchingFunc = dispatcherData->BatchingFunc;
+
+        (void)inTaskBeginIndex; // the dispatcher index is claimed from the shared atomic counter instead
+        uint dispatcherIndex = FmAtomicIncrement(&dispatcherData->dispatcherIndexAtomic.val) - 1;
+
+        uint numDispatchers = dispatcherData->numDispatchers;
+        uint problemSize = dispatcherData->problemSize;
+
+        // Compute range of parallel-for indices this dispatcher covers
+        uint beginIndex, endIndex;
+        FmGetIndexRangeEvenDistribution(&beginIndex, &endIndex, dispatcherIndex, numDispatchers, problemSize);
+
+        if (BatchingFunc)
+        {
+            // Batch indices based on batching function
+            void* taskData = dispatcherData->taskData;
+
+            // Save first batch to run on this thread
+            uint firstNumItems = (uint)BatchingFunc(taskData, beginIndex, endIndex);
+            uint firstBeginIndex = beginIndex;
+
+            beginIndex += firstNumItems;
+
+            while (beginIndex < endIndex)
+            {
+                uint numItems = (uint)BatchingFunc(taskData, beginIndex, endIndex);
+
+                // Submit task
+                dispatcherData->SubmitAsyncTask(dispatcherData->taskName, dispatcherData->TaskFuncWrapped, dispatcherData->taskData, beginIndex, beginIndex + numItems);
+
+                beginIndex += numItems;
+            }
+
+            if (firstNumItems > 0)
+            {
+                FmSetNextTask(dispatcherData->TaskFunc, dispatcherData->taskData, firstBeginIndex, firstBeginIndex + firstNumItems);
+            }
+        }
+        else
+        {
+            uint numTasks = endIndex - beginIndex;
+
+#define FM_STRIDED 1
+#if FM_STRIDED
+            // Experiment to improve ordering of tasks, however depends on task system; also should have no effect if using an atomic counter to ensure order.
+            beginIndex = dispatcherIndex;
+            uint stride = numDispatchers;
+#endif
+
+            // Run one task in-line but submit rest
+            for (uint i = 1; i < numTasks; i++)
+            {
+#if FM_STRIDED
+                uint taskIndex = beginIndex + stride * i;
+#else
+                uint taskIndex = beginIndex + i;
+#endif
+                dispatcherData->SubmitAsyncTask(dispatcherData->taskName, dispatcherData->TaskFuncWrapped, dispatcherData->taskData, taskIndex, taskIndex + 1);
+            }
+
+            if (numTasks >= 1)
+            {
+                FmSetNextTask(dispatcherData->TaskFunc, dispatcherData->taskData, beginIndex, beginIndex + 1);
+            }
+        }
+
+        uint numIncomplete = FmAtomicDecrement(&dispatcherData->numDispatchersIncomplete.val);
+
+        if (numIncomplete == 0)
+        {
+            delete dispatcherData;
+        }
+    }
+
+    // Dispatch parallel-for tasks.
+    // NOTE: Calling code is responsible for continuing execution when taskCount == 0.
+    // TaskFuncWrapped is a function that calls TaskFunc using FmExecuteTask().
+    // Any SubmitAsyncTask calls within this will use TaskFuncWrapped to ensure a loop is running which will catch FmSetNextTask().
+    // If runLoop true, any processing of TaskFunc on this thread will also use TaskFuncWrapped.
+    // This is necessary if FmParallelForAsync is not called from FmExecuteTask(), or other FmParallelForAsync() calls may take place before returning to FmExecuteTask().
+    void FmParallelForAsync(const char* taskName,
+        FmTaskFuncCallback TaskFunc, FmTaskFuncCallback TaskFuncWrapped, FmBatchingFuncCallback BatchingFunc, void* taskData, int32_t taskCount,
+        FmSubmitAsyncTaskCallback SubmitAsyncTask, uint numThreads, bool runLoop)
+    {
+        (void)taskName;
+
+        if (taskCount <= 0)
+        {
+            return;
+        }
+
+        // If taskCount under a threshold will just submit all on this thread.
+        // Otherwise, splits up the submitting work and spawns other threads to dispatch.
+
+        const int32_t numSubmitsPerThread = 16;
+
+        // Get number of dispatchers needed; never zero, because the dispatcher divides the problem by this count
+        uint maxDispatchers = (numThreads > 0) ? numThreads * 8 : 1;
+        uint numDispatchers = FmMinUint(maxDispatchers, FmGetNumTasks((uint)taskCount, numSubmitsPerThread));
+
+        FmParallelForDispatcherData* dispatcherData = new FmParallelForDispatcherData(SubmitAsyncTask, taskName, TaskFunc, TaskFuncWrapped, BatchingFunc, taskData, (uint)taskCount, numDispatchers);
+
+        // Submit other dispatchers
+        for (uint i = 1; i < dispatcherData->numDispatchers; i++)
+        {
+            dispatcherData->SubmitAsyncTask("FEMFXParallelForDispatcher", FmTaskFuncParallelForDispatcherWrapped, dispatcherData, i, i + 1);
+        }
+
+        // Dispatch some tasks on this thread
+        if (runLoop)
+        {
+            FmTaskFuncParallelForDispatcherWrapped(dispatcherData, 0, 1);
+        }
+        else
+        {
+            FmSetNextTask(FmTaskFuncParallelForDispatcher, dispatcherData, 0, 1);
+        }
+    }
+}
diff --git a/thirdparty/FEMFXAsync/FEMFXParallelFor.h b/thirdparty/FEMFXAsync/FEMFXParallelFor.h
new file mode 100644
index 000000000000..ffe1a88aaaf2
--- /dev/null
+++ b/thirdparty/FEMFXAsync/FEMFXParallelFor.h
@@ -0,0 +1,106 @@
+/*
+MIT License
+
+Copyright (c) 2019 Advanced Micro Devices, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+//---------------------------------------------------------------------------------------
+// Asynchronous parallel-for: runs an array of work items through a task callback.
+// A batching callback may optionally be supplied to group several work items into
+// one submitted task. The task callback itself is expected to detect completion
+// and to submit the follow-up task.
+//---------------------------------------------------------------------------------------
+
+#pragma once
+
+#include "FEMFXAsyncThreading.h"
+#include "FEMFXTaskSystemInterface.h"
+
+namespace AMD {
+
+struct FmScene;
+
+// Batching callback: returns how many work items, starting at taskBeginIndex, go into the next task.
+// When passed to FmParallelForAsync, dispatchers call it to choose the granularity of the tasks they submit.
+typedef int32_t (*FmBatchingFuncCallback)(void *taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
+
+// Dispatch parallel-for tasks.
+// NOTE: When taskCount == 0 the calling code is responsible for continuing execution.
+// TaskFuncWrapped must be a function that runs TaskFunc through FmExecuteTask().
+// Every SubmitAsyncTask call made here uses TaskFuncWrapped, so that a loop is running to catch FmSetNextTask().
+// With runLoop true, TaskFunc work done on the calling thread also goes through TaskFuncWrapped.
+// That is required when the caller is not inside FmExecuteTask(), or when further FmParallelForAsync() calls may happen before returning to it.
+void FmParallelForAsync(const char *taskName, FmTaskFuncCallback TaskFunc, FmTaskFuncCallback TaskFuncWrapped,
+		FmBatchingFuncCallback BatchingFunc, void *taskData, int32_t taskCount,
+		FmSubmitAsyncTaskCallback SubmitAsyncTask, uint numThreads, bool runLoop = false);
+
+// Number of tasks needed when each task takes at most maxTaskBatchSize items (rounds up)
+static _FORCE_INLINE_ uint FmGetNumTasks(uint problemSize, uint maxTaskBatchSize) {
+	return (maxTaskBatchSize + problemSize - 1) / maxTaskBatchSize;
+}
+
+// Number of tasks for the desired batch size per task, capped at maxTasks
+static _FORCE_INLINE_ uint FmGetNumTasksLimited(uint problemSize, uint taskBatchSize, uint maxTasks) {
+	const uint uncapped = (taskBatchSize + problemSize - 1) / taskBatchSize;
+	const uint capped = MIN(maxTasks, uncapped);
+	return capped;
+}
+
+// Problem index range [begin, end) of the given task index, for a split into FmGetNumTasks() tasks
+static _FORCE_INLINE_ void FmGetIndexRange(uint *beginIndex, uint *endIndex, uint taskIndex, uint maxTaskBatchSize, uint problemSize) {
+	const uint unclampedBegin = taskIndex * maxTaskBatchSize;
+	const uint unclampedEnd = unclampedBegin + maxTaskBatchSize;
+	// Clamp both ends to problemSize; a task index beyond the
+	// last batch therefore yields an empty range.
+	*beginIndex = MIN(unclampedBegin, problemSize);
+	*endIndex = MIN(unclampedEnd, problemSize);
+}
+
+// Number of tasks when each task takes at least minTaskBatchSize items and the remainder is spread over all tasks
+static _FORCE_INLINE_ uint FmGetNumTasksMinBatchSize(uint problemSize, uint minTaskBatchSize) {
+	return MAX(problemSize / minTaskBatchSize, 1);
+}
+
+// Problem index range [begin, end) of the given task index when the problem is split over taskCount tasks as evenly as possible.
+// NOTE: the range may be empty if problemSize < taskCount
+static _FORCE_INLINE_ void FmGetIndexRangeEvenDistribution(uint *beginIndex, uint *endIndex, uint taskIndex, uint taskCount, uint problemSize) {
+	const uint quotient = problemSize / taskCount;
+	const uint leftover = problemSize % taskCount;
+
+	// Spread the leftover items: every task takes leftover / taskCount more,
+	// and the first `numLarger` tasks take one further item each.
+	const uint baseSize = quotient + leftover / taskCount;
+	const uint numLarger = leftover % taskCount;
+
+	uint first, count;
+	if (taskIndex >= numLarger) {
+		// All larger tasks come first, then (taskIndex - numLarger) base-sized ones.
+		count = baseSize;
+		first = numLarger * (baseSize + 1) + (taskIndex - numLarger) * baseSize;
+	} else {
+		count = baseSize + 1;
+		first = taskIndex * count;
+	}
+
+	*beginIndex = first;
+	*endIndex = first + count;
+}
+} // namespace AMD
diff --git a/thirdparty/FEMFXAsync/FEMFXTaskGraph.h b/thirdparty/FEMFXAsync/FEMFXTaskGraph.h
new file mode 100644
index 000000000000..6b74c1be4820
--- /dev/null
+++ b/thirdparty/FEMFXAsync/FEMFXTaskGraph.h
@@ -0,0 +1,394 @@
+/*
+MIT License
+
+Copyright (c) 2019 Advanced Micro Devices, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+//---------------------------------------------------------------------------------------
+// A task graph that lets you specify a set of tasks and dependencies between them.
+// Running task will message dependent nodes and nodes with all dependencies met are
+// submitted to scheduler.  Includes event type that allows tasks to dynamically
+// message other nodes.
+//---------------------------------------------------------------------------------------
+
+#pragma once
+
+#include "FEMFXCommon.h"
+#include "FEMFXArray.h" // NOTE(review): not among the files this patch adds - confirm it exists in the tree
+#include "FEMFXAtomicOps.h" // NOTE(review): not among the files this patch adds - confirm it exists in the tree
+#include "FEMFXTaskSystemInterface.h"
+#include "FEMFXAsyncThreading.h"
+
+namespace AMD
+{
+    class FmTaskGraph;
+
+    // Node in a task graph that contains a task function and links to successor nodes.
+    // The nodeFunc task is called with a pointer to this node.
+    // The nodeFunc task must call this node's TaskIsFinished() method on completion.
+    // Other custom data can be accessed by nodeFunc through the 'graph' pointer, if the graph is a derived class from FmTaskGraph.
+    class FmTaskGraphNode
+    {
+        FmAtomicUint numPredecessorsIncomplete; // num predecessors still incomplete
+        int32_t numPredecessors;                // num predecessors that must be completed before executing this node
+
+        const char* nodeName;
+        FmTaskFuncCallback nodeFunc;
+        FmTaskFuncCallback nodeFuncWrapped;     // "Wrapped" is the task called within FmExecuteTask loop for tail call optimization
+        int32_t nodeIndex;
+        uint nodeWeight;                        // Weight which can be used to decide which node to run directly vs submit as task
+
+        int32_t predecessorMessage;             // Value stored by last completed predecessor
+
+        FmArray<FmTaskGraphNode*> successors;
+        FmTaskGraph* graph;
+
+        // Called to signal completion to successor node, which may be ready to run.
+        // If ppNextNode non-NULL, and *ppNextNode initialized to NULL, one of the ready successors will be returned for this thread to run.
+        // The remaining ready nodes will be submitted tasks.
+        void SignalSuccessors(int32_t message, FmTaskGraphNode** ppNextNode = NULL)
+        {
+            int numSuccessors = (int)successors.GetNumElems();
+            for (int i = 0; i < numSuccessors; i++)
+            {
+                successors[i]->PredecessorComplete(message, ppNextNode);
+            }
+        }
+
+    public:
+        FM_CLASS_NEW_DELETE(FmTaskGraphNode)
+
+        FmTaskGraphNode() : numPredecessorsIncomplete(0), numPredecessors(0), nodeName(NULL), nodeFunc(NULL), nodeFuncWrapped(NULL), nodeIndex(0), nodeWeight((uint)-1), predecessorMessage(-1), graph(NULL)
+        {
+        }
+
+        FmTaskGraphNode(const char* inName, FmTaskGraph* inGraph, FmTaskFuncCallback inNodeFunc, FmTaskFuncCallback inNodeFuncWrapped, int32_t inNodeIndex, uint inNodeWeight = (uint)-1)
+        {
+            Init(inName, inGraph, inNodeFunc, inNodeFuncWrapped, inNodeIndex, inNodeWeight);
+        }
+
+        void Init(const char* inName, FmTaskGraph* inGraph, FmTaskFuncCallback inNodeFunc, FmTaskFuncCallback inNodeFuncWrapped, int32_t inNodeIndex, uint inNodeWeight = (uint)-1)
+        {
+            FmAtomicWrite(&numPredecessorsIncomplete.val, 0);
+            numPredecessors = 0;
+
+            nodeName = inName;
+            nodeFunc = inNodeFunc;
+            nodeFuncWrapped = inNodeFuncWrapped;
+            nodeIndex = inNodeIndex;
+            nodeWeight = inNodeWeight;
+
+            predecessorMessage = -1;
+
+            graph = inGraph;
+        }
+
+        inline const char* GetName() { return nodeName; }
+        inline FmTaskFuncCallback GetTaskFunc() { return nodeFunc; }
+        inline FmTaskFuncCallback GetTaskFuncWrapped() { return nodeFuncWrapped; }
+        inline int32_t GetIndex() { return nodeIndex; }
+        inline uint GetWeight() { return nodeWeight; } // same type as nodeWeight: an int32_t return would turn the default (uint)-1 into -1 and rank it below every explicit weight
+        FmTaskGraph* GetGraph() { return graph; }
+        inline int32_t GetPredecessorMessage() const { return predecessorMessage; }
+
+        void AddSuccessor(FmTaskGraphNode* node)
+        {
+            successors.Add(node);
+            node->IncrementNumPredecessors();
+        }
+
+        void SetNumPredecessors(int32_t inNumPredecessors)
+        {
+            numPredecessors = inNumPredecessors;
+            FmAtomicWrite(&numPredecessorsIncomplete.val, inNumPredecessors);
+        }
+
+        void IncrementNumPredecessors()
+        {
+            numPredecessors++;
+            FmAtomicIncrement(&numPredecessorsIncomplete.val);
+        }
+
+        // Called to signal completion to successor node, which may be ready to run.
+        // If ppNextNode non-NULL, and *ppNextNode initialized to NULL, one of the ready successors will be returned for this thread to run.
+        // The remaining ready nodes will be submitted tasks.
+        inline void PredecessorComplete(int32_t message, FmTaskGraphNode** ppNextNode = NULL);
+
+        // Called from scheduled task
+        inline void Run();
+
+        // Signal successors and update graph progress
+        inline void TaskIsFinished(int32_t message, FmTaskGraphNode** ppNextNode = NULL);
+    };
+
+    // An event is just a list of successor nodes that can be signaled.
+    // These can be used to implement some dynamic branching in the task graph.
+    class FmTaskGraphEvent
+    {
+        FmArray<FmTaskGraphNode*> successors;
+
+    public:
+        FM_CLASS_NEW_DELETE(FmTaskGraphEvent)
+
+        void AddSuccessor(FmTaskGraphNode* node)
+        {
+            successors.Add(node);
+            node->IncrementNumPredecessors();
+        }
+
+        // Called to signal completion to successor node, which may be ready to run.
+        // If ppNextNode non-NULL, and *ppNextNode initialized to NULL, one of the ready successors will be returned for this thread to run.
+        // The remaining ready nodes will be submitted tasks.
+        void SignalSuccessors(int32_t message, FmTaskGraphNode** ppNextNode = NULL)
+        {
+            int numSuccessors = (int)successors.GetNumElems();
+            for (int i = 0; i < numSuccessors; i++)
+            {
+                successors[i]->PredecessorComplete(message, ppNextNode);
+            }
+        }
+
+        uint32_t GetNumSuccessors()
+        {
+            return (uint32_t)successors.GetNumElems();
+        }
+
+        FmArray<FmTaskGraphNode*>& GetSuccessors()
+        {
+            return successors;
+        }
+    };
+
+    // A task graph that can be run either with a wait until completion, or asynchronously with a follow-up task to run once completion is detected.
+    class FmTaskGraph
+    {
+        FmAsyncTasksProgress progress;
+
+        FmSubmitAsyncTaskCallback SubmitAsyncTask;
+#if !FM_ASYNC_THREADING
+        FmCreateTaskWaitCounterCallback CreateTaskWaitCounter;
+        FmWaitForTaskWaitCounterCallback WaitForTaskWaitCounter;
+        FmDestroyTaskWaitCounterCallback DestroyTaskWaitCounter;
+        FmSubmitTaskCallback SubmitTask;
+
+        FmTaskWaitCounter* waitCounter;  // Count of running nodes, after Start() value of 0 will signify graph is complete
+#endif
+
+        FmTaskGraphEvent startEvent;
+
+    public:
+        FM_CLASS_NEW_DELETE(FmTaskGraph)
+
+        FmTaskGraph()
+        {
+            SubmitAsyncTask = NULL;
+#if !FM_ASYNC_THREADING
+            CreateTaskWaitCounter = NULL;
+            WaitForTaskWaitCounter = NULL;
+            DestroyTaskWaitCounter = NULL;
+            SubmitTask = NULL;
+            waitCounter = NULL;
+#endif
+        }
+
+        void Destroy()
+        {
+            startEvent.GetSuccessors().Clear(); // only the start event's successor list is cleared here; nodes are not touched
+        }
+
+        void SetCallbacks(
+            FmSubmitAsyncTaskCallback InSubmitAsyncTask
+#if !FM_ASYNC_THREADING
+            , FmCreateTaskWaitCounterCallback InCreateTaskWaitCounter,
+            FmWaitForTaskWaitCounterCallback InWaitForTaskWaitCounter,
+            FmDestroyTaskWaitCounterCallback InDestroyTaskWaitCounter,
+            FmSubmitTaskCallback InSubmitTask
+#endif
+        )
+        {
+            SubmitAsyncTask = InSubmitAsyncTask;
+#if !FM_ASYNC_THREADING
+            CreateTaskWaitCounter = InCreateTaskWaitCounter;
+            WaitForTaskWaitCounter = InWaitForTaskWaitCounter;
+            DestroyTaskWaitCounter = InDestroyTaskWaitCounter;
+            SubmitTask = InSubmitTask;
+#endif
+        }
+
+        void SetFollowTask(FmTaskFuncCallback followFunc, void* followData)
+        {
+            progress.Init(0, followFunc, followData);
+        }
+
+        void AddToStart(FmTaskGraphNode* node)
+        {
+            startEvent.AddSuccessor(node);
+        }
+
+        void SubmitNodeTask(FmTaskGraphNode* node, FmTaskGraphNode** ppNextNode = NULL)
+        {
+#if FM_ASYNC_THREADING
+            progress.TaskIsStarting();
+            if (ppNextNode)
+            {
+                // Saving one successor task to run directly, instead of submitting as task
+                if (*ppNextNode == NULL)
+                {
+                    // First successor reached
+                    *ppNextNode = node;
+                }
+                else
+                {
+                    // Run the largest-weight successor directly, submit the others.
+                    // This may help perf by reducing delays in the longest execution path.
+                    FmTaskGraphNode* nodeToSubmit = node;
+                    FmTaskGraphNode* nodeToRun = *ppNextNode;
+
+                    if (node->GetWeight() > nodeToRun->GetWeight())
+                    {
+                        nodeToSubmit = *ppNextNode;
+                        nodeToRun = node;
+                    }
+
+                    SubmitAsyncTask(nodeToSubmit->GetName(), nodeToSubmit->GetTaskFuncWrapped(), nodeToSubmit, nodeToSubmit->GetIndex(), nodeToSubmit->GetIndex() + 1);
+                    *ppNextNode = nodeToRun;
+                }
+            }
+            else
+            {
+                SubmitAsyncTask(node->GetName(), node->GetTaskFuncWrapped(), node, node->GetIndex(), node->GetIndex() + 1);
+            }
+#else
+            (void)ppNextNode;
+            SubmitTask(node->GetName(), node->GetTaskFunc(), node, node->GetIndex(), 0, waitCounter);
+#endif
+        }
+
+#if !FM_ASYNC_THREADING
+        // Signal start and wait for graph to complete
+        void StartAndWait()
+        {
+            waitCounter = CreateTaskWaitCounter();
+
+            // Signal starting nodes
+            startEvent.SignalSuccessors(0);
+
+            WaitForTaskWaitCounter(waitCounter);
+
+            DestroyTaskWaitCounter(waitCounter);
+
+            waitCounter = NULL;
+        }
+#endif
+
+        // Start graph using asynchronous threading.  Assumes follow task already set with SetFollowTask().
+        // May call FmSetNextTask().  Must be called within FmExecuteTask() loop.
+        void StartAsync()
+        {
+            // The task count is always incremented by one before a node is submitted, and that node won't decrement until it has
+            // finished incrementing the count by one for each of the successor nodes that it submits.  This means that the initial
+            // increment can't be canceled out until all successive work for that node has completed.
+            // By the same logic we first increment by one before signalling the start nodes, and decrement after.
+
+            // Starting node, first increment by one
+            TaskIsStarting();
+
+            // Signal starting nodes
+            FmTaskGraphNode* nextNode = NULL;
+            int numSuccessors = (int)startEvent.GetNumSuccessors();
+            for (int i = 0; i < numSuccessors; i++)
+            {
+                startEvent.GetSuccessors()[i]->PredecessorComplete(0, &nextNode);
+            }
+
+            // If a successor node claimed by this thread, can set it as next task to be executed in loop assumed to be in the callstack.
+            // If the task exists, the TaskIsFinished() below can not detect the graph is complete and set a different task.
+            if (nextNode)
+            {
+                FmSetNextTask(nextNode->GetTaskFunc(), nextNode, nextNode->GetIndex(), nextNode->GetIndex() + 1);
+            }
+
+            TaskIsFinished();
+        }
+
+        // Start graph using asynchronous threading and provide a task to run when complete.
+        // May call FmSetNextTask().  Must be called within FmExecuteTask() loop.
+        void StartAsync(FmTaskFuncCallback followFunc, void* followData)
+        {
+            SetFollowTask(followFunc, followData);
+            StartAsync();
+        }
+
+        // Must use to increment running task count before submitting a task
+        void TaskIsStarting()
+        {
+            progress.TaskIsStarting();
+        }
+
+        // Indicates task is complete, and must call from a node's task function
+        void TaskIsFinished()
+        {
+            progress.TaskIsFinished();
+        }
+    };
+
+    inline void FmTaskGraphNode::PredecessorComplete(int32_t message, FmTaskGraphNode** ppNextNode)
+    {
+        int32_t newNumPredecessorsIncomplete = FmAtomicDecrement(&numPredecessorsIncomplete.val);
+
+        FM_ASSERT(newNumPredecessorsIncomplete >= 0);
+
+        // If this was the last predecessor being waited on, enqueue the task.
+        // It will signal successors after it is run
+        if (newNumPredecessorsIncomplete == 0)
+        {
+            predecessorMessage = message;
+
+            // Reset num predecessors for subsequent iteration, before submitting
+            FmAtomicWrite(&numPredecessorsIncomplete.val, numPredecessors);
+
+            graph->SubmitNodeTask(this, ppNextNode);
+        }
+    }
+
+    inline void FmTaskGraphNode::Run()
+    {
+        nodeFunc(this, nodeIndex, 0); // NOTE(review): end index is 0 here, while the async submit paths pass GetIndex() + 1 - confirm this is intended
+    }
+
+    inline void FmTaskGraphNode::TaskIsFinished(int32_t message, FmTaskGraphNode** ppNextNode)
+    {
+        SignalSuccessors(message, ppNextNode);
+
+#if FM_ASYNC_THREADING
+        // Set next task, which requires this is the tail of task reached from FmExecuteTask.
+        // NOTE: if there is ppNextNode, graph->TaskIsFinished() can't finish the graph and can't also set a task
+        if (ppNextNode && *ppNextNode)
+        {
+            FmTaskGraphNode* pNextNode = *ppNextNode;
+            FmSetNextTask(pNextNode->GetTaskFunc(), pNextNode, pNextNode->GetIndex(), pNextNode->GetIndex() + 1);
+        }
+#endif
+
+        graph->TaskIsFinished();
+    }
+
+}
diff --git a/thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h b/thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h
new file mode 100644
index 000000000000..2e74f596d7df
--- /dev/null
+++ b/thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h
@@ -0,0 +1,164 @@
+/*
+MIT License
+
+Copyright (c) 2019 Advanced Micro Devices, Inc.
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in
+all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+THE SOFTWARE.
+*/
+
+//---------------------------------------------------------------------------------------
+// FEMFX library's interface to an external task scheduler
+//---------------------------------------------------------------------------------------
+
+#pragma once
+
+#include <stdint.h>
+
+// Async threading dispatches work that detects completion and submits follow-up tasks.
+// Avoids possibility that waiting thread will stall execution, and simplifies task system interface.
+//
+// NOTE: This is the only supported option.  The previous blocking approach is still included in the source,
+// being somewhat easier to read and understand, and because this was helpful for development.  But future
+// releases may remove the non-async path.
+
+#define FM_ASYNC_THREADING 1
+
+namespace AMD {
+// Get current number of worker threads
+typedef int (*FmGetTaskSystemNumThreadsCallback)();
+
+// Return index of current worker thread, >= 0 and < task system numThreads, unique between workers running concurrently.
+typedef int (*FmGetTaskSystemWorkerIndex)();
+
+// Prototype for a FEMFX task function: receives the opaque taskData pointer plus a begin and an end task index
+typedef void (*FmTaskFuncCallback)(void *taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
+
+// Submit asynchronous task to task scheduler, which should run FmExecuteTask with TaskFunc, taskData and the begin/end task index arguments
+typedef void (*FmSubmitAsyncTaskCallback)(const char *taskName, FmTaskFuncCallback TaskFunc, void *taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
+
+// Lib holds a void* to external object representing an event which can be waited on until triggered by a task
+typedef void FmSyncEvent;
+
+// Create task event
+typedef FmSyncEvent *(*FmCreateSyncEventCallback)();
+
+// Destroy task event
+typedef void (*FmDestroySyncEventCallback)(FmSyncEvent *taskEvent);
+
+// Wait for task event to be triggered
+typedef void (*FmWaitForSyncEventCallback)(FmSyncEvent *taskEvent);
+
+// Trigger task event
+typedef void (*FmTriggerSyncEventCallback)(FmSyncEvent *taskEvent);
+
+#if !FM_ASYNC_THREADING
+// Lib holds a void* to external object representing a synchronization primitive,
+// which holds number of work items currently being waited for.
+typedef void FmTaskWaitCounter;
+
+// Create counter at default unblocked value
+typedef FmTaskWaitCounter *(*FmCreateTaskWaitCounterCallback)();
+
+// Wait on counter, and offer thread for task processing.
+// NOTE: The library assumes no more than FmSceneSetupParams::numWorkerThreads is processing tasks simultaneously, and only allocates thread temporary memory for that number of threads.
+typedef void (*FmWaitForTaskWaitCounterCallback)(FmTaskWaitCounter *counter);
+
+// Destroy counter
+typedef void (*FmDestroyTaskWaitCounterCallback)(FmTaskWaitCounter *counter);
+
+// Submit task to task scheduler, which should run TaskFunc with taskData and the begin/end task index arguments.
+// If waitCounter non-NULL, this call must increment counter, and task must decrement on completion.
+// NOTE: For async threading to work, the implementation must execute each task by passing the task to FmExecuteTask().
+typedef void (*FmSubmitTaskCallback)(const char *taskName, FmTaskFuncCallback TaskFunc, void *taskData, int32_t taskBeginIndex, int32_t taskEndIndex, FmTaskWaitCounter *waitCounter);
+
+// Run an array of tasks in parallel and wait for all to complete.
+// Makes taskCount calls to TaskFunc, passing it taskData and a unique taskIndex in range 0..taskCount-1
+// NOTE: For async threading to work, the implementation must execute each task by passing the task to FmExecuteTask().
+typedef void (*FmParallelForCallback)(const char *taskName, FmTaskFuncCallback TaskFunc, void *taskData, int32_t taskCount);
+#endif
+
+// Set of callbacks that define the interface to external scheduler. Every pointer starts out NULL until SetCallbacks() is called.
+struct FmTaskSystemCallbacks {
+	FmGetTaskSystemNumThreadsCallback GetTaskSystemNumThreads;
+	FmGetTaskSystemWorkerIndex GetTaskSystemWorkerIndex;
+	FmSubmitAsyncTaskCallback SubmitAsyncTask;
+	FmCreateSyncEventCallback CreateSyncEvent;
+	FmDestroySyncEventCallback DestroySyncEvent;
+	FmWaitForSyncEventCallback WaitForSyncEvent;
+	FmTriggerSyncEventCallback TriggerSyncEvent;
+#if !FM_ASYNC_THREADING
+	FmCreateTaskWaitCounterCallback CreateTaskWaitCounter;
+	FmWaitForTaskWaitCounterCallback WaitForTaskWaitCounter;
+	FmDestroyTaskWaitCounterCallback DestroyTaskWaitCounter;
+	FmSubmitTaskCallback SubmitTask;
+	FmParallelForCallback ParallelFor;
+#endif
+
+	FmTaskSystemCallbacks() {
+		GetTaskSystemNumThreads = NULL;
+		GetTaskSystemWorkerIndex = NULL;
+		SubmitAsyncTask = NULL;
+		CreateSyncEvent = NULL;
+		DestroySyncEvent = NULL;
+		WaitForSyncEvent = NULL;
+		TriggerSyncEvent = NULL;
+#if !FM_ASYNC_THREADING
+		GetTaskSystemNumThreads = NULL; // redundant: already cleared above, harmless
+		CreateTaskWaitCounter = NULL;
+		WaitForTaskWaitCounter = NULL;
+		DestroyTaskWaitCounter = NULL;
+		SubmitTask = NULL;
+		ParallelFor = NULL;
+#endif
+	}
+
+	void SetCallbacks(
+			FmGetTaskSystemNumThreadsCallback InGetTaskSystemNumThreads,
+			FmGetTaskSystemWorkerIndex InGetTaskSystemWorkerIndex,
+			FmSubmitAsyncTaskCallback InSubmitAsyncTask,
+			FmCreateSyncEventCallback InCreateSyncEvent,
+			FmDestroySyncEventCallback InDestroySyncEvent,
+			FmWaitForSyncEventCallback InWaitForSyncEvent,
+			FmTriggerSyncEventCallback InTriggerSyncEvent
+#if !FM_ASYNC_THREADING
+			,
+			FmCreateTaskWaitCounterCallback InCreateTaskWaitCounter,
+			FmWaitForTaskWaitCounterCallback InWaitForTaskWaitCounter,
+			FmDestroyTaskWaitCounterCallback InDestroyTaskWaitCounter,
+			FmSubmitTaskCallback InSubmitTask,
+			FmParallelForCallback InParallelFor
+#endif
+	) {
+		GetTaskSystemNumThreads = InGetTaskSystemNumThreads;
+		GetTaskSystemWorkerIndex = InGetTaskSystemWorkerIndex;
+		SubmitAsyncTask = InSubmitAsyncTask;
+		CreateSyncEvent = InCreateSyncEvent;
+		DestroySyncEvent = InDestroySyncEvent;
+		WaitForSyncEvent = InWaitForSyncEvent;
+		TriggerSyncEvent = InTriggerSyncEvent;
+#if !FM_ASYNC_THREADING
+		CreateTaskWaitCounter = InCreateTaskWaitCounter;
+		WaitForTaskWaitCounter = InWaitForTaskWaitCounter;
+		DestroyTaskWaitCounter = InDestroyTaskWaitCounter;
+		SubmitTask = InSubmitTask;
+		ParallelFor = InParallelFor;
+#endif
+	}
+};
+} // namespace AMD
diff --git a/thirdparty/misc/wsq.hpp b/thirdparty/misc/wsq.hpp
deleted file mode 100644
index 27830b968fb4..000000000000
--- a/thirdparty/misc/wsq.hpp
+++ /dev/null
@@ -1,284 +0,0 @@
-// MIT License
-
-// Copyright (c) 2018-2019 T.-W. Huang, C.-X. Lin, G. Guo, and M.
Wong - -// University of Utah, Salt Lake City, UT, USA -// University of Illinois at Urbana-Champaign, IL, USA - -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: - -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. - -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -// 2019/05/15 - created by Tsung-Wei Huang -// - isolated from the original workstealing executor - -#pragma once - -#include -#include -#include - -namespace tf { - -/** -@class: WorkStealingQueue -@tparam T data type -@brief Lock-free unbounded single-producer multiple-consumer queue. -This class implements the work stealing queue described in the paper, -"Correct and Efficient Work-Stealing for Weak Memory Models," -available at https://www.di.ens.fr/~zappa/readings/ppopp13.pdf. -Only the queue owner can perform pop and push operations, -while others can steal data from the queue. 
-*/ -template -class WorkStealingQueue { - - //constexpr static int64_t cacheline_size = 64; - - //using storage_type = std::aligned_storage_t; - - struct Array { - - int64_t C; - int64_t M; - //storage_type* S; - std::atomic *S; - - //T* S; - - explicit Array(int64_t c) : - C{ c }, - M{ c - 1 }, - //S {new storage_type[C]} { - //S {new T[static_cast(C)]} { - S{ new std::atomic[static_cast(C)] } { - //for(int64_t i=0; i(std::addressof(S[i]))->~T(); - //} - delete[] S; - } - - int64_t capacity() const noexcept { - return C; - } - - template - void push(int64_t i, O &&o) noexcept { - //T* ptr = reinterpret_cast(std::addressof(S[i & M])); - //*ptr = std::forward(o); - //S[i & M] = std::forward(o); - S[i & M].store(std::forward(o), std::memory_order_relaxed); - } - - T pop(int64_t i) noexcept { - //return *reinterpret_cast(std::addressof(S[i & M])); - //return S[i & M]; - return S[i & M].load(std::memory_order_relaxed); - } - - Array *resize(int64_t b, int64_t t) { - Array *ptr = new Array{ 2 * C }; - for (int64_t i = t; i != b; ++i) { - ptr->push(i, pop(i)); - } - return ptr; - } - }; - - std::atomic _top; - std::atomic _bottom; - std::atomic _array; - std::vector _garbage; - //char _padding[cacheline_size]; - -public: - /** - @brief constructs the queue with a given capacity - @param capacity the capacity of the queue (must be power of 2) - */ - explicit WorkStealingQueue(int64_t capacity = 1024); - - /** - @brief destructs the queue - */ - ~WorkStealingQueue(); - - /** - @brief queries if the queue is empty at the time of this call - */ - bool empty() const noexcept; - - /** - @brief queries the number of items at the time of this call - */ - size_t size() const noexcept; - - /** - @brief queries the capacity of the queue - */ - int64_t capacity() const noexcept; - - /** - @brief inserts an item to the queue - Only the owner thread can insert an item to the queue. - The operation can trigger the queue to resize its capacity - if more space is required. 
- @tparam O data type - @param item the item to perfect-forward to the queue - */ - template - void push(O &&item); - - /** - @brief pops out an item from the queue - Only the owner thread can pop out an item from the queue. - The return can be a @std_nullopt if this operation failed (empty queue). - */ - std::optional pop(); - - /** - @brief steals an item from the queue - Any threads can try to steal an item from the queue. - The return can be a @std_nullopt if this operation failed (not necessary empty). - */ - std::optional steal(); -}; - -// Constructor -template -WorkStealingQueue::WorkStealingQueue(int64_t c) { - //GODOT Patch START - if (!(c && (!(c & (c - 1))))) { - return; - } - //GODOT Patch END - _top.store(0, std::memory_order_relaxed); - _bottom.store(0, std::memory_order_relaxed); - _array.store(new Array{ c }, std::memory_order_relaxed); - _garbage.reserve(32); -} - -// Destructor -template -WorkStealingQueue::~WorkStealingQueue() { - for (auto a : _garbage) { - delete a; - } - delete _array.load(); -} - -// Function: empty -template -bool WorkStealingQueue::empty() const noexcept { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_relaxed); - return b <= t; -} - -// Function: size -template -size_t WorkStealingQueue::size() const noexcept { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_relaxed); - return static_cast(b >= t ? 
b - t : 0); -} - -// Function: push -template -template -void WorkStealingQueue::push(O &&o) { - int64_t b = _bottom.load(std::memory_order_relaxed); - int64_t t = _top.load(std::memory_order_acquire); - Array *a = _array.load(std::memory_order_relaxed); - - // queue is full - if (a->capacity() - 1 < (b - t)) { - Array *tmp = a->resize(b, t); - _garbage.push_back(a); - std::swap(a, tmp); - _array.store(a, std::memory_order_relaxed); - } - - a->push(b, std::forward(o)); - std::atomic_thread_fence(std::memory_order_release); - _bottom.store(b + 1, std::memory_order_relaxed); -} - -// Function: pop -template -std::optional WorkStealingQueue::pop() { - int64_t b = _bottom.load(std::memory_order_relaxed) - 1; - Array *a = _array.load(std::memory_order_relaxed); - _bottom.store(b, std::memory_order_relaxed); - std::atomic_thread_fence(std::memory_order_seq_cst); - int64_t t = _top.load(std::memory_order_relaxed); - - std::optional item; - - if (t <= b) { - item = a->pop(b); - if (t == b) { - // the last item just got stolen - if (!_top.compare_exchange_strong(t, t + 1, - std::memory_order_seq_cst, - std::memory_order_relaxed)) { - item = std::nullopt; - } - _bottom.store(b + 1, std::memory_order_relaxed); - } - } else { - _bottom.store(b + 1, std::memory_order_relaxed); - } - - return item; -} - -// Function: steal -template -std::optional WorkStealingQueue::steal() { - int64_t t = _top.load(std::memory_order_acquire); - std::atomic_thread_fence(std::memory_order_seq_cst); - int64_t b = _bottom.load(std::memory_order_acquire); - - std::optional item; - - if (t < b) { - Array *a = _array.load(std::memory_order_consume); - item = a->pop(t); - if (!_top.compare_exchange_strong(t, t + 1, - std::memory_order_seq_cst, - std::memory_order_relaxed)) { - return std::nullopt; - } - } - - return item; -} - -// Function: capacity -template -int64_t WorkStealingQueue::capacity() const noexcept { - return _array.load(std::memory_order_relaxed)->capacity(); -} - -} // namespace tf \ No 
newline at end of file