Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Serialize telemetry as big endian #2751

Merged
merged 1 commit into from
May 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 0 additions & 102 deletions nano/core_test/node_telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,108 +277,6 @@ TEST (node_telemetry, basic)
}
}

TEST (node_telemetry, many_nodes)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
node_flags.disable_initial_telemetry_requests = true;
node_flags.disable_request_loop = true;
// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
for (auto i = 0; i < num_nodes; ++i)
{
nano::node_config node_config (nano::get_available_port (), system.logging);
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
node_config.bandwidth_limit = 100000 + i;

auto node = std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags);
node->start ();
system.nodes.push_back (node);
}

// Merge peers after creating nodes as some backends (RocksDB) can take a while to initialize nodes (Windows/Debug for instance)
// and timeouts can occur between nodes while starting up many nodes synchronously.
for (auto const & node : system.nodes)
{
for (auto const & other_node : system.nodes)
{
if (node != other_node)
{
node->network.merge_peer (other_node->network.endpoint ());
}
}
}

wait_peer_connections (system);

// Give all nodes a non-default number of blocks
nano::keypair key;
nano::genesis genesis;
nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()));
for (auto node : system.nodes)
{
auto transaction (node->store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
}

// This is the node which will request metrics from all other nodes
auto node_client = system.nodes.front ();

std::mutex mutex;
std::vector<nano::telemetry_data> telemetry_datas;
auto peers = node_client->network.list (num_nodes - 1);
ASSERT_EQ (peers.size (), num_nodes - 1);
for (auto const & peer : peers)
{
node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
nano::lock_guard<std::mutex> guard (mutex);
telemetry_datas.push_back (response_a.telemetry_data);
});
}

system.deadline_set (20s);
nano::unique_lock<std::mutex> lk (mutex);
while (telemetry_datas.size () != num_nodes - 1)
{
lk.unlock ();
ASSERT_NO_ERROR (system.poll ());
lk.lock ();
}

// Check the metrics
nano::network_params params;
for (auto & data : telemetry_datas)
{
ASSERT_EQ (data.unchecked_count, 0);
ASSERT_EQ (data.cemented_count, 1);
ASSERT_LE (data.peer_count, 9);
ASSERT_EQ (data.account_count, 1);
ASSERT_TRUE (data.block_count == 2);
ASSERT_EQ (data.protocol_version, params.protocol.telemetry_protocol_version_min);
ASSERT_GE (data.bandwidth_cap, 100000);
ASSERT_LT (data.bandwidth_cap, 100000 + system.nodes.size ());
ASSERT_EQ (data.major_version, nano::get_major_node_version ());
ASSERT_EQ (data.minor_version, nano::get_minor_node_version ());
ASSERT_EQ (data.patch_version, nano::get_patch_node_version ());
ASSERT_EQ (data.pre_release_version, nano::get_pre_release_node_version ());
ASSERT_EQ (data.maker, 0);
ASSERT_LT (data.uptime, 100);
ASSERT_EQ (data.genesis_block, genesis.hash ());
ASSERT_LE (data.timestamp, std::chrono::system_clock::now ());
ASSERT_EQ (data.active_difficulty, system.nodes.front ()->active.active_difficulty ());
}

// We gave some nodes different bandwidth caps, confirm they are not all the same
auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap;
telemetry_datas.erase (telemetry_datas.begin ());
auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) {
return telemetry_data.bandwidth_cap == bandwidth_cap;
});
ASSERT_FALSE (all_bandwidth_limits_same);
}

