Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fair queuing for vote processor #4536

Merged
4 changes: 1 addition & 3 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,6 @@ TEST (active_transactions, republish_winner)
ASSERT_NE (nullptr, election);
auto vote = nano::test::make_final_vote (nano::dev::genesis_key, { fork });
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
ASSERT_TIMELY (5s, election->confirmed ());
ASSERT_EQ (fork->hash (), election->status.winner->hash ());
ASSERT_TIMELY (5s, node2.block_confirmed (fork->hash ()));
Expand Down Expand Up @@ -937,7 +936,7 @@ TEST (active_transactions, fork_replacement_tally)
.build ();
auto vote = nano::test::make_vote (keys[i], { fork }, 0, 0);
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
ASSERT_TIMELY (5s, node1.vote_cache.find (fork->hash ()).size () > 0);
node1.process_active (fork);
}

Expand Down Expand Up @@ -980,7 +979,6 @@ TEST (active_transactions, fork_replacement_tally)
// Process vote for correct block & replace existing lowest tally block
auto vote = nano::test::make_vote (nano::dev::genesis_key, { send_last }, 0, 0);
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
// ensure vote arrives before the block
ASSERT_TIMELY_EQ (5s, 1, node1.vote_cache.find (send_last->hash ()).size ());
node1.network.publish_filter.clear ();
Expand Down
18 changes: 9 additions & 9 deletions nano/core_test/fair_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ enum class source_enum
TEST (fair_queue, construction)
{
nano::fair_queue<source_enum, int> queue;
ASSERT_EQ (queue.total_size (), 0);
ASSERT_EQ (queue.size (), 0);
ASSERT_TRUE (queue.empty ());
}

Expand All @@ -36,7 +36,7 @@ TEST (fair_queue, process_one)
queue.max_size_query = [] (auto const &) { return 1; };

queue.push (7, { source_enum::live });
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.size (), 1);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0);
Expand All @@ -58,7 +58,7 @@ TEST (fair_queue, fifo)
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.size (), 3);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 3);

Expand Down Expand Up @@ -90,7 +90,7 @@ TEST (fair_queue, process_many)
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::bootstrap });
queue.push (9, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.size (), 3);
ASSERT_EQ (queue.queues_size (), 3);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1);
Expand Down Expand Up @@ -124,7 +124,7 @@ TEST (fair_queue, max_queue_size)
queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 2);
ASSERT_EQ (queue.size (), 2);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 2);

Expand Down Expand Up @@ -169,7 +169,7 @@ TEST (fair_queue, round_robin_with_priority)
queue.push (13, { source_enum::unchecked });
queue.push (14, { source_enum::unchecked });
queue.push (15, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 9);
ASSERT_EQ (queue.size (), 9);

// Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source
ASSERT_EQ (queue.next ().second.source, source_enum::live);
Expand Down Expand Up @@ -201,7 +201,7 @@ TEST (fair_queue, source_channel)
queue.push (7, { source_enum::live, channel2 });
queue.push (8, { source_enum::live, channel3 });
queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries
ASSERT_EQ (queue.total_size (), 4);
ASSERT_EQ (queue.size (), 4);
ASSERT_EQ (queue.queues_size (), 3); // Each <source, channel> pair is a separate queue

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2);
Expand Down Expand Up @@ -253,7 +253,7 @@ TEST (fair_queue, cleanup)
queue.push (7, { source_enum::live, channel1 });
queue.push (8, { source_enum::live, channel2 });
queue.push (9, { source_enum::live, channel3 });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.size (), 3);
ASSERT_EQ (queue.queues_size (), 3);

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1);
Expand All @@ -267,7 +267,7 @@ TEST (fair_queue, cleanup)
ASSERT_TRUE (queue.periodic_update ());

// Only channel 3 should remain
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.size (), 1);
ASSERT_EQ (queue.queues_size (), 1);

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0);
Expand Down
35 changes: 3 additions & 32 deletions nano/core_test/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ TEST (vote_processor, codes)
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));

// Invalid signature
ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, false));

// Hint of pre-validation
ASSERT_NE (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, true));
ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel));

// No ongoing election (vote goes to vote cache)
ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel));
Expand All @@ -58,20 +55,6 @@ TEST (vote_processor, codes)
ASSERT_EQ (nano::vote_code::indeterminate, node.vote_processor.vote_blocking (vote, channel));
}

TEST (vote_processor, flush)
{
nano::test::system system (1);
auto & node (*system.nodes[0]);
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
for (unsigned i = 0; i < 2000; ++i)
{
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * (1 + i), 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
node.vote_processor.vote (vote, channel);
}
node.vote_processor.flush ();
ASSERT_TRUE (node.vote_processor.empty ());
}

TEST (vote_processor, invalid_signature)
{
nano::test::system system{ 1 };
Expand All @@ -93,18 +76,6 @@ TEST (vote_processor, invalid_signature)
ASSERT_TIMELY_EQ (5s, 2, election->votes ().size ());
}

TEST (vote_processor, no_capacity)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.vote_processor_capacity = 0;
auto & node (*system.add_node (node_flags));
nano::keypair key;
auto vote = nano::test::make_vote (key, { nano::dev::genesis }, nano::vote::timestamp_min * 1, 0);
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TRUE (node.vote_processor.vote (vote, channel));
}

TEST (vote_processor, overflow)
{
nano::test::system system;
Expand All @@ -121,14 +92,14 @@ TEST (vote_processor, overflow)
size_t const total{ 1000 };
for (unsigned i = 0; i < total; ++i)
{
if (node.vote_processor.vote (vote, channel))
if (!node.vote_processor.vote (vote, channel))
{
++not_processed;
}
}
ASSERT_GT (not_processed, 0);
ASSERT_LT (not_processed, total);
ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote, nano::stat::detail::vote_overflow));
ASSERT_EQ (not_processed, node.stats.count (nano::stat::type::vote_processor, nano::stat::detail::overfill));

