Skip to content

Commit

Permalink
rpc: getburninfo performance improvements (#1788)
Browse files Browse the repository at this point in the history
* Reverts of Require in Sub/Add Balances

* Revert ifs on Add/Sub

* Fix burninfo concurrency and switch to worker number agnostic queuing

* Remove getburninfo2 used for testing

* Optimize vector reservation

* Readd nodiscard

* Readd const

* Realign vector alloc

* Add fixed buffer to reduce allocations

* Switch to lockfree sync

* Use raw atomic ops

* Remove redundant flush

* Introducing last result cache. Switching paralelization to mutex instead of lock free atomics.

* Fix condition

* Final maintainability touches

* cacheOrDefaultResult to initialResult

---------

Co-authored-by: Mihailo Milenkovic <mihailo.milenkovic84@gmail.com>
Co-authored-by: jouzo <jdesclercs@gmail.com>
  • Loading branch information
3 people authored Mar 3, 2023
1 parent a97fc6c commit 0f29d5d
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 111 deletions.
5 changes: 3 additions & 2 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -875,6 +875,7 @@ static bool AppInitServers()
default: return RPCResultCache::RPCCacheMode::None;
}}();
GetRPCResultCache().Init(rpcCacheMode);
GetMemoizedResultCache().Init(rpcCacheMode);

RPCServer::OnStarted(&OnRPCStarted);
RPCServer::OnStopped(&OnRPCStopped);
Expand Down Expand Up @@ -1480,8 +1481,8 @@ bool AppInitMain(InitInterfaces& interfaces)
// -par=-n means "leave n cores free" (number of cores - n - 1 script threads)
script_threads += GetNumCores();
// DeFiChain specific:
// Set this to a max value, since most custom TXs don't utilize this unfortunately
// and is just a waste of resources.
// Set this to a max value, since most custom TXs don't utilize this unfortunately
// and is just a waste of resources.
script_threads = std::min(script_threads, 4);
}

Expand Down
227 changes: 119 additions & 108 deletions src/masternodes/rpc_accounts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,6 @@
#include <masternodes/threadpool.h>
#include <boost/asio.hpp>

struct BalanceResults {
CAmount burntDFI{};
CAmount burntFee{};
CAmount auctionFee{};
CBalances burntTokens;
CBalances nonConsortiumTokens;
CBalances dexfeeburn;
CBalances paybackFee;
};