TEST (node_telemetry, receive_from_non_listening_channel)
{
nano::system system;
Expand Down
35 changes: 24 additions & 11 deletions nano/node/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

std::bitset<16> constexpr nano::message_header::block_type_mask;
std::bitset<16> constexpr nano::message_header::count_mask;
std::bitset<16> constexpr nano::message_header::telemetry_size_mask;

std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta;
Expand Down Expand Up @@ -1115,7 +1116,9 @@ nano::telemetry_ack::telemetry_ack (nano::telemetry_data const & telemetry_data_
message (nano::message_type::telemetry_ack),
data (telemetry_data_a)
{
header.extensions = telemetry_data::size;
debug_assert (telemetry_data::size < 2048); // Maximum size the mask allows
header.extensions &= ~message_header::telemetry_size_mask;
header.extensions |= std::bitset<16> (static_cast<unsigned long long> (telemetry_data::size));
}

void nano::telemetry_ack::serialize (nano::stream & stream_a, bool use_epoch_2_min_version_a) const
Expand Down Expand Up @@ -1158,7 +1161,7 @@ uint16_t nano::telemetry_ack::size () const

uint16_t nano::telemetry_ack::size (nano::message_header const & message_header_a)
{
return static_cast<uint16_t> (message_header_a.extensions.to_ulong ());
return static_cast<uint16_t> ((message_header_a.extensions & message_header::telemetry_size_mask).to_ullong ());
}

bool nano::telemetry_ack::is_empty_payload () const
Expand All @@ -1171,13 +1174,20 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa
read (stream_a, signature);
read (stream_a, node_id);
read (stream_a, block_count);
boost::endian::big_to_native_inplace (block_count);
read (stream_a, cemented_count);
boost::endian::big_to_native_inplace (cemented_count);
read (stream_a, unchecked_count);
boost::endian::big_to_native_inplace (unchecked_count);
read (stream_a, account_count);
boost::endian::big_to_native_inplace (account_count);
read (stream_a, bandwidth_cap);
boost::endian::big_to_native_inplace (bandwidth_cap);
read (stream_a, peer_count);
boost::endian::big_to_native_inplace (peer_count);
read (stream_a, protocol_version);
read (stream_a, uptime);
boost::endian::big_to_native_inplace (uptime);
read (stream_a, genesis_block.bytes);
read (stream_a, major_version);
read (stream_a, minor_version);
Expand All @@ -1187,29 +1197,32 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa

uint64_t timestamp_l;
read (stream_a, timestamp_l);
boost::endian::big_to_native_inplace (timestamp_l);
timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (timestamp_l));
read (stream_a, active_difficulty);
boost::endian::big_to_native_inplace (active_difficulty);
}

void nano::telemetry_data::serialize_without_signature (nano::stream & stream_a, uint16_t /* size_a */) const
{
// All values should be serialized in big endian
write (stream_a, node_id);
write (stream_a, block_count);
write (stream_a, cemented_count);
write (stream_a, unchecked_count);
write (stream_a, account_count);
write (stream_a, bandwidth_cap);
write (stream_a, peer_count);
write (stream_a, boost::endian::native_to_big (block_count));
write (stream_a, boost::endian::native_to_big (cemented_count));
write (stream_a, boost::endian::native_to_big (unchecked_count));
write (stream_a, boost::endian::native_to_big (account_count));
write (stream_a, boost::endian::native_to_big (bandwidth_cap));
write (stream_a, boost::endian::native_to_big (peer_count));
write (stream_a, protocol_version);
write (stream_a, uptime);
write (stream_a, boost::endian::native_to_big (uptime));
write (stream_a, genesis_block.bytes);
write (stream_a, major_version);
write (stream_a, minor_version);
write (stream_a, patch_version);
write (stream_a, pre_release_version);
write (stream_a, maker);
write (stream_a, std::chrono::duration_cast<std::chrono::milliseconds> (timestamp.time_since_epoch ()).count ());
write (stream_a, active_difficulty);
write (stream_a, boost::endian::native_to_big (std::chrono::duration_cast<std::chrono::milliseconds> (timestamp.time_since_epoch ()).count ()));
write (stream_a, boost::endian::native_to_big (active_difficulty));
}

void nano::telemetry_data::serialize (nano::stream & stream_a) const
Expand Down
1 change: 1 addition & 0 deletions nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class message_header final

static std::bitset<16> constexpr block_type_mask{ 0x0f00 };
static std::bitset<16> constexpr count_mask{ 0xf000 };
static std::bitset<16> constexpr telemetry_size_mask{ 0x07ff };
};
class message
{
Expand Down
Loading