Skip to content
This repository has been archived by the owner on Sep 3, 2022. It is now read-only.

Commit

Permalink
Queue attempt 2.
Browse files Browse the repository at this point in the history
  • Loading branch information
fire committed Feb 23, 2020
1 parent 534ef47 commit 88a7c48
Show file tree
Hide file tree
Showing 9 changed files with 1,282 additions and 333 deletions.
3 changes: 3 additions & 0 deletions core/os/SCsub
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@

Import('env')

env.Prepend(CPPPATH=['#thirdparty/FEMFXAsync/Vectormath'])
env.Prepend(CPPPATH=['#thirdparty/FEMFXAsync'])

env.add_source_files(env.core_sources, "*.cpp")
72 changes: 23 additions & 49 deletions core/os/threaded_array_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,51 +31,39 @@
#ifndef THREADED_ARRAY_PROCESSOR_H
#define THREADED_ARRAY_PROCESSOR_H

#include "core/os/memory.h"
#include "core/os/mutex.h"
#include "core/os/os.h"
#include "core/os/thread.h"
#include "core/os/thread_safe.h"
#include "core/safe_refcount.h"
#include "thirdparty/misc/wsq.hpp"
#include "thirdparty/FEMFXAsync/FEMFXAsyncThreading.h"
#include "thirdparty/FEMFXAsync/FEMFXCommon.h"
#include "thirdparty/FEMFXAsync/FEMFXParallelFor.h"
#include "thirdparty/FEMFXAsync/FEMFXTaskSystemInterface.h"

using namespace AMD;

template <class C, class U>
struct ThreadArrayProcessData {
C *instance = nullptr;
uint32_t elements;
C *instance;
U userdata;
void (C::*method)(uint32_t, U) = nullptr;
tf::WorkStealingQueue<uint32_t> *queue = nullptr;

void (C::*method)(uint32_t, U);
void process(uint32_t p_index) {
(instance->*method)(p_index, userdata);
}
};

// Thread entry point that drains the work queue referenced by the task data.
// `ud` is treated as a pointer to T, which must expose a `queue` pointer
// (with empty() and steal()) and a `process(uint32_t)` member.
template <class T>
void process_array_thread(void *ud) {
	T *task = static_cast<T *>(ud);

	// Keep going until the queue reports empty. A steal that yields no value
	// is simply retried on the next pass of the loop.
	while (!task->queue->empty()) {
		const std::optional<uint32_t> stolen = task->queue->steal();
		if (!stolen) {
			continue;
		}
		task->process(*stolen);
	}
}

template <class C, class M, class U>
void process_array_single(C *p_instance, M p_method, U p_userdata) {

ThreadArrayProcessData<C, U> data;
data.method = p_method;
data.instance = p_instance;
data.userdata = p_userdata;
while (!data.queue->empty()) {
std::optional<uint32_t> work = data.queue->steal();
if (work.has_value()) {
data.process(work.value());
}
template <class C, class U>
void FmTaskProcessArray(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex) {
ThreadArrayProcessData<C, U> *taskData = (ThreadArrayProcessData<C, U> *) inTaskData;
uint startIdx, endIdx;
FmGetIndexRange(&startIdx, &endIdx, (uint)inTaskBeginIndex, OS::get_singleton()->get_processor_count(), taskData->elements);
for (uint i = startIdx; i < endIdx; i++) {
taskData->process(i);
}
}
}

template <class C, class M, class U>
void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) {
Expand All @@ -84,27 +72,13 @@ void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_us
data.method = p_method;
data.instance = p_instance;
data.userdata = p_userdata;
data.queue = memnew(tf::WorkStealingQueue<uint32_t>(next_power_of_2(p_elements)));

for (uint32_t i = 0; i < p_elements; i++) {
data.queue->push(i);
}

data.elements = p_elements;
int32_t num_threads = OS::get_singleton()->get_processor_count();
#ifndef NO_THREADS
Vector<Thread *> threads;
threads.resize(OS::get_singleton()->get_processor_count());

for (int i = 0; i < threads.size(); i++) {
threads.write[i] = Thread::create(process_array_thread<ThreadArrayProcessData<C, U> >, &data);
}

for (int i = 0; i < threads.size(); i++) {
Thread::wait_to_finish(threads[i]);
memdelete(threads[i]);
}
#else
process_array_single(p_elements, p_instance, p_method, p_userdata);
num_threads = 1;
#endif
AMD::FmTaskSystemCallbacks callbacks;
AMD::FmParallelForAsync("thread_process_array", FmTaskProcessArray<C, U>, FmTaskProcessArray<C, U>, NULL, &data, p_elements, callbacks.SubmitAsyncTask, num_threads);
}

