Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Asynchronous epoch upgrade RPC #2704

Merged
merged 5 commits into from
Apr 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nano/lib/errors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,8 @@ std::string nano::error_rpc_messages::message (int ev) const
return "Invalid previous block for given subtype";
case nano::error_rpc::invalid_timestamp:
return "Invalid timestamp";
case nano::error_rpc::invalid_threads_count:
return "Invalid threads count";
case nano::error_rpc::payment_account_balance:
return "Account has non-zero balance";
case nano::error_rpc::payment_unable_create_account:
Expand Down
1 change: 1 addition & 0 deletions nano/lib/errors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ enum class error_rpc
invalid_subtype_epoch_link,
invalid_subtype_previous,
invalid_timestamp,
invalid_threads_count,
payment_account_balance,
payment_unable_create_account,
peer_not_found,
Expand Down
210 changes: 11 additions & 199 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2070,203 +2070,6 @@ void nano::json_handler::deterministic_key ()
response_errors ();
}

void epoch_upgrader (std::shared_ptr<nano::node> node_a, nano::private_key const & prv_a, nano::epoch epoch_a, uint64_t count_limit)
{
uint64_t const upgrade_batch_size = 1000;
nano::block_builder builder;
auto link (node_a->ledger.epoch_link (epoch_a));
nano::raw_key raw_key;
raw_key.data = prv_a;
auto signer (nano::pub_key (prv_a));
debug_assert (signer == node_a->ledger.epoch_signer (link));

class account_upgrade_item final
{
public:
nano::account account{ 0 };
uint64_t modified{ 0 };
};
class account_tag
{
};
class modified_tag
{
};
// clang-format off
boost::multi_index_container<account_upgrade_item,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::tag<modified_tag>,
boost::multi_index::member<account_upgrade_item, uint64_t, &account_upgrade_item::modified>,
std::greater<uint64_t>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<account_tag>,
boost::multi_index::member<account_upgrade_item, nano::account, &account_upgrade_item::account>>>>
accounts_list;
// clang-format on

bool finished_upgrade (false);

while (!finished_upgrade && !node_a->stopped)
{
bool finished_accounts (false);
uint64_t total_upgraded_accounts (0);
while (!finished_accounts && count_limit != 0 && !node_a->stopped)
{
{
auto transaction (node_a->store.tx_begin_read ());
// Collect accounts to upgrade
for (auto i (node_a->store.latest_begin (transaction)), n (node_a->store.latest_end ()); i != n; ++i)
{
nano::account const & account (i->first);
nano::account_info const & info (i->second);
if (info.epoch () < epoch_a)
{
release_assert (nano::epochs::is_sequential (info.epoch (), epoch_a));
accounts_list.emplace (account_upgrade_item{ account, info.modified });
}
}
}

/* Upgrade accounts
Repeat until accounts with previous epoch exist in latest table */
uint64_t upgraded_accounts (0);
for (auto i (accounts_list.get<modified_tag> ().begin ()), n (accounts_list.get<modified_tag> ().end ()); i != n && upgraded_accounts < upgrade_batch_size && upgraded_accounts < count_limit && !node_a->stopped; ++i)
{
auto transaction (node_a->store.tx_begin_read ());
nano::account_info info;
if (!node_a->store.account_get (transaction, i->account, info) && info.epoch () < epoch_a)
{
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
auto epoch = builder.state ()
.account (i->account)
.previous (info.head)
.representative (info.representative)
.balance (info.balance)
.link (link)
.sign (raw_key, signer)
.work (node_a->work_generate_blocking (nano::work_version::work_1, info.head, difficulty).value_or (0))
.build ();
bool valid_signature (!nano::validate_message (signer, epoch->hash (), epoch->block_signature ()));
bool valid_work (epoch->difficulty () >= difficulty);
nano::process_result result (nano::process_result::old);
if (valid_signature && valid_work)
{
result = node_a->process_local (std::move (epoch)).code;
}
if (result == nano::process_result::progress)
{
++upgraded_accounts;
}
else
{
bool fork (result == nano::process_result::fork);
node_a->logger.always_log (boost::str (boost::format ("Failed to upgrade account %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % i->account.to_account () % valid_signature % valid_work % fork));
}
}
}
total_upgraded_accounts += upgraded_accounts;
count_limit -= upgraded_accounts;

if (!accounts_list.empty ())
{
node_a->logger.always_log (boost::str (boost::format ("%1% accounts were upgraded to new epoch, %2% remain...") % total_upgraded_accounts % (accounts_list.size () - upgraded_accounts)));
accounts_list.clear ();
}
else
{
node_a->logger.always_log (boost::str (boost::format ("%1% total accounts were upgraded to new epoch") % total_upgraded_accounts));
finished_accounts = true;
}
}

// Pending blocks upgrade
bool finished_pending (false);
uint64_t total_upgraded_pending (0);
while (!finished_pending && count_limit != 0 && !node_a->stopped)
{
uint64_t upgraded_pending (0);
auto transaction (node_a->store.tx_begin_read ());
for (auto i (node_a->store.pending_begin (transaction, nano::pending_key (1, 0))), n (node_a->store.pending_end ()); i != n && upgraded_pending < upgrade_batch_size && upgraded_pending < count_limit && !node_a->stopped;)
{
bool to_next_account (false);
nano::pending_key const & key (i->first);
if (!node_a->store.account_exists (transaction, key.account))
{
nano::pending_info const & info (i->second);
if (info.epoch < epoch_a)
{
release_assert (nano::epochs::is_sequential (info.epoch, epoch_a));
auto difficulty (nano::work_threshold (nano::work_version::work_1, nano::block_details (epoch_a, false, false, true)));
auto epoch = builder.state ()
.account (key.account)
.previous (0)
.representative (0)
.balance (0)
.link (link)
.sign (raw_key, signer)
.work (node_a->work_generate_blocking (nano::work_version::work_1, key.account, difficulty).value_or (0))
.build ();
bool valid_signature (!nano::validate_message (signer, epoch->hash (), epoch->block_signature ()));
bool valid_work (epoch->difficulty () >= difficulty);
nano::process_result result (nano::process_result::old);
if (valid_signature && valid_work)
{
result = node_a->process_local (std::move (epoch)).code;
}
if (result == nano::process_result::progress)
{
++upgraded_pending;
to_next_account = true;
}
else
{
bool fork (result == nano::process_result::fork);
node_a->logger.always_log (boost::str (boost::format ("Failed to upgrade account with pending blocks %1%. Valid signature: %2%. Valid work: %3%. Block processor fork: %4%") % key.account.to_account () % valid_signature % valid_work % fork));
}
}
}
else
{
to_next_account = true;
}
if (to_next_account)
{
// Move to next account if pending account exists or was upgraded
if (key.account.number () == std::numeric_limits<nano::uint256_t>::max ())
{
break;
}
else
{
i = node_a->store.pending_begin (transaction, nano::pending_key (key.account.number () + 1, 0));
}
}
else
{
// Move to next pending item
++i;
}
}
total_upgraded_pending += upgraded_pending;
count_limit -= upgraded_pending;

// Repeat if some pending accounts were upgraded
if (upgraded_pending != 0)
{
node_a->logger.always_log (boost::str (boost::format ("%1% unopened accounts with pending blocks were upgraded to new epoch...") % total_upgraded_pending));
}
else
{
node_a->logger.always_log (boost::str (boost::format ("%1% total unopened accounts with pending blocks were upgraded to new epoch") % total_upgraded_pending));
finished_pending = true;
}
}

finished_upgrade = (total_upgraded_accounts == 0) && (total_upgraded_pending == 0);
}

node_a->logger.always_log ("Epoch upgrade is completed");
}

/*
* @warning This is an internal/diagnostic RPC, do not rely on its interface being stable
*/
Expand All @@ -2288,15 +2091,24 @@ void nano::json_handler::epoch_upgrade ()
if (epoch != nano::epoch::invalid)
{
uint64_t count_limit (count_optional_impl ());
uint64_t threads (0);
boost::optional<std::string> threads_text (request.get_optional<std::string> ("threads"));
if (!ec && threads_text.is_initialized ())
{
if (decode_unsigned (threads_text.get (), threads))
{
ec = nano::error_rpc::invalid_threads_count;
}
}
std::string key_text (request.get<std::string> ("key"));
nano::private_key prv;
if (!prv.decode_hex (key_text))
{
if (nano::pub_key (prv) == node.ledger.epoch_signer (node.ledger.epoch_link (epoch)))
{
auto node_l (node.shared ());
node.worker.push_task ([node_l, prv, epoch, count_limit]() {
epoch_upgrader (node_l, prv, epoch, count_limit);
node.worker.push_task ([node_l, prv, epoch, count_limit, threads]() {
node_l->epoch_upgrader (prv, epoch, count_limit, threads);
});
response_l.put ("started", "1");
}
Expand Down
Loading