Skip to content

Commit

Permalink
Prioritize node wallet frontiers during background confirmations (#2154)
Browse files Browse the repository at this point in the history
* Prioritize node wallet frontiers during background confirmation height updating

* Formatting

* Review comments

* Backwards iterating not standard with unordered collections
  • Loading branch information
wezrule authored Aug 5, 2019
1 parent 83e5c50 commit 7818456
Show file tree
Hide file tree
Showing 5 changed files with 203 additions and 87 deletions.
54 changes: 39 additions & 15 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3042,12 +3042,7 @@ TEST (confirmation_height, prioritize_frontiers)
nano::keypair key2;
nano::keypair key3;
nano::keypair key4;
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
nano::block_hash latest1 (system.nodes[0]->latest (nano::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key1.prv);
system.wallet (0)->insert_adhoc (key2.prv);
system.wallet (0)->insert_adhoc (key3.prv);
system.wallet (0)->insert_adhoc (key4.prv);

// Send different numbers of blocks all accounts
nano::send_block send1 (latest1, key1.pub, node->config.online_weight_minimum.number () + 10000, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest1));
Expand Down Expand Up @@ -3095,20 +3090,49 @@ TEST (confirmation_height, prioritize_frontiers)
}

auto transaction = node->store.tx_begin_read ();
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1));
constexpr auto num_accounts = 5;
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts);
// Check the order of accounts is as expected (greatest number of uncemented blocks at the front). key3 and key4 have the same value, the order is unspecified so check both
std::array<nano::account, num_accounts> desired_order_1 { nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2 { nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
// clang-format off
auto priority_orders_match = [&priority_cementable_frontiers = node->active.priority_cementable_frontiers](std::array<nano::account, num_accounts> const & desired_order) {
return std::equal (desired_order.begin (), desired_order.end (), priority_cementable_frontiers.get<1> ().begin (), priority_cementable_frontiers.get<1> ().end (), [](nano::account const & account, nano::cementable_account const & cementable_account) {
auto priority_orders_match = [](auto const & cementable_frontiers, auto const & desired_order) {
return std::equal (desired_order.begin (), desired_order.end (), cementable_frontiers.template get<1> ().begin (), cementable_frontiers.template get<1> ().end (), [](nano::account const & account, nano::cementable_account const & cementable_account) {
return (account == cementable_account.account);
});
};
// clang-format on
ASSERT_TRUE (priority_orders_match (desired_order_1) || priority_orders_match (desired_order_2));
{
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts);
// Check the order of accounts is as expected (greatest number of uncemented blocks at the front). key3 and key4 have the same value, the order is unspecified so check both
std::array<nano::account, num_accounts> desired_order_1{ nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2{ nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_cementable_frontiers, desired_order_2));
}

{
// Add some to the local node wallets and check ordering of both containers
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (key1.prv);
system.wallet (0)->insert_adhoc (key2.prv);
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), num_accounts - 3);
ASSERT_EQ (node->active.priority_wallet_cementable_frontiers_size (), num_accounts - 2);
std::array<nano::account, 3> local_desired_order{ nano::genesis_account, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, local_desired_order));
std::array<nano::account, 2> desired_order_1{ key3.pub, key4.pub };
std::array<nano::account, 2> desired_order_2{ key4.pub, key3.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_cementable_frontiers, desired_order_2));
}

{
// Add the remainder of accounts to node wallets and check size/ordering is correct
system.wallet (0)->insert_adhoc (key3.prv);
system.wallet (0)->insert_adhoc (key4.prv);
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_EQ (node->active.priority_cementable_frontiers_size (), 0);
ASSERT_EQ (node->active.priority_wallet_cementable_frontiers_size (), num_accounts);
std::array<nano::account, num_accounts> desired_order_1{ nano::genesis_account, key3.pub, key4.pub, key1.pub, key2.pub };
std::array<nano::account, num_accounts> desired_order_2{ nano::genesis_account, key4.pub, key3.pub, key1.pub, key2.pub };
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, desired_order_1) || priority_orders_match (node->active.priority_wallet_cementable_frontiers, desired_order_2));
}

