From 83e23b74c59d8a39f720a0aabe5e9d69c4b55a07 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Mon, 4 May 2020 16:07:26 +0100 Subject: [PATCH] Serialize telemetry as big endian (#2751) --- nano/core_test/node_telemetry.cpp | 102 ------------- nano/node/common.cpp | 35 +++-- nano/node/common.hpp | 1 + nano/slow_test/node.cpp | 244 +++++++++++++++++++++--------- 4 files changed, 198 insertions(+), 184 deletions(-) diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp index 5b72c8c881..50dd80e3c6 100644 --- a/nano/core_test/node_telemetry.cpp +++ b/nano/core_test/node_telemetry.cpp @@ -276,108 +276,6 @@ TEST (node_telemetry, basic) } } -TEST (node_telemetry, many_nodes) -{ - nano::system system; - nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; - node_flags.disable_request_loop = true; - // The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number. - const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10; - for (auto i = 0; i < num_nodes; ++i) - { - nano::node_config node_config (nano::get_available_port (), system.logging); - // Make a metric completely different for each node so we can check afterwards that there are no duplicates - node_config.bandwidth_limit = 100000 + i; - - auto node = std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags); - node->start (); - system.nodes.push_back (node); - } - - // Merge peers after creating nodes as some backends (RocksDB) can take a while to initialize nodes (Windows/Debug for instance) - // and timeouts can occur between nodes while starting up many nodes synchronously. - for (auto const & node : system.nodes) - { - for (auto const & other_node : system.nodes) - { - if (node != other_node) - { - node->network.merge_peer (other_node->network.endpoint ()); - } - } - } - - wait_peer_connections (system); - - // Give all nodes a non-default number of blocks - nano::keypair key; - nano::genesis genesis; - nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())); - for (auto node : system.nodes) - { - auto transaction (node->store.tx_begin_write ()); - ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); - } - - // This is the node which will request metrics from all other nodes - auto node_client = system.nodes.front (); - - std::mutex mutex; - std::vector telemetry_datas; - auto peers = node_client->network.list (num_nodes - 1); - ASSERT_EQ (peers.size (), num_nodes - 1); - for (auto const & peer : peers) - { - node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - nano::lock_guard guard (mutex); - telemetry_datas.push_back (response_a.telemetry_data); - }); - } - - system.deadline_set (20s); - nano::unique_lock lk (mutex); - while (telemetry_datas.size () != num_nodes - 1) - { - lk.unlock (); - ASSERT_NO_ERROR (system.poll ()); - lk.lock (); - } - - // Check the metrics - nano::network_params params; - for (auto & data : telemetry_datas) - { - ASSERT_EQ (data.unchecked_count, 0); - ASSERT_EQ (data.cemented_count, 1); - ASSERT_LE (data.peer_count, 9); - ASSERT_EQ (data.account_count, 1); - ASSERT_TRUE (data.block_count == 2); - ASSERT_EQ (data.protocol_version, params.protocol.telemetry_protocol_version_min); - ASSERT_GE (data.bandwidth_cap, 100000); - ASSERT_LT (data.bandwidth_cap, 100000 + system.nodes.size ()); - ASSERT_EQ (data.major_version, nano::get_major_node_version ()); - ASSERT_EQ (data.minor_version, nano::get_minor_node_version ()); - ASSERT_EQ (data.patch_version, nano::get_patch_node_version ()); - ASSERT_EQ (data.pre_release_version, nano::get_pre_release_node_version ()); - ASSERT_EQ (data.maker, 0); - ASSERT_LT (data.uptime, 100); - ASSERT_EQ (data.genesis_block, genesis.hash ()); - ASSERT_LE (data.timestamp, std::chrono::system_clock::now ()); - ASSERT_EQ (data.active_difficulty, system.nodes.front ()->active.active_difficulty ()); - } - - // We gave some nodes different bandwidth caps, confirm they are not all the same - auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap; - telemetry_datas.erase (telemetry_datas.begin ()); - auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) { - return telemetry_data.bandwidth_cap == bandwidth_cap; - }); - ASSERT_FALSE (all_bandwidth_limits_same); -} - TEST (node_telemetry, receive_from_non_listening_channel) { nano::system system; diff --git a/nano/node/common.cpp b/nano/node/common.cpp index 5eb03ca833..3a4a7a6bce 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -14,6 +14,7 @@ std::bitset<16> constexpr nano::message_header::block_type_mask; std::bitset<16> constexpr nano::message_header::count_mask; +std::bitset<16> constexpr nano::message_header::telemetry_size_mask; std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::test; std::chrono::seconds constexpr nano::telemetry_cache_cutoffs::beta; @@ -1115,7 +1116,9 @@ nano::telemetry_ack::telemetry_ack (nano::telemetry_data const & telemetry_data_ message (nano::message_type::telemetry_ack), data (telemetry_data_a) { - header.extensions = telemetry_data::size; + debug_assert (telemetry_data::size < 2048); // Maximum size the mask allows + header.extensions &= ~message_header::telemetry_size_mask; + header.extensions |= std::bitset<16> (static_cast (telemetry_data::size)); } void nano::telemetry_ack::serialize (nano::stream & stream_a, bool use_epoch_2_min_version_a) const @@ -1158,7 +1161,7 @@ uint16_t nano::telemetry_ack::size () const uint16_t nano::telemetry_ack::size (nano::message_header const & message_header_a) { - return static_cast (message_header_a.extensions.to_ulong ()); + return static_cast ((message_header_a.extensions & message_header::telemetry_size_mask).to_ullong ()); } bool nano::telemetry_ack::is_empty_payload () const @@ -1171,13 +1174,20 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa read (stream_a, signature); read (stream_a, node_id); read (stream_a, block_count); + boost::endian::big_to_native_inplace (block_count); read (stream_a, cemented_count); + boost::endian::big_to_native_inplace (cemented_count); read (stream_a, unchecked_count); + boost::endian::big_to_native_inplace (unchecked_count); read (stream_a, account_count); + boost::endian::big_to_native_inplace (account_count); read (stream_a, bandwidth_cap); + boost::endian::big_to_native_inplace (bandwidth_cap); read (stream_a, peer_count); + boost::endian::big_to_native_inplace (peer_count); read (stream_a, protocol_version); read (stream_a, uptime); + boost::endian::big_to_native_inplace (uptime); read (stream_a, genesis_block.bytes); read (stream_a, major_version); read (stream_a, minor_version); @@ -1187,29 +1197,32 @@ void nano::telemetry_data::deserialize (nano::stream & stream_a, uint16_t payloa uint64_t timestamp_l; read (stream_a, timestamp_l); + boost::endian::big_to_native_inplace (timestamp_l); timestamp = std::chrono::system_clock::time_point (std::chrono::milliseconds (timestamp_l)); read (stream_a, active_difficulty); + boost::endian::big_to_native_inplace (active_difficulty); } void nano::telemetry_data::serialize_without_signature (nano::stream & stream_a, uint16_t /* size_a */) const { + // All values should be serialized in big endian write (stream_a, node_id); - write (stream_a, block_count); - write (stream_a, cemented_count); - write (stream_a, unchecked_count); - write (stream_a, account_count); - write (stream_a, bandwidth_cap); - write (stream_a, peer_count); + write (stream_a, boost::endian::native_to_big (block_count)); + write (stream_a, boost::endian::native_to_big (cemented_count)); + write (stream_a, boost::endian::native_to_big (unchecked_count)); + write (stream_a, boost::endian::native_to_big (account_count)); + write (stream_a, boost::endian::native_to_big (bandwidth_cap)); + write (stream_a, boost::endian::native_to_big (peer_count)); write (stream_a, protocol_version); - write (stream_a, uptime); + write (stream_a, boost::endian::native_to_big (uptime)); write (stream_a, genesis_block.bytes); write (stream_a, major_version); write (stream_a, minor_version); write (stream_a, patch_version); write (stream_a, pre_release_version); write (stream_a, maker); - write (stream_a, std::chrono::duration_cast (timestamp.time_since_epoch ()).count ()); - write (stream_a, active_difficulty); + write (stream_a, boost::endian::native_to_big (std::chrono::duration_cast (timestamp.time_since_epoch ()).count ())); + write (stream_a, boost::endian::native_to_big (active_difficulty)); } void nano::telemetry_data::serialize (nano::stream & stream_a) const diff --git a/nano/node/common.hpp b/nano/node/common.hpp index a68e481213..4672b7783a 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -220,6 +220,7 @@ class message_header final static std::bitset<16> constexpr block_type_mask{ 0x0f00 }; static std::bitset<16> constexpr count_mask{ 0xf000 }; + static std::bitset<16> constexpr telemetry_size_mask{ 0x07ff }; }; class message { diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 071dec6601..f6945b87f1 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1148,6 +1148,179 @@ TEST (node_telemetry, under_load) } } +TEST (node_telemetry, all_peers_use_single_request_cache) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + node_flags.disable_initial_telemetry_requests = true; + auto node_client = system.add_node (node_flags); + auto node_server = system.add_node (node_flags); + + wait_peer_connections (system); + + // Request telemetry metrics + nano::telemetry_data telemetry_data; + { + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + telemetry_data = response_a.telemetry_data; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + auto responses = node_client->telemetry->get_metrics (); + ASSERT_EQ (telemetry_data, responses.begin ()->second); + + // Confirm only 1 request was made + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + + std::this_thread::sleep_for (node_server->telemetry->cache_plus_buffer_cutoff_time ()); + + // Should be empty + responses = node_client->telemetry->get_metrics (); + ASSERT_TRUE (responses.empty ()); + + { + std::atomic done{ false }; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + telemetry_data = response_a.telemetry_data; + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + } + + responses = node_client->telemetry->get_metrics (); + ASSERT_EQ (telemetry_data, responses.begin ()->second); + + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); +} + +TEST (node_telemetry, many_nodes) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_ongoing_telemetry_requests = true; + node_flags.disable_initial_telemetry_requests = true; + node_flags.disable_request_loop = true; + // The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number. + const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10; + for (auto i = 0; i < num_nodes; ++i) + { + nano::node_config node_config (nano::get_available_port (), system.logging); + // Make a metric completely different for each node so we can check afterwards that there are no duplicates + node_config.bandwidth_limit = 100000 + i; + + auto node = std::make_shared (system.io_ctx, nano::unique_path (), system.alarm, node_config, system.work, node_flags); + node->start (); + system.nodes.push_back (node); + } + + // Merge peers after creating nodes as some backends (RocksDB) can take a while to initialize nodes (Windows/Debug for instance) + // and timeouts can occur between nodes while starting up many nodes synchronously. + for (auto const & node : system.nodes) + { + for (auto const & other_node : system.nodes) + { + if (node != other_node) + { + node->network.merge_peer (other_node->network.endpoint ()); + } + } + } + + wait_peer_connections (system); + + // Give all nodes a non-default number of blocks + nano::keypair key; + nano::genesis genesis; + nano::state_block send (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())); + for (auto node : system.nodes) + { + auto transaction (node->store.tx_begin_write ()); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + } + + // This is the node which will request metrics from all other nodes + auto node_client = system.nodes.front (); + + std::mutex mutex; + std::vector telemetry_datas; + auto peers = node_client->network.list (num_nodes - 1); + ASSERT_EQ (peers.size (), num_nodes - 1); + for (auto const & peer : peers) + { + node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + nano::lock_guard guard (mutex); + telemetry_datas.push_back (response_a.telemetry_data); + }); + } + + system.deadline_set (20s); + nano::unique_lock lk (mutex); + while (telemetry_datas.size () != num_nodes - 1) + { + lk.unlock (); + ASSERT_NO_ERROR (system.poll ()); + lk.lock (); + } + + // Check the metrics + nano::network_params params; + for (auto & data : telemetry_datas) + { + ASSERT_EQ (data.unchecked_count, 0); + ASSERT_EQ (data.cemented_count, 1); + ASSERT_LE (data.peer_count, 9); + ASSERT_EQ (data.account_count, 1); + ASSERT_TRUE (data.block_count == 2); + ASSERT_EQ (data.protocol_version, params.protocol.telemetry_protocol_version_min); + ASSERT_GE (data.bandwidth_cap, 100000); + ASSERT_LT (data.bandwidth_cap, 100000 + system.nodes.size ()); + ASSERT_EQ (data.major_version, nano::get_major_node_version ()); + ASSERT_EQ (data.minor_version, nano::get_minor_node_version ()); + ASSERT_EQ (data.patch_version, nano::get_patch_node_version ()); + ASSERT_EQ (data.pre_release_version, nano::get_pre_release_node_version ()); + ASSERT_EQ (data.maker, 0); + ASSERT_LT (data.uptime, 100); + ASSERT_EQ (data.genesis_block, genesis.hash ()); + ASSERT_LE (data.timestamp, std::chrono::system_clock::now ()); + ASSERT_EQ (data.active_difficulty, system.nodes.front ()->active.active_difficulty ()); + } + + // We gave some nodes different bandwidth caps, confirm they are not all the same + auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap; + telemetry_datas.erase (telemetry_datas.begin ()); + auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) { + return telemetry_data.bandwidth_cap == bandwidth_cap; + }); + ASSERT_FALSE (all_bandwidth_limits_same); +} + // Similar to signature_checker.boundary_checks but more exhaustive. Can take up to 1 minute TEST (signature_checker, mass_boundary_checks) { @@ -1322,74 +1495,3 @@ TEST (node, mass_epoch_upgrader) perform_test (42); perform_test (std::numeric_limits::max ()); } - -TEST (node_telemetry, all_peers_use_single_request_cache) -{ - nano::system system; - nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - node_flags.disable_initial_telemetry_requests = true; - auto node_client = system.add_node (node_flags); - auto node_server = system.add_node (node_flags); - - wait_peer_connections (system); - - // Request telemetry metrics - nano::telemetry_data telemetry_data; - { - std::atomic done{ false }; - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - telemetry_data = response_a.telemetry_data; - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - } - - auto responses = node_client->telemetry->get_metrics (); - ASSERT_EQ (telemetry_data, responses.begin ()->second); - - // Confirm only 1 request was made - ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - - std::this_thread::sleep_for (node_server->telemetry->cache_plus_buffer_cutoff_time ()); - - // Should be empty - responses = node_client->telemetry->get_metrics (); - ASSERT_TRUE (responses.empty ()); - - { - std::atomic done{ false }; - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - telemetry_data = response_a.telemetry_data; - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - } - - responses = node_client->telemetry->get_metrics (); - ASSERT_EQ (telemetry_data, responses.begin ()->second); - - ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); -}