diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index bf930f8875..80b4ba6353 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -758,7 +758,6 @@ TEST (active_transactions, republish_winner) ASSERT_NE (nullptr, election); auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork }); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); ASSERT_TIMELY (5s, election->confirmed ()); ASSERT_EQ (fork->hash (), election->status.winner->hash ()); ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ())); @@ -937,7 +936,7 @@ TEST (active_transactions, fork_replacement_tally) .build (); auto vote = nano::test::make_vote (keys[i], { fork }, 0, 0); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); + ASSERT_TIMELY (5s, node1.vote_cache.find (fork->hash ()).size () > 0); node1.process_active (fork); } @@ -980,7 +979,6 @@ TEST (active_transactions, fork_replacement_tally) // Process vote for correct block & replace existing lowest tally block auto vote = nano::test::make_vote (nano::dev::genesis_key, { send_last }, 0, 0); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); // ensure vote arrives before the block ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ()); node1.network.publish_filter.clear (); diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp index 178365b171..d60dda7855 100644 --- a/nano/core_test/fair_queue.cpp +++ b/nano/core_test/fair_queue.cpp @@ -25,7 +25,7 @@ enum class source_enum TEST (fair_queue, construction) { nano::fair_queue queue; - ASSERT_EQ (queue.total_size (), 0); + ASSERT_EQ (queue.size (), 0); ASSERT_TRUE (queue.empty ()); } @@ -36,7 +36,7 @@ TEST (fair_queue, process_one) queue.max_size_query = [] (auto const &) { return 1; }; queue.push (7, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.size (), 1); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 1); ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0); @@ -58,7 +58,7 @@ TEST (fair_queue, fifo) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::live }); queue.push (9, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 3); @@ -90,7 +90,7 @@ TEST (fair_queue, process_many) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::bootstrap }); queue.push (9, { source_enum::unchecked }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 3); ASSERT_EQ (queue.size ({ source_enum::live }), 1); ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1); @@ -124,7 +124,7 @@ TEST (fair_queue, max_queue_size) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::live }); queue.push (9, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 2); + ASSERT_EQ (queue.size (), 2); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 2); @@ -169,7 +169,7 @@ TEST (fair_queue, round_robin_with_priority) queue.push (13, { source_enum::unchecked }); queue.push (14, { source_enum::unchecked }); queue.push (15, { source_enum::unchecked }); - ASSERT_EQ (queue.total_size (), 9); + ASSERT_EQ (queue.size (), 9); // Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source ASSERT_EQ (queue.next ().second.source, source_enum::live); @@ -201,7 +201,7 @@ TEST (fair_queue, source_channel) queue.push (7, { source_enum::live, channel2 }); queue.push (8, { source_enum::live, channel3 }); queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries - ASSERT_EQ (queue.total_size (), 4); + ASSERT_EQ (queue.size (), 4); ASSERT_EQ (queue.queues_size (), 3); // Each pair is a separate queue ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2); @@ -253,7 +253,7 @@ TEST (fair_queue, cleanup) queue.push (7, { source_enum::live, channel1 }); queue.push (8, { source_enum::live, channel2 }); queue.push (9, { source_enum::live, channel3 }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 3); ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1); @@ -267,7 +267,7 @@ TEST (fair_queue, cleanup) ASSERT_TRUE (queue.periodic_update ()); // Only channel 3 should remain - ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.size (), 1); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0); diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 7de673fbd8..d72eefd369 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -125,6 +125,7 @@ TEST (toml, daemon_config_deserialize_defaults) [node.logging] [node.statistics.log] [node.statistics.sampling] + [node.vote_processor] [node.websocket] [node.lmdb] [node.rocksdb] @@ -261,6 +262,10 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); + + ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); + ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); + ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); } TEST (toml, optional_child) @@ -553,6 +558,11 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_size = 999 max_voters = 999 + [node.vote_processor] + max_pr_queue = 999 + max_non_pr_queue = 999 + pr_priority = 999 + [opencl] device = 999 enable = true @@ -700,6 +710,10 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); + + ASSERT_NE (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); + ASSERT_NE (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); + ASSERT_NE (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); } /** There should be no required values **/ diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 115dd347ef..9e0214c094 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -30,10 +30,7 @@ TEST (vote_processor, codes) auto channel (std::make_shared (node, node)); // Invalid signature - ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, false)); - - // Hint of pre-validation - ASSERT_NE (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, true)); + ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel)); // No ongoing election (vote goes to vote cache) ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel)); @@ -58,20 +55,6 @@ TEST (vote_processor, codes) ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel)); } -TEST (vote_processor, flush) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto channel (std::make_shared (node, node)); - for (unsigned i = 0; i < 2000; ++i) - { - auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * (1 + i), 0, std::vector{ nano::dev::genesis->hash () }); - node.vote_processor.vote (vote, channel); - } - node.vote_processor.flush (); - ASSERT_TRUE (node.vote_processor.empty ()); -} - TEST (vote_processor, invalid_signature) { nano::test::system system{ 1 }; @@ -93,18 +76,6 @@ TEST (vote_processor, invalid_signature) ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ()); } -TEST (vote_processor, no_capacity) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.vote_processor_capacity = 0; - auto & node (*system.add_node (node_flags)); - nano::keypair key; - auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0); - auto channel (std::make_shared (node, node)); - ASSERT_TRUE (node.vote_processor.vote (vote, channel)); -} - TEST (vote_processor, overflow) { nano::test::system system; @@ -121,14 +92,14 @@ TEST (vote_processor, overflow) size_t const total{ 1000 }; for (unsigned i = 0; i < total; ++i) { - if (node.vote_processor.vote (vote, channel)) + if (!node.vote_processor.vote (vote, channel)) { ++not_processed; } } ASSERT_GT (not_processed, 0); ASSERT_LT (not_processed, total); - ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow)); + ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote_processor, nano::stat::detail::overfill)); // check that it did not timeout ASSERT_LT (std::chrono::system_clock::now () - start_time, 10s); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 608d430a4c..b4f2991a61 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -20,6 +20,9 @@ enum class type : uint8_t network, tcp_server, vote, + vote_processor, + vote_processor_tier, + vote_processor_overfill, election, http_callback, ipc, @@ -390,6 +393,11 @@ enum class detail : uint8_t erase_old, erase_confirmed, + // rep tiers + tier_1, + tier_2, + tier_3, + _last // Must be the last enum }; diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 8621dca958..f16e6e8c97 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -462,6 +462,8 @@ bool nano::active_transactions::trigger_vote_cache (nano::block_hash hash) // Validate a vote and apply it to the current election if one exists std::unordered_map nano::active_transactions::vote (std::shared_ptr const & vote, nano::vote_source source) { + debug_assert (!vote->validate ()); // false => valid vote + std::unordered_map results; std::unordered_map> process; std::vector inactive; // Hashes that should be added to inactive vote cache diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 4899da88cd..9338b98d1a 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -110,7 +110,7 @@ void nano::block_processor::stop () std::size_t nano::block_processor::size () const { nano::unique_lock lock{ mutex }; - return queue.total_size (); + return queue.size (); } std::size_t nano::block_processor::size (nano::block_source source) const @@ -321,7 +321,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (should_log ()) { node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", - queue.total_size (), + queue.size (), queue.size ({ nano::block_source::forced })); } @@ -462,7 +462,7 @@ std::unique_ptr nano::block_processor::collect_c nano::lock_guard guard{ mutex }; auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "blocks", queue.total_size (), 0 })); + composite->add_component (std::make_unique (container_info{ "blocks", queue.size (), 0 })); composite->add_component (std::make_unique (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 })); composite->add_component (queue.collect_container_info ("queue")); return composite; diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index bf7e7f4b4f..738bd43ec7 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -81,18 +81,22 @@ class fair_queue final { // First compare source if (auto cmp = source <=> other.source; cmp != 0) + { return cmp; + } if (maybe_channel && other.maybe_channel) { // Then compare channels by ownership, not by the channel's value or state std::owner_less> less; if (less (*maybe_channel, *other.maybe_channel)) + { return std::strong_ordering::less; + } if (less (*other.maybe_channel, *maybe_channel)) + { return std::strong_ordering::greater; - - return std::strong_ordering::equivalent; + } } else { @@ -104,8 +108,9 @@ class fair_queue final { return std::strong_ordering::less; } - return std::strong_ordering::equivalent; } + + return std::strong_ordering::equivalent; } operator origin () const @@ -181,7 +186,7 @@ class fair_queue final return it == queues.end () ? 0 : it->second.priority; } - size_t total_size () const + size_t size () const { return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) { return total + queue.second.size (); @@ -349,8 +354,8 @@ class fair_queue final std::unique_ptr collect_container_info (std::string const & name) { auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) })); - composite->add_component (std::make_unique (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) })); + composite->add_component (std::make_unique (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) })); + composite->add_component (std::make_unique (container_info{ "total_size", size (), sizeof (typename decltype (queues)::value_type) })); return composite; } }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 2650b19c27..bb521c160c 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -179,7 +180,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy active{ *active_impl }, rep_crawler (config.rep_crawler, *this), rep_tiers{ ledger, network_params, online_reps, stats, logger }, - vote_processor{ active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers }, + vote_processor_impl{ std::make_unique (config.vote_processor, active, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) }, + vote_processor{ *vote_processor_impl }, warmed_up (0), online_reps (ledger, config), history_impl{ std::make_unique (config.network_params.voting) }, @@ -344,7 +346,12 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }); observers.vote.add ([this] (std::shared_ptr vote, std::shared_ptr const & channel, nano::vote_code code) { + debug_assert (vote != nullptr); debug_assert (code != nano::vote_code::invalid); + if (channel == nullptr) + { + return; // Channel expired when waiting for vote to be processed + } bool active_in_rep_crawler = rep_crawler.process (vote, channel); if (active_in_rep_crawler) { @@ -567,7 +574,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.workers, "workers")); composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); - composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); + composite->add_component (node.vote_processor.collect_container_info ("vote_processor")); composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler")); composite->add_component (node.block_processor.collect_container_info ("block_processor")); composite->add_component (collect_container_info (node.online_reps, "online_reps")); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index e08f2aabe4..ea8d801956 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -47,6 +46,7 @@ namespace nano class active_transactions; class confirming_set; class node; +class vote_processor; class work_pool; class peer_history; @@ -174,7 +174,8 @@ class node final : public std::enable_shared_from_this nano::online_reps online_reps; nano::rep_crawler rep_crawler; nano::rep_tiers rep_tiers; - nano::vote_processor vote_processor; + std::unique_ptr vote_processor_impl; + nano::vote_processor & vote_processor; unsigned warmed_up; std::unique_ptr history_impl; nano::local_vote_history & history; diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index fb01717bfa..0f9c0b3ae0 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -218,6 +218,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const block_processor.serialize (block_processor_l); toml.put_child ("block_processor", block_processor_l); + nano::tomlconfig vote_processor_l; + vote_processor.serialize (vote_processor_l); + toml.put_child ("vote_processor", vote_processor_l); + nano::tomlconfig peer_history_l; peer_history.serialize (peer_history_l); toml.put_child ("peer_history", peer_history_l); @@ -303,6 +307,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) block_processor.deserialize (config_l); } + if (toml.has_key ("vote_processor")) + { + auto config_l = toml.get_required_child ("vote_processor"); + vote_processor.deserialize (config_l); + } + if (toml.has_key ("peer_history")) { auto config_l = toml.get_required_child ("peer_history"); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 456c652df3..82cf6d6522 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include #include @@ -42,6 +43,7 @@ enum class frontiers_confirmation_mode : uint8_t class node_config { public: + // TODO: Users of this class rely on the default copy consturctor. This prevents using unique_ptrs with forward declared types. node_config (nano::network_params & network_params = nano::dev::network_params); node_config (const std::optional &, nano::network_params & network_params = nano::dev::network_params); @@ -139,6 +141,7 @@ class node_config nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; nano::block_processor_config block_processor; + nano::vote_processor_config vote_processor; nano::peer_history_config peer_history; public: diff --git a/nano/node/rep_tiers.cpp b/nano/node/rep_tiers.cpp index 7b0cffb47d..df65aea884 100644 --- a/nano/node/rep_tiers.cpp +++ b/nano/node/rep_tiers.cpp @@ -5,6 +5,8 @@ #include #include +#include + using namespace std::chrono_literals; nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) : @@ -141,4 +143,11 @@ std::unique_ptr nano::rep_tiers::collect_contain composite->add_component (std::make_unique (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) })); composite->add_component (std::make_unique (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) })); return composite; +} + +nano::stat::detail nano::to_stat_detail (nano::rep_tier tier) +{ + auto value = magic_enum::enum_cast (magic_enum::enum_name (tier)); + debug_assert (value); + return value.value_or (nano::stat::detail{}); } \ No newline at end of file diff --git a/nano/node/rep_tiers.hpp b/nano/node/rep_tiers.hpp index ce989d8e0a..e9134a46e9 100644 --- a/nano/node/rep_tiers.hpp +++ b/nano/node/rep_tiers.hpp @@ -26,6 +26,8 @@ enum class rep_tier tier_3, // (> 5%) of online stake }; +nano::stat::detail to_stat_detail (rep_tier); + class rep_tiers final { public: diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index cb2610d0dc..ac161dc5e5 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -14,19 +14,47 @@ using namespace std::chrono_literals; -nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : +nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : + config{ config_a }, active{ active_a }, observers{ observers_a }, stats{ stats_a }, - config{ config_a }, logger{ logger_a }, online_reps{ online_reps_a }, rep_crawler{ rep_crawler_a }, ledger{ ledger_a }, network_params{ network_params_a }, - rep_tiers{ rep_tiers_a }, - max_votes{ flags_a.vote_processor_capacity } + rep_tiers{ rep_tiers_a } { + queue.max_size_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + case nano::rep_tier::tier_2: + case nano::rep_tier::tier_1: + return config.max_pr_queue; + case nano::rep_tier::none: + return config.max_non_pr_queue; + } + debug_assert (false); + return size_t{ 0 }; + }; + + queue.priority_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return config.pr_priority * config.pr_priority * config.pr_priority; + case nano::rep_tier::tier_2: + return config.pr_priority * config.pr_priority; + case nano::rep_tier::tier_1: + return config.pr_priority; + case nano::rep_tier::none: + return size_t{ 1 }; + } + debug_assert (false); + return size_t{ 0 }; + }; } nano::vote_processor::~vote_processor () @@ -58,111 +86,86 @@ void nano::vote_processor::stop () } } -void nano::vote_processor::run () +bool nano::vote_processor::vote (std::shared_ptr const & vote, std::shared_ptr const & channel) { - nano::timer elapsed; - bool log_this_iteration; + debug_assert (channel != nullptr); - nano::unique_lock lock{ mutex }; - while (!stopped) + auto const tier = rep_tiers.tier (vote->account); + + bool added = false; { - if (!votes.empty ()) - { - decltype (votes) votes_l; - votes_l.swap (votes); - lock.unlock (); - condition.notify_all (); - - log_this_iteration = false; - // TODO: This is a temporary measure to prevent spamming the logs until we can implement a better solution - if (votes_l.size () > 1024 * 4) - { - /* - * Only log the timing information for this iteration if - * there are a sufficient number of items for it to be relevant - */ - log_this_iteration = true; - elapsed.restart (); - } - verify_votes (votes_l); - total_processed += votes_l.size (); - - if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100)) - { - logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", - votes_l.size (), - elapsed.value ().count (), - ((votes_l.size () * 1000ULL) / elapsed.value ().count ())); - } + nano::lock_guard guard{ mutex }; + added = queue.push (vote, { tier, channel }); + } + if (added) + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process); + stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier)); - lock.lock (); - } - else - { - condition.wait (lock); - } + condition.notify_all (); + } + else + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::overfill); + stats.inc (nano::stat::type::vote_processor_overfill, to_stat_detail (tier)); } + return added; } -bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) +void nano::vote_processor::run () { - debug_assert (channel_a != nullptr); - bool process (false); nano::unique_lock lock{ mutex }; - if (!stopped) + while (!stopped) { - auto tier = rep_tiers.tier (vote_a->account); + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::loop); - // Level 0 (< 0.1%) - if (votes.size () < 6.0 / 9.0 * max_votes) - { - process = true; - } - // Level 1 (0.1-1%) - else if (votes.size () < 7.0 / 9.0 * max_votes) - { - process = (tier == nano::rep_tier::tier_1); - } - // Level 2 (1-5%) - else if (votes.size () < 8.0 / 9.0 * max_votes) - { - process = (tier == nano::rep_tier::tier_2); - } - // Level 3 (> 5%) - else if (votes.size () < max_votes) - { - process = (tier == nano::rep_tier::tier_3); - } - if (process) + if (!queue.empty ()) { - votes.emplace_back (vote_a, channel_a); - lock.unlock (); - condition.notify_all (); - // Lock no longer required - } - else - { - stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow); + run_batch (lock); + debug_assert (!lock.owns_lock ()); + + lock.lock (); } + + condition.wait (lock, [&] { + return stopped || !queue.empty (); + }); } - return !process; } -void nano::vote_processor::verify_votes (decltype (votes) const & votes_a) +void nano::vote_processor::run_batch (nano::unique_lock & lock) { - for (auto const & vote : votes_a) + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); + + nano::timer timer; + + size_t const max_batch_size = 1024 * 4; + auto batch = queue.next_batch (max_batch_size); + + lock.unlock (); + + for (auto const & [vote, origin] : batch) { - if (!nano::validate_message (vote.first->account, vote.first->hash (), vote.first->signature)) - { - vote_blocking (vote.first, vote.second, true); - } + vote_blocking (vote, origin.channel); + } + + total_processed += batch.size (); + + if (batch.size () == max_batch_size && timer.stop () > 100ms) + { + logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", + batch.size (), + timer.value ().count (), + ((batch.size () * 1000ULL) / timer.value ().count ())); } } -nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel, bool validated) +nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel) { auto result = nano::vote_code::invalid; - if (validated || !vote->validate ()) + if (!vote->validate ()) // false => valid vote { auto vote_results = active.vote (vote); @@ -188,40 +191,48 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr return result; } -void nano::vote_processor::flush () -{ - nano::unique_lock lock{ mutex }; - auto const cutoff = total_processed.load (std::memory_order_relaxed) + votes.size (); - bool success = condition.wait_for (lock, 60s, [this, &cutoff] () { - return stopped || votes.empty () || total_processed.load (std::memory_order_relaxed) >= cutoff; - }); - if (!success) - { - logger.error (nano::log::type::vote_processor, "Flush timeout"); - debug_assert (false && "vote_processor::flush timeout while waiting for flush"); - } -} - std::size_t nano::vote_processor::size () const { nano::lock_guard guard{ mutex }; - return votes.size (); + return queue.size (); } bool nano::vote_processor::empty () const { nano::lock_guard guard{ mutex }; - return votes.empty (); + return queue.empty (); } -std::unique_ptr nano::collect_container_info (vote_processor & vote_processor, std::string const & name) +std::unique_ptr nano::vote_processor::collect_container_info (std::string const & name) const { std::size_t votes_count; { - nano::lock_guard guard{ vote_processor.mutex }; - votes_count = vote_processor.votes.size (); + nano::lock_guard guard{ mutex }; + votes_count = queue.size (); } auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (vote_processor.votes)::value_type) })); + composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) })); return composite; } + +/* + * vote_processor_config + */ + +nano::error nano::vote_processor_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("max_pr_queue", max_pr_queue, "Maximum number of votes to queue from principal representatives. \ntype:uint64"); + toml.put ("max_non_pr_queue", max_non_pr_queue, "Maximum number of votes to queue from non-principal representatives. \ntype:uint64"); + toml.put ("pr_priority", pr_priority, "Priority for votes from principal representatives. Higher priority gets processed more frequently. Non-principal representatives have a baseline priority of 1. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("max_pr_queue", max_pr_queue); + toml.get ("max_non_pr_queue", max_non_pr_queue); + toml.get ("pr_priority", pr_priority); + + return toml.get_error (); +} diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index b7498180fc..2f2d6b4f05 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include @@ -32,34 +34,47 @@ namespace transport { class channel; } +} + +namespace nano +{ +class vote_processor_config final +{ +public: + nano::error serialize (nano::tomlconfig & toml) const; + nano::error deserialize (nano::tomlconfig & toml); + +public: + size_t max_pr_queue{ 256 }; + size_t max_non_pr_queue{ 32 }; + size_t pr_priority{ 3 }; +}; class vote_processor final { public: - vote_processor (nano::active_transactions &, nano::node_observers &, nano::stats &, nano::node_config &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); + vote_processor (vote_processor_config const &, nano::active_transactions &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); ~vote_processor (); void start (); void stop (); - /** Returns false if the vote was processed */ + /** @returns true if the vote was queued for processing */ bool vote (std::shared_ptr const &, std::shared_ptr const &); - /** Note: node.active.mutex lock is required */ - nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &, bool = false); + nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &); - /** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase) - * is processed or the queue is empty (end condition or cutoff's guard, as it is positioned ahead) */ - void flush (); std::size_t size () const; bool empty () const; + std::unique_ptr collect_container_info (std::string const & name) const; + std::atomic total_processed{ 0 }; private: // Dependencies + vote_processor_config const & config; nano::active_transactions & active; nano::node_observers & observers; nano::stats & stats; - nano::node_config & config; nano::logger & logger; nano::online_reps & online_reps; nano::rep_crawler & rep_crawler; @@ -69,20 +84,15 @@ class vote_processor final private: void run (); - void verify_votes (std::deque, std::shared_ptr>> const &); + void run_batch (nano::unique_lock &); private: - std::size_t const max_votes; - std::deque, std::shared_ptr>> votes; + nano::fair_queue, nano::rep_tier> queue; private: bool stopped{ false }; nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; std::thread thread; - - friend std::unique_ptr collect_container_info (vote_processor & vote_processor, std::string const & name); }; - -std::unique_ptr collect_container_info (vote_processor & vote_processor, std::string const & name); } diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 464c170cda..7b299ab6ce 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -1061,6 +1061,7 @@ nano::websocket_server::websocket_server (nano::websocket::config & config_a, na }); observers.vote.add ([this] (std::shared_ptr vote_a, std::shared_ptr const & channel_a, nano::vote_code code_a) { + debug_assert (vote_a != nullptr); if (server->any_subscriber (nano::websocket::topic::vote)) { nano::websocket::message_builder builder; diff --git a/nano/slow_test/vote_processor.cpp b/nano/slow_test/vote_processor.cpp index 4dc3c09992..938aa2566a 100644 --- a/nano/slow_test/vote_processor.cpp +++ b/nano/slow_test/vote_processor.cpp @@ -32,16 +32,6 @@ TEST (vote_processor, producer_consumer) } }; - auto consumer = [&node, &number_of_votes] () -> void { - while (node.vote_processor.total_processed.load () < number_of_votes) - { - if (node.vote_processor.size () >= number_of_votes / 100) - { - node.vote_processor.flush (); - } - } - }; - auto monitor = [&node, &number_of_votes, &producer_wins, &consumer_wins] () -> void { while (node.vote_processor.total_processed.load () < number_of_votes) { @@ -64,7 +54,6 @@ TEST (vote_processor, producer_consumer) producers.emplace_back (producer); } - std::thread consumer_thread{ consumer }; std::thread monitor_thread{ monitor }; ASSERT_TIMELY (30s, node.vote_processor.total_processed.load () >= number_of_votes); @@ -73,7 +62,6 @@ TEST (vote_processor, producer_consumer) { producer.join (); } - consumer_thread.join (); monitor_thread.join (); ASSERT_GT (producer_wins, consumer_wins);