// Check that accounts which already exist have their order modified when the uncemented count changes.
nano::send_block send12 (send9.hash (), nano::test_genesis_key.pub, 100, key3.prv, key3.pub, system.work.generate (send9.hash ()));
Expand All @@ -3127,8 +3151,8 @@ TEST (confirmation_height, prioritize_frontiers)
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send17).code);
}
transaction.refresh ();
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1));
ASSERT_TRUE (priority_orders_match ({ key3.pub, nano::genesis_account, key4.pub, key1.pub, key2.pub }));
node->active.prioritize_frontiers_for_confirmation (transaction, std::chrono::seconds (1), std::chrono::seconds (1));
ASSERT_TRUE (priority_orders_match (node->active.priority_wallet_cementable_frontiers, std::array<nano::account, num_accounts>{ key3.pub, nano::genesis_account, key4.pub, key1.pub, key2.pub }));

// Check that the active transactions roots contains the frontiers
{
Expand Down
214 changes: 150 additions & 64 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,39 +58,44 @@ void nano::active_transactions::confirm_frontiers (nano::transaction const & tra
}

// Spend time prioritizing accounts to reduce voting traffic
auto time_to_spend_prioritizing = (frontiers_fully_confirmed ? std::chrono::milliseconds (200) : std::chrono::seconds (2));
prioritize_frontiers_for_confirmation (transaction_a, is_test_network ? std::chrono::milliseconds (50) : time_to_spend_prioritizing);
auto time_spent_prioritizing_ledger_accounts = (frontiers_fully_confirmed ? std::chrono::milliseconds (200) : std::chrono::seconds (2));
auto time_spent_prioritizing_wallet_accounts = std::chrono::milliseconds (50);
prioritize_frontiers_for_confirmation (transaction_a, is_test_network ? std::chrono::milliseconds (50) : time_spent_prioritizing_ledger_accounts, time_spent_prioritizing_wallet_accounts);

size_t elections_count (0);
lk.lock ();
while (!priority_cementable_frontiers.empty () && !stopped && elections_count < max_elections)
{
auto cementable_account_front_it = priority_cementable_frontiers.get<1> ().begin ();
auto cementable_account = *cementable_account_front_it;
priority_cementable_frontiers.get<1> ().erase (cementable_account_front_it);
lk.unlock ();
nano::account_info info;
auto error = node.store.account_get (transaction_a, cementable_account.account, info);
release_assert (!error);
uint64_t confirmation_height;
error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height);
release_assert (!error);

if (info.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info.head))
auto start_elections_for_prioritized_frontiers = [&transaction_a, &elections_count, max_elections, &lk, &representative, this](prioritize_num_uncemented & cementable_frontiers) {
while (!cementable_frontiers.empty () && !this->stopped && elections_count < max_elections)
{
auto block (node.store.block_get (transaction_a, info.head));
if (!start (block))
auto cementable_account_front_it = cementable_frontiers.get<1> ().begin ();
auto cementable_account = *cementable_account_front_it;
cementable_frontiers.get<1> ().erase (cementable_account_front_it);
lk.unlock ();
nano::account_info info;
auto error = node.store.account_get (transaction_a, cementable_account.account, info);
release_assert (!error);
uint64_t confirmation_height;
error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height);
release_assert (!error);

if (info.block_count > confirmation_height && !this->node.pending_confirmation_height.is_processing_block (info.head))
{
++elections_count;
// Calculate votes for local representatives
if (representative)
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block))
{
node.block_processor.generator.add (block->hash ());
++elections_count;
// Calculate votes for local representatives
if (representative)
{
this->node.block_processor.generator.add (block->hash ());
}
}
}
lk.lock ();
}
lk.lock ();
}
};
start_elections_for_prioritized_frontiers (priority_cementable_frontiers);
start_elections_for_prioritized_frontiers (priority_wallet_cementable_frontiers);
frontiers_fully_confirmed = (elections_count < max_elections);
// 4 times slower check if all frontiers were confirmed
auto fully_confirmed_factor = frontiers_fully_confirmed ? 4 : 1;
Expand Down Expand Up @@ -333,77 +338,151 @@ void nano::active_transactions::request_loop ()
}
}

