Skip to content

Commit

Permalink
Serialize telemetry as big endian (#2751)
Browse files Browse the repository at this point in the history
  • Loading branch information
wezrule authored May 4, 2020
1 parent 70c26c9 commit 83e23b7
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 184 deletions.
102 changes: 0 additions & 102 deletions nano/core_test/node_telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,108 +276,6 @@ TEST (node_telemetry, basic)
}
}

TEST (node_telemetry, many_nodes)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_ongoing_telemetry_requests = true;
node_flags.disable_initial_telemetry_requests = true;
node_flags.disable_request_loop = true;
// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
for (auto i = 0; i < num_nodes; ++i)
{
nano::node_config node_config (nano::get_available_port (), system.logging);
// Make a metric completely different for each node so we can check afterwards that there are no duplicates
node_config.bandwidth_limit = 100000 + i;

auto node = std::make_shared<nano::node> (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags);
node->start ();
system.nodes.push_back (node);
}

// Merge peers after creating nodes as some backends (RocksDB) can take a while to initialize nodes (Windows/Debug for instance)
// and timeouts can occur between nodes while starting up many nodes synchronously.
for (auto const & node : system.nodes)
{
for (auto const & other_node : system.nodes)
{
if (node != other_node)
{
node->network.merge_peer (other_node->network.endpoint ());
}
}
}

wait_peer_connections (system);

// Give all nodes a non-default number of blocks
nano::keypair key;
nano::genesis genesis;
nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ()));
for (auto node : system.nodes)
{
auto transaction (node->store.tx_begin_write ());
ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code);
}

// This is the node which will request metrics from all other nodes
auto node_client = system.nodes.front ();

std::mutex mutex;
std::vector<nano::telemetry_data> telemetry_datas;
auto peers = node_client->network.list (num_nodes - 1);
ASSERT_EQ (peers.size (), num_nodes - 1);
for (auto const & peer : peers)
{
node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) {
ASSERT_FALSE (response_a.error);
nano::lock_guard<std::mutex> guard (mutex);
telemetry_datas.push_back (response_a.telemetry_data);
});
}

system.deadline_set (20s);
nano::unique_lock<std::mutex> lk (mutex);
while (telemetry_datas.size () != num_nodes - 1)
{
lk.unlock ();
ASSERT_NO_ERROR (system.poll ());
lk.lock ();
}

// Check the metrics
nano::network_params params;
for (auto & data : telemetry_datas)
{
ASSERT_EQ (data.unchecked_count, 0);
ASSERT_EQ (data.cemented_count, 1);
ASSERT_LE (data.peer_count, 9);
ASSERT_EQ (data.account_count, 1);
ASSERT_TRUE (data.block_count == 2);
ASSERT_EQ (data.protocol_version, params.protocol.telemetry_protocol_version_min);
ASSERT_GE (data.bandwidth_cap, 100000);
ASSERT_LT (data.bandwidth_cap, 100000 + system.nodes.size ());
ASSERT_EQ (data.major_version, nano::get_major_node_version ());
ASSERT_EQ (data.minor_version, nano::get_minor_node_version ());
ASSERT_EQ (data.patch_version, nano::get_patch_node_version ());
ASSERT_EQ (data.pre_release_version, nano::get_pre_release_node_version ());
ASSERT_EQ (data.maker, 0);
ASSERT_LT (data.uptime, 100);
ASSERT_EQ (data.genesis_block, genesis.hash ());
ASSERT_LE (data.timestamp, std::chrono::system_clock::now ());
ASSERT_EQ (data.active_difficulty, system.nodes.front ()->active.active_difficulty ());
}

// We gave some nodes different bandwidth caps, confirm they are not all the same
auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap;
telemetry_datas.erase (telemetry_datas.begin ());
auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) {
return telemetry_data.bandwidth_cap == bandwidth_cap;
});
ASSERT_FALSE (all_bandwidth_limits_same);
}

