-
Notifications
You must be signed in to change notification settings - Fork 70
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
node: main chain db txn management #2112
Merged
Merged
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
55df74b
Add ExecutionEngine tests
JacekGlen dfbd7ef
Comment out failing test
JacekGlen eadf9ea
Fix separate forks issue
JacekGlen 465f7d1
Merge remote-tracking branch 'origin/master' into execution_engine_in…
JacekGlen 7f45490
make fmt
web-flow 47ace1c
Fix build
JacekGlen 22b3967
Merge branch 'execution_engine_integration_tests' of https://github.c…
JacekGlen 5c99ed1
make fmt
web-flow 58e34e1
Fix tests
JacekGlen 865089b
Add keep_db_txn_open and transaction handling
JacekGlen 2fe4b7e
Simplify transaction handling code
JacekGlen 494d4fb
Merge branch 'master' into main-chain-db-txn-management
JacekGlen 27e5837
Clean-up code
JacekGlen b487649
Remove redundant picohttpparser
JacekGlen 0f22d41
Fix build error
JacekGlen 46bbc03
make fmt
web-flow File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,41 @@ | |
|
||
#include "extending_fork.hpp" | ||
|
||
namespace { | ||
|
||
//! @brief Handles the transaction lifecycle for long-standing and per-request transactions | ||
class TransactionHandler { | ||
public: | ||
TransactionHandler(silkworm::db::RWTxnManaged& txn, silkworm::db::RWAccess& db_access, bool keep_db_txn_open = true) | ||
: txn_{txn}, db_access_{db_access}, keep_db_txn_open_{keep_db_txn_open} { | ||
if (!keep_db_txn_open_) { | ||
if (request_count_ == 0 && !txn_.is_open()) { | ||
txn_.reopen(*db_access_); | ||
} | ||
request_count_++; | ||
} else { | ||
if (!txn_.is_open()) { | ||
txn_.reopen(*db_access_); | ||
} | ||
} | ||
} | ||
|
||
~TransactionHandler() { | ||
if (!keep_db_txn_open_) { | ||
if (--request_count_ == 0) { | ||
txn_.commit_and_stop(); | ||
} | ||
} | ||
} | ||
|
||
private: | ||
silkworm::db::RWTxnManaged& txn_; | ||
silkworm::db::RWAccess& db_access_; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
bool keep_db_txn_open_{true}; | ||
inline static SILKWORM_THREAD_LOCAL int request_count_; | ||
}; | ||
} // namespace | ||
|
||
namespace silkworm::stagedsync { | ||
|
||
//! The number of inserted blocks between two successive commits on db | ||
|
@@ -45,7 +80,7 @@ MainChain::MainChain(boost::asio::io_context& ctx, NodeSettings& ns, db::RWAcces | |
} | ||
|
||
void MainChain::open() { | ||
tx_.reopen(*db_access_); // comply to mdbx limitation: tx must be used from its creation thread | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
|
||
// Load last finalized and last chosen blocks from persistence | ||
auto last_finalized_hash = db::read_last_finalized_block(tx_); | ||
|
@@ -83,6 +118,12 @@ void MainChain::open() { | |
} | ||
|
||
void MainChain::close() { | ||
if (node_settings_.keep_db_txn_open) { | ||
tx_.commit_and_stop(); | ||
} | ||
} | ||
|
||
void MainChain::abort() { | ||
tx_.abort(); | ||
} | ||
|
||
|
@@ -107,27 +148,32 @@ BlockId MainChain::last_finalized_head() const { | |
} | ||
|
||
std::optional<BlockId> MainChain::find_forking_point(const BlockHeader& header, const Hash& header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return interim_canonical_chain_.find_forking_point(header, header_hash); | ||
} | ||
|
||
std::optional<BlockId> MainChain::find_forking_point(const Hash& header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
auto header = get_header(header_hash); | ||
if (!header) return std::nullopt; | ||
return find_forking_point(*header, header_hash); | ||
} | ||
|
||
bool MainChain::is_finalized_canonical(BlockId block) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
if (block.number > last_fork_choice_.number) return false; | ||
return (interim_canonical_chain_.get_hash(block.number) == block.hash); | ||
} | ||
|
||
// protected, no txn handling required | ||
Hash MainChain::insert_header(const BlockHeader& header) { | ||
return db::write_header_ex(tx_, header, /*with_header_numbers=*/true); | ||
// with_header_numbers=true is necessary at the moment because many getters here rely on kHeaderNumbers table; | ||
// that table is updated by stage block-hashes so only after a pipeline run | ||
// todo: remove getters that take only an hash as input and use with_header_numbers=false here | ||
} | ||
|
||
// protected, no txn handling required | ||
void MainChain::insert_body(const Block& block, const Hash& block_hash) { | ||
// avoid calculation of block.header.hash() because is computationally expensive | ||
BlockNum block_num = block.header.number; | ||
|
@@ -138,6 +184,7 @@ void MainChain::insert_body(const Block& block, const Hash& block_hash) { | |
} | ||
|
||
void MainChain::insert_block(const Block& block) { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
Hash header_hash = insert_header(block.header); | ||
insert_body(block, header_hash); | ||
|
||
|
@@ -157,6 +204,7 @@ void MainChain::insert_block(const Block& block) { | |
} | ||
|
||
VerificationResult MainChain::verify_chain(Hash block_hash) { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
SILK_TRACE << "MainChain: verifying chain block=" << block_hash.to_hex(); | ||
|
||
// Retrieve the block header to validate | ||
|
@@ -243,6 +291,7 @@ VerificationResult MainChain::verify_chain(Hash block_hash) { | |
} | ||
|
||
bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Hash> finalized_block_hash) { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
if (finalized_block_hash && !interim_canonical_chain_.has(*finalized_block_hash)) { | ||
return false; // finalized block not found | ||
} | ||
|
@@ -292,6 +341,7 @@ bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Ha | |
return true; | ||
} | ||
|
||
// protected, no txn handling required | ||
std::set<Hash> MainChain::collect_bad_headers(db::RWTxn& tx, InvalidChain& invalid_chain) { | ||
if (!invalid_chain.bad_block) return {}; | ||
|
||
|
@@ -336,6 +386,7 @@ std::unique_ptr<ExtendingFork> MainChain::fork(BlockId forking_point) { | |
} | ||
|
||
void MainChain::reintegrate_fork(ExtendingFork& extending_fork) { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
Fork* fork = extending_fork.fork_.get(); | ||
|
||
ensure(fork->head_status() && std::holds_alternative<ValidChain>(*fork->head_status()), | ||
|
@@ -352,15 +403,16 @@ void MainChain::reintegrate_fork(ExtendingFork& extending_fork) { | |
} | ||
|
||
std::optional<BlockHeader> MainChain::get_header(Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
// const BlockHeader* cached = header_cache_.get(header_hash); | ||
// if (cached) { | ||
// return *cached; | ||
// } | ||
std::optional<BlockHeader> header = data_model_.read_header(header_hash); | ||
return header; | ||
return data_model_.read_header(header_hash); | ||
} | ||
|
||
std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
// const BlockHeader* cached = header_cache_.get(header_hash); | ||
// if (cached) { | ||
// return *cached; | ||
|
@@ -371,31 +423,37 @@ std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash he | |
|
||
std::optional<Hash> MainChain::get_finalized_canonical_hash(BlockNum height) const { | ||
if (height > last_fork_choice_.number) return {}; | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return interim_canonical_chain_.get_hash(height); | ||
} | ||
|
||
std::optional<TotalDifficulty> MainChain::get_header_td(BlockNum header_height, Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return db::read_total_difficulty(tx_, header_height, header_hash); | ||
} | ||
|
||
std::optional<TotalDifficulty> MainChain::get_header_td(Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
auto header = get_header(header_hash); | ||
if (!header) return {}; | ||
return db::read_total_difficulty(tx_, header->number, header_hash); | ||
} | ||
|
||
std::optional<BlockBody> MainChain::get_body(Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
BlockBody body; | ||
bool found = data_model_.read_body(header_hash, body); | ||
if (!found) return {}; | ||
return body; | ||
} | ||
|
||
BlockNum MainChain::get_block_progress() const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return data_model_.highest_block_number(); | ||
} | ||
|
||
std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
std::vector<BlockHeader> headers; | ||
|
||
data_model_.for_last_n_headers(limit, [&headers](BlockHeader&& header) { | ||
|
@@ -406,18 +464,22 @@ std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const { | |
} | ||
|
||
std::optional<BlockNum> MainChain::get_block_number(Hash header_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return data_model_.read_block_number(header_hash); | ||
} | ||
|
||
bool MainChain::is_ancestor(BlockId supposed_parent, BlockId block) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return extends(block, supposed_parent); | ||
} | ||
|
||
bool MainChain::extends_last_fork_choice(BlockNum height, Hash hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
return extends({height, hash}, last_fork_choice_); | ||
} | ||
|
||
bool MainChain::extends(BlockId block, BlockId supposed_parent) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
while (block.number > supposed_parent.number) { | ||
auto header = get_header(block.number, block.hash); | ||
if (!header) return false; | ||
|
@@ -431,22 +493,26 @@ bool MainChain::extends(BlockId block, BlockId supposed_parent) const { | |
} | ||
|
||
bool MainChain::is_finalized_canonical(Hash block_hash) const { | ||
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; | ||
auto header = get_header(block_hash); | ||
if (!header) return false; | ||
if (header->number > last_fork_choice_.number) return false; | ||
auto canonical_hash_at_same_height = interim_canonical_chain_.get_hash(header->number); | ||
return canonical_hash_at_same_height == block_hash; | ||
} | ||
|
||
// protected, no txn handling required | ||
bool MainChain::is_canonical(BlockNum block_height, const Hash& block_hash) const { | ||
// Check if specified block already exists as canonical block | ||
return interim_canonical_chain_.get_hash(block_height) == block_hash; | ||
} | ||
|
||
// protected, no txn handling required | ||
bool MainChain::is_canonical_head_ancestor(const Hash& block_hash) const { | ||
return interim_canonical_chain_.has(block_hash) && interim_canonical_chain_.current_head().hash != block_hash; | ||
} | ||
|
||
// protected, no txn handling required | ||
void MainChain::forward(BlockNum head_height, const Hash& head_hash) { | ||
// update canonical up to header_hash | ||
interim_canonical_chain_.update_up_to(head_height, head_hash); | ||
|
@@ -488,6 +554,7 @@ void MainChain::forward(BlockNum head_height, const Hash& head_hash) { | |
interim_head_status_ = verify_result; | ||
} | ||
|
||
// protected, no txn handling required | ||
void MainChain::unwind(BlockNum unwind_point) { | ||
const auto unwind_result = pipeline_.unwind(tx_, unwind_point); | ||
success_or_throw(unwind_result); // unwind must complete with success | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We typically use
db::RWTxnManaged
only when we need to ensure proper ownership of txn lifecycle (e.g. instagedsync::MainChain
which holds its own txn), otherwise it is sufficient to use justdb::RWTxn
, passed by reference.This approach follows the same convention used by MDBX API, which contains different abstractions for managed classes (e.g.
::mdbx::env_managed
,::mdbx::txn_managed
...) and unmanaged ones (e.g.::mdbx::env
,::mdbx::txn
...). The latter ones are handles wrapping up a raw pointer and have nice pass-by-value-and-move semantics.