Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcdaemon: extract awaitable async task run by executor #1953

Merged
merged 11 commits into from
Apr 9, 2024
1 change: 1 addition & 0 deletions .github/workflows/rpc-performance-tests.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
name: QA - RPC Performance Tests

on:
workflow_dispatch:
schedule:
- cron: '0 0 * * *' # Run every day at 00:00 AM UTC

Expand Down
79 changes: 35 additions & 44 deletions silkworm/rpc/commands/debug_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
#include <string>
#include <vector>

#include <boost/asio/compose.hpp>
#include <boost/asio/post.hpp>
#include <evmc/evmc.hpp>

#include <silkworm/core/common/endian.hpp>
Expand All @@ -36,6 +34,7 @@
#include <silkworm/db/util.hpp>
#include <silkworm/infra/common/ensure.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/rpc/common/async_task.hpp>
#include <silkworm/rpc/common/util.hpp>
#include <silkworm/rpc/core/account_dumper.hpp>
#include <silkworm/rpc/core/blocks.hpp>
Expand All @@ -47,7 +46,6 @@
#include <silkworm/rpc/core/storage_walker.hpp>
#include <silkworm/rpc/ethdb/kv/cached_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/json/call.hpp>
#include <silkworm/rpc/json/types.hpp>
#include <silkworm/rpc/types/block.hpp>
#include <silkworm/rpc/types/call.hpp>
Expand Down Expand Up @@ -254,10 +252,10 @@ Task<void> DebugRpcApi::handle_debug_storage_range_at(const nlohmann::json& requ
co_await storage_walker.storage_range_at(block_number, address, start_key, max_result, collector);

nlohmann::json result = {{"storage", storage}};
if (next_key.length() > 0) {
result["nextKey"] = "0x" + silkworm::to_hex(next_key);
} else {
if (next_key.empty()) {
result["nextKey"] = nlohmann::json();
} else {
result["nextKey"] = "0x" + silkworm::to_hex(next_key);
}

reply = make_json_content(request, result);
Expand Down Expand Up @@ -314,46 +312,39 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n

auto chain_config_ptr = co_await chain_storage->read_chain_config();
ensure(chain_config_ptr.has_value(), "cannot read chain config");

auto this_executor = co_await boost::asio::this_coro::executor;
auto result = co_await async_task(workers_.executor(), [&]() -> nlohmann::json {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

EVMExecutor executor{*chain_config_ptr, workers_, state};

auto result = co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), void(nlohmann::json)>(
[&](auto& self) {
boost::asio::post(workers_, [&, self = std::move(self)]() mutable {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

EVMExecutor executor{*chain_config_ptr, workers_, state};

uint64_t index = std::min(static_cast<uint64_t>(transactions.size()), tx_index);
for (uint64_t idx{0}; idx < index; idx++) {
rpc::Transaction txn{transactions[idx]};
executor.call(block, txn);
}

const auto& ibs = executor.get_ibs_state();

nlohmann::json json_result;
if (ibs.exists(address)) {
std::ostringstream oss;
oss << std::hex << ibs.get_nonce(address);
json_result["nonce"] = "0x" + oss.str();
json_result["balance"] = "0x" + intx::to_string(ibs.get_balance(address), 16);
json_result["codeHash"] = ibs.get_code_hash(address);
json_result["code"] = "0x" + silkworm::to_hex(ibs.get_code(address));
} else {
json_result["balance"] = "0x0";
json_result["code"] = "0x";
json_result["codeHash"] = "0x0000000000000000000000000000000000000000000000000000000000000000";
json_result["nonce"] = "0x0";
}

boost::asio::post(this_executor, [json_result, self = std::move(self)]() mutable {
self.complete(json_result);
});
});
},
boost::asio::use_awaitable);
uint64_t index = std::min(static_cast<uint64_t>(transactions.size()), tx_index);
for (uint64_t idx{0}; idx < index; idx++) {
rpc::Transaction txn{transactions[idx]};
executor.call(block, txn);
}

const auto& ibs = executor.get_ibs_state();

nlohmann::json json_result;
if (ibs.exists(address)) {
std::ostringstream oss;
oss << std::hex << ibs.get_nonce(address);
json_result["nonce"] = "0x" + oss.str();
json_result["balance"] = "0x" + intx::to_string(ibs.get_balance(address), 16);
json_result["codeHash"] = ibs.get_code_hash(address);
json_result["code"] = "0x" + silkworm::to_hex(ibs.get_code(address));
} else {
json_result["balance"] = "0x0";
json_result["code"] = "0x";
json_result["codeHash"] = "0x0000000000000000000000000000000000000000000000000000000000000000";
json_result["nonce"] = "0x0";
}
return json_result;
});

reply = make_json_content(request, result);
} catch (const std::invalid_argument& e) {
Expand Down
82 changes: 82 additions & 0 deletions silkworm/rpc/common/async_task.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
Copyright 2024 The Silkworm Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#include <exception>
#include <type_traits>
#include <utility>

#include <silkworm/infra/concurrency/task.hpp>

#include <boost/asio/compose.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/this_coro.hpp>
#include <boost/asio/use_awaitable.hpp>

namespace silkworm::rpc {

//! Helper trait for any completion handler signature
template <typename R, typename F, typename... Args>
struct CompletionHandler {
using type = void(std::exception_ptr, R);
};

//! Partial specialization for \code void return type
template <typename F, typename... Args>
struct CompletionHandler<void, F, Args...> {
using type = void(std::exception_ptr);
};

//! Alias helper trait for the completion handler signature of any task
template <typename F, typename... Args>
using TaskCompletionHandler = typename CompletionHandler<std::invoke_result_t<F, Args...>, F, Args...>::type;

//! Asynchronous \code co_await-able task executing function \code fn with arguments \code args in \code runner executor
template <typename Executor, typename F, typename... Args>
// NOLINTNEXTLINE(cppcoreguidelines-missing-std-forward) because of https://github.com/llvm/llvm-project/issues/68105
Task<std::invoke_result_t<F, Args...>> async_task(Executor runner, F&& fn, Args&&... args) {
auto this_executor = co_await ThisTask::executor;
co_return co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), TaskCompletionHandler<F, Args...>>(
[&this_executor, &runner, fn = std::forward<F>(fn), ... args = std::forward<Args>(args)](auto& self) mutable {
boost::asio::post(runner, [&, fn = std::forward<decltype(fn)>(fn), ... args = std::forward<Args>(args), self = std::move(self)]() mutable {
try {
if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>) {
std::invoke(fn, args...);
boost::asio::post(this_executor, [self = std::move(self)]() mutable {
self.complete({});
});
} else {
auto result = std::invoke(fn, args...);
boost::asio::post(this_executor, [result = std::move(result), self = std::move(self)]() mutable {
self.complete({}, result);
});
}
} catch (...) {
std::exception_ptr eptr = std::current_exception();
boost::asio::post(this_executor, [eptr, self = std::move(self)]() mutable {
if constexpr (std::is_void_v<std::invoke_result_t<F, Args...>>)
self.complete(eptr);
else
self.complete(eptr, {});
});
}
});
},
boost::asio::use_awaitable);
}

} // namespace silkworm::rpc
92 changes: 92 additions & 0 deletions silkworm/rpc/common/async_task_benchmark.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
Copyright 2024 The Silkworm Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#include <benchmark/benchmark.h>
#include <boost/asio/thread_pool.hpp>