void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds time_a)
void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height)
{
if (info_a.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head))
{
auto num_uncemented = info_a.block_count - confirmation_height;
std::lock_guard<std::mutex> guard (mutex);
auto it = cementable_frontiers_a.find (account_a);
if (it != cementable_frontiers_a.end ())
{
if (it->blocks_uncemented != num_uncemented)
{
// Account already exists and there is now a different uncemented block count so update it in the container
cementable_frontiers_a.modify (it, [num_uncemented](nano::cementable_account & info) {
info.blocks_uncemented = num_uncemented;
});
}
}
else
{
assert (cementable_frontiers_size_a <= max_priority_cementable_frontiers);
if (cementable_frontiers_size_a == max_priority_cementable_frontiers)
{
// The maximum amount of frontiers stored has been reached. Check if the current frontier
// has more uncemented blocks than the lowest uncemented frontier in the collection if so replace it.
auto least_uncemented_frontier_it = cementable_frontiers_a.get<1> ().end ();
--least_uncemented_frontier_it;
if (num_uncemented > least_uncemented_frontier_it->blocks_uncemented)
{
cementable_frontiers_a.get<1> ().erase (least_uncemented_frontier_it);
cementable_frontiers_a.emplace (account_a, num_uncemented);
}
}
else
{
cementable_frontiers_a.emplace (account_a, num_uncemented);
}
}
cementable_frontiers_size_a = cementable_frontiers_a.size ();
}
}

