Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpcdaemon: extract JSON-RPC handler out of transport #1962

Merged
merged 4 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions silkworm/rpc/commands/admin_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class AdminRpcApi {

AdminRpcApi(const AdminRpcApi&) = delete;
AdminRpcApi& operator=(const AdminRpcApi&) = delete;
AdminRpcApi(AdminRpcApi&&) = default;

protected:
Task<void> handle_admin_node_info(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
4 changes: 2 additions & 2 deletions silkworm/rpc/commands/debug_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n
const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash);
if (!block_with_hash) {
const std::string error_msg = "block not found ";
SILK_ERROR << "handle_debug_account_at: core::read_block_by_hash: " << error_msg << request.dump();
SILK_TRACE << "handle_debug_account_at: core::read_block_by_hash: " << error_msg << request.dump();
reply = make_json_error(request, -32000, error_msg);
co_await tx->close(); // RAII not (yet) available with coroutines
co_return;
Expand All @@ -308,7 +308,7 @@ Task<void> DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n
auto block_number = block.header.number;
const auto& transactions = block.transactions;

SILK_DEBUG << "Block number: " << block_number << " #tnx: " << transactions.size();
SILK_TRACE << "Block number: " << block_number << " #tnx: " << transactions.size();

auto chain_config_ptr = co_await chain_storage->read_chain_config();
ensure(chain_config_ptr.has_value(), "cannot read chain config");
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/debug_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class DebugRpcApi {

DebugRpcApi(const DebugRpcApi&) = delete;
DebugRpcApi& operator=(const DebugRpcApi&) = delete;
DebugRpcApi(DebugRpcApi&&) = default;

protected:
Task<void> handle_debug_account_range(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/engine_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class EngineRpcApi {

EngineRpcApi(const EngineRpcApi&) = delete;
EngineRpcApi& operator=(const EngineRpcApi&) = delete;
EngineRpcApi(EngineRpcApi&&) = default;

protected:
Task<void> handle_engine_exchange_capabilities(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/erigon_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ErigonRpcApi {

ErigonRpcApi(const ErigonRpcApi&) = delete;
ErigonRpcApi& operator=(const ErigonRpcApi&) = delete;
ErigonRpcApi(ErigonRpcApi&&) = default;

protected:
Task<void> handle_erigon_block_number(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/eth_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class EthereumRpcApi {

EthereumRpcApi(const EthereumRpcApi&) = delete;
EthereumRpcApi& operator=(const EthereumRpcApi&) = delete;
EthereumRpcApi(EthereumRpcApi&&) = default;

protected:
Task<void> handle_eth_block_number(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/net_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class NetRpcApi {

NetRpcApi(const NetRpcApi&) = delete;
NetRpcApi& operator=(const NetRpcApi&) = delete;
NetRpcApi(NetRpcApi&&) = default;

protected:
Task<void> handle_net_listening(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/ots_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class OtsRpcApi {

OtsRpcApi(const OtsRpcApi&) = delete;
OtsRpcApi& operator=(const OtsRpcApi&) = delete;
OtsRpcApi(OtsRpcApi&&) = default;

protected:
Task<void> handle_ots_get_api_level(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/parity_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class ParityRpcApi {

ParityRpcApi(const ParityRpcApi&) = delete;
ParityRpcApi& operator=(const ParityRpcApi&) = delete;
ParityRpcApi(ParityRpcApi&&) = default;

protected:
Task<void> handle_parity_get_block_receipts(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/rpc_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class RpcApi : protected EthereumRpcApi,

RpcApi(const RpcApi&) = delete;
RpcApi& operator=(const RpcApi&) = delete;
RpcApi(RpcApi&&) = default;

friend class RpcApiTable;
friend class silkworm::rpc::json_rpc::RequestHandler;
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/rpc_api_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RpcApiTable {

RpcApiTable(const RpcApiTable&) = delete;
RpcApiTable& operator=(const RpcApiTable&) = delete;
RpcApiTable(RpcApiTable&&) = default;

[[nodiscard]] std::optional<HandleMethod> find_json_handler(const std::string& method) const;
[[nodiscard]] std::optional<HandleMethodGlaze> find_json_glaze_handler(const std::string& method) const;
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/trace_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class TraceRpcApi {

TraceRpcApi(const TraceRpcApi&) = delete;
TraceRpcApi& operator=(const TraceRpcApi&) = delete;
TraceRpcApi(TraceRpcApi&&) = default;

protected:
Task<void> handle_trace_call(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/txpool_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class TxPoolRpcApi {

TxPoolRpcApi(const TxPoolRpcApi&) = delete;
TxPoolRpcApi& operator=(const TxPoolRpcApi&) = delete;
TxPoolRpcApi(TxPoolRpcApi&&) = default;

protected:
Task<void> handle_txpool_status(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
1 change: 1 addition & 0 deletions silkworm/rpc/commands/web3_api.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Web3RpcApi {

Web3RpcApi(const Web3RpcApi&) = delete;
Web3RpcApi& operator=(const Web3RpcApi&) = delete;
Web3RpcApi(Web3RpcApi&&) = default;

protected:
Task<void> handle_web3_client_version(const nlohmann::json& request, nlohmann::json& reply);
Expand Down
13 changes: 11 additions & 2 deletions silkworm/rpc/daemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include <silkworm/rpc/ethdb/file/local_database.hpp>
#include <silkworm/rpc/ethdb/kv/remote_database.hpp>
#include <silkworm/rpc/http/jwt.hpp>
#include <silkworm/rpc/json_rpc/request_handler.hpp>
#include <silkworm/rpc/json_rpc/validator.hpp>

namespace silkworm::rpc {
Expand Down Expand Up @@ -301,9 +302,17 @@ void Daemon::start() {
boost::asio::io_context& ioc,
std::optional<std::string> jwt_secret,
InterfaceLogSettings ilog_settings) {
commands::RpcApi rpc_api{ioc, worker_pool_};
commands::RpcApiTable handler_table{api_spec};
auto make_jsonrpc_handler = [rpc_api = std::move(rpc_api),
handler_table = std::move(handler_table),
ilog_settings = std::move(ilog_settings)](StreamWriter* stream_writer) mutable {
return std::make_unique<json_rpc::RequestHandler>(stream_writer, rpc_api, handler_table, ilog_settings);
};

return std::make_unique<http::Server>(
end_point, api_spec, ioc, worker_pool_, settings_.cors_domain, std::move(jwt_secret),
settings_.use_websocket, settings_.ws_compression, settings_.http_compression, std::move(ilog_settings));
end_point, std::move(make_jsonrpc_handler), ioc, worker_pool_, settings_.cors_domain, std::move(jwt_secret),
settings_.use_websocket, settings_.ws_compression, settings_.http_compression);
};

// Put the interface logs into the data folder
Expand Down
35 changes: 19 additions & 16 deletions silkworm/rpc/http/connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,29 @@ static constexpr auto kMaxPayloadSize{30 * kMebi}; // 30MiB
static constexpr std::array kAcceptedContentTypes{"application/json", "application/jsonrequest", "application/json-rpc"};
static constexpr auto kGzipEncoding{"gzip"};

Connection::Connection(boost::asio::io_context& io_context,
commands::RpcApi& api,
commands::RpcApiTable& handler_table,
Task<void> Connection::run_read_loop(std::shared_ptr<Connection> connection) {
co_await connection->read_loop();
}

Connection::Connection(boost::asio::ip::tcp::socket socket,
RequestHandlerFactory& handler_factory,
const std::vector<std::string>& allowed_origins,
std::optional<std::string> jwt_secret,
bool ws_upgrade_enabled,
bool ws_compression,
bool http_compression,
boost::asio::thread_pool& workers,
InterfaceLogSettings ifc_log_settings)
: socket_{io_context},
api_{api},
handler_table_{handler_table},
request_handler_{this, api, handler_table, std::move(ifc_log_settings)},
boost::asio::thread_pool& workers)
: socket_{std::move(socket)},
handler_factory_{handler_factory},
handler_{handler_factory_(this)},
allowed_origins_{allowed_origins},
jwt_secret_{std ::move(jwt_secret)},
ws_upgrade_enabled_{ws_upgrade_enabled},
ws_compression_{ws_compression},
http_compression_{http_compression},
workers_{workers} {
SILK_TRACE << "Connection::Connection socket " << &socket_ << " created";
if (http_compression_) { // temporary to avoid warning with clang
SILK_TRACE << "Connection::Connection compression enabled";
}
socket_.set_option(boost::asio::ip::tcp::socket::keep_alive(true));
SILK_TRACE << "Connection::Connection created for " << socket_.remote_endpoint();
}

Connection::~Connection() {
Expand All @@ -81,7 +80,11 @@ Task<void> Connection::read_loop() {
continue_processing = co_await do_read();
}
} catch (const boost::system::system_error& se) {
SILK_TRACE << "Connection::read_loop system-error: " << se.code();
if (se.code() == boost::beast::http::error::end_of_stream) {
SILK_TRACE << "Connection::read_loop received graceful close from " << socket_.remote_endpoint();
} else {
SILK_TRACE << "Connection::read_loop system_error: " << se.code();
}
} catch (const std::exception& e) {
SILK_ERROR << "Connection::read_loop exception: " << e.what();
}
Expand Down Expand Up @@ -126,7 +129,7 @@ Task<void> Connection::do_upgrade(const boost::beast::http::request<boost::beast
// Now that talking to the socket is successful, we tie the socket object to a WebSocket stream
boost::beast::websocket::stream<boost::beast::tcp_stream> stream(std::move(socket_));

auto ws_connection = std::make_shared<ws::Connection>(std::move(stream), api_, std::move(handler_table_), ws_compression_);
auto ws_connection = std::make_shared<ws::Connection>(std::move(stream), handler_factory_, ws_compression_);
co_await ws_connection->accept(req);

auto connection_loop = [](auto websocket_connection) -> Task<void> { co_await websocket_connection->read_loop(); };
Expand Down Expand Up @@ -213,7 +216,7 @@ Task<void> Connection::handle_actual_request(const boost::beast::http::request<b
origin_ = req[boost::beast::http::field::origin];
method_ = req.method();

auto rsp_content = co_await request_handler_.handle(req.body());
auto rsp_content = co_await handler_->handle(req.body());
if (rsp_content) {
co_await do_write(rsp_content->append("\n"), boost::beast::http::status::ok, gzip_encoding_requested ? kGzipEncoding : "");
}
Expand Down
30 changes: 15 additions & 15 deletions silkworm/rpc/http/connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#pragma once

#include <array>
#include <memory>
#include <string>

#include <silkworm/infra/concurrency/task.hpp>
Expand All @@ -32,42 +33,42 @@
#include <silkworm/rpc/commands/rpc_api_table.hpp>
#include <silkworm/rpc/common/constants.hpp>
#include <silkworm/rpc/common/interface_log.hpp>
#include <silkworm/rpc/common/writer.hpp>
#include <silkworm/rpc/json_rpc/request_handler.hpp>
#include <silkworm/rpc/transport/request_handler.hpp>
#include <silkworm/rpc/transport/stream_writer.hpp>
#include <silkworm/rpc/ws/connection.hpp>

namespace silkworm::rpc::http {

//! Represents a single connection from a client.
class Connection : public StreamWriter {
public:
//! Run the asynchronous read loop for the specified connection.
//! \note This is co_spawn-friendly because the connection lifetime is tied to the coroutine frame
static Task<void> run_read_loop(std::shared_ptr<Connection> connection);

Connection(const Connection&) = delete;
Connection& operator=(const Connection&) = delete;

//! Construct a connection running within the given execution context.
Connection(boost::asio::io_context& io_context,
commands::RpcApi& api,
commands::RpcApiTable& handler_table,
Connection(boost::asio::ip::tcp::socket socket,
RequestHandlerFactory& handler_factory,
const std::vector<std::string>& allowed_origins,
std::optional<std::string> jwt_secret,
bool ws_upgrade_enabled,
bool ws_compression,
bool http_compression,
boost::asio::thread_pool& workers,
InterfaceLogSettings ifc_log_settings);
boost::asio::thread_pool& workers);
~Connection() override;

boost::asio::ip::tcp::socket& socket() { return socket_; }

//! Start the asynchronous read loop for the connection.
Task<void> read_loop();

/* StreamWriter Interface */
Task<void> open_stream() override;
Task<void> close_stream() override;
Task<std::size_t> write(std::string_view content, bool last) override;

private:
//! Start the asynchronous read loop for the connection
Task<void> read_loop();

using AuthorizationError = std::string;
using AuthorizationResult = tl::expected<void, AuthorizationError>;
AuthorizationResult is_request_authorized(const boost::beast::http::request<boost::beast::http::string_body>& req);
Expand Down Expand Up @@ -98,11 +99,10 @@ class Connection : public StreamWriter {
//! Socket for the connection.
boost::asio::ip::tcp::socket socket_;

commands::RpcApi& api_;
const commands::RpcApiTable& handler_table_;
RequestHandlerFactory& handler_factory_;

//! The handler used to process the incoming request.
json_rpc::RequestHandler request_handler_;
RequestHandlerPtr handler_;

const std::vector<std::string>& allowed_origins_;
const std::optional<std::string> jwt_secret_;
Expand Down
27 changes: 11 additions & 16 deletions silkworm/rpc/http/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,21 @@ std::tuple<std::string, std::string> Server::parse_endpoint(const std::string& t
}

Server::Server(const std::string& end_point,
const std::string& api_spec,
RequestHandlerFactory&& handler_factory,
boost::asio::io_context& io_context,
boost::asio::thread_pool& workers,
std::vector<std::string> allowed_origins,
std::optional<std::string> jwt_secret,
bool use_websocket,
bool ws_compression,
bool http_compression,
InterfaceLogSettings ifc_log_settings)
: rpc_api_{io_context, workers},
handler_table_{api_spec},
io_context_(io_context),
bool http_compression)
: handler_factory_{std::move(handler_factory)},
acceptor_{io_context},
allowed_origins_{std::move(allowed_origins)},
jwt_secret_(std::move(jwt_secret)),
use_websocket_{use_websocket},
ws_compression_{ws_compression},
http_compression_{http_compression},
ifc_log_settings_{std::move(ifc_log_settings)},
workers_{workers} {
const auto [host, port] = parse_endpoint(end_point);

Expand All @@ -82,24 +78,23 @@ void Server::start() {
Task<void> Server::run() {
acceptor_.listen();

auto this_executor = co_await ThisTask::executor;
try {
while (acceptor_.is_open()) {
SILK_DEBUG << "Server::run accepting using io_context " << &io_context_ << "...";
SILK_TRACE << "Server::run accepting using executor " << &this_executor << "...";

auto new_connection = std::make_shared<Connection>(
io_context_, rpc_api_, handler_table_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, workers_, ifc_log_settings_);
co_await acceptor_.async_accept(new_connection->socket(), boost::asio::use_awaitable);
boost::asio::ip::tcp::socket socket{this_executor};
co_await acceptor_.async_accept(socket, boost::asio::use_awaitable);
if (!acceptor_.is_open()) {
SILK_TRACE << "Server::run returning...";
co_return;
}

new_connection->socket().set_option(boost::asio::ip::tcp::socket::keep_alive(true));

SILK_TRACE << "Server::run starting connection for socket: " << &new_connection->socket();
auto connection_loop = [](auto connection) -> Task<void> { co_await connection->read_loop(); };
SILK_TRACE << "Server::run accepted connection from " << socket.remote_endpoint();

boost::asio::co_spawn(io_context_, connection_loop(new_connection), boost::asio::detached);
auto new_connection = std::make_shared<Connection>(
std::move(socket), handler_factory_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, workers_);
boost::asio::co_spawn(this_executor, Connection::run_read_loop(new_connection), boost::asio::detached);
}
} catch (const boost::system::system_error& se) {
if (se.code() != boost::asio::error::operation_aborted) {
Expand Down
Loading
Loading