Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RPC publish optional work_watch flag to add to work_watcher #2168

Merged
merged 10 commits into from
Aug 11, 2019
17 changes: 11 additions & 6 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,12 +347,17 @@ void nano::block_processor::process_batch (std::unique_lock<std::mutex> & lock_a
}
}

void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a)
void nano::block_processor::process_live (nano::block_hash const & hash_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
{
// Start collecting quorum on block
node.active.start (block_a);
//add block to watcher if desired after block has been added to active
if (watch_work_a)
{
node.wallets.watcher.add (block_a);
}
// Announce block contents to the network
node.network.flood_block (block_a);
node.network.flood_block (block_a, false);
if (node.config.enable_voting)
{
// Announce our weighted vote to the network
Expand Down Expand Up @@ -382,7 +387,7 @@ void nano::block_processor::process_live (nano::block_hash const & hash_a, std::
});
}

nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a)
nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, nano::unchecked_info info_a, const bool work_watcher_a)
argakiig marked this conversation as resolved.
Show resolved Hide resolved
{
nano::process_return result;
auto hash (info_a.block->hash ());
Expand All @@ -400,7 +405,7 @@ nano::process_return nano::block_processor::process_one (nano::transaction const
}
if (info_a.modified > nano::seconds_since_epoch () - 300 && node.block_arrival.recent (hash))
{
process_live (hash, info_a.block);
process_live (hash, info_a.block, work_watcher_a);
}
queue_unchecked (transaction_a, hash);
break;
Expand Down Expand Up @@ -515,10 +520,10 @@ nano::process_return nano::block_processor::process_one (nano::transaction const
return result;
}

nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a)
nano::process_return nano::block_processor::process_one (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, const bool watch_work_a)
{
nano::unchecked_info info (block_a, block_a->account (), 0, nano::signature_verification::unknown);
auto result (process_one (transaction_a, info));
auto result (process_one (transaction_a, info, watch_work_a));
return result;
}

Expand Down
6 changes: 3 additions & 3 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ class block_processor final
bool should_log (bool);
bool have_blocks ();
void process_blocks ();
nano::process_return process_one (nano::transaction const &, nano::unchecked_info);
nano::process_return process_one (nano::transaction const &, std::shared_ptr<nano::block>);
nano::process_return process_one (nano::transaction const &, nano::unchecked_info, const bool = false);
nano::process_return process_one (nano::transaction const &, std::shared_ptr<nano::block>, const bool = false);
nano::vote_generator generator;
// Delay required for average network propagartion before requesting confirmation
static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 };
Expand All @@ -54,7 +54,7 @@ class block_processor final
void queue_unchecked (nano::transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::transaction const & transaction_a, std::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (std::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false);
bool stopped;
bool active;
std::chrono::steady_clock::time_point next_log;
Expand Down
8 changes: 4 additions & 4 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void nano::frontier_req_client::run ()
}
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

std::shared_ptr<nano::bootstrap_client> nano::bootstrap_client::shared ()
Expand Down Expand Up @@ -352,7 +352,7 @@ void nano::bulk_pull_client::request ()
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_client::receive_block ()
Expand Down Expand Up @@ -539,7 +539,7 @@ void nano::bulk_push_client::start ()
}
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_push_client::push (nano::transaction const & transaction_a)
Expand Down Expand Up @@ -678,7 +678,7 @@ void nano::bulk_pull_account_client::request ()
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_dropable false
false); // is bootstrap traffic is_droppable false
}

