From d2f6af94a9d9f88babada9fb749bd9a5154e65d3 Mon Sep 17 00:00:00 2001 From: Jacek Glen Date: Tue, 18 Jun 2024 10:15:02 +0200 Subject: [PATCH] node: main chain db txn management (#2112) --- silkworm/node/common/node_settings.hpp | 1 + .../node/stagedsync/execution_engine_test.cpp | 6 +- silkworm/node/stagedsync/forks/main_chain.cpp | 73 +++++++++++++- silkworm/node/stagedsync/forks/main_chain.hpp | 3 +- .../node/stagedsync/forks/main_chain_test.cpp | 96 +++++++++++++++++++ 5 files changed, 173 insertions(+), 6 deletions(-) diff --git a/silkworm/node/common/node_settings.hpp b/silkworm/node/common/node_settings.hpp index 6c303a1b89..4a2129e2f9 100644 --- a/silkworm/node/common/node_settings.hpp +++ b/silkworm/node/common/node_settings.hpp @@ -53,6 +53,7 @@ struct NodeSettings { uint32_t sync_loop_throttle_seconds{0}; // Minimum interval amongst sync cycle uint32_t sync_loop_log_interval_seconds{30}; // Interval for sync loop to emit logs bool parallel_fork_tracking_enabled{false}; // Whether to track multiple parallel forks at head + bool keep_db_txn_open{true}; // Whether to keep db transaction open between requests inline db::etl::CollectorSettings etl() const { return {data_directory->etl().path(), etl_buffer_size}; diff --git a/silkworm/node/stagedsync/execution_engine_test.cpp b/silkworm/node/stagedsync/execution_engine_test.cpp index 7b50490fa6..e8fa255d9e 100644 --- a/silkworm/node/stagedsync/execution_engine_test.cpp +++ b/silkworm/node/stagedsync/execution_engine_test.cpp @@ -49,8 +49,7 @@ class ExecutionEngine_ForTest : public stagedsync::ExecutionEngine { }; TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engine]") { - test_util::SetLogVerbosityGuard log_guard(log::Level::kWarning); - + test_util::SetLogVerbosityGuard log_guard(log::Level::kNone); test_util::TaskRunner runner; Environment::set_stop_before_stage(db::stages::kSendersKey); // only headers, block hashes and bodies @@ -60,6 +59,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin .chaindata_env_config = db_context.get_env_config(), .chain_config = db_context.get_chain_config(), .parallel_fork_tracking_enabled = false, + .keep_db_txn_open = true, }; db::RWAccess db_access{db_context.get_mdbx_env()}; @@ -717,6 +717,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin CHECK(db::read_block_number(tx2, block1_hash).has_value()); CHECK(db::read_block_number(tx2, block2_hash).has_value()); + tx2.abort(); } SECTION("notify_fork_choice_update does not update chain database") { @@ -742,6 +743,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin auto tx2 = db_access.start_ro_tx(); CHECK(db::read_block_number(tx2, block1_hash).has_value()); CHECK(db::read_block_number(tx2, block2_hash).has_value()); + tx2.abort(); } // TODO: temoporarily disabled, to be fixed (JG) diff --git a/silkworm/node/stagedsync/forks/main_chain.cpp b/silkworm/node/stagedsync/forks/main_chain.cpp index 907cd36793..dc04e34079 100644 --- a/silkworm/node/stagedsync/forks/main_chain.cpp +++ b/silkworm/node/stagedsync/forks/main_chain.cpp @@ -27,6 +27,41 @@ #include "extending_fork.hpp" +namespace { + +//! @brief Handles the transaction lifecycle for long-standing and per-request transactions +class TransactionHandler { + public: + TransactionHandler(silkworm::db::RWTxnManaged& txn, silkworm::db::RWAccess& db_access, bool keep_db_txn_open = true) + : txn_{txn}, db_access_{db_access}, keep_db_txn_open_{keep_db_txn_open} { + if (!keep_db_txn_open_) { + if (request_count_ == 0 && !txn_.is_open()) { + txn_.reopen(*db_access_); + } + request_count_++; + } else { + if (!txn_.is_open()) { + txn_.reopen(*db_access_); + } + } + } + + ~TransactionHandler() { + if (!keep_db_txn_open_) { + if (--request_count_ == 0) { + txn_.commit_and_stop(); + } + } + } + + private: + silkworm::db::RWTxnManaged& txn_; + silkworm::db::RWAccess& db_access_; + bool keep_db_txn_open_{true}; + inline static SILKWORM_THREAD_LOCAL int request_count_; +}; +} // namespace + namespace silkworm::stagedsync { //! The number of inserted blocks between two successive commits on db @@ -45,7 +80,7 @@ MainChain::MainChain(boost::asio::io_context& ctx, NodeSettings& ns, db::RWAcces } void MainChain::open() { - tx_.reopen(*db_access_); // comply to mdbx limitation: tx must be used from its creation thread + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; // Load last finalized and last chosen blocks from persistence auto last_finalized_hash = db::read_last_finalized_block(tx_); @@ -83,6 +118,12 @@ void MainChain::open() { } void MainChain::close() { + if (node_settings_.keep_db_txn_open) { + tx_.commit_and_stop(); + } +} + +void MainChain::abort() { tx_.abort(); } @@ -107,20 +148,24 @@ BlockId MainChain::last_finalized_head() const { } std::optional MainChain::find_forking_point(const BlockHeader& header, const Hash& header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return interim_canonical_chain_.find_forking_point(header, header_hash); } std::optional MainChain::find_forking_point(const Hash& header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; auto header = get_header(header_hash); if (!header) return std::nullopt; return find_forking_point(*header, header_hash); } bool MainChain::is_finalized_canonical(BlockId block) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; if (block.number > last_fork_choice_.number) return false; return (interim_canonical_chain_.get_hash(block.number) == block.hash); } +// protected, no txn handling required Hash MainChain::insert_header(const BlockHeader& header) { return db::write_header_ex(tx_, header, /*with_header_numbers=*/true); // with_header_numbers=true is necessary at the moment because many getters here rely on kHeaderNumbers table; @@ -128,6 +173,7 @@ Hash MainChain::insert_header(const BlockHeader& header) { // todo: remove getters that take only an hash as input and use with_header_numbers=false here } +// protected, no txn handling required void MainChain::insert_body(const Block& block, const Hash& block_hash) { // avoid calculation of block.header.hash() because is computationally expensive BlockNum block_num = block.header.number; @@ -138,6 +184,7 @@ void MainChain::insert_body(const Block& block, const Hash& block_hash) { } void MainChain::insert_block(const Block& block) { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; Hash header_hash = insert_header(block.header); insert_body(block, header_hash); @@ -157,6 +204,7 @@ void MainChain::insert_block(const Block& block) { } VerificationResult MainChain::verify_chain(Hash block_hash) { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; SILK_TRACE << "MainChain: verifying chain block=" << block_hash.to_hex(); // Retrieve the block header to validate @@ -243,6 +291,7 @@ VerificationResult MainChain::verify_chain(Hash block_hash) { } bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional finalized_block_hash) { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; if (finalized_block_hash && !interim_canonical_chain_.has(*finalized_block_hash)) { return false; // finalized block not found } @@ -292,6 +341,7 @@ bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional MainChain::collect_bad_headers(db::RWTxn& tx, InvalidChain& invalid_chain) { if (!invalid_chain.bad_block) return {}; @@ -336,6 +386,7 @@ std::unique_ptr MainChain::fork(BlockId forking_point) { } void MainChain::reintegrate_fork(ExtendingFork& extending_fork) { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; Fork* fork = extending_fork.fork_.get(); ensure(fork->head_status() && std::holds_alternative(*fork->head_status()), @@ -352,15 +403,16 @@ void MainChain::reintegrate_fork(ExtendingFork& extending_fork) { } std::optional MainChain::get_header(Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; // const BlockHeader* cached = header_cache_.get(header_hash); // if (cached) { // return *cached; // } - std::optional header = data_model_.read_header(header_hash); - return header; + return data_model_.read_header(header_hash); } std::optional MainChain::get_header(BlockNum header_height, Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; // const BlockHeader* cached = header_cache_.get(header_hash); // if (cached) { // return *cached; @@ -371,20 +423,24 @@ std::optional MainChain::get_header(BlockNum header_height, Hash he std::optional MainChain::get_finalized_canonical_hash(BlockNum height) const { if (height > last_fork_choice_.number) return {}; + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return interim_canonical_chain_.get_hash(height); } std::optional MainChain::get_header_td(BlockNum header_height, Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return db::read_total_difficulty(tx_, header_height, header_hash); } std::optional MainChain::get_header_td(Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; auto header = get_header(header_hash); if (!header) return {}; return db::read_total_difficulty(tx_, header->number, header_hash); } std::optional MainChain::get_body(Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; BlockBody body; bool found = data_model_.read_body(header_hash, body); if (!found) return {}; @@ -392,10 +448,12 @@ std::optional MainChain::get_body(Hash header_hash) const { } BlockNum MainChain::get_block_progress() const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return data_model_.highest_block_number(); } std::vector MainChain::get_last_headers(uint64_t limit) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; std::vector headers; data_model_.for_last_n_headers(limit, [&headers](BlockHeader&& header) { @@ -406,18 +464,22 @@ std::vector MainChain::get_last_headers(uint64_t limit) const { } std::optional MainChain::get_block_number(Hash header_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return data_model_.read_block_number(header_hash); } bool MainChain::is_ancestor(BlockId supposed_parent, BlockId block) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return extends(block, supposed_parent); } bool MainChain::extends_last_fork_choice(BlockNum height, Hash hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; return extends({height, hash}, last_fork_choice_); } bool MainChain::extends(BlockId block, BlockId supposed_parent) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; while (block.number > supposed_parent.number) { auto header = get_header(block.number, block.hash); if (!header) return false; @@ -431,6 +493,7 @@ bool MainChain::extends(BlockId block, BlockId supposed_parent) const { } bool MainChain::is_finalized_canonical(Hash block_hash) const { + TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open}; auto header = get_header(block_hash); if (!header) return false; if (header->number > last_fork_choice_.number) return false; @@ -438,15 +501,18 @@ bool MainChain::is_finalized_canonical(Hash block_hash) const { return canonical_hash_at_same_height == block_hash; } +// protected, no txn handling required bool MainChain::is_canonical(BlockNum block_height, const Hash& block_hash) const { // Check if specified block already exists as canonical block return interim_canonical_chain_.get_hash(block_height) == block_hash; } +// protected, no txn handling required bool MainChain::is_canonical_head_ancestor(const Hash& block_hash) const { return interim_canonical_chain_.has(block_hash) && interim_canonical_chain_.current_head().hash != block_hash; } +// protected, no txn handling required void MainChain::forward(BlockNum head_height, const Hash& head_hash) { // update canonical up to header_hash interim_canonical_chain_.update_up_to(head_height, head_hash); @@ -488,6 +554,7 @@ void MainChain::forward(BlockNum head_height, const Hash& head_hash) { interim_head_status_ = verify_result; } +// protected, no txn handling required void MainChain::unwind(BlockNum unwind_point) { const auto unwind_result = pipeline_.unwind(tx_, unwind_point); success_or_throw(unwind_result); // unwind must complete with success diff --git a/silkworm/node/stagedsync/forks/main_chain.hpp b/silkworm/node/stagedsync/forks/main_chain.hpp index b16a11fe9f..c0ae2924f5 100644 --- a/silkworm/node/stagedsync/forks/main_chain.hpp +++ b/silkworm/node/stagedsync/forks/main_chain.hpp @@ -44,6 +44,7 @@ class MainChain { void open(); // needed to circumvent mdbx threading model limitations void close(); + void abort(); // extension void insert_block(const Block&); @@ -98,7 +99,7 @@ class MainChain { boost::asio::io_context& io_context_; NodeSettings& node_settings_; - db::RWAccess db_access_; + mutable db::RWAccess db_access_; mutable db::RWTxnManaged tx_; db::DataModel data_model_; bool is_first_sync_{true}; diff --git a/silkworm/node/stagedsync/forks/main_chain_test.cpp b/silkworm/node/stagedsync/forks/main_chain_test.cpp index e546bce8fa..df0374ba28 100644 --- a/silkworm/node/stagedsync/forks/main_chain_test.cpp +++ b/silkworm/node/stagedsync/forks/main_chain_test.cpp @@ -49,6 +49,102 @@ class MainChain_ForTest : public stagedsync::MainChain { using stagedsync::MainChain::tx_; }; +TEST_CASE("MainChain transaction handling") { + for (int i = 0; i < 2; i++) { + auto keep_db_txn_open = i == 1; + + SECTION("keep_db_txn_open = " + std::to_string(keep_db_txn_open)) { + test_util::SetLogVerbosityGuard log_guard(log::Level::kNone); + + asio::io_context io; + asio::executor_work_guard work{io.get_executor()}; + + db::test_util::TempChainData context; + context.add_genesis_data(); + context.commit_txn(); + + PreverifiedHashes::current.clear(); // disable preverified hashes + Environment::set_stop_before_stage(db::stages::kSendersKey); // only headers, block hashes and bodies + + NodeSettings node_settings = node::test_util::make_node_settings_from_temp_chain_data(context); + node_settings.keep_db_txn_open = keep_db_txn_open; + db::RWAccess db_access{context.env()}; + MainChain_ForTest main_chain{io, node_settings, db_access}; + main_chain.open(); + + auto& tx = main_chain.tx(); + + SECTION("multiple reads") { + auto hash0 = main_chain.get_finalized_canonical_hash(0); + REQUIRE(hash0.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + + auto header0 = main_chain.get_header(0, *hash0); + REQUIRE(header0.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + + auto body0 = main_chain.get_body(*hash0); + REQUIRE(body0.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + + auto td0 = main_chain.get_header_td(0, *hash0); + REQUIRE(td0.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + + auto bn0 = main_chain.get_block_number(*hash0); + REQUIRE(bn0.has_value()); + CHECK(bn0 == 0); + CHECK(tx.is_open() == keep_db_txn_open); + } + + SECTION("multiple inserts") { + auto hash0 = main_chain.get_finalized_canonical_hash(0); + auto header0 = main_chain.get_header(0, *hash0); + + auto block1 = generate_sample_child_blocks(*header0); + auto block1_hash = block1->header.hash(); + main_chain.insert_block(*block1); + CHECK(tx.is_open() == keep_db_txn_open); + + auto header1 = main_chain.get_header(1, block1_hash); + REQUIRE(header1.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + + auto block2 = generate_sample_child_blocks(*header1); + auto block2_hash = block2->header.hash(); + main_chain.insert_block(*block2); + CHECK(tx.is_open() == keep_db_txn_open); + + auto header2 = main_chain.get_header(2, block2_hash); + REQUIRE(header2.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + } + + SECTION("completes fork choice update") { + auto hash0 = main_chain.get_finalized_canonical_hash(0); + auto header0 = main_chain.get_header(0, *hash0); + + auto block1 = generate_sample_child_blocks(*header0); + auto block1_hash = block1->header.hash(); + main_chain.insert_block(*block1); + CHECK(tx.is_open() == keep_db_txn_open); + + auto verification1 = main_chain.verify_chain(block1_hash); + REQUIRE(holds_alternative(verification1)); + CHECK(tx.is_open() == keep_db_txn_open); + + auto fcu_updated = main_chain.notify_fork_choice_update(block1_hash); + CHECK(fcu_updated); + CHECK(tx.is_open() == keep_db_txn_open); + + auto hash1 = main_chain.get_finalized_canonical_hash(1); + REQUIRE(hash1.has_value()); + CHECK(tx.is_open() == keep_db_txn_open); + } + } + } +} + TEST_CASE("MainChain") { test_util::SetLogVerbosityGuard log_guard(log::Level::kNone);