From 7a24d2932c526cdf309a59bc09f900153112174d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 16:47:36 +0200 Subject: [PATCH 01/12] Formatting --- nano/node/fair_queue.hpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index e13ed847c0..1e6f2e32fa 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -81,18 +81,22 @@ class fair_queue final { // First compare source if (auto cmp = source <=> other.source; cmp != 0) + { return cmp; + } if (maybe_channel && other.maybe_channel) { // Then compare channels by ownership, not by the channel's value or state std::owner_less> less; if (less (*maybe_channel, *other.maybe_channel)) + { return std::strong_ordering::less; + } if (less (*other.maybe_channel, *maybe_channel)) + { return std::strong_ordering::greater; - - return std::strong_ordering::equivalent; + } } else { @@ -104,8 +108,9 @@ class fair_queue final { return std::strong_ordering::less; } - return std::strong_ordering::equivalent; } + + return std::strong_ordering::equivalent; } operator origin () const From 6de75b50850ee0cdccf90a28f459dfa477757d3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:00:27 +0200 Subject: [PATCH 02/12] Cleanups --- nano/core_test/vote_processor.cpp | 4 ++-- nano/node/vote_processor.cpp | 38 ++++++++++++++++++------------- nano/node/vote_processor.hpp | 8 +++++-- 3 files changed, 30 insertions(+), 20 deletions(-) diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 115dd347ef..6236388c24 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -102,7 +102,7 @@ TEST (vote_processor, no_capacity) nano::keypair key; auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0); auto channel (std::make_shared (node, node)); - ASSERT_TRUE (node.vote_processor.vote (vote, channel)); + ASSERT_FALSE (node.vote_processor.vote (vote, channel)); } TEST (vote_processor, overflow) @@ -121,7 +121,7 @@ TEST (vote_processor, overflow) size_t const total{ 1000 }; for (unsigned i = 0; i < total; ++i) { - if (node.vote_processor.vote (vote, channel)) + if (!node.vote_processor.vote (vote, channel)) { ++not_processed; } diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index cb2610d0dc..c84a81584a 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -84,7 +84,8 @@ void nano::vote_processor::run () log_this_iteration = true; elapsed.restart (); } - verify_votes (votes_l); + + verify_and_process_votes (votes_l); total_processed += votes_l.size (); if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100)) @@ -107,48 +108,53 @@ void nano::vote_processor::run () bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) { debug_assert (channel_a != nullptr); - bool process (false); + nano::unique_lock lock{ mutex }; - if (!stopped) - { - auto tier = rep_tiers.tier (vote_a->account); - // Level 0 (< 0.1%) + auto should_process = [this] (auto tier) { if (votes.size () < 6.0 / 9.0 * max_votes) { - process = true; + return true; } // Level 1 (0.1-1%) - else if (votes.size () < 7.0 / 9.0 * max_votes) + if (votes.size () < 7.0 / 9.0 * max_votes) { - process = (tier == nano::rep_tier::tier_1); + return (tier == nano::rep_tier::tier_1); } // Level 2 (1-5%) - else if (votes.size () < 8.0 / 9.0 * max_votes) + if (votes.size () < 8.0 / 9.0 * max_votes) { - process = (tier == nano::rep_tier::tier_2); + return (tier == nano::rep_tier::tier_2); } // Level 3 (> 5%) - else if (votes.size () < max_votes) + if (votes.size () < max_votes) { - process = (tier == nano::rep_tier::tier_3); + return (tier == nano::rep_tier::tier_3); } - if (process) + return false; + }; + + if (!stopped) + { + auto tier = rep_tiers.tier (vote_a->account); + if (should_process (tier)) { votes.emplace_back (vote_a, channel_a); lock.unlock (); condition.notify_all (); // Lock no longer required + + return true; // Processed } else { stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow); } } - return !process; + return false; // Not processed } -void nano::vote_processor::verify_votes (decltype (votes) const & votes_a) +void nano::vote_processor::verify_and_process_votes (decltype (votes) const & votes_a) { for (auto const & vote : votes_a) { diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index b7498180fc..2bf06d12ed 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -32,7 +33,10 @@ namespace transport { class channel; } +} +namespace nano +{ class vote_processor final { public: @@ -44,7 +48,7 @@ class vote_processor final /** Returns false if the vote was processed */ bool vote (std::shared_ptr const &, std::shared_ptr const &); - /** Note: node.active.mutex lock is required */ + nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &, bool = false); /** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase) @@ -69,7 +73,7 @@ class vote_processor final private: void run (); - void verify_votes (std::deque, std::shared_ptr>> const &); + void verify_and_process_votes (std::deque, std::shared_ptr>> const &); private: std::size_t const max_votes; From e6eed53b52f69208bd1c63f8715fe258ab3ee31f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:06:10 +0200 Subject: [PATCH 03/12] Renaming --- nano/core_test/fair_queue.cpp | 18 +++++++++--------- nano/node/blockprocessor.cpp | 6 +++--- nano/node/fair_queue.hpp | 6 +++--- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/nano/core_test/fair_queue.cpp b/nano/core_test/fair_queue.cpp index 178365b171..d60dda7855 100644 --- a/nano/core_test/fair_queue.cpp +++ b/nano/core_test/fair_queue.cpp @@ -25,7 +25,7 @@ enum class source_enum TEST (fair_queue, construction) { nano::fair_queue queue; - ASSERT_EQ (queue.total_size (), 0); + ASSERT_EQ (queue.size (), 0); ASSERT_TRUE (queue.empty ()); } @@ -36,7 +36,7 @@ TEST (fair_queue, process_one) queue.max_size_query = [] (auto const &) { return 1; }; queue.push (7, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.size (), 1); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 1); ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0); @@ -58,7 +58,7 @@ TEST (fair_queue, fifo) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::live }); queue.push (9, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 3); @@ -90,7 +90,7 @@ TEST (fair_queue, process_many) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::bootstrap }); queue.push (9, { source_enum::unchecked }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 3); ASSERT_EQ (queue.size ({ source_enum::live }), 1); ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1); @@ -124,7 +124,7 @@ TEST (fair_queue, max_queue_size) queue.push (7, { source_enum::live }); queue.push (8, { source_enum::live }); queue.push (9, { source_enum::live }); - ASSERT_EQ (queue.total_size (), 2); + ASSERT_EQ (queue.size (), 2); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live }), 2); @@ -169,7 +169,7 @@ TEST (fair_queue, round_robin_with_priority) queue.push (13, { source_enum::unchecked }); queue.push (14, { source_enum::unchecked }); queue.push (15, { source_enum::unchecked }); - ASSERT_EQ (queue.total_size (), 9); + ASSERT_EQ (queue.size (), 9); // Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source ASSERT_EQ (queue.next ().second.source, source_enum::live); @@ -201,7 +201,7 @@ TEST (fair_queue, source_channel) queue.push (7, { source_enum::live, channel2 }); queue.push (8, { source_enum::live, channel3 }); queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries - ASSERT_EQ (queue.total_size (), 4); + ASSERT_EQ (queue.size (), 4); ASSERT_EQ (queue.queues_size (), 3); // Each pair is a separate queue ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2); @@ -253,7 +253,7 @@ TEST (fair_queue, cleanup) queue.push (7, { source_enum::live, channel1 }); queue.push (8, { source_enum::live, channel2 }); queue.push (9, { source_enum::live, channel3 }); - ASSERT_EQ (queue.total_size (), 3); + ASSERT_EQ (queue.size (), 3); ASSERT_EQ (queue.queues_size (), 3); ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1); @@ -267,7 +267,7 @@ TEST (fair_queue, cleanup) ASSERT_TRUE (queue.periodic_update ()); // Only channel 3 should remain - ASSERT_EQ (queue.total_size (), 1); + ASSERT_EQ (queue.size (), 1); ASSERT_EQ (queue.queues_size (), 1); ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 8443493557..e7703609f8 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -111,7 +111,7 @@ void nano::block_processor::stop () std::size_t nano::block_processor::size () const { nano::unique_lock lock{ mutex }; - return queue.total_size (); + return queue.size (); } std::size_t nano::block_processor::size (nano::block_source source) const @@ -322,7 +322,7 @@ auto nano::block_processor::process_batch (nano::unique_lock & lock if (should_log ()) { node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue", - queue.total_size (), + queue.size (), queue.size ({ nano::block_source::forced })); } @@ -463,7 +463,7 @@ std::unique_ptr nano::block_processor::collect_c nano::lock_guard guard{ mutex }; auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "blocks", queue.total_size (), 0 })); + composite->add_component (std::make_unique (container_info{ "blocks", queue.size (), 0 })); composite->add_component (std::make_unique (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 })); composite->add_component (queue.collect_container_info ("queue")); return composite; diff --git a/nano/node/fair_queue.hpp b/nano/node/fair_queue.hpp index 1e6f2e32fa..6620f3fb8d 100644 --- a/nano/node/fair_queue.hpp +++ b/nano/node/fair_queue.hpp @@ -186,7 +186,7 @@ class fair_queue final return it == queues.end () ? 0 : it->second.priority; } - size_t total_size () const + size_t size () const { return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) { return total + queue.second.size (); @@ -354,8 +354,8 @@ class fair_queue final std::unique_ptr collect_container_info (std::string const & name) { auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) })); - composite->add_component (std::make_unique (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) })); + composite->add_component (std::make_unique (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) })); + composite->add_component (std::make_unique (container_info{ "total_size", size (), sizeof (typename decltype (queues)::value_type) })); return composite; } }; From ea38b8e1b62aba27ce71a9e895bc2a848ba212f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:13:33 +0200 Subject: [PATCH 04/12] Remove flush --- nano/core_test/active_transactions.cpp | 4 +--- nano/core_test/vote_processor.cpp | 14 -------------- nano/node/vote_processor.cpp | 14 -------------- nano/node/vote_processor.hpp | 3 --- nano/slow_test/vote_processor.cpp | 12 ------------ 5 files changed, 1 insertion(+), 46 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 086195ef74..ef66d532ed 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -758,7 +758,6 @@ TEST (active_transactions, republish_winner) ASSERT_NE (nullptr, election); auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork }); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); ASSERT_TIMELY (5s, election->confirmed ()); ASSERT_EQ (fork->hash (), election->status.winner->hash ()); ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ())); @@ -937,7 +936,7 @@ TEST (active_transactions, fork_replacement_tally) .build (); auto vote = nano::test::make_vote (keys[i], { fork }, 0, 0); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); + ASSERT_TIMELY (5s, node1.vote_cache.find (fork->hash ()).size () > 0); node1.process_active (fork); } @@ -980,7 +979,6 @@ TEST (active_transactions, fork_replacement_tally) // Process vote for correct block & replace existing lowest tally block auto vote = nano::test::make_vote (nano::dev::genesis_key, { send_last }, 0, 0); node1.vote_processor.vote (vote, std::make_shared (node1, node1)); - node1.vote_processor.flush (); // ensure vote arrives before the block ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ()); node1.network.publish_filter.clear (); diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 6236388c24..85a94c75c5 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -58,20 +58,6 @@ TEST (vote_processor, codes) ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel)); } -TEST (vote_processor, flush) -{ - nano::test::system system (1); - auto & node (*system.nodes[0]); - auto channel (std::make_shared (node, node)); - for (unsigned i = 0; i < 2000; ++i) - { - auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * (1 + i), 0, std::vector{ nano::dev::genesis->hash () }); - node.vote_processor.vote (vote, channel); - } - node.vote_processor.flush (); - ASSERT_TRUE (node.vote_processor.empty ()); -} - TEST (vote_processor, invalid_signature) { nano::test::system system{ 1 }; diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index c84a81584a..dfff6faef6 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -194,20 +194,6 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr return result; } -void nano::vote_processor::flush () -{ - nano::unique_lock lock{ mutex }; - auto const cutoff = total_processed.load (std::memory_order_relaxed) + votes.size (); - bool success = condition.wait_for (lock, 60s, [this, &cutoff] () { - return stopped || votes.empty () || total_processed.load (std::memory_order_relaxed) >= cutoff; - }); - if (!success) - { - logger.error (nano::log::type::vote_processor, "Flush timeout"); - debug_assert (false && "vote_processor::flush timeout while waiting for flush"); - } -} - std::size_t nano::vote_processor::size () const { nano::lock_guard guard{ mutex }; diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 2bf06d12ed..5efdbe6e2f 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -51,9 +51,6 @@ class vote_processor final nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &, bool = false); - /** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase) - * is processed or the queue is empty (end condition or cutoff's guard, as it is positioned ahead) */ - void flush (); std::size_t size () const; bool empty () const; diff --git a/nano/slow_test/vote_processor.cpp b/nano/slow_test/vote_processor.cpp index 4dc3c09992..938aa2566a 100644 --- a/nano/slow_test/vote_processor.cpp +++ b/nano/slow_test/vote_processor.cpp @@ -32,16 +32,6 @@ TEST (vote_processor, producer_consumer) } }; - auto consumer = [&node, &number_of_votes] () -> void { - while (node.vote_processor.total_processed.load () < number_of_votes) - { - if (node.vote_processor.size () >= number_of_votes / 100) - { - node.vote_processor.flush (); - } - } - }; - auto monitor = [&node, &number_of_votes, &producer_wins, &consumer_wins] () -> void { while (node.vote_processor.total_processed.load () < number_of_votes) { @@ -64,7 +54,6 @@ TEST (vote_processor, producer_consumer) producers.emplace_back (producer); } - std::thread consumer_thread{ consumer }; std::thread monitor_thread{ monitor }; ASSERT_TIMELY (30s, node.vote_processor.total_processed.load () >= number_of_votes); @@ -73,7 +62,6 @@ TEST (vote_processor, producer_consumer) { producer.join (); } - consumer_thread.join (); monitor_thread.join (); ASSERT_GT (producer_wins, consumer_wins); From 6f5bd2345c9d4f79413ad810536080813660a6b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:19:27 +0200 Subject: [PATCH 05/12] Collect container info member --- nano/node/node.cpp | 2 +- nano/node/vote_processor.cpp | 8 ++++---- nano/node/vote_processor.hpp | 6 ++---- 3 files changed, 7 insertions(+), 9 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 10a710f608..9c7b2be9fa 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -563,7 +563,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.workers, "workers")); composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); - composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); + composite->add_component (node.vote_processor.collect_container_info ("vote_processor")); composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler")); composite->add_component (node.block_processor.collect_container_info ("block_processor")); composite->add_component (collect_container_info (node.online_reps, "online_reps")); diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index dfff6faef6..cbb2e75446 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -206,14 +206,14 @@ bool nano::vote_processor::empty () const return votes.empty (); } -std::unique_ptr nano::collect_container_info (vote_processor & vote_processor, std::string const & name) +std::unique_ptr nano::vote_processor::collect_container_info (std::string const & name) const { std::size_t votes_count; { - nano::lock_guard guard{ vote_processor.mutex }; - votes_count = vote_processor.votes.size (); + nano::lock_guard guard{ mutex }; + votes_count = votes.size (); } auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (vote_processor.votes)::value_type) })); + composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (votes)::value_type) })); return composite; } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 5efdbe6e2f..2dd7b0c9df 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -54,6 +54,8 @@ class vote_processor final std::size_t size () const; bool empty () const; + std::unique_ptr collect_container_info (std::string const & name) const; + std::atomic total_processed{ 0 }; private: // Dependencies @@ -81,9 +83,5 @@ class vote_processor final nano::condition_variable condition; mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; std::thread thread; - - friend std::unique_ptr collect_container_info (vote_processor & vote_processor, std::string const & name); }; - -std::unique_ptr collect_container_info (vote_processor & vote_processor, std::string const & name); } From 504effcc638d3eb29f406bb0cee418f5ea215367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 15:38:17 +0200 Subject: [PATCH 06/12] Always verify votes --- nano/core_test/vote_processor.cpp | 5 +---- nano/node/active_transactions.cpp | 2 ++ nano/node/vote_processor.cpp | 21 +++++++-------------- nano/node/vote_processor.hpp | 4 +--- 4 files changed, 11 insertions(+), 21 deletions(-) diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 85a94c75c5..c909d2e214 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -30,10 +30,7 @@ TEST (vote_processor, codes) auto channel (std::make_shared (node, node)); // Invalid signature - ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, false)); - - // Hint of pre-validation - ASSERT_NE (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, true)); + ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel)); // No ongoing election (vote goes to vote cache) ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel)); diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 5a95e0b8d6..3d0e11a3c3 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -452,6 +452,8 @@ bool nano::active_transactions::trigger_vote_cache (nano::block_hash hash) // Validate a vote and apply it to the current election if one exists std::unordered_map nano::active_transactions::vote (std::shared_ptr const & vote, nano::vote_source source) { + debug_assert (!vote->validate ()); // false => valid vote + std::unordered_map results; std::unordered_map> process; std::vector inactive; // Hashes that should be added to inactive vote cache diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index cbb2e75446..d8c306d54a 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -85,7 +85,11 @@ void nano::vote_processor::run () elapsed.restart (); } - verify_and_process_votes (votes_l); + for (auto const & [vote, channel] : votes_l) + { + vote_blocking (vote, channel); + } + total_processed += votes_l.size (); if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100)) @@ -154,21 +158,10 @@ bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std return false; // Not processed } -void nano::vote_processor::verify_and_process_votes (decltype (votes) const & votes_a) -{ - for (auto const & vote : votes_a) - { - if (!nano::validate_message (vote.first->account, vote.first->hash (), vote.first->signature)) - { - vote_blocking (vote.first, vote.second, true); - } - } -} - -nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel, bool validated) +nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel) { auto result = nano::vote_code::invalid; - if (validated || !vote->validate ()) + if (!vote->validate ()) // false => valid vote { auto vote_results = active.vote (vote); diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 2dd7b0c9df..368e16e1fb 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -48,8 +48,7 @@ class vote_processor final /** Returns false if the vote was processed */ bool vote (std::shared_ptr const &, std::shared_ptr const &); - - nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &, bool = false); + nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &); std::size_t size () const; bool empty () const; @@ -72,7 +71,6 @@ class vote_processor final private: void run (); - void verify_and_process_votes (std::deque, std::shared_ptr>> const &); private: std::size_t const max_votes; From 91e3c8e310d69459d65d6208926bbd4c289ab1b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 4 Apr 2024 16:21:45 +0200 Subject: [PATCH 07/12] Fair queue for vote processor --- nano/core_test/vote_processor.cpp | 14 +-- nano/lib/stats_enums.hpp | 8 ++ nano/node/node.cpp | 5 + nano/node/rep_tiers.cpp | 9 ++ nano/node/rep_tiers.hpp | 2 + nano/node/vote_processor.cpp | 174 ++++++++++++++++-------------- nano/node/vote_processor.hpp | 6 +- nano/node/websocket.cpp | 1 + 8 files changed, 122 insertions(+), 97 deletions(-) diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index c909d2e214..9e0214c094 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -76,18 +76,6 @@ TEST (vote_processor, invalid_signature) ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ()); } -TEST (vote_processor, no_capacity) -{ - nano::test::system system; - nano::node_flags node_flags; - node_flags.vote_processor_capacity = 0; - auto & node (*system.add_node (node_flags)); - nano::keypair key; - auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0); - auto channel (std::make_shared (node, node)); - ASSERT_FALSE (node.vote_processor.vote (vote, channel)); -} - TEST (vote_processor, overflow) { nano::test::system system; @@ -111,7 +99,7 @@ TEST (vote_processor, overflow) } ASSERT_GT (not_processed, 0); ASSERT_LT (not_processed, total); - ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow)); + ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote_processor, nano::stat::detail::overfill)); // check that it did not timeout ASSERT_LT (std::chrono::system_clock::now () - start_time, 10s); diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 709902d59e..049b4d299b 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -20,6 +20,9 @@ enum class type : uint8_t network, tcp_server, vote, + vote_processor, + vote_processor_tier, + vote_processor_overfill, election, http_callback, ipc, @@ -377,6 +380,11 @@ enum class detail : uint8_t erase_old, erase_confirmed, + // rep tiers + tier_1, + tier_2, + tier_3, + _last // Must be the last enum }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 9c7b2be9fa..a321549ac5 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -341,7 +341,12 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy }); observers.vote.add ([this] (std::shared_ptr vote, std::shared_ptr const & channel, nano::vote_code code) { + debug_assert (vote != nullptr); debug_assert (code != nano::vote_code::invalid); + if (channel == nullptr) + { + return; // Channel expired when waiting for vote to be processed + } bool active_in_rep_crawler = rep_crawler.process (vote, channel); if (active_in_rep_crawler) { diff --git a/nano/node/rep_tiers.cpp b/nano/node/rep_tiers.cpp index 7b0cffb47d..df65aea884 100644 --- a/nano/node/rep_tiers.cpp +++ b/nano/node/rep_tiers.cpp @@ -5,6 +5,8 @@ #include #include +#include + using namespace std::chrono_literals; nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) : @@ -141,4 +143,11 @@ std::unique_ptr nano::rep_tiers::collect_contain composite->add_component (std::make_unique (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) })); composite->add_component (std::make_unique (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) })); return composite; +} + +nano::stat::detail nano::to_stat_detail (nano::rep_tier tier) +{ + auto value = magic_enum::enum_cast (magic_enum::enum_name (tier)); + debug_assert (value); + return value.value_or (nano::stat::detail{}); } \ No newline at end of file diff --git a/nano/node/rep_tiers.hpp b/nano/node/rep_tiers.hpp index ce989d8e0a..e9134a46e9 100644 --- a/nano/node/rep_tiers.hpp +++ b/nano/node/rep_tiers.hpp @@ -26,6 +26,8 @@ enum class rep_tier tier_3, // (> 5%) of online stake }; +nano::stat::detail to_stat_detail (rep_tier); + class rep_tiers final { public: diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index d8c306d54a..01874811c9 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -27,6 +27,37 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano rep_tiers{ rep_tiers_a }, max_votes{ flags_a.vote_processor_capacity } { + queue.max_size_query = [this] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return 256; + case nano::rep_tier::tier_2: + return 128; + case nano::rep_tier::tier_1: + return 64; + case nano::rep_tier::none: + return 32; + } + debug_assert (false); + return 0; + }; + + queue.priority_query = [] (auto const & origin) { + switch (origin.source) + { + case nano::rep_tier::tier_3: + return 9; + case nano::rep_tier::tier_2: + return 6; + case nano::rep_tier::tier_1: + return 3; + case nano::rep_tier::none: + return 1; + } + debug_assert (false); + return 0; + }; } nano::vote_processor::~vote_processor () @@ -58,104 +89,81 @@ void nano::vote_processor::stop () } } -void nano::vote_processor::run () +bool nano::vote_processor::vote (std::shared_ptr const & vote, std::shared_ptr const & channel) { - nano::timer elapsed; - bool log_this_iteration; + debug_assert (channel != nullptr); + + auto const tier = rep_tiers.tier (vote->account); + + bool added = false; + { + nano::lock_guard guard{ mutex }; + added = queue.push ({ vote, channel }, tier); + } + if (added) + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::process); + stats.inc (nano::stat::type::vote_processor_tier, to_stat_detail (tier)); + condition.notify_all (); + } + else + { + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::overfill); + stats.inc (nano::stat::type::vote_processor_overfill, to_stat_detail (tier)); + } + return added; +} + +void nano::vote_processor::run () +{ nano::unique_lock lock{ mutex }; while (!stopped) { - if (!votes.empty ()) + stats.inc (nano::stat::type::vote_processor, nano::stat::detail::loop); + + if (!queue.empty ()) { - decltype (votes) votes_l; - votes_l.swap (votes); - lock.unlock (); - condition.notify_all (); - - log_this_iteration = false; - // TODO: This is a temporary measure to prevent spamming the logs until we can implement a better solution - if (votes_l.size () > 1024 * 4) - { - /* - * Only log the timing information for this iteration if - * there are a sufficient number of items for it to be relevant - */ - log_this_iteration = true; - elapsed.restart (); - } - - for (auto const & [vote, channel] : votes_l) - { - vote_blocking (vote, channel); - } - - total_processed += votes_l.size (); - - if (log_this_iteration && elapsed.stop () > std::chrono::milliseconds (100)) - { - logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", - votes_l.size (), - elapsed.value ().count (), - ((votes_l.size () * 1000ULL) / elapsed.value ().count ())); - } + run_batch (lock); + debug_assert (!lock.owns_lock ()); lock.lock (); } - else - { - condition.wait (lock); - } + + condition.wait (lock, [&] { + return stopped || !queue.empty (); + }); } } -bool nano::vote_processor::vote (std::shared_ptr const & vote_a, std::shared_ptr const & channel_a) +void nano::vote_processor::run_batch (nano::unique_lock & lock) { - debug_assert (channel_a != nullptr); + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!queue.empty ()); - nano::unique_lock lock{ mutex }; + nano::timer timer; - auto should_process = [this] (auto tier) { - if (votes.size () < 6.0 / 9.0 * max_votes) - { - return true; - } - // Level 1 (0.1-1%) - if (votes.size () < 7.0 / 9.0 * max_votes) - { - return (tier == nano::rep_tier::tier_1); - } - // Level 2 (1-5%) - if (votes.size () < 8.0 / 9.0 * max_votes) - { - return (tier == nano::rep_tier::tier_2); - } - // Level 3 (> 5%) - if (votes.size () < max_votes) - { - return (tier == nano::rep_tier::tier_3); - } - return false; - }; + size_t const max_batch_size = 1024 * 4; + auto batch = queue.next_batch (max_batch_size); + + lock.unlock (); - if (!stopped) + for (auto const & [entry, origin] : batch) { - auto tier = rep_tiers.tier (vote_a->account); - if (should_process (tier)) - { - votes.emplace_back (vote_a, channel_a); - lock.unlock (); - condition.notify_all (); - // Lock no longer required + auto const & [vote, channel] = entry; + vote_blocking (vote, channel); + } - return true; // Processed - } - else - { - stats.inc (nano::stat::type::vote, nano::stat::detail::vote_overflow); - } + total_processed += batch.size (); + + if (batch.size () == max_batch_size && timer.stop () > 100ms) + { + logger.debug (nano::log::type::vote_processor, "Processed {} votes in {} milliseconds (rate of {} votes per second)", + batch.size (), + timer.value ().count (), + ((batch.size () * 1000ULL) / timer.value ().count ())); } - return false; // Not processed } nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr const & vote, std::shared_ptr const & channel) @@ -190,13 +198,13 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr std::size_t nano::vote_processor::size () const { nano::lock_guard guard{ mutex }; - return votes.size (); + return queue.size (); } bool nano::vote_processor::empty () const { nano::lock_guard guard{ mutex }; - return votes.empty (); + return queue.empty (); } std::unique_ptr nano::vote_processor::collect_container_info (std::string const & name) const @@ -204,9 +212,9 @@ std::unique_ptr nano::vote_processor::collect_co std::size_t votes_count; { nano::lock_guard guard{ mutex }; - votes_count = votes.size (); + votes_count = queue.size (); } auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (votes)::value_type) })); + composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) })); return composite; } diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 368e16e1fb..61518ef738 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include @@ -71,10 +72,13 @@ class vote_processor final private: void run (); + void run_batch (nano::unique_lock &); private: std::size_t const max_votes; - std::deque, std::shared_ptr>> votes; + + using entry_t = std::pair, std::shared_ptr>; + nano::fair_queue queue; private: bool stopped{ false }; diff --git a/nano/node/websocket.cpp b/nano/node/websocket.cpp index 464c170cda..7b299ab6ce 100644 --- a/nano/node/websocket.cpp +++ b/nano/node/websocket.cpp @@ -1061,6 +1061,7 @@ nano::websocket_server::websocket_server (nano::websocket::config & config_a, na }); observers.vote.add ([this] (std::shared_ptr vote_a, std::shared_ptr const & channel_a, nano::vote_code code_a) { + debug_assert (vote_a != nullptr); if (server->any_subscriber (nano::websocket::topic::vote)) { nano::websocket::message_builder builder; From bd0a281c4f8f3238bd817143c6a535b1a5ab27da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 5 Apr 2024 13:11:27 +0200 Subject: [PATCH 08/12] Docs --- nano/node/vote_processor.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 61518ef738..785e510c8c 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -47,7 +47,7 @@ class vote_processor final void start (); void stop (); - /** Returns false if the vote was processed */ + /** @returns true if the vote was queued for processing */ bool vote (std::shared_ptr const &, std::shared_ptr const &); nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &); From 32d71600ac6463f1ce0c8a77445001cdbd3460ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 5 Apr 2024 22:35:20 +0200 Subject: [PATCH 09/12] Bump queue sizes --- nano/node/vote_processor.cpp | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 01874811c9..b211974977 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -31,11 +31,9 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano switch (origin.source) { case nano::rep_tier::tier_3: - return 256; case nano::rep_tier::tier_2: - return 128; case nano::rep_tier::tier_1: - return 64; + return 512; case nano::rep_tier::none: return 32; } From dcf214c9580ee9212f6dd72678056c24b08f4298 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 5 Apr 2024 22:44:27 +0200 Subject: [PATCH 10/12] Fix missing channel source --- nano/node/vote_processor.cpp | 12 +++++------- nano/node/vote_processor.hpp | 5 +---- 2 files changed, 6 insertions(+), 11 deletions(-) diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index b211974977..4405bf5838 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -24,8 +24,7 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano rep_crawler{ rep_crawler_a }, ledger{ ledger_a }, network_params{ network_params_a }, - rep_tiers{ rep_tiers_a }, - max_votes{ flags_a.vote_processor_capacity } + rep_tiers{ rep_tiers_a } { queue.max_size_query = [this] (auto const & origin) { switch (origin.source) @@ -33,7 +32,7 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano case nano::rep_tier::tier_3: case nano::rep_tier::tier_2: case nano::rep_tier::tier_1: - return 512; + return 256; case nano::rep_tier::none: return 32; } @@ -96,7 +95,7 @@ bool nano::vote_processor::vote (std::shared_ptr const & vote, std:: bool added = false; { nano::lock_guard guard{ mutex }; - added = queue.push ({ vote, channel }, tier); + added = queue.push (vote, { tier, channel }); } if (added) { @@ -147,10 +146,9 @@ void nano::vote_processor::run_batch (nano::unique_lock & lock) lock.unlock (); - for (auto const & [entry, origin] : batch) + for (auto const & [vote, origin] : batch) { - auto const & [vote, channel] = entry; - vote_blocking (vote, channel); + vote_blocking (vote, origin.channel); } total_processed += batch.size (); diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 785e510c8c..6a8c454a87 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -75,10 +75,7 @@ class vote_processor final void run_batch (nano::unique_lock &); private: - std::size_t const max_votes; - - using entry_t = std::pair, std::shared_ptr>; - nano::fair_queue queue; + nano::fair_queue, nano::rep_tier> queue; private: bool stopped{ false }; From 5fa9e491eec2c0b094424382220404b3e70bbad8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 7 Apr 2024 19:33:41 +0200 Subject: [PATCH 11/12] Vote processor config --- nano/core_test/toml.cpp | 14 ++++++++++++ nano/node/node.cpp | 2 +- nano/node/nodeconfig.cpp | 10 ++++++++ nano/node/nodeconfig.hpp | 3 +++ nano/node/vote_processor.cpp | 44 +++++++++++++++++++++++++++--------- nano/node/vote_processor.hpp | 16 +++++++++++-- 6 files changed, 75 insertions(+), 14 deletions(-) diff --git a/nano/core_test/toml.cpp b/nano/core_test/toml.cpp index 7de673fbd8..d72eefd369 100644 --- a/nano/core_test/toml.cpp +++ b/nano/core_test/toml.cpp @@ -125,6 +125,7 @@ TEST (toml, daemon_config_deserialize_defaults) [node.logging] [node.statistics.log] [node.statistics.sampling] + [node.vote_processor] [node.websocket] [node.lmdb] [node.rocksdb] @@ -261,6 +262,10 @@ TEST (toml, daemon_config_deserialize_defaults) ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); + + ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); + ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); + ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); } TEST (toml, optional_child) @@ -553,6 +558,11 @@ TEST (toml, daemon_config_deserialize_no_defaults) max_size = 999 max_voters = 999 + [node.vote_processor] + max_pr_queue = 999 + max_non_pr_queue = 999 + pr_priority = 999 + [opencl] device = 999 enable = true @@ -700,6 +710,10 @@ TEST (toml, daemon_config_deserialize_no_defaults) ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live); ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap); ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local); + + ASSERT_NE (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue); + ASSERT_NE (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue); + ASSERT_NE (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority); } /** There should be no required values **/ diff --git a/nano/node/node.cpp b/nano/node/node.cpp index a321549ac5..2df9363db9 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -178,7 +178,7 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy active{ *active_impl }, rep_crawler (config.rep_crawler, *this), rep_tiers{ ledger, network_params, online_reps, stats, logger }, - vote_processor{ active, observers, stats, config, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers }, + vote_processor{ config.vote_processor, active, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers }, warmed_up (0), online_reps (ledger, config), history_impl{ std::make_unique (config.network_params.voting) }, diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index cfa5fcbf66..63a18c22b6 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -217,6 +217,10 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const block_processor.serialize (block_processor_l); toml.put_child ("block_processor", block_processor_l); + nano::tomlconfig vote_processor_l; + vote_processor.serialize (vote_processor_l); + toml.put_child ("vote_processor", vote_processor_l); + return toml.get_error (); } @@ -298,6 +302,12 @@ nano::error nano::node_config::deserialize_toml (nano::tomlconfig & toml) block_processor.deserialize (config_l); } + if (toml.has_key ("vote_processor")) + { + auto config_l = toml.get_required_child ("vote_processor"); + vote_processor.deserialize (config_l); + } + if (toml.has_key ("work_peers")) { work_peers.clear (); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index b8c953b270..2ec04ceca6 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,7 @@ enum class frontiers_confirmation_mode : uint8_t class node_config { public: + // TODO: Users of this class rely on the default copy consturctor. This prevents using unique_ptrs with forward declared types. node_config (nano::network_params & network_params = nano::dev::network_params); node_config (const std::optional &, nano::network_params & network_params = nano::dev::network_params); @@ -138,6 +140,7 @@ class node_config nano::vote_cache_config vote_cache; nano::rep_crawler_config rep_crawler; nano::block_processor_config block_processor; + nano::vote_processor_config vote_processor; public: std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const; diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index 4405bf5838..ac161dc5e5 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -14,11 +14,11 @@ using namespace std::chrono_literals; -nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : +nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : + config{ config_a }, active{ active_a }, observers{ observers_a }, stats{ stats_a }, - config{ config_a }, logger{ logger_a }, online_reps{ online_reps_a }, rep_crawler{ rep_crawler_a }, @@ -32,28 +32,28 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano case nano::rep_tier::tier_3: case nano::rep_tier::tier_2: case nano::rep_tier::tier_1: - return 256; + return config.max_pr_queue; case nano::rep_tier::none: - return 32; + return config.max_non_pr_queue; } debug_assert (false); - return 0; + return size_t{ 0 }; }; - queue.priority_query = [] (auto const & origin) { + queue.priority_query = [this] (auto const & origin) { switch (origin.source) { case nano::rep_tier::tier_3: - return 9; + return config.pr_priority * config.pr_priority * config.pr_priority; case nano::rep_tier::tier_2: - return 6; + return config.pr_priority * config.pr_priority; case nano::rep_tier::tier_1: - return 3; + return config.pr_priority; case nano::rep_tier::none: - return 1; + return size_t{ 1 }; } debug_assert (false); - return 0; + return size_t{ 0 }; }; } @@ -214,3 +214,25 @@ std::unique_ptr nano::vote_processor::collect_co composite->add_component (std::make_unique (container_info{ "votes", votes_count, sizeof (decltype (queue)::value_type) })); return composite; } + +/* + * vote_processor_config + */ + +nano::error nano::vote_processor_config::serialize (nano::tomlconfig & toml) const +{ + toml.put ("max_pr_queue", max_pr_queue, "Maximum number of votes to queue from principal representatives. \ntype:uint64"); + toml.put ("max_non_pr_queue", max_non_pr_queue, "Maximum number of votes to queue from non-principal representatives. \ntype:uint64"); + toml.put ("pr_priority", pr_priority, "Priority for votes from principal representatives. Higher priority gets processed more frequently. Non-principal representatives have a baseline priority of 1. \ntype:uint64"); + + return toml.get_error (); +} + +nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml) +{ + toml.get ("max_pr_queue", max_pr_queue); + toml.get ("max_non_pr_queue", max_non_pr_queue); + toml.get ("pr_priority", pr_priority); + + return toml.get_error (); +} diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 6a8c454a87..2f2d6b4f05 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -38,10 +38,22 @@ namespace transport namespace nano { +class vote_processor_config final +{ +public: + nano::error serialize (nano::tomlconfig & toml) const; + nano::error deserialize (nano::tomlconfig & toml); + +public: + size_t max_pr_queue{ 256 }; + size_t max_non_pr_queue{ 32 }; + size_t pr_priority{ 3 }; +}; + class vote_processor final { public: - vote_processor (nano::active_transactions &, nano::node_observers &, nano::stats &, nano::node_config &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); + vote_processor (vote_processor_config const &, nano::active_transactions &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); ~vote_processor (); void start (); @@ -59,10 +71,10 @@ class vote_processor final std::atomic total_processed{ 0 }; private: // Dependencies + vote_processor_config const & config; nano::active_transactions & active; nano::node_observers & observers; nano::stats & stats; - nano::node_config & config; nano::logger & logger; nano::online_reps & online_reps; nano::rep_crawler & rep_crawler; From 378e69bb168fe4f5ee549291f839a3295d2b6acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sun, 7 Apr 2024 19:38:07 +0200 Subject: [PATCH 12/12] Store as unique ptr --- nano/node/node.cpp | 4 +++- nano/node/node.hpp | 5 +++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 2df9363db9..35b4874ae4 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -178,7 +179,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy active{ *active_impl }, rep_crawler (config.rep_crawler, *this), rep_tiers{ ledger, network_params, online_reps, stats, logger }, - vote_processor{ config.vote_processor, active, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers }, + vote_processor_impl{ std::make_unique (config.vote_processor, active, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) }, + vote_processor{ *vote_processor_impl }, warmed_up (0), online_reps (ledger, config), history_impl{ std::make_unique (config.network_params.voting) }, diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 53ba1f69a9..354ee8d114 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -29,7 +29,6 @@ #include #include #include -#include #include #include #include @@ -48,6 +47,7 @@ namespace nano class active_transactions; class confirming_set; class node; +class vote_processor; class work_pool; namespace scheduler @@ -175,7 +175,8 @@ class node final : public std::enable_shared_from_this nano::online_reps online_reps; nano::rep_crawler rep_crawler; nano::rep_tiers rep_tiers; - nano::vote_processor vote_processor; + std::unique_ptr vote_processor_impl; + nano::vote_processor & vote_processor; unsigned warmed_up; std::unique_ptr history_impl; nano::local_vote_history & history;