Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

node: main chain db txn management #2112

Merged
merged 16 commits into from
Jun 18, 2024
Merged
1 change: 1 addition & 0 deletions silkworm/node/common/node_settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct NodeSettings {
uint32_t sync_loop_throttle_seconds{0}; // Minimum interval amongst sync cycle
uint32_t sync_loop_log_interval_seconds{30}; // Interval for sync loop to emit logs
bool parallel_fork_tracking_enabled{false}; // Whether to track multiple parallel forks at head
bool keep_db_txn_open{true}; // Whether to keep db transaction open between requests

inline db::etl::CollectorSettings etl() const {
return {data_directory->etl().path(), etl_buffer_size};
Expand Down
6 changes: 4 additions & 2 deletions silkworm/node/stagedsync/execution_engine_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class ExecutionEngine_ForTest : public stagedsync::ExecutionEngine {
};

TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engine]") {
test_util::SetLogVerbosityGuard log_guard(log::Level::kWarning);

test_util::SetLogVerbosityGuard log_guard(log::Level::kNone);
test_util::TaskRunner runner;
Environment::set_stop_before_stage(db::stages::kSendersKey); // only headers, block hashes and bodies

Expand All @@ -60,6 +59,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin
.chaindata_env_config = db_context.get_env_config(),
.chain_config = db_context.get_chain_config(),
.parallel_fork_tracking_enabled = false,
.keep_db_txn_open = true,
};

db::RWAccess db_access{db_context.get_mdbx_env()};
Expand Down Expand Up @@ -717,6 +717,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin

CHECK(db::read_block_number(tx2, block1_hash).has_value());
CHECK(db::read_block_number(tx2, block2_hash).has_value());
tx2.abort();
}

SECTION("notify_fork_choice_update does not update chain database") {
Expand All @@ -742,6 +743,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin
auto tx2 = db_access.start_ro_tx();
CHECK(db::read_block_number(tx2, block1_hash).has_value());
CHECK(db::read_block_number(tx2, block2_hash).has_value());
tx2.abort();
}

// TODO: temoporarily disabled, to be fixed (JG)
Expand Down
73 changes: 70 additions & 3 deletions silkworm/node/stagedsync/forks/main_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,41 @@

#include "extending_fork.hpp"

