Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce active mutex locking with election winner details #2600

Merged
merged 3 commits into from
Feb 26, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ TEST (confirmation_height, single)
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (1, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));

ASSERT_EQ (0, node->active.election_winner_details_size ());
}
};

Expand Down Expand Up @@ -213,6 +215,8 @@ TEST (confirmation_height, multiple_accounts)
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));

ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -288,6 +292,8 @@ TEST (confirmation_height, gap_bootstrap)
ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));

ASSERT_EQ (0, node1.active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -374,6 +380,8 @@ TEST (confirmation_height, gap_live)
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));

ASSERT_EQ (0, node->active.election_winner_details_size ());
}
};

Expand Down Expand Up @@ -460,6 +468,8 @@ TEST (confirmation_height, send_receive_between_2_accounts)
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (10, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (11, node->ledger.cache.cemented_count);

ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -524,6 +534,7 @@ TEST (confirmation_height, send_receive_self)
ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (6, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (confirmation_height_info.height, node->ledger.cache.cemented_count);
ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -635,6 +646,8 @@ TEST (confirmation_height, all_block_types)
ASSERT_EQ (15, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (15, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (16, node->ledger.cache.cemented_count);

ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -743,6 +756,7 @@ TEST (confirmation_height, observers)
ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (0, node1->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -792,6 +806,7 @@ TEST (confirmation_height, modified_chain)
}

ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in));
ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -836,6 +851,7 @@ TEST (confirmation_height, pending_observer_callbacks)

ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (0, node->active.election_winner_details_size ());
};

test_mode (nano::confirmation_height_mode::bounded);
Expand Down Expand Up @@ -1181,6 +1197,7 @@ TEST (confirmation_height, dependent_election)

{
// The write guard prevents the confirmation height processor doing any writes.
// Note: This test could still fail intermittently due to thread scheduling between active and confirmation height.
system.deadline_set (10s);
auto write_guard = node->write_database_queue.wait (nano::writer::testing);
while (!node->write_database_queue.contains (nano::writer::confirmation_height))
Expand Down Expand Up @@ -1283,6 +1300,7 @@ TEST (confirmation_height, cemented_gap_below_receive)
ASSERT_EQ (9, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
ASSERT_EQ (0, node->active.election_winner_details_size ());

// Check that the order of callbacks is correct
std::vector<nano::block_hash> expected_order = { send.hash (), open.hash (), send1.hash (), receive1.hash (), send2.hash (), dummy_send.hash (), receive2.hash (), dummy_send1.hash (), send3.hash (), open1->hash () };
Expand Down Expand Up @@ -1368,6 +1386,7 @@ TEST (confirmation_height, cemented_gap_below_no_cache)

auto transaction = node->store.tx_begin_read ();
ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ()));
ASSERT_EQ (node->active.election_winner_details_size (), 0);
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out));
ASSERT_EQ (5, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
Expand Down Expand Up @@ -1395,13 +1414,13 @@ TEST (confirmation_height, election_winner_details_clearing)
nano::keypair key1;
auto send = std::make_shared<nano::send_block> (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest));
auto send1 = std::make_shared<nano::send_block> (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send->hash ()));
nano::send_block send2 (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()));
auto send2 = std::make_shared<nano::send_block> (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ()));

{
auto transaction = node->store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send1).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code);
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send2).code);
}

add_callback_stats (*node);
Expand Down Expand Up @@ -1442,8 +1461,16 @@ TEST (confirmation_height, election_winner_details_clearing)

ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));

// election_winner_details should get cleared during another batch of cementing, so add another block
node->confirmation_height_processor.add (send2.hash ());
node->block_confirm (send2);
system.deadline_set (10s);
while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 3)
{
ASSERT_NO_ERROR (system.poll ());
}

// Add an already cemented block with fake election details. It should get removed
node->active.add_election_winner_details (send2->hash (), nullptr);
node->confirmation_height_processor.add (send2->hash ());

system.deadline_set (10s);
while (node->active.election_winner_details_size () > 0)
Expand All @@ -1452,8 +1479,8 @@ TEST (confirmation_height, election_winner_details_clearing)
}

ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out));
ASSERT_EQ (2, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (3, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out));
ASSERT_EQ (2, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out));
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in));
ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, get_stats_detail (mode_a), nano::stat::dir::in));
};
Expand Down
46 changes: 22 additions & 24 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ thread ([this]() {
})
{
// Register a callback which will get called after a block is cemented
confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data) {
this->block_cemented_callback (callback_data.block, callback_data.sideband);
confirmation_height_processor.add_cemented_observer ([this](nano::block_w_sideband const & callback_data_a) {
this->block_cemented_callback (callback_data_a.block, callback_data_a.sideband);
});

// Register a callback which will get called after a batch of blocks is written and observer calls finished
confirmation_height_processor.add_cemented_process_finished_observer ([this]() {
this->cemented_batch_finished_callback ();
confirmation_height_processor.add_block_already_cemented_observer ([this](nano::block_hash const & hash_a) {
this->block_already_cemented_callback (hash_a);
});

debug_assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
Expand Down Expand Up @@ -153,15 +153,20 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
nano::unique_lock<std::mutex> election_winners_lk (election_winner_details_mutex);
auto existing (election_winner_details.find (hash));
if (existing != election_winner_details.end ())
{
auto election = existing->second;
election_winner_details.erase (hash);
election_winners_lk.unlock ();
// Make sure mutex is held before election usage so we know that confirm_once has
// finished removing the root from active to avoid any data race.
nano::unique_lock<std::mutex> lk (mutex);
if (election->confirmed () && !election->stopped && election->status.winner->hash () == hash)
{
add_confirmed (existing->second->status, block_a->qualified_root ());

add_confirmed (election->status, block_a->qualified_root ());
lk.unlock ();
node.receive_confirmed (transaction, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
Expand All @@ -180,32 +185,25 @@ void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::b
}
}
}

election_winner_details.erase (hash);
}
}
}
}

