Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fair queuing for block processor #4476

Merged
merged 10 commits into from
Apr 4, 2024
Merged
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_executable(
election_scheduler.cpp
enums.cpp
epochs.cpp
fair_queue.cpp
frontiers_confirmation.cpp
ipc.cpp
ledger.cpp
Expand Down
276 changes: 276 additions & 0 deletions nano/core_test/fair_queue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
#include <nano/node/fair_queue.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>

#include <gtest/gtest.h>

#include <ranges>

using namespace std::chrono_literals;

namespace
{
enum class source_enum
{
unknown = 0,
live,
bootstrap,
bootstrap_legacy,
unchecked,
local,
forced,
};
}

TEST (fair_queue, construction)
{
nano::fair_queue<source_enum, int> queue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These types are reversed compared to other tests.

ASSERT_EQ (queue.total_size (), 0);
ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, process_one)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 1; };

queue.push (7, { source_enum::live });
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 0);

auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, nullptr);

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, fifo)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };

queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 3);

{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::live);
}

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, process_many)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 1; };

queue.push (7, { source_enum::live });
queue.push (8, { source_enum::bootstrap });
queue.push (9, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 3);
ASSERT_EQ (queue.size ({ source_enum::live }), 1);
ASSERT_EQ (queue.size ({ source_enum::bootstrap }), 1);
ASSERT_EQ (queue.size ({ source_enum::unchecked }), 1);

{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::bootstrap);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::unchecked);
}

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, max_queue_size)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 2; };

queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
ASSERT_EQ (queue.total_size (), 2);
ASSERT_EQ (queue.queues_size (), 1);
ASSERT_EQ (queue.size ({ source_enum::live }), 2);

{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 7);
ASSERT_EQ (origin.source, source_enum::live);
}
{
auto [result, origin] = queue.next ();
ASSERT_EQ (result, 8);
ASSERT_EQ (origin.source, source_enum::live);
}

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, round_robin_with_priority)
{
nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const & origin) {
switch (origin.source)
{
case source_enum::live:
return 1;
case source_enum::bootstrap:
return 2;
case source_enum::unchecked:
return 3;
default:
return 0;
}
};
queue.max_size_query = [] (auto const &) { return 999; };

queue.push (7, { source_enum::live });
queue.push (8, { source_enum::live });
queue.push (9, { source_enum::live });
queue.push (10, { source_enum::bootstrap });
queue.push (11, { source_enum::bootstrap });
queue.push (12, { source_enum::bootstrap });
queue.push (13, { source_enum::unchecked });
queue.push (14, { source_enum::unchecked });
queue.push (15, { source_enum::unchecked });
ASSERT_EQ (queue.total_size (), 9);

// Processing 1x live, 2x bootstrap, 3x unchecked before moving to the next source
ASSERT_EQ (queue.next ().second.source, source_enum::live);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::unchecked);
ASSERT_EQ (queue.next ().second.source, source_enum::live);
ASSERT_EQ (queue.next ().second.source, source_enum::bootstrap);
ASSERT_EQ (queue.next ().second.source, source_enum::live);

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, source_channel)
{
nano::test::system system{ 1 };

nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };

auto channel1 = nano::test::fake_channel (system.node (0));
auto channel2 = nano::test::fake_channel (system.node (0));
auto channel3 = nano::test::fake_channel (system.node (0));

queue.push (6, { source_enum::live, channel1 });
queue.push (7, { source_enum::live, channel2 });
queue.push (8, { source_enum::live, channel3 });
queue.push (9, { source_enum::live, channel1 }); // Channel 1 has multiple entries
ASSERT_EQ (queue.total_size (), 4);
ASSERT_EQ (queue.queues_size (), 3); // Each <source, channel> pair is a separate queue

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 2);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);

auto all = queue.next_batch (999);
ASSERT_EQ (all.size (), 4);

auto filtered = [&] (auto const & channel) {
auto r = all | std::views::filter ([&] (auto const & entry) {
return entry.second.channel == channel;
});
std::vector vec (r.begin (), r.end ());
return vec;
};

auto channel1_results = filtered (channel1);
ASSERT_EQ (channel1_results.size (), 2);

{
auto [result, origin] = channel1_results[0];
ASSERT_EQ (result, 6);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, channel1);
}
{
auto [result, origin] = channel1_results[1];
ASSERT_EQ (result, 9);
ASSERT_EQ (origin.source, source_enum::live);
ASSERT_EQ (origin.channel, channel1);
}

ASSERT_TRUE (queue.empty ());
}

TEST (fair_queue, cleanup)
{
nano::test::system system{ 1 };

nano::fair_queue<int, source_enum> queue;
queue.priority_query = [] (auto const &) { return 1; };
queue.max_size_query = [] (auto const &) { return 999; };

auto channel1 = nano::test::fake_channel (system.node (0));
auto channel2 = nano::test::fake_channel (system.node (0));
auto channel3 = nano::test::fake_channel (system.node (0));

queue.push (7, { source_enum::live, channel1 });
queue.push (8, { source_enum::live, channel2 });
queue.push (9, { source_enum::live, channel3 });
ASSERT_EQ (queue.total_size (), 3);
ASSERT_EQ (queue.queues_size (), 3);

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 1);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);

