Job system, parallel component updating, Remotery replaced by Tracy
Mormert committed Sep 9, 2023
1 parent 49700d6 commit 8609d9f
Showing 15 changed files with 485 additions and 53 deletions.
21 changes: 21 additions & 0 deletions engine/3rdparty/WickedEngine/LICENCE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2018 Turánszki János

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
299 changes: 299 additions & 0 deletions engine/3rdparty/WickedEngine/wiJobSystem.cpp
@@ -0,0 +1,299 @@
#include "wiJobSystem.h"

#include <memory>
#include <algorithm>
#include <deque>
#include <vector>
#include <string>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <cassert>

#ifdef _WIN32
#include <Windows.h>
#endif
#undef max
#undef min

#ifdef __linux__
#include <pthread.h>
#include <cerrno> // errno, used by the affinity/naming error handler below
#include <cstdio> // perror
#endif

namespace wi::jobsystem
{
struct Job
{
std::function<void(JobArgs)> task;
context* ctx;
uint32_t groupID;
uint32_t groupJobOffset;
uint32_t groupJobEnd;
uint32_t sharedmemory_size;
};
struct JobQueue
{
std::deque<Job> queue;
std::mutex locker;

inline void push_back(const Job& item)
{
std::scoped_lock lock(locker);
queue.push_back(item);
}

inline bool pop_front(Job& item)
{
std::scoped_lock lock(locker);
if (queue.empty())
{
return false;
}
item = std::move(queue.front());
queue.pop_front();
return true;
}

};

// This structure is responsible for stopping the worker thread loops.
// Once it is destroyed, the worker threads are woken up and end their loops.
struct InternalState
{
uint32_t numCores = 0;
uint32_t numThreads = 0;
std::unique_ptr<JobQueue[]> jobQueuePerThread;
std::atomic_bool alive{ true };
std::condition_variable wakeCondition;
std::mutex wakeMutex;
std::atomic<uint32_t> nextQueue{ 0 };
std::vector<std::thread> threads;
void ShutDown()
{
alive.store(false); // indicate that new jobs cannot be started from this point
bool wake_loop = true;
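// Busy-notify from a helper thread until every worker has joined; a worker that observed
// alive == true just before calling wait() could otherwise miss a single notification and sleep forever.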
std::thread waker([&] {
while (wake_loop)
{
wakeCondition.notify_all(); // wakes up sleeping worker threads
}
});
for (auto& thread : threads)
{
thread.join();
}
wake_loop = false;
waker.join();
jobQueuePerThread.reset();
threads.clear();
numCores = 0;
numThreads = 0;
}
~InternalState()
{
ShutDown();
}
} static internal_state;

// Start working on a job queue.
// After the job queue is finished, the worker can move on to another queue and steal jobs from there.
inline void work(uint32_t startingQueue)
{
Job job;
for (uint32_t i = 0; i < internal_state.numThreads; ++i)
{
JobQueue& job_queue = internal_state.jobQueuePerThread[startingQueue % internal_state.numThreads];
while (job_queue.pop_front(job))
{
JobArgs args;
args.groupID = job.groupID;
if (job.sharedmemory_size > 0)
{
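// Per-thread scratch buffer, reused across jobs. Note that reserve() only guarantees
// capacity; the group receives it as raw, uninitialized memory via args.sharedmemory.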
thread_local static std::vector<uint8_t> shared_allocation_data;
shared_allocation_data.reserve(job.sharedmemory_size);
args.sharedmemory = shared_allocation_data.data();
}
else
{
args.sharedmemory = nullptr;
}

for (uint32_t j = job.groupJobOffset; j < job.groupJobEnd; ++j)
{
args.jobIndex = j;
args.groupIndex = j - job.groupJobOffset;
args.isFirstJobInGroup = (j == job.groupJobOffset);
args.isLastJobInGroup = (j == job.groupJobEnd - 1);
job.task(args);
}

job.ctx->counter.fetch_sub(1);
}
startingQueue++; // go to next queue
}
}

void Initialize(uint32_t maxThreadCount)
{
if (internal_state.numThreads > 0)
return;
maxThreadCount = std::max(1u, maxThreadCount);

// Retrieve the number of hardware threads in this system:
internal_state.numCores = std::thread::hardware_concurrency();

// Calculate the actual number of worker threads we want (minus one for the main thread):
internal_state.numThreads = std::min(maxThreadCount, std::max(1u, internal_state.numCores - 1));
internal_state.jobQueuePerThread.reset(new JobQueue[internal_state.numThreads]);
internal_state.threads.reserve(internal_state.numThreads);

for (uint32_t threadID = 0; threadID < internal_state.numThreads; ++threadID)
{
internal_state.threads.emplace_back([threadID] {

while (internal_state.alive.load())
{
work(threadID);

// finished with jobs, put to sleep
std::unique_lock<std::mutex> lock(internal_state.wakeMutex);
internal_state.wakeCondition.wait(lock);
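// (no wait predicate: a spurious wakeup simply re-runs work(), which is harmless)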
}

});
std::thread& worker = internal_state.threads.back();

#ifdef _WIN32
// Do Windows-specific thread setup:
HANDLE handle = (HANDLE)worker.native_handle();

// Put each thread on to dedicated core:
DWORD_PTR affinityMask = 1ull << threadID;
DWORD_PTR affinity_result = SetThreadAffinityMask(handle, affinityMask);
assert(affinity_result > 0);

//// Increase thread priority:
//BOOL priority_result = SetThreadPriority(handle, THREAD_PRIORITY_HIGHEST);
//assert(priority_result != 0);

// Name the thread:
std::wstring wthreadname = L"jle_job_" + std::to_wstring(threadID);
HRESULT hr = SetThreadDescription(handle, wthreadname.c_str());
assert(SUCCEEDED(hr));
#elif defined(__linux__)
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); } while (0)

int ret;
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
size_t cpusetsize = sizeof(cpuset);

CPU_SET(threadID, &cpuset);
ret = pthread_setaffinity_np(worker.native_handle(), cpusetsize, &cpuset);
if (ret != 0)
handle_error_en(ret, std::string(" pthread_setaffinity_np[" + std::to_string(threadID) + ']').c_str());

// Name the thread
std::string thread_name = "jle_job_" + std::to_string(threadID);
ret = pthread_setname_np(worker.native_handle(), thread_name.c_str());
if (ret != 0)
handle_error_en(ret, std::string(" pthread_setname_np[" + std::to_string(threadID) + ']').c_str());
#undef handle_error_en
#elif defined(PLATFORM_PS5)
wi::jobsystem::ps5::SetupWorker(worker, threadID);
#endif // _WIN32
}

// wi::backlog::post("wi::jobsystem Initialized with [" + std::to_string(internal_state.numCores) + " cores] [" + std::to_string(internal_state.numThreads) + " threads] (" + std::to_string((int)std::round(timer.elapsed())) + " ms)");
}

void ShutDown()
{
internal_state.ShutDown();
}

uint32_t GetThreadCount()
{
return internal_state.numThreads;
}

void Execute(context& ctx, const std::function<void(JobArgs)>& task)
{
// Context state is updated:
ctx.counter.fetch_add(1);

Job job;
job.ctx = &ctx;
job.task = task;
job.groupID = 0;
job.groupJobOffset = 0;
job.groupJobEnd = 1;
job.sharedmemory_size = 0;

internal_state.jobQueuePerThread[internal_state.nextQueue.fetch_add(1) % internal_state.numThreads].push_back(job);
internal_state.wakeCondition.notify_one();
}

void Dispatch(context& ctx, uint32_t jobCount, uint32_t groupSize, const std::function<void(JobArgs)>& task, size_t sharedmemory_size)
{
if (jobCount == 0 || groupSize == 0)
{
return;
}

const uint32_t groupCount = DispatchGroupCount(jobCount, groupSize);

// Context state is updated:
ctx.counter.fetch_add(groupCount);

Job job;
job.ctx = &ctx;
job.task = task;
job.sharedmemory_size = (uint32_t)sharedmemory_size;

for (uint32_t groupID = 0; groupID < groupCount; ++groupID)
{
// For each group, generate one real job:
job.groupID = groupID;
job.groupJobOffset = groupID * groupSize;
job.groupJobEnd = std::min(job.groupJobOffset + groupSize, jobCount);

internal_state.jobQueuePerThread[internal_state.nextQueue.fetch_add(1) % internal_state.numThreads].push_back(job);
}

internal_state.wakeCondition.notify_all();
}

uint32_t DispatchGroupCount(uint32_t jobCount, uint32_t groupSize)
{
// Calculate the number of job groups to dispatch (rounded up, i.e. "ceil"):
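// e.g. jobCount = 1000, groupSize = 64  ->  (1000 + 64 - 1) / 64 = 16 groups; the last group runs only 40 jobs.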
return (jobCount + groupSize - 1) / groupSize;
}

bool IsBusy(const context& ctx)
{
// Whenever the context counter is greater than zero, there is still work that needs to be done.
return ctx.counter.load() > 0;
}

void Wait(const context& ctx)
{
if (IsBusy(ctx))
{
// Wake any threads that might be sleeping:
internal_state.wakeCondition.notify_all();

// work() will pick up any jobs that are standing by and execute them on this thread:
work(internal_state.nextQueue.fetch_add(1) % internal_state.numThreads);

while (IsBusy(ctx))
{
// If we are here, then there are still remaining jobs that work() couldn't pick up.
// In this case those jobs are not standing by on a queue but currently executing
// on other threads, so they cannot be picked up by this thread.
// Allow the OS to swap out this thread so it doesn't spin endlessly for nothing:
std::this_thread::yield();
}
}
}
}
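
For orientation, a minimal usage sketch of the job system added above (not part of this commit): only Initialize, Dispatch, Wait, context and JobArgs come from wiJobSystem.h; the data array and the lambda body are made up for illustration.

#include "wiJobSystem.h"
#include <vector>

int main()
{
    wi::jobsystem::Initialize();

    std::vector<float> data(1000, 1.0f);

    wi::jobsystem::context ctx;
    // 1000 jobs in groups of 64: groups may run in parallel across the worker
    // threads, while jobs inside one group run serially on a single thread.
    wi::jobsystem::Dispatch(ctx, (uint32_t)data.size(), 64, [&](wi::jobsystem::JobArgs args) {
        data[args.jobIndex] *= 2.0f; // jobIndex is global across the whole dispatch
    });

    wi::jobsystem::Wait(ctx); // the calling thread also helps drain queued groups
    return 0;
}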
49 changes: 49 additions & 0 deletions engine/3rdparty/WickedEngine/wiJobSystem.h
@@ -0,0 +1,49 @@
#pragma once

#include <atomic>
#include <functional>

namespace wi::jobsystem
{
void Initialize(uint32_t maxThreadCount = ~0u);
void ShutDown();

struct JobArgs {
uint32_t jobIndex; // job index relative to dispatch (like SV_DispatchThreadID in HLSL)
uint32_t groupID; // group index relative to dispatch (like SV_GroupID in HLSL)
uint32_t groupIndex; // job index relative to group (like SV_GroupIndex in HLSL)
bool isFirstJobInGroup; // is the current job the first one in the group?
bool isLastJobInGroup; // is the current job the last one in the group?
void *sharedmemory; // stack memory shared within the current group (jobs within a group execute serially)
};

uint32_t GetThreadCount();

// Defines a state of execution that can be waited on
struct context {
std::atomic<uint32_t> counter{0};
};

// Add a task to execute asynchronously. Any idle thread will execute this.
void Execute(context &ctx, const std::function<void(JobArgs)> &task);

// Divide a task onto multiple jobs and execute them in parallel.
// jobCount : how many jobs to generate for this task.
// groupSize : how many jobs to execute per thread. Jobs inside a group execute serially.
//             It might be worth increasing this for very small jobs.
// task : receives a JobArgs as its parameter.
void Dispatch(context &ctx,
uint32_t jobCount,
uint32_t groupSize,
const std::function<void(JobArgs)> &task,
size_t sharedmemory_size = 0);

// Returns the number of job groups that will be created for a given number of jobs and group size
uint32_t DispatchGroupCount(uint32_t jobCount, uint32_t groupSize);

// Check whether the context still has unfinished work
bool IsBusy(const context &ctx);

// Wait until all jobs in the context have finished.
// The current thread becomes a worker thread and executes jobs while it waits.
void Wait(const context &ctx);
} // namespace wi::jobsystem
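
As a further hedged sketch of the group semantics documented above (the task bodies are hypothetical; only the declarations in this header are confirmed):

wi::jobsystem::context ctx;

// A single asynchronous task; any idle worker picks it up.
// It runs as a group of one, so jobIndex == groupIndex == groupID == 0:
wi::jobsystem::Execute(ctx, [](wi::jobsystem::JobArgs args) {
    /* background work */
});

// Give every group 256 bytes of scratch via args.sharedmemory. Since jobs
// within a group execute serially, the first job can safely initialize it:
wi::jobsystem::Dispatch(ctx, 100, 10, [](wi::jobsystem::JobArgs args) {
    if (args.isFirstJobInGroup)
    {
        /* initialize the scratch buffer at args.sharedmemory */
    }
    /* all 10 jobs of the group see the same buffer */
}, 256);

wi::jobsystem::Wait(ctx);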
2 changes: 1 addition & 1 deletion engine/3rdparty/git_submodules_forks/jle_bullet3
