Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 'rebroadcasted' flag to confirm_ack message #4621

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions nano/core_test/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ TEST (inactive_votes_cache, basic)
ASSERT_TIMELY_EQ (5s, node.vote_cache.size (), 1);
node.process_active (send);
ASSERT_TIMELY (5s, node.ledger.confirmed.block_exists_or_pruned (node.ledger.tx_begin_read (), send->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

/**
Expand All @@ -276,7 +276,7 @@ TEST (inactive_votes_cache, non_final)
node.process_active (send);
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = node.active.election (send->qualified_root ()));
ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached), 1);
ASSERT_TIMELY_EQ (5s, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache), 1);
ASSERT_TIMELY_EQ (5s, nano::dev::constants.genesis_amount - 100, election->tally ().begin ()->first);
ASSERT_FALSE (election->confirmed ());
}
Expand Down Expand Up @@ -318,7 +318,7 @@ TEST (inactive_votes_cache, fork)
node.process_active (send1);
ASSERT_TIMELY_EQ (5s, election->blocks ().size (), 2);
ASSERT_TIMELY (5s, node.block_confirmed (send1->hash ()));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, existing_vote)
Expand Down Expand Up @@ -354,7 +354,7 @@ TEST (inactive_votes_cache, existing_vote)
auto vote1 = nano::test::make_vote (key, { send }, nano::vote::timestamp_min * 1, 0);
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY_EQ (5s, election->votes ().size (), 2);
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_new));
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote));
auto last_vote1 (election->votes ()[key.pub]);
ASSERT_EQ (send->hash (), last_vote1.hash);
ASSERT_EQ (nano::vote::timestamp_min * 1, last_vote1.timestamp);
Expand All @@ -372,7 +372,7 @@ TEST (inactive_votes_cache, existing_vote)
ASSERT_EQ (last_vote1.hash, last_vote2.hash);
ASSERT_EQ (last_vote1.timestamp, last_vote2.timestamp);
ASSERT_EQ (last_vote1.time, last_vote2.time);
ASSERT_EQ (0, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (0, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, multiple_votes)
Expand Down Expand Up @@ -425,7 +425,7 @@ TEST (inactive_votes_cache, multiple_votes)
ASSERT_EQ (1, node.vote_cache.size ());
auto election = nano::test::start_election (system, node, send1->hash ());
ASSERT_TIMELY_EQ (5s, 3, election->votes ().size ()); // 2 votes and 1 default not_an_acount
ASSERT_EQ (2, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_cached));
ASSERT_EQ (2, node.stats.count (nano::stat::type::election_vote, nano::stat::detail::cache));
}

TEST (inactive_votes_cache, election_start)
Expand Down
23 changes: 23 additions & 0 deletions nano/core_test/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ TEST (message, confirm_ack_hash_serialization)
ASSERT_EQ (hashes, con2.vote->hashes);
ASSERT_FALSE (header.confirm_is_v2 ());
ASSERT_EQ (header.count_get (), hashes.size ());
ASSERT_FALSE (con2.is_rebroadcasted ());
}

TEST (message, confirm_ack_hash_serialization_v2)
Expand Down Expand Up @@ -223,6 +224,28 @@ TEST (message, confirm_ack_hash_serialization_v2)
ASSERT_EQ (hashes, con2.vote->hashes);
ASSERT_TRUE (header.confirm_is_v2 ());
ASSERT_EQ (header.count_v2_get (), hashes.size ());
ASSERT_FALSE (con2.is_rebroadcasted ());
}

TEST (message, confirm_ack_rebroadcasted_flag)
{
nano::keypair representative1;
auto vote = nano::test::make_vote (representative1, std::vector<nano::block_hash> (), 0, 0);
nano::confirm_ack con1{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
ASSERT_TRUE (con1.is_rebroadcasted ());
std::vector<uint8_t> bytes;
{
nano::vectorstream stream1 (bytes);
con1.serialize (stream1);
}
nano::bufferstream stream2 (bytes.data (), bytes.size ());
bool error (false);
nano::message_header header (error, stream2);
nano::confirm_ack con2 (error, stream2, header);
ASSERT_FALSE (error);
ASSERT_EQ (con1, con2);
ASSERT_TRUE (con2.vote->hashes.empty ());
ASSERT_TRUE (con2.is_rebroadcasted ());
}

TEST (message, confirm_req_hash_serialization)
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2166,7 +2166,7 @@ TEST (node, vote_by_hash_bundle)
nano::keypair key1;
system.wallet (0)->insert_adhoc (key1.prv);

system.nodes[0]->observers.vote.add ([&max_hashes] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const &, nano::vote_code) {
system.nodes[0]->observers.vote.add ([&max_hashes] (std::shared_ptr<nano::vote> const & vote_a, std::shared_ptr<nano::transport::channel> const &, nano::vote_source, nano::vote_code) {
if (vote_a->hashes.size () > max_hashes)
{
max_hashes = vote_a->hashes.size ();
Expand Down
29 changes: 29 additions & 0 deletions nano/core_test/rep_crawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,3 +299,32 @@ TEST (rep_crawler, two_reps_one_node)
ASSERT_TRUE (nano::dev::genesis_key.pub == reps[0].account || nano::dev::genesis_key.pub == reps[1].account);
ASSERT_TRUE (second_rep.pub == reps[0].account || second_rep.pub == reps[1].account);
}

TEST (rep_crawler, ignore_rebroadcasted)
{
nano::test::system system;
auto & node1 = *system.add_node ();
auto & node2 = *system.add_node ();

auto channel1to2 = node1.network.find_node_id (node2.node_id.pub);
ASSERT_NE (nullptr, channel1to2);

node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel1to2);
ASSERT_ALWAYS_EQ (100ms, node1.rep_crawler.representative_count (), 0);

// Now we spam the vote for genesis, so it appears as a rebroadcasted vote
auto vote = nano::test::make_vote (nano::dev::genesis_key, { nano::dev::genesis->hash () }, 0);

auto channel2to1 = node2.network.find_node_id (node1.node_id.pub);
ASSERT_NE (nullptr, channel2to1);

node1.rep_crawler.force_query (nano::dev::genesis->hash (), channel1to2);

auto tick = [&] () {
nano::confirm_ack msg{ nano::dev::network_params.network, vote, /* rebroadcasted */ true };
channel2to1->send (msg, nullptr, nano::transport::buffer_drop_policy::no_socket_drop);
return false;
};

