Skip to content

Commit

Permalink
Merge pull request #4584 from pwojcikdev/fair-queuing/bootstrap-server
Browse files Browse the repository at this point in the history
Fair queuing for bootstrap server
  • Loading branch information
pwojcikdev authored Apr 26, 2024
2 parents d15849e + 79e22e9 commit ef9e02a
Show file tree
Hide file tree
Showing 15 changed files with 220 additions and 57 deletions.
2 changes: 1 addition & 1 deletion nano/core_test/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace
class responses_helper final
{
public:
void add (nano::asc_pull_ack & ack)
void add (nano::asc_pull_ack const & ack)
{
nano::lock_guard<nano::mutex> lock{ mutex };
responses.push_back (ack);
Expand Down
14 changes: 14 additions & 0 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ TEST (toml, daemon_config_deserialize_defaults)
[node.websocket]
[node.lmdb]
[node.rocksdb]
[node.bootstrap_server]
[opencl]
[rpc]
[rpc.child_process]
Expand Down Expand Up @@ -266,6 +267,10 @@ TEST (toml, daemon_config_deserialize_defaults)
ASSERT_EQ (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_EQ (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_EQ (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);

ASSERT_EQ (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_EQ (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_EQ (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);
}

TEST (toml, optional_child)
Expand Down Expand Up @@ -563,6 +568,11 @@ TEST (toml, daemon_config_deserialize_no_defaults)
max_non_pr_queue = 999
pr_priority = 999
[node.bootstrap_server]
max_queue = 999
threads = 999
batch_size = 999
[opencl]
device = 999
enable = true
Expand Down Expand Up @@ -714,6 +724,10 @@ TEST (toml, daemon_config_deserialize_no_defaults)
ASSERT_NE (conf.node.vote_processor.max_pr_queue, defaults.node.vote_processor.max_pr_queue);
ASSERT_NE (conf.node.vote_processor.max_non_pr_queue, defaults.node.vote_processor.max_non_pr_queue);
ASSERT_NE (conf.node.vote_processor.pr_priority, defaults.node.vote_processor.pr_priority);

ASSERT_NE (conf.node.bootstrap_server.max_queue, defaults.node.bootstrap_server.max_queue);
ASSERT_NE (conf.node.bootstrap_server.threads, defaults.node.bootstrap_server.threads);
ASSERT_NE (conf.node.bootstrap_server.batch_size, defaults.node.bootstrap_server.batch_size);
}

/** There should be no required values **/
Expand Down
7 changes: 4 additions & 3 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ enum class type : uint8_t
blockprocessor_result,
blockprocessor_overfill,
bootstrap_server,
bootstrap_server_request,
bootstrap_server_overfill,
bootstrap_server_response,
active,
active_started,
active_confirmed,
Expand Down Expand Up @@ -318,11 +321,9 @@ enum class detail : uint8_t
response,
write_error,
blocks,
response_blocks,
response_account_info,
channel_full,
response_frontiers,
frontiers,
account_info,

// backlog
activated,
Expand Down
1 change: 1 addition & 0 deletions nano/node/backlog_population.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/nodeconfig.hpp>
Expand Down
154 changes: 130 additions & 24 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <nano/lib/blocks.hpp>
#include <nano/lib/thread_roles.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/transport/channel.hpp>
Expand All @@ -9,31 +10,53 @@
#include <nano/store/component.hpp>
#include <nano/store/confirmation_height.hpp>

// TODO: Make threads configurable
nano::bootstrap_server::bootstrap_server (nano::store::component & store_a, nano::ledger & ledger_a, nano::network_constants const & network_constants_a, nano::stats & stats_a) :
nano::bootstrap_server::bootstrap_server (bootstrap_server_config const & config_a, nano::store::component & store_a, nano::ledger & ledger_a, nano::network_constants const & network_constants_a, nano::stats & stats_a) :
config{ config_a },
store{ store_a },
ledger{ ledger_a },
network_constants{ network_constants_a },
stats{ stats_a },
request_queue{ stats, nano::stat::type::bootstrap_server, nano::thread_role::name::bootstrap_server, /* threads */ 1, /* max size */ 1024 * 16, /* max batch */ 128 }
stats{ stats_a }
{
request_queue.process_batch = [this] (auto & batch) {
process_batch (batch);
queue.max_size_query = [this] (auto const & origin) {
return config.max_queue;
};

queue.priority_query = [this] (auto const & origin) {
return size_t{ 1 };
};
}

// Destructor: verifies the server was stopped before destruction.
// stop () joins and clears `threads`, so any surviving thread here means
// the owner destroyed the server without calling stop () first.
nano::bootstrap_server::~bootstrap_server ()
{
debug_assert (threads.empty ());
}

void nano::bootstrap_server::start ()
{
request_queue.start ();
debug_assert (threads.empty ());

for (auto i = 0u; i < config.threads; ++i)
{
threads.push_back (std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::bootstrap_server);
run ();
}));
}
}

void nano::bootstrap_server::stop ()
{
request_queue.stop ();
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();

for (auto & thread : threads)
{
thread.join ();
}
threads.clear ();
}

bool nano::bootstrap_server::verify_request_type (nano::asc_pull_type type) const
Expand Down Expand Up @@ -96,13 +119,30 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s
return false;
}

request_queue.add (std::make_pair (message, channel));
return true;
bool added = false;
{
std::lock_guard guard{ mutex };
added = queue.push ({ message, channel }, { nano::no_value{}, channel });
}
if (added)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::request);
stats.inc (nano::stat::type::bootstrap_server_request, to_stat_detail (message.type));

condition.notify_one ();
}
else
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::overfill);
stats.inc (nano::stat::type::bootstrap_server_overfill, to_stat_detail (message.type));
}
return added;
}

