Skip to content

Commit

Permalink
Move excluded_peers to network (#2693)
Browse files Browse the repository at this point in the history
* Moves peer exclusion code into its own headers, moves the object from bootstrap initiator to network, some general cleanup and adds a validation test

* Get limited size from method

* Compositing peer_exclusion for container info collection (Wes review)

* Fix container info collection and include in test
  • Loading branch information
guilhermelawless authored Mar 31, 2020
1 parent 8d4a447 commit 33874b6
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 115 deletions.
6 changes: 3 additions & 3 deletions nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,8 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
ASSERT_NO_ERROR (system.poll ());
}
//Add single excluded peers record (2 records are required to drop peer)
node3->bootstrap_initiator.excluded_peers.add (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ()), 0);
ASSERT_FALSE (node3->bootstrap_initiator.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
node3->network.excluded_peers.add (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ()), 0);
ASSERT_FALSE (node3->network.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
node3->bootstrap_initiator.bootstrap (node1->network.endpoint ());
system.deadline_set (15s);
while (node3->bootstrap_initiator.in_progress ())
Expand All @@ -434,7 +434,7 @@ TEST (bootstrap_processor, frontiers_unconfirmed)
ASSERT_FALSE (node3->ledger.block_exists (send1->hash ()));
ASSERT_FALSE (node3->ledger.block_exists (open1->hash ()));
ASSERT_EQ (1, node3->stats.count (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in)); // failed request from node1
ASSERT_TRUE (node3->bootstrap_initiator.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
ASSERT_TRUE (node3->network.excluded_peers.check (nano::transport::map_endpoint_to_tcp (node1->network.endpoint ())));
}

TEST (bootstrap_processor, frontiers_confirmed)
Expand Down
53 changes: 53 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1052,3 +1052,56 @@ TEST (bandwidth_limiter, validate)
ASSERT_EQ (limiter_3.get_rate (), limiter_3.get_limit () + message_size);
ASSERT_LT (std::chrono::steady_clock::now () - 1s, start);
}

namespace nano
{
TEST (peer_exclusion, validate)
{
nano::peer_exclusion excluded_peers;
size_t fake_peers_count = 10;
auto max_size = excluded_peers.limited_size (fake_peers_count);
auto address (boost::asio::ip::address_v6::loopback ());
for (auto i = 0; i < max_size + 2; ++i)
{
nano::tcp_endpoint endpoint (address, i);
ASSERT_FALSE (excluded_peers.check (endpoint));
ASSERT_EQ (1, excluded_peers.add (endpoint, fake_peers_count));
ASSERT_FALSE (excluded_peers.check (endpoint));
}
// The oldest one must have been removed
ASSERT_EQ (max_size + 1, excluded_peers.size ());
auto & peers_by_endpoint (excluded_peers.peers.get<nano::peer_exclusion::tag_endpoint> ());
ASSERT_EQ (peers_by_endpoint.end (), peers_by_endpoint.find (nano::tcp_endpoint (address, 0)));

auto to_seconds = [](std::chrono::steady_clock::time_point const & timepoint) {
return std::chrono::duration_cast<std::chrono::seconds> (timepoint.time_since_epoch ()).count ();
};
nano::tcp_endpoint first (address, 1);
ASSERT_NE (peers_by_endpoint.end (), peers_by_endpoint.find (first));
nano::tcp_endpoint second (address, 2);
ASSERT_EQ (false, excluded_peers.check (second));
ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (peers_by_endpoint.find (second)->exclude_until), 2);
ASSERT_EQ (2, excluded_peers.add (second, fake_peers_count));
ASSERT_EQ (peers_by_endpoint.end (), peers_by_endpoint.find (first));
ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours), to_seconds (peers_by_endpoint.find (second)->exclude_until), 2);
ASSERT_EQ (3, excluded_peers.add (second, fake_peers_count));
ASSERT_NEAR (to_seconds (std::chrono::steady_clock::now () + excluded_peers.exclude_time_hours * 3 * 2), to_seconds (peers_by_endpoint.find (second)->exclude_until), 2);
ASSERT_EQ (max_size, excluded_peers.size ());

// Clear many entries if there are a low number of peers
ASSERT_EQ (4, excluded_peers.add (second, 0));
ASSERT_EQ (1, excluded_peers.size ());