TEST (node_telemetry, receive_from_non_listening_channel)
{
nano::system system;
Expand Down
35 changes: 24 additions & 11 deletions nano/node/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

std::bitset<16> constexpr nano::message_header::block_type_mask;
std::bitset<16> constexpr nano::message_header::count_mask;
std::bitset<16> constexpr nano::message_header::telemetry_size_mask;

std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test;
std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta;
Expand Down Expand Up @@ -1115,7 +1116,9 @@ nano::telemetry_ack::telemetry_ack (nano::telemetry_data const & telemetry_data_
message (nano::message_type::telemetry_ack),
data (telemetry_data_a)
{
header.extensions = telemetry_data::size;
debug_assert (telemetry_data::size < 2048); // Maximum size the mask allows
header.extensions &= ~message_header::telemetry_size_mask;
header.extensions |= std::bitset<16> (static_cast<unsigned long long> (telemetry_data::size));
}

void nano::telemetry_ack::serialize (nano::stream & stream_a, bool use_epoch_2_min_version_a) const
Expand Down Expand Up @@ -1158,7 +1161,7 @@ uint16_t nano::telemetry_ack::size () const

uint16_t nano::telemetry_ack::size (nano::message_header const & message_header_a)
{
return static_cast<uint16_t> (message_header_a.extensions.to_ulong ());
return static_cast<uint16_t> ((message_header_a.extensions & message_header::telemetry_size_mask).to_ullong ());
}

bool nano::telemetry_ack::is_empty_payload () const
Expand All @@ -1171,13 +1174,20 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa
read (stream_a, signature);
read (stream_a, node_id);
read (stream_a, block_count);
boost::endian::big_to_native_inplace (block_count);
read (stream_a, cemented_count);
boost::endian::big_to_native_inplace (cemented_count);
read (stream_a, unchecked_count);
boost::endian::big_to_native_inplace (unchecked_count);
read (stream_a, account_count);
boost::endian::big_to_native_inplace (account_count);
read (stream_a, bandwidth_cap);
boost::endian::big_to_native_inplace (bandwidth_cap);
read (stream_a, peer_count);
boost::endian::big_to_native_inplace (peer_count);
read (stream_a, protocol_version);
read (stream_a, uptime);
boost::endian::big_to_native_inplace (uptime);
read (stream_a, genesis_block.bytes);
read (stream_a, major_version);
read (stream_a, minor_version);
Expand All @@ -1187,29 +1197,32 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa

uint64_t timestamp_l;
read (stream_a, timestamp_l);
boost::endian::big_to_native_inplace (timestamp_l);
timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (timestamp_l));
read (stream_a, active_difficulty);
boost::endian::big_to_native_inplace (active_difficulty);
}

void nano::telemetry_data::serialize_without_signature (nano::stream & stream_a, uint16_t /* size_a */) const
{
// All values should be serialized in big endian
write (stream_a, node_id);
write (stream_a, block_count);
write (stream_a, cemented_count);
write (stream_a, unchecked_count);
write (stream_a, account_count);
write (stream_a, bandwidth_cap);
write (stream_a, peer_count);
write (stream_a, boost::endian::native_to_big (block_count));
write (stream_a, boost::endian::native_to_big (cemented_count));
write (stream_a, boost::endian::native_to_big (unchecked_count));
write (stream_a, boost::endian::native_to_big (account_count));
write (stream_a, boost::endian::native_to_big (bandwidth_cap));
write (stream_a, boost::endian::native_to_big (peer_count));
write (stream_a, protocol_version);
write (stream_a, uptime);
write (stream_a, boost::endian::native_to_big (uptime));
write (stream_a, genesis_block.bytes);
write (stream_a, major_version);
write (stream_a, minor_version);
write (stream_a, patch_version);
write (stream_a, pre_release_version);
write (stream_a, maker);
write (stream_a, std::chrono::duration_cast<std::chrono::milliseconds> (timestamp.time_since_epoch ()).count ());
write (stream_a, active_difficulty);
write (stream_a, boost::endian::native_to_big (std::chrono::duration_cast<std::chrono::milliseconds> (timestamp.time_since_epoch ()).count ()));
write (stream_a, boost::endian::native_to_big (active_difficulty));
}

void nano::telemetry_data::serialize (nano::stream & stream_a) const
Expand Down
1 change: 1 addition & 0 deletions nano/node/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ class message_header final

static std::bitset<16> constexpr block_type_mask{ 0x0f00 };
static std::bitset<16> constexpr count_mask{ 0xf000 };
static std::bitset<16> constexpr telemetry_size_mask{ 0x07ff };
};
class message
{
Expand Down
Loading

0 comments on commit 83e23b7

Please sign in to comment.