Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fair queuing for request aggregator #4598

Merged
5 changes: 5 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class type
drop,
aggregator,
requests,
request_aggregator,
filter,
telemetry,
vote_generator,
Expand Down Expand Up @@ -295,6 +296,10 @@ enum class detail
requests_cannot_vote,
requests_unknown,

// request_aggregator
request_hashes,
overfill_hashes,

// duplicate
duplicate_publish_message,

Expand Down
7 changes: 4 additions & 3 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -381,14 +381,15 @@ class network_message_visitor : public nano::message_visitor
}
}

void confirm_req (nano::confirm_req const & message_a) override
void confirm_req (nano::confirm_req const & message) override
{
// Don't load nodes with disabled voting
// TODO: This check should be cached somewhere
if (node.config.enable_voting && node.wallets.reps ().voting > 0)
{
if (!message_a.roots_hashes.empty ())
if (!message.roots_hashes.empty ())
{
node.aggregator.add (channel, message_a.roots_hashes);
node.aggregator.request (message.roots_hashes, channel);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
final_generator{ *final_generator_impl },
scheduler_impl{ std::make_unique<nano::scheduler::component> (*this) },
scheduler{ *scheduler_impl },
aggregator (config, stats, generator, final_generator, history, ledger, wallets, active),
aggregator (config.request_aggregator, *this, stats, generator, final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nano::backlog_population_config (config), ledger, stats },
ascendboot{ config, block_processor, ledger, network, stats },
Expand Down Expand Up @@ -585,7 +585,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (node.vote_uniquer.collect_container_info ("vote_uniquer"));
composite->add_component (node.confirming_set.collect_container_info ("confirming_set"));
composite->add_component (collect_container_info (node.distributed_work, "distributed_work"));
composite->add_component (collect_container_info (node.aggregator, "request_aggregator"));
composite->add_component (node.aggregator.collect_container_info ("request_aggregator"));
composite->add_component (node.scheduler.collect_container_info ("election_scheduler"));
composite->add_component (node.vote_cache.collect_container_info ("vote_cache"));
composite->add_component (node.generator.collect_container_info ("vote_generator"));
Expand Down
2 changes: 2 additions & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <nano/node/ipc/ipc_config.hpp>
#include <nano/node/peer_history.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/optimistic.hpp>
#include <nano/node/vote_cache.hpp>
Expand Down Expand Up @@ -145,6 +146,7 @@ class node_config
nano::block_processor_config block_processor;
nano::vote_processor_config vote_processor;
nano::peer_history_config peer_history;
nano::request_aggregator_config request_aggregator;

public:
std::string serialize_frontiers_confirmation (nano::frontiers_confirmation_mode) const;
Expand Down
179 changes: 97 additions & 82 deletions nano/node/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/node/common.hpp>
#include <nano/node/local_vote_history.hpp>
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/vote_generator.hpp>
Expand All @@ -12,12 +13,9 @@
#include <nano/secure/ledger_set_any.hpp>
#include <nano/store/component.hpp>

nano::request_aggregator::request_aggregator (nano::node_config const & config_a, nano::stats & stats_a, nano::vote_generator & generator_a, nano::vote_generator & final_generator_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
max_delay (config_a.network_params.network.is_dev_network () ? 50 : 300),
small_delay (config_a.network_params.network.is_dev_network () ? 10 : 50),
max_channel_requests (config_a.max_queued_requests),
request_aggregator_threads (config_a.request_aggregator_threads),
nano::request_aggregator::request_aggregator (request_aggregator_config const & config_a, nano::node & node_a, nano::stats & stats_a, nano::vote_generator & generator_a, nano::vote_generator & final_generator_a, nano::local_vote_history & history_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::active_transactions & active_a) :
config{ config_a },
network_constants{ node_a.network_params.network },
stats (stats_a),
local_votes (history_a),
ledger (ledger_a),
Expand All @@ -32,6 +30,13 @@ nano::request_aggregator::request_aggregator (nano::node_config const & config_a
final_generator.set_reply_action ([this] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a) {
this->reply_action (vote_a, channel_a);
});

queue.max_size_query = [this] (auto const & origin) {
return config.max_queue;
};
queue.priority_query = [this] (auto const & origin) {
return 1;
};
}

nano::request_aggregator::~request_aggregator ()
Expand All @@ -43,7 +48,7 @@ void nano::request_aggregator::start ()
{
debug_assert (threads.empty ());

for (auto i = 0; i < request_aggregator_threads; ++i)
for (auto i = 0; i < config.threads; ++i)
{
threads.emplace_back ([this] () {
nano::thread_role::set (nano::thread_role::name::request_aggregator);
Expand Down Expand Up @@ -71,104 +76,115 @@ void nano::request_aggregator::stop ()

std::size_t nano::request_aggregator::size () const
{
nano::unique_lock<nano::mutex> lock{ mutex };
return requests.size ();
nano::lock_guard<nano::mutex> lock{ mutex };
return queue.size ();
}

bool nano::request_aggregator::empty () const
{
return size () == 0;
nano::lock_guard<nano::mutex> lock{ mutex };
return queue.empty ();
}

// TODO: This is badly implemented, will prematurely drop large vote requests
void nano::request_aggregator::add (std::shared_ptr<nano::transport::channel> const & channel_a, std::vector<std::pair<nano::block_hash, nano::root>> const & hashes_roots_a)
bool nano::request_aggregator::request (request_type const & request, std::shared_ptr<nano::transport::channel> const & channel)
{
release_assert (channel != nullptr);

// This should be checked before calling request
debug_assert (wallets.reps ().voting > 0);
bool error = true;
auto const endpoint (nano::transport::map_endpoint_to_v6 (channel_a->get_endpoint ()));
debug_assert (!request.empty ());

bool added = false;
{
nano::lock_guard<nano::mutex> guard{ mutex };
added = queue.push ({ request, channel }, { nano::no_value{}, channel });
}
if (added)
{
stats.inc (nano::stat::type::request_aggregator, nano::stat::detail::request);
stats.add (nano::stat::type::request_aggregator, nano::stat::detail::request_hashes, request.size ());

condition.notify_one ();
}
else
{
stats.inc (nano::stat::type::request_aggregator, nano::stat::detail::overfill);
stats.add (nano::stat::type::request_aggregator, nano::stat::detail::overfill_hashes, request.size ());
}
return added;
}

void nano::request_aggregator::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
// Protecting from ever-increasing memory usage when request are consumed slower than generated
// Reject request if the oldest request has not yet been processed after its deadline + a modest margin
if (requests.empty () || (requests.get<tag_deadline> ().begin ()->deadline + 2 * this->max_delay > std::chrono::steady_clock::now ()))
while (!stopped)
{
auto & requests_by_endpoint (requests.get<tag_endpoint> ());
auto existing (requests_by_endpoint.find (endpoint));
if (existing == requests_by_endpoint.end ())
stats.inc (nano::stat::type::request_aggregator, nano::stat::detail::loop);

if (!queue.empty ())
{
existing = requests_by_endpoint.emplace (channel_a).first;
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
[Review thread: clemahieu marked this conversation as resolved.]
requests_by_endpoint.modify (existing, [&hashes_roots_a, &channel_a, &error, this] (channel_pool & pool_a) {
// This extends the lifetime of the channel, which is acceptable up to max_delay
pool_a.channel = channel_a;
if (pool_a.hashes_roots.size () + hashes_roots_a.size () <= this->max_channel_requests)
{
error = false;
auto new_deadline (std::min (pool_a.start + this->max_delay, std::chrono::steady_clock::now () + this->small_delay));
pool_a.deadline = new_deadline;
pool_a.hashes_roots.insert (pool_a.hashes_roots.begin (), hashes_roots_a.begin (), hashes_roots_a.end ());
}
});
if (requests.size () == 1)
else
{
lock.unlock ();
condition.notify_all ();
condition.wait (lock, [&] { return stopped || !queue.empty (); });
}
}
stats.inc (nano::stat::type::aggregator, !error ? nano::stat::detail::aggregator_accepted : nano::stat::detail::aggregator_dropped);
}

void nano::request_aggregator::run ()
void nano::request_aggregator::run_batch (nano::unique_lock<nano::mutex> & lock)
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

debug_assert (config.batch_size > 0);
auto batch = queue.next_batch (config.batch_size);

lock.unlock ();

auto transaction = ledger.tx_begin_read ();

for (auto const & [value, origin] : batch)
{
if (!requests.empty ())
auto const & [request, channel] = value;

transaction.refresh_if_needed ();

if (!channel->max ())
{
auto & requests_by_deadline (requests.get<tag_deadline> ());
auto front (requests_by_deadline.begin ());
if (front->deadline < std::chrono::steady_clock::now ())
{
// Store the channel and requests for processing after erasing this pool
decltype (front->channel) channel{};
decltype (front->hashes_roots) hashes_roots{};
requests_by_deadline.modify (front, [&channel, &hashes_roots] (channel_pool & pool) {
channel.swap (pool.channel);
hashes_roots.swap (pool.hashes_roots);
});
requests_by_deadline.erase (front);
lock.unlock ();
erase_duplicates (hashes_roots);
auto const remaining = aggregate (hashes_roots, channel);
if (!remaining.remaining_normal.empty ())
{
// Generate votes for the remaining hashes
auto const generated = generator.generate (remaining.remaining_normal, channel);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.remaining_normal.size () - generated);
}
if (!remaining.remaining_final.empty ())
{
// Generate final votes for the remaining hashes
auto const generated = final_generator.generate (remaining.remaining_final, channel);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.remaining_final.size () - generated);
}
lock.lock ();
}
else
{
auto deadline = front->deadline;
condition.wait_until (lock, deadline, [this, &deadline] () { return this->stopped || deadline < std::chrono::steady_clock::now (); });
}
process (transaction, request, channel);
}
else
{
condition.wait_for (lock, small_delay, [this] () { return this->stopped || !this->requests.empty (); });
stats.inc (nano::stat::type::request_aggregator, nano::stat::detail::channel_full, stat::dir::out);
}
}
}

void nano::request_aggregator::process (nano::secure::transaction const & transaction, request_type const & request, std::shared_ptr<nano::transport::channel> const & channel)
{
auto const remaining = aggregate (transaction, request, channel);

if (!remaining.remaining_normal.empty ())
{
// Generate votes for the remaining hashes
auto const generated = generator.generate (remaining.remaining_normal, channel);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.remaining_normal.size () - generated);
}
if (!remaining.remaining_final.empty ())
{
// Generate final votes for the remaining hashes
auto const generated = final_generator.generate (remaining.remaining_final, channel);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cannot_vote, stat::dir::in, remaining.remaining_final.size () - generated);
}
}

void nano::request_aggregator::reply_action (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const & channel_a) const
{
nano::confirm_ack confirm{ config.network_params.network, vote_a };
nano::confirm_ack confirm{ network_constants, vote_a };
channel_a->send (confirm);
}

Expand All @@ -183,9 +199,8 @@ void nano::request_aggregator::erase_duplicates (std::vector<std::pair<nano::blo
requests_a.end ());
}

auto nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash, nano::root>> const & requests_a, std::shared_ptr<nano::transport::channel> const & channel_a) const -> aggregate_result
auto nano::request_aggregator::aggregate (nano::secure::transaction const & transaction, request_type const & requests_a, std::shared_ptr<nano::transport::channel> const & channel_a) const -> aggregate_result
{
auto transaction = ledger.tx_begin_read ();
std::vector<std::shared_ptr<nano::block>> to_generate;
std::vector<std::shared_ptr<nano::block>> to_generate_final;
std::vector<std::shared_ptr<nano::vote>> cached_votes;
Expand Down Expand Up @@ -296,7 +311,7 @@ auto nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash
// Let the node know about the alternative block
if (block->hash () != hash)
{
nano::publish publish (config.network_params.network, block);
nano::publish publish (network_constants, block);
channel_a->send (publish);
}
}
Expand Down Expand Up @@ -324,11 +339,11 @@ auto nano::request_aggregator::aggregate (std::vector<std::pair<nano::block_hash
};
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::request_aggregator & aggregator, std::string const & name)
std::unique_ptr<nano::container_info_component> nano::request_aggregator::collect_container_info (std::string const & name)
{
auto pools_count = aggregator.size ();
auto sizeof_element = sizeof (decltype (aggregator.requests)::value_type);
nano::lock_guard<nano::mutex> guard{ mutex };

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pools", pools_count, sizeof_element }));
composite->add_component (queue.collect_container_info ("queue"));
return composite;
}
[Page truncated: remaining diff files were still loading and are not shown.]