Skip to content

Commit

Permalink
Use start/stop pattern in vote_processor (#4455)
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev authored Mar 5, 2024
1 parent 1b1bac0 commit 69db556
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 45 deletions.
1 change: 1 addition & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,7 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
vote_processor.start ();
active.start ();
generator.start ();
final_generator.start ();
Expand Down
69 changes: 34 additions & 35 deletions nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

#include <nano/lib/stats.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/active_transactions.hpp>
Expand All @@ -13,6 +12,7 @@
#include <boost/format.hpp>

#include <chrono>

using namespace std::chrono_literals;

nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a) :
Expand All @@ -25,33 +25,45 @@ nano::vote_processor::vote_processor (nano::active_transactions & active_a, nano
rep_crawler (rep_crawler_a),
ledger (ledger_a),
network_params (network_params_a),
max_votes (flags_a.vote_processor_capacity),
started (false),
stopped (false),
thread ([this] () {
max_votes (flags_a.vote_processor_capacity)
{
}

nano::vote_processor::~vote_processor ()
{
// Thread must be stopped before destruction
debug_assert (!thread.joinable ());
}

void nano::vote_processor::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread{ [this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
process_loop ();
nano::unique_lock<nano::mutex> lock{ mutex };
votes.clear ();
condition.notify_all ();
})
run ();
} };
}

void nano::vote_processor::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
condition.wait (lock, [&started = started] { return started; });
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_processor::process_loop ()
void nano::vote_processor::run ()
{
nano::timer<std::chrono::milliseconds> elapsed;
bool log_this_iteration;

nano::unique_lock<nano::mutex> lock{ mutex };
started = true;

lock.unlock ();
condition.notify_all ();
lock.lock ();

while (!stopped)
{
if (!votes.empty ())
Expand Down Expand Up @@ -181,19 +193,6 @@ nano::vote_code nano::vote_processor::vote_blocking (std::shared_ptr<nano::vote>
return result;
}

void nano::vote_processor::stop ()
{
{
nano::lock_guard<nano::mutex> lock{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_processor::flush ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
Expand All @@ -208,19 +207,19 @@ void nano::vote_processor::flush ()
}
}

std::size_t nano::vote_processor::size ()
std::size_t nano::vote_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.size ();
}

bool nano::vote_processor::empty ()
bool nano::vote_processor::empty () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return votes.empty ();
}

bool nano::vote_processor::half_full ()
bool nano::vote_processor::half_full () const
{
return size () >= max_votes / 2;
}
Expand Down
28 changes: 18 additions & 10 deletions nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class vote_processor final
{
public:
vote_processor (nano::active_transactions & active_a, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_config & config_a, nano::node_flags & flags_a, nano::logger &, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a);
~vote_processor ();

void start ();
void stop ();

/** Returns false if the vote was processed */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &);
Expand All @@ -46,16 +50,14 @@ class vote_processor final
/** Function blocks until either the current queue size (a established flush boundary as it'll continue to increase)
* is processed or the queue is empty (end condition or cutoff's guard, as it is positioned ahead) */
void flush ();
std::size_t size ();
bool empty ();
bool half_full ();
std::size_t size () const;
bool empty () const;
bool half_full () const;
void calculate_weights ();
void stop ();
std::atomic<uint64_t> total_processed{ 0 };

private:
void process_loop ();
std::atomic<uint64_t> total_processed{ 0 };

private: // Dependencies
nano::active_transactions & active;
nano::node_observers & observers;
nano::stats & stats;
Expand All @@ -65,16 +67,22 @@ class vote_processor final
nano::rep_crawler & rep_crawler;
nano::ledger & ledger;
nano::network_params & network_params;

private:
void run ();

std::size_t const max_votes;
std::deque<std::pair<std::shared_ptr<nano::vote>, std::shared_ptr<nano::transport::channel>>> votes;

/** Representatives levels for random early detection */
std::unordered_set<nano::account> representatives_1;
std::unordered_set<nano::account> representatives_2;
std::unordered_set<nano::account> representatives_3;

private:
bool stopped{ false };
nano::condition_variable condition;
nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
bool started;
bool stopped;
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::thread thread;

friend std::unique_ptr<container_info_component> collect_container_info (vote_processor & vote_processor, std::string const & name);
Expand Down

0 comments on commit 69db556

Please sign in to comment.