void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
nano::timer<std::chrono::milliseconds> timer;
timer.start ();
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
{
auto i (node.store.latest_begin (transaction_a, next_frontier_account));
auto n (node.store.latest_end ());
std::unique_lock<std::mutex> lk (mutex);
auto priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
auto priority_wallet_cementable_frontiers_size = priority_wallet_cementable_frontiers.size ();
lk.unlock ();
for (; i != n && !stopped; ++i)
{
auto const & account (i->first);
auto const & info (i->second);

uint64_t confirmation_height;
release_assert (!node.store.confirmation_height_get (transaction_a, account, confirmation_height));
nano::timer<std::chrono::milliseconds> wallet_account_timer;
wallet_account_timer.start ();

if (info.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info.head))
if (!skip_wallets)
{
// Prioritize wallet accounts first
{
auto num_uncemented = info.block_count - confirmation_height;
lk.lock ();
auto it = priority_cementable_frontiers.find (account);
if (it != priority_cementable_frontiers.end ())
std::lock_guard<std::mutex> lock (node.wallets.mutex);
auto wallet_transaction (node.wallets.tx_begin_read ());
auto const & items = node.wallets.items;
for (auto item_it = items.cbegin (); item_it != items.cend (); ++item_it)
{
if (it->blocks_uncemented != num_uncemented)
// Skip this wallet if it has been traversed already while there are others still awaiting
if (wallet_accounts_already_iterated.find (item_it->first) != wallet_accounts_already_iterated.end ())
{
// Account already exists and there is now a different uncemented block count so update it in the container
priority_cementable_frontiers.modify (it, [num_uncemented](nano::cementable_account & info) {
info.blocks_uncemented = num_uncemented;
});
continue;
}
}
else
{
assert (priority_cementable_frontiers_size <= max_priority_cementable_frontiers);
if (priority_cementable_frontiers_size == max_priority_cementable_frontiers)

nano::account_info info;
auto & wallet (item_it->second);
std::lock_guard<std::recursive_mutex> wallet_lock (wallet->store.mutex);

auto & next_wallet_frontier_account = next_wallet_frontier_accounts.emplace (item_it->first, wallet_store::special_count).first->second;

auto i (wallet->store.begin (wallet_transaction, next_wallet_frontier_account));
auto n (wallet->store.end ());
uint64_t confirmation_height = 0;
for (; i != n; ++i)
{
// The maximum amount of frontiers stored has been reached. Check if the current frontier
// has more uncemented blocks than the lowest uncemented frontier in the collection if so replace it.
auto least_uncemented_frontier_it = priority_cementable_frontiers.get<1> ().end ();
--least_uncemented_frontier_it;
if (num_uncemented > least_uncemented_frontier_it->blocks_uncemented)
auto & account (i->first);
if (!node.store.account_get (transaction_a, account, info) && !node.store.confirmation_height_get (transaction_a, account, confirmation_height))
{
priority_cementable_frontiers.get<1> ().erase (least_uncemented_frontier_it);
priority_cementable_frontiers.emplace (account, num_uncemented);
// If it exists in normal priority collection delete from there.
auto it = priority_cementable_frontiers.find (account);
if (it != priority_cementable_frontiers.end ())
{
priority_cementable_frontiers.erase (it);
}

prioritize_account_for_confirmation (priority_wallet_cementable_frontiers, priority_wallet_cementable_frontiers_size, account, info, confirmation_height);

if (wallet_account_timer.since_start () >= wallet_account_time_a)
{
break;
}
}
next_wallet_frontier_account = account.number () + 1;
}
else
// Go back to the beginning when we have reached the end of the wallet accounts for this wallet
if (i == n)
{
priority_cementable_frontiers.emplace (account, num_uncemented);
wallet_accounts_already_iterated.emplace (item_it->first);
next_wallet_frontier_accounts.at (item_it->first) = wallet_store::special_count;

// Skip wallet accounts when they have all been traversed
if (std::next (item_it) == items.cend ())
{
wallet_accounts_already_iterated.clear ();
skip_wallets = true;
}
}
}
priority_cementable_frontiers_size = priority_cementable_frontiers.size ();
lk.unlock ();
}
}

next_frontier_account = account.number () + 1;
nano::timer<std::chrono::milliseconds> timer;
timer.start ();

if (timer.since_start () >= time_a)
auto i (node.store.latest_begin (transaction_a, next_frontier_account));
auto n (node.store.latest_end ());
uint64_t confirmation_height = 0;
for (; i != n && !stopped; ++i)
{
auto const & account (i->first);
auto const & info (i->second);
if (priority_wallet_cementable_frontiers.find (account) == priority_wallet_cementable_frontiers.end ())
{
if (!node.store.confirmation_height_get (transaction_a, account, confirmation_height))
{
prioritize_account_for_confirmation (priority_cementable_frontiers, priority_cementable_frontiers_size, account, info, confirmation_height);
}
}
next_frontier_account = account.number () + 1;
if (timer.since_start () >= ledger_accounts_time_a)
{
break;
}
}

// Go back to the beginning when we have reached the end of the accounts
// Go back to the beginning when we have reached the end of the accounts and start with wallet accounts next time
if (i == n)
{
next_frontier_account = 0;
skip_wallets = false;
}
}
}
Expand Down Expand Up @@ -783,6 +862,12 @@ size_t nano::active_transactions::priority_cementable_frontiers_size ()
return priority_cementable_frontiers.size ();
}

size_t nano::active_transactions::priority_wallet_cementable_frontiers_size ()
{
std::lock_guard<std::mutex> guard (mutex);
return priority_wallet_cementable_frontiers.size ();
}

boost::circular_buffer<double> nano::active_transactions::difficulty_trend ()
{
std::lock_guard<std::mutex> guard (mutex);
Expand Down Expand Up @@ -813,6 +898,7 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (active_transaction
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
return composite;
}
Expand Down
Loading

0 comments on commit 7818456

Please sign in to comment.