Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Epoch upgrader as an async task #2718

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions nano/lib/locks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,11 @@ class locked
return owner->obj;
}

T & operator* () const
{
return get ();
}

locked * owner{ nullptr };
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::state_block_signature_verification:
thread_role_name_string = "State block sig";
break;
case nano::thread_role::name::epoch_upgrader:
thread_role_name_string = "Epoch upgrader";
break;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ namespace thread_role
confirmation_height_processing,
worker,
request_aggregator,
state_block_signature_verification
state_block_signature_verification,
epoch_upgrader
};
/*
* Get/Set the identifier for the current thread
Expand Down
13 changes: 8 additions & 5 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2141,11 +2141,14 @@ void nano::json_handler::epoch_upgrade ()
{
if (nano::pub_key (prv) == node.ledger.epoch_signer (node.ledger.epoch_link (epoch)))
{
auto node_l (node.shared ());
node.worker.push_task ([node_l, prv, epoch, count_limit, threads]() {
node_l->epoch_upgrader (prv, epoch, count_limit, threads);
});
response_l.put ("started", "1");
if (!node.epoch_upgrader (prv, epoch, count_limit, threads))
{
response_l.put ("started", "1");
}
else
{
response_l.put ("started", "0");
}
}
else
{
Expand Down
33 changes: 29 additions & 4 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,11 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
worker.stop ();
auto epoch_upgrade = epoch_upgrading.lock ();
if (epoch_upgrade->valid ())
{
epoch_upgrade->wait ();
}
// work pool is not stopped on purpose due to testing setup
}
}
Expand Down Expand Up @@ -1346,8 +1351,24 @@ bool nano::node::init_error () const
return store.init_error () || wallets_store.init_error ();
}

void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
bool nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
bool error = stopped.load ();
if (!error)
{
auto epoch_upgrade = epoch_upgrading.lock ();
error = epoch_upgrade->valid () && epoch_upgrade->wait_for (std::chrono::seconds (0)) == std::future_status::timeout;
if (!error)
{
*epoch_upgrade = std::async (std::launch::async, &nano::node::epoch_upgrader_impl, this, prv_a, epoch_a, count_limit, threads);
}
}
return error;
}

void nano::node::epoch_upgrader_impl (nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit, uint64_t threads)
{
nano::thread_role::set (nano::thread_role::name::epoch_upgrader);
auto upgrader_process = [](nano::node & node_a, std::atomic<uint64_t> & counter, std::shared_ptr<nano::block> epoch, uint64_t difficulty, nano::public_key const & signer_a, nano::root const & root_a, nano::account const & account_a) {
epoch->block_work_set (node_a.work_generate_blocking (nano::work_version::work_1, root_a, difficulty).value_or (0));
bool valid_signature (!nano::validate_message (signer_a, epoch->hash (), epoch->block_signature ()));
Expand Down Expand Up @@ -1413,7 +1434,7 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
{
auto transaction (store.tx_begin_read ());
// Collect accounts to upgrade
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n; ++i)
for (auto i (store.latest_begin (transaction)), n (store.latest_end ()); i != n && accounts_list.size () < count_limit; ++i)
{
nano::account const & account (i->first);
nano::account_info const & info (i->second);
Expand All @@ -1429,13 +1450,15 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
Repeat until accounts with previous epoch exist in latest table */
std::atomic<uint64_t> upgraded_accounts (0);
uint64_t workers (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && upgraded_accounts < upgrade_batch_size && upgraded_accounts < count_limit && !stopped; ++i)
uint64_t attempts (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped; ++i)
{
auto transaction (store.tx_begin_read ());
nano::account_info info;
nano::account const & account (i->account);
if (!store.account_get (transaction, account, info) && info.epoch () < epoch_a)
{
++attempts;
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (info.head);
std::shared_ptr<nano::block> epoch = builder.state ()
Expand Down Expand Up @@ -1501,8 +1524,9 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
{
std::atomic<uint64_t> upgraded_pending (0);
uint64_t workers (0);
uint64_t attempts (0);
auto transaction (store.tx_begin_read ());
for (auto i (store.pending_begin (transaction, nano::pending_key (1, 0))), n (store.pending_end ()); i != n && upgraded_pending < upgrade_batch_size && upgraded_pending < count_limit && !stopped;)
for (auto i (store.pending_begin (transaction, nano::pending_key (1, 0))), n (store.pending_end ()); i != n && attempts < upgrade_batch_size && attempts < count_limit && !stopped;)
{
bool to_next_account (false);
nano::pending_key const & key (i->first);
Expand All @@ -1511,6 +1535,7 @@ void nano::node::epoch_upgrader (nano::private_key const & prv_a, nano::epoch ep
nano::pending_info const & info (i->second);
if (info.epoch < epoch_a)
{
++attempts;
release_assert (nano::epochs::is_sequential (info.epoch, epoch_a));
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
nano::root const & root (key.account);
Expand Down
4 changes: 3 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class node final : public std::enable_shared_from_this<nano::node>
void ongoing_online_weight_calculation_queue ();
bool online () const;
bool init_error () const;
void epoch_upgrader (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
bool epoch_upgrader (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
nano::worker worker;
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
Expand Down Expand Up @@ -206,6 +206,8 @@ class node final : public std::enable_shared_from_this<nano::node>

private:
void long_inactivity_cleanup ();
void epoch_upgrader_impl (nano::private_key const &, nano::epoch, uint64_t, uint64_t);
nano::locked<std::future<void>> epoch_upgrading;
};

std::unique_ptr<container_info_component> collect_container_info (node & node, const std::string & name);
Expand Down
9 changes: 9 additions & 0 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7977,6 +7977,14 @@ TEST (rpc, epoch_upgrade)
}
ASSERT_EQ (200, response.status);
ASSERT_EQ ("1", response.json.get<std::string> ("started"));
test_response response_fail (request, rpc.config.port, system.io_ctx);
system.deadline_set (5s);
while (response_fail.status == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (200, response_fail.status);
ASSERT_EQ ("0", response_fail.json.get<std::string> ("started"));
system.deadline_set (5s);
bool done (false);
while (!done)
Expand Down Expand Up @@ -8089,6 +8097,7 @@ TEST (rpc, epoch_upgrade_multithreaded)
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "epoch_upgrade");
request.put ("threads", 2);
request.put ("epoch", 1);
request.put ("key", epoch_signer.prv.data.to_string ());
test_response response (request, rpc.config.port, system.io_ctx);
Expand Down
173 changes: 92 additions & 81 deletions nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1182,106 +1182,117 @@ TEST (signature_checker, mass_boundary_checks)
// Possible to manually add work peers
TEST (node, mass_epoch_upgrader)
{
unsigned threads = 20;
size_t total_accounts = 2500;
auto perform_test = [](size_t const batch_size) {
unsigned threads = 5;
size_t total_accounts = 2500;

#ifndef NDEBUG
total_accounts /= 5;
total_accounts /= 5;
#endif

struct info
{
nano::keypair key;
nano::block_hash pending_hash;
};
struct info
{
nano::keypair key;
nano::block_hash pending_hash;
};

std::vector<info> opened (total_accounts / 2);
std::vector<info> unopened (total_accounts / 2);
std::vector<info> opened (total_accounts / 2);
std::vector<info> unopened (total_accounts / 2);

nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.work_threads = 4;
//node_config.work_peers = { { "192.168.1.101", 7000 } };
auto & node = *system.add_node (node_config);
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.work_threads = 4;
//node_config.work_peers = { { "192.168.1.101", 7000 } };
auto & node = *system.add_node (node_config);

auto balance = node.balance (nano::test_genesis_key.pub);
auto latest = node.latest (nano::test_genesis_key.pub);
nano::uint128_t amount = 1;
auto balance = node.balance (nano::test_genesis_key.pub);
auto latest = node.latest (nano::test_genesis_key.pub);
nano::uint128_t amount = 1;

// Send to all accounts
std::array<std::vector<info> *, 2> all{ &opened, &unopened };
for (auto & accounts : all)
{
for (auto & info : *accounts)
// Send to all accounts
std::array<std::vector<info> *, 2> all{ &opened, &unopened };
for (auto & accounts : all)
{
for (auto & info : *accounts)
{
balance -= amount;
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (nano::test_genesis_key.pub)
.previous (latest)
.balance (balance)
.link (info.key.pub)
.representative (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node.work_generate_blocking (latest, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
latest = block->hash ();
info.pending_hash = block->hash ();
}
}
ASSERT_EQ (1 + total_accounts, node.ledger.cache.block_count);
ASSERT_EQ (1, node.ledger.cache.account_count);

// Receive for half of accounts
for (auto const & info : opened)
{
balance -= amount;
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (nano::test_genesis_key.pub)
.previous (latest)
.balance (balance)
.link (info.key.pub)
.representative (nano::test_genesis_key.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*node.work_generate_blocking (latest, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.account (info.key.pub)
.previous (0)
.balance (amount)
.link (info.pending_hash)
.representative (info.key.pub)
.sign (info.key.prv, info.key.pub)
.work (*node.work_generate_blocking (info.key.pub, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
latest = block->hash ();
info.pending_hash = block->hash ();
}
}
ASSERT_EQ (1 + total_accounts, node.ledger.cache.block_count);
ASSERT_EQ (1, node.ledger.cache.account_count);
ASSERT_EQ (1 + total_accounts + opened.size (), node.ledger.cache.block_count);
ASSERT_EQ (1 + opened.size (), node.ledger.cache.account_count);

// Receive for half of accounts
for (auto const & info : opened)
{
nano::state_block_builder builder;
std::error_code ec;
auto block = builder
.account (info.key.pub)
.previous (0)
.balance (amount)
.link (info.pending_hash)
.representative (info.key.pub)
.sign (info.key.prv, info.key.pub)
.work (*node.work_generate_blocking (info.key.pub, nano::work_threshold (nano::work_version::work_1, nano::block_details (nano::epoch::epoch_0, false, false, false))))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_NE (nullptr, block);
ASSERT_EQ (nano::process_result::progress, node.process (*block).code);
}
ASSERT_EQ (1 + total_accounts + opened.size (), node.ledger.cache.block_count);
ASSERT_EQ (1 + opened.size (), node.ledger.cache.account_count);

nano::keypair epoch_signer (nano::test_genesis_key);
nano::keypair epoch_signer (nano::test_genesis_key);

auto block_count_before = node.ledger.cache.block_count.load ();
std::cout << "Mass upgrading " << 1 + total_accounts << " accounts" << std::endl;
auto future = std::async (
std::launch::async, [node_l = node.shared (), signer = epoch_signer.prv.as_private_key (), epoch = nano::epoch::epoch_1, total = 1 + total_accounts, threads] {
node_l->epoch_upgrader (signer, epoch, total, threads);
});
auto expected_blocks = block_count_before + total_accounts + 1;
system.deadline_set (300s);
while (node.ledger.cache.block_count != expected_blocks)
{
ASSERT_NO_ERROR (system.poll ());
std::this_thread::sleep_for (1s);
std::cout << node.ledger.cache.block_count - block_count_before << " / " << expected_blocks - block_count_before << std::endl;
}
ASSERT_EQ (expected_blocks, node.ledger.cache.block_count);
// Check upgrade
{
auto transaction (node.store.tx_begin_read ());
ASSERT_EQ (expected_blocks, node.store.block_count (transaction).sum ());
for (auto i (node.store.latest_begin (transaction)); i != node.store.latest_end (); ++i)
auto const block_count_before = node.ledger.cache.block_count.load ();
auto const total_to_upgrade = 1 + total_accounts;
std::cout << "Mass upgrading " << total_to_upgrade << " accounts" << std::endl;
while (node.ledger.cache.block_count != block_count_before + total_to_upgrade)
{
nano::account_info info (i->second);
ASSERT_EQ (info.epoch (), nano::epoch::epoch_1);
auto const pre_upgrade = node.ledger.cache.block_count.load ();
auto upgrade_count = std::min<size_t> (batch_size, block_count_before + total_to_upgrade - pre_upgrade);
ASSERT_FALSE (node.epoch_upgrader (epoch_signer.prv.as_private_key (), nano::epoch::epoch_1, upgrade_count, threads));
// Already ongoing - should fail
ASSERT_TRUE (node.epoch_upgrader (epoch_signer.prv.as_private_key (), nano::epoch::epoch_1, upgrade_count, threads));
system.deadline_set (60s);
while (node.ledger.cache.block_count != pre_upgrade + upgrade_count)
{
ASSERT_NO_ERROR (system.poll ());
std::this_thread::sleep_for (200ms);
std::cout << node.ledger.cache.block_count - block_count_before << " / " << total_to_upgrade << std::endl;
}
std::this_thread::sleep_for (50ms);
}
}
auto expected_blocks = block_count_before + total_accounts + 1;
ASSERT_EQ (expected_blocks, node.ledger.cache.block_count);
// Check upgrade
{
auto transaction (node.store.tx_begin_read ());
ASSERT_EQ (expected_blocks, node.store.block_count (transaction).sum ());
for (auto i (node.store.latest_begin (transaction)); i != node.store.latest_end (); ++i)
{
nano::account_info info (i->second);
ASSERT_EQ (info.epoch (), nano::epoch::epoch_1);
}
}
};
// Test with a limited number of upgrades and an unlimited
perform_test (42);
perform_test (std::numeric_limits<size_t>::max ());
}