Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fair queuing for request aggregator #4598

Merged
19 changes: 7 additions & 12 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1881,10 +1881,6 @@ TEST (node, local_votes_cache)
node.network.inbound (message1, channel);
node.network.inbound (message2, channel);
}
for (int i = 0; i < 4; ++i)
{
ASSERT_NO_ERROR (system.poll (node.aggregator.max_delay));
}
// Make sure a new vote was not generated
ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 2);
// Max cache
Expand All @@ -1893,18 +1889,17 @@ TEST (node, local_votes_cache)
ASSERT_EQ (nano::block_status::progress, node.ledger.process (transaction, send3));
}
nano::confirm_req message3{ nano::dev::network_params.network, send3->hash (), send3->root () };
node.network.inbound (message3, channel);
ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 3);
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
ASSERT_TIMELY (3s, !node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_TIMELY (3s, !node.history.votes (send3->root (), send3->hash ()).empty ());
// All requests should be served from the cache
for (auto i (0); i < 100; ++i)
{
node.network.inbound (message3, channel);
}
for (int i = 0; i < 4; ++i)
{
ASSERT_NO_ERROR (system.poll (node.aggregator.max_delay));
}
ASSERT_TIMELY_EQ (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 3);
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
ASSERT_TIMELY (3s, !node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_TIMELY (3s, !node.history.votes (send3->root (), send3->hash ()).empty ());
}

// Test disabled because it's failing intermittently.
Expand Down Expand Up @@ -3065,7 +3060,7 @@ TEST (node, rollback_vote_self)
ASSERT_TRUE (node.history.votes (send2->root (), send2->hash ()).empty ());
ASSERT_TRUE (node.history.votes (fork->root (), fork->hash ()).empty ());
auto channel = std::make_shared<nano::transport::fake::channel> (node);
node.aggregator.add (channel, { { send2->hash (), send2->root () } });
node.aggregator.request ({ { send2->hash (), send2->root () } }, channel);
ASSERT_TIMELY (5s, !node.history.votes (fork->root (), fork->hash ()).empty ());
ASSERT_TRUE (node.history.votes (send2->root (), send2->hash ()).empty ());

Expand Down
123 changes: 22 additions & 101 deletions nano/core_test/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,16 @@ TEST (request_aggregator, one)
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY (3s, node.aggregator.empty ());
// Not yet in the ledger
ASSERT_TIMELY_EQ (3s, 1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_unknown));
ASSERT_EQ (nano::block_status::progress, node.ledger.process (node.ledger.tx_begin_write (), send1));
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
// In the ledger but no vote generated yet
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TIMELY (3s, node.aggregator.empty ());
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
// Already cached
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_TIMELY_EQ (3s, 3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
Expand Down Expand Up @@ -107,12 +104,11 @@ TEST (request_aggregator, one_update)
request.emplace_back (send2->hash (), send2->root ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
node.aggregator.request (request, dummy_channel);
request.clear ();
request.emplace_back (receive1->hash (), receive1->root ());
// Update the pool of requests with another hash
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
// In the ledger but no vote generated yet
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes))
ASSERT_TRUE (node.aggregator.empty ());
Expand Down Expand Up @@ -175,14 +171,12 @@ TEST (request_aggregator, two)
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
// Process both blocks
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
// One vote should be generated for both blocks
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TRUE (node.aggregator.empty ());
// The same request should now send the cached vote
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
Expand Down Expand Up @@ -231,7 +225,7 @@ TEST (request_aggregator, two_endpoints)
ASSERT_NE (nano::transport::map_endpoint_to_v6 (dummy_channel1->get_endpoint ()), nano::transport::map_endpoint_to_v6 (dummy_channel2->get_endpoint ()));

// For the first request, aggregator should generate a new vote
node1.aggregator.add (dummy_channel1, request);
node1.aggregator.request (request, dummy_channel1);
ASSERT_TIMELY (5s, node1.aggregator.empty ());