#include <silkworm/rpc/test/context_test_base.hpp>

#include "async_task.hpp"

namespace silkworm::rpc {

std::size_t recursive_factorial(std::size_t n) {
return n == 0 ? 1 : n * recursive_factorial(n - 1);
}

struct AsyncTaskBenchTest : test::ContextTestBase {
};

template <typename Executor>
Task<std::size_t> async_compose_factorial(const Executor runner, const std::size_t number) {
const auto this_executor = co_await ThisTask::executor;
co_return co_await boost::asio::async_compose<decltype(boost::asio::use_awaitable), void(std::exception_ptr, std::size_t)>(
[&](auto& self) {
boost::asio::post(runner, [&, self = std::move(self)]() mutable {
try {
const auto result = recursive_factorial(number);
boost::asio::post(this_executor, [result, self = std::move(self)]() mutable {
self.complete({}, result);
});
} catch (...) {
std::exception_ptr eptr = std::current_exception();
boost::asio::post(this_executor, [eptr, self = std::move(self)]() mutable {
self.complete(eptr, {});
});
}
});
},
boost::asio::use_awaitable);
}

static void benchmark_async_compose(benchmark::State& state) {
const auto n = static_cast<std::size_t>(state.range(0));

boost::asio::thread_pool workers{};
AsyncTaskBenchTest test;
for ([[maybe_unused]] auto _ : state) {
const auto result = test.spawn_and_wait(async_compose_factorial(workers.get_executor(), n));
benchmark::DoNotOptimize(result);
}
}

BENCHMARK(benchmark_async_compose)->Arg(10);
BENCHMARK(benchmark_async_compose)->Arg(100);
BENCHMARK(benchmark_async_compose)->Arg(1'000);
BENCHMARK(benchmark_async_compose)->Arg(10'000);

template <typename Executor>
Task<std::size_t> async_task_factorial(Executor runner, std::size_t number) {
co_return co_await async_task(runner, recursive_factorial, number);
}

static void benchmark_async_task(benchmark::State& state) {
const auto n = static_cast<std::size_t>(state.range(0));

boost::asio::thread_pool workers{};
AsyncTaskBenchTest test;
for ([[maybe_unused]] auto _ : state) {
const auto result = test.spawn_and_wait(async_task_factorial(workers.get_executor(), n));
benchmark::DoNotOptimize(result);
}
}

BENCHMARK(benchmark_async_task)->Arg(10);
BENCHMARK(benchmark_async_task)->Arg(100);
BENCHMARK(benchmark_async_task)->Arg(1'000);
BENCHMARK(benchmark_async_task)->Arg(10'000);

} // namespace silkworm::rpc
Loading
Loading