Skip to content

Commit

Permalink
Epoch upgrader as an async task (#2718)
Browse files Browse the repository at this point in the history
* Epoch upgrade in the background instead of on the worker

Also fixes the RPC test not being multithreaded

* Epoch upgrade as an async task; fix limited count in multithreaded mode; improve tests

* Use node.epoch_upgrader directly in the slow test

* std min fix

* No need for std::bind (Wes review)
  • Loading branch information
guilhermelawless authored Apr 14, 2020
1 parent 1cd5078 commit a4d4675
Show file tree
Hide file tree
Showing 8 changed files with 151 additions and 92 deletions.
5 changes: 5 additions & 0 deletions nano/lib/locks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class locked
return owner->obj;
}

// Dereference operator: forwards to get (), so the guarded object can be
// reached with `*` and not only through an explicit get () call.
T & operator* () const
{
return get ();
}

locked * owner{ nullptr };
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::state_block_signature_verification:
thread_role_name_string = "State block sig";
break;
case nano::thread_role::name::epoch_upgrader:
thread_role_name_string = "Epoch upgrader";
break;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace thread_role
confirmation_height_processing,
worker,
request_aggregator,
state_block_signature_verification
state_block_signature_verification,
epoch_upgrader
};
/*
* Get/Set the identifier for the current thread
Expand Down
13 changes: 8 additions & 5 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2141,11 +2141,14 @@ void nano::json_handler::epoch_upgrade ()
{
if (nano::pub_key (prv) == node.ledger.epoch_signer (node.ledger.epoch_link (epoch)))
{
auto node_l (node.shared ());
node.worker.push_task ([node_l, prv, epoch, count_limit, threads]() {
node_l->epoch_upgrader (prv, epoch, count_limit, threads);
});
response_l.put ("started", "1");
if (!node.epoch_upgrader (prv, epoch, count_limit, threads))
{
response_l.put ("started", "1");
}
else
{
response_l.put ("started", "0");
}
}
else
{
Expand Down
33 changes: 29 additions & 4 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,11 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
worker.stop ();
auto epoch_upgrade = epoch_upgrading.lock ();
if (epoch_upgrade->valid ())
{
epoch_upgrade->wait ();
}
// work pool is not stopped on purpose due to testing setup
}
}
Expand Down Expand Up @@ -1346,8 +1351,24 @@ bool nano::node::init_error () const
return store.init_error () || wallets_store.init_error ();
}

void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
/**
 * Launches the epoch upgrade as an asynchronous task running epoch_upgrader_impl.
 * @return true (error) when the node is stopped or a previously launched upgrade
 *         has not finished yet; false when a new upgrade task was started.
 */
bool nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
	// Refuse to start anything once the node has been stopped
	if (stopped.load ())
	{
		return true;
	}
	auto upgrade_l = epoch_upgrading.lock ();
	// A valid future whose zero-length wait times out belongs to a task that is still running
	bool const still_running = upgrade_l->valid () && upgrade_l->wait_for (std::chrono::seconds (0)) == std::future_status::timeout;
	if (!still_running)
	{
		*upgrade_l = std::async (std::launch::async, &nano::node::epoch_upgrader_impl, this, prv_a, epoch_a, count_limit, threads);
	}
	return still_running;
}

