Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Websocket bootstrap subscription #2471

Merged
merged 2 commits into from
Jan 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 47 additions & 1 deletion nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,48 @@ TEST (bootstrap_processor, lazy_hash)
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (system.nodes[0]->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash ());
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true);
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (receive2->hash ().to_string (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}

TEST (bootstrap_processor, lazy_hash_bootstrap_id)
{
nano::system system (1);
auto node0 (system.nodes[0]);
nano::genesis genesis;
nano::keypair key1;
nano::keypair key2;
// Generating test chain
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, key1.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node0->work_generate_blocking (genesis.hash ())));
auto receive1 (std::make_shared<nano::state_block> (key1.pub, 0, key1.pub, nano::Gxrb_ratio, send1->hash (), key1.prv, key1.pub, *node0->work_generate_blocking (key1.pub)));
auto send2 (std::make_shared<nano::state_block> (key1.pub, receive1->hash (), key1.pub, 0, key2.pub, key1.prv, key1.pub, *node0->work_generate_blocking (receive1->hash ())));
auto receive2 (std::make_shared<nano::state_block> (key2.pub, 0, key2.pub, nano::Gxrb_ratio, send2->hash (), key2.prv, key2.pub, *node0->work_generate_blocking (key2.pub)));
// Processing test chain
node0->block_processor.add (send1);
node0->block_processor.add (receive1);
node0->block_processor.add (send2);
node0->block_processor.add (receive2);
node0->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work));
node1->network.udp_channels.insert (node0->network.endpoint (), node1->network_params.protocol.protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (receive2->hash (), true, true, "123456");
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ ("123456", attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (node1->balance (key2.pub) == 0)
Expand Down Expand Up @@ -695,6 +736,11 @@ TEST (bootstrap_processor, wallet_lazy_frontier)
ASSERT_NE (nullptr, wallet);
wallet->insert_adhoc (key2.prv);
node1->bootstrap_wallet ();
{
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
ASSERT_EQ (key2.pub.to_account (), attempt->id);
}
// Check processed blocks
system.deadline_set (10s);
while (!node1->ledger.block_exists (receive2->hash ()))
Expand Down
135 changes: 135 additions & 0 deletions nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,141 @@ TEST (websocket, work)
ASSERT_EQ (contents.get<std::string> ("reason"), "");
}

// Test client subscribing to notifications for bootstrap
TEST (websocket, bootstrap)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Start bootsrap attempt
node1->bootstrap_initiator.bootstrap (true, "123abc");
ASSERT_NE (nullptr, node1->bootstrap_initiator.current_attempt ());

// Wait for the bootstrap notification
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");

auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "started");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");

// Wait for bootstrap finish
system.deadline_set (5s);
while (node1->bootstrap_initiator.in_progress ())
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (websocket, bootstrap_excited)
{
nano::system system;
nano::node_config config (nano::get_available_port (), system.logging);
config.websocket_config.enabled = true;
config.websocket_config.port = nano::get_available_port ();
auto node1 (system.add_node (config));

ASSERT_EQ (0, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));

// Start bootstrap, exit after subscription
std::atomic<bool> bootstrap_started{ false };
std::atomic<bool> subscribed{ false };
std::thread bootstrap_thread ([&system, node1, &bootstrap_started, &subscribed](){
node1->bootstrap_initiator.bootstrap (true, "123abc");
auto attempt (node1->bootstrap_initiator.current_attempt ());
ASSERT_NE (nullptr, attempt);
bootstrap_started = true;
system.deadline_set (5s);
while (!subscribed)
{
ASSERT_NO_ERROR (system.poll ());
}
});

// Wait for bootstrap start
system.deadline_set (5s);
while (!bootstrap_started)
{
ASSERT_NO_ERROR (system.poll ());
}

// Subscribe to bootstrap and wait for response asynchronously
ack_ready = false;
auto client_task = ([config]() -> boost::optional<std::string> {
auto response = websocket_test_call ("::1", std::to_string (config.websocket_config.port), R"json({"action": "subscribe", "topic": "bootstrap", "ack": true})json", true, true);
return response;
});
auto client_future = std::async (std::launch::async, client_task);

// Wait for acknowledge
system.deadline_set (5s);
while (!ack_ready)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (1, node1->websocket_server->subscriber_count (nano::websocket::topic::bootstrap));


// Wait for the bootstrap notification
subscribed = true;
bootstrap_thread.join ();
system.deadline_set (5s);
while (client_future.wait_for (std::chrono::seconds (0)) != std::future_status::ready)
{
ASSERT_NO_ERROR (system.poll ());
}

// Check the bootstrap notification message
auto response = client_future.get ();
ASSERT_TRUE (response);
std::stringstream stream;
stream << response;
boost::property_tree::ptree event;
boost::property_tree::read_json (stream, event);
ASSERT_EQ (event.get<std::string> ("topic"), "bootstrap");

auto & contents = event.get_child ("message");
ASSERT_EQ (contents.get<std::string> ("reason"), "exited");
ASSERT_EQ (contents.get<std::string> ("id"), "123abc");
ASSERT_EQ (contents.get<std::string> ("mode"), "legacy");
ASSERT_EQ (contents.get<unsigned> ("total_blocks"), 0);
ASSERT_LT (contents.get<unsigned> ("duration"), 15000);
}

/** Tests clients subscribing multiple times or unsubscribing without a subscription */
TEST (websocket, ws_keepalive)
{
Expand Down
58 changes: 47 additions & 11 deletions nano/node/bootstrap/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/websocket.hpp>

#include <boost/format.hpp>

Expand Down Expand Up @@ -75,19 +76,35 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
return shared_from_this ();
}

nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a) :
nano::bootstrap_attempt::bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a, std::string id_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
mode (mode_a)
mode (mode_a),
id (id_a)
{
node->logger.always_log ("Starting bootstrap attempt");
if (id.empty ())
{
nano::random_constants constants;
id = constants.random_128.to_string ();
}
node->logger.always_log (boost::str (boost::format ("Starting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (true);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_started (id, mode_text ()));
}
}

nano::bootstrap_attempt::~bootstrap_attempt ()
{
node->logger.always_log ("Exiting bootstrap attempt");
node->logger.always_log (boost::str (boost::format ("Exiting bootstrap attempt id %1%") % id));
node->bootstrap_initiator.notify_listeners (false);
if (node->websocket_server)
{
nano::websocket::message_builder builder;
node->websocket_server->broadcast (builder.bootstrap_exited (id, mode_text (), attempt_start, total_blocks));
}
}

bool nano::bootstrap_attempt::should_log ()
Expand Down Expand Up @@ -794,6 +811,24 @@ bool nano::bootstrap_attempt::confirm_frontiers (nano::unique_lock<std::mutex> &
return confirmed;
}

std::string nano::bootstrap_attempt::mode_text ()
{
std::string mode_text;
if (mode == nano::bootstrap_mode::legacy)
{
mode_text = "legacy";
}
else if (mode == nano::bootstrap_mode::lazy)
{
mode_text = "lazy";
}
else if (mode == nano::bootstrap_mode::wallet_lazy)
{
mode_text = "wallet_lazy";
}
return mode_text;
}

void nano::bootstrap_attempt::lazy_start (nano::hash_or_account const & hash_or_account_a, bool confirmed)
{
nano::lock_guard<std::mutex> lazy_lock (lazy_mutex);
Expand Down Expand Up @@ -1339,7 +1374,7 @@ nano::bootstrap_initiator::~bootstrap_initiator ()
stop ();
}

void nano::bootstrap_initiator::bootstrap (bool force)
void nano::bootstrap_initiator::bootstrap (bool force, std::string id_a)
{
nano::unique_lock<std::mutex> lock (mutex);
if (force && attempt != nullptr)
Expand All @@ -1352,12 +1387,12 @@ void nano::bootstrap_initiator::bootstrap (bool force)
if (!stopped && attempt == nullptr)
{
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
condition.notify_all ();
}
}

void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed)
void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bool add_to_peers, bool frontiers_confirmed, std::string id_a)
{
if (add_to_peers)
{
Expand All @@ -1374,7 +1409,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
// clang-format on
}
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate, nano::stat::dir::out);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared ());
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::legacy, id_a);
if (frontiers_confirmed)
{
excluded_peers.remove (nano::transport::map_endpoint_to_tcp (endpoint_a));
Expand All @@ -1388,7 +1423,7 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
}
}

void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed)
void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & hash_or_account_a, bool force, bool confirmed, std::string id_a)
{
{
nano::unique_lock<std::mutex> lock (mutex);
Expand All @@ -1402,7 +1437,7 @@ void nano::bootstrap_initiator::bootstrap_lazy (nano::hash_or_account const & ha
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy);
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::lazy, id_a.empty () ? hash_or_account_a.to_string () : id_a);
}
attempt->lazy_start (hash_or_account_a, confirmed);
}
Expand All @@ -1416,7 +1451,8 @@ void nano::bootstrap_initiator::bootstrap_wallet (std::deque<nano::account> & ac
node.stats.inc (nano::stat::type::bootstrap, nano::stat::detail::initiate_wallet_lazy, nano::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy);
std::string id (!accounts_a.empty () ? accounts_a[0].to_account () : "");
attempt = std::make_shared<nano::bootstrap_attempt> (node.shared (), nano::bootstrap_mode::wallet_lazy, id);
}
attempt->wallet_start (accounts_a);
}
Expand Down
10 changes: 6 additions & 4 deletions nano/node/bootstrap/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class bulk_push_client;
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
{
public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy, std::string id_a = "");
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &, bool = false);
Expand All @@ -82,6 +82,7 @@ class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_at
void attempt_restart_check (nano::unique_lock<std::mutex> &);
bool confirm_frontiers (nano::unique_lock<std::mutex> &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, nano::bulk_pull::count_t, bool, unsigned);
std::string mode_text ();
/** Lazy bootstrap */
void lazy_run ();
void lazy_start (nano::hash_or_account const &, bool confirmed = true);
Expand Down Expand Up @@ -130,6 +131,7 @@ class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_at
std::atomic<bool> stopped{ false };
std::chrono::steady_clock::time_point attempt_start{ std::chrono::steady_clock::now () };
nano::bootstrap_mode mode;
std::string id;
std::mutex mutex;
nano::condition_variable condition;
// Lazy bootstrap
Expand Down Expand Up @@ -248,9 +250,9 @@ class bootstrap_initiator final
public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false);
void bootstrap (bool force = false);
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true);
void bootstrap (nano::endpoint const &, bool add_to_peers = true, bool frontiers_confirmed = false, std::string id_a = "");
void bootstrap (bool force = false, std::string id_a = "");
void bootstrap_lazy (nano::hash_or_account const &, bool force = false, bool confirmed = true, std::string id_a = "");
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();
void notify_listeners (bool);
Expand Down
Loading