// Either closing or resetting the channel should remove it from the queue
channel1->close ();
channel2.reset ();

ASSERT_TRUE (queue.periodic_update ());

// Only channel 3 should remain
ASSERT_EQ (queue.total_size (), 1);
ASSERT_EQ (queue.queues_size (), 1);

ASSERT_EQ (queue.size ({ source_enum::live, channel1 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel2 }), 0);
ASSERT_EQ (queue.size ({ source_enum::live, channel3 }), 1);
}
7 changes: 3 additions & 4 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,10 +768,9 @@ TEST (network, duplicate_detection)
TEST (network, duplicate_revert_publish)
{
nano::test::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::node_config node_config = system.default_config ();
node_config.block_processor.max_peer_queue = 0;
auto & node (*system.add_node (node_config));
nano::publish publish{ nano::dev::network_params.network, nano::dev::genesis };
std::vector<uint8_t> bytes;
{
Expand Down
5 changes: 3 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ TEST (node, expire)
ASSERT_TRUE (node0.expired ());
}

// This test is racy, there is no guarantee that the election won't be confirmed until all forks are fully processed
TEST (node, fork_publish)
{
nano::test::system system (1);
Expand Down Expand Up @@ -671,6 +672,7 @@ TEST (node, fork_keep)
ASSERT_TRUE (node2.ledger.block_exists (transaction1, send1->hash ()));
}

// This test is racy, there is no guarantee that the election won't be confirmed until all forks are fully processed
TEST (node, fork_flip)
{
nano::test::system system (2);
Expand All @@ -696,8 +698,7 @@ TEST (node, fork_flip)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build ();
nano::publish publish2{ nano::dev::network_params.network, send2 };
auto ignored_channel{ std::make_shared<nano::transport::channel_tcp> (node1, std::weak_ptr<nano::transport::socket> ()) };

auto ignored_channel = nano::test::fake_channel (node1);
node1.network.inbound (publish1, ignored_channel);
node2.network.inbound (publish2, ignored_channel);
ASSERT_TIMELY_EQ (5s, 1, node1.active.size ());
Expand Down
20 changes: 20 additions & 0 deletions nano/core_test/toml.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ TEST (toml, daemon_config_deserialize_defaults)
std::stringstream ss;
ss << R"toml(
[node]
[node.block_processor]
[node.diagnostics.txn_tracking]
[node.httpcallback]
[node.ipc.local]
Expand Down Expand Up @@ -254,6 +255,12 @@ TEST (toml, daemon_config_deserialize_defaults)

ASSERT_EQ (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_EQ (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);

ASSERT_EQ (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_EQ (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_EQ (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_EQ (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_EQ (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
}

TEST (toml, optional_child)
Expand Down Expand Up @@ -432,6 +439,13 @@ TEST (toml, daemon_config_deserialize_no_defaults)
backlog_scan_batch_size = 999
backlog_scan_frequency = 999

[node.block_processor]
max_peer_queue = 999
max_system_queue = 999
priority_live = 999
priority_bootstrap = 999
priority_local = 999

[node.diagnostics.txn_tracking]
enable = true
ignore_writes_below_block_processor_max_time = false
Expand Down Expand Up @@ -680,6 +694,12 @@ TEST (toml, daemon_config_deserialize_no_defaults)

ASSERT_NE (conf.node.vote_cache.max_size, defaults.node.vote_cache.max_size);
ASSERT_NE (conf.node.vote_cache.max_voters, defaults.node.vote_cache.max_voters);

ASSERT_NE (conf.node.block_processor.max_peer_queue, defaults.node.block_processor.max_peer_queue);
ASSERT_NE (conf.node.block_processor.max_system_queue, defaults.node.block_processor.max_system_queue);
ASSERT_NE (conf.node.block_processor.priority_live, defaults.node.block_processor.priority_live);
ASSERT_NE (conf.node.block_processor.priority_bootstrap, defaults.node.block_processor.priority_bootstrap);
ASSERT_NE (conf.node.block_processor.priority_local, defaults.node.block_processor.priority_local);
}

/** There should be no required values **/
Expand Down
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ enum class type : uint8_t
blockprocessor,
blockprocessor_source,
blockprocessor_result,
blockprocessor_overfill,
bootstrap_server,
active,
active_started,
Expand Down Expand Up @@ -85,6 +86,7 @@ enum class detail : uint8_t
success,
unknown,
cache,
queue_overflow,

// processing queue
queue,
Expand Down
1 change: 1 addition & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ add_library(
election_status.hpp
epoch_upgrader.hpp
epoch_upgrader.cpp
fair_queue.hpp
ipc/action_handler.hpp
ipc/action_handler.cpp
ipc/flatbuffers_handler.hpp
Expand Down
Loading
Loading