From 3dc15fd10c5b0420b752518fa746684b6782ad0d Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com>
Date: Sun, 10 Mar 2024 20:00:20 +0100
Subject: [PATCH] Use per-peer fair queue in block processor

---
 nano/core_test/network.cpp                |   7 +-
 nano/core_test/node.cpp                   |   5 +-
 nano/lib/stats_enums.hpp                  |   2 +
 nano/node/blockprocessor.cpp              | 188 ++++++++++++++--------
 nano/node/blockprocessor.hpp              |  41 +++--
 nano/node/bootstrap/bootstrap_legacy.cpp  |   4 +-
 nano/node/bootstrap_ascending/service.cpp |   2 +-
 nano/node/fair_queue.hpp                  |   2 +-
 nano/node/network.cpp                     |  11 +-
 nano/node/nodeconfig.cpp                  |  13 +-
 nano/node/nodeconfig.hpp                  |   2 +
 11 files changed, 180 insertions(+), 97 deletions(-)

diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index bf9ed2e651..6ada05260b 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -813,10 +813,9 @@ TEST (network, duplicate_detection)
 TEST (network, duplicate_revert_publish)
 {
 	nano::test::system system;
-	nano::node_flags node_flags;
-	node_flags.block_processor_full_size = 0;
-	auto & node (*system.add_node (node_flags));
-	ASSERT_TRUE (node.block_processor.full ());
+	nano::node_config node_config = system.default_config ();
+	node_config.block_processor.max_peer_queue = 0;
+	auto & node (*system.add_node (node_config));
 	nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
 	std::vector<uint8_t> bytes;
 	{
diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp
index cf81a51bdb..2886c4a2b7 100644
--- a/nano/core_test/node.cpp
+++ b/nano/core_test/node.cpp
@@ -528,6 +528,7 @@ TEST (node, expire)
 	ASSERT_TRUE (node0.expired ());
 }
 
+// This test is racy: there is no guarantee that the election will remain unconfirmed until all forks are fully processed
 TEST (node, fork_publish)
 {
 	nano::test::system system (1);
@@ -674,6 +675,7 @@ TEST (node, fork_keep)
 	ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ()));
 }
 
+// This test is racy: there is no guarantee that the election will remain unconfirmed until all forks are fully processed
 TEST (node, fork_flip)
 {
 	nano::test::system system (2);
@@ -699,8 +701,7 @@ TEST (node, fork_flip)
 		.work (*system.work.generate (nano::dev::genesis->hash ()))
 		.build ();
 	nano::publish publish2{ nano::dev::network_params.network, send2 };
-	auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };
-
+	auto ignored_channel = nano::test::fake_channel (node1);
 	node1.network.inbound (publish1, ignored_channel);
 	node2.network.inbound (publish2, ignored_channel);
 	ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp
index 6d1ac20b29..648d883132 100644
--- a/nano/lib/stats_enums.hpp
+++ b/nano/lib/stats_enums.hpp
@@ -38,6 +38,7 @@ enum class type : uint8_t
 	blockprocessor,
 	blockprocessor_source,
 	blockprocessor_result,
+	blockprocessor_overfill,
 	bootstrap_server,
 	active,
 	active_started,
@@ -80,6 +81,7 @@ enum class detail : uint8_t
 	none,
 	success,
 	unknown,
+	queue_overflow,
 
 	// processing queue
 	queue,
diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp
index 3bb23c8ee2..b1b4019ddd 100644
--- a/nano/node/blockprocessor.cpp
+++ b/nano/node/blockprocessor.cpp
@@ -6,7 +6,7 @@
 #include
 #include
-#include
+#include
 #include
 
@@ -15,7 +15,7 @@
  */
 
 nano::block_processor::context::context (std::shared_ptr<nano::block> block, nano::block_source source_a) :
-	block{ block },
+	block{ std::move (block) },
 	source{ source_a }
 {
 	debug_assert (source != nano::block_source::unknown);
@@ -36,6 +36,7 @@ void nano::block_processor::context::set_result (result_t const & result)
  */
 
 nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
+	config{ node_a.config.block_processor },
 	node (node_a),
 	write_database_queue (write_database_queue_a),
 	next_log (std::chrono::steady_clock::now ())
@@ -47,6 +48,32 @@ nano::block_processor::block_processor (nano::node & node_a, nano::write_databas
 			block_processed.notify (result, context);
 		}
 	});
+
+	queue.max_size_query = [this] (auto const & origin) {
+		switch (std::get<nano::block_source> (origin.sources))
+		{
+			case nano::block_source::live:
+				return config.max_peer_queue;
+			default:
+				return config.max_system_queue;
+		}
+	};
+
+	queue.priority_query = [this] (auto const & origin) -> size_t {
+		switch (std::get<nano::block_source> (origin.sources))
+		{
+			case nano::block_source::live:
+				return config.priority_live;
+			case nano::block_source::bootstrap:
+			case nano::block_source::bootstrap_legacy:
+			case nano::block_source::unchecked:
+				return config.priority_bootstrap;
+			case nano::block_source::local:
+				return config.priority_local;
+			default:
+				return 1;
+		}
+	};
 }
 
 nano::block_processor::~block_processor ()
@@ -78,39 +105,44 @@ void nano::block_processor::stop ()
 	}
 }
 
-std::size_t nano::block_processor::size ()
+// TODO: Remove and replace all checks with calls to size (block_source)
+std::size_t nano::block_processor::size () const
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	return queue.total_size ();
+}
+
+std::size_t nano::block_processor::size (nano::block_source source) const
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	return blocks.size () + forced.size ();
+	return queue.size ({ source });
 }
 
-bool nano::block_processor::full ()
+bool nano::block_processor::full () const
 {
 	return size () >= node.flags.block_processor_full_size;
 }
 
-bool nano::block_processor::half_full ()
+bool nano::block_processor::half_full () const
 {
 	return size () >= node.flags.block_processor_full_size / 2;
 }
 
-void nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source)
+bool nano::block_processor::add (std::shared_ptr<nano::block> const & block, block_source const source, std::shared_ptr<nano::transport::channel> const & channel)
 {
-	if (full ())
-	{
-		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
-		return;
-	}
 	if (node.network_params.work.validate_entry (*block)) // true => error
 	{
 		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::insufficient_work);
-		return;
+		return false; // Not added
 	}
 
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::process);
-	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {})", block->hash ().to_string (), to_string (source));
+	node.logger.debug (nano::log::type::blockprocessor, "Processing block (async): {} (source: {} {})",
+	block->hash ().to_string (),
+	to_string (source),
+	channel ? channel->to_string () : ""); // TODO: Lazy eval
 
-	add_impl (context{ block, source });
+	return add_impl (context{ block, source }, channel);
 }
 
 std::optional<nano::block_status> nano::block_processor::add_blocking (std::shared_ptr<nano::block> const & block, block_source const source)
@@ -145,11 +177,27 @@ void nano::block_processor::force (std::shared_ptr<nano::block> const & block_a)
 	node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::force);
 	node.logger.debug (nano::log::type::blockprocessor, "Forcing block: {}", block_a->hash ().to_string ());
 
+	add_impl (context{ block_a, block_source::forced });
+}
+
+bool nano::block_processor::add_impl (context ctx, std::shared_ptr<nano::transport::channel> const & channel)
+{
+	auto const source = ctx.source;
+	bool added = false;
 	{
-		nano::lock_guard<nano::mutex> lock{ mutex };
-		forced.emplace_back (context{ block_a, block_source::forced });
+		nano::lock_guard<nano::mutex> guard{ mutex };
+		added = queue.push (std::move (ctx), { source, channel });
 	}
-	condition.notify_all ();
+	if (added)
+	{
+		condition.notify_all ();
+	}
+	else
+	{
+		node.stats.inc (nano::stat::type::blockprocessor, nano::stat::detail::overfill);
+		node.stats.inc (nano::stat::type::blockprocessor_overfill, to_stat_detail (source));
+	}
+	return added;
 }
 
 void nano::block_processor::rollback_competitor (store::write_transaction const & transaction, nano::block const & block)
@@ -193,7 +241,7 @@ void nano::block_processor::run ()
 	nano::unique_lock<nano::mutex> lock{ mutex };
 	while (!stopped)
 	{
-		if (have_blocks_ready ())
+		if (!queue.empty ())
 		{
 			lock.unlock ();
 
@@ -230,47 +278,16 @@ bool nano::block_processor::should_log ()
 	return result;
 }
 
-bool nano::block_processor::have_blocks_ready ()
-{
-	debug_assert (!mutex.try_lock ());
-	return !blocks.empty () || !forced.empty ();
-}
-
-bool nano::block_processor::have_blocks ()
-{
-	debug_assert (!mutex.try_lock ());
-	return have_blocks_ready ();
-}
-
-void nano::block_processor::add_impl (context ctx)
-{
-	release_assert (ctx.source != nano::block_source::forced);
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks.emplace_back (std::move (ctx));
-	}
-	condition.notify_all ();
-}
-
 auto nano::block_processor::next () -> context
 {
 	debug_assert (!mutex.try_lock ());
-	debug_assert (!blocks.empty () || !forced.empty ()); // This should be checked before calling next
+	debug_assert (!queue.empty ()); // This should be checked before calling next
 
-	if (!forced.empty ())
+	if (!queue.empty ())
 	{
-		auto entry = std::move (forced.front ());
-		release_assert (entry.source == nano::block_source::forced);
-		forced.pop_front ();
-		return entry;
-	}
-
-	if (!blocks.empty ())
-	{
-		auto entry = std::move (blocks.front ());
-		release_assert (entry.source != nano::block_source::forced);
-		blocks.pop_front ();
-		return entry;
+		auto [request, origin] = queue.next ();
+		release_assert (std::get<nano::block_source> (origin.sources) != nano::block_source::forced || request.source == nano::block_source::forced);
+		return std::move (request);
 	}
 
 	release_assert (false, "next() called when no blocks are ready");
@@ -286,19 +303,24 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
 	lock_a.lock ();
 
+	queue.periodic_update ();
+
 	timer_l.start ();
+
 	// Processing blocks
 	unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
 	auto deadline_reached = [&timer_l, deadline = node.config.block_processor_batch_max_time] { return timer_l.after_deadline (deadline); };
 	auto processor_batch_reached = [&number_of_blocks_processed, max = node.flags.block_processor_batch_size] { return number_of_blocks_processed >= max; };
 	auto store_batch_reached = [&number_of_blocks_processed, max = node.store.max_block_write_batch_num ()] { return number_of_blocks_processed >= max; };
-	while (have_blocks_ready () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
+	while (!queue.empty () && (!deadline_reached () || !processor_batch_reached ()) && !store_batch_reached ())
 	{
 		// TODO: Cleaner periodical logging
-		if ((blocks.size () + forced.size () > 64) && should_log ())
+		if (should_log ())
 		{
-			node.logger.debug (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", blocks.size (), forced.size ());
+			node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
+			queue.total_size (),
+			queue.size ({ nano::block_source::forced }));
 		}
 
 		auto ctx = next ();
@@ -339,6 +361,7 @@ nano::block_status nano::block_processor::process_one (store::write_transaction
 
 	node.stats.inc (nano::stat::type::blockprocessor_result, to_stat_detail (result));
 	node.stats.inc (nano::stat::type::blockprocessor_source, to_stat_detail (context.source));
+
 	node.logger.trace (nano::log::type::blockprocessor, nano::log::detail::block_processed,
 	nano::log::arg{ "result", result },
 	nano::log::arg{ "source", context.source },
@@ -434,18 +457,12 @@ void nano::block_processor::queue_unchecked (store::write_transaction const & tr
 std::unique_ptr<nano::container_info_component> nano::block_processor::collect_container_info (std::string const & name)
 {
-	std::size_t blocks_count;
-	std::size_t forced_count;
-
-	{
-		nano::lock_guard<nano::mutex> guard{ mutex };
-		blocks_count = blocks.size ();
-		forced_count = forced.size ();
-	}
+	nano::lock_guard<nano::mutex> guard{ mutex };
 
 	auto composite = std::make_unique<container_info_composite> (name);
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (blocks)::value_type) }));
-	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (forced)::value_type) }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.total_size (), 0 }));
+	composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 }));
+	composite->add_component (queue.collect_container_info ("queue"));
 	return composite;
 }
@@ -460,3 +477,38 @@ nano::stat::detail nano::to_stat_detail (nano::block_source type)
 	debug_assert (value);
 	return value.value_or (nano::stat::detail{});
 }
+
+/*
+ * block_processor_config
+ */
+
+nano::block_processor_config::block_processor_config (const nano::network_constants & network_constants)
+{
+	if (network_constants.is_beta_network ())
+	{
+		// Bump max queue sizes for the beta network to allow more aggressive block propagation during saturation testing
+		max_peer_queue = 1024;
+	}
+}
+
+nano::error nano::block_processor_config::serialize (nano::tomlconfig & toml) const
+{
+	toml.put ("max_peer_queue", max_peer_queue, "Maximum number of blocks to queue from network peers. \ntype:uint64");
+	toml.put ("max_system_queue", max_system_queue, "Maximum number of blocks to queue from system components (local RPC, bootstrap). \ntype:uint64");
+	toml.put ("priority_live", priority_live, "Priority for live network blocks. Higher priority gets processed more frequently. \ntype:uint64");
+	toml.put ("priority_bootstrap", priority_bootstrap, "Priority for bootstrap blocks. Higher priority gets processed more frequently. \ntype:uint64");
+	toml.put ("priority_local", priority_local, "Priority for local RPC blocks. Higher priority gets processed more frequently. \ntype:uint64");
+
+	return toml.get_error ();
+}
+
+nano::error nano::block_processor_config::deserialize (nano::tomlconfig & toml)
+{
+	toml.get ("max_peer_queue", max_peer_queue);
+	toml.get ("max_system_queue", max_system_queue);
+	toml.get ("priority_live", priority_live);
+	toml.get ("priority_bootstrap", priority_bootstrap);
+	toml.get ("priority_local", priority_local);
+
+	return toml.get_error ();
+}
\ No newline at end of file
diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp
index 0d020e8b16..a1537092f0 100644
--- a/nano/node/blockprocessor.hpp
+++ b/nano/node/blockprocessor.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include
+#include <nano/node/fair_queue.hpp>
 #include
 #include
 
@@ -23,7 +24,6 @@ class write_transaction;
 
 namespace nano
 {
-
 enum class block_source
 {
 	unknown = 0,
@@ -38,6 +38,26 @@ enum class block_source
 std::string_view to_string (block_source);
 nano::stat::detail to_stat_detail (block_source);
 
+class block_processor_config final
+{
+public:
+	explicit block_processor_config (nano::network_constants const &);
+
+	nano::error deserialize (nano::tomlconfig & toml);
+	nano::error serialize (nano::tomlconfig & toml) const;
+
+public:
+	// Maximum number of blocks to queue from network peers
+	size_t max_peer_queue{ 128 };
+	// Maximum number of blocks to queue from system components (local RPC, bootstrap)
+	size_t max_system_queue{ 16 * 1024 };
+
+	// Higher priority gets processed more frequently
+	size_t priority_live{ 1 };
+	size_t priority_bootstrap{ 8 };
+	size_t priority_local{ 16 };
+};
+
 /**
  * Processing blocks is a potentially long IO operation.
  * This class isolates block insertion from other operations like servicing network operations
@@ -72,15 +92,14 @@ class block_processor final
 	void start ();
 	void stop ();
 
-	std::size_t size ();
-	bool full ();
-	bool half_full ();
-	void add (std::shared_ptr<nano::block> const &, block_source = block_source::live);
+	std::size_t size () const;
+	std::size_t size (block_source) const;
+	bool full () const;
+	bool half_full () const;
+	bool add (std::shared_ptr<nano::block> const &, block_source = block_source::live, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
 	std::optional<nano::block_status> add_blocking (std::shared_ptr<nano::block> const & block, block_source);
 	void force (std::shared_ptr<nano::block> const &);
 	bool should_log ();
-	bool have_blocks_ready ();
-	bool have_blocks ();
 
 	std::unique_ptr<nano::container_info_component> collect_container_info (std::string const & name);
 
@@ -103,21 +122,21 @@ class block_processor final
 	void queue_unchecked (store::write_transaction const &, nano::hash_or_account const &);
 	processed_batch_t process_batch (nano::unique_lock<nano::mutex> &);
 	context next ();
-	void add_impl (context);
+	bool add_impl (context, std::shared_ptr<nano::transport::channel> const & channel = nullptr);
 
 private: // Dependencies
+	block_processor_config const & config;
 	nano::node & node;
 	nano::write_database_queue & write_database_queue;
 
 private:
-	std::deque<context> blocks;
-	std::deque<context> forced;
+	nano::fair_queue<context, block_source> queue;
 
 	std::chrono::steady_clock::time_point next_log;
 
 	bool stopped{ false };
 	nano::condition_variable condition;
-	nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
+	mutable nano::mutex mutex{ mutex_identifier (mutexes::block_processor) };
 	std::thread thread;
 };
 }
diff --git a/nano/node/bootstrap/bootstrap_legacy.cpp b/nano/node/bootstrap/bootstrap_legacy.cpp
index 9badee97cc..98d48e9ead 100644
--- a/nano/node/bootstrap/bootstrap_legacy.cpp
+++ b/nano/node/bootstrap/bootstrap_legacy.cpp
@@ -225,9 +225,9 @@ void nano::bootstrap_attempt_legacy::run ()
 
 	// TODO: This check / wait is a heuristic and should be improved.
 	auto wait_start = std::chrono::steady_clock::now ();
-	while (!stopped && node->block_processor.size () != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
+	while (!stopped && node->block_processor.size (nano::block_source::bootstrap_legacy) != 0 && ((std::chrono::steady_clock::now () - wait_start) < std::chrono::seconds{ 10 }))
 	{
-		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size () == 0; });
+		condition.wait_for (lock, std::chrono::milliseconds{ 100 }, [this, node] { return stopped || node->block_processor.size (nano::block_source::bootstrap_legacy) == 0; });
 	}
 
 	if (start_account.number () != std::numeric_limits<nano::uint256_t>::max ())
diff --git a/nano/node/bootstrap_ascending/service.cpp b/nano/node/bootstrap_ascending/service.cpp
index e75a26c247..83645c7456 100644
--- a/nano/node/bootstrap_ascending/service.cpp
+++ b/nano/node/bootstrap_ascending/service.cpp
@@ -177,7 +177,7 @@ void nano::bootstrap_ascending::service::inspect (store::transaction const & tx,
 void nano::bootstrap_ascending::service::wait_blockprocessor ()
 {
 	nano::unique_lock<nano::mutex> lock{ mutex };
-	while (!stopped && block_processor.size () > config.bootstrap_ascending.block_wait_count)
+	while (!stopped && block_processor.size (nano::block_source::bootstrap) > config.bootstrap_ascending.block_wait_count)
 	{
 		condition.wait_for (lock, std::chrono::milliseconds{ config.bootstrap_ascending.throttle_wait }, [this] () { return stopped; }); // Blockprocessor is relatively slow, sleeping here instead of using conditions
 	}
diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp
index 935fa8dae8..a23626f0dc 100644
--- a/nano/node/fair_queue.hpp
+++ b/nano/node/fair_queue.hpp
@@ -34,7 +34,7 @@ class fair_queue final
 
 		source (std::tuple<Sources...> sources, std::shared_ptr<nano::transport::channel> channel = nullptr) :
 			sources{ sources },
-			channel{ channel }
+			channel{ std::move (channel) }
 		{
 		}
diff --git a/nano/node/network.cpp b/nano/node/network.cpp
index 80660cad00..6c4f649c00 100644
--- a/nano/node/network.cpp
+++ b/nano/node/network.cpp
@@ -253,15 +253,12 @@ class network_message_visitor : public nano::message_visitor
 		}
 	}
 
-	void publish (nano::publish const & message_a) override
+	void publish (nano::publish const & message) override
 	{
-		if (!node.block_processor.full ())
+		bool added = node.block_processor.add (message.block, nano::block_source::live, channel);
+		if (!added)
 		{
-			node.process_active (message_a.block);
-		}
-		else
-		{
-			node.network.publish_filter.clear (message_a.digest);
+			node.network.publish_filter.clear (message.digest);
 			node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in);
 		}
 	}
diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp
index 2d7e23ab44..0b6faa3fb3 100644
--- a/nano/node/nodeconfig.cpp
+++ b/nano/node/nodeconfig.cpp
@@ -33,7 +33,8 @@ nano::node_config::node_config (const std::optional<uint16_t> & peering_port_a,
 	websocket_config{ network_params.network },
 	ipc_config{ network_params.network },
 	external_address{ boost::asio::ip::address_v6{}.to_string () },
-	rep_crawler{ network_params.network }
+	rep_crawler{ network_params.network },
+	block_processor{ network_params.network }
 {
 	if (peering_port == 0)
 	{
@@ -210,6 +211,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
 	rep_crawler.serialize (rep_crawler_l);
 	toml.put_child ("rep_crawler", rep_crawler_l);
 
+	nano::tomlconfig block_processor_l;
+	block_processor.serialize (block_processor_l);
+	toml.put_child ("block_processor", block_processor_l);
+
 	return toml.get_error ();
 }
 
@@ -285,6 +290,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml)
 		rep_crawler.deserialize (config_l);
 	}
 
+	if (toml.has_key ("block_processor"))
+	{
+		auto config_l = toml.get_required_child ("block_processor");
+		block_processor.deserialize (config_l);
+	}
+
 	if (toml.has_key ("work_peers"))
 	{
 		work_peers.clear ();
diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp
index 252f6e07fb..6b9b757452 100644
--- a/nano/node/nodeconfig.hpp
+++ b/nano/node/nodeconfig.hpp
@@ -8,6 +8,7 @@
 #include
 #include
 #include
+#include <nano/node/blockprocessor.hpp>
 #include
 #include
 #include
@@ -130,6 +131,7 @@ class node_config
 	unsigned backlog_scan_frequency{ 10 };
 	nano::vote_cache_config vote_cache;
 	nano::rep_crawler_config rep_crawler;
+	nano::block_processor_config block_processor;
 
 public:
 	std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
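
The new knobs are exposed through the node TOML config by block_processor_config::serialize above. Below is a minimal tuning sketch, assuming the usual layout in which node_config is serialized under the [node] table (the [node] prefix is the standard nano config layout and is not shown in this patch). All values except the raised max_peer_queue mirror the compiled-in defaults of 128 (1024 on the beta network), 16 * 1024, 1, 8 and 16:

[node.block_processor]
# Maximum number of blocks to queue from network peers
max_peer_queue = 256
# Maximum number of blocks to queue from system components (local RPC, bootstrap)
max_system_queue = 16384
# Relative processing priorities; a higher value gets more blocks per scheduling pass
priority_live = 1
priority_bootstrap = 8
priority_local = 16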
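
The scheduling mechanism the block processor now relies on, one bounded FIFO per origin drained in weighted round-robin order, lives in nano::fair_queue, which this patch only brushes (the std::move above). For review context, below is a self-contained, compilable sketch of that idea. It is an approximation under stated assumptions rather than nano's implementation: fair_queue_sketch, the origin_t pair and the budget counter are illustrative names, and the real nano/node/fair_queue.hpp additionally keys live traffic by channel, exposes total_size (), size (), periodic_update () and collect_container_info (), and prunes queues of dead channels.

#include <algorithm>
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <utility>

enum class block_source
{
	live,
	bootstrap,
	local
};

// One bounded FIFO per origin, drained round-robin with per-origin budgets, so
// a single busy peer cannot starve other peers or other sources
template <typename Request>
class fair_queue_sketch
{
public:
	using origin_t = std::pair<block_source, std::string>; // (source, peer id)

	std::function<size_t (origin_t const &)> max_size_query;
	std::function<size_t (origin_t const &)> priority_query;

	// Returns false (drop) when this origin's queue is already full
	bool push (Request request, origin_t origin)
	{
		auto & entry = queues[origin];
		if (entry.requests.size () >= max_size_query (origin))
		{
			return false; // Overfill, caller counts and drops the request
		}
		entry.requests.push_back (std::move (request));
		return true;
	}

	bool empty () const
	{
		auto used = [] (auto const & item) { return !item.second.requests.empty (); };
		return std::none_of (queues.begin (), queues.end (), used);
	}

	// Precondition: !empty (), mirroring the debug_assert in the patch;
	// assumes priority_query returns >= 1
	std::pair<Request, origin_t> next ()
	{
		while (true)
		{
			if (current == queues.end ())
				current = queues.begin (); // Wrap the round-robin scan
			auto & entry = current->second;
			if (entry.requests.empty ())
			{
				++current;
				continue;
			}
			if (entry.budget == 0)
				entry.budget = priority_query (current->first); // Refill this origin's budget
			--entry.budget;
			auto origin = current->first;
			Request request = std::move (entry.requests.front ());
			entry.requests.pop_front ();
			if (entry.budget == 0)
				++current; // Budget spent, give the next origin a turn
			return { std::move (request), origin };
		}
	}

private:
	struct entry_t
	{
		std::deque<Request> requests;
		size_t budget{ 0 };
	};
	std::map<origin_t, entry_t> queues;
	typename std::map<origin_t, entry_t>::iterator current{ queues.end () };
};

int main ()
{
	fair_queue_sketch<std::string> queue;
	// Same shape as the queries installed in the block_processor constructor
	queue.max_size_query = [] (auto const & o) -> size_t { return o.first == block_source::live ? 128 : 16 * 1024; };
	queue.priority_query = [] (auto const & o) -> size_t { return o.first == block_source::local ? 16 : (o.first == block_source::bootstrap ? 8 : 1); };

	queue.push ("block A", { block_source::live, "peer 1" });
	queue.push ("block B", { block_source::live, "peer 1" });
	queue.push ("block C", { block_source::live, "peer 2" });
	queue.push ("block D", { block_source::local, "" });

	while (!queue.empty ())
	{
		auto item = queue.next ();
		std::cout << item.first << '\n'; // Prints A, C, D, B: peer 1, peer 2, local, then peer 1 again
	}
}

With queries shaped like this, a flood from a single live peer can occupy at most its own bounded queue and one slot per scheduling pass, which is why the publish handler above no longer needs the global full () check.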