ASSERT_NEVER (1s, tick () || node1.rep_crawler.representative_count () > 0);
}
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ enum class type
vote_processor_tier,
vote_processor_overfill,
election,
election_vote,
http_callback,
ipc,
tcp,
Expand Down Expand Up @@ -109,6 +110,7 @@ enum class detail
success,
unknown,
cache,
rebroadcast,
queue_overflow,

// processing queue
Expand Down
8 changes: 5 additions & 3 deletions nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
auto max_vote = timestamp_a == std::numeric_limits<uint64_t>::max () && last_vote_l.timestamp < timestamp_a;

bool past_cooldown = true;
if (vote_source_a == vote_source::live) // Only cooldown live votes
if (vote_source_a != vote_source::cache) // Only cooldown live votes
{
const auto cooldown = cooldown_time (weight);
past_cooldown = last_vote_l.time <= std::chrono::steady_clock::now () - cooldown;
Expand All @@ -473,12 +473,14 @@ nano::vote_code nano::election::vote (nano::account const & rep, uint64_t timest
}

last_votes[rep] = { std::chrono::steady_clock::now (), timestamp_a, block_hash_a };
if (vote_source_a == vote_source::live)
if (vote_source_a != vote_source::cache)
{
live_vote_action (rep);
}

node.stats.inc (nano::stat::type::election, vote_source_a == vote_source::live ? nano::stat::detail::vote_new : nano::stat::detail::vote_cached);
node.stats.inc (nano::stat::type::election, nano::stat::detail::vote);
node.stats.inc (nano::stat::type::election_vote, to_stat_detail (vote_source_a));

node.logger.trace (nano::log::type::election, nano::log::detail::vote_processed,
nano::log::arg{ "id", id },
nano::log::arg{ "qualified_root", qualified_root },
Expand Down
2 changes: 1 addition & 1 deletion nano/node/message_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class process_visitor : public nano::message_visitor
{
if (!message.vote->account.is_zero ())
{
node.vote_processor.vote (message.vote, channel);
node.vote_processor.vote (message.vote, channel, message.is_rebroadcasted () ? nano::vote_source::rebroadcast : nano::vote_source::live);
}
}

Expand Down
8 changes: 7 additions & 1 deletion nano/node/messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,14 @@ nano::confirm_ack::confirm_ack (bool & error_a, nano::stream & stream_a, nano::m
}
}

nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a) :
nano::confirm_ack::confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const & vote_a, bool rebroadcasted_a) :
message (constants, nano::message_type::confirm_ack),
vote (vote_a)
{
debug_assert (vote->hashes.size () < 256);

header.block_type_set (nano::block_type::not_a_block);
header.flag_set (rebroadcasted_flag, rebroadcasted_a);

if (vote->hashes.size () >= 16)
{
Expand Down Expand Up @@ -671,6 +672,11 @@ std::size_t nano::confirm_ack::size (const nano::message_header & header)
return nano::vote::size (count);
}

bool nano::confirm_ack::is_rebroadcasted () const
clemahieu marked this conversation as resolved.
Show resolved Hide resolved
{
return header.flag_test (rebroadcasted_flag);
}

void nano::confirm_ack::operator() (nano::object_stream & obs) const
{
nano::message::operator() (obs); // Write common data
Expand Down
62 changes: 61 additions & 1 deletion nano/node/messages.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ enum class bulk_pull_account_flags : uint8_t

class message_visitor;

/*
* Common Header Binary Format:
* [2 bytes] Network (big endian)
* [1 byte] Maximum protocol version
* [1 byte] Protocol version currently in use
* [1 byte] Minimum protocol version
* [1 byte] Message type
* [2 bytes] Extensions (message-specific flags and properties)
*
* Notes:
* - The structure and bit usage of the `extensions` field vary by message type.
*/
class message_header final
{
public:
Expand Down Expand Up @@ -140,6 +152,14 @@ class message
virtual void operator() (nano::object_stream &) const;
};

/*
* Binary Format:
* [message_header] Common message header
* [8x (16 bytes (IP) + 2 bytes (port)] Array of 8 peers
*
* Header extensions:
* - No specific bits from the `extensions` field are used for `keepalive`.
*/
class keepalive final : public message
{
public:
Expand All @@ -156,6 +176,14 @@ class keepalive final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [variable] Block (serialized according to the block type specified in the header)
*
* Header extensions:
* - [0x0f00] Block type: Identifies the specific type of the block.
*/
class publish final : public message
{
public:
Expand All @@ -172,6 +200,20 @@ class publish final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [N x (32 bytes (block hash) + 32 bytes (root))] Pairs of (block_hash, root)
* - The count is determined by the header's count bits.
*
* Header extensions:
* - [0xf000] Count (for V1 protocol)
* - [0x0f00] Block type
* - Not used anymore (V25.1+), but still present and set to `not_a_block = 0x1` for backwards compatibility
* - [0xf000 (high), 0x00f0 (low)] Count V2 (for V2 protocol)
* - [0x0001] Confirm V2 flag
* - [0x0002] Reserved for V3+ versioning
*/
class confirm_req final : public message
{
public:
Expand All @@ -197,18 +239,36 @@ class confirm_req final : public message
void operator() (nano::object_stream &) const override;
};

/*
* Binary Format:
* [message_header] Common message header
* [variable] Vote
* - Serialized/deserialized by the `nano::vote` class.
*
* Header extensions:
* - [0xf000] Count (for V1 protocol)
* - [0x0f00] Block type
* - Not used anymore (V25.1+), but still present and set to `not_a_block = 0x1` for backwards compatibility
* - [0xf000 (high), 0x00f0 (low)] Count V2 masks (for V2 protocol)
* - [0x0001] Confirm V2 flag
* - [0x0002] Reserved for V3+ versioning
* - [0x0004] Rebroadcasted flag
*/
class confirm_ack final : public message
{
public:
confirm_ack (bool & error, nano::stream &, nano::message_header const &, nano::vote_uniquer * = nullptr);
confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &);
confirm_ack (nano::network_constants const & constants, std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);

void serialize (nano::stream &) const override;
void visit (nano::message_visitor &) const override;
bool operator== (nano::confirm_ack const &) const;

static std::size_t size (nano::message_header const &);

static uint8_t constexpr rebroadcasted_flag = 2; // 0x0004
bool is_rebroadcasted () const;

private:
static uint8_t hash_count (nano::message_header const &);

Expand Down
8 changes: 4 additions & 4 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,18 @@ void nano::network::flood_block_initial (std::shared_ptr<nano::block> const & bl
}
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote_a, float scale)
void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote, float scale, bool rebroadcasted)
{
nano::confirm_ack message{ node.network_params.network, vote_a };
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto & i : list (fanout (scale)))
{
i->send (message, nullptr);
}
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote_a)
void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote, bool rebroadcasted)
{
nano::confirm_ack message{ node.network_params.network, vote_a };
nano::confirm_ack message{ node.network_params.network, vote, rebroadcasted };
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, nano::transport::buffer_drop_policy::no_limiter_drop);
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ class network final
void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f);
void flood_keepalive (float const scale_a = 1.0f);
void flood_keepalive_self (float const scale_a = 0.5f);
void flood_vote (std::shared_ptr<nano::vote> const &, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote> const &);
void flood_vote (std::shared_ptr<nano::vote> const &, float scale, bool rebroadcasted = false);
void flood_vote_pr (std::shared_ptr<nano::vote> const &, bool rebroadcasted = false);
// Flood block to all PRs and a random selection of non-PRs
void flood_block_initial (std::shared_ptr<nano::block> const &);
// Flood block to a random selection of peers
Expand Down
16 changes: 10 additions & 6 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
auto const reps = wallets.reps ();
if (!reps.have_half_rep () && !reps.exists (vote->account))
{
network.flood_vote (vote, 0.5f);
network.flood_vote (vote, 0.5f, /* rebroadcasted */ true);
}
}
});
Expand Down Expand Up @@ -358,18 +358,22 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
this->network.send_keepalive_self (channel_a);
});

observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_code code) {
observers.vote.add ([this] (std::shared_ptr<nano::vote> vote, std::shared_ptr<nano::transport::channel> const & channel, nano::vote_source source, nano::vote_code code) {
debug_assert (vote != nullptr);
debug_assert (code != nano::vote_code::invalid);
if (channel == nullptr)
{
return; // Channel expired when waiting for vote to be processed
}
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
// Ignore republished votes
if (source == nano::vote_source::live)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
online_reps.observe (vote->account);
bool active_in_rep_crawler = rep_crawler.process (vote, channel);
if (active_in_rep_crawler)
{
// Representative is defined as online if replying to live votes or rep_crawler queries
online_reps.observe (vote->account);
}
}
});

Expand Down
Loading
Loading