void nano::node::epoch_upgrader_impl (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
nano::thread_role::set (nano::thread_role::name::epoch_upgrader);
auto upgrader_process = [](nano::node & node_a, std::atomic<uint64_t> & counter, std::shared_ptr<nano::block> epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) {
epoch->block_work_set (node_a.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0));
bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ()));
Expand Down Expand Up @@ -1413,7 +1434,7 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
{
auto transaction (store.tx_begin_read ());
// Collect accounts to upgrade
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n; ++i)
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n && accounts_list.size () < count_limit; ++i)
{
nano::account const & account (i->first);
nano::account_info const & info (i->second);
Expand All @@ -1429,13 +1450,15 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
Repeat until accounts with previous epoch exist in latest table */
std::atomic<uint64_t> upgraded_accounts (0);
uint64_t workers (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && upgraded_accounts < upgrade_batch_size && upgraded_accounts < count_limit && !stopped; ++i)
uint64_t attempts (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i)
{
auto transaction (store.tx_begin_read ());
nano::account_info info;
nano::account const & account (i->account);
if (!store.account_get (transaction, account, info) && info.epoch () < epoch_a)
{
++attempts;
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (info.head);
std::shared_ptr<nano::block> epoch = builder.state ()
Expand Down Expand Up @@ -1501,8 +1524,9 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
{
std::atomic<uint64_t> upgraded_pending (0);
uint64_t workers (0);
uint64_t attempts (0);
auto transaction (store.tx_begin_read ());
for (auto i (store.pending_begin (transaction, nano::pending_key (1, 0))), n (store.pending_end ()); i != n && upgraded_pending < upgrade_batch_size && upgraded_pending < count_limit && !stopped;)
for (auto i (store.pending_begin (transaction, nano::pending_key (1, 0))), n (store.pending_end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;)
{
bool to_next_account (false);
nano::pending_key const & key (i->first);
Expand All @@ -1511,6 +1535,7 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
nano::pending_info const & info (i->second);
if (info.epoch < epoch_a)
{
++attempts;
release_assert (nano::epochs::is_sequential (info.epoch, epoch_a));
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (key.account);
Expand Down
4 changes: 3 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class node final : public std::enable_shared_from_this<nano::node>
void ongoing_online_weight_calculation_queue ();
bool online () const;
bool init_error () const;
void epoch_upgrader (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
bool epoch_upgrader (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
nano::worker worker;
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
Expand Down Expand Up @@ -206,6 +206,8 @@ class node final : public std::enable_shared_from_this<nano::node>

private:
void long_inactivity_cleanup ();
void epoch_upgrader_impl (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
nano::locked<std::future<void>> epoch_upgrading;
};

std::unique_ptr<container_info_component> collect_container_info (node & node, const std::string & name);
Expand Down
9 changes: 9 additions & 0 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7977,6 +7977,14 @@ TEST (rpc, epoch_upgrade)
}
ASSERT_EQ (200, response.status);
ASSERT_EQ ("1", response.json.get<std::string> ("started"));
test_response response_fail (request, rpc.config.port, system.io_ctx);
system.deadline_set (5s);
while (response_fail.status == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (200, response_fail.status);
ASSERT_EQ ("0", response_fail.json.get<std::string> ("started"));
system.deadline_set (5s);
bool done (false);
while (!done)
Expand Down Expand Up @@ -8089,6 +8097,7 @@ TEST (rpc, epoch_upgrade_multithreaded)
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "epoch_upgrade");
request.put ("threads", 2);
request.put ("epoch", 1);
request.put ("key", epoch_signer.prv.data.to_string ());
test_response response (request, rpc.config.port, system.io_ctx);
Expand Down
173 changes: 92 additions & 81 deletions nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,106 +1182,117 @@ TEST (signature_checker, mass_boundary_checks)
// Possible to manually add work peers
TEST (node, mass_epoch_upgrader)
{
unsigned threads = 20;
size_t total_accounts = 2500;
auto perform_test = [](size_t const batch_size) {
unsigned threads = 5;
size_t total_accounts = 2500;

#ifndef NDEBUG
total_accounts /= 5;
total_accounts /= 5;
#endif

struct info
{
nano::keypair key;
nano::block_hash pending_hash;
};
struct info
{
nano::keypair key;
nano::block_hash pending_hash;
};

std::vector<info> opened (total_accounts / 2);
std::vector<info> unopened (total_accounts / 2);
std::vector<info> opened (total_accounts / 2);
std::vector<info> unopened (total_accounts / 2);

nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.work_threads = 4;
//node_config.work_peers = { { "192.168.1.101", 7000 } };
auto & node = *system.add_node (node_config);
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.work_threads = 4;
//node_config.work_peers = { { "192.168.1.101", 7000 } };
auto & node = *system.add_node (node_config);

auto balance = node.balance (nano::test_genesis_key.pub);
auto latest = node.latest (nano::test_genesis_key.pub);
nano::uint128_t amount = 1;
auto balance = node.balance (nano::test_genesis_key.pub);
auto latest = node.latest (nano::test_genesis_key.pub);
nano::uint128_t amount = 1;

// Send to all accounts
std::array<std::vector<info> *, 2> all{ &opened, &unopened };
for (auto & accounts : all)
{
for (auto & info : *accounts)
// Send to all accounts
std::array<std::vector<info> *, 2> all{ &opened, &unopened };
for (auto & accounts : all)
{
for (auto & info : *accounts)
{
balance -= amount;
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (nano::test_genesis_key.pub)
.previous (latest)
.balance (balance)
.link (info.key.pub)
.representative (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node.work_generate_blocking (latest, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
latest = block->hash ();
info.pending_hash = block->hash ();
}
}
ASSERT_EQ (1 + total_accounts, node.ledger.cache.block_count);
ASSERT_EQ (1, node.ledger.cache.account_count);

// Receive for half of accounts
for (auto const & info : opened)
{
balance -= amount;
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (nano::test_genesis_key.pub)
.previous (latest)
.balance (balance)
.link (info.key.pub)
.representative (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node.work_generate_blocking (latest, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.account (info.key.pub)
.previous (0)
.balance (amount)
.link (info.pending_hash)
.representative (info.key.pub)
.sign (info.key.prv, info.key.pub)
.work (*node.work_generate_blocking (info.key.pub, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
latest = block->hash ();
info.pending_hash = block->hash ();
}
}
ASSERT_EQ (1 + total_accounts, node.ledger.cache.block_count);
ASSERT_EQ (1, node.ledger.cache.account_count);
ASSERT_EQ (1 + total_accounts + opened.size (), node.ledger.cache.block_count);
ASSERT_EQ (1 + opened.size (), node.ledger.cache.account_count);

// Receive for half of accounts
for (auto const & info : opened)
{
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (info.key.pub)
.previous (0)
.balance (amount)
.link (info.pending_hash)
.representative (info.key.pub)
.sign (info.key.prv, info.key.pub)
.work (*node.work_generate_blocking (info.key.pub, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
}
ASSERT_EQ (1 + total_accounts + opened.size (), node.ledger.cache.block_count);
ASSERT_EQ (1 + opened.size (), node.ledger.cache.account_count);

nano::keypair epoch_signer (nano::test_genesis_key);
nano::keypair epoch_signer (nano::test_genesis_key);

auto block_count_before = node.ledger.cache.block_count.load ();
std::cout << "Mass upgrading " << 1 + total_accounts << " accounts" << std::endl;
auto future = std::async (
std::launch::async, [node_l = node.shared (), signer = epoch_signer.prv.as_private_key (), epoch = nano::epoch::epoch_1, total = 1 + total_accounts, threads] {
node_l->epoch_upgrader (signer, epoch, total, threads);
});
auto expected_blocks = block_count_before + total_accounts + 1;
system.deadline_set (300s);
while (node.ledger.cache.block_count != expected_blocks)
{
ASSERT_NO_ERROR (system.poll ());
std::this_thread::sleep_for (1s);
std::cout << node.ledger.cache.block_count - block_count_before << " / " << expected_blocks - block_count_before << std::endl;
}
ASSERT_EQ (expected_blocks, node.ledger.cache.block_count);
// Check upgrade
{
auto transaction (node.store.tx_begin_read ());
ASSERT_EQ (expected_blocks, node.store.block_count (transaction).sum ());
for (auto i (node.store.latest_begin (transaction)); i != node.store.latest_end (); ++i)
auto const block_count_before = node.ledger.cache.block_count.load ();
auto const total_to_upgrade = 1 + total_accounts;
std::cout << "Mass upgrading " << total_to_upgrade << " accounts" << std::endl;
while (node.ledger.cache.block_count != block_count_before + total_to_upgrade)
{
nano::account_info info (i->second);
ASSERT_EQ (info.epoch (), nano::epoch::epoch_1);
auto const pre_upgrade = node.ledger.cache.block_count.load ();
auto upgrade_count = std::min<size_t> (batch_size, block_count_before + total_to_upgrade - pre_upgrade);
ASSERT_FALSE (node.epoch_upgrader (epoch_signer.prv.as_private_key (), nano::epoch::epoch_1, upgrade_count, threads));
// Already ongoing - should fail
ASSERT_TRUE (node.epoch_upgrader (epoch_signer.prv.as_private_key (), nano::epoch::epoch_1, upgrade_count, threads));
system.deadline_set (60s);
while (node.ledger.cache.block_count != pre_upgrade + upgrade_count)
{
ASSERT_NO_ERROR (system.poll ());
std::this_thread::sleep_for (200ms);
std::cout << node.ledger.cache.block_count - block_count_before << " / " << total_to_upgrade << std::endl;
}
std::this_thread::sleep_for (50ms);
}
}
auto expected_blocks = block_count_before + total_accounts + 1;
ASSERT_EQ (expected_blocks, node.ledger.cache.block_count);
// Check upgrade
{
auto transaction (node.store.tx_begin_read ());
ASSERT_EQ (expected_blocks, node.store.block_count (transaction).sum ());
for (auto i (node.store.latest_begin (transaction)); i != node.store.latest_end (); ++i)
{
nano::account_info info (i->second);
ASSERT_EQ (info.epoch (), nano::epoch::epoch_1);
}
}
};
// Test with a limited number of upgrades and with an unlimited number
perform_test (42);
perform_test (std::numeric_limits<size_t>::max ());
}

0 comments on commit a4d4675

Please sign in to comment.