From 40eafe980d0ff4aa67de8b6a727009097cd30074 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Sat, 4 Mar 2023 00:32:50 +0800
Subject: [PATCH 01/10] Make CLockFreeGuard safer

---
 src/sync.h | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/src/sync.h b/src/sync.h
index 301033386b3..86ec3b8aeed 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -337,17 +337,16 @@ class CLockFreeGuard
 public:
     CLockFreeGuard(std::atomic_bool& lock) : lock(lock)
     {
-        bool desired = false;
-        while (!lock.compare_exchange_weak(desired, true,
-                                           std::memory_order_release,
-                                           std::memory_order_relaxed)) {
-            desired = false;
-            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        bool expected = false;
+        while (!lock.compare_exchange_weak(expected, true)) {
+            expected = false;
+            // std::this_thread::sleep_for(std::chrono::milliseconds(1));
         }
     }
+
     ~CLockFreeGuard()
     {
-        lock.store(false, std::memory_order_release);
+        lock.store(false);
     }
 };

From 86f3db32dc53cc19670e92a3fcdac202fd2e00e5 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Sat, 4 Mar 2023 00:40:34 +0800
Subject: [PATCH 02/10] Fix getburninfo consortium and result allocations

---
 src/masternodes/rpc_accounts.cpp | 34 ++++++++++++++++----------------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/src/masternodes/rpc_accounts.cpp b/src/masternodes/rpc_accounts.cpp
index 5a483291c88..578c59621ff 100644
--- a/src/masternodes/rpc_accounts.cpp
+++ b/src/masternodes/rpc_accounts.cpp
@@ -2019,30 +2019,29 @@ UniValue getburninfo(const JSONRPCRequest& request) {
         explicit WorkerResultPool(size_t size) {
             pool.reserve(size);
             for (size_t i = 0; i < size; i++) {
-                pool.push_back(CGetBurnInfoResult{});
+                pool.push_back(std::make_shared<CGetBurnInfoResult>());
             }
         }

         std::shared_ptr<CGetBurnInfoResult> Acquire() {
-            LOCK(syncFlag);
-            auto res = std::make_shared<CGetBurnInfoResult>(pool.back());
+            CLockFreeGuard lock{syncFlag};
+            auto res = pool.back();
             pool.pop_back();
-
             return res;
         }

         void Release(std::shared_ptr<CGetBurnInfoResult> res) {
-            LOCK(syncFlag);
-            pool.push_back(*res);
+            CLockFreeGuard lock{syncFlag};
+            pool.push_back(res);
         }

-        std::vector<CGetBurnInfoResult> &GetContainer() {
+        std::vector<std::shared_ptr<CGetBurnInfoResult>> &GetContainer() {
             return pool;
         }

     private:
-        CCriticalSection syncFlag;
-        std::vector<CGetBurnInfoResult> pool;
+        std::atomic_bool syncFlag{};
+        std::vector<std::shared_ptr<CGetBurnInfoResult>> pool;
     };
@@ -2152,15 +2151,17 @@ UniValue getburninfo(const JSONRPCRequest& request) {
     g.WaitForCompletion();

     for (const auto &r : resultsPool.GetContainer()) {
-        totalResult->burntDFI += r.burntDFI;
-        totalResult->burntFee += r.burntFee;
-        totalResult->auctionFee += r.auctionFee;
-        totalResult->burntTokens.AddBalances(r.burntTokens.balances);
-        totalResult->nonConsortiumTokens.AddBalances(r.nonConsortiumTokens.balances);
-        totalResult->dexfeeburn.AddBalances(r.dexfeeburn.balances);
-        totalResult->paybackFee.AddBalances(r.paybackFee.balances);
+        totalResult->burntDFI += r->burntDFI;
+        totalResult->burntFee += r->burntFee;
+        totalResult->auctionFee += r->auctionFee;
+        totalResult->burntTokens.AddBalances(r->burntTokens.balances);
+        totalResult->nonConsortiumTokens.AddBalances(r->nonConsortiumTokens.balances);
+        totalResult->dexfeeburn.AddBalances(r->dexfeeburn.balances);
+        totalResult->paybackFee.AddBalances(r->paybackFee.balances);
     }

+    GetMemoizedResultCache().Set(request, {height, hash, *totalResult});
+
     CDataStructureV0 liveKey = {AttributeTypes::Live, ParamIDs::Economy, EconomyKeys::ConsortiumMinted};
     auto balances = attributes->GetValue(liveKey, CConsortiumGlobalMinted{});
@@ -2194,7 +2195,6 @@ UniValue getburninfo(const JSONRPCRequest& request) {
     result.pushKV("dfip2203", AmountsToJSON(dfi2203Tokens.balances));
     result.pushKV("dfip2206f", AmountsToJSON(dfiToDUSDTokens.balances));

-    GetMemoizedResultCache().Set(request, {height, hash, *totalResult});
     return GetRPCResultCache()
         .Set(request, result);
 }

From b9dcc1ef218fb7988b31d868a8b35f737c1daadc Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:19:38 +0800
Subject: [PATCH 03/10] Fix CLockFreeGuard, introduce AtomicMutex

---
 src/sync.h | 68 ++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 62 insertions(+), 6 deletions(-)

diff --git a/src/sync.h b/src/sync.h
index 301033386b3..1f79aa76951 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -331,18 +331,74 @@ struct SCOPED_LOCKABLE LockAssertion
     ~LockAssertion() UNLOCK_FUNCTION() {}
 };

+class AtomicMutex {
+private:
+    std::atomic<bool> flag{false};
+    int64_t spins;
+    int64_t yields;
+
+public:
+    AtomicMutex(int64_t spins = 10, int64_t yields = 4):
+        spins(spins), yields(yields) {}
+
+    void lock() {
+        // Note: The loop here addresses both spurious failures and the need
+        // to spin or sleep until the lock is acquired.
+        // Additional:
+        // - We use this as a lock for an external critical section, so we use
+        //   seq_cst ordering to ensure it provides the right ordering guarantees
+        //   for the code it protects.
+        // On failure of the CAS we don't care about the existing value, we just
+        // discard it, so relaxed ordering is sufficient.
+        bool expected = false;
+        auto i = 0;
+        while (std::atomic_compare_exchange_weak_explicit(
+            &flag,
+            &expected, true,
+            std::memory_order_seq_cst,
+            std::memory_order_relaxed) == false) {
+            // Could have been a spurious failure or another thread could have taken the
+            // lock in-between since we're now out of the atomic ops.
+            // Reset expected to start from scratch again, since we only want
+            // a singular atomic false -> true transition.
+            expected = false;
+            if (i > spins) {
+                if (i > spins + yields) {
+                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+                } else {
+                    std::this_thread::yield();
+                }
+            }
+            i++;
+        }
+    }
+
+    void unlock() {
+        flag.store(false, std::memory_order_seq_cst);
+    }
+
+    bool try_lock() noexcept {
+        return !flag.exchange(true, std::memory_order_seq_cst);
+    }
+};
+
 class CLockFreeGuard
 {
     std::atomic_bool& lock;

 public:
     CLockFreeGuard(std::atomic_bool& lock) : lock(lock)
     {
-        bool desired = false;
-        while (!lock.compare_exchange_weak(desired, true,
-                                           std::memory_order_release,
-                                           std::memory_order_relaxed)) {
-            desired = false;
-            std::this_thread::sleep_for(std::chrono::milliseconds(1));
+        bool expected = false;
+        while (std::atomic_compare_exchange_weak_explicit(
+            &lock,
+            &expected, true,
+            std::memory_order_seq_cst,
+            std::memory_order_relaxed) == false) {
+            // Could have been a spurious failure or another thread could have taken the
+            // lock in-between since we're now out of the atomic ops.
+            // Reset expected to start from scratch again, since we only want
+            // a singular atomic false -> true transition.
+            std::this_thread::sleep_for(std::chrono::milliseconds(1);
         }
     }

From 5de5a53b57bd7209a507a002435442f19c115c4e Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:27:54 +0800
Subject: [PATCH 04/10] Fix missing expected

---
 src/sync.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/src/sync.h b/src/sync.h
index 1f79aa76951..d9fe0fab06a 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -398,6 +398,7 @@ class CLockFreeGuard
             // lock in-between since we're now out of the atomic ops.
             // Reset expected to start from scratch again, since we only want
             // a singular atomic false -> true transition.
+            expected = false;
             std::this_thread::sleep_for(std::chrono::milliseconds(1);
         }
     }

From 2ebd23daf63d977e374a4b5f06ac858505958718 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:29:29 +0800
Subject: [PATCH 05/10] Use larger thread quantums

---
 src/sync.h | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/sync.h b/src/sync.h
index d9fe0fab06a..cee73ee6aae 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -364,7 +364,9 @@ class AtomicMutex {
             expected = false;
             if (i > spins) {
                 if (i > spins + yields) {
-                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+                    // Use a larger sleep, in line with the largest quantum, which is
+                    // Windows with 16ms
+                    std::this_thread::sleep_for(std::chrono::milliseconds(16));
                 } else {
                     std::this_thread::yield();
                 }

From d84f789a16cc53bcb537779a5301e36a2fc3031d Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:30:26 +0800
Subject: [PATCH 06/10] Increase default yields since sleep quantums are larger

---
 src/sync.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sync.h b/src/sync.h
index cee73ee6aae..c4596adb897 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -338,7 +338,7 @@ class AtomicMutex {
     int64_t yields;

 public:
-    AtomicMutex(int64_t spins = 10, int64_t yields = 4):
+    AtomicMutex(int64_t spins = 10, int64_t yields = 16):
         spins(spins), yields(yields) {}

     void lock() {

From bf61ef3f7984f1c049e49ceb5fe0cfe527dd8c28 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:33:44 +0800
Subject: [PATCH 07/10] Fix syntax

---
 src/sync.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/sync.h b/src/sync.h
index c4596adb897..a9535d1e5aa 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -401,7 +401,7 @@ class CLockFreeGuard
             // Reset expected to start from scratch again, since we only want
             // a singular atomic false -> true transition.
             expected = false;
-            std::this_thread::sleep_for(std::chrono::milliseconds(1);
+            std::this_thread::sleep_for(std::chrono::milliseconds(1));
         }
     }
     ~CLockFreeGuard()

From 6c00d71259e891bb6ddade9f50aaa995aa624a93 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:39:32 +0800
Subject: [PATCH 08/10] Add more comments

---
 src/sync.h | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/sync.h b/src/sync.h
index a9535d1e5aa..2608d1d16a3 100644
--- a/src/sync.h
+++ b/src/sync.h
@@ -380,6 +380,11 @@ class AtomicMutex {
     }

     bool try_lock() noexcept {
+        // We locked it if and only if it was a false -> true transition.
+        // Otherwise, we just re-wrote an already existing value as true, which is harmless.
+        // We could theoretically use CAS here to prevent the additional write, but that
+        // requires a loop on weak, or using strong. Simpler to just use an exchange
+        // for now, since all ops are seq_cst anyway.
         return !flag.exchange(true, std::memory_order_seq_cst);
     }
 };

From 2e09252c33d6ed21b8ee4c2eb4f6023e275001d0 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 08:46:21 +0800
Subject: [PATCH 09/10] Introduce BufferPool

---
 src/masternodes/threadpool.cpp |  9 +++++----
 src/masternodes/threadpool.h   | 37 +++++++++++++++++++++++++++++++++-
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/src/masternodes/threadpool.cpp b/src/masternodes/threadpool.cpp
index 1390eb378d1..4fe0fbc519f 100644
--- a/src/masternodes/threadpool.cpp
+++ b/src/masternodes/threadpool.cpp
@@ -31,16 +31,17 @@ void ShutdownDfTxGlobalTaskPool() {

 void TaskGroup::AddTask() {
-    tasks.fetch_add(1, std::memory_order_relaxed);
+    tasks.fetch_add(1, std::memory_order_release);
 }

 void TaskGroup::RemoveTask() {
-    if (tasks.fetch_sub(1, std::memory_order_seq_cst) == 1) {
-        cv.notify_one();
+    if (tasks.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+        cv.notify_all();
     }
 }

-void TaskGroup::WaitForCompletion() {
+void TaskGroup::WaitForCompletion(bool checkForPrematureCompletion) {
+    if (checkForPrematureCompletion && tasks.load() == 0) return;
     std::unique_lock l(cv_m);
     cv.wait(l, [&] { return tasks.load() == 0; });
 }

diff --git a/src/masternodes/threadpool.h b/src/masternodes/threadpool.h
index 46374ef15e4..d5ccf7d6bfc 100644
--- a/src/masternodes/threadpool.h
+++ b/src/masternodes/threadpool.h
@@ -2,7 +2,9 @@
 #define DEFI_MASTERNODES_THREADPOOL_H

 #include
+#include
 #include
+#include

 static const int DEFAULT_DFTX_WORKERS=0;
@@ -33,11 +35,44 @@ class TaskGroup {
 public:
     void AddTask();
     void RemoveTask();
-    void WaitForCompletion();
+    void WaitForCompletion(bool checkForPrematureCompletion = true);
+    void MarkCancellation() { is_cancelled.store(true); }
+    bool IsCancelled() { return is_cancelled.load(); }
+
 private:
     std::atomic tasks{0};
     std::mutex cv_m;
     std::condition_variable cv;
+    std::atomic_bool is_cancelled{false};
+};
+
+template <typename T>
+class BufferPool {
+  public:
+    explicit BufferPool(size_t size) {
+        pool.reserve(size);
+        for (size_t i = 0; i < size; i++) {
+            pool.push_back(std::make_shared<T>());
+        }
+    }
+
+    std::shared_ptr<T> Acquire() {
+        std::unique_lock l{m};
+        auto res = pool.back();
+        pool.pop_back();
+        return res;
+    }
+
+    void Release(std::shared_ptr<T> res) {
+        std::unique_lock l{m};
+        pool.push_back(res);
+    }
+
+    std::vector<std::shared_ptr<T>> &GetBuffer() { return pool; }
+
+  private:
+    AtomicMutex m{};
+    std::vector<std::shared_ptr<T>> pool;
+};

 extern std::unique_ptr DfTxTaskPool;

From 9f4f9839a7697200a0762310adc07f57aaf079d3 Mon Sep 17 00:00:00 2001
From: Prasanna Loganathar
Date: Mon, 6 Mar 2023 09:04:25 +0800
Subject: [PATCH 10/10] Switch getburninfo to use BufferPool

---
 src/masternodes/rpc_accounts.cpp | 44 +++++---------------------------
 1 file changed, 7 insertions(+), 37 deletions(-)

diff --git a/src/masternodes/rpc_accounts.cpp b/src/masternodes/rpc_accounts.cpp
index c53d4d02729..538a14e2e45 100644
--- a/src/masternodes/rpc_accounts.cpp
+++ b/src/masternodes/rpc_accounts.cpp
@@ -2009,53 +2009,23 @@ UniValue getburninfo(const JSONRPCRequest& request) {
         }
     }

-    class WorkerResultPool {
-    public:
-        explicit WorkerResultPool(size_t size) {
-            pool.reserve(size);
-            for (size_t i = 0; i < size; i++) {
-                pool.push_back(std::make_shared<CGetBurnInfoResult>());
-            }
-        }
-
-        std::shared_ptr<CGetBurnInfoResult> Acquire() {
-            CLockFreeGuard lock{syncFlag};
-            auto res = pool.back();
-            pool.pop_back();
-            return res;
-        }
-
-        void Release(std::shared_ptr<CGetBurnInfoResult> res) {
-            CLockFreeGuard lock{syncFlag};
-            pool.push_back(res);
-        }
-
-        std::vector<std::shared_ptr<CGetBurnInfoResult>> &GetContainer() {
-            return pool;
-        }
-
-    private:
-        std::atomic_bool syncFlag{};
-        std::vector<std::shared_ptr<CGetBurnInfoResult>> pool;
-    };
-
     auto nWorkers = DfTxTaskPool->GetAvailableThreads();
     if (static_cast(height) < nWorkers) {
         nWorkers = height;
     }

-    const auto chunks = height / nWorkers;
+    const auto chunkSize = height / nWorkers;

     TaskGroup g;
-    WorkerResultPool resultsPool{nWorkers};
+    BufferPool<CGetBurnInfoResult> resultsPool{nWorkers};

     auto &pool = DfTxTaskPool->pool;
     auto processedHeight = initialResult.height;
     auto i = 0;

     while (processedHeight < height) {
-        auto startHeight = initialResult.height + (chunks * (i + 1));
-        auto stopHeight = initialResult.height + (chunks * (i));
+        auto startHeight = initialResult.height + (chunkSize * (i + 1));
+        auto stopHeight = initialResult.height + (chunkSize * (i));

         g.AddTask();
         boost::asio::post(pool, [startHeight, stopHeight, &g, &resultsPool] {
@@ -2138,14 +2108,14 @@ UniValue getburninfo(const JSONRPCRequest& request) {
             g.RemoveTask();
         });

-        // perfect accuracy: processedHeight += (startHeight > height) ? chunksRemainder : chunks;
-        processedHeight += chunks;
+        // perfect accuracy: processedHeight += (startHeight > height) ? chunksRemainder : chunkSize;
+        processedHeight += chunkSize;
         i++;
     }

     g.WaitForCompletion();

-    for (const auto &r : resultsPool.GetContainer()) {
+    for (const auto &r : resultsPool.GetBuffer()) {
         totalResult->burntDFI += r->burntDFI;
         totalResult->burntFee += r->burntFee;
         totalResult->auctionFee += r->auctionFee;
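
For reference, the pattern the series converges on is: an AtomicMutex spin lock, a TaskGroup that counts outstanding work, and a BufferPool of pre-allocated per-worker result buffers that are merged once the group completes. Below is a minimal standalone sketch of that pattern, not the code in the tree: it uses std::thread in place of the boost::asio pool from threadpool.h, the simplified AtomicMutex omits the spin/yield/sleep back-off tiers, and ChunkResult is a hypothetical stand-in for CGetBurnInfoResult.

// Illustrative sketch only; see the note above for what is assumed.
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

class AtomicMutex {
    std::atomic<bool> flag{false};
public:
    void lock() {
        bool expected = false;
        // The loop handles spurious failures and contention; only a
        // false -> true transition counts as taking the lock.
        while (!flag.compare_exchange_weak(expected, true)) {
            expected = false;
            std::this_thread::yield();
        }
    }
    void unlock() { flag.store(false); }
};

class TaskGroup {
    std::atomic<uint64_t> tasks{0};
    std::mutex m;
    std::condition_variable cv;
public:
    void AddTask() { tasks.fetch_add(1, std::memory_order_release); }
    void RemoveTask() {
        if (tasks.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            std::lock_guard<std::mutex> lk(m);  // so a waiter cannot miss the wake-up
            cv.notify_all();
        }
    }
    void WaitForCompletion() {
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [&] { return tasks.load() == 0; });
    }
};

template <typename T>
class BufferPool {
    AtomicMutex m;
    std::vector<std::shared_ptr<T>> pool;
public:
    explicit BufferPool(size_t size) {
        pool.reserve(size);
        for (size_t i = 0; i < size; i++) pool.push_back(std::make_shared<T>());
    }
    std::shared_ptr<T> Acquire() {
        std::unique_lock<AtomicMutex> l{m};
        auto res = pool.back();
        pool.pop_back();
        return res;
    }
    void Release(std::shared_ptr<T> res) {
        std::unique_lock<AtomicMutex> l{m};
        pool.push_back(std::move(res));
    }
    std::vector<std::shared_ptr<T>>& GetBuffer() { return pool; }
};

struct ChunkResult { uint64_t sum{0}; };  // hypothetical stand-in for CGetBurnInfoResult

int main() {
    const uint64_t height = 1000000;
    const size_t nWorkers = 4;
    const uint64_t chunkSize = height / nWorkers;

    TaskGroup g;
    BufferPool<ChunkResult> results{nWorkers};
    std::vector<std::thread> workers;

    for (size_t i = 0; i < nWorkers; i++) {
        const uint64_t start = i * chunkSize;
        const uint64_t stop = (i + 1 == nWorkers) ? height : start + chunkSize;
        g.AddTask();
        // Each worker writes only to the buffer it acquired, so the scan itself needs no locking.
        workers.emplace_back([start, stop, &g, &results] {
            auto res = results.Acquire();
            for (uint64_t h = start; h < stop; h++) res->sum += h;
            results.Release(res);
            g.RemoveTask();
        });
    }

    g.WaitForCompletion();
    for (auto& t : workers) t.join();

    uint64_t total = 0;
    for (const auto& r : results.GetBuffer()) total += r->sum;  // merge, as getburninfo does into totalResult
    std::cout << total << std::endl;  // equals height * (height - 1) / 2
}

The point of the pool is that the only shared state touched on the hot path is the buffer hand-out itself; the AtomicMutex guards just the brief Acquire/Release, and the final merge runs single-threaded after WaitForCompletion.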