std::string tokenAmountString(const CTokenAmount &amount, AmountFormat format = AmountFormat::Symbol) {
const auto token = pcustomcsview->GetToken(amount.nTokenId);
const auto amountString = ValueFromAmount(amount.nValue).getValStr();
Expand Down Expand Up @@ -1972,19 +1962,14 @@ UniValue getburninfo(const JSONRPCRequest& request) {
}.Check(request);

if (auto res = GetRPCResultCache().TryGet(request)) return *res;
auto initialResult = GetMemoizedResultCache().GetOrDefault(request);
auto totalResult = std::get_if<CGetBurnInfoResult>(&initialResult.data);

CAmount burntDFI{0};
CAmount burntFee{0};
CAmount auctionFee{0};
CAmount dfiPaybackFee{0};
CAmount burnt{0};

CBalances burntTokens;
CBalances consortiumTokens;
CBalances nonConsortiumTokens;
CBalances dexfeeburn;
CBalances paybackfees;
CBalances paybackFee;
CBalances paybacktokens;
CBalances dfi2203Tokens;
CBalances dfipaybacktokens;
Expand All @@ -1993,6 +1978,7 @@ UniValue getburninfo(const JSONRPCRequest& request) {
LOCK(cs_main);

auto height = ::ChainActive().Height();
auto hash = ::ChainActive().Tip()->GetBlockHash();
auto fortCanningHeight = Params().GetConsensus().FortCanningHeight;
auto burnAddress = Params().GetConsensus().burnAddress;
auto view = *pcustomcsview;
Expand Down Expand Up @@ -2028,109 +2014,133 @@ UniValue getburninfo(const JSONRPCRequest& request) {
}
}

class WorkerResultPool {
public:
explicit WorkerResultPool(size_t size) {
pool.reserve(size);
for (auto i = 0; i < size; i++) {
pool.push_back(CGetBurnInfoResult{});
}
}

std::shared_ptr<CGetBurnInfoResult> Acquire() {
LOCK(syncFlag);
auto res = std::make_shared<CGetBurnInfoResult>(pool.back());
pool.pop_back();

return res;
}

void Release(std::shared_ptr<CGetBurnInfoResult> res) {
LOCK(syncFlag);
pool.push_back(*res);
}

std::vector<CGetBurnInfoResult> &GetContainer() {
return pool;
}

private:
CCriticalSection syncFlag;
std::vector<CGetBurnInfoResult> pool;
};

auto nWorkers = DfTxTaskPool->GetAvailableThreads();
if (static_cast<decltype(nWorkers)>(height) < nWorkers) {
if (static_cast<size_t>(height) < nWorkers) {
nWorkers = height;
}

const auto chunks = height / nWorkers;

TaskGroup g;

std::vector<std::shared_ptr<BalanceResults>> workerResults;
// Note this creates a massive amount of chunks as we go in mem.
// But this is fine for now. Most optimal impl is to return the future val
// and add it on receive. It requires a bit more changes, but for now
// this should do.
// However reserve in one-go to prevent numerous reallocations
workerResults.reserve(chunks + 1);

for (size_t i = 0; i <= chunks; i++) {
auto result = std::make_shared<BalanceResults>();
workerResults.push_back(result);
}
WorkerResultPool resultsPool{nWorkers};

auto &pool = DfTxTaskPool->pool;
auto processedHeight = 0;
auto processedHeight = initialResult.height;
auto i = 0;
while (processedHeight < height)
{
auto startHeight = (chunks * (i + 1));
auto stopHeight = (chunks * (i));
auto result = workerResults[i];
auto startHeight = initialResult.height + (chunks * (i + 1));
auto stopHeight = initialResult.height + (chunks * (i));

g.AddTask();
boost::asio::post(pool, [result, startHeight, stopHeight, &g] {
pburnHistoryDB->ForEachAccountHistory([result, stopHeight](const AccountHistoryKey &key, const AccountHistoryValue &value) {
boost::asio::post(pool, [startHeight, stopHeight, &g, &resultsPool] {
auto currentResult = resultsPool.Acquire();

// Stop on chunk range for worker
if (key.blockHeight <= stopHeight) {
return false;
}
pburnHistoryDB->ForEachAccountHistory(
[currentResult, stopHeight](const AccountHistoryKey &key, const AccountHistoryValue &value) {
// Stop on chunk range for worker
if (key.blockHeight <= stopHeight) {
return false;
}

// UTXO burn
if (value.category == uint8_t(CustomTxType::None)) {
for (auto const & diff : value.diff) {
result->burntDFI += diff.second;
// UTXO burn
if (value.category == uint8_t(CustomTxType::None)) {
for (auto const &diff : value.diff) {
currentResult->burntDFI += diff.second;
}
return true;
}
return true;
}

// Fee burn
if (value.category == uint8_t(CustomTxType::CreateMasternode)
|| value.category == uint8_t(CustomTxType::CreateToken)
|| value.category == uint8_t(CustomTxType::Vault)
|| value.category == uint8_t(CustomTxType::CreateCfp)
|| value.category == uint8_t(CustomTxType::CreateVoc)) {
for (auto const & diff : value.diff) {
result->burntFee += diff.second;
// Fee burn
if (value.category == uint8_t(CustomTxType::CreateMasternode) ||
value.category == uint8_t(CustomTxType::CreateToken) ||
value.category == uint8_t(CustomTxType::Vault) ||
value.category == uint8_t(CustomTxType::CreateCfp) ||
value.category == uint8_t(CustomTxType::CreateVoc)) {
for (auto const &diff : value.diff) {
currentResult->burntFee += diff.second;
}
return true;
}
return true;
}

// withdraw burn
if (value.category == uint8_t(CustomTxType::PaybackLoan)
|| value.category == uint8_t(CustomTxType::PaybackLoanV2)
|| value.category == uint8_t(CustomTxType::PaybackWithCollateral)) {
for (const auto& [id, amount] : value.diff) {
result->paybackFee.Add({id, amount});
// withdraw burn
if (value.category == uint8_t(CustomTxType::PaybackLoan) ||
value.category == uint8_t(CustomTxType::PaybackLoanV2) ||
value.category == uint8_t(CustomTxType::PaybackWithCollateral)) {
for (const auto &[id, amount] : value.diff) {
currentResult->paybackFee.Add({id, amount});
}
return true;
}
return true;
}

// auction burn
if (value.category == uint8_t(CustomTxType::AuctionBid)) {
for (auto const & diff : value.diff) {
result->auctionFee += diff.second;
// auction burn
if (value.category == uint8_t(CustomTxType::AuctionBid)) {
for (auto const &diff : value.diff) {
currentResult->auctionFee += diff.second;
}
return true;
}
return true;
}

// dex fee burn
if (value.category == uint8_t(CustomTxType::PoolSwap)
|| value.category == uint8_t(CustomTxType::PoolSwapV2)) {
for (auto const & diff : value.diff) {
result->dexfeeburn.Add({diff.first, diff.second});
// dex fee burn
if (value.category == uint8_t(CustomTxType::PoolSwap) ||
value.category == uint8_t(CustomTxType::PoolSwapV2)) {
for (auto const &diff : value.diff) {
currentResult->dexfeeburn.Add({diff.first, diff.second});
}
return true;
}
return true;
}

// token burn with burnToken tx
if (value.category == uint8_t(CustomTxType::BurnToken))
{
for (auto const & diff : value.diff) {
result->nonConsortiumTokens.Add({diff.first, diff.second});
// token burn with burnToken tx
if (value.category == uint8_t(CustomTxType::BurnToken)) {
for (auto const &diff : value.diff) {
currentResult->nonConsortiumTokens.Add({diff.first, diff.second});
}
return true;
}
return true;
}

// Token burn
for (auto const & diff : value.diff) {
result->burntTokens.Add({diff.first, diff.second});
}
// Token burn
for (auto const &diff : value.diff) {
currentResult->burntTokens.Add({diff.first, diff.second});
}

return true;
}, {}, startHeight, std::numeric_limits<uint32_t>::max());
return true;
},
{},
startHeight,
std::numeric_limits<uint32_t>::max());

resultsPool.Release(currentResult);
g.RemoveTask();
});

Expand All @@ -2141,38 +2151,38 @@ UniValue getburninfo(const JSONRPCRequest& request) {

g.WaitForCompletion();

for (const auto &result : workerResults) {
burntDFI += result->burntDFI;
burntFee += result->burntFee;
auctionFee += result->auctionFee;
burntTokens.AddBalances(result->burntTokens.balances);
nonConsortiumTokens.AddBalances(result->nonConsortiumTokens.balances);
dexfeeburn.AddBalances(result->dexfeeburn.balances);
paybackFee.AddBalances(result->paybackFee.balances);
for (const auto &r : resultsPool.GetContainer()) {
totalResult->burntDFI += r.burntDFI;
totalResult->burntFee += r.burntFee;
totalResult->auctionFee += r.auctionFee;
totalResult->burntTokens.AddBalances(r.burntTokens.balances);
totalResult->nonConsortiumTokens.AddBalances(r.nonConsortiumTokens.balances);
totalResult->dexfeeburn.AddBalances(r.dexfeeburn.balances);
totalResult->paybackFee.AddBalances(r.paybackFee.balances);
}

CDataStructureV0 liveKey = {AttributeTypes::Live, ParamIDs::Economy, EconomyKeys::ConsortiumMinted};
auto balances = attributes->GetValue(liveKey, CConsortiumGlobalMinted{});

for (const auto &token : nonConsortiumTokens.balances) {
for (const auto &token : totalResult->nonConsortiumTokens.balances) {
TAmounts amount;
amount[token.first] = balances[token.first].burnt;
consortiumTokens.AddBalances(amount);
}

nonConsortiumTokens.SubBalances(consortiumTokens.balances);
burntTokens.AddBalances(nonConsortiumTokens.balances);
totalResult->nonConsortiumTokens.SubBalances(consortiumTokens.balances);
totalResult->burntTokens.AddBalances(totalResult->nonConsortiumTokens.balances);

UniValue result(UniValue::VOBJ);
result.pushKV("address", ScriptToString(burnAddress));
result.pushKV("amount", ValueFromAmount(burntDFI));
result.pushKV("amount", ValueFromAmount(totalResult->burntDFI));

result.pushKV("tokens", AmountsToJSON(burntTokens.balances));
result.pushKV("tokens", AmountsToJSON(totalResult->burntTokens.balances));
result.pushKV("consortiumtokens", AmountsToJSON(consortiumTokens.balances));
result.pushKV("feeburn", ValueFromAmount(burntFee));
result.pushKV("auctionburn", ValueFromAmount(auctionFee));
result.pushKV("paybackburn", AmountsToJSON(paybackFee.balances));
result.pushKV("dexfeetokens", AmountsToJSON(dexfeeburn.balances));
result.pushKV("feeburn", ValueFromAmount(totalResult->burntFee));
result.pushKV("auctionburn", ValueFromAmount(totalResult->auctionFee));
result.pushKV("paybackburn", AmountsToJSON(totalResult->paybackFee.balances));
result.pushKV("dexfeetokens", AmountsToJSON(totalResult->dexfeeburn.balances));

result.pushKV("dfipaybackfee", ValueFromAmount(dfiPaybackFee));
result.pushKV("dfipaybacktokens", AmountsToJSON(dfipaybacktokens.balances));
Expand All @@ -2184,6 +2194,7 @@ UniValue getburninfo(const JSONRPCRequest& request) {
result.pushKV("dfip2203", AmountsToJSON(dfi2203Tokens.balances));
result.pushKV("dfip2206f", AmountsToJSON(dfiToDUSDTokens.balances));

GetMemoizedResultCache().Set(request, {height, hash, *totalResult});
return GetRPCResultCache()
.Set(request, result);
}
Expand Down
43 changes: 42 additions & 1 deletion src/rpc/resultcache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,47 @@ int GetLastValidatedHeight() {

void SetLastValidatedHeight(int height) {
LogPrint(BCLog::RPCCACHE, "RPCCache: set height: %d\n", height);
g_lastValidatedHeight.store(height, std::memory_order_release);
g_lastValidatedHeight.store(height, std::memory_order_release);
GetRPCResultCache().InvalidateCaches();
}

void MemoizedResultCache::Init(RPCResultCache::RPCCacheMode mode) {
CLockFreeGuard lock{syncFlag};
this->mode = mode;
}

CMemoizedResultValue MemoizedResultCache::GetOrDefault(const JSONRPCRequest &request) {
if (mode == RPCResultCache::RPCCacheMode::None) return {};
auto key = GetKey(request);
{
CLockFreeGuard lock{syncFlag};
if (auto res = cacheMap.find(key); res != cacheMap.end()) {
if (!::ChainActive().Contains(LookupBlockIndex(res->second.hash)))
return {};

if (LogAcceptCategory(BCLog::RPCCACHE)) {
LogPrint(BCLog::RPCCACHE, "RPCCache: hit: key: %d/%s\n", res->second.height, key);
}
return res->second;
}
}
return {};
}

void MemoizedResultCache::Set(const JSONRPCRequest &request, const CMemoizedResultValue &value) {
auto key = GetKey(request);
{
CLockFreeGuard lock{syncFlag};
if (LogAcceptCategory(BCLog::RPCCACHE)) {
LogPrint(BCLog::RPCCACHE, "RPCCache: set: key: %d/%s\n", value.height, key);
}
cacheMap[key] = value;
}
}

// Note: We initialize all the globals in the init phase. So, it's safe. Otherwise,
// static init is undefined behavior when multiple threads init them at the same time.
MemoizedResultCache& GetMemoizedResultCache() {
static MemoizedResultCache g_memoizedResultCache;
return g_memoizedResultCache;
}
Loading

0 comments on commit 0f29d5d

Please sign in to comment.