Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcdaemon: refactor chain and state access layer #2084

Merged
merged 4 commits into from
Jun 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions examples/get_latest_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,14 @@
#include <absl/strings/match.h>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/use_awaitable.hpp>
#include <boost/asio/use_future.hpp>
#include <grpcpp/grpcpp.h>

#include <silkworm/core/common/util.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/infra/grpc/client/client_context_pool.hpp>
#include <silkworm/rpc/common/constants.hpp>
#include <silkworm/rpc/core/blocks.hpp>
#include <silkworm/rpc/ethdb/kv/remote_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>

using namespace silkworm;
using namespace silkworm::rpc;
Expand All @@ -48,8 +45,7 @@ Task<std::optional<uint64_t>> latest_block(ethdb::Database& db) {

const auto db_transaction = co_await db.begin();
try {
ethdb::TransactionDatabase tx_db_reader{*db_transaction};
block_height = co_await core::get_latest_block_number(tx_db_reader);
block_height = co_await core::get_latest_block_number(*db_transaction);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what();
} catch (...) {
Expand Down Expand Up @@ -95,8 +91,9 @@ int main(int argc, char* argv[]) {
auto& context = context_pool.next_context();
auto io_context = context.io_context();

ethdb::kv::CoherentStateCache state_cache;
auto channel{::grpc::CreateChannel(target, ::grpc::InsecureChannelCredentials())};
auto database = std::make_unique<ethdb::kv::RemoteDatabase>(*context.grpc_context(), channel);
auto database = std::make_unique<ethdb::kv::RemoteDatabase>(&state_cache, *context.grpc_context(), channel);

auto context_pool_thread = std::thread([&]() { context_pool.run(); });

Expand Down
78 changes: 30 additions & 48 deletions silkworm/rpc/commands/debug_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "debug_api.hpp"

#include <algorithm>
#include <chrono>
#include <ostream>
#include <set>
#include <stdexcept>
Expand All @@ -44,8 +43,6 @@
#include <silkworm/rpc/core/rawdb/chain.hpp>
#include <silkworm/rpc/core/state_reader.hpp>
#include <silkworm/rpc/core/storage_walker.hpp>
#include <silkworm/rpc/ethdb/kv/cached_database.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/json/types.hpp>
#include <silkworm/rpc/types/block.hpp>
#include <silkworm/rpc/types/call.hpp>
Expand Down Expand Up @@ -126,11 +123,10 @@ Task<void> DebugRpcApi::handle_debug_get_modified_accounts_by_number(const nlohm
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto start_block_number = co_await core::get_block_number(start_block_id, tx_database);
const auto end_block_number = co_await core::get_block_number(end_block_id, tx_database);
const auto start_block_number = co_await core::get_block_number(start_block_id, *tx);
const auto end_block_number = co_await core::get_block_number(end_block_id, *tx);

const auto addresses = co_await get_modified_accounts(tx_database, start_block_number, end_block_number);
const auto addresses = co_await get_modified_accounts(*tx, start_block_number, end_block_number);
reply = make_json_content(request, addresses);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -167,11 +163,9 @@ Task<void> DebugRpcApi::handle_debug_get_modified_accounts_by_hash(const nlohman
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};

const auto start_block_number = co_await core::rawdb::read_header_number(tx_database, start_hash);
const auto end_block_number = co_await core::rawdb::read_header_number(tx_database, end_hash);
auto addresses = co_await get_modified_accounts(tx_database, start_block_number, end_block_number);
const auto start_block_number = co_await core::rawdb::read_header_number(*tx, start_hash);
const auto end_block_number = co_await core::rawdb::read_header_number(*tx, end_hash);
auto addresses = co_await get_modified_accounts(*tx, start_block_number, end_block_number);
reply = make_json_content(request, addresses);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -213,8 +207,7 @@ Task<void> DebugRpcApi::handle_debug_storage_range_at(const nlohmann::json& requ
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash);
if (!block_with_hash) {
Expand Down Expand Up @@ -292,8 +285,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash);
if (!block_with_hash) {
Expand All @@ -315,7 +307,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n

auto this_executor = co_await boost::asio::this_coro::executor;
auto result = co_await async_task(workers_.executor(), [&]() -> nlohmann::json {
auto state = tx->create_state(this_executor, tx_database, *chain_storage, block_number - 1);
auto state = tx->create_state(this_executor, *chain_storage, block_number - 1);
auto account_opt = state->read_account(address);
account_opt.value_or(silkworm::Account{});

Expand Down Expand Up @@ -389,9 +381,8 @@ Task<void> DebugRpcApi::handle_debug_trace_transaction(const nlohmann::json& req
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(tx_database, backend_);
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(backend_);
co_await executor.trace_transaction(stream, *chain_storage, transaction_hash);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -436,15 +427,12 @@ Task<void> DebugRpcApi::handle_debug_trace_call(const nlohmann::json& request, j
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
ethdb::kv::CachedDatabase cached_database{block_number_or_hash, *tx, *state_cache_};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

const bool is_latest_block = co_await core::is_latest_block_number(block_number_or_hash, tx_database);
const core::rawdb::DatabaseReader& db_reader =
is_latest_block ? static_cast<core::rawdb::DatabaseReader&>(cached_database) : static_cast<core::rawdb::DatabaseReader&>(tx_database);
const bool is_latest_block = co_await core::is_latest_block_number(block_number_or_hash, *tx);
tx->set_state_cache_enabled(/*cache_enabled=*/is_latest_block);

debug::DebugExecutor executor{db_reader, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_call(stream, block_number_or_hash, *chain_storage, call);
} catch (const std::exception& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -511,9 +499,8 @@ Task<void> DebugRpcApi::handle_debug_trace_call_many(const nlohmann::json& reque
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(tx_database, backend_);
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
const auto chain_storage = tx->create_storage(backend_);
co_await executor.trace_call_many(stream, *chain_storage, bundles, simulation_context);
} catch (...) {
SILK_ERROR << "unexpected exception processing request: " << request.dump();
Expand Down Expand Up @@ -558,10 +545,9 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_number(const nlohmann::json&
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_block(stream, *chain_storage, block_number);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand Down Expand Up @@ -611,10 +597,9 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_hash(const nlohmann::json& r
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, backend_);
const auto chain_storage = tx->create_storage(backend_);

debug::DebugExecutor executor{tx_database, *block_cache_, workers_, *tx, config};
debug::DebugExecutor executor{*block_cache_, workers_, *tx, config};
co_await executor.trace_block(stream, *chain_storage, block_hash);
} catch (const std::invalid_argument& e) {
SILK_ERROR << "exception: " << e.what() << " processing request: " << request.dump();
Expand All @@ -638,8 +623,8 @@ Task<void> DebugRpcApi::handle_debug_trace_block_by_hash(const nlohmann::json& r
co_return;
}

Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase& tx_database, BlockNum start_block_number, BlockNum end_block_number) {
const auto latest_block_number = co_await core::get_block_number(core::kLatestBlockId, tx_database);
Task<std::set<evmc::address>> get_modified_accounts(ethdb::Transaction& tx, BlockNum start_block_number, BlockNum end_block_number) {
const auto latest_block_number = co_await core::get_block_number(core::kLatestBlockId, tx);

SILK_DEBUG << "latest: " << latest_block_number << " start: " << start_block_number << " end: " << end_block_number;

Expand All @@ -649,7 +634,7 @@ Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase&
msg << "start block (" << start_block_number << ") is later than the latest block (" << latest_block_number << ")";
throw std::invalid_argument(msg.str());
} else if (start_block_number <= end_block_number) {
core::rawdb::Walker walker = [&](const silkworm::Bytes& key, const silkworm::Bytes& value) {
auto walker = [&](const silkworm::Bytes& key, const silkworm::Bytes& value) {
auto block_number = static_cast<BlockNum>(std::stol(silkworm::to_hex(key), nullptr, 16));
if (block_number <= end_block_number) {
auto address = bytes_to_address(value.substr(0, kAddressLength));
Expand All @@ -663,7 +648,7 @@ Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase&
const auto key = silkworm::db::block_key(start_block_number);
SILK_TRACE << "Ready to walk starting from key: " << silkworm::to_hex(key);

co_await tx_database.walk(db::table::kAccountChangeSetName, key, 0, walker);
co_await tx.walk(db::table::kAccountChangeSetName, key, 0, walker);
}

co_return addresses;
Expand All @@ -684,9 +669,8 @@ Task<void> DebugRpcApi::handle_debug_get_raw_block(const nlohmann::json& request
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, nullptr);
const auto block_number = co_await core::get_block_number(block_id, tx_database);
const auto chain_storage = tx->create_storage(backend_);
const auto block_number = co_await core::get_block_number(block_id, *tx);
silkworm::Block block;
if (!(co_await chain_storage->read_canonical_block(block_number, block))) {
throw std::invalid_argument("block not found");
Expand Down Expand Up @@ -723,9 +707,8 @@ Task<void> DebugRpcApi::handle_debug_get_raw_header(const nlohmann::json& reques
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage = tx->create_storage(tx_database, nullptr);
const auto block_number = co_await core::get_block_number(block_id, tx_database);
const auto chain_storage = tx->create_storage(backend_);
const auto block_number = co_await core::get_block_number(block_id, *tx);
const auto block_hash = co_await chain_storage->read_canonical_hash(block_number);
auto header = co_await chain_storage->read_header(block_number, block_hash->bytes);
if (!header) {
Expand Down Expand Up @@ -763,8 +746,7 @@ Task<void> DebugRpcApi::handle_debug_get_raw_transaction(const nlohmann::json& r
auto tx = co_await database_->begin();

try {
ethdb::TransactionDatabase tx_database{*tx};
const auto chain_storage{tx->create_storage(tx_database, nullptr)};
const auto chain_storage{tx->create_storage(backend_)};

Bytes rlp{};
auto success = co_await chain_storage->read_rlp_transaction(transaction_hash, rlp);
Expand Down
5 changes: 2 additions & 3 deletions silkworm/rpc/commands/debug_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
#include <silkworm/core/common/block_cache.hpp>
#include <silkworm/infra/concurrency/private_service.hpp>
#include <silkworm/infra/concurrency/shared_service.hpp>
#include <silkworm/rpc/core/rawdb/accessors.hpp>
#include <silkworm/rpc/ethdb/database.hpp>
#include <silkworm/rpc/ethdb/kv/state_cache.hpp>
#include <silkworm/rpc/ethdb/transaction_database.hpp>
#include <silkworm/rpc/ethdb/transaction.hpp>
#include <silkworm/rpc/json/stream.hpp>
#include <silkworm/rpc/json/types.hpp>

Expand Down Expand Up @@ -81,6 +80,6 @@ class DebugRpcApi {
friend class silkworm::rpc::json_rpc::RequestHandler;
};

Task<std::set<evmc::address>> get_modified_accounts(ethdb::TransactionDatabase& tx_database, BlockNum start_block_number, BlockNum end_block_number);
Task<std::set<evmc::address>> get_modified_accounts(ethdb::Transaction& tx, BlockNum start_block_number, BlockNum end_block_number);

} // namespace silkworm::rpc::commands
20 changes: 10 additions & 10 deletions silkworm/rpc/commands/debug_api_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <silkworm/infra/test_util/log.hpp>
#include <silkworm/rpc/core/blocks.hpp>
#include <silkworm/rpc/core/filter_storage.hpp>
#include <silkworm/rpc/ethdb/base_transaction.hpp>
#include <silkworm/rpc/ethdb/kv/state_cache.hpp>
#if !defined(__clang__)
#include <silkworm/rpc/stagedsync/stages.hpp>
Expand Down Expand Up @@ -160,10 +161,10 @@ class DummyCursor : public ethdb::CursorDupSort {
nlohmann::json::iterator itr_;
};

class DummyTransaction : public ethdb::Transaction {
class DummyTransaction : public ethdb::BaseTransaction {
public:
explicit DummyTransaction(const nlohmann::json& json)
: json_{json}, tx_id_{next_tx_id++}, view_id_{next_view_id++} {};
: BaseTransaction(nullptr), json_{json}, tx_id_{next_tx_id++}, view_id_{next_view_id++} {};

[[nodiscard]] uint64_t tx_id() const override {
return tx_id_;
Expand Down Expand Up @@ -191,11 +192,11 @@ class DummyTransaction : public ethdb::Transaction {
co_return cursor;
}

std::shared_ptr<silkworm::State> create_state(boost::asio::any_io_executor&, const core::rawdb::DatabaseReader&, const ChainStorage&, BlockNum) override {
std::shared_ptr<silkworm::State> create_state(boost::asio::any_io_executor&, const ChainStorage&, BlockNum) override {
return nullptr;
}

std::shared_ptr<ChainStorage> create_storage(const core::rawdb::DatabaseReader&, ethbackend::BackEnd*) override {
std::shared_ptr<ChainStorage> create_storage(ethbackend::BackEnd*) override {
return nullptr;
}

Expand Down Expand Up @@ -326,10 +327,9 @@ TEST_CASE("get_modified_accounts") {
auto database = DummyDatabase{json};
auto begin_result = boost::asio::co_spawn(pool, database.begin(), boost::asio::use_future);
auto tx = begin_result.get();
ethdb::TransactionDatabase tx_database{*tx};

SECTION("end == start") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a010), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a010), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 1);
Expand All @@ -341,7 +341,7 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("end == start + 1") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a011), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a011), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 2);
Expand All @@ -354,7 +354,7 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("end >> start") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a010, 0x52a058), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a010, 0x52a058), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.size() == 70);
Expand Down Expand Up @@ -435,14 +435,14 @@ TEST_CASE("get_modified_accounts") {
}

SECTION("start > end") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a011, 0x52a010), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a011, 0x52a010), boost::asio::use_future);
auto accounts = result.get();

CHECK(accounts.empty());
}

SECTION("start > last block") {
auto result = boost::asio::co_spawn(pool, get_modified_accounts(tx_database, 0x52a061, 0x52a061), boost::asio::use_future);
auto result = boost::asio::co_spawn(pool, get_modified_accounts(*tx, 0x52a061, 0x52a061), boost::asio::use_future);
CHECK_THROWS_AS(result.get(), std::invalid_argument);
}
}
Expand Down
Loading
Loading