Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Interrupt block validation on new best head #1047

Merged
merged 26 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
276fd79
GH-1039 Modify fork_database to return a descriptive enum for add() i…
heifner Nov 20, 2024
ca99a11
GH-1039 Move producer_plugin timers to their own thread
heifner Nov 20, 2024
f3665ba
GH-1039 Rename interrupt_transaction() to interrupt_apply_block_trans…
heifner Nov 20, 2024
414349a
Merge branch 'GH-986-retry-with-oc' into GH-1039-interrupt-block-vali…
heifner Nov 22, 2024
8cbcac2
GH-986 Add failure to fork_db_add_t
heifner Nov 22, 2024
3ff642b
GH-1039 Do not remove block from fork db on interrupt
heifner Nov 22, 2024
773542c
GH-1039 Interrupt apply block when a fork switch is received.
heifner Nov 22, 2024
20609f9
GH-1039 Interrupt apply_block when scheduled to produce. Do not attem…
heifner Nov 22, 2024
6fe2a17
GH-1039 add ability to broadcast a block from test_control_plugin
heifner Nov 22, 2024
da5b393
GH-1039 Add debug output on controller shutdown call
heifner Nov 22, 2024
6a57eda
GH-1039 Verify an infinite trx in a block is auto recovered when a ne…
heifner Nov 25, 2024
426cfc6
GH-986 Add option to shutdown after swap
heifner Nov 25, 2024
90d255a
GH-1039 Do not replay blocks out of forkdb on startup. This allows fo…
heifner Nov 25, 2024
ac8d2f0
GH-1039 Update tests for fork database not automatically read at startup
heifner Nov 26, 2024
5773e88
GH-1039 Move call to apply_blocks with startup
heifner Nov 26, 2024
6bc4406
GH-1039 Kick off a process of incoming blocks if peers configured on …
heifner Nov 26, 2024
ece786d
GH-1039 Add a better comment
heifner Nov 26, 2024
1da3692
GH-1039 BPs apply blocks when not scheduled to produce
heifner Nov 27, 2024
95dcc4b
GH-1039 Start production after net_plugin is started to allow for new…
heifner Nov 27, 2024
5b25a00
GH-1039 If a BP then do not allow apply_blocks to run past block dead…
heifner Nov 27, 2024
2a42ef4
GH-1039 Better forks are applied now on BP nodes, so it can take awhi…
heifner Nov 27, 2024
7740940
GH-1039 Remove log statement as it can not be definitely determined a…
heifner Dec 3, 2024
5e16cd5
GH-1039 Improve comments
heifner Dec 3, 2024
1efb62c
GH-1039 Add comment
heifner Dec 3, 2024
8a3f00a
Merge branch 'GH-986-retry-with-oc' into GH-1039-interrupt-block-vali…
heifner Dec 4, 2024
3684035
Merge branch 'main' into GH-1039-interrupt-block-validation
heifner Dec 4, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 9 additions & 52 deletions libraries/chain/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1870,12 +1870,8 @@ struct controller_impl {
fork_db_reset_root_to_chain_head();
} else if( !except_ptr && !check_shutdown() && !irreversible_mode() ) {
if (auto fork_db_head = fork_db.head()) {
// applies all blocks up to fork_db head from fork_db, shouldn't return incomplete, but if it does loop until complete
ilog("applying ${n} fork database blocks from ${ch} to ${fh}",
ilog("fork database contains ${n} blocks after head from ${ch} to ${fh}",
("n", fork_db_head->block_num() - chain_head.block_num())("ch", chain_head.block_num())("fh", fork_db_head->block_num()));
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
ilog( "reversible blocks replayed to ${bn} : ${id}", ("bn", fork_db_head->block_num())("id", fork_db_head->id()) );
}
}

Expand Down Expand Up @@ -2042,42 +2038,6 @@ struct controller_impl {
// Furthermore, fork_db.root()->block_num() <= lib_num.
// Also, even though blog.head() may still be nullptr, blog.first_block_num() is guaranteed to be lib_num + 1.

auto finish_init = [&](auto& fork_db) {
if( read_mode != db_read_mode::IRREVERSIBLE ) {
auto pending_head = fork_db.head();
if ( pending_head && pending_head->id() != chain_head.id() ) {
// chain_head equal to root means that read_mode was changed from irreversible mode to head/speculative
bool chain_head_is_root = chain_head.id() == fork_db.root()->id();
if (chain_head_is_root) {
ilog( "read_mode has changed from irreversible: applying best branch from fork database" );
}

// See comment below about pause-at-block for why `|| conf.num_configured_p2p_peers > 0`
if (chain_head_is_root || conf.num_configured_p2p_peers > 0) {
ilog("applying branch from fork database ending with block: ${id}", ("id", pending_head->id()));
// applies all blocks up to forkdb head from forkdb, shouldn't return incomplete, but if it does loop until complete
while (maybe_apply_blocks(forked_callback_t{}, trx_meta_cache_lookup{}) == controller::apply_blocks_result::incomplete)
;
}
}
} else {
// It is possible that the node was shutdown with blocks to process in the fork database. For example, if
// it was syncing and had processed blocks into the fork database but not yet applied them. In general,
// it makes sense to process those blocks on startup. However, if the node was shutdown via
// terminate-at-block, the current expectation is that the node can be restarted to examine the state at
// which it was shutdown. For now, we will only process these blocks if there are peers configured. This
// is a bit of a hack for Spring 1.0.0 until we can add a proper pause-at-block (issue #570) which could
// be used to explicitly request a node to not process beyond a specified block.
if (conf.num_configured_p2p_peers > 0) {
ilog("Process blocks out of fork_db if needed");
log_irreversible();
transition_to_savanna_if_needed();
}
}
};

fork_db_.apply<void>(finish_init);

// At Leap startup, we want to provide to our local finalizers the correct safety information
// to use if they don't already have one.
// If we start at a block prior to the IF transition, that information will be provided when
Expand Down Expand Up @@ -4232,11 +4192,11 @@ struct controller_impl {
assert(!verify_qc_future.valid());
}

bool best_head = fork_db.add(bsp, ignore_duplicate_t::yes);
fork_db_add_t add_result = fork_db.add(bsp, ignore_duplicate_t::yes);
if constexpr (is_proper_savanna_block)
vote_processor.notify_new_block(async_aggregation);

return controller::accepted_block_result{best_head, block_handle{std::move(bsp)}};
return controller::accepted_block_result{add_result, block_handle{std::move(bsp)}};
}

// thread safe, expected to be called from thread other than the main thread
Expand Down Expand Up @@ -4440,7 +4400,6 @@ struct controller_impl {

const auto start_apply_blocks_loop = fc::time_point::now();
for( auto ritr = new_head_branch.rbegin(); ritr != new_head_branch.rend(); ++ritr ) {
const auto start_apply_block = fc::time_point::now();
auto except = std::exception_ptr{};
const auto& bsp = *ritr;
try {
Expand All @@ -4465,11 +4424,9 @@ struct controller_impl {
throw;
} catch (const fc::exception& e) {
if (e.code() == interrupt_exception::code_value) {
if (fc::time_point::now() - start_apply_block < fc::milliseconds(2 * config::block_interval_ms)) {
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
throw; // do not want to remove block from fork_db if not interrupting a long, maybe infinite, block
}
ilog("interrupt while applying block, removing block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
// do not want to remove block from fork_db if interrupted
ilog("interrupt while applying block ${bn} : ${id}", ("bn", bsp->block_num())("id", bsp->id()));
throw;
} else {
elog("exception thrown while applying block ${bn} : ${id}, previous ${p}, error: ${e}",
("bn", bsp->block_num())("id", bsp->id())("p", bsp->previous())("e", e.to_detail_string()));
Expand Down Expand Up @@ -4538,7 +4495,7 @@ struct controller_impl {
return applied_trxs;
}

void interrupt_transaction() {
void interrupt_apply_block_transaction() {
// Only interrupt transaction if applying a block. Speculative trxs already have a deadline set so they
// have limited run time already. This is to allow killing a long-running transaction in a block being
// validated.
Expand Down Expand Up @@ -5308,8 +5265,8 @@ deque<transaction_metadata_ptr> controller::abort_block() {
return my->abort_block();
}

void controller::interrupt_transaction() {
my->interrupt_transaction();
void controller::interrupt_apply_block_transaction() {
my->interrupt_apply_block_transaction();
}

boost::asio::io_context& controller::get_thread_pool() {
Expand Down
20 changes: 15 additions & 5 deletions libraries/chain/fork_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ namespace eosio::chain {

void open_impl( const char* desc, const std::filesystem::path& fork_db_file, fc::cfile_datastream& ds, validator_t& validator );
void close_impl( std::ofstream& out );
bool add_impl( const bsp_t& n, ignore_duplicate_t ignore_duplicate, bool validate, validator_t& validator );
fork_db_add_t add_impl( const bsp_t& n, ignore_duplicate_t ignore_duplicate, bool validate, validator_t& validator );
bool is_valid() const;

bsp_t get_block_impl( const block_id_type& id, include_root_t include_root = include_root_t::no ) const;
Expand Down Expand Up @@ -241,8 +241,8 @@ namespace eosio::chain {
}

template <class BSP>
bool fork_database_impl<BSP>::add_impl(const bsp_t& n, ignore_duplicate_t ignore_duplicate,
bool validate, validator_t& validator) {
fork_db_add_t fork_database_impl<BSP>::add_impl(const bsp_t& n, ignore_duplicate_t ignore_duplicate,
bool validate, validator_t& validator) {
EOS_ASSERT( root, fork_database_exception, "root not yet set" );
EOS_ASSERT( n, fork_database_exception, "attempt to add null block state" );

Expand Down Expand Up @@ -278,15 +278,25 @@ namespace eosio::chain {
EOS_RETHROW_EXCEPTIONS( fork_database_exception, "serialized fork database is incompatible with configured protocol features" )
}

auto prev_head = head_impl(include_root_t::yes);

auto inserted = index.insert(n);
EOS_ASSERT(ignore_duplicate == ignore_duplicate_t::yes || inserted.second, fork_database_exception,
"duplicate block added: ${id}", ("id", n->id()));

return inserted.second && n == head_impl(include_root_t::no);
if (!inserted.second)
return fork_db_add_t::duplicate;
const bool new_head = n == head_impl(include_root_t::no);
if (new_head && n->previous() == prev_head->id())
return fork_db_add_t::appended_to_head;
if (new_head)
return fork_db_add_t::fork_switch;

return fork_db_add_t::added;
}

template<class BSP>
bool fork_database_t<BSP>::add( const bsp_t& n, ignore_duplicate_t ignore_duplicate ) {
fork_db_add_t fork_database_t<BSP>::add( const bsp_t& n, ignore_duplicate_t ignore_duplicate ) {
std::lock_guard g( my->mtx );
return my->add_impl(n, ignore_duplicate, false,
[](block_timestamp_type timestamp,
Expand Down
7 changes: 4 additions & 3 deletions libraries/chain/include/eosio/chain/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ namespace eosio::chain {
using resource_limits::resource_limits_manager;
using apply_handler = std::function<void(apply_context&)>;

enum class fork_db_add_t;
using forked_callback_t = std::function<void(const transaction_metadata_ptr&)>;

// lookup transaction_metadata via supplied function to avoid re-creation
Expand Down Expand Up @@ -207,8 +208,8 @@ namespace eosio::chain {
*/
deque<transaction_metadata_ptr> abort_block();

/// Expected to be called from signal handler
void interrupt_transaction();
/// Expected to be called from signal handler, or producer_plugin
void interrupt_apply_block_transaction();

/**
*
Expand All @@ -235,7 +236,7 @@ namespace eosio::chain {
void set_async_aggregation(async_t val);

struct accepted_block_result {
const bool is_new_best_head = false; // true if new best head
const fork_db_add_t add_result;
std::optional<block_handle> block; // empty optional if block is unlinkable
};
// thread-safe
Expand Down
16 changes: 14 additions & 2 deletions libraries/chain/include/eosio/chain/fork_database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ namespace eosio::chain {
using block_branch_t = std::vector<signed_block_ptr>;
enum class ignore_duplicate_t { no, yes };
enum class include_root_t { no, yes };
enum class fork_db_add_t {
failure, // add failed
duplicate, // already added and ignore_duplicate=true
added, // inserted into an existing branch or started a new branch, but not best branch
appended_to_head, // new best head of current best branch; no fork switch
fork_switch // new best head of new branch, fork switch to new branch
};

// Used for logging of comparison values used for best fork determination
std::string log_fork_comparison(const block_state& bs);
Expand Down Expand Up @@ -67,9 +74,11 @@ namespace eosio::chain {
/**
* Add block state to fork database.
* Must link to existing block in fork database or the root.
* @return true if n becomes the new best head (and was not the best head before)
* @returns fork_db_add_t - result of the add
* @throws unlinkable_block_exception - unlinkable to any branch
* @throws fork_database_exception - no root, n is nullptr, protocol feature error, duplicate when ignore_duplicate=false
*/
bool add( const bsp_t& n, ignore_duplicate_t ignore_duplicate );
fork_db_add_t add( const bsp_t& n, ignore_duplicate_t ignore_duplicate );

void remove( const block_id_type& id );

Expand Down Expand Up @@ -306,3 +315,6 @@ namespace eosio::chain {
static constexpr uint32_t max_supported_version = 3;
};
} /// eosio::chain

FC_REFLECT_ENUM( eosio::chain::fork_db_add_t,
(failure)(duplicate)(added)(appended_to_head)(fork_switch) )
1 change: 1 addition & 0 deletions libraries/testing/include/eosio/testing/tester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ namespace eosio::testing {
// producer become inactive
void produce_min_num_of_blocks_to_spend_time_wo_inactive_prod(const fc::microseconds target_elapsed_time = fc::microseconds());
void push_block(const signed_block_ptr& b);
void apply_blocks();

/**
* These transaction IDs represent transactions available in the head chain state as scheduled
Expand Down
13 changes: 11 additions & 2 deletions libraries/testing/tester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,9 @@ namespace eosio::testing {
case block_signal::accepted_block:
// should get accepted_block signal after accepted_block_header signal
// or after accepted_block (on fork switch, accepted block signaled when block re-applied)
return present && (itr->second == block_signal::accepted_block_header ||
itr->second == block_signal::accepted_block);
// or first thing on restart if applying out of the forkdb
return !present || (present && (itr->second == block_signal::accepted_block_header ||
itr->second == block_signal::accepted_block));

case block_signal::irreversible_block:
// can be signaled on restart as the first thing since other signals happened before shutdown
Expand Down Expand Up @@ -423,13 +424,15 @@ namespace eosio::testing {
open(std::move(pfs), snapshot_chain_id, [&snapshot,&control=this->control]() {
control->startup( [](){}, []() { return false; }, snapshot );
});
apply_blocks();
}

void base_tester::open( protocol_feature_set&& pfs, const genesis_state& genesis, call_startup_t call_startup ) {
if (call_startup == call_startup_t::yes) {
open(std::move(pfs), genesis.compute_chain_id(), [&genesis,&control=this->control]() {
control->startup( [](){}, []() { return false; }, genesis );
});
apply_blocks();
} else {
open(std::move(pfs), genesis.compute_chain_id(), nullptr);
}
Expand All @@ -439,6 +442,7 @@ namespace eosio::testing {
open(std::move(pfs), expected_chain_id, [&control=this->control]() {
control->startup( [](){}, []() { return false; } );
});
apply_blocks();
}

void base_tester::push_block(const signed_block_ptr& b) {
Expand All @@ -460,6 +464,11 @@ namespace eosio::testing {
_check_for_vote_if_needed(*control, bh);
}

void base_tester::apply_blocks() {
while (control->apply_blocks( {}, {} ) == controller::apply_blocks_result::incomplete)
;
}

signed_block_ptr base_tester::_produce_block( fc::microseconds skip_time, bool skip_pending_trxs ) {
auto res = _produce_block( skip_time, skip_pending_trxs, false );
return res.block;
Expand Down
5 changes: 4 additions & 1 deletion plugins/chain_plugin/chain_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1133,7 +1133,10 @@ void chain_plugin::plugin_initialize(const variables_map& options) {
void chain_plugin_impl::plugin_startup()
{ try {
try {
auto shutdown = [](){ return app().quit(); };
auto shutdown = []() {
dlog("controller shutdown, quitting...");
return app().quit();
};
auto check_shutdown = [](){ return app().is_quiting(); };
if (snapshot_path)
chain->startup(shutdown, check_shutdown, std::make_shared<threaded_snapshot_reader>(*snapshot_path));
Expand Down
3 changes: 3 additions & 0 deletions plugins/net_plugin/include/eosio/net_plugin/net_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ namespace eosio {
void register_increment_failed_p2p_connections(std::function<void()>&&);
void register_increment_dropped_trxs(std::function<void()>&&);

// for testing
void broadcast_block(const signed_block_ptr& b, const block_id_type& id);

private:
std::shared_ptr<class net_plugin_impl> my;
};
Expand Down
Loading
Loading