ASSERT_TIMELY_EQ (5s, 1, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
Expand All @@ -245,7 +239,7 @@ TEST (request_aggregator, two_endpoints)
ASSERT_TIMELY_EQ (3s, 0, node1.stats.count (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote));

// For the second request, aggregator should use the cache
node1.aggregator.add (dummy_channel1, request);
node1.aggregator.request (request, dummy_channel1);
ASSERT_TIMELY (5s, node1.aggregator.empty ());

ASSERT_TIMELY_EQ (5s, 2, node1.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
Expand Down Expand Up @@ -295,8 +289,7 @@ TEST (request_aggregator, split)
ASSERT_EQ (max_vbh + 1, request.size ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
// In the ledger but no vote generated yet
ASSERT_TIMELY_EQ (3s, 2, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
ASSERT_TRUE (node.aggregator.empty ());
Expand All @@ -311,82 +304,12 @@ TEST (request_aggregator, split)
ASSERT_TIMELY_EQ (3s, 2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
}

// Checks that a request handed to the aggregator is still served after the
// caller's own shared_ptr to the channel has gone out of scope.
TEST (request_aggregator, channel_lifetime)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
// Run the node with frontier confirmation switched off
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
// Put the genesis key into the node's wallet (presumably so the node can generate votes; confirm)
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
nano::block_builder builder;
// State block on the genesis account: balance lowered by Gxrb_ratio, link set to the genesis key itself
auto send1 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build ();
// The block must be in the ledger before it is requested
ASSERT_EQ (nano::block_status::progress, node.ledger.process (node.ledger.tx_begin_write (), send1));
// A request is a list of (hash, root) pairs; this one asks for send1 only
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
{
// The aggregator should extend the lifetime of the channel
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
// client and dummy_channel are released here, at the end of the scope
}
// The aggregator still reports one entry after the local handles are gone
ASSERT_EQ (1, node.aggregator.size ());
// Wait up to 3s for the generated-votes counter to become non-zero
ASSERT_TIMELY (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
}

// Adds the same request through two different channel objects and checks that
// the aggregator ends up with a single entry and no longer keeps the first
// channel alive.
TEST (request_aggregator, channel_update)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
// Run the node with frontier confirmation switched off
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));
// Put the genesis key into the node's wallet (presumably so the node can generate votes; confirm)
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
nano::block_builder builder;
// State block on the genesis account: balance lowered by Gxrb_ratio, link set to the genesis key itself
auto send1 = builder
.state ()
.account (nano::dev::genesis_key.pub)
.previous (nano::dev::genesis->hash ())
.representative (nano::dev::genesis_key.pub)
.balance (nano::dev::constants.genesis_amount - nano::Gxrb_ratio)
.link (nano::dev::genesis_key.pub)
.sign (nano::dev::genesis_key.prv, nano::dev::genesis_key.pub)
.work (*node.work_generate_blocking (nano::dev::genesis->hash ()))
.build ();
// The block must be in the ledger before it is requested
ASSERT_EQ (nano::block_status::progress, node.ledger.process (node.ledger.tx_begin_write (), send1));
// A request is a list of (hash, root) pairs; this one asks for send1 only
std::vector<std::pair<nano::block_hash, nano::root>> request;
request.emplace_back (send1->hash (), send1->root ());
// Non-owning observer used below to detect whether channel1 has been destroyed
std::weak_ptr<nano::transport::channel> channel1_w;
{
auto client1 = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel1 = std::make_shared<nano::transport::channel_tcp> (node, client1);
channel1_w = dummy_channel1;
node.aggregator.add (dummy_channel1, request);
auto client2 = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel2 = std::make_shared<nano::transport::channel_tcp> (node, client2);
// The aggregator should then hold channel2 and drop channel1
node.aggregator.add (dummy_channel2, request);
// The local shared_ptr handles to both channels are released at the end of this scope
}
// Both requests were for the same endpoint, so only one pool should exist
ASSERT_EQ (1, node.aggregator.size ());
// channel1 is not being held anymore
ASSERT_EQ (nullptr, channel1_w.lock ());
// NOTE(review): this compares the boolean `0 < count` against 0, so it is satisfied only while
// the generated-votes counter is still zero. The sibling test channel_lifetime instead waits for
// `0 < count` to become true; this looks like an inverted or mechanically converted assertion.
// Confirm the intended expectation against the ASSERT_TIMELY_EQ macro definition.
ASSERT_TIMELY_EQ (3s, 0 < node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes), 0);
}

TEST (request_aggregator, channel_max_queue)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
node_config.max_queued_requests = 1;
node_config.request_aggregator.max_queue = 1;
auto & node (*system.add_node (node_config));
system.wallet (0)->insert_adhoc (nano::dev::genesis_key.prv);
nano::block_builder builder;
Expand All @@ -405,12 +328,13 @@ TEST (request_aggregator, channel_max_queue)
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
node.aggregator.request (request, dummy_channel);
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY_EQ (3s, 1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
}

