From aec28eab3381c51e97034f7a2e3fb5baff1c464e Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Mon, 19 Aug 2024 00:24:12 +0200 Subject: [PATCH 1/6] infra: encapsulate usage of asio co_spawn --- .../concurrency/awaitable_wait_for_all.hpp | 24 ++++---- .../concurrency/awaitable_wait_for_one.hpp | 24 ++++---- silkworm/infra/concurrency/co_spawn_sw.hpp | 47 +++++++++++++--- .../infra/concurrency/parallel_group_test.cpp | 6 +- .../infra/grpc/client/client_context_pool.cpp | 3 +- .../infra/test_util/context_test_base.hpp | 5 +- .../execution/api/active_direct_service.cpp | 38 ++++++------- silkworm/node/stagedsync/execution_engine.cpp | 5 +- .../node/stagedsync/stages/stage_triggers.cpp | 4 +- .../discovery/node_db/serial_node_db.cpp | 56 +++++++++---------- silkworm/sentry/message_receiver.cpp | 3 +- silkworm/sentry/peer_manager.cpp | 10 ++-- silkworm/sentry/peer_manager_api.cpp | 3 +- silkworm/sentry/rlpx/peer.cpp | 4 +- 14 files changed, 130 insertions(+), 102 deletions(-) diff --git a/silkworm/infra/concurrency/awaitable_wait_for_all.hpp b/silkworm/infra/concurrency/awaitable_wait_for_all.hpp index 183e942338..d1502f9926 100644 --- a/silkworm/infra/concurrency/awaitable_wait_for_all.hpp +++ b/silkworm/infra/concurrency/awaitable_wait_for_all.hpp @@ -71,8 +71,8 @@ awaitable operator&&( auto [order, ex0, ex1] = co_await make_parallel_group( - co_spawn_sw(ex, std::move(t), deferred), - co_spawn_sw(ex, std::move(u), deferred)) + co_spawn(ex, std::move(t), deferred), + co_spawn(ex, std::move(u), deferred)) .async_wait( wait_for_one_error(), use_awaitable_t{}); @@ -93,8 +93,8 @@ awaitable operator&&( auto [order, ex0, ex1, r1] = co_await make_parallel_group( - co_spawn_sw(ex, std::move(t), deferred), - co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred)) + co_spawn(ex, std::move(t), deferred), + co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred)) .async_wait( wait_for_one_error(), use_awaitable_t{}); @@ -115,8 +115,8 @@ awaitable operator&&( auto [order, ex0, r0, ex1] = co_await make_parallel_group( - co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred), - co_spawn_sw(ex, std::move(u), deferred)) + co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), + co_spawn(ex, std::move(u), deferred)) .async_wait( wait_for_one_error(), use_awaitable_t{}); @@ -137,8 +137,8 @@ awaitable, Executor> operator&&( auto [order, ex0, r0, ex1, r1] = co_await make_parallel_group( - co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred), - co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred)) + co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), + co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred)) .async_wait( wait_for_one_error(), use_awaitable_t{}); @@ -161,8 +161,8 @@ awaitable, Executor> operator&&( auto [order, ex0, r0, ex1, r1] = co_await make_parallel_group( - co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred), - co_spawn_sw(ex, std::move(u), deferred)) + co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), + co_spawn(ex, std::move(u), deferred)) .async_wait( wait_for_one_error(), use_awaitable_t{}); @@ -183,8 +183,8 @@ awaitable, Executor> operator&&( auto [order, ex0, r0, ex1, r1] = co_await make_parallel_group( - co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred), - co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred)) + co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), + co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred)) 
.async_wait( wait_for_one_error(), use_awaitable_t{}); diff --git a/silkworm/infra/concurrency/awaitable_wait_for_one.hpp b/silkworm/infra/concurrency/awaitable_wait_for_one.hpp index b4df238ca5..39cc37b4ff 100644 --- a/silkworm/infra/concurrency/awaitable_wait_for_one.hpp +++ b/silkworm/infra/concurrency/awaitable_wait_for_one.hpp @@ -70,8 +70,8 @@ awaitable, Executor> operator||(awa auto ex = co_await this_coro::executor; auto [order, ex0, ex1] = - co_await make_parallel_group(co_spawn_sw(ex, std::move(t), deferred), - co_spawn_sw(ex, std::move(u), deferred)) + co_await make_parallel_group(co_spawn(ex, std::move(t), deferred), + co_spawn(ex, std::move(u), deferred)) .async_wait(wait_for_one(), use_awaitable_t{}); if (order[0] == 0) { @@ -93,8 +93,8 @@ awaitable, Executor> operator||(awaitable{}); if (order[0] == 0) { @@ -119,8 +119,8 @@ awaitable, Executor> operator||(awaitable{}); if (order[0] == 0) { @@ -145,8 +145,8 @@ awaitable, Executor> operator||(awaitable t, awa auto ex = co_await this_coro::executor; auto [order, ex0, r0, ex1, r1] = - co_await make_parallel_group(co_spawn_sw(ex, detail::awaitable_wrap(std::move(t)), deferred), - co_spawn_sw(ex, detail::awaitable_wrap(std::move(u)), deferred)) + co_await make_parallel_group(co_spawn(ex, detail::awaitable_wrap(std::move(t)), deferred), + co_spawn(ex, detail::awaitable_wrap(std::move(u)), deferred)) .async_wait(wait_for_one(), use_awaitable_t{}); if (order[0] == 0) { @@ -169,8 +169,8 @@ awaitable, Executor> operator||(awaitable{}); using widen = detail::widen_variant; @@ -194,8 +194,8 @@ awaitable, Executor> operator||(awaitable{}); using widen = detail::widen_variant; diff --git a/silkworm/infra/concurrency/co_spawn_sw.hpp b/silkworm/infra/concurrency/co_spawn_sw.hpp index 73c0298373..896f04c033 100644 --- a/silkworm/infra/concurrency/co_spawn_sw.hpp +++ b/silkworm/infra/concurrency/co_spawn_sw.hpp @@ -21,21 +21,54 @@ #include #include +#include #include #include +#include -#define co_spawn_sw co_spawn +#include namespace silkworm::concurrency { -using namespace boost::asio; +using namespace boost::asio; // TODO(remove) template -inline BOOST_ASIO_INITFN_AUTO_RESULT_TYPE( - boost::asio::use_awaitable, - typename boost::asio::detail::awaitable_signature::type>::type = 0) - co_spawn_and_await(const Executor& ex, F&& f) { - return (co_spawn_sw)(ex, std::forward(f), boost::asio::use_awaitable); +auto spawn_and_async_wait(const Executor& ex, F&& f, + typename boost::asio::constraint< + boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { + return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_awaitable); +} + +template +auto spawn_and_async_wait(ExecutionContext& ctx, F&& f, + typename boost::asio::constraint>::type = 0) { + return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_awaitable); +} + +template +auto spawn(const Executor& ex, F&& f, + typename boost::asio::constraint< + boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { + return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_future); +} + +template +auto spawn(ExecutionContext& ctx, F&& f, + typename boost::asio::constraint>::type = 0) { + return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_future); +} + +template +auto spawn_and_wait(const Executor& ex, F&& f, + typename boost::asio::constraint< + boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { + return spawn(ex, std::forward(f)).get(); +} + 
+template +auto spawn_and_wait(ExecutionContext& ctx, F&& f, + typename boost::asio::constraint>::type = 0) { + return spawn(ctx, std::forward(f)).get(); } } // namespace silkworm::concurrency diff --git a/silkworm/infra/concurrency/parallel_group_test.cpp b/silkworm/infra/concurrency/parallel_group_test.cpp index 8183251863..8a3a99f1f6 100644 --- a/silkworm/infra/concurrency/parallel_group_test.cpp +++ b/silkworm/infra/concurrency/parallel_group_test.cpp @@ -52,12 +52,12 @@ awaitable throw_op() { } awaitable spawn_throw_op(strand& strand) { - co_await co_spawn_sw(strand, throw_op(), use_awaitable); + co_await spawn_and_async_wait(strand, throw_op()); } awaitable spawn_noop_loop(strand& strand) { while (true) { - co_await co_spawn_sw(strand, noop(), use_awaitable); + co_await spawn_and_async_wait(strand, noop()); } } @@ -74,6 +74,6 @@ awaitable co_spawn_cancellation_handler_bug() { TEST_CASE("parallel_group.co_spawn_cancellation_handler_bug") { io_context context; - co_spawn_sw(context, co_spawn_cancellation_handler_bug(), use_future); + spawn(context, co_spawn_cancellation_handler_bug()); context.run(); } diff --git a/silkworm/infra/grpc/client/client_context_pool.cpp b/silkworm/infra/grpc/client/client_context_pool.cpp index 1a7683a942..b394bd7a04 100644 --- a/silkworm/infra/grpc/client/client_context_pool.cpp +++ b/silkworm/infra/grpc/client/client_context_pool.cpp @@ -77,7 +77,8 @@ void ClientContext::execute_loop_single_threaded(IdleStrategy&& idle_strategy) { void ClientContext::execute_loop_multi_threaded() { SILK_DEBUG << "Multi-thread execution loop start [" << std::this_thread::get_id() << "]"; - std::thread grpc_context_thread{[grpc_context = grpc_context_]() { + std::thread grpc_context_thread{[context_id = context_id_, grpc_context = grpc_context_]() { + log::set_thread_name(("grpc_ctx_s" + std::to_string(context_id)).c_str()); grpc_context->run_completion_queue(); }}; std::exception_ptr run_exception; diff --git a/silkworm/infra/test_util/context_test_base.hpp b/silkworm/infra/test_util/context_test_base.hpp index 66d8c54483..a94cbe5dc5 100644 --- a/silkworm/infra/test_util/context_test_base.hpp +++ b/silkworm/infra/test_util/context_test_base.hpp @@ -24,6 +24,7 @@ #include #include +#include #include #include @@ -35,12 +36,12 @@ class ContextTestBase { template auto spawn(AwaitableOrFunction&& awaitable) { - return boost::asio::co_spawn(io_context_, std::forward(awaitable), boost::asio::use_future); + return concurrency::spawn(io_context_, std::forward(awaitable)); } template auto spawn_and_wait(AwaitableOrFunction&& awaitable) { - return spawn(std::forward(awaitable)).get(); + return concurrency::spawn_and_wait(io_context_, std::forward(awaitable)); } static void sleep_for(std::chrono::milliseconds sleep_time_ms) { diff --git a/silkworm/node/execution/api/active_direct_service.cpp b/silkworm/node/execution/api/active_direct_service.cpp index ba194351a7..9d34621886 100644 --- a/silkworm/node/execution/api/active_direct_service.cpp +++ b/silkworm/node/execution/api/active_direct_service.cpp @@ -41,7 +41,7 @@ bool ActiveDirectService::stop() { // rpc InsertBlocks(InsertBlocksRequest) returns(InsertionResult); Task ActiveDirectService::insert_blocks(const Blocks& blocks) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& bb) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& bb) { return self->DirectService::insert_blocks(bb); }(this, blocks)); } @@ -50,14 +50,14 @@ Task ActiveDirectService::insert_blocks(const 
Blocks& blocks) { // rpc ValidateChain(ValidationRequest) returns(ValidationReceipt); Task ActiveDirectService::validate_chain(BlockNumAndHash number_and_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_and_hash) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_and_hash) { return self->DirectService::validate_chain(num_and_hash); }(this, number_and_hash)); } // rpc UpdateForkChoice(ForkChoice) returns(ForkChoiceReceipt); Task ActiveDirectService::update_fork_choice(const ForkChoice& fork_choice) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& choice) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& choice) { return self->DirectService::update_fork_choice(choice); }(this, fork_choice)); } @@ -66,14 +66,14 @@ Task ActiveDirectService::update_fork_choice(const ForkChoice& // rpc AssembleBlock(AssembleBlockRequest) returns(AssembleBlockResponse); Task ActiveDirectService::assemble_block(const api::BlockUnderConstruction& block) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& b) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& b) { return self->DirectService::assemble_block(b); }(this, block)); } // rpc GetAssembledBlock(GetAssembledBlockRequest) returns(GetAssembledBlockResponse); Task ActiveDirectService::get_assembled_block(PayloadId payload_id) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto id) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto id) { return self->DirectService::get_assembled_block(id); }(this, payload_id)); } @@ -82,35 +82,35 @@ Task ActiveDirectService::get_assembled_block(PayloadId pa // rpc CurrentHeader(google.protobuf.Empty) returns(GetHeaderResponse); Task> ActiveDirectService::current_header() { - return concurrency::co_spawn_and_await(executor_, [](auto* self) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self) { return self->DirectService::current_header(); }(this)); } // rpc GetTD(GetSegmentRequest) returns(GetTDResponse); Task> ActiveDirectService::get_td(BlockNumberOrHash number_or_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_td(num_or_hash); }(this, number_or_hash)); } // rpc GetHeader(GetSegmentRequest) returns(GetHeaderResponse); Task> ActiveDirectService::get_header(BlockNumberOrHash number_or_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_header(num_or_hash); }(this, number_or_hash)); } // rpc GetBody(GetSegmentRequest) returns(GetBodyResponse); Task> ActiveDirectService::get_body(BlockNumberOrHash number_or_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_body(num_or_hash); }(this, number_or_hash)); } // rpc HasBlock(GetSegmentRequest) returns(HasBlockResponse); Task ActiveDirectService::has_block(BlockNumberOrHash number_or_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto 
num_or_hash) { return self->DirectService::has_block(num_or_hash); }(this, number_or_hash)); } @@ -119,14 +119,14 @@ Task ActiveDirectService::has_block(BlockNumberOrHash number_or_hash) { // rpc GetBodiesByRange(GetBodiesByRangeRequest) returns(GetBodiesBatchResponse); Task ActiveDirectService::get_bodies_by_range(BlockNumRange number_range) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto num_range) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_range) { return self->DirectService::get_bodies_by_range(num_range); }(this, number_range)); } // rpc GetBodiesByHashes(GetBodiesByHashesRequest) returns(GetBodiesBatchResponse); Task ActiveDirectService::get_bodies_by_hashes(const BlockHashes& hashes) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, const auto& hh) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& hh) { return self->DirectService::get_bodies_by_hashes(hh); }(this, hashes)); } @@ -135,21 +135,21 @@ Task ActiveDirectService::get_bodies_by_hashes(const BlockHashes& h // rpc IsCanonicalHash(types.H256) returns(IsCanonicalResponse); Task ActiveDirectService::is_canonical_hash(Hash block_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto h) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto h) { return self->DirectService::is_canonical_hash(h); }(this, block_hash)); } // rpc GetHeaderHashNumber(types.H256) returns(GetHeaderHashNumberResponse); Task> ActiveDirectService::get_header_hash_number(Hash block_hash) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto h) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto h) { return self->DirectService::get_header_hash_number(h); }(this, block_hash)); } // rpc GetForkChoice(google.protobuf.Empty) returns(ForkChoice); Task ActiveDirectService::get_fork_choice() { - return concurrency::co_spawn_and_await(executor_, [](auto* self) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self) { return self->DirectService::get_fork_choice(); }(this)); } @@ -158,14 +158,14 @@ Task ActiveDirectService::get_fork_choice() { // rpc Ready(google.protobuf.Empty) returns(ReadyResponse); Task ActiveDirectService::ready() { - return concurrency::co_spawn_and_await(executor_, [](auto* self) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self) { return self->DirectService::ready(); }(this)); } // rpc FrozenBlocks(google.protobuf.Empty) returns(FrozenBlocksResponse); Task ActiveDirectService::frozen_blocks() { - return concurrency::co_spawn_and_await(executor_, [](auto* self) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self) { return self->DirectService::frozen_blocks(); }(this)); } @@ -173,13 +173,13 @@ Task ActiveDirectService::frozen_blocks() { /** Additional non-RPC methods **/ Task ActiveDirectService::get_last_headers(uint64_t n) { - return concurrency::co_spawn_and_await(executor_, [](auto* self, auto how_many) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto how_many) { return self->DirectService::get_last_headers(how_many); }(this, n)); } Task ActiveDirectService::block_progress() { - return concurrency::co_spawn_and_await(executor_, [](auto* self) { + return concurrency::spawn_and_async_wait(executor_, [](auto* self) { return self->DirectService::block_progress(); }(this)); } diff --git a/silkworm/node/stagedsync/execution_engine.cpp 
b/silkworm/node/stagedsync/execution_engine.cpp index a36f770839..684ccf163b 100644 --- a/silkworm/node/stagedsync/execution_engine.cpp +++ b/silkworm/node/stagedsync/execution_engine.cpp @@ -17,9 +17,6 @@ #include "execution_engine.hpp" #include -#include - -#include #include #include @@ -209,7 +206,7 @@ bool ExecutionEngine::notify_fork_choice_update(Hash head_block_hash, // notify the fork of the update - we need to block here to restore the invariant auto fork_choice_aw_future = (*f)->fork_choice(head_block_hash, finalized_block_hash, safe_block_hash); - std::future fork_choice_future = concurrency::co_spawn_sw(io_context_, fork_choice_aw_future.get(), use_future); + std::future fork_choice_future = concurrency::spawn(io_context_, fork_choice_aw_future.get()); bool updated = fork_choice_future.get(); // BLOCKING if (!updated) return false; diff --git a/silkworm/node/stagedsync/stages/stage_triggers.cpp b/silkworm/node/stagedsync/stages/stage_triggers.cpp index 2dbf1d9b3f..97d3aa7b28 100644 --- a/silkworm/node/stagedsync/stages/stage_triggers.cpp +++ b/silkworm/node/stagedsync/stages/stage_triggers.cpp @@ -18,8 +18,6 @@ #include -#include -#include #include #include @@ -48,7 +46,7 @@ Task TriggersStage::schedule(std::function(db::RWTxn&)> task) { assert(tx); co_await t(*tx); }; - return concurrency::co_spawn_sw(io_context_, task_caller(), boost::asio::use_awaitable); + return concurrency::spawn_and_async_wait(io_context_, task_caller()); } bool TriggersStage::stop() { diff --git a/silkworm/sentry/discovery/node_db/serial_node_db.cpp b/silkworm/sentry/discovery/node_db/serial_node_db.cpp index 61805dff6c..4cead23c05 100644 --- a/silkworm/sentry/discovery/node_db/serial_node_db.cpp +++ b/silkworm/sentry/discovery/node_db/serial_node_db.cpp @@ -25,115 +25,115 @@ namespace silkworm::sentry::discovery::node_db { using namespace boost::asio; Task SerialNodeDb::upsert_node_address(NodeId id, NodeAddress address) { - return concurrency::co_spawn_sw(strand_, db_.upsert_node_address(std::move(id), std::move(address)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.upsert_node_address(std::move(id), std::move(address))); } Task> SerialNodeDb::find_node_address_v4(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_node_address_v4(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_node_address_v4(std::move(id))); } Task> SerialNodeDb::find_node_address_v6(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_node_address_v6(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_node_address_v6(std::move(id))); } Task SerialNodeDb::update_next_ping_time(NodeId id, Time value) { - return concurrency::co_spawn_sw(strand_, db_.update_next_ping_time(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_next_ping_time(std::move(id), value)); } Task> SerialNodeDb::find_next_ping_time(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_next_ping_time(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_next_ping_time(std::move(id))); } Task SerialNodeDb::update_last_pong_time(NodeId id, Time value) { - return concurrency::co_spawn_sw(strand_, db_.update_last_pong_time(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_last_pong_time(std::move(id), value)); } Task> SerialNodeDb::find_last_pong_time(NodeId id) { - return 
concurrency::co_spawn_sw(strand_, db_.find_last_pong_time(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_last_pong_time(std::move(id))); } Task SerialNodeDb::update_ping_fails(NodeId id, size_t value) { - return concurrency::co_spawn_sw(strand_, db_.update_ping_fails(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_ping_fails(std::move(id), value)); } Task> SerialNodeDb::find_ping_fails(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_ping_fails(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_ping_fails(std::move(id))); } Task SerialNodeDb::update_peer_disconnected_time(NodeId id, Time value) { - return concurrency::co_spawn_sw(strand_, db_.update_peer_disconnected_time(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_peer_disconnected_time(std::move(id), value)); } Task> SerialNodeDb::find_peer_disconnected_time(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_peer_disconnected_time(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_peer_disconnected_time(std::move(id))); } Task SerialNodeDb::update_peer_is_useless(NodeId id, bool value) { - return concurrency::co_spawn_sw(strand_, db_.update_peer_is_useless(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_peer_is_useless(std::move(id), value)); } Task> SerialNodeDb::find_peer_is_useless(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_peer_is_useless(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_peer_is_useless(std::move(id))); } Task SerialNodeDb::update_distance(NodeId id, size_t value) { - return concurrency::co_spawn_sw(strand_, db_.update_distance(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_distance(std::move(id), value)); } Task> SerialNodeDb::find_distance(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_distance(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_distance(std::move(id))); } Task SerialNodeDb::update_enr_seq_num(NodeId id, uint64_t value) { - return concurrency::co_spawn_sw(strand_, db_.update_enr_seq_num(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_enr_seq_num(std::move(id), value)); } Task> SerialNodeDb::find_enr_seq_num(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_enr_seq_num(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_enr_seq_num(std::move(id))); } Task SerialNodeDb::update_eth1_fork_id(NodeId id, std::optional value) { - return concurrency::co_spawn_sw(strand_, db_.update_eth1_fork_id(std::move(id), value), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.update_eth1_fork_id(std::move(id), value)); } Task> SerialNodeDb::find_eth1_fork_id(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.find_eth1_fork_id(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_eth1_fork_id(std::move(id))); } Task> SerialNodeDb::find_ping_candidates(Time time, size_t limit) { - return concurrency::co_spawn_sw(strand_, db_.find_ping_candidates(time, limit), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, 
db_.find_ping_candidates(time, limit)); } Task> SerialNodeDb::find_useful_nodes(Time min_pong_time, size_t limit) { - return concurrency::co_spawn_sw(strand_, db_.find_useful_nodes(min_pong_time, limit), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_useful_nodes(min_pong_time, limit)); } Task> SerialNodeDb::find_lookup_candidates(FindLookupCandidatesQuery query) { - return concurrency::co_spawn_sw(strand_, db_.find_lookup_candidates(query), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_lookup_candidates(query)); } Task SerialNodeDb::mark_taken_lookup_candidates(const std::vector& ids, Time time) { - return concurrency::co_spawn_sw(strand_, db_.mark_taken_lookup_candidates(ids, time), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.mark_taken_lookup_candidates(ids, time)); } Task> SerialNodeDb::take_lookup_candidates(FindLookupCandidatesQuery query, Time time) { - return concurrency::co_spawn_sw(strand_, db_.take_lookup_candidates(query, time), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.take_lookup_candidates(query, time)); } Task> SerialNodeDb::find_peer_candidates(FindPeerCandidatesQuery query) { - return concurrency::co_spawn_sw(strand_, db_.find_peer_candidates(std::move(query)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.find_peer_candidates(std::move(query))); } Task SerialNodeDb::mark_taken_peer_candidates(const std::vector& ids, Time time) { - return concurrency::co_spawn_sw(strand_, db_.mark_taken_peer_candidates(ids, time), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.mark_taken_peer_candidates(ids, time)); } Task> SerialNodeDb::take_peer_candidates(FindPeerCandidatesQuery query, Time time) { - return concurrency::co_spawn_sw(strand_, db_.take_peer_candidates(std::move(query), time), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.take_peer_candidates(std::move(query), time)); } Task SerialNodeDb::delete_node(NodeId id) { - return concurrency::co_spawn_sw(strand_, db_.delete_node(std::move(id)), use_awaitable); + return concurrency::spawn_and_async_wait(strand_, db_.delete_node(std::move(id))); } } // namespace silkworm::sentry::discovery::node_db diff --git a/silkworm/sentry/message_receiver.cpp b/silkworm/sentry/message_receiver.cpp index 480d405bc3..4e15004ea3 100644 --- a/silkworm/sentry/message_receiver.cpp +++ b/silkworm/sentry/message_receiver.cpp @@ -21,7 +21,6 @@ #include #include -#include #include #include @@ -42,7 +41,7 @@ Task MessageReceiver::run(std::shared_ptr self, PeerManag self->peer_tasks_.wait() && self->unsubscription_tasks_.wait() && self->handle_calls(); - co_await concurrency::co_spawn_sw(self->strand_, std::move(run), use_awaitable); + co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); } Task MessageReceiver::handle_calls() { diff --git a/silkworm/sentry/peer_manager.cpp b/silkworm/sentry/peer_manager.cpp index 31396234b8..97926ca584 100644 --- a/silkworm/sentry/peer_manager.cpp +++ b/silkworm/sentry/peer_manager.cpp @@ -49,7 +49,7 @@ Task PeerManager::run( connect_peer_tasks_.wait() && drop_peer_tasks_.wait() && peer_tasks_.wait(); - co_await concurrency::co_spawn_sw(strand_, std::move(run), use_awaitable); + co_await concurrency::spawn_and_async_wait(strand_, std::move(run)); } Task PeerManager::run_in_strand(concurrency::Channel>& peer_channel) { @@ -122,15 +122,15 @@ Task PeerManager::drop_peer( } Task PeerManager::count_peers() { - 
co_return (co_await concurrency::co_spawn_sw(strand_, count_peers_in_strand(), use_awaitable)); + co_return (co_await concurrency::spawn_and_async_wait(strand_, count_peers_in_strand())); } Task PeerManager::enumerate_peers(EnumeratePeersCallback callback) { - co_await concurrency::co_spawn_sw(strand_, enumerate_peers_in_strand(callback), use_awaitable); + co_await concurrency::spawn_and_async_wait(strand_, enumerate_peers_in_strand(callback)); } Task PeerManager::enumerate_random_peers(size_t max_count, EnumeratePeersCallback callback) { - co_await concurrency::co_spawn_sw(strand_, enumerate_random_peers_in_strand(max_count, callback), use_awaitable); + co_await concurrency::spawn_and_async_wait(strand_, enumerate_random_peers_in_strand(max_count, callback)); } Task PeerManager::count_peers_in_strand() { @@ -251,7 +251,7 @@ Task PeerManager::connect_peer(EnodeUrl peer_url, bool is_static_peer, std [[maybe_unused]] auto _ = gsl::finally([this, peer_url] { this->connecting_peer_urls_.erase(peer_url); }); try { - auto peer1 = co_await concurrency::co_spawn_sw(executor_pool_.any_executor(), client->connect(peer_url, is_static_peer), use_awaitable); + auto peer1 = co_await concurrency::spawn_and_async_wait(executor_pool_.any_executor(), client->connect(peer_url, is_static_peer)); auto peer = std::shared_ptr(std::move(peer1)); co_await client_peer_channel_.send(peer); } catch (const boost::system::system_error& ex) { diff --git a/silkworm/sentry/peer_manager_api.cpp b/silkworm/sentry/peer_manager_api.cpp index 2cf8f672bc..c49faddfb7 100644 --- a/silkworm/sentry/peer_manager_api.cpp +++ b/silkworm/sentry/peer_manager_api.cpp @@ -22,7 +22,6 @@ #include #include -#include #include #include @@ -48,7 +47,7 @@ Task PeerManagerApi::run(std::shared_ptr self) { self->handle_peer_events_calls() && self->events_unsubscription_tasks_.wait() && self->forward_peer_events(); - co_await concurrency::co_spawn_sw(self->strand_, std::move(run), use_awaitable); + co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); } Task PeerManagerApi::handle_peer_count_calls() { diff --git a/silkworm/sentry/rlpx/peer.cpp b/silkworm/sentry/rlpx/peer.cpp index add694fb84..7782813967 100644 --- a/silkworm/sentry/rlpx/peer.cpp +++ b/silkworm/sentry/rlpx/peer.cpp @@ -77,7 +77,7 @@ Task Peer::run(std::shared_ptr peer) { using namespace concurrency::awaitable_wait_for_one; auto run = peer->handle() || peer->send_message_tasks_.wait(); - co_await concurrency::co_spawn_sw(peer->strand_, std::move(run), use_awaitable); + co_await concurrency::spawn_and_async_wait(peer->strand_, std::move(run)); } static bool is_fatal_network_error(const boost::system::system_error& ex) { @@ -244,7 +244,7 @@ Task Peer::handle() { } Task Peer::drop(const std::shared_ptr& peer, DisconnectReason reason) { - return concurrency::co_spawn_sw(peer->strand_, Peer::drop_in_strand(peer, reason), use_awaitable); + return concurrency::spawn_and_async_wait(peer->strand_, Peer::drop_in_strand(peer, reason)); } Task Peer::drop_in_strand(std::shared_ptr peer, DisconnectReason reason) { From a5dfc11fb48f561a4d723b50a3811a100a133736 Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Mon, 19 Aug 2024 02:01:01 +0200 Subject: [PATCH 2/6] infra: remove using directive and fix other calls --- silkworm/db/kv/api/service_router.cpp | 4 +--- silkworm/db/kv/state_changes_stream.cpp | 3 +-- silkworm/infra/concurrency/co_spawn_sw.hpp | 2 -- 3 files changed, 2 insertions(+), 7 deletions(-) diff --git 
a/silkworm/db/kv/api/service_router.cpp b/silkworm/db/kv/api/service_router.cpp index bf69b87a23..27675cbf3c 100644 --- a/silkworm/db/kv/api/service_router.cpp +++ b/silkworm/db/kv/api/service_router.cpp @@ -16,8 +16,6 @@ #include "service_router.hpp" -#include - #include namespace silkworm::db::kv::api { @@ -26,7 +24,7 @@ using namespace boost::asio; Task StateChangeRunner::run(std::shared_ptr self) { auto run = self->handle_calls(); - co_await concurrency::co_spawn(self->strand_, std::move(run), use_awaitable); + co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); } StateChangeRunner::StateChangeRunner(const boost::asio::any_io_executor& executor) diff --git a/silkworm/db/kv/state_changes_stream.cpp b/silkworm/db/kv/state_changes_stream.cpp index edcaad83d5..92bb6fd691 100644 --- a/silkworm/db/kv/state_changes_stream.cpp +++ b/silkworm/db/kv/state_changes_stream.cpp @@ -16,7 +16,6 @@ #include "state_changes_stream.hpp" -#include #include #include @@ -32,7 +31,7 @@ StateChangesStream::StateChangesStream(rpc::ClientContext& context, api::Client& cache_(must_use_shared_service(scheduler_)) {} std::future StateChangesStream::open() { - return concurrency::co_spawn(scheduler_, run(), boost::asio::use_future); + return concurrency::spawn(scheduler_, run()); } void StateChangesStream::close() { diff --git a/silkworm/infra/concurrency/co_spawn_sw.hpp b/silkworm/infra/concurrency/co_spawn_sw.hpp index 896f04c033..42c874139f 100644 --- a/silkworm/infra/concurrency/co_spawn_sw.hpp +++ b/silkworm/infra/concurrency/co_spawn_sw.hpp @@ -30,8 +30,6 @@ namespace silkworm::concurrency { -using namespace boost::asio; // TODO(remove) - template auto spawn_and_async_wait(const Executor& ex, F&& f, typename boost::asio::constraint< From daf050c743366ba0f95ebeba5220dd5c6736f546 Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Mon, 19 Aug 2024 08:49:18 +0200 Subject: [PATCH 3/6] infra: rename co_spawn_sw header as spawn --- silkworm/db/kv/api/service_router.cpp | 2 +- silkworm/db/kv/state_changes_stream.cpp | 2 +- silkworm/infra/concurrency/awaitable_wait_for_all.hpp | 2 +- silkworm/infra/concurrency/awaitable_wait_for_one.hpp | 2 +- silkworm/infra/concurrency/parallel_group_test.cpp | 2 +- silkworm/infra/concurrency/{co_spawn_sw.hpp => spawn.hpp} | 0 silkworm/infra/test_util/context_test_base.hpp | 2 +- silkworm/node/execution/api/active_direct_service.cpp | 2 +- silkworm/node/stagedsync/execution_engine.cpp | 2 +- silkworm/node/stagedsync/stages/stage_triggers.cpp | 2 +- silkworm/rpc/daemon.cpp | 1 + silkworm/sentry/discovery/node_db/serial_node_db.cpp | 2 +- silkworm/sentry/message_receiver.cpp | 2 +- silkworm/sentry/peer_manager.cpp | 2 +- silkworm/sentry/peer_manager_api.cpp | 2 +- silkworm/sentry/rlpx/peer.cpp | 2 +- 16 files changed, 15 insertions(+), 14 deletions(-) rename silkworm/infra/concurrency/{co_spawn_sw.hpp => spawn.hpp} (100%) diff --git a/silkworm/db/kv/api/service_router.cpp b/silkworm/db/kv/api/service_router.cpp index 27675cbf3c..dda54c0166 100644 --- a/silkworm/db/kv/api/service_router.cpp +++ b/silkworm/db/kv/api/service_router.cpp @@ -16,7 +16,7 @@ #include "service_router.hpp" -#include +#include namespace silkworm::db::kv::api { diff --git a/silkworm/db/kv/state_changes_stream.cpp b/silkworm/db/kv/state_changes_stream.cpp index 92bb6fd691..8405d49c1d 100644 --- a/silkworm/db/kv/state_changes_stream.cpp +++ b/silkworm/db/kv/state_changes_stream.cpp @@ -19,8 +19,8 @@ #include #include -#include #include +#include #include 
namespace silkworm::db::kv { diff --git a/silkworm/infra/concurrency/awaitable_wait_for_all.hpp b/silkworm/infra/concurrency/awaitable_wait_for_all.hpp index d1502f9926..79a03894c7 100644 --- a/silkworm/infra/concurrency/awaitable_wait_for_all.hpp +++ b/silkworm/infra/concurrency/awaitable_wait_for_all.hpp @@ -33,8 +33,8 @@ #include #include -#include "co_spawn_sw.hpp" #include "parallel_group_utils.hpp" +#include "spawn.hpp" namespace silkworm::concurrency::awaitable_wait_for_all { diff --git a/silkworm/infra/concurrency/awaitable_wait_for_one.hpp b/silkworm/infra/concurrency/awaitable_wait_for_one.hpp index 39cc37b4ff..855a877354 100644 --- a/silkworm/infra/concurrency/awaitable_wait_for_one.hpp +++ b/silkworm/infra/concurrency/awaitable_wait_for_one.hpp @@ -34,7 +34,7 @@ #include #include -#include "co_spawn_sw.hpp" +#include "spawn.hpp" namespace silkworm::concurrency::awaitable_wait_for_one { diff --git a/silkworm/infra/concurrency/parallel_group_test.cpp b/silkworm/infra/concurrency/parallel_group_test.cpp index 8a3a99f1f6..d2725f3c07 100644 --- a/silkworm/infra/concurrency/parallel_group_test.cpp +++ b/silkworm/infra/concurrency/parallel_group_test.cpp @@ -28,7 +28,7 @@ #include #include -#include +#include using namespace boost::asio; using namespace boost::asio::experimental; diff --git a/silkworm/infra/concurrency/co_spawn_sw.hpp b/silkworm/infra/concurrency/spawn.hpp similarity index 100% rename from silkworm/infra/concurrency/co_spawn_sw.hpp rename to silkworm/infra/concurrency/spawn.hpp diff --git a/silkworm/infra/test_util/context_test_base.hpp b/silkworm/infra/test_util/context_test_base.hpp index a94cbe5dc5..7b02d39490 100644 --- a/silkworm/infra/test_util/context_test_base.hpp +++ b/silkworm/infra/test_util/context_test_base.hpp @@ -24,7 +24,7 @@ #include #include -#include +#include #include #include diff --git a/silkworm/node/execution/api/active_direct_service.cpp b/silkworm/node/execution/api/active_direct_service.cpp index 9d34621886..a0058c1db1 100644 --- a/silkworm/node/execution/api/active_direct_service.cpp +++ b/silkworm/node/execution/api/active_direct_service.cpp @@ -16,7 +16,7 @@ #include "active_direct_service.hpp" -#include +#include namespace silkworm::execution::api { diff --git a/silkworm/node/stagedsync/execution_engine.cpp b/silkworm/node/stagedsync/execution_engine.cpp index 684ccf163b..30c31274dd 100644 --- a/silkworm/node/stagedsync/execution_engine.cpp +++ b/silkworm/node/stagedsync/execution_engine.cpp @@ -20,7 +20,7 @@ #include #include -#include +#include namespace silkworm::stagedsync { diff --git a/silkworm/node/stagedsync/stages/stage_triggers.cpp b/silkworm/node/stagedsync/stages/stage_triggers.cpp index 97d3aa7b28..17c06d80bd 100644 --- a/silkworm/node/stagedsync/stages/stage_triggers.cpp +++ b/silkworm/node/stagedsync/stages/stage_triggers.cpp @@ -20,7 +20,7 @@ #include -#include +#include namespace silkworm::stagedsync { diff --git a/silkworm/rpc/daemon.cpp b/silkworm/rpc/daemon.cpp index 18c205b750..994d28a760 100644 --- a/silkworm/rpc/daemon.cpp +++ b/silkworm/rpc/daemon.cpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include #include diff --git a/silkworm/sentry/discovery/node_db/serial_node_db.cpp b/silkworm/sentry/discovery/node_db/serial_node_db.cpp index 4cead23c05..0e0bf71a13 100644 --- a/silkworm/sentry/discovery/node_db/serial_node_db.cpp +++ b/silkworm/sentry/discovery/node_db/serial_node_db.cpp @@ -18,7 +18,7 @@ #include -#include +#include namespace silkworm::sentry::discovery::node_db { diff --git 
a/silkworm/sentry/message_receiver.cpp b/silkworm/sentry/message_receiver.cpp index 4e15004ea3..3c765f9d7d 100644 --- a/silkworm/sentry/message_receiver.cpp +++ b/silkworm/sentry/message_receiver.cpp @@ -26,7 +26,7 @@ #include #include -#include +#include namespace silkworm::sentry { diff --git a/silkworm/sentry/peer_manager.cpp b/silkworm/sentry/peer_manager.cpp index 97926ca584..c9cdf4a4e5 100644 --- a/silkworm/sentry/peer_manager.cpp +++ b/silkworm/sentry/peer_manager.cpp @@ -23,8 +23,8 @@ #include #include #include -#include #include +#include #include #include "peer_manager_observer.hpp" diff --git a/silkworm/sentry/peer_manager_api.cpp b/silkworm/sentry/peer_manager_api.cpp index c49faddfb7..dcaa2cba34 100644 --- a/silkworm/sentry/peer_manager_api.cpp +++ b/silkworm/sentry/peer_manager_api.cpp @@ -27,7 +27,7 @@ #include #include -#include +#include #include namespace silkworm::sentry { diff --git a/silkworm/sentry/rlpx/peer.cpp b/silkworm/sentry/rlpx/peer.cpp index 7782813967..c5a195e94d 100644 --- a/silkworm/sentry/rlpx/peer.cpp +++ b/silkworm/sentry/rlpx/peer.cpp @@ -27,8 +27,8 @@ #include #include #include -#include #include +#include #include #include "auth/auth_message_error.hpp" From 753899f169e79394d2de0940a16b312d2dab0aa1 Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Mon, 19 Aug 2024 12:31:47 +0200 Subject: [PATCH 4/6] use concepts instead of constraints in spawn functions --- silkworm/infra/concurrency/spawn.hpp | 39 +++++++++++++--------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/silkworm/infra/concurrency/spawn.hpp b/silkworm/infra/concurrency/spawn.hpp index 42c874139f..680fc8c42a 100644 --- a/silkworm/infra/concurrency/spawn.hpp +++ b/silkworm/infra/concurrency/spawn.hpp @@ -30,42 +30,39 @@ namespace silkworm::concurrency { -template -auto spawn_and_async_wait(const Executor& ex, F&& f, - typename boost::asio::constraint< - boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { +template +concept AsioExecutor = boost::asio::is_executor::value || boost::asio::execution::is_executor::value; + +template +concept AsioExecutionContext = std::is_convertible_v; + +template +auto spawn_and_async_wait(const Executor& ex, F&& f) { return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_awaitable); } -template -auto spawn_and_async_wait(ExecutionContext& ctx, F&& f, - typename boost::asio::constraint>::type = 0) { +template +auto spawn_and_async_wait(ExecutionContext& ctx, F&& f) { return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_awaitable); } -template -auto spawn(const Executor& ex, F&& f, - typename boost::asio::constraint< - boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { +template +auto spawn(const Executor& ex, F&& f) { return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_future); } -template -auto spawn(ExecutionContext& ctx, F&& f, - typename boost::asio::constraint>::type = 0) { +template +auto spawn(ExecutionContext& ctx, F&& f) { return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_future); } -template -auto spawn_and_wait(const Executor& ex, F&& f, - typename boost::asio::constraint< - boost::asio::is_executor::value || boost::asio::execution::is_executor::value>::type = 0) { +template +auto spawn_and_wait(const Executor& ex, F&& f) { return spawn(ex, std::forward(f)).get(); } -template -auto spawn_and_wait(ExecutionContext& ctx, F&& f, - typename 
boost::asio::constraint>::type = 0) { +template +auto spawn_and_wait(ExecutionContext& ctx, F&& f) { return spawn(ctx, std::forward(f)).get(); } From f8eb41a72803a3e7eac97e0a3a6e280f9322ca82 Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Mon, 19 Aug 2024 13:01:54 +0200 Subject: [PATCH 5/6] remove duplicated code --- .../{sync_wait_test.cpp => spawn_test.cpp} | 46 ++++++----- silkworm/infra/concurrency/sync_wait.hpp | 76 ------------------- silkworm/sync/sync_pos.cpp | 1 - silkworm/sync/sync_pow.cpp | 26 ++++--- 4 files changed, 40 insertions(+), 109 deletions(-) rename silkworm/infra/concurrency/{sync_wait_test.cpp => spawn_test.cpp} (64%) delete mode 100644 silkworm/infra/concurrency/sync_wait.hpp diff --git a/silkworm/infra/concurrency/sync_wait_test.cpp b/silkworm/infra/concurrency/spawn_test.cpp similarity index 64% rename from silkworm/infra/concurrency/sync_wait_test.cpp rename to silkworm/infra/concurrency/spawn_test.cpp index 63914e4679..02cd18feea 100644 --- a/silkworm/infra/concurrency/sync_wait_test.cpp +++ b/silkworm/infra/concurrency/spawn_test.cpp @@ -14,7 +14,7 @@ limitations under the License. */ -#include "sync_wait.hpp" +#include "spawn.hpp" #include @@ -27,7 +27,7 @@ #include #include -namespace silkworm { +namespace silkworm::concurrency { namespace asio = boost::asio; @@ -55,31 +55,37 @@ class DummyEngine { } }; -TEST_CASE("sync wait") { - asio::io_context io; - asio::executor_work_guard work_guard{io.get_executor()}; +struct SpawnTest { + SpawnTest() { + ioc_thread = std::thread{[this]() { ioc.run(); }}; + } + ~SpawnTest() { + ioc.stop(); + if (ioc_thread.joinable()) { + ioc_thread.join(); + } + } - SECTION("wait for function") { - std::thread io_execution([&io]() { io.run(); }); + asio::io_context ioc; + asio::executor_work_guard work_guard{ioc.get_executor()}; + std::thread ioc_thread; +}; - sync_wait(io, dummy_task()); +TEST_CASE_METHOD(SpawnTest, "spawn_and_wait") { + SECTION("wait for function") { + CHECK_NOTHROW(spawn_and_wait(ioc, dummy_task())); - io.stop(); - io_execution.join(); + std::future result = spawn(ioc, dummy_task()); + CHECK_NOTHROW(result.get()); } SECTION("wait for method") { - std::thread io_execution([&io]() { io.run(); }); - - DummyEngine engine{io}; - - auto value = sync_wait(in(engine), DummyEngine::do_work()); - - CHECK(value == 42); + DummyEngine engine{ioc}; + CHECK(spawn_and_wait(engine.get_executor(), DummyEngine::do_work()) == 42); - io.stop(); - io_execution.join(); + std::future result = spawn(engine.get_executor(), DummyEngine::do_work()); + CHECK(result.get() == 42); } } -} // namespace silkworm \ No newline at end of file +} // namespace silkworm::concurrency \ No newline at end of file diff --git a/silkworm/infra/concurrency/sync_wait.hpp b/silkworm/infra/concurrency/sync_wait.hpp deleted file mode 100644 index 714b923959..0000000000 --- a/silkworm/infra/concurrency/sync_wait.hpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - Copyright 2023 The Silkworm Authors - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. 
-*/ - -#pragma once - -#include - -#include -#include -#include - -namespace silkworm { - -/** - * Do a synchronous wait of a coroutine on the specified io_context - * - * sync_wait: - * - schedules a coroutine for execution in the specified io_context - * - blocks the calling thread until the coroutine completes - * - returns the result of the coroutine - * - * Rationale: doing an asynchronous wait of a coroutine is easy: - * auto result = co_await task(); - * Doing a synchronous wait of a coroutine is more verbose: - * auto result = co_spawn(io_context, task(), use_future).get(); - * also this exposes implementation details. - * Using sync_wait the call becomes: - * auto result = sync_wait(io_context, task()); - * Or, if the current object has an io_context: - * auto result = sync_wait(in(this), task()); - * - */ -template -T sync_wait(boost::asio::io_context& io_context, const Task& task) { - auto future_result = boost::asio::co_spawn(io_context, task, boost::asio::use_future); - return future_result.get(); -} - -template -T sync_wait(boost::asio::io_context& io_context, Task&& task) { - auto future_result = boost::asio::co_spawn(io_context, std::move(task), boost::asio::use_future); - return future_result.get(); -} - -/** - * Simplify the call to sync_wait - * - * When the desired io_context is not immediately available in the scope of the caller, - * but is owned by some object, this function can be used to retrieve it and simplify - * the call to sync_wait - * - * For example, provided that some class Engine has an io_context: - * sync_wait(in(engine), engine.do_work()); - * or provided that the current object has an io_context: - * sync_wait(in(this), engine.do_work()); - */ - -template -boost::asio::io_context& in(C& context) { - return context.get_executor(); -} - -} // namespace silkworm \ No newline at end of file diff --git a/silkworm/sync/sync_pos.cpp b/silkworm/sync/sync_pos.cpp index b4a722e7e7..89ba4e3909 100644 --- a/silkworm/sync/sync_pos.cpp +++ b/silkworm/sync/sync_pos.cpp @@ -220,7 +220,6 @@ Task PoSSync::new_payload(const rpc::NewPayloadRequest& requ } else if (std::holds_alternative(verification)) { // INVALID const auto invalid_chain = std::get(verification); - // auto latest_valid_height = sync_wait(in(exec_engine_), exec_engine_.get_block_num(invalid_chain.latest_valid_head)); auto unwind_point_td = chain_fork_view_.get_total_difficulty(invalid_chain.unwind_point.hash); Hash latest_valid_hash = unwind_point_td < terminal_total_difficulty ? 
kZeroHash diff --git a/silkworm/sync/sync_pow.cpp b/silkworm/sync/sync_pow.cpp index aa6edb2e0b..3e79b07d9e 100644 --- a/silkworm/sync/sync_pow.cpp +++ b/silkworm/sync/sync_pow.cpp @@ -21,12 +21,14 @@ #include #include #include -#include +#include #include #include namespace silkworm::chainsync { +using concurrency::spawn_and_wait; + PoWSync::PoWSync(BlockExchange& block_exchange, execution::api::Client& exec_engine) : ChainSync(block_exchange, exec_engine) {} @@ -38,21 +40,21 @@ PoWSync::NewHeight PoWSync::resume() { // find the point (head) where we left o BlockId head{}; // BlockExchange need a bunch of previous headers to attach the new ones - auto last_headers = sync_wait(io_context_, exec_engine_->get_last_headers(1000)); + auto last_headers = spawn_and_wait(io_context_, exec_engine_->get_last_headers(1000)); block_exchange_.initial_state(last_headers); // We calculate a provisional head based on the previous headers std::ranges::for_each(last_headers, [&, this](const auto& header) { auto hash = header.hash(); - auto td = sync_wait(io_context_, exec_engine_->get_td(hash)); + auto td = spawn_and_wait(io_context_, exec_engine_->get_td(hash)); chain_fork_view_.add(header, *td); // add to cache & compute a new canonical head }); // Now we can resume the sync from the canonical head - const auto last_fcu = sync_wait(io_context_, exec_engine_->get_fork_choice()); // previously was get_canonical_head() - const auto block_progress = sync_wait(io_context_, exec_engine_->block_progress()); + const auto last_fcu = spawn_and_wait(io_context_, exec_engine_->get_fork_choice()); // previously was get_canonical_head() + const auto block_progress = spawn_and_wait(io_context_, exec_engine_->block_progress()); - const auto last_fcu_number = sync_wait(io_context_, exec_engine_->get_header_hash_number(last_fcu.head_block_hash)); + const auto last_fcu_number = spawn_and_wait(io_context_, exec_engine_->get_header_hash_number(last_fcu.head_block_hash)); if (!last_fcu_number) return head; ensure_invariant(*last_fcu_number <= block_progress, "canonical head beyond block progress"); @@ -78,7 +80,7 @@ PoWSync::NewHeight PoWSync::forward_and_insert_blocks() { ResultQueue& downloading_queue = block_exchange_.result_queue(); - auto initial_block_progress = sync_wait(io_context_, exec_engine_->block_progress()); + auto initial_block_progress = spawn_and_wait(io_context_, exec_engine_->block_progress()); auto block_progress = initial_block_progress; block_exchange_.download_blocks(initial_block_progress, BlockExchange::Target_Tracking::kByAnnouncements); @@ -105,7 +107,7 @@ PoWSync::NewHeight PoWSync::forward_and_insert_blocks() { }); // Insert blocks into database - const auto insert_result{sync_wait(io_context_, exec_engine_->insert_blocks(to_plain_blocks(blocks)))}; + const auto insert_result{spawn_and_wait(io_context_, exec_engine_->insert_blocks(to_plain_blocks(blocks)))}; if (!insert_result) { log::Error("Sync") << "Cannot insert " << blocks.size() << " blocks, error=" << insert_result.status; continue; @@ -155,7 +157,7 @@ void PoWSync::execution_loop() { // Verify the new section of the chain log::Info("Sync") << "Verifying chain, head=(" << new_height.number << ", " << to_hex(new_height.hash) << ")"; - const auto verification = sync_wait(io_context_, exec_engine_->validate_chain(new_height)); // BLOCKING + const auto verification = spawn_and_wait(io_context_, exec_engine_->validate_chain(new_height)); // BLOCKING if (std::holds_alternative(verification)) { auto valid_chain = std::get(verification); @@ 
-167,14 +169,14 @@ void PoWSync::execution_loop() { // Notify the fork choice log::Info("Sync") << "Notifying fork choice updated, new head=" << new_height.number; - sync_wait(io_context_, exec_engine_->update_fork_choice({new_height.hash})); + spawn_and_wait(io_context_, exec_engine_->update_fork_choice({new_height.hash})); send_new_block_hash_announcements(); // according to eth/67 they must be done after a full block verification } else if (std::holds_alternative(verification)) { auto invalid_chain = std::get(verification); - const auto latest_valid_height = sync_wait(io_context_, exec_engine_->get_header_hash_number(invalid_chain.unwind_point.hash)); + const auto latest_valid_height = spawn_and_wait(io_context_, exec_engine_->get_header_hash_number(invalid_chain.unwind_point.hash)); ensure_invariant(latest_valid_height.has_value(), "wrong latest_valid_head"); log::Info("Sync") << "Invalid chain, unwinding down to=" << *latest_valid_height; @@ -188,7 +190,7 @@ void PoWSync::execution_loop() { // Notify the fork choice log::Info("Sync") << "Notifying fork choice updated, head=" << to_hex(invalid_chain.unwind_point.hash); - sync_wait(io_context_, exec_engine_->update_fork_choice({invalid_chain.unwind_point.hash})); + spawn_and_wait(io_context_, exec_engine_->update_fork_choice({invalid_chain.unwind_point.hash})); } else if (std::holds_alternative(verification)) { // If it returned a validation error, raise an exception From 5c423bb8de1d4310dc3ef6d87692f54298c888e2 Mon Sep 17 00:00:00 2001 From: canepat <16927169+canepat@users.noreply.github.com> Date: Tue, 20 Aug 2024 12:18:00 +0200 Subject: [PATCH 6/6] renaming after review --- silkworm/db/kv/api/service_router.cpp | 2 +- silkworm/db/kv/state_changes_stream.cpp | 2 +- .../infra/concurrency/parallel_group_test.cpp | 7 +-- silkworm/infra/concurrency/spawn.hpp | 16 +++--- silkworm/infra/concurrency/spawn_test.cpp | 10 +--- .../infra/test_util/context_test_base.hpp | 4 +- .../execution/api/active_direct_service.cpp | 38 ++++++------- silkworm/node/stagedsync/execution_engine.cpp | 2 +- .../node/stagedsync/stages/stage_triggers.cpp | 2 +- .../discovery/node_db/serial_node_db.cpp | 56 +++++++++---------- silkworm/sentry/message_receiver.cpp | 2 +- silkworm/sentry/peer_manager.cpp | 10 ++-- silkworm/sentry/peer_manager_api.cpp | 2 +- silkworm/sentry/rlpx/peer.cpp | 4 +- silkworm/sync/sync_pow.cpp | 26 ++++----- 15 files changed, 87 insertions(+), 96 deletions(-) diff --git a/silkworm/db/kv/api/service_router.cpp b/silkworm/db/kv/api/service_router.cpp index dda54c0166..ed3f343e26 100644 --- a/silkworm/db/kv/api/service_router.cpp +++ b/silkworm/db/kv/api/service_router.cpp @@ -24,7 +24,7 @@ using namespace boost::asio; Task StateChangeRunner::run(std::shared_ptr self) { auto run = self->handle_calls(); - co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); + co_await concurrency::spawn_task(self->strand_, std::move(run)); } StateChangeRunner::StateChangeRunner(const boost::asio::any_io_executor& executor) diff --git a/silkworm/db/kv/state_changes_stream.cpp b/silkworm/db/kv/state_changes_stream.cpp index 8405d49c1d..cd583fbcee 100644 --- a/silkworm/db/kv/state_changes_stream.cpp +++ b/silkworm/db/kv/state_changes_stream.cpp @@ -31,7 +31,7 @@ StateChangesStream::StateChangesStream(rpc::ClientContext& context, api::Client& cache_(must_use_shared_service(scheduler_)) {} std::future StateChangesStream::open() { - return concurrency::spawn(scheduler_, run()); + return concurrency::spawn_future(scheduler_, run()); } void 
StateChangesStream::close() { diff --git a/silkworm/infra/concurrency/parallel_group_test.cpp b/silkworm/infra/concurrency/parallel_group_test.cpp index d2725f3c07..624d3d195e 100644 --- a/silkworm/infra/concurrency/parallel_group_test.cpp +++ b/silkworm/infra/concurrency/parallel_group_test.cpp @@ -24,7 +24,6 @@ #include #include #include -#include #include #include @@ -52,12 +51,12 @@ awaitable throw_op() { } awaitable spawn_throw_op(strand& strand) { - co_await spawn_and_async_wait(strand, throw_op()); + co_await spawn_task(strand, throw_op()); } awaitable spawn_noop_loop(strand& strand) { while (true) { - co_await spawn_and_async_wait(strand, noop()); + co_await spawn_task(strand, noop()); } } @@ -74,6 +73,6 @@ awaitable co_spawn_cancellation_handler_bug() { TEST_CASE("parallel_group.co_spawn_cancellation_handler_bug") { io_context context; - spawn(context, co_spawn_cancellation_handler_bug()); + spawn_future(context, co_spawn_cancellation_handler_bug()); context.run(); } diff --git a/silkworm/infra/concurrency/spawn.hpp b/silkworm/infra/concurrency/spawn.hpp index 680fc8c42a..26bc8e3685 100644 --- a/silkworm/infra/concurrency/spawn.hpp +++ b/silkworm/infra/concurrency/spawn.hpp @@ -37,33 +37,33 @@ template concept AsioExecutionContext = std::is_convertible_v; template -auto spawn_and_async_wait(const Executor& ex, F&& f) { +auto spawn_task(const Executor& ex, F&& f) { return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_awaitable); } template -auto spawn_and_async_wait(ExecutionContext& ctx, F&& f) { +auto spawn_task(ExecutionContext& ctx, F&& f) { return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_awaitable); } template -auto spawn(const Executor& ex, F&& f) { +auto spawn_future(const Executor& ex, F&& f) { return boost::asio::co_spawn(ex, std::forward(f), boost::asio::use_future); } template -auto spawn(ExecutionContext& ctx, F&& f) { +auto spawn_future(ExecutionContext& ctx, F&& f) { return boost::asio::co_spawn(ctx, std::forward(f), boost::asio::use_future); } template -auto spawn_and_wait(const Executor& ex, F&& f) { - return spawn(ex, std::forward(f)).get(); +auto spawn_future_and_wait(const Executor& ex, F&& f) { + return spawn_future(ex, std::forward(f)).get(); } template -auto spawn_and_wait(ExecutionContext& ctx, F&& f) { - return spawn(ctx, std::forward(f)).get(); +auto spawn_future_and_wait(ExecutionContext& ctx, F&& f) { + return spawn_future(ctx, std::forward(f)).get(); } } // namespace silkworm::concurrency diff --git a/silkworm/infra/concurrency/spawn_test.cpp b/silkworm/infra/concurrency/spawn_test.cpp index 02cd18feea..d012c6fef8 100644 --- a/silkworm/infra/concurrency/spawn_test.cpp +++ b/silkworm/infra/concurrency/spawn_test.cpp @@ -73,18 +73,12 @@ struct SpawnTest { TEST_CASE_METHOD(SpawnTest, "spawn_and_wait") { SECTION("wait for function") { - CHECK_NOTHROW(spawn_and_wait(ioc, dummy_task())); - - std::future result = spawn(ioc, dummy_task()); - CHECK_NOTHROW(result.get()); + CHECK_NOTHROW(spawn_future_and_wait(ioc, dummy_task())); } SECTION("wait for method") { DummyEngine engine{ioc}; - CHECK(spawn_and_wait(engine.get_executor(), DummyEngine::do_work()) == 42); - - std::future result = spawn(engine.get_executor(), DummyEngine::do_work()); - CHECK(result.get() == 42); + CHECK(spawn_future_and_wait(engine.get_executor(), DummyEngine::do_work()) == 42); } } diff --git a/silkworm/infra/test_util/context_test_base.hpp b/silkworm/infra/test_util/context_test_base.hpp index 7b02d39490..4de8ace778 100644 --- 
a/silkworm/infra/test_util/context_test_base.hpp +++ b/silkworm/infra/test_util/context_test_base.hpp @@ -36,12 +36,12 @@ class ContextTestBase { template auto spawn(AwaitableOrFunction&& awaitable) { - return concurrency::spawn(io_context_, std::forward(awaitable)); + return concurrency::spawn_future(io_context_, std::forward(awaitable)); } template auto spawn_and_wait(AwaitableOrFunction&& awaitable) { - return concurrency::spawn_and_wait(io_context_, std::forward(awaitable)); + return spawn(std::forward(awaitable)).get(); } static void sleep_for(std::chrono::milliseconds sleep_time_ms) { diff --git a/silkworm/node/execution/api/active_direct_service.cpp b/silkworm/node/execution/api/active_direct_service.cpp index a0058c1db1..57e18ae05b 100644 --- a/silkworm/node/execution/api/active_direct_service.cpp +++ b/silkworm/node/execution/api/active_direct_service.cpp @@ -41,7 +41,7 @@ bool ActiveDirectService::stop() { // rpc InsertBlocks(InsertBlocksRequest) returns(InsertionResult); Task ActiveDirectService::insert_blocks(const Blocks& blocks) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& bb) { + return concurrency::spawn_task(executor_, [](auto* self, const auto& bb) { return self->DirectService::insert_blocks(bb); }(this, blocks)); } @@ -50,14 +50,14 @@ Task ActiveDirectService::insert_blocks(const Blocks& blocks) { // rpc ValidateChain(ValidationRequest) returns(ValidationReceipt); Task ActiveDirectService::validate_chain(BlockNumAndHash number_and_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_and_hash) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_and_hash) { return self->DirectService::validate_chain(num_and_hash); }(this, number_and_hash)); } // rpc UpdateForkChoice(ForkChoice) returns(ForkChoiceReceipt); Task ActiveDirectService::update_fork_choice(const ForkChoice& fork_choice) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& choice) { + return concurrency::spawn_task(executor_, [](auto* self, const auto& choice) { return self->DirectService::update_fork_choice(choice); }(this, fork_choice)); } @@ -66,14 +66,14 @@ Task ActiveDirectService::update_fork_choice(const ForkChoice& // rpc AssembleBlock(AssembleBlockRequest) returns(AssembleBlockResponse); Task ActiveDirectService::assemble_block(const api::BlockUnderConstruction& block) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& b) { + return concurrency::spawn_task(executor_, [](auto* self, const auto& b) { return self->DirectService::assemble_block(b); }(this, block)); } // rpc GetAssembledBlock(GetAssembledBlockRequest) returns(GetAssembledBlockResponse); Task ActiveDirectService::get_assembled_block(PayloadId payload_id) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto id) { + return concurrency::spawn_task(executor_, [](auto* self, auto id) { return self->DirectService::get_assembled_block(id); }(this, payload_id)); } @@ -82,35 +82,35 @@ Task ActiveDirectService::get_assembled_block(PayloadId pa // rpc CurrentHeader(google.protobuf.Empty) returns(GetHeaderResponse); Task> ActiveDirectService::current_header() { - return concurrency::spawn_and_async_wait(executor_, [](auto* self) { + return concurrency::spawn_task(executor_, [](auto* self) { return self->DirectService::current_header(); }(this)); } // rpc GetTD(GetSegmentRequest) returns(GetTDResponse); Task> ActiveDirectService::get_td(BlockNumberOrHash number_or_hash) { - return 
concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_td(num_or_hash); }(this, number_or_hash)); } // rpc GetHeader(GetSegmentRequest) returns(GetHeaderResponse); Task> ActiveDirectService::get_header(BlockNumberOrHash number_or_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_header(num_or_hash); }(this, number_or_hash)); } // rpc GetBody(GetSegmentRequest) returns(GetBodyResponse); Task> ActiveDirectService::get_body(BlockNumberOrHash number_or_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::get_body(num_or_hash); }(this, number_or_hash)); } // rpc HasBlock(GetSegmentRequest) returns(HasBlockResponse); Task ActiveDirectService::has_block(BlockNumberOrHash number_or_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_or_hash) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_or_hash) { return self->DirectService::has_block(num_or_hash); }(this, number_or_hash)); } @@ -119,14 +119,14 @@ Task ActiveDirectService::has_block(BlockNumberOrHash number_or_hash) { // rpc GetBodiesByRange(GetBodiesByRangeRequest) returns(GetBodiesBatchResponse); Task ActiveDirectService::get_bodies_by_range(BlockNumRange number_range) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto num_range) { + return concurrency::spawn_task(executor_, [](auto* self, auto num_range) { return self->DirectService::get_bodies_by_range(num_range); }(this, number_range)); } // rpc GetBodiesByHashes(GetBodiesByHashesRequest) returns(GetBodiesBatchResponse); Task ActiveDirectService::get_bodies_by_hashes(const BlockHashes& hashes) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, const auto& hh) { + return concurrency::spawn_task(executor_, [](auto* self, const auto& hh) { return self->DirectService::get_bodies_by_hashes(hh); }(this, hashes)); } @@ -135,21 +135,21 @@ Task ActiveDirectService::get_bodies_by_hashes(const BlockHashes& h // rpc IsCanonicalHash(types.H256) returns(IsCanonicalResponse); Task ActiveDirectService::is_canonical_hash(Hash block_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto h) { + return concurrency::spawn_task(executor_, [](auto* self, auto h) { return self->DirectService::is_canonical_hash(h); }(this, block_hash)); } // rpc GetHeaderHashNumber(types.H256) returns(GetHeaderHashNumberResponse); Task> ActiveDirectService::get_header_hash_number(Hash block_hash) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto h) { + return concurrency::spawn_task(executor_, [](auto* self, auto h) { return self->DirectService::get_header_hash_number(h); }(this, block_hash)); } // rpc GetForkChoice(google.protobuf.Empty) returns(ForkChoice); Task ActiveDirectService::get_fork_choice() { - return concurrency::spawn_and_async_wait(executor_, [](auto* self) { + return concurrency::spawn_task(executor_, [](auto* self) { return self->DirectService::get_fork_choice(); }(this)); } @@ -158,14 +158,14 @@ Task ActiveDirectService::get_fork_choice() { // rpc Ready(google.protobuf.Empty) returns(ReadyResponse); Task 
ActiveDirectService::ready() { - return concurrency::spawn_and_async_wait(executor_, [](auto* self) { + return concurrency::spawn_task(executor_, [](auto* self) { return self->DirectService::ready(); }(this)); } // rpc FrozenBlocks(google.protobuf.Empty) returns(FrozenBlocksResponse); Task ActiveDirectService::frozen_blocks() { - return concurrency::spawn_and_async_wait(executor_, [](auto* self) { + return concurrency::spawn_task(executor_, [](auto* self) { return self->DirectService::frozen_blocks(); }(this)); } @@ -173,13 +173,13 @@ Task ActiveDirectService::frozen_blocks() { /** Additional non-RPC methods **/ Task ActiveDirectService::get_last_headers(uint64_t n) { - return concurrency::spawn_and_async_wait(executor_, [](auto* self, auto how_many) { + return concurrency::spawn_task(executor_, [](auto* self, auto how_many) { return self->DirectService::get_last_headers(how_many); }(this, n)); } Task ActiveDirectService::block_progress() { - return concurrency::spawn_and_async_wait(executor_, [](auto* self) { + return concurrency::spawn_task(executor_, [](auto* self) { return self->DirectService::block_progress(); }(this)); } diff --git a/silkworm/node/stagedsync/execution_engine.cpp b/silkworm/node/stagedsync/execution_engine.cpp index 30c31274dd..7d42aa4609 100644 --- a/silkworm/node/stagedsync/execution_engine.cpp +++ b/silkworm/node/stagedsync/execution_engine.cpp @@ -206,7 +206,7 @@ bool ExecutionEngine::notify_fork_choice_update(Hash head_block_hash, // notify the fork of the update - we need to block here to restore the invariant auto fork_choice_aw_future = (*f)->fork_choice(head_block_hash, finalized_block_hash, safe_block_hash); - std::future fork_choice_future = concurrency::spawn(io_context_, fork_choice_aw_future.get()); + std::future fork_choice_future = concurrency::spawn_future(io_context_, fork_choice_aw_future.get()); bool updated = fork_choice_future.get(); // BLOCKING if (!updated) return false; diff --git a/silkworm/node/stagedsync/stages/stage_triggers.cpp b/silkworm/node/stagedsync/stages/stage_triggers.cpp index 17c06d80bd..66753fe491 100644 --- a/silkworm/node/stagedsync/stages/stage_triggers.cpp +++ b/silkworm/node/stagedsync/stages/stage_triggers.cpp @@ -46,7 +46,7 @@ Task TriggersStage::schedule(std::function(db::RWTxn&)> task) { assert(tx); co_await t(*tx); }; - return concurrency::spawn_and_async_wait(io_context_, task_caller()); + return concurrency::spawn_task(io_context_, task_caller()); } bool TriggersStage::stop() { diff --git a/silkworm/sentry/discovery/node_db/serial_node_db.cpp b/silkworm/sentry/discovery/node_db/serial_node_db.cpp index 0e0bf71a13..8bd5cbcf7e 100644 --- a/silkworm/sentry/discovery/node_db/serial_node_db.cpp +++ b/silkworm/sentry/discovery/node_db/serial_node_db.cpp @@ -25,115 +25,115 @@ namespace silkworm::sentry::discovery::node_db { using namespace boost::asio; Task SerialNodeDb::upsert_node_address(NodeId id, NodeAddress address) { - return concurrency::spawn_and_async_wait(strand_, db_.upsert_node_address(std::move(id), std::move(address))); + return concurrency::spawn_task(strand_, db_.upsert_node_address(std::move(id), std::move(address))); } Task> SerialNodeDb::find_node_address_v4(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_node_address_v4(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_node_address_v4(std::move(id))); } Task> SerialNodeDb::find_node_address_v6(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_node_address_v6(std::move(id))); + return 
concurrency::spawn_task(strand_, db_.find_node_address_v6(std::move(id))); } Task SerialNodeDb::update_next_ping_time(NodeId id, Time value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_next_ping_time(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_next_ping_time(std::move(id), value)); } Task> SerialNodeDb::find_next_ping_time(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_next_ping_time(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_next_ping_time(std::move(id))); } Task SerialNodeDb::update_last_pong_time(NodeId id, Time value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_last_pong_time(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_last_pong_time(std::move(id), value)); } Task> SerialNodeDb::find_last_pong_time(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_last_pong_time(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_last_pong_time(std::move(id))); } Task SerialNodeDb::update_ping_fails(NodeId id, size_t value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_ping_fails(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_ping_fails(std::move(id), value)); } Task> SerialNodeDb::find_ping_fails(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_ping_fails(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_ping_fails(std::move(id))); } Task SerialNodeDb::update_peer_disconnected_time(NodeId id, Time value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_peer_disconnected_time(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_peer_disconnected_time(std::move(id), value)); } Task> SerialNodeDb::find_peer_disconnected_time(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_peer_disconnected_time(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_peer_disconnected_time(std::move(id))); } Task SerialNodeDb::update_peer_is_useless(NodeId id, bool value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_peer_is_useless(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_peer_is_useless(std::move(id), value)); } Task> SerialNodeDb::find_peer_is_useless(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_peer_is_useless(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_peer_is_useless(std::move(id))); } Task SerialNodeDb::update_distance(NodeId id, size_t value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_distance(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_distance(std::move(id), value)); } Task> SerialNodeDb::find_distance(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_distance(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_distance(std::move(id))); } Task SerialNodeDb::update_enr_seq_num(NodeId id, uint64_t value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_enr_seq_num(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_enr_seq_num(std::move(id), value)); } Task> SerialNodeDb::find_enr_seq_num(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_enr_seq_num(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_enr_seq_num(std::move(id))); } Task 
SerialNodeDb::update_eth1_fork_id(NodeId id, std::optional value) { - return concurrency::spawn_and_async_wait(strand_, db_.update_eth1_fork_id(std::move(id), value)); + return concurrency::spawn_task(strand_, db_.update_eth1_fork_id(std::move(id), value)); } Task> SerialNodeDb::find_eth1_fork_id(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.find_eth1_fork_id(std::move(id))); + return concurrency::spawn_task(strand_, db_.find_eth1_fork_id(std::move(id))); } Task> SerialNodeDb::find_ping_candidates(Time time, size_t limit) { - return concurrency::spawn_and_async_wait(strand_, db_.find_ping_candidates(time, limit)); + return concurrency::spawn_task(strand_, db_.find_ping_candidates(time, limit)); } Task> SerialNodeDb::find_useful_nodes(Time min_pong_time, size_t limit) { - return concurrency::spawn_and_async_wait(strand_, db_.find_useful_nodes(min_pong_time, limit)); + return concurrency::spawn_task(strand_, db_.find_useful_nodes(min_pong_time, limit)); } Task> SerialNodeDb::find_lookup_candidates(FindLookupCandidatesQuery query) { - return concurrency::spawn_and_async_wait(strand_, db_.find_lookup_candidates(query)); + return concurrency::spawn_task(strand_, db_.find_lookup_candidates(query)); } Task SerialNodeDb::mark_taken_lookup_candidates(const std::vector& ids, Time time) { - return concurrency::spawn_and_async_wait(strand_, db_.mark_taken_lookup_candidates(ids, time)); + return concurrency::spawn_task(strand_, db_.mark_taken_lookup_candidates(ids, time)); } Task> SerialNodeDb::take_lookup_candidates(FindLookupCandidatesQuery query, Time time) { - return concurrency::spawn_and_async_wait(strand_, db_.take_lookup_candidates(query, time)); + return concurrency::spawn_task(strand_, db_.take_lookup_candidates(query, time)); } Task> SerialNodeDb::find_peer_candidates(FindPeerCandidatesQuery query) { - return concurrency::spawn_and_async_wait(strand_, db_.find_peer_candidates(std::move(query))); + return concurrency::spawn_task(strand_, db_.find_peer_candidates(std::move(query))); } Task SerialNodeDb::mark_taken_peer_candidates(const std::vector& ids, Time time) { - return concurrency::spawn_and_async_wait(strand_, db_.mark_taken_peer_candidates(ids, time)); + return concurrency::spawn_task(strand_, db_.mark_taken_peer_candidates(ids, time)); } Task> SerialNodeDb::take_peer_candidates(FindPeerCandidatesQuery query, Time time) { - return concurrency::spawn_and_async_wait(strand_, db_.take_peer_candidates(std::move(query), time)); + return concurrency::spawn_task(strand_, db_.take_peer_candidates(std::move(query), time)); } Task SerialNodeDb::delete_node(NodeId id) { - return concurrency::spawn_and_async_wait(strand_, db_.delete_node(std::move(id))); + return concurrency::spawn_task(strand_, db_.delete_node(std::move(id))); } } // namespace silkworm::sentry::discovery::node_db diff --git a/silkworm/sentry/message_receiver.cpp b/silkworm/sentry/message_receiver.cpp index 3c765f9d7d..c69cb059cb 100644 --- a/silkworm/sentry/message_receiver.cpp +++ b/silkworm/sentry/message_receiver.cpp @@ -41,7 +41,7 @@ Task MessageReceiver::run(std::shared_ptr self, PeerManag self->peer_tasks_.wait() && self->unsubscription_tasks_.wait() && self->handle_calls(); - co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); + co_await concurrency::spawn_task(self->strand_, std::move(run)); } Task MessageReceiver::handle_calls() { diff --git a/silkworm/sentry/peer_manager.cpp b/silkworm/sentry/peer_manager.cpp index c9cdf4a4e5..2ef2b6d3fc 100644 --- 
a/silkworm/sentry/peer_manager.cpp +++ b/silkworm/sentry/peer_manager.cpp @@ -49,7 +49,7 @@ Task PeerManager::run( connect_peer_tasks_.wait() && drop_peer_tasks_.wait() && peer_tasks_.wait(); - co_await concurrency::spawn_and_async_wait(strand_, std::move(run)); + co_await concurrency::spawn_task(strand_, std::move(run)); } Task PeerManager::run_in_strand(concurrency::Channel>& peer_channel) { @@ -122,15 +122,15 @@ Task PeerManager::drop_peer( } Task PeerManager::count_peers() { - co_return (co_await concurrency::spawn_and_async_wait(strand_, count_peers_in_strand())); + co_return (co_await concurrency::spawn_task(strand_, count_peers_in_strand())); } Task PeerManager::enumerate_peers(EnumeratePeersCallback callback) { - co_await concurrency::spawn_and_async_wait(strand_, enumerate_peers_in_strand(callback)); + co_await concurrency::spawn_task(strand_, enumerate_peers_in_strand(callback)); } Task PeerManager::enumerate_random_peers(size_t max_count, EnumeratePeersCallback callback) { - co_await concurrency::spawn_and_async_wait(strand_, enumerate_random_peers_in_strand(max_count, callback)); + co_await concurrency::spawn_task(strand_, enumerate_random_peers_in_strand(max_count, callback)); } Task PeerManager::count_peers_in_strand() { @@ -251,7 +251,7 @@ Task PeerManager::connect_peer(EnodeUrl peer_url, bool is_static_peer, std [[maybe_unused]] auto _ = gsl::finally([this, peer_url] { this->connecting_peer_urls_.erase(peer_url); }); try { - auto peer1 = co_await concurrency::spawn_and_async_wait(executor_pool_.any_executor(), client->connect(peer_url, is_static_peer)); + auto peer1 = co_await concurrency::spawn_task(executor_pool_.any_executor(), client->connect(peer_url, is_static_peer)); auto peer = std::shared_ptr(std::move(peer1)); co_await client_peer_channel_.send(peer); } catch (const boost::system::system_error& ex) { diff --git a/silkworm/sentry/peer_manager_api.cpp b/silkworm/sentry/peer_manager_api.cpp index dcaa2cba34..ce0f8018bb 100644 --- a/silkworm/sentry/peer_manager_api.cpp +++ b/silkworm/sentry/peer_manager_api.cpp @@ -47,7 +47,7 @@ Task PeerManagerApi::run(std::shared_ptr self) { self->handle_peer_events_calls() && self->events_unsubscription_tasks_.wait() && self->forward_peer_events(); - co_await concurrency::spawn_and_async_wait(self->strand_, std::move(run)); + co_await concurrency::spawn_task(self->strand_, std::move(run)); } Task PeerManagerApi::handle_peer_count_calls() { diff --git a/silkworm/sentry/rlpx/peer.cpp b/silkworm/sentry/rlpx/peer.cpp index c5a195e94d..0fc939d197 100644 --- a/silkworm/sentry/rlpx/peer.cpp +++ b/silkworm/sentry/rlpx/peer.cpp @@ -77,7 +77,7 @@ Task Peer::run(std::shared_ptr peer) { using namespace concurrency::awaitable_wait_for_one; auto run = peer->handle() || peer->send_message_tasks_.wait(); - co_await concurrency::spawn_and_async_wait(peer->strand_, std::move(run)); + co_await concurrency::spawn_task(peer->strand_, std::move(run)); } static bool is_fatal_network_error(const boost::system::system_error& ex) { @@ -244,7 +244,7 @@ Task Peer::handle() { } Task Peer::drop(const std::shared_ptr& peer, DisconnectReason reason) { - return concurrency::spawn_and_async_wait(peer->strand_, Peer::drop_in_strand(peer, reason)); + return concurrency::spawn_task(peer->strand_, Peer::drop_in_strand(peer, reason)); } Task Peer::drop_in_strand(std::shared_ptr peer, DisconnectReason reason) { diff --git a/silkworm/sync/sync_pow.cpp b/silkworm/sync/sync_pow.cpp index 3e79b07d9e..9932f741d5 100644 --- a/silkworm/sync/sync_pow.cpp +++ 
b/silkworm/sync/sync_pow.cpp @@ -27,7 +27,7 @@ namespace silkworm::chainsync { -using concurrency::spawn_and_wait; +using concurrency::spawn_future_and_wait; PoWSync::PoWSync(BlockExchange& block_exchange, execution::api::Client& exec_engine) : ChainSync(block_exchange, exec_engine) {} @@ -40,21 +40,21 @@ PoWSync::NewHeight PoWSync::resume() { // find the point (head) where we left o BlockId head{}; // BlockExchange need a bunch of previous headers to attach the new ones - auto last_headers = spawn_and_wait(io_context_, exec_engine_->get_last_headers(1000)); + auto last_headers = spawn_future_and_wait(io_context_, exec_engine_->get_last_headers(1000)); block_exchange_.initial_state(last_headers); // We calculate a provisional head based on the previous headers std::ranges::for_each(last_headers, [&, this](const auto& header) { auto hash = header.hash(); - auto td = spawn_and_wait(io_context_, exec_engine_->get_td(hash)); + auto td = spawn_future_and_wait(io_context_, exec_engine_->get_td(hash)); chain_fork_view_.add(header, *td); // add to cache & compute a new canonical head }); // Now we can resume the sync from the canonical head - const auto last_fcu = spawn_and_wait(io_context_, exec_engine_->get_fork_choice()); // previously was get_canonical_head() - const auto block_progress = spawn_and_wait(io_context_, exec_engine_->block_progress()); + const auto last_fcu = spawn_future_and_wait(io_context_, exec_engine_->get_fork_choice()); // previously was get_canonical_head() + const auto block_progress = spawn_future_and_wait(io_context_, exec_engine_->block_progress()); - const auto last_fcu_number = spawn_and_wait(io_context_, exec_engine_->get_header_hash_number(last_fcu.head_block_hash)); + const auto last_fcu_number = spawn_future_and_wait(io_context_, exec_engine_->get_header_hash_number(last_fcu.head_block_hash)); if (!last_fcu_number) return head; ensure_invariant(*last_fcu_number <= block_progress, "canonical head beyond block progress"); @@ -80,7 +80,7 @@ PoWSync::NewHeight PoWSync::forward_and_insert_blocks() { ResultQueue& downloading_queue = block_exchange_.result_queue(); - auto initial_block_progress = spawn_and_wait(io_context_, exec_engine_->block_progress()); + auto initial_block_progress = spawn_future_and_wait(io_context_, exec_engine_->block_progress()); auto block_progress = initial_block_progress; block_exchange_.download_blocks(initial_block_progress, BlockExchange::Target_Tracking::kByAnnouncements); @@ -107,7 +107,7 @@ PoWSync::NewHeight PoWSync::forward_and_insert_blocks() { }); // Insert blocks into database - const auto insert_result{spawn_and_wait(io_context_, exec_engine_->insert_blocks(to_plain_blocks(blocks)))}; + const auto insert_result{spawn_future_and_wait(io_context_, exec_engine_->insert_blocks(to_plain_blocks(blocks)))}; if (!insert_result) { log::Error("Sync") << "Cannot insert " << blocks.size() << " blocks, error=" << insert_result.status; continue; @@ -157,7 +157,7 @@ void PoWSync::execution_loop() { // Verify the new section of the chain log::Info("Sync") << "Verifying chain, head=(" << new_height.number << ", " << to_hex(new_height.hash) << ")"; - const auto verification = spawn_and_wait(io_context_, exec_engine_->validate_chain(new_height)); // BLOCKING + const auto verification = spawn_future_and_wait(io_context_, exec_engine_->validate_chain(new_height)); // BLOCKING if (std::holds_alternative(verification)) { auto valid_chain = std::get(verification); @@ -169,14 +169,14 @@ void PoWSync::execution_loop() { // Notify the fork choice 
log::Info("Sync") << "Notifying fork choice updated, new head=" << new_height.number; - spawn_and_wait(io_context_, exec_engine_->update_fork_choice({new_height.hash})); + spawn_future_and_wait(io_context_, exec_engine_->update_fork_choice({new_height.hash})); send_new_block_hash_announcements(); // according to eth/67 they must be done after a full block verification } else if (std::holds_alternative(verification)) { auto invalid_chain = std::get(verification); - const auto latest_valid_height = spawn_and_wait(io_context_, exec_engine_->get_header_hash_number(invalid_chain.unwind_point.hash)); + const auto latest_valid_height = spawn_future_and_wait(io_context_, exec_engine_->get_header_hash_number(invalid_chain.unwind_point.hash)); ensure_invariant(latest_valid_height.has_value(), "wrong latest_valid_head"); log::Info("Sync") << "Invalid chain, unwinding down to=" << *latest_valid_height; @@ -190,14 +190,12 @@ void PoWSync::execution_loop() { // Notify the fork choice log::Info("Sync") << "Notifying fork choice updated, head=" << to_hex(invalid_chain.unwind_point.hash); - spawn_and_wait(io_context_, exec_engine_->update_fork_choice({invalid_chain.unwind_point.hash})); - + spawn_future_and_wait(io_context_, exec_engine_->update_fork_choice({invalid_chain.unwind_point.hash})); } else if (std::holds_alternative(verification)) { // If it returned a validation error, raise an exception const auto validation_error = std::get(verification); throw std::logic_error("Consensus validation error: last point=" + validation_error.latest_valid_head.hash.to_hex() + ", error=" + validation_error.error); - } else { throw std::logic_error("Consensus, unknown error"); }