namespace {

//! @brief Handles the transaction lifecycle for long-standing and per-request transactions
class TransactionHandler {
public:
TransactionHandler(silkworm::db::RWTxnManaged& txn, silkworm::db::RWAccess& db_access, bool keep_db_txn_open = true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We typically use db::RWTxnManaged only when we need to ensure proper ownership of txn lifecycle (e.g. in stagedsync::MainChain which holds its own txn), otherwise it is sufficient to use just db::RWTxn, passed by reference.

This approach follows the same convention used by MDBX API, which contains different abstractions for managed classes (e.g. ::mdbx::env_managed, ::mdbx::txn_managed...) and unmanaged ones (e.g. ::mdbx::env, ::mdbx::txn...). The latter ones are handles wrapping up a raw pointer and have nice pass-by-value-and-move semantics.

: txn_{txn}, db_access_{db_access}, keep_db_txn_open_{keep_db_txn_open} {
if (!keep_db_txn_open_) {
if (request_count_ == 0 && !txn_.is_open()) {
txn_.reopen(*db_access_);
}
request_count_++;
} else {
if (!txn_.is_open()) {
txn_.reopen(*db_access_);
}
}
}

~TransactionHandler() {
if (!keep_db_txn_open_) {
if (--request_count_ == 0) {
txn_.commit_and_stop();
}
}
}

private:
silkworm::db::RWTxnManaged& txn_;
silkworm::db::RWAccess& db_access_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

db::ROAccess and db::RWAccess are lightweight wrappers for ::mdbx::env used to enforce strict usage of read-only vs read-write transactions. ::mdbx::env is in turn just a handle wrapping a raw pointer, so all of them are a perfect fit for pass by-value and move.

bool keep_db_txn_open_{true};
inline static SILKWORM_THREAD_LOCAL int request_count_;
};
} // namespace

namespace silkworm::stagedsync {

//! The number of inserted blocks between two successive commits on db
Expand All @@ -45,7 +80,7 @@ MainChain::MainChain(boost::asio::io_context& ctx, NodeSettings& ns, db::RWAcces
}

void MainChain::open() {
tx_.reopen(*db_access_); // comply to mdbx limitation: tx must be used from its creation thread
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};

// Load last finalized and last chosen blocks from persistence
auto last_finalized_hash = db::read_last_finalized_block(tx_);
Expand Down Expand Up @@ -83,6 +118,12 @@ void MainChain::open() {
}

void MainChain::close() {
if (node_settings_.keep_db_txn_open) {
tx_.commit_and_stop();
}
}

void MainChain::abort() {
tx_.abort();
}

Expand All @@ -107,27 +148,32 @@ BlockId MainChain::last_finalized_head() const {
}

std::optional<BlockId> MainChain::find_forking_point(const BlockHeader& header, const Hash& header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return interim_canonical_chain_.find_forking_point(header, header_hash);
}

std::optional<BlockId> MainChain::find_forking_point(const Hash& header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(header_hash);
if (!header) return std::nullopt;
return find_forking_point(*header, header_hash);
}

bool MainChain::is_finalized_canonical(BlockId block) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
if (block.number > last_fork_choice_.number) return false;
return (interim_canonical_chain_.get_hash(block.number) == block.hash);
}

// protected, no txn handling required
Hash MainChain::insert_header(const BlockHeader& header) {
return db::write_header_ex(tx_, header, /*with_header_numbers=*/true);
// with_header_numbers=true is necessary at the moment because many getters here rely on kHeaderNumbers table;
// that table is updated by stage block-hashes so only after a pipeline run
// todo: remove getters that take only an hash as input and use with_header_numbers=false here
}

// protected, no txn handling required
void MainChain::insert_body(const Block& block, const Hash& block_hash) {
// avoid calculation of block.header.hash() because is computationally expensive
BlockNum block_num = block.header.number;
Expand All @@ -138,6 +184,7 @@ void MainChain::insert_body(const Block& block, const Hash& block_hash) {
}

void MainChain::insert_block(const Block& block) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
Hash header_hash = insert_header(block.header);
insert_body(block, header_hash);

Expand All @@ -157,6 +204,7 @@ void MainChain::insert_block(const Block& block) {
}

VerificationResult MainChain::verify_chain(Hash block_hash) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
SILK_TRACE << "MainChain: verifying chain block=" << block_hash.to_hex();

// Retrieve the block header to validate
Expand Down Expand Up @@ -243,6 +291,7 @@ VerificationResult MainChain::verify_chain(Hash block_hash) {
}

bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Hash> finalized_block_hash) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
if (finalized_block_hash && !interim_canonical_chain_.has(*finalized_block_hash)) {
return false; // finalized block not found
}
Expand Down Expand Up @@ -292,6 +341,7 @@ bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Ha
return true;
}