void nano::active_transactions::cemented_batch_finished_callback ()
void nano::active_transactions::add_election_winner_details (nano::block_hash const & hash_a, std::shared_ptr<nano::election> const & election_a)
{
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
election_winner_details.emplace (hash_a, election_a);
}

void nano::active_transactions::block_already_cemented_callback (nano::block_hash const & hash_a)
{
// Depending on timing there is a situation where the election_winner_details is not reset.
// This can happen when a block wins an election, and the block is confirmed + observer
// called before the block hash gets added to election_winner_details. If the block is confirmed
// callbacks have already been done, so we can safely just remove it.
auto transaction = node.store.tx_begin_read ();
nano::lock_guard<std::mutex> guard (mutex);
for (auto it = election_winner_details.begin (); it != election_winner_details.end ();)
{
if (node.ledger.block_confirmed (transaction, it->first))
{
it = election_winner_details.erase (it);
}
else
{
++it;
}
}
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
election_winner_details.erase (hash_a);
}

void nano::active_transactions::election_escalate (std::shared_ptr<nano::election> & election_l, nano::transaction const & transaction_l, size_t const & roots_size_l)
Expand Down Expand Up @@ -1074,7 +1072,7 @@ bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector<nano

size_t nano::active_transactions::election_winner_details_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
nano::lock_guard<std::mutex> guard (election_winner_details_mutex);
return election_winner_details.size ();
}

Expand Down
8 changes: 6 additions & 2 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class active_transactions final
bool publish (std::shared_ptr<nano::block> block_a);
boost::optional<nano::election_status_type> confirm_block (nano::transaction const &, std::shared_ptr<nano::block>);
void block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
void cemented_batch_finished_callback ();
void block_already_cemented_callback (nano::block_hash const &);
// clang-format off
boost::multi_index_container<nano::conflict_info,
mi::indexed_by<
Expand All @@ -137,11 +137,14 @@ class active_transactions final
size_t priority_wallet_cementable_frontiers_size ();
boost::circular_buffer<double> difficulty_trend ();
size_t inactive_votes_cache_size ();
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
size_t election_winner_details_size ();
void add_election_winner_details (nano::block_hash const &, std::shared_ptr<nano::election> const &);
nano::confirmation_solicitor solicitor;

private:
std::mutex election_winner_details_mutex;
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;

// Call action with confirmed block, may be different than what we started with
// clang-format off
std::pair<std::shared_ptr<nano::election>, bool> insert_impl (std::shared_ptr<nano::block>, bool const = false, std::function<void(std::shared_ptr<nano::block>)> const & = [](std::shared_ptr<nano::block>) {});
Expand Down Expand Up @@ -208,6 +211,7 @@ class active_transactions final

friend class confirmation_height_prioritize_frontiers_Test;
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);
};

std::unique_ptr<container_info_component> collect_container_info (active_transactions & active_transactions, const std::string & name);
Expand Down
11 changes: 10 additions & 1 deletion nano/node/confirmation_height_bounded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

#include <numeric>

nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<nano::block_w_sideband> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
ledger (ledger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
logger (logger_a),
stopped (stopped_a),
original_hash (original_hash_a),
notify_observers_callback (notify_observers_callback_a),
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
{
}
Expand Down Expand Up @@ -57,6 +58,7 @@ void nano::confirmation_height_bounded::process ()
boost::circular_buffer_space_optimized<nano::block_hash> checkpoints{ max_items };
boost::circular_buffer_space_optimized<receive_source_pair> receive_source_pairs{ max_items };
nano::block_hash current;
bool first_iter = true;
auto transaction (ledger.store.tx_begin_read ());
do
{
Expand Down Expand Up @@ -85,7 +87,13 @@ void nano::confirmation_height_bounded::process ()
else
{
auto error = ledger.store.confirmation_height_get (transaction, account, confirmation_height_info);
(void)error;
debug_assert (!error);
// This block was added to the confirmation height processor but is already confirmed
if (first_iter && confirmation_height_info.height >= sideband.height && current == original_hash)
{
notify_block_already_cemented_observers_callback (original_hash);
}
}

auto block_height = sideband.height;
Expand Down Expand Up @@ -181,6 +189,7 @@ void nano::confirmation_height_bounded::process ()
}
}

first_iter = false;
transaction.refresh ();
} while ((!receive_source_pairs.empty () || current != original_hash) && !stopped);

Expand Down
Loading