#endif // THREADED_ARRAY_PROCESSOR_H
197 changes: 197 additions & 0 deletions thirdparty/FEMFXAsync/FEMFXAsyncThreading.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
MIT License
Copyright (c) 2019 Advanced Micro Devices, Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

//---------------------------------------------------------------------------------------
// Support for an asynchronous approach to task scheduling, where dispatched tasks
// track progress and issue follow-up tasks after their work completes.
//---------------------------------------------------------------------------------------

#pragma once

#include "FEMFXTaskSystemInterface.h"
#include "stdint.h"
#include <atomic>

namespace AMD
{
// Creates a task function and wrapper function to run task inside FmExecuteTask
//
// Expansion, in order:
//   1. a forward declaration of taskFunc,
//   2. the definition of taskFunc##Wrapped, which forwards its three arguments to
//      FmExecuteTask() together with taskFunc,
//   3. the signature of taskFunc with no body.
// Because the macro ends on the bare signature, the invocation must be followed
// directly by the function body: FM_WRAPPED_TASK_FUNC(MyTask) { ... }
#define FM_WRAPPED_TASK_FUNC(taskFunc) \
void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex); \
void taskFunc##Wrapped(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex) { FmExecuteTask(taskFunc, inTaskData, inTaskBeginIndex, inTaskEndIndex); } \
void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex)

// Declares (without defining) both taskFunc and taskFunc##Wrapped.
// The expansion already ends with a semicolon.
#define FM_WRAPPED_TASK_DECLARATION(taskFunc) \
void taskFunc(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex); \
void taskFunc##Wrapped(void* inTaskData, int32_t inTaskBeginIndex, int32_t inTaskEndIndex);

// Expands to the two names "taskFunc, taskFunc##Wrapped" separated by a comma,
// so both can be supplied as two consecutive arguments of a call.
#define FM_TASK_AND_WRAPPED_TASK_ARGS(taskFunc) taskFunc, taskFunc##Wrapped

// Bundles a task callback with the data pointer and begin/end indices that
// accompany it. A default-constructed FmTask has a NULL callback, NULL data
// and both indices set to 0.
struct FmTask
{
    FmTaskFuncCallback func = NULL;  // Callback to run; NULL means "no task"
    void* data = NULL;               // Data pointer stored alongside the callback
    int32_t beginIndex = 0;
    int32_t endIndex = 0;

    // All members take the defaults given above.
    FmTask() {}
};

// Async tasks need to be executed using this function, which starts a loop that will pick up subsequent tasks set by FmSetNextTask()
// Declared only; the definition is not part of this header.
void FmExecuteTask(FmTaskFuncCallback taskFunc, void* taskData, int32_t taskBeginIndex, int32_t taskEndIndex);

// Set next task for thread to execute in FmExecuteTask() loop
// Two overloads: one takes the callback, data and index pair individually,
// the other takes the same four values bundled in an FmTask.
void FmSetNextTask(FmTaskFuncCallback taskFunc, void* taskData, int32_t taskBeginIndex, int32_t taskEndIndex);
void FmSetNextTask(const FmTask& inTask);

// Holds an atomic count of incomplete tasks, and the follow-up task to execute when all tasks are complete.
// A second atomic counter hands out work-item indices.
//
// Every counter update is a single atomic read-modify-write, so exactly one caller of
// TasksAreFinished() observes the incomplete count reaching zero.
class FmAsyncTasksProgress
{
    std::atomic_uint32_t nextIndex;          // Atomic index used to start processing work items in sorted order
    std::atomic_uint32_t numTasksIncomplete; // Atomic count of remaining tasks to detect completion
    FmTask followTask;                       // Task passed to FmSetNextTask() when the count reaches zero; func may be NULL

public:

    // Starts with both counters at zero and no follow-up task.
    // The follow-up task indices default to begin 0 / end 1.
    FmAsyncTasksProgress()
    {
        nextIndex.store(0);
        numTasksIncomplete.store(0);
        followTask.func = NULL;
        followTask.data = NULL;
        followTask.beginIndex = 0;
        followTask.endIndex = 1;
    }