// check that it did not timeout
ASSERT_LT (std::chrono::system_clock::now () - start_time, 10s);
Expand Down
8 changes: 8 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ enum class type : uint8_t
network,
tcp_server,
vote,
vote_processor,
vote_processor_tier,
vote_processor_overfill,
election,
http_callback,
ipc,
Expand Down Expand Up @@ -377,6 +380,11 @@ enum class detail : uint8_t
erase_old,
erase_confirmed,

// rep tiers
tier_1,
tier_2,
tier_3,

_last // Must be the last enum
};

Expand Down
2 changes: 2 additions & 0 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ bool nano::active_transactions::trigger_vote_cache (nano::block_hash hash)
// Validate a vote and apply it to the current election if one exists
std::unordered_map<nano::block_hash, nano::vote_code> nano::active_transactions::vote (std::shared_ptr<nano::vote> const & vote, nano::vote_source source)
{
debug_assert (!vote->validate ()); // false => valid vote

std::unordered_map<nano::block_hash, nano::vote_code> results;
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> process;
std::vector<nano::block_hash> inactive; // Hashes that should be added to inactive vote cache
Expand Down
6 changes: 3 additions & 3 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void nano::block_processor::stop ()
std::size_t nano::block_processor::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return queue.total_size ();
return queue.size ();
}

std::size_t nano::block_processor::size (nano::block_source source) const
Expand Down Expand Up @@ -322,7 +322,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
if (should_log ())
{
node.logger.info (nano::log::type::blockprocessor, "{} blocks (+ {} forced) in processing queue",
queue.total_size (),
queue.size (),
queue.size ({ nano::block_source::forced }));
}

Expand Down Expand Up @@ -463,7 +463,7 @@ std::unique_ptr<nano::container_info_component> nano::block_processor::collect_c
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.total_size (), 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", queue.size (), 0 }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", queue.size ({ nano::block_source::forced }), 0 }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
Expand Down
17 changes: 11 additions & 6 deletions nano/node/fair_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,18 +81,22 @@ class fair_queue final
{
// First compare source
if (auto cmp = source <=> other.source; cmp != 0)
{
return cmp;
}

if (maybe_channel && other.maybe_channel)
{
// Then compare channels by ownership, not by the channel's value or state
std::owner_less<std::weak_ptr<nano::transport::channel>> less;
if (less (*maybe_channel, *other.maybe_channel))
{
return std::strong_ordering::less;
}
if (less (*other.maybe_channel, *maybe_channel))
{
return std::strong_ordering::greater;

return std::strong_ordering::equivalent;
}
}
else
{
Expand All @@ -104,8 +108,9 @@ class fair_queue final
{
return std::strong_ordering::less;
}
return std::strong_ordering::equivalent;
}

return std::strong_ordering::equivalent;
}

operator origin () const
Expand Down Expand Up @@ -181,7 +186,7 @@ class fair_queue final
return it == queues.end () ? 0 : it->second.priority;
}

size_t total_size () const
size_t size () const
{
return std::accumulate (queues.begin (), queues.end (), 0, [] (size_t total, auto const & queue) {
return total + queue.second.size ();
Expand Down Expand Up @@ -349,8 +354,8 @@ class fair_queue final
std::unique_ptr<container_info_component> collect_container_info (std::string const & name)
{
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues.size (), sizeof (typename decltype (queues)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", total_size (), sizeof (typename decltype (queues)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "queues", queues_size (), sizeof (typename decltype (queues)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "total_size", size (), sizeof (typename decltype (queues)::value_type) }));
return composite;
}
};
Expand Down
7 changes: 6 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,12 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_code code) {
debug_assert (vote != nullptr);
debug_assert (code != nano::vote_code::invalid);
if (channel == nullptr)
{
return; // Channel expired when waiting for vote to be processed
}
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
{
Expand Down Expand Up @@ -563,7 +568,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.workers, "workers"));
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (collect_container_info (node.vote_processor, "vote_processor"));
composite->add_component (node.vote_processor.collect_container_info ("vote_processor"));
composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler"));
composite->add_component (node.block_processor.collect_container_info ("block_processor"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
Expand Down
9 changes: 9 additions & 0 deletions nano/node/rep_tiers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>

#include <magic_enum.hpp>

using namespace std::chrono_literals;

nano::rep_tiers::rep_tiers (nano::ledger & ledger_a, nano::network_params & network_params_a, nano::online_reps & online_reps_a, nano::stats & stats_a, nano::logger & logger_a) :
Expand Down Expand Up @@ -141,4 +143,11 @@ std::unique_ptr<nano::container_info_component> nano::rep_tiers::collect_contain
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "representatives_2", representatives_2.size (), sizeof (decltype (representatives_2)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "representatives_3", representatives_3.size (), sizeof (decltype (representatives_3)::value_type) }));
return composite;
}

nano::stat::detail nano::to_stat_detail (nano::rep_tier tier)
{
auto value = magic_enum::enum_cast<nano::stat::detail> (magic_enum::enum_name (tier));
debug_assert (value);
return value.value_or (nano::stat::detail{});
}
2 changes: 2 additions & 0 deletions nano/node/rep_tiers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ enum class rep_tier
tier_3, // (> 5%) of online stake
};

nano::stat::detail to_stat_detail (rep_tier);

class rep_tiers final
{
public:
Expand Down
Loading
Loading