diff --git a/core/os/threaded_array_processor.h b/core/os/threaded_array_processor.h index 9dcd6ceecef3..de545976d519 100644 --- a/core/os/threaded_array_processor.h +++ b/core/os/threaded_array_processor.h @@ -36,31 +36,44 @@ #include "core/os/thread.h" #include "core/os/thread_safe.h" #include "core/safe_refcount.h" +#include "thirdparty/misc/wsq.hpp" template struct ThreadArrayProcessData { - uint32_t elements; - uint32_t index; - C *instance; + C *instance = nullptr; U userdata; - void (C::*method)(uint32_t, U); + void (C::*method)(uint32_t, U) = nullptr; + tf::WorkStealingQueue *queue = nullptr; void process(uint32_t p_index) { (instance->*method)(p_index, userdata); } }; -#ifndef NO_THREADS - template void process_array_thread(void *ud) { T &data = *(T *)ud; - while (true) { - uint32_t index = atomic_increment(&data.index); - if (index >= data.elements) - break; - data.process(index); + while (!data.queue->empty()) { + std::optional work = data.queue->steal(); + if (work.has_value()) { + data.process(work.value()); + } + } +} + +template +void process_array_single(C *p_instance, M p_method, U p_userdata) { + + ThreadArrayProcessData data; + data.method = p_method; + data.instance = p_instance; + data.userdata = p_userdata; + while (!data.queue->empty()) { + std::optional work = data.queue->steal(); + if (work.has_value()) { + data.process(work.value()); + } } } @@ -71,12 +84,14 @@ void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_us data.method = p_method; data.instance = p_instance; data.userdata = p_userdata; - data.index = 0; - data.elements = p_elements; - data.process(data.index); //process first, let threads increment for next + data.queue = memnew(tf::WorkStealingQueue(next_power_of_2(p_elements))); - Vector threads; + for (uint32_t i = 0; i < p_elements; i++) { + data.queue->push(i); + } +#ifndef NO_THREADS + Vector threads; threads.resize(OS::get_singleton()->get_processor_count()); for (int i = 0; i < threads.size(); i++) { @@ -87,24 +102,9 @@ void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_us Thread::wait_to_finish(threads[i]); memdelete(threads[i]); } -} - #else - -template -void thread_process_array(uint32_t p_elements, C *p_instance, M p_method, U p_userdata) { - - ThreadArrayProcessData data; - data.method = p_method; - data.instance = p_instance; - data.userdata = p_userdata; - data.index = 0; - data.elements = p_elements; - for (uint32_t i = 0; i < p_elements; i++) { - data.process(i); - } -} - + process_array_single(p_elements, p_instance, p_method, p_userdata); #endif +} #endif // THREADED_ARRAY_PROCESSOR_H diff --git a/thirdparty/misc/wsq.hpp b/thirdparty/misc/wsq.hpp new file mode 100644 index 000000000000..27830b968fb4 --- /dev/null +++ b/thirdparty/misc/wsq.hpp @@ -0,0 +1,284 @@ +// MIT License + +// Copyright (c) 2018-2019 T.-W. Huang, C.-X. Lin, G. Guo, and M. Wong + +// University of Utah, Salt Lake City, UT, USA +// University of Illinois at Urbana-Champaign, IL, USA + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +// 2019/05/15 - created by Tsung-Wei Huang +// - isolated from the original workstealing executor + +#pragma once + +#include +#include +#include + +namespace tf { + +/** +@class: WorkStealingQueue +@tparam T data type +@brief Lock-free unbounded single-producer multiple-consumer queue. +This class implements the work stealing queue described in the paper, +"Correct and Efficient Work-Stealing for Weak Memory Models," +available at https://www.di.ens.fr/~zappa/readings/ppopp13.pdf. +Only the queue owner can perform pop and push operations, +while others can steal data from the queue. +*/ +template +class WorkStealingQueue { + + //constexpr static int64_t cacheline_size = 64; + + //using storage_type = std::aligned_storage_t; + + struct Array { + + int64_t C; + int64_t M; + //storage_type* S; + std::atomic *S; + + //T* S; + + explicit Array(int64_t c) : + C{ c }, + M{ c - 1 }, + //S {new storage_type[C]} { + //S {new T[static_cast(C)]} { + S{ new std::atomic[static_cast(C)] } { + //for(int64_t i=0; i(std::addressof(S[i]))->~T(); + //} + delete[] S; + } + + int64_t capacity() const noexcept { + return C; + } + + template + void push(int64_t i, O &&o) noexcept { + //T* ptr = reinterpret_cast(std::addressof(S[i & M])); + //*ptr = std::forward(o); + //S[i & M] = std::forward(o); + S[i & M].store(std::forward(o), std::memory_order_relaxed); + } + + T pop(int64_t i) noexcept { + //return *reinterpret_cast(std::addressof(S[i & M])); + //return S[i & M]; + return S[i & M].load(std::memory_order_relaxed); + } + + Array *resize(int64_t b, int64_t t) { + Array *ptr = new Array{ 2 * C }; + for (int64_t i = t; i != b; ++i) { + ptr->push(i, pop(i)); + } + return ptr; + } + }; + + std::atomic _top; + std::atomic _bottom; + std::atomic _array; + std::vector _garbage; + //char _padding[cacheline_size]; + +public: + /** + @brief constructs the queue with a given capacity + @param capacity the capacity of the queue (must be power of 2) + */ + explicit WorkStealingQueue(int64_t capacity = 1024); + + /** + @brief destructs the queue + */ + ~WorkStealingQueue(); + + /** + @brief queries if the queue is empty at the time of this call + */ + bool empty() const noexcept; + + /** + @brief queries the number of items at the time of this call + */ + size_t size() const noexcept; + + /** + @brief queries the capacity of the queue + */ + int64_t capacity() const noexcept; + + /** + @brief inserts an item to the queue + Only the owner thread can insert an item to the queue. + The operation can trigger the queue to resize its capacity + if more space is required. + @tparam O data type + @param item the item to perfect-forward to the queue + */ + template + void push(O &&item); + + /** + @brief pops out an item from the queue + Only the owner thread can pop out an item from the queue. + The return can be a @std_nullopt if this operation failed (empty queue). + */ + std::optional pop(); + + /** + @brief steals an item from the queue + Any threads can try to steal an item from the queue. + The return can be a @std_nullopt if this operation failed (not necessary empty). + */ + std::optional steal(); +}; + +// Constructor +template +WorkStealingQueue::WorkStealingQueue(int64_t c) { + //GODOT Patch START + if (!(c && (!(c & (c - 1))))) { + return; + } + //GODOT Patch END + _top.store(0, std::memory_order_relaxed); + _bottom.store(0, std::memory_order_relaxed); + _array.store(new Array{ c }, std::memory_order_relaxed); + _garbage.reserve(32); +} + +// Destructor +template +WorkStealingQueue::~WorkStealingQueue() { + for (auto a : _garbage) { + delete a; + } + delete _array.load(); +} + +// Function: empty +template +bool WorkStealingQueue::empty() const noexcept { + int64_t b = _bottom.load(std::memory_order_relaxed); + int64_t t = _top.load(std::memory_order_relaxed); + return b <= t; +} + +// Function: size +template +size_t WorkStealingQueue::size() const noexcept { + int64_t b = _bottom.load(std::memory_order_relaxed); + int64_t t = _top.load(std::memory_order_relaxed); + return static_cast(b >= t ? b - t : 0); +} + +// Function: push +template +template +void WorkStealingQueue::push(O &&o) { + int64_t b = _bottom.load(std::memory_order_relaxed); + int64_t t = _top.load(std::memory_order_acquire); + Array *a = _array.load(std::memory_order_relaxed); + + // queue is full + if (a->capacity() - 1 < (b - t)) { + Array *tmp = a->resize(b, t); + _garbage.push_back(a); + std::swap(a, tmp); + _array.store(a, std::memory_order_relaxed); + } + + a->push(b, std::forward(o)); + std::atomic_thread_fence(std::memory_order_release); + _bottom.store(b + 1, std::memory_order_relaxed); +} + +// Function: pop +template +std::optional WorkStealingQueue::pop() { + int64_t b = _bottom.load(std::memory_order_relaxed) - 1; + Array *a = _array.load(std::memory_order_relaxed); + _bottom.store(b, std::memory_order_relaxed); + std::atomic_thread_fence(std::memory_order_seq_cst); + int64_t t = _top.load(std::memory_order_relaxed); + + std::optional item; + + if (t <= b) { + item = a->pop(b); + if (t == b) { + // the last item just got stolen + if (!_top.compare_exchange_strong(t, t + 1, + std::memory_order_seq_cst, + std::memory_order_relaxed)) { + item = std::nullopt; + } + _bottom.store(b + 1, std::memory_order_relaxed); + } + } else { + _bottom.store(b + 1, std::memory_order_relaxed); + } + + return item; +} + +// Function: steal +template +std::optional WorkStealingQueue::steal() { + int64_t t = _top.load(std::memory_order_acquire); + std::atomic_thread_fence(std::memory_order_seq_cst); + int64_t b = _bottom.load(std::memory_order_acquire); + + std::optional item; + + if (t < b) { + Array *a = _array.load(std::memory_order_consume); + item = a->pop(t); + if (!_top.compare_exchange_strong(t, t + 1, + std::memory_order_seq_cst, + std::memory_order_relaxed)) { + return std::nullopt; + } + } + + return item; +} + +// Function: capacity +template +int64_t WorkStealingQueue::capacity() const noexcept { + return _array.load(std::memory_order_relaxed)->capacity(); +} + +} // namespace tf \ No newline at end of file