Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggressive flooding for local blocks #2549

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3613,6 +3613,84 @@ TEST (node, bandwidth_limiter)
node.stop ();
}

// Tests that local blocks are flooded to all principal representatives
TEST (node, aggressive_flooding)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
node_flags.disable_block_processor_republishing = true;
auto & node1 (*system.add_node (node_flags));
auto & wallet1 (*system.wallet (0));
wallet1.insert_adhoc (nano::test_genesis_key.prv);
std::array<std::pair<std::shared_ptr<nano::node>, std::shared_ptr<nano::wallet>>, 5> nodes_wallets{};
std::generate (nodes_wallets.begin (), nodes_wallets.end (), [&system, node_flags]() {
nano::node_config node_config;
node_config.peering_port = nano::get_available_port ();
auto node (system.add_node (node_config, node_flags));
return std::make_pair (node, system.wallet (system.nodes.size () - 1));
});
auto large_amount = (nano::genesis_amount / 2) / nodes_wallets.size ();
for (auto & node_wallet : nodes_wallets)
{
nano::keypair keypair;
node_wallet.second->store.representative_set (node_wallet.first->wallets.tx_begin_write (), keypair.pub);
node_wallet.second->insert_adhoc (keypair.prv);
wallet1.send_action (nano::test_genesis_key.pub, keypair.pub, large_amount);
}
// Wait until all nodes have a representative
system.deadline_set (!is_sanitizer_build ? 5s : 15s);
while (node1.rep_crawler.principal_representatives ().size () != nodes_wallets.size ())
{
ASSERT_NO_ERROR (system.poll ());
}
// Generate blocks and ensure they are sent to all representatives
nano::block_builder builder;
std::shared_ptr<nano::state_block> block{};
{
auto transaction (node1.store.tx_begin_read ());
block = builder.state ()
.account (nano::test_genesis_key.pub)
.representative (nano::test_genesis_key.pub)
.previous (node1.ledger.latest (transaction, nano::test_genesis_key.pub))
.balance (node1.ledger.account_balance (transaction, nano::test_genesis_key.pub) - 1)
.link (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node1.work_generate_blocking (node1.ledger.latest (transaction, nano::test_genesis_key.pub)))
.build ();
}
// Processing locally goes through the aggressive block flooding path
node1.process_local (block, false);

auto all_have_block = [&nodes_wallets](nano::block_hash const & hash_a) {
return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a](auto const & node_wallet) {
return node_wallet.first->block (hash) != nullptr;
});
};

system.deadline_set (!is_sanitizer_build ? 3s : 10s);
while (!all_have_block (block->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}

// Do the same for a wallet block
auto wallet_block = wallet1.send_sync (nano::test_genesis_key.pub, nano::test_genesis_key.pub, 10);
system.deadline_set (!is_sanitizer_build ? 3s : 10s);
while (!all_have_block (wallet_block))
{
ASSERT_NO_ERROR (system.poll ());
}

// Wait until the main node has all blocks: genesis + (send+open) for each representative + 2 local blocks
// The main node only sees all blocks if other nodes are flooding their PR's open block to all other PRs
system.deadline_set (5s);
while (node1.ledger.cache.block_count < 1 + 2 * nodes_wallets.size () + 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (active_difficulty, recalculate_work)
{
nano::system system;
Expand Down
15 changes: 11 additions & 4 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
}
}

void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a, const bool initial_publish_a)
{
// Add to work watcher to prevent dropping the election
if (watch_work_a)
Expand All @@ -373,15 +373,22 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
node.active.insert (block_a, false);

// Announce block contents to the network
node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop);
if (initial_publish_a)
{
node.network.flood_block_initial (block_a);
}
else if (!node.flags.disable_block_processor_republishing)
{
node.network.flood_block (block_a, nano::buffer_drop_policy::no_limiter_drop);
}
if (node.config.enable_voting && node.wallets.rep_counts ().voting > 0)
{
// Announce our weighted vote to the network
generator.add (hash_a);
}
}

nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, nano::unchecked_info info_a, const bool watch_work_a, const bool first_publish_a)
{
nano::process_return result;
auto hash (info_a.block->hash ());
Expand All @@ -399,7 +406,7 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
process_live (hash, info_a.block, watch_work_a);
process_live (hash, info_a.block, watch_work_a, first_publish_a);
}
queue_unchecked (transaction_a, hash);
break;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class block_processor final
bool should_log (bool);
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false);
nano::process_return process_one (nano::write_transaction const &, nano::unchecked_info, const bool = false, const bool = false);
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block>, const bool = false);
nano::vote_generator generator;
// Delay required for average network propagartion before requesting confirmation
Expand All @@ -52,7 +52,7 @@ class block_processor final
void queue_unchecked (nano::write_transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (nano::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false, const bool = false);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
bool stopped;
bool active;
Expand Down
19 changes: 19 additions & 0 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,25 @@ void nano::network::flood_message (nano::message const & message_a, nano::buffer
}
}

void nano::network::flood_block (std::shared_ptr<nano::block> const & block_a, nano::buffer_drop_policy const drop_policy_a)
{
nano::publish message (block_a);
flood_message (message, drop_policy_a);
}

void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & block_a)
{
nano::publish message (block_a);
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop);
}
for (auto & i : list_non_pr (fanout (1.0)))
{
i->send (message, nullptr, nano::buffer_drop_policy::no_limiter_drop);
}
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote_a, float scale)
{
nano::confirm_ack message (vote_a);
Expand Down
10 changes: 4 additions & 6 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,10 @@ class network final
}
void flood_vote (std::shared_ptr<nano::vote> const &, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote> const &);
void flood_block (std::shared_ptr<nano::block> block_a, nano::buffer_drop_policy drop_policy_a = nano::buffer_drop_policy::limiter)
{
nano::publish publish (block_a);
flood_message (publish, drop_policy_a);
}

// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &);
// Flood block to a random selection of peers
void flood_block (std::shared_ptr<nano::block> const &, nano::buffer_drop_policy const = nano::buffer_drop_policy::limiter);
void flood_block_many (std::deque<std::shared_ptr<nano::block>>, std::function<void()> = nullptr, unsigned = broadcast_interval_ms);
void merge_peers (std::array<nano::endpoint, 8> const &);
void merge_peer (nano::endpoint const &);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ nano::process_return nano::node::process_local (std::shared_ptr<nano::block> blo
block_processor.wait_write ();
// Process block
auto transaction (store.tx_begin_write ({ tables::accounts, tables::cached_counts, tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks }, { tables::confirmation_height }));
return block_processor.process_one (transaction, info, work_watcher_a);
return block_processor.process_one (transaction, info, work_watcher_a, true);
}

void nano::node::start ()
Expand Down
1 change: 1 addition & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ class node_flags final
bool disable_unchecked_drop{ true };
bool disable_providing_telemetry_metrics{ false };
bool disable_block_processor_unchecked_deletion{ false };
bool disable_block_processor_republishing{ false };
bool disable_ongoing_telemetry_requests{ false };
bool fast_bootstrap{ false };
bool read_only{ false };
Expand Down
1 change: 1 addition & 0 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,7 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha

if (!ec)
{
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
updated_l = true;
Expand Down