void nano::bulk_pull_account_client::receive_pending ()
Expand Down
2 changes: 1 addition & 1 deletion nano/node/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ bool nano::election::publish (std::shared_ptr<nano::block> block_a)
{
blocks.insert (std::make_pair (block_a->hash (), block_a));
confirm_if_quorum (transaction);
node.network.flood_block (block_a);
node.network.flood_block (block_a, false);
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2818,6 +2818,7 @@ void nano::json_handler::payment_wait ()
void nano::json_handler::process ()
{
const bool json_block_l = request.get<bool> ("json_block", false);
const bool watch_work_l = request.get<bool> ("watch_work", true);
SergiySW marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<nano::block> block;
if (json_block_l)
{
Expand Down Expand Up @@ -2906,7 +2907,7 @@ void nano::json_handler::process ()
auto transaction (node.store.tx_begin_write ());
// Set current time to trigger automatic rebroadcast and election
nano::unchecked_info info (block, block->account (), nano::seconds_since_epoch (), nano::signature_verification::unknown);
result = node.block_processor.process_one (transaction, info);
result = node.block_processor.process_one (transaction, info, watch_work_l);
}
switch (result.code)
{
Expand Down
4 changes: 2 additions & 2 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,12 +215,12 @@ bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel>
return result;
}

void nano::network::flood_message (nano::message const & message_a)
void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a)
{
auto list (list_fanout ());
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
{
(*i)->send (message_a);
(*i)->send (message_a, nullptr, is_droppable_a);
}
}

Expand Down
7 changes: 4 additions & 3 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class network final
~network ();
void start ();
void stop ();
void flood_message (nano::message const &);
void flood_message (nano::message const &, bool const = true);
void flood_keepalive ()
{
nano::keepalive message;
Expand All @@ -126,11 +126,12 @@ class network final
nano::confirm_ack message (vote_a);
flood_message (message);
}
void flood_block (std::shared_ptr<nano::block> block_a)
void flood_block (std::shared_ptr<nano::block> block_a, bool const is_droppable_a = true)
{
nano::publish publish (block_a);
flood_message (publish);
flood_message (publish, is_droppable_a);
}

void flood_block_batch (std::deque<std::shared_ptr<nano::block>>, unsigned = broadcast_interval_ms);
void merge_peers (std::array<nano::endpoint, 8> const &);
void merge_peer (nano::endpoint const &);
Expand Down
4 changes: 2 additions & 2 deletions nano/node/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ node (node_a)
{
}

void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const & is_dropable)
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a, bool const is_droppable_a)
{
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
auto detail (visitor.result);
if (!is_dropable || !limiter.should_drop (buffer->size ()))
if (!is_droppable_a || !limiter.should_drop (buffer->size ()))
{
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ namespace transport
virtual ~channel () = default;
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const & = true);
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr, bool const = true);
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
Expand Down
4 changes: 2 additions & 2 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1061,7 +1061,7 @@ std::shared_ptr<nano::block> nano::wallet::send_action (nano::account const & so
if (block != nullptr)
{
cached_block = true;
wallets.node.network.flood_block (block);
wallets.node.network.flood_block (block, false);
}
}
else if (status != MDB_NOTFOUND)
Expand Down Expand Up @@ -1488,7 +1488,7 @@ void nano::work_watcher::run ()
current->second = block;
}
}
node.network.flood_block (block);
node.network.flood_block (block, false);
node.active.update_difficulty (*block.get ());
lock.lock ();
if (stopped)
Expand Down
60 changes: 60 additions & 0 deletions nano/rpc_test/rpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1579,6 +1579,66 @@ TEST (rpc, process_block)
ASSERT_EQ (send.hash ().to_string (), send_hash);
}

TEST (rpc, process_block_with_work_watcher)
{
nano::system system;
nano::node_config node_config (24000, system.logging);
node_config.enable_voting = false;
auto & node1 = *system.add_node (node_config);
nano::keypair key;
auto latest (system.nodes[0]->latest (nano::test_genesis_key.pub));
auto send (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, latest, nano::test_genesis_key.pub, nano::genesis_amount - 100, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (latest)));
uint64_t difficulty1 (0);
nano::work_validate (*send, &difficulty1);
auto multiplier1 = nano::difficulty::to_multiplier (difficulty1, node1.network_params.network.publish_threshold);
enable_ipc_transport_tcp (node1.config.ipc_config.transport_tcp);
nano::node_rpc_config node_rpc_config;
nano::ipc::ipc_server ipc_server (node1, node_rpc_config);
nano::rpc_config rpc_config (true);
nano::ipc_rpc_processor ipc_rpc_processor (system.io_ctx, rpc_config);
nano::rpc rpc (system.io_ctx, rpc_config, ipc_rpc_processor);
rpc.start ();
boost::property_tree::ptree request;
request.put ("action", "process");
request.put ("work_watcher", true);
std::string json;
send->serialize_json (json);
request.put ("block", json);
test_response response (request, rpc.config.port, system.io_ctx);
system.deadline_set (5s);
while (response.status == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (200, response.status);
system.deadline_set (10s);
while (system.nodes[0]->latest (nano::test_genesis_key.pub) != send->hash ())
{
ASSERT_NO_ERROR (system.poll ());
}
system.deadline_set (10s);
auto updated (false);
uint64_t updated_difficulty;
while (!updated)
{
std::unique_lock<std::mutex> lock (node1.active.mutex);
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node1.active.multipliers_cb.size (); i++)
{
node1.active.multipliers_cb.push_back (multiplier1 * (1 + i / 100.));
}
node1.active.update_active_difficulty (lock);
auto const existing (node1.active.roots.find (send->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node1.active.roots.end ());
updated = existing->difficulty != difficulty1;
updated_difficulty = existing->difficulty;
lock.unlock ();
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_GT (updated_difficulty, difficulty1);
}

TEST (rpc, process_block_no_work)
{
nano::system system (24000, 1);
Expand Down