    // Re-arms the object for inNumTasksToRun tasks and records the follow-up callback and data.
    // The follow-up begin/end indices keep whatever values they already had.
    void Init(uint32_t inNumTasksToRun, FmTaskFuncCallback inFollowTask, void* inFollowTaskData)
    {
        nextIndex.store(0);
        numTasksIncomplete.store(inNumTasksToRun);
        followTask.func = inFollowTask;
        followTask.data = inFollowTaskData;
    }

    // Same as above, but also sets the follow-up task's begin/end indices.
    void Init(uint32_t inNumTasksToRun, FmTaskFuncCallback inFollowTask, void* inFollowTaskData, int32_t inFollowTaskBeginIndex, int32_t inFollowTaskEndIndex)
    {
        nextIndex.store(0);
        numTasksIncomplete.store(inNumTasksToRun);
        followTask.func = inFollowTask;
        followTask.data = inFollowTaskData;
        followTask.beginIndex = inFollowTaskBeginIndex;
        followTask.endIndex = inFollowTaskEndIndex;
    }

    // Restarts index hand-out from zero without touching the incomplete count.
    void ResetNextIndex()
    {
        nextIndex.store(0);
    }

    // Atomically claims the next work-item index: returns the counter's current value
    // (0 on the first call after a reset) and advances it by one.
    // fetch_add already yields the pre-increment value, so no "- 1" adjustment is applied;
    // subtracting one here would make the first claimed index wrap to 0xFFFFFFFF.
    uint32_t GetNextIndex()
    {
        return nextIndex.fetch_add(1);
    }

    // Increment number of incomplete tasks.
    void TaskIsStarting()
    {
        // Must bump the incomplete count, not nextIndex, so it pairs with TaskIsFinished().
        numTasksIncomplete.fetch_add(1);
    }

    // Add to number of incomplete tasks.
    void TasksAreStarting(uint32_t numTasks)
    {
        numTasksIncomplete.fetch_add(numTasks);
    }

    // Decrement number of incomplete tasks by numFinishedTasks.
    // Returns true only for the call that brings the count to zero.
    // When FM_ASYNC_THREADING is enabled, that call also queues the follow task
    // (if one is set) through FmSetNextTask().
    //
    // taskDataToDelete is accepted for interface compatibility but is currently unused:
    // this function does NOT free it, so ownership stays with the caller.
    // This FmAsyncTasksProgress can belong to task data.
    //
    // NOTE: This sets the global next task function rather than making a call, so
    // must be executed within a FmTaskFuncLoop() and no other async calls or
    // waits can occur before returning to the loop.
    template<class T>
    bool TasksAreFinished(uint32_t numFinishedTasks, T* taskDataToDelete)
    {
        (void)taskDataToDelete;

        // One atomic read-modify-write: fetch_sub returns the value before the subtraction,
        // so the remaining count is derived without a second (racy) load of the counter.
        const uint32_t numTasks = numTasksIncomplete.fetch_sub(numFinishedTasks) - numFinishedTasks;

        if (numTasks != 0)
        {
            return false;
        }

#if FM_ASYNC_THREADING
        if (followTask.func)
        {
            FmSetNextTask(followTask);
        }
#endif

        return true;
    }

    // Marks a single task as finished; see TasksAreFinished().
    template<class T>
    bool TaskIsFinished(T* taskDataToDelete)
    {
        return TasksAreFinished(1, taskDataToDelete);
    }

    // Marks a single task as finished with no associated task data.
    bool TaskIsFinished()
    {
        return TaskIsFinished<void>(NULL);
    }

    // Marks numTasks tasks as finished with no associated task data.
    bool TasksAreFinished(uint32_t numTasks)
    {
        return TasksAreFinished<void>(numTasks, NULL);
    }
};

// Base class for task data. Carries the progress tracker for a parallel
// operation, a follow-up task, and a link to the enclosing task data when
// parallel operations are nested. The destructor is virtual.
class FmAsyncTaskData
{
public:

    // Progress of a parallel operation plus a task to follow
    FmAsyncTasksProgress progress;

    // Task to call following scope of this task data
    FmTask followTask;

    // Pointer to parent task data for nested parallel operations; NULL until assigned
    FmAsyncTaskData* parentTaskData = NULL;

    FmAsyncTaskData() {}

    virtual ~FmAsyncTaskData() = default;
};
}
Loading

0 comments on commit 88a7c48

Please sign in to comment.