void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared_ptr<nano::transport::channel> & channel)
void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared_ptr<nano::transport::channel> const & channel)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response, nano::stat::dir::out);
stats.inc (nano::stat::type::bootstrap_server_response, to_stat_detail (response.type));

// Increase relevant stats depending on payload type
struct stat_visitor
Expand All @@ -115,16 +155,13 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared
}
void operator() (nano::asc_pull_ack::blocks_payload const & pld)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_blocks, nano::stat::dir::out);
stats.add (nano::stat::type::bootstrap_server, nano::stat::detail::blocks, nano::stat::dir::out, pld.blocks.size ());
}
void operator() (nano::asc_pull_ack::account_info_payload const & pld)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_account_info, nano::stat::dir::out);
}
void operator() (nano::asc_pull_ack::frontiers_payload const & pld)
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::response_frontiers, nano::stat::dir::out);
stats.add (nano::stat::type::bootstrap_server, nano::stat::detail::frontiers, nano::stat::dir::out, pld.frontiers.size ());
}
};
Expand All @@ -142,16 +179,44 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
}

/*
* Requests
*/
void nano::bootstrap_server::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (!queue.empty ())
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::loop);

void nano::bootstrap_server::process_batch (std::deque<request_t> & batch)
run_batch (lock);
debug_assert (!lock.owns_lock ());

lock.lock ();
}
else
{
condition.wait (lock, [this] () { return stopped || !queue.empty (); });
}
}
}

void nano::bootstrap_server::run_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!queue.empty ());

debug_assert (config.batch_size > 0);
auto batch = queue.next_batch (config.batch_size);

lock.unlock ();

auto transaction = ledger.tx_begin_read ();

for (auto & [request, channel] : batch)
for (auto const & [value, origin] : batch)
{
auto const & [request, channel] = value;

transaction.refresh_if_needed ();

if (!channel->max (nano::transport::traffic_type::bootstrap))
Expand Down Expand Up @@ -185,7 +250,7 @@ nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const &,
* Blocks request
*/

nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request)
nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::blocks_payload const & request) const
{
const std::size_t count = std::min (static_cast<std::size_t> (request.count), max_blocks);

Expand Down Expand Up @@ -215,7 +280,7 @@ nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const &
return prepare_empty_blocks_response (id);
}

nano::asc_pull_ack nano::bootstrap_server::prepare_response (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count)
nano::asc_pull_ack nano::bootstrap_server::prepare_response (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::block_hash start_block, std::size_t count) const
{
debug_assert (count <= max_blocks); // Should be filtered out earlier

Expand All @@ -234,7 +299,7 @@ nano::asc_pull_ack nano::bootstrap_server::prepare_response (secure::transaction
return response;
}

nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano::asc_pull_req::id_t id)
nano::asc_pull_ack nano::bootstrap_server::prepare_empty_blocks_response (nano::asc_pull_req::id_t id) const
{
nano::asc_pull_ack response{ network_constants };
response.id = id;
Expand Down Expand Up @@ -270,7 +335,7 @@ std::vector<std::shared_ptr<nano::block>> nano::bootstrap_server::prepare_blocks
* Account info request
*/

nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::account_info_payload const & request)
nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::account_info_payload const & request) const
{
nano::asc_pull_ack response{ network_constants };
response.id = id;
Expand Down Expand Up @@ -320,7 +385,7 @@ nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const &
* Frontiers request
*/

nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::frontiers_payload const & request)
nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const & transaction, nano::asc_pull_req::id_t id, nano::asc_pull_req::frontiers_payload const & request) const
{
debug_assert (request.count <= max_frontiers); // Should be filtered out earlier

Expand All @@ -339,3 +404,44 @@ nano::asc_pull_ack nano::bootstrap_server::process (secure::transaction const &
response.update_header ();
return response;
}

/*
*
*/

/*
 * Maps an ASC pull request type to the stat detail used for the per-type
 * request/overfill/response counters. Unrecognized enum values (e.g. from a
 * malformed message) are reported as `invalid`.
 */
nano::stat::detail nano::to_stat_detail (nano::asc_pull_type type)
{
	if (type == asc_pull_type::blocks)
	{
		return nano::stat::detail::blocks;
	}
	if (type == asc_pull_type::account_info)
	{
		return nano::stat::detail::account_info;
	}
	if (type == asc_pull_type::frontiers)
	{
		return nano::stat::detail::frontiers;
	}
	return nano::stat::detail::invalid;
}

/*
* bootstrap_server_config
*/

// Writes this config section to TOML, emitting each value together with its
// user-facing description/type annotation. Returns any accumulated TOML error.
nano::error nano::bootstrap_server_config::serialize (nano::tomlconfig & toml) const
{
toml.put ("max_queue", max_queue, "Maximum number of queued requests per peer. \ntype:uint64");
toml.put ("threads", threads, "Number of threads to process requests. \ntype:uint64");
toml.put ("batch_size", batch_size, "Maximum number of requests to process in a single batch. \ntype:uint64");

return toml.get_error ();
}

// Reads this config section from TOML; keys absent from the file leave the
// corresponding member at its current (default) value.
// NOTE(review): values are not range-checked here (e.g. batch_size = 0 or
// threads = 0 would be accepted) — presumably consistent with the other
// config sections in this file; confirm callers tolerate such values.
nano::error nano::bootstrap_server_config::deserialize (nano::tomlconfig & toml)
{
toml.get ("max_queue", max_queue);
toml.get ("threads", threads);
toml.get ("batch_size", batch_size);

return toml.get_error ();
}
Loading

0 comments on commit ef9e02a

Please sign in to comment.