diff --git a/silkworm/rpc/commands/admin_api.hpp b/silkworm/rpc/commands/admin_api.hpp index e9d8f7285a..c088d059cc 100644 --- a/silkworm/rpc/commands/admin_api.hpp +++ b/silkworm/rpc/commands/admin_api.hpp @@ -41,6 +41,7 @@ class AdminRpcApi { AdminRpcApi(const AdminRpcApi&) = delete; AdminRpcApi& operator=(const AdminRpcApi&) = delete; + AdminRpcApi(AdminRpcApi&&) = default; protected: Task handle_admin_node_info(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/debug_api.cpp b/silkworm/rpc/commands/debug_api.cpp index 60e6898baf..e8c28566dd 100644 --- a/silkworm/rpc/commands/debug_api.cpp +++ b/silkworm/rpc/commands/debug_api.cpp @@ -298,7 +298,7 @@ Task DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n const auto block_with_hash = co_await core::read_block_by_hash(*block_cache_, *chain_storage, block_hash); if (!block_with_hash) { const std::string error_msg = "block not found "; - SILK_ERROR << "handle_debug_account_at: core::read_block_by_hash: " << error_msg << request.dump(); + SILK_TRACE << "handle_debug_account_at: core::read_block_by_hash: " << error_msg << request.dump(); reply = make_json_error(request, -32000, error_msg); co_await tx->close(); // RAII not (yet) available with coroutines co_return; @@ -308,7 +308,7 @@ Task DebugRpcApi::handle_debug_account_at(const nlohmann::json& request, n auto block_number = block.header.number; const auto& transactions = block.transactions; - SILK_DEBUG << "Block number: " << block_number << " #tnx: " << transactions.size(); + SILK_TRACE << "Block number: " << block_number << " #tnx: " << transactions.size(); auto chain_config_ptr = co_await chain_storage->read_chain_config(); ensure(chain_config_ptr.has_value(), "cannot read chain config"); diff --git a/silkworm/rpc/commands/debug_api.hpp b/silkworm/rpc/commands/debug_api.hpp index 094137dbaf..d913327c2b 100644 --- a/silkworm/rpc/commands/debug_api.hpp +++ b/silkworm/rpc/commands/debug_api.hpp @@ -51,6 +51,7 @@ class DebugRpcApi { DebugRpcApi(const DebugRpcApi&) = delete; DebugRpcApi& operator=(const DebugRpcApi&) = delete; + DebugRpcApi(DebugRpcApi&&) = default; protected: Task handle_debug_account_range(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/engine_api.hpp b/silkworm/rpc/commands/engine_api.hpp index 2782ae5c78..705c133fd0 100644 --- a/silkworm/rpc/commands/engine_api.hpp +++ b/silkworm/rpc/commands/engine_api.hpp @@ -46,6 +46,7 @@ class EngineRpcApi { EngineRpcApi(const EngineRpcApi&) = delete; EngineRpcApi& operator=(const EngineRpcApi&) = delete; + EngineRpcApi(EngineRpcApi&&) = default; protected: Task handle_engine_exchange_capabilities(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/erigon_api.hpp b/silkworm/rpc/commands/erigon_api.hpp index e97a8cc591..d302c769da 100644 --- a/silkworm/rpc/commands/erigon_api.hpp +++ b/silkworm/rpc/commands/erigon_api.hpp @@ -46,6 +46,7 @@ class ErigonRpcApi { ErigonRpcApi(const ErigonRpcApi&) = delete; ErigonRpcApi& operator=(const ErigonRpcApi&) = delete; + ErigonRpcApi(ErigonRpcApi&&) = default; protected: Task handle_erigon_block_number(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/eth_api.hpp b/silkworm/rpc/commands/eth_api.hpp index daf701c1e5..f624dc325a 100644 --- a/silkworm/rpc/commands/eth_api.hpp +++ b/silkworm/rpc/commands/eth_api.hpp @@ -64,6 +64,7 @@ class EthereumRpcApi { EthereumRpcApi(const EthereumRpcApi&) = delete; EthereumRpcApi& operator=(const EthereumRpcApi&) = delete; + EthereumRpcApi(EthereumRpcApi&&) = default; protected: Task handle_eth_block_number(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/net_api.hpp b/silkworm/rpc/commands/net_api.hpp index 8ca27ef4ba..9fe5a0b2a8 100644 --- a/silkworm/rpc/commands/net_api.hpp +++ b/silkworm/rpc/commands/net_api.hpp @@ -41,6 +41,7 @@ class NetRpcApi { NetRpcApi(const NetRpcApi&) = delete; NetRpcApi& operator=(const NetRpcApi&) = delete; + NetRpcApi(NetRpcApi&&) = default; protected: Task handle_net_listening(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/ots_api.hpp b/silkworm/rpc/commands/ots_api.hpp index eceb30e7e2..34d3d72fe8 100644 --- a/silkworm/rpc/commands/ots_api.hpp +++ b/silkworm/rpc/commands/ots_api.hpp @@ -193,6 +193,7 @@ class OtsRpcApi { OtsRpcApi(const OtsRpcApi&) = delete; OtsRpcApi& operator=(const OtsRpcApi&) = delete; + OtsRpcApi(OtsRpcApi&&) = default; protected: Task handle_ots_get_api_level(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/parity_api.hpp b/silkworm/rpc/commands/parity_api.hpp index acd5291920..af060d5d49 100644 --- a/silkworm/rpc/commands/parity_api.hpp +++ b/silkworm/rpc/commands/parity_api.hpp @@ -44,6 +44,7 @@ class ParityRpcApi { ParityRpcApi(const ParityRpcApi&) = delete; ParityRpcApi& operator=(const ParityRpcApi&) = delete; + ParityRpcApi(ParityRpcApi&&) = default; protected: Task handle_parity_get_block_receipts(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/rpc_api.hpp b/silkworm/rpc/commands/rpc_api.hpp index b16d6581b1..259f07730a 100644 --- a/silkworm/rpc/commands/rpc_api.hpp +++ b/silkworm/rpc/commands/rpc_api.hpp @@ -70,6 +70,7 @@ class RpcApi : protected EthereumRpcApi, RpcApi(const RpcApi&) = delete; RpcApi& operator=(const RpcApi&) = delete; + RpcApi(RpcApi&&) = default; friend class RpcApiTable; friend class silkworm::rpc::json_rpc::RequestHandler; diff --git a/silkworm/rpc/commands/rpc_api_table.hpp b/silkworm/rpc/commands/rpc_api_table.hpp index 83d9fe4edc..30173ee3f7 100644 --- a/silkworm/rpc/commands/rpc_api_table.hpp +++ b/silkworm/rpc/commands/rpc_api_table.hpp @@ -39,6 +39,7 @@ class RpcApiTable { RpcApiTable(const RpcApiTable&) = delete; RpcApiTable& operator=(const RpcApiTable&) = delete; + RpcApiTable(RpcApiTable&&) = default; [[nodiscard]] std::optional find_json_handler(const std::string& method) const; [[nodiscard]] std::optional find_json_glaze_handler(const std::string& method) const; diff --git a/silkworm/rpc/commands/trace_api.hpp b/silkworm/rpc/commands/trace_api.hpp index ec2423ff1b..8e2fca9076 100644 --- a/silkworm/rpc/commands/trace_api.hpp +++ b/silkworm/rpc/commands/trace_api.hpp @@ -52,6 +52,7 @@ class TraceRpcApi { TraceRpcApi(const TraceRpcApi&) = delete; TraceRpcApi& operator=(const TraceRpcApi&) = delete; + TraceRpcApi(TraceRpcApi&&) = default; protected: Task handle_trace_call(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/txpool_api.hpp b/silkworm/rpc/commands/txpool_api.hpp index 428c466c78..4a9d8d3817 100644 --- a/silkworm/rpc/commands/txpool_api.hpp +++ b/silkworm/rpc/commands/txpool_api.hpp @@ -41,6 +41,7 @@ class TxPoolRpcApi { TxPoolRpcApi(const TxPoolRpcApi&) = delete; TxPoolRpcApi& operator=(const TxPoolRpcApi&) = delete; + TxPoolRpcApi(TxPoolRpcApi&&) = default; protected: Task handle_txpool_status(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/commands/web3_api.hpp b/silkworm/rpc/commands/web3_api.hpp index 8c6f3322b2..c2c020d6b5 100644 --- a/silkworm/rpc/commands/web3_api.hpp +++ b/silkworm/rpc/commands/web3_api.hpp @@ -42,6 +42,7 @@ class Web3RpcApi { Web3RpcApi(const Web3RpcApi&) = delete; Web3RpcApi& operator=(const Web3RpcApi&) = delete; + Web3RpcApi(Web3RpcApi&&) = default; protected: Task handle_web3_client_version(const nlohmann::json& request, nlohmann::json& reply); diff --git a/silkworm/rpc/daemon.cpp b/silkworm/rpc/daemon.cpp index 14c35899d9..08f422419b 100644 --- a/silkworm/rpc/daemon.cpp +++ b/silkworm/rpc/daemon.cpp @@ -37,6 +37,7 @@ #include #include #include +#include #include namespace silkworm::rpc { @@ -301,9 +302,17 @@ void Daemon::start() { boost::asio::io_context& ioc, std::optional jwt_secret, InterfaceLogSettings ilog_settings) { + commands::RpcApi rpc_api{ioc, worker_pool_}; + commands::RpcApiTable handler_table{api_spec}; + auto make_jsonrpc_handler = [rpc_api = std::move(rpc_api), + handler_table = std::move(handler_table), + ilog_settings = std::move(ilog_settings)](StreamWriter* stream_writer) mutable { + return std::make_unique(stream_writer, rpc_api, handler_table, ilog_settings); + }; + return std::make_unique( - end_point, api_spec, ioc, worker_pool_, settings_.cors_domain, std::move(jwt_secret), - settings_.use_websocket, settings_.ws_compression, settings_.http_compression, std::move(ilog_settings)); + end_point, std::move(make_jsonrpc_handler), ioc, worker_pool_, settings_.cors_domain, std::move(jwt_secret), + settings_.use_websocket, settings_.ws_compression, settings_.http_compression); }; // Put the interface logs into the data folder diff --git a/silkworm/rpc/http/connection.cpp b/silkworm/rpc/http/connection.cpp index c804fbc90a..606fdf0296 100644 --- a/silkworm/rpc/http/connection.cpp +++ b/silkworm/rpc/http/connection.cpp @@ -43,30 +43,29 @@ static constexpr auto kMaxPayloadSize{30 * kMebi}; // 30MiB static constexpr std::array kAcceptedContentTypes{"application/json", "application/jsonrequest", "application/json-rpc"}; static constexpr auto kGzipEncoding{"gzip"}; -Connection::Connection(boost::asio::io_context& io_context, - commands::RpcApi& api, - commands::RpcApiTable& handler_table, +Task Connection::run_read_loop(std::shared_ptr connection) { + co_await connection->read_loop(); +} + +Connection::Connection(boost::asio::ip::tcp::socket socket, + RequestHandlerFactory& handler_factory, const std::vector& allowed_origins, std::optional jwt_secret, bool ws_upgrade_enabled, bool ws_compression, bool http_compression, - boost::asio::thread_pool& workers, - InterfaceLogSettings ifc_log_settings) - : socket_{io_context}, - api_{api}, - handler_table_{handler_table}, - request_handler_{this, api, handler_table, std::move(ifc_log_settings)}, + boost::asio::thread_pool& workers) + : socket_{std::move(socket)}, + handler_factory_{handler_factory}, + handler_{handler_factory_(this)}, allowed_origins_{allowed_origins}, jwt_secret_{std ::move(jwt_secret)}, ws_upgrade_enabled_{ws_upgrade_enabled}, ws_compression_{ws_compression}, http_compression_{http_compression}, workers_{workers} { - SILK_TRACE << "Connection::Connection socket " << &socket_ << " created"; - if (http_compression_) { // temporary to avoid warning with clang - SILK_TRACE << "Connection::Connection compression enabled"; - } + socket_.set_option(boost::asio::ip::tcp::socket::keep_alive(true)); + SILK_TRACE << "Connection::Connection created for " << socket_.remote_endpoint(); } Connection::~Connection() { @@ -81,7 +80,11 @@ Task Connection::read_loop() { continue_processing = co_await do_read(); } } catch (const boost::system::system_error& se) { - SILK_TRACE << "Connection::read_loop system-error: " << se.code(); + if (se.code() == boost::beast::http::error::end_of_stream) { + SILK_TRACE << "Connection::read_loop received graceful close from " << socket_.remote_endpoint(); + } else { + SILK_TRACE << "Connection::read_loop system_error: " << se.code(); + } } catch (const std::exception& e) { SILK_ERROR << "Connection::read_loop exception: " << e.what(); } @@ -126,7 +129,7 @@ Task Connection::do_upgrade(const boost::beast::http::request stream(std::move(socket_)); - auto ws_connection = std::make_shared(std::move(stream), api_, std::move(handler_table_), ws_compression_); + auto ws_connection = std::make_shared(std::move(stream), handler_factory_, ws_compression_); co_await ws_connection->accept(req); auto connection_loop = [](auto websocket_connection) -> Task { co_await websocket_connection->read_loop(); }; @@ -213,7 +216,7 @@ Task Connection::handle_actual_request(const boost::beast::http::requesthandle(req.body()); if (rsp_content) { co_await do_write(rsp_content->append("\n"), boost::beast::http::status::ok, gzip_encoding_requested ? kGzipEncoding : ""); } diff --git a/silkworm/rpc/http/connection.hpp b/silkworm/rpc/http/connection.hpp index 96ed084242..b0f980ae79 100644 --- a/silkworm/rpc/http/connection.hpp +++ b/silkworm/rpc/http/connection.hpp @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include @@ -32,8 +33,8 @@ #include #include #include -#include -#include +#include +#include #include namespace silkworm::rpc::http { @@ -41,33 +42,33 @@ namespace silkworm::rpc::http { //! Represents a single connection from a client. class Connection : public StreamWriter { public: + //! Run the asynchronous read loop for the specified connection. + //! \note This is co_spawn-friendly because the connection lifetime is tied to the coroutine frame + static Task run_read_loop(std::shared_ptr connection); + Connection(const Connection&) = delete; Connection& operator=(const Connection&) = delete; //! Construct a connection running within the given execution context. - Connection(boost::asio::io_context& io_context, - commands::RpcApi& api, - commands::RpcApiTable& handler_table, + Connection(boost::asio::ip::tcp::socket socket, + RequestHandlerFactory& handler_factory, const std::vector& allowed_origins, std::optional jwt_secret, bool ws_upgrade_enabled, bool ws_compression, bool http_compression, - boost::asio::thread_pool& workers, - InterfaceLogSettings ifc_log_settings); + boost::asio::thread_pool& workers); ~Connection() override; - boost::asio::ip::tcp::socket& socket() { return socket_; } - - //! Start the asynchronous read loop for the connection. - Task read_loop(); - /* StreamWriter Interface */ Task open_stream() override; Task close_stream() override; Task write(std::string_view content, bool last) override; private: + //! Start the asynchronous read loop for the connection + Task read_loop(); + using AuthorizationError = std::string; using AuthorizationResult = tl::expected; AuthorizationResult is_request_authorized(const boost::beast::http::request& req); @@ -98,11 +99,10 @@ class Connection : public StreamWriter { //! Socket for the connection. boost::asio::ip::tcp::socket socket_; - commands::RpcApi& api_; - const commands::RpcApiTable& handler_table_; + RequestHandlerFactory& handler_factory_; //! The handler used to process the incoming request. - json_rpc::RequestHandler request_handler_; + RequestHandlerPtr handler_; const std::vector& allowed_origins_; const std::optional jwt_secret_; diff --git a/silkworm/rpc/http/server.cpp b/silkworm/rpc/http/server.cpp index 3e20269584..812a23acb8 100644 --- a/silkworm/rpc/http/server.cpp +++ b/silkworm/rpc/http/server.cpp @@ -42,25 +42,21 @@ std::tuple Server::parse_endpoint(const std::string& t } Server::Server(const std::string& end_point, - const std::string& api_spec, + RequestHandlerFactory&& handler_factory, boost::asio::io_context& io_context, boost::asio::thread_pool& workers, std::vector allowed_origins, std::optional jwt_secret, bool use_websocket, bool ws_compression, - bool http_compression, - InterfaceLogSettings ifc_log_settings) - : rpc_api_{io_context, workers}, - handler_table_{api_spec}, - io_context_(io_context), + bool http_compression) + : handler_factory_{std::move(handler_factory)}, acceptor_{io_context}, allowed_origins_{std::move(allowed_origins)}, jwt_secret_(std::move(jwt_secret)), use_websocket_{use_websocket}, ws_compression_{ws_compression}, http_compression_{http_compression}, - ifc_log_settings_{std::move(ifc_log_settings)}, workers_{workers} { const auto [host, port] = parse_endpoint(end_point); @@ -82,24 +78,23 @@ void Server::start() { Task Server::run() { acceptor_.listen(); + auto this_executor = co_await ThisTask::executor; try { while (acceptor_.is_open()) { - SILK_DEBUG << "Server::run accepting using io_context " << &io_context_ << "..."; + SILK_TRACE << "Server::run accepting using executor " << &this_executor << "..."; - auto new_connection = std::make_shared( - io_context_, rpc_api_, handler_table_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, workers_, ifc_log_settings_); - co_await acceptor_.async_accept(new_connection->socket(), boost::asio::use_awaitable); + boost::asio::ip::tcp::socket socket{this_executor}; + co_await acceptor_.async_accept(socket, boost::asio::use_awaitable); if (!acceptor_.is_open()) { SILK_TRACE << "Server::run returning..."; co_return; } - new_connection->socket().set_option(boost::asio::ip::tcp::socket::keep_alive(true)); - - SILK_TRACE << "Server::run starting connection for socket: " << &new_connection->socket(); - auto connection_loop = [](auto connection) -> Task { co_await connection->read_loop(); }; + SILK_TRACE << "Server::run accepted connection from " << socket.remote_endpoint(); - boost::asio::co_spawn(io_context_, connection_loop(new_connection), boost::asio::detached); + auto new_connection = std::make_shared( + std::move(socket), handler_factory_, allowed_origins_, jwt_secret_, use_websocket_, ws_compression_, http_compression_, workers_); + boost::asio::co_spawn(this_executor, Connection::run_read_loop(new_connection), boost::asio::detached); } } catch (const boost::system::system_error& se) { if (se.code() != boost::asio::error::operation_aborted) { diff --git a/silkworm/rpc/http/server.hpp b/silkworm/rpc/http/server.hpp index 307494b2d1..6038f30f85 100644 --- a/silkworm/rpc/http/server.hpp +++ b/silkworm/rpc/http/server.hpp @@ -29,7 +29,7 @@ #include #include #include -#include +#include namespace silkworm::rpc::http { @@ -41,15 +41,14 @@ class Server { // Construct the server to listen on the specified local TCP end-point Server(const std::string& end_point, - const std::string& api_spec, + RequestHandlerFactory&& handler_factory, boost::asio::io_context& io_context, boost::asio::thread_pool& workers, std::vector allowed_origins, std::optional jwt_secret, bool use_websocket, bool ws_compression, - bool http_compression, - InterfaceLogSettings ifc_log_settings = {}); + bool http_compression); void start(); @@ -60,14 +59,8 @@ class Server { Task run(); - //! The JSON RPC API implementation - commands::RpcApi rpc_api_; - - //! The repository of API request handlers - commands::RpcApiTable handler_table_; - - //! The context used to perform asynchronous operations - boost::asio::io_context& io_context_; + //! The factory of RPC request handlers + RequestHandlerFactory handler_factory_; //! The acceptor used to listen for incoming TCP connections boost::asio::ip::tcp::acceptor acceptor_; @@ -84,12 +77,9 @@ class Server { //! Flag indicating if WebSocket protocol compression will be used bool ws_compression_; - //! Flag indicating if Http protocol compression will be used + //! Flag indicating if HTTP protocol compression will be used bool http_compression_; - //! The interface logging configuration - InterfaceLogSettings ifc_log_settings_; - //! The configured workers boost::asio::thread_pool& workers_; }; diff --git a/silkworm/rpc/json/stream.hpp b/silkworm/rpc/json/stream.hpp index ac5881f2fb..1ac506d14e 100644 --- a/silkworm/rpc/json/stream.hpp +++ b/silkworm/rpc/json/stream.hpp @@ -32,7 +32,7 @@ #include #include -#include +#include namespace silkworm::rpc::json { diff --git a/silkworm/rpc/json_rpc/request_handler.cpp b/silkworm/rpc/json_rpc/request_handler.cpp index b98357c050..7b39ef7ec1 100644 --- a/silkworm/rpc/json_rpc/request_handler.cpp +++ b/silkworm/rpc/json_rpc/request_handler.cpp @@ -24,8 +24,8 @@ #include #include #include -#include #include +#include namespace silkworm::rpc::json_rpc { diff --git a/silkworm/rpc/json_rpc/request_handler.hpp b/silkworm/rpc/json_rpc/request_handler.hpp index 204be5a2db..26035cdc42 100644 --- a/silkworm/rpc/json_rpc/request_handler.hpp +++ b/silkworm/rpc/json_rpc/request_handler.hpp @@ -31,12 +31,13 @@ #include #include #include -#include #include +#include +#include namespace silkworm::rpc::json_rpc { -class RequestHandler { +class RequestHandler : public rpc::RequestHandler { public: RequestHandler(StreamWriter* stream_writer, commands::RpcApi& rpc_api, @@ -47,7 +48,7 @@ class RequestHandler { RequestHandler(const RequestHandler&) = delete; RequestHandler& operator=(const RequestHandler&) = delete; - Task> handle(const std::string& request); + Task> handle(const std::string& request) override; protected: Task handle_request_and_create_reply(const nlohmann::json& request_json, std::string& response); diff --git a/silkworm/rpc/test/api_test_database.hpp b/silkworm/rpc/test/api_test_database.hpp index e9d537b9a6..f397d2af2a 100644 --- a/silkworm/rpc/test/api_test_database.hpp +++ b/silkworm/rpc/test/api_test_database.hpp @@ -39,11 +39,11 @@ #include #include #include -#include #include #include #include #include +#include namespace silkworm::rpc::test { diff --git a/silkworm/rpc/transport/request_handler.hpp b/silkworm/rpc/transport/request_handler.hpp new file mode 100644 index 0000000000..fd51829fcb --- /dev/null +++ b/silkworm/rpc/transport/request_handler.hpp @@ -0,0 +1,51 @@ +/* + Copyright 2024 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include +#include +#include +#include + +#include + +#include + +#include "stream_writer.hpp" + +namespace silkworm::rpc { + +using Request = std::string; +using Response = std::string; + +class RequestHandler { + public: + RequestHandler() = default; + virtual ~RequestHandler() = default; + + RequestHandler(const RequestHandler&) = delete; + RequestHandler& operator=(const RequestHandler&) = delete; + + virtual Task> handle(const Request& request) = 0; +}; + +using RequestHandlerPtr = std::unique_ptr; + +//! We use \code absl::AnyInvocable waiting for \code std::move_only_function in C++23 +using RequestHandlerFactory = absl::AnyInvocable; + +} // namespace silkworm::rpc diff --git a/silkworm/rpc/common/writer.hpp b/silkworm/rpc/transport/stream_writer.hpp similarity index 93% rename from silkworm/rpc/common/writer.hpp rename to silkworm/rpc/transport/stream_writer.hpp index 68ed505de5..edb4b7f2bb 100644 --- a/silkworm/rpc/common/writer.hpp +++ b/silkworm/rpc/transport/stream_writer.hpp @@ -16,15 +16,12 @@ #pragma once -#include -#include +#include #include +#include #include -#include -#include - namespace silkworm::rpc { class StreamWriter { diff --git a/silkworm/rpc/common/writer_test.cpp b/silkworm/rpc/transport/stream_writer_test.cpp similarity index 99% rename from silkworm/rpc/common/writer_test.cpp rename to silkworm/rpc/transport/stream_writer_test.cpp index 967d27eb51..ef4f1c706a 100644 --- a/silkworm/rpc/common/writer_test.cpp +++ b/silkworm/rpc/transport/stream_writer_test.cpp @@ -14,7 +14,7 @@ limitations under the License. */ -#include "writer.hpp" +#include "stream_writer.hpp" #include diff --git a/silkworm/rpc/ws/connection.cpp b/silkworm/rpc/ws/connection.cpp index e32512ed0a..a7ee6ea8da 100644 --- a/silkworm/rpc/ws/connection.cpp +++ b/silkworm/rpc/ws/connection.cpp @@ -28,18 +28,17 @@ namespace silkworm::rpc::ws { -Connection::Connection(boost::beast::websocket::stream&& stream, - commands::RpcApi& api, - const commands::RpcApiTable& handler_table, +Connection::Connection(TcpStream&& stream, + RequestHandlerFactory& handler_factory, bool compression) - : ws_{std::move(stream)}, - request_handler_{this, api, handler_table}, + : stream_{std::move(stream)}, + handler_{handler_factory(this)}, compression_{compression} { - SILK_DEBUG << "ws::Connection::Connection ws created:" << &ws_; + SILK_TRACE << "ws::Connection::Connection socket created:" << &stream_; } Connection::~Connection() { - SILK_TRACE << "ws::Connection::~Connection ws deleted:" << &ws_; + SILK_TRACE << "ws::Connection::~Connection socket deleted:" << &stream_; } Task Connection::accept(const boost::beast::http::request& req) { @@ -50,9 +49,9 @@ Task Connection::accept(const boost::beast::http::request Connection::accept(const boost::beast::http::request Connection::read_loop() { - SILK_TRACE << "ws::Connection::run starting connection for websocket: " << &ws_; + SILK_TRACE << "ws::Connection::run starting connection for socket: " << &stream_; try { while (true) { @@ -86,11 +85,11 @@ Task Connection::read_loop() { Task Connection::do_read() { std::string req_content; auto req_buffer = boost::asio::dynamic_buffer(req_content); - const auto bytes_read = co_await ws_.async_read(req_buffer, boost::asio::use_awaitable); + const auto bytes_read = co_await stream_.async_read(req_buffer, boost::asio::use_awaitable); SILK_TRACE << "ws::Connection::do_read bytes_read: " << bytes_read << " [" << req_content << "]"; - auto rsp_content = co_await request_handler_.handle(req_content); + auto rsp_content = co_await handler_->handle(req_content); if (rsp_content) { co_await do_write(*rsp_content); } @@ -98,11 +97,10 @@ Task Connection::do_read() { Task Connection::write(std::string_view content, bool last) { try { - const auto written = co_await ws_.async_write_some(last, boost::asio::buffer(content.data(), content.size()), boost::asio::use_awaitable); + const auto written = co_await stream_.async_write_some(last, boost::asio::buffer(content.data(), content.size()), boost::asio::use_awaitable); SILK_TRACE << "ws::Connection::write: [" << content.data() << "]"; co_return written; - } catch (const boost::system::system_error& se) { SILK_TRACE << "ws::Connection::write system_error: " << se.what(); throw; @@ -114,7 +112,7 @@ Task Connection::write(std::string_view content, bool last) { Task Connection::do_write(const std::string& content) { try { - const auto written = co_await ws_.async_write(boost::asio::buffer(content), boost::asio::use_awaitable); + const auto written = co_await stream_.async_write(boost::asio::buffer(content), boost::asio::use_awaitable); SILK_TRACE << "ws::Connection::do_write: [" << content << "]"; co_return written; diff --git a/silkworm/rpc/ws/connection.hpp b/silkworm/rpc/ws/connection.hpp index 8132143a1f..1db5d3911e 100644 --- a/silkworm/rpc/ws/connection.hpp +++ b/silkworm/rpc/ws/connection.hpp @@ -30,11 +30,13 @@ #include #include -#include -#include +#include +#include namespace silkworm::rpc::ws { +using TcpStream = boost::beast::websocket::stream; + //! Represents a single connection from a client via websocket. class Connection : public StreamWriter { public: @@ -42,9 +44,8 @@ class Connection : public StreamWriter { Connection& operator=(const Connection&) = delete; //! Construct a connection running within the given execution context. - Connection(boost::beast::websocket::stream&& stream, - commands::RpcApi& api, - const commands::RpcApiTable& handler_table, + Connection(TcpStream&& stream, + RequestHandlerFactory& handler_factory, bool compression = false); ~Connection() override; @@ -64,11 +65,11 @@ class Connection : public StreamWriter { //! Perform an asynchronous write operation. Task do_write(const std::string& content); - // websocket stream - boost::beast::websocket::stream ws_; + //! The WebSocket TCP stream + TcpStream stream_; //! The handler used to process the incoming request. - json_rpc::RequestHandler request_handler_; + RequestHandlerPtr handler_; //! enable compress flag bool compression_{false};