auto component (nano::collect_container_info (excluded_peers, ""));
auto composite (dynamic_cast<nano::container_info_composite *> (component.get ()));
ASSERT_NE (nullptr, component);
auto & children (composite->get_children ());
ASSERT_EQ (1, children.size ());
auto child_leaf (dynamic_cast<nano::container_info_leaf *> (children.front ().get ()));
ASSERT_NE (nullptr, child_leaf);
auto child_info (child_leaf->get_info ());
ASSERT_EQ ("peers", child_info.name);
ASSERT_EQ (1, child_info.count);
ASSERT_EQ (sizeof (decltype (excluded_peers.peers)::value_type), child_info.sizeof_element);
}
}
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ add_library (node
openclwork.cpp
payment_observer_processor.hpp
payment_observer_processor.cpp
peer_exclusion.hpp
peer_exclusion.cpp
portmapping.hpp
portmapping.cpp
node_pow_server_config.hpp
Expand Down
77 changes: 2 additions & 75 deletions nano/node/bootstrap/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,6 @@

#include <algorithm>

constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_time_hours;
constexpr std::chrono::hours nano::bootstrap_excluded_peers::exclude_remove_hours;

nano::bootstrap_initiator::bootstrap_initiator (nano::node & node_a) :
node (node_a)
{
Expand Down Expand Up @@ -75,9 +72,9 @@ void nano::bootstrap_initiator::bootstrap (nano::endpoint const & endpoint_a, bo
attempts.add (legacy_attempt);
if (frontiers_confirmed)
{
excluded_peers.remove (nano::transport::map_endpoint_to_tcp (endpoint_a));
node.network.excluded_peers.remove (nano::transport::map_endpoint_to_tcp (endpoint_a));
}
if (!excluded_peers.check (nano::transport::map_endpoint_to_tcp (endpoint_a)))
if (!node.network.excluded_peers.check (nano::transport::map_endpoint_to_tcp (endpoint_a)))
{
connections->add_connection (endpoint_a);
}
Expand Down Expand Up @@ -289,7 +286,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bo
{
size_t count;
size_t cache_count;
size_t excluded_peers_count;
{
nano::lock_guard<std::mutex> guard (bootstrap_initiator.observers_mutex);
count = bootstrap_initiator.observers.size ();
Expand All @@ -298,18 +294,12 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (bo
nano::lock_guard<std::mutex> guard (bootstrap_initiator.cache.pulls_cache_mutex);
cache_count = bootstrap_initiator.cache.cache.size ();
}
{
nano::lock_guard<std::mutex> guard (bootstrap_initiator.excluded_peers.excluded_peers_mutex);
excluded_peers_count = bootstrap_initiator.excluded_peers.peers.size ();
}

auto sizeof_element = sizeof (decltype (bootstrap_initiator.observers)::value_type);
auto sizeof_cache_element = sizeof (decltype (bootstrap_initiator.cache.cache)::value_type);
auto sizeof_excluded_peers_element = sizeof (decltype (bootstrap_initiator.excluded_peers.peers)::value_type);
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "observers", count, sizeof_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pulls_cache", cache_count, sizeof_cache_element }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "excluded_peers", excluded_peers_count, sizeof_excluded_peers_element }));
return composite;
}

Expand Down Expand Up @@ -362,69 +352,6 @@ void nano::pulls_cache::remove (nano::pull_info const & pull_a)
cache.get<account_head_tag> ().erase (head_512);
}

uint64_t nano::bootstrap_excluded_peers::add (nano::tcp_endpoint const & endpoint_a, size_t network_peers_count)
{
uint64_t result (0);
nano::lock_guard<std::mutex> guard (excluded_peers_mutex);
// Clean old excluded peers
while (peers.size () > 1 && peers.size () > std::min (static_cast<double> (excluded_peers_size_max), network_peers_count * excluded_peers_percentage_limit))
{
peers.erase (peers.begin ());
}
debug_assert (peers.size () <= excluded_peers_size_max);
auto existing (peers.get<endpoint_tag> ().find (endpoint_a));
if (existing == peers.get<endpoint_tag> ().end ())
{
// Insert new endpoint
auto inserted (peers.emplace (nano::excluded_peers_item{ std::chrono::steady_clock::steady_clock::now () + exclude_time_hours, endpoint_a, 1 }));
(void)inserted;
debug_assert (inserted.second);
result = 1;
}
else
{
// Update existing endpoint
peers.get<endpoint_tag> ().modify (existing, [&result](nano::excluded_peers_item & item_a) {
++item_a.score;
result = item_a.score;
if (item_a.score == nano::bootstrap_excluded_peers::score_limit)
{
item_a.exclude_until = std::chrono::steady_clock::now () + nano::bootstrap_excluded_peers::exclude_time_hours;
}
else if (item_a.score > nano::bootstrap_excluded_peers::score_limit)
{
item_a.exclude_until = std::chrono::steady_clock::now () + nano::bootstrap_excluded_peers::exclude_time_hours * item_a.score * 2;
}
});
}
return result;
}

bool nano::bootstrap_excluded_peers::check (nano::tcp_endpoint const & endpoint_a)
{
bool excluded (false);
nano::lock_guard<std::mutex> guard (excluded_peers_mutex);
auto existing (peers.get<endpoint_tag> ().find (endpoint_a));
if (existing != peers.get<endpoint_tag> ().end () && existing->score >= score_limit)
{
if (existing->exclude_until > std::chrono::steady_clock::now ())
{
excluded = true;
}
else if (existing->exclude_until + exclude_remove_hours * existing->score < std::chrono::steady_clock::now ())
{
peers.get<endpoint_tag> ().erase (existing);
}
}
return excluded;
}

void nano::bootstrap_excluded_peers::remove (nano::tcp_endpoint const & endpoint_a)
{
nano::lock_guard<std::mutex> guard (excluded_peers_mutex);
peers.get<endpoint_tag> ().erase (endpoint_a);
}

void nano::bootstrap_attempts::add (std::shared_ptr<nano::bootstrap_attempt> attempt_a)
{
nano::lock_guard<std::mutex> lock (bootstrap_attempts_mutex);
Expand Down
33 changes: 0 additions & 33 deletions nano/node/bootstrap/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,38 +64,6 @@ class pulls_cache final
// clang-format on
constexpr static size_t cache_size_max = 10000;
};
class excluded_peers_item final
{
public:
std::chrono::steady_clock::time_point exclude_until;
nano::tcp_endpoint endpoint;
uint64_t score;
};
class bootstrap_excluded_peers final
{
public:
uint64_t add (nano::tcp_endpoint const &, size_t);
bool check (nano::tcp_endpoint const &);
void remove (nano::tcp_endpoint const &);
std::mutex excluded_peers_mutex;
class endpoint_tag
{
};
// clang-format off
boost::multi_index_container<nano::excluded_peers_item,
mi::indexed_by<
mi::ordered_non_unique<
mi::member<nano::excluded_peers_item, std::chrono::steady_clock::time_point, &nano::excluded_peers_item::exclude_until>>,
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<nano::excluded_peers_item, nano::tcp_endpoint, &nano::excluded_peers_item::endpoint>>>>
peers;
// clang-format on
constexpr static size_t excluded_peers_size_max = 5000;
constexpr static double excluded_peers_percentage_limit = 0.5;
constexpr static uint64_t score_limit = 2;
constexpr static std::chrono::hours exclude_time_hours = std::chrono::hours (1);
constexpr static std::chrono::hours exclude_remove_hours = std::chrono::hours (24);
};
class bootstrap_attempts final
{
public:
Expand Down Expand Up @@ -130,7 +98,6 @@ class bootstrap_initiator final
std::shared_ptr<nano::bootstrap_attempt> current_lazy_attempt ();
std::shared_ptr<nano::bootstrap_attempt> current_wallet_attempt ();
nano::pulls_cache cache;
nano::bootstrap_excluded_peers excluded_peers;
nano::bootstrap_attempts attempts;
void stop ();

Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_attempt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,8 +317,8 @@ void nano::bootstrap_attempt_legacy::attempt_restart_check (nano::unique_lock<st
if (!confirmed)
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_confirmation_failed, nano::stat::dir::in);
auto score (node->bootstrap_initiator.excluded_peers.add (endpoint_frontier_request, node->network.size ()));
if (score >= nano::bootstrap_excluded_peers::score_limit)
auto score (node->network.excluded_peers.add (endpoint_frontier_request, node->network.size ()));
if (score >= nano::peer_exclusion::score_limit)
{
node->logger.always_log (boost::str (boost::format ("Adding peer %1% to excluded peers list with score %2% after %3% seconds bootstrap attempt") % endpoint_frontier_request % score % std::chrono::duration_cast<std::chrono::seconds> (std::chrono::steady_clock::now () - attempt_start).count ()));
}
Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_connections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ std::shared_ptr<nano::bootstrap_client> nano::bootstrap_connections::connection
void nano::bootstrap_connections::pool_connection (std::shared_ptr<nano::bootstrap_client> client_a, bool new_client, bool push_front)
{
nano::unique_lock<std::mutex> lock (mutex);
if (!stopped && !client_a->pending_stop && !node.bootstrap_initiator.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
if (!stopped && !client_a->pending_stop && !node.network.excluded_peers.check (client_a->channel->get_tcp_endpoint ()))
{
// Idle bootstrap client socket
if (auto socket_l = client_a->channel->socket.lock ())
Expand Down Expand Up @@ -284,7 +284,7 @@ void nano::bootstrap_connections::populate_connections (bool repeat)
for (auto i = 0u; i < delta; i++)
{
auto endpoint (node.network.bootstrap_peer (true));
if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && (node.flags.allow_bootstrap_peers_duplicates || endpoints.find (endpoint) == endpoints.end ()) && !node.bootstrap_initiator.excluded_peers.check (endpoint))
if (endpoint != nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0) && (node.flags.allow_bootstrap_peers_duplicates || endpoints.find (endpoint) == endpoints.end ()) && !node.network.excluded_peers.check (endpoint))
{
connect_client (endpoint);
endpoints.insert (endpoint);
Expand Down
1 change: 1 addition & 0 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ne
composite->add_component (network.tcp_channels.collect_container_info ("tcp_channels"));
composite->add_component (network.udp_channels.collect_container_info ("udp_channels"));
composite->add_component (network.syn_cookies.collect_container_info ("syn_cookies"));
composite->add_component (collect_container_info (network.excluded_peers, "excluded_peers"));
return composite;
}

Expand Down
2 changes: 2 additions & 0 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/node/common.hpp>
#include <nano/node/peer_exclusion.hpp>
#include <nano/node/transport/tcp.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/secure/network_filter.hpp>
Expand Down Expand Up @@ -154,6 +155,7 @@ class network final
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::peer_exclusion excluded_peers;
nano::node & node;
nano::network_filter publish_filter;
nano::transport::udp_channels udp_channels;
Expand Down
94 changes: 94 additions & 0 deletions nano/node/peer_exclusion.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
#include <nano/node/peer_exclusion.hpp>

constexpr std::chrono::hours nano::peer_exclusion::exclude_time_hours;
constexpr std::chrono::hours nano::peer_exclusion::exclude_remove_hours;
constexpr size_t nano::peer_exclusion::size_max;
constexpr double nano::peer_exclusion::peers_percentage_limit;

uint64_t nano::peer_exclusion::add (nano::tcp_endpoint const & endpoint_a, size_t const network_peers_count_a)
{
uint64_t result (0);
nano::lock_guard<std::mutex> guard (mutex);
// Clean old excluded peers
auto limited = limited_size (network_peers_count_a);
while (peers.size () > 1 && peers.size () > limited)
{
peers.get<tag_exclusion> ().erase (peers.get<tag_exclusion> ().begin ());
}
debug_assert (peers.size () <= size_max);
auto & peers_by_endpoint (peers.get<tag_endpoint> ());
auto existing (peers_by_endpoint.find (endpoint_a));
if (existing == peers_by_endpoint.end ())
{
// Insert new endpoint
auto inserted (peers.emplace (peer_exclusion::item{ std::chrono::steady_clock::steady_clock::now () + exclude_time_hours, endpoint_a, 1 }));
(void)inserted;
debug_assert (inserted.second);
result = 1;
}
else
{
// Update existing endpoint
peers_by_endpoint.modify (existing, [&result](peer_exclusion::item & item_a) {
++item_a.score;
result = item_a.score;
if (item_a.score == peer_exclusion::score_limit)
{
item_a.exclude_until = std::chrono::steady_clock::now () + peer_exclusion::exclude_time_hours;
}
else if (item_a.score > peer_exclusion::score_limit)
{
item_a.exclude_until = std::chrono::steady_clock::now () + peer_exclusion::exclude_time_hours * item_a.score * 2;
}
});
}
return result;
}

bool nano::peer_exclusion::check (nano::tcp_endpoint const & endpoint_a)
{
bool excluded (false);
nano::lock_guard<std::mutex> guard (mutex);
auto & peers_by_endpoint (peers.get<tag_endpoint> ());
auto existing (peers_by_endpoint.find (endpoint_a));
if (existing != peers_by_endpoint.end () && existing->score >= score_limit)
{
if (existing->exclude_until > std::chrono::steady_clock::now ())
{
excluded = true;
}
else if (existing->exclude_until + exclude_remove_hours * existing->score < std::chrono::steady_clock::now ())
{
peers_by_endpoint.erase (existing);
}
}
return excluded;
}

void nano::peer_exclusion::remove (nano::tcp_endpoint const & endpoint_a)
{
nano::lock_guard<std::mutex> guard (mutex);
peers.get<tag_endpoint> ().erase (endpoint_a);
}

size_t nano::peer_exclusion::limited_size (size_t const network_peers_count_a) const
{
return std::min<size_t> (size_max, network_peers_count_a * peers_percentage_limit);
}

size_t nano::peer_exclusion::size () const
{
nano::lock_guard<std::mutex> guard (mutex);
return peers.size ();
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (nano::peer_exclusion const & excluded_peers, const std::string & name)
{
auto composite = std::make_unique<container_info_composite> (name);

size_t excluded_peers_count = excluded_peers.size ();
auto sizeof_excluded_peers_element = sizeof (nano::peer_exclusion::ordered_endpoints::value_type);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "peers", excluded_peers_count, sizeof_excluded_peers_element }));

return composite;
}
Loading

0 comments on commit 33874b6

Please sign in to comment.