// protected, no txn handling required
std::set<Hash> MainChain::collect_bad_headers(db::RWTxn& tx, InvalidChain& invalid_chain) {
if (!invalid_chain.bad_block) return {};

Expand Down Expand Up @@ -336,6 +386,7 @@ std::unique_ptr<ExtendingFork> MainChain::fork(BlockId forking_point) {
}

void MainChain::reintegrate_fork(ExtendingFork& extending_fork) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
Fork* fork = extending_fork.fork_.get();

ensure(fork->head_status() && std::holds_alternative<ValidChain>(*fork->head_status()),
Expand All @@ -352,15 +403,16 @@ void MainChain::reintegrate_fork(ExtendingFork& extending_fork) {
}

std::optional<BlockHeader> MainChain::get_header(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
// const BlockHeader* cached = header_cache_.get(header_hash);
// if (cached) {
// return *cached;
// }
std::optional<BlockHeader> header = data_model_.read_header(header_hash);
return header;
return data_model_.read_header(header_hash);
}

std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
// const BlockHeader* cached = header_cache_.get(header_hash);
// if (cached) {
// return *cached;
Expand All @@ -371,31 +423,37 @@ std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash he

std::optional<Hash> MainChain::get_finalized_canonical_hash(BlockNum height) const {
if (height > last_fork_choice_.number) return {};
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return interim_canonical_chain_.get_hash(height);
}

std::optional<TotalDifficulty> MainChain::get_header_td(BlockNum header_height, Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return db::read_total_difficulty(tx_, header_height, header_hash);
}

std::optional<TotalDifficulty> MainChain::get_header_td(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(header_hash);
if (!header) return {};
return db::read_total_difficulty(tx_, header->number, header_hash);
}

std::optional<BlockBody> MainChain::get_body(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
BlockBody body;
bool found = data_model_.read_body(header_hash, body);
if (!found) return {};
return body;
}

BlockNum MainChain::get_block_progress() const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return data_model_.highest_block_number();
}

std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
std::vector<BlockHeader> headers;

data_model_.for_last_n_headers(limit, [&headers](BlockHeader&& header) {
Expand All @@ -406,18 +464,22 @@ std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const {
}

std::optional<BlockNum> MainChain::get_block_number(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return data_model_.read_block_number(header_hash);
}

bool MainChain::is_ancestor(BlockId supposed_parent, BlockId block) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return extends(block, supposed_parent);
}

bool MainChain::extends_last_fork_choice(BlockNum height, Hash hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return extends({height, hash}, last_fork_choice_);
}

bool MainChain::extends(BlockId block, BlockId supposed_parent) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
while (block.number > supposed_parent.number) {
auto header = get_header(block.number, block.hash);
if (!header) return false;
Expand All @@ -431,22 +493,26 @@ bool MainChain::extends(BlockId block, BlockId supposed_parent) const {
}

bool MainChain::is_finalized_canonical(Hash block_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(block_hash);
if (!header) return false;
if (header->number > last_fork_choice_.number) return false;
auto canonical_hash_at_same_height = interim_canonical_chain_.get_hash(header->number);
return canonical_hash_at_same_height == block_hash;
}

// protected, no txn handling required
bool MainChain::is_canonical(BlockNum block_height, const Hash& block_hash) const {
// Check if specified block already exists as canonical block
return interim_canonical_chain_.get_hash(block_height) == block_hash;
}

// protected, no txn handling required
bool MainChain::is_canonical_head_ancestor(const Hash& block_hash) const {
return interim_canonical_chain_.has(block_hash) && interim_canonical_chain_.current_head().hash != block_hash;
}

// protected, no txn handling required
void MainChain::forward(BlockNum head_height, const Hash& head_hash) {
// update canonical up to header_hash
interim_canonical_chain_.update_up_to(head_height, head_hash);
Expand Down Expand Up @@ -488,6 +554,7 @@ void MainChain::forward(BlockNum head_height, const Hash& head_hash) {
interim_head_status_ = verify_result;
}

// protected, no txn handling required
void MainChain::unwind(BlockNum unwind_point) {
const auto unwind_result = pipeline_.unwind(tx_, unwind_point);
success_or_throw(unwind_result); // unwind must complete with success
Expand Down
3 changes: 2 additions & 1 deletion silkworm/node/stagedsync/forks/main_chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MainChain {

void open(); // needed to circumvent mdbx threading model limitations
void close();
void abort();

// extension
void insert_block(const Block&);
Expand Down Expand Up @@ -98,7 +99,7 @@ class MainChain {

boost::asio::io_context& io_context_;
NodeSettings& node_settings_;
db::RWAccess db_access_;
mutable db::RWAccess db_access_;
mutable db::RWTxnManaged tx_;
db::DataModel data_model_;
bool is_first_sync_{true};
Expand Down
Loading
Loading