Skip to content

Commit

Permalink
Websocket bootstrap subscription (nanocurrency#2471)
Browse files Browse the repository at this point in the history
* Bootstrap attempt ID
* Websocket bootstrap subscription
  • Loading branch information
SergiySW authored and wezrule committed Jan 22, 2020
1 parent 6802674 commit 90bfcce
Show file tree
Hide file tree
Showing 7 changed files with 288 additions and 34 deletions.
48 changes: 47 additions & 1 deletion nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,48 @@ TEST (bootstrap_processor, lazy_hash)
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (system.nodes[0]->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash ());
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (receive2->hash ().to_string (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}

TEST (bootstrap_processor, lazy_hash_bootstrap_id)
{
nano::system system (1);
auto node0 (system.nodes[0]);
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
// Generating test chain
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node0->work_generate_blocking (genesis.hash ())));
auto receive1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, nano::Gxrb_ratio, send1->hash (), key1.prv, key1.pub, *node0->work_generate_blocking (key1.pub)));
auto send2 (std::make_shared<nano::state_block> (key1.pub, receive1->hash (), key1.pub, 0, key2.pub, key1.prv, key1.pub, *node0->work_generate_blocking (receive1->hash ())));
auto receive2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, nano::Gxrb_ratio, send2->hash (), key2.prv, key2.pub, *node0->work_generate_blocking (key2.pub)));
// Processing test chain
node0->block_processor.add (send1);
node0->block_processor.add (receive1);
node0->block_processor.add (send2);
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true, true, "123456");
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ ("123456", attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
Expand Down Expand Up @@ -695,6 +736,11 @@ TEST (bootstrap_processor, wallet_lazy_frontier)
ASSERT_NE (nullptr, wallet);
wallet->insert_adhoc (key2.prv);
node1->bootstrap_wallet ();
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (key2.pub.to_account (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (!node1->ledger.block_exists (receive2->hash ()))
Expand Down
135 changes: 135 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,141 @@ TEST (websocket, work)
ASSERT_EQ (contents.get<std::string> ("reason"), "");
}

// Test client subscribing to notifications for bootstrap
TEST (websocket, bootstrap)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Start bootsrap attempt
node1->bootstrap_initiator.bootstrap (true, "123abc");
ASSERT_NE (nullptr, node1->bootstrap_initiator.current_attempt ());

// Wait for the bootstrap notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");

auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "started");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");

// Wait for bootstrap finish
system.deadline_set (5s);
while (node1->bootstrap_initiator.in_progress ())
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (websocket, bootstrap_excited)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Start bootstrap, exit after subscription
std::atomic<bool> bootstrap_started{ false };
std::atomic<bool> subscribed{ false };
std::thread bootstrap_thread ([&system, node1, &bootstrap_started, &subscribed](){
node1->bootstrap_initiator.bootstrap (true, "123abc");
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
bootstrap_started = true;
system.deadline_set (5s);
while (!subscribed)
{
ASSERT_NO_ERROR (system.poll ());
}
});

// Wait for bootstrap start
system.deadline_set (5s);
while (!bootstrap_started)
{
ASSERT_NO_ERROR (system.poll ());
}

// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));


// Wait for the bootstrap notification
subscribed = true;
bootstrap_thread.join ();
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");

auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "exited");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");
ASSERT_EQ (contents.get<unsigned> ("total_blocks"), 0);
ASSERT_LT (contents.get<unsigned> ("duration"), 15000);
}

/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, ws_keepalive)
{
Expand Down
58 changes: 47 additions & 11 deletions nano/node/bootstrap/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/websocket.hpp>

#include <boost/format.hpp>

Expand Down Expand Up @@ -75,19 +76,35 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
return shared_from_this ();
}

nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a) :
nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a, std::string id_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
mode (mode_a)
mode (mode_a),
id (id_a)
{
node->logger.always_log ("Starting bootstrap attempt");
if (id.empty ())
{
nano::random_constants constants;
id = constants.random_128.to_string ();
}
node->logger.always_log (boost::str (boost::format ("Starting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (true);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ()));
}
}

nano::bootstrap_attempt::~bootstrap_attempt ()
{
node->logger.always_log ("Exiting bootstrap attempt");
node->logger.always_log (boost::str (boost::format ("Exiting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (false);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
}
}

bool nano::bootstrap_attempt::should_log ()
Expand Down Expand Up @@ -794,6 +811,24 @@ bool nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
return confirmed;
}

std::string nano::bootstrap_attempt::mode_text ()
{
std::string mode_text;
if (mode == nano::bootstrap_mode::legacy)
{
mode_text = "legacy";
}
else if (mode == nano::bootstrap_mode::lazy)
{
mode_text = "lazy";
}
else if (mode == nano::bootstrap_mode::wallet_lazy)
{
mode_text = "wallet_lazy";
}
return mode_text;
}

void nano::bootstrap_attempt::lazy_start (nano::hash_or_account const & hash_or_account_a, bool confirmed)
{
nano::lock_guard<std::mutex> lazy_lock (lazy_mutex);
Expand Down Expand Up @@ -1339,7 +1374,7 @@ nano::bootstrap_initiator::~bootstrap_initiator ()
stop ();
}

void nano::bootstrap_initiator::bootstrap (bool force)
void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a)
{
nano::unique_lock<std::mutex> lock (mutex);
if (force && attempt != nullptr)
Expand All @@ -1352,12 +1387,12 @@ void nano::bootstrap_initiator::bootstrap (bool force)
if (!stopped && attempt == nullptr)
{
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
condition.notify_all ();
}
}

void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed)
void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed, std::string id_a)
{
if (add_to_peers)
{
Expand All @@ -1374,7 +1409,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
// clang-format on
}
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
if (frontiers_confirmed)
{
excluded_peers.remove (nano::transport::map_endpoint_to_tcp (endpoint_a));
Expand All @@ -1388,7 +1423,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
}
}

void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed)
void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed, std::string id_a)
{
{
nano::unique_lock<std::mutex> lock (mutex);
Expand All @@ -1402,7 +1437,7 @@ void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & ha
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy, id_a.empty () ? hash_or_account_a.to_string () : id_a);
}
attempt->lazy_start (hash_or_account_a, confirmed);
}
Expand All @@ -1416,7 +1451,8 @@ void nano::bootstrap_initiator::bootstrap_wallet (std::deque<nano::account> & ac
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_wallet_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy);
std::string id (!accounts_a.empty () ? accounts_a[0].to_account () : "");
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy, id);
}
attempt->wallet_start (accounts_a);
}
Expand Down
10 changes: 6 additions & 4 deletions nano/node/bootstrap/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class bulk_push_client;
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
{
public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy, std::string id_a = "");
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &, bool = false);
Expand All @@ -82,6 +82,7 @@ class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_at
void attempt_restart_check (nano::unique_lock<std::mutex> &);
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
std::string mode_text ();
/** Lazy bootstrap */
void lazy_run ();
void lazy_start (nano::hash_or_account const &, bool confirmed = true);
Expand Down Expand Up @@ -130,6 +131,7 @@ class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_at
std::atomic<bool> stopped{ false };
std::chrono::steady_clock::time_point attempt_start{ std::chrono::steady_clock::now () };
nano::bootstrap_mode mode;
std::string id;
std::mutex mutex;
nano::condition_variable condition;
// Lazy bootstrap
Expand Down Expand Up @@ -248,9 +250,9 @@ class bootstrap_initiator final
public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false);
void bootstrap (bool force = false);
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true);
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false, std::string id_a = "");
void bootstrap (bool force = false, std::string id_a = "");
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true, std::string id_a = "");
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();
void notify_listeners (bool);
Expand Down
Loading

0 comments on commit 90bfcce

Please sign in to comment.