TEST (request_aggregator, unique)
// TODO: Deduplication is a concern for the requesting node, not the aggregator which should be stateless and fairly service all peers
TEST (request_aggregator, DISABLED_unique)
{
nano::test::system system;
nano::node_config node_config = system.default_config ();
Expand All @@ -433,10 +357,10 @@ TEST (request_aggregator, unique)
request.emplace_back (send1->hash (), send1->root ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
node.aggregator.add (dummy_channel, request);
node.aggregator.request (request, dummy_channel);
node.aggregator.request (request, dummy_channel);
node.aggregator.request (request, dummy_channel);
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY_EQ (3s, 1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_hashes));
ASSERT_TIMELY_EQ (3s, 1, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes));
}
Expand Down Expand Up @@ -478,8 +402,7 @@ TEST (request_aggregator, cannot_vote)
request.emplace_back (1, send2->root ());
auto client = std::make_shared<nano::transport::socket> (node);
std::shared_ptr<nano::transport::channel> dummy_channel = std::make_shared<nano::transport::channel_tcp> (node, client);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (1, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
Expand All @@ -491,8 +414,7 @@ TEST (request_aggregator, cannot_vote)

// With an ongoing election
node.start_election (send2);
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (2, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
Expand All @@ -508,8 +430,7 @@ TEST (request_aggregator, cannot_vote)
ASSERT_TIMELY (5s, election = node.active.election (send1->qualified_root ()));
election->force_confirm ();
ASSERT_TIMELY (3s, node.ledger.dependents_confirmed (node.ledger.tx_begin_read (), *send2));
node.aggregator.add (dummy_channel, request);
ASSERT_EQ (1, node.aggregator.size ());
node.aggregator.request (request, dummy_channel);
ASSERT_TIMELY (3s, node.aggregator.empty ());
ASSERT_EQ (3, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_accepted));
ASSERT_EQ (0, node.stats.count (nano::stat::type::aggregator, nano::stat::detail::aggregator_dropped));
Expand Down
13 changes: 13 additions & 0 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,10 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);

ASSERT_EQ (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_EQ (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_EQ (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);
}

TEST (toml, optional_child)
Expand Down Expand Up @@ -569,6 +573,11 @@ TEST (toml, daemon_config_deserialize_no_defaults)
threads = 999
batch_size = 999

[node.request_aggregator]
max_queue = 999
threads = 999
batch_size = 999

[opencl]
device = 999
enable = true
Expand Down Expand Up @@ -722,6 +731,10 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_NE (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);

ASSERT_NE (conf.node.request_aggregator.max_queue, defaults.node.request_aggregator.max_queue);
ASSERT_NE (conf.node.request_aggregator.threads, defaults.node.request_aggregator.threads);
ASSERT_NE (conf.node.request_aggregator.batch_size, defaults.node.request_aggregator.batch_size);
}

/** There should be no required values **/
Expand Down
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class type
drop,
aggregator,
requests,
request_aggregator,
filter,
telemetry,
vote_generator,
Expand Down Expand Up @@ -295,6 +296,10 @@ enum class detail
requests_cannot_vote,
requests_unknown,

// request_aggregator
request_hashes,
overfill_hashes,

// duplicate
duplicate_publish_message,

Expand Down
7 changes: 6 additions & 1 deletion nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@

// Forward declarations of nano types, so that code can name these classes
// (e.g. in pointer or reference parameters) without their full definitions.
// NOTE(review): `class node;` is listed twice below. A repeated forward declaration is
// legal and harmless; it is presumably the old and new positions of the same line shown
// together in this diff view. Confirm against the actual header.
namespace nano
{
class active_transactions;
class ledger;
class local_vote_history;
class logger;
class node;
class network;
class node;
class node_config;
class stats;
class vote_generator;
class wallets;
}
7 changes: 4 additions & 3 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,15 @@ class network_message_visitor : public nano::message_visitor
}
}

void confirm_req (nano::confirm_req const & message_a) override
void confirm_req (nano::confirm_req const & message) override
{
// Don't load nodes with disabled voting
// TODO: This check should be cached somewhere
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
if (!message_a.roots_hashes.empty ())
if (!message.roots_hashes.empty ())
{
node.aggregator.add (channel, message_a.roots_hashes);
node.aggregator.request (message.roots_hashes, channel);
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
final_generator{ *final_generator_impl },
scheduler_impl{ std::make_unique<nano::scheduler::component> (*this) },
scheduler{ *scheduler_impl },
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
aggregator (config.request_aggregator, *this, stats, generator, final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::backlog_population_config (config), ledger, stats },
ascendboot{ config, block_processor, ledger, network, stats },
Expand Down Expand Up @@ -585,7 +585,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (node.vote_uniquer.collect_container_info ("vote_uniquer"));
composite->add_component (node.confirming_set.collect_container_info ("confirming_set"));
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
composite->add_component (node.aggregator.collect_container_info ("request_aggregator"));
composite->add_component (node.scheduler.collect_container_info ("election_scheduler"));
composite->add_component (node.vote_cache.collect_container_info ("vote_cache"));
composite->add_component (node.generator.collect_container_info ("vote_generator"));
Expand Down Expand Up @@ -698,6 +698,7 @@ void nano::node::start ()
final_generator.start ();
confirming_set.start ();
scheduler.start ();
aggregator.start ();
backlog.start ();
bootstrap_server.start ();
if (!flags.disable_ascending_bootstrap)
Expand Down
Loading
Loading