Skip to content

Commit

Permalink
node: main chain db txn management (#2112)
Browse files Browse the repository at this point in the history
  • Loading branch information
JacekGlen authored Jun 18, 2024
1 parent 48b6800 commit 6542d38
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 6 deletions.
1 change: 1 addition & 0 deletions silkworm/node/common/node_settings.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ struct NodeSettings {
uint32_t sync_loop_throttle_seconds{0}; // Minimum interval amongst sync cycle
uint32_t sync_loop_log_interval_seconds{30}; // Interval for sync loop to emit logs
bool parallel_fork_tracking_enabled{false}; // Whether to track multiple parallel forks at head
bool keep_db_txn_open{true}; // Whether to keep db transaction open between requests

inline db::etl::CollectorSettings etl() const {
return {data_directory->etl().path(), etl_buffer_size};
Expand Down
6 changes: 4 additions & 2 deletions silkworm/node/stagedsync/execution_engine_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class ExecutionEngine_ForTest : public stagedsync::ExecutionEngine {
};

TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engine]") {
test_util::SetLogVerbosityGuard log_guard(log::Level::kWarning);

test_util::SetLogVerbosityGuard log_guard(log::Level::kNone);
test_util::TaskRunner runner;
Environment::set_stop_before_stage(db::stages::kSendersKey); // only headers, block hashes and bodies

Expand All @@ -60,6 +59,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin
.chaindata_env_config = db_context.get_env_config(),
.chain_config = db_context.get_chain_config(),
.parallel_fork_tracking_enabled = false,
.keep_db_txn_open = true,
};

db::RWAccess db_access{db_context.get_mdbx_env()};
Expand Down Expand Up @@ -717,6 +717,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin

CHECK(db::read_block_number(tx2, block1_hash).has_value());
CHECK(db::read_block_number(tx2, block2_hash).has_value());
tx2.abort();
}

SECTION("notify_fork_choice_update does not update chain database") {
Expand All @@ -742,6 +743,7 @@ TEST_CASE("ExecutionEngine Integration Test", "[node][execution][execution_engin
auto tx2 = db_access.start_ro_tx();
CHECK(db::read_block_number(tx2, block1_hash).has_value());
CHECK(db::read_block_number(tx2, block2_hash).has_value());
tx2.abort();
}

// TODO: temoporarily disabled, to be fixed (JG)
Expand Down
73 changes: 70 additions & 3 deletions silkworm/node/stagedsync/forks/main_chain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,41 @@

#include "extending_fork.hpp"

namespace {

//! @brief Handles the transaction lifecycle for long-standing and per-request transactions
class TransactionHandler {
public:
TransactionHandler(silkworm::db::RWTxnManaged& txn, silkworm::db::RWAccess& db_access, bool keep_db_txn_open = true)
: txn_{txn}, db_access_{db_access}, keep_db_txn_open_{keep_db_txn_open} {
if (!keep_db_txn_open_) {
if (request_count_ == 0 && !txn_.is_open()) {
txn_.reopen(*db_access_);
}
request_count_++;
} else {
if (!txn_.is_open()) {
txn_.reopen(*db_access_);
}
}
}

~TransactionHandler() {
if (!keep_db_txn_open_) {
if (--request_count_ == 0) {
txn_.commit_and_stop();
}
}
}

private:
silkworm::db::RWTxnManaged& txn_;
silkworm::db::RWAccess& db_access_;
bool keep_db_txn_open_{true};
inline static SILKWORM_THREAD_LOCAL int request_count_;
};
} // namespace

namespace silkworm::stagedsync {

//! The number of inserted blocks between two successive commits on db
Expand All @@ -45,7 +80,7 @@ MainChain::MainChain(boost::asio::io_context& ctx, NodeSettings& ns, db::RWAcces
}

void MainChain::open() {
tx_.reopen(*db_access_); // comply to mdbx limitation: tx must be used from its creation thread
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};

// Load last finalized and last chosen blocks from persistence
auto last_finalized_hash = db::read_last_finalized_block(tx_);
Expand Down Expand Up @@ -83,6 +118,12 @@ void MainChain::open() {
}

void MainChain::close() {
if (node_settings_.keep_db_txn_open) {
tx_.commit_and_stop();
}
}

void MainChain::abort() {
tx_.abort();
}

Expand All @@ -107,27 +148,32 @@ BlockId MainChain::last_finalized_head() const {
}

std::optional<BlockId> MainChain::find_forking_point(const BlockHeader& header, const Hash& header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return interim_canonical_chain_.find_forking_point(header, header_hash);
}

std::optional<BlockId> MainChain::find_forking_point(const Hash& header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(header_hash);
if (!header) return std::nullopt;
return find_forking_point(*header, header_hash);
}

bool MainChain::is_finalized_canonical(BlockId block) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
if (block.number > last_fork_choice_.number) return false;
return (interim_canonical_chain_.get_hash(block.number) == block.hash);
}

// protected, no txn handling required
Hash MainChain::insert_header(const BlockHeader& header) {
return db::write_header_ex(tx_, header, /*with_header_numbers=*/true);
// with_header_numbers=true is necessary at the moment because many getters here rely on kHeaderNumbers table;
// that table is updated by stage block-hashes so only after a pipeline run
// todo: remove getters that take only an hash as input and use with_header_numbers=false here
}

// protected, no txn handling required
void MainChain::insert_body(const Block& block, const Hash& block_hash) {
// avoid calculation of block.header.hash() because is computationally expensive
BlockNum block_num = block.header.number;
Expand All @@ -138,6 +184,7 @@ void MainChain::insert_body(const Block& block, const Hash& block_hash) {
}

void MainChain::insert_block(const Block& block) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
Hash header_hash = insert_header(block.header);
insert_body(block, header_hash);

Expand All @@ -157,6 +204,7 @@ void MainChain::insert_block(const Block& block) {
}

VerificationResult MainChain::verify_chain(Hash block_hash) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
SILK_TRACE << "MainChain: verifying chain block=" << block_hash.to_hex();

// Retrieve the block header to validate
Expand Down Expand Up @@ -243,6 +291,7 @@ VerificationResult MainChain::verify_chain(Hash block_hash) {
}

bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Hash> finalized_block_hash) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
if (finalized_block_hash && !interim_canonical_chain_.has(*finalized_block_hash)) {
return false; // finalized block not found
}
Expand Down Expand Up @@ -292,6 +341,7 @@ bool MainChain::notify_fork_choice_update(Hash head_block_hash, std::optional<Ha
return true;
}

// protected, no txn handling required
std::set<Hash> MainChain::collect_bad_headers(db::RWTxn& tx, InvalidChain& invalid_chain) {
if (!invalid_chain.bad_block) return {};

Expand Down Expand Up @@ -336,6 +386,7 @@ std::unique_ptr<ExtendingFork> MainChain::fork(BlockId forking_point) {
}

void MainChain::reintegrate_fork(ExtendingFork& extending_fork) {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
Fork* fork = extending_fork.fork_.get();

ensure(fork->head_status() && std::holds_alternative<ValidChain>(*fork->head_status()),
Expand All @@ -352,15 +403,16 @@ void MainChain::reintegrate_fork(ExtendingFork& extending_fork) {
}

std::optional<BlockHeader> MainChain::get_header(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
// const BlockHeader* cached = header_cache_.get(header_hash);
// if (cached) {
// return *cached;
// }
std::optional<BlockHeader> header = data_model_.read_header(header_hash);
return header;
return data_model_.read_header(header_hash);
}

std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
// const BlockHeader* cached = header_cache_.get(header_hash);
// if (cached) {
// return *cached;
Expand All @@ -371,31 +423,37 @@ std::optional<BlockHeader> MainChain::get_header(BlockNum header_height, Hash he

std::optional<Hash> MainChain::get_finalized_canonical_hash(BlockNum height) const {
if (height > last_fork_choice_.number) return {};
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return interim_canonical_chain_.get_hash(height);
}

std::optional<TotalDifficulty> MainChain::get_header_td(BlockNum header_height, Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return db::read_total_difficulty(tx_, header_height, header_hash);
}

std::optional<TotalDifficulty> MainChain::get_header_td(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(header_hash);
if (!header) return {};
return db::read_total_difficulty(tx_, header->number, header_hash);
}

std::optional<BlockBody> MainChain::get_body(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
BlockBody body;
bool found = data_model_.read_body(header_hash, body);
if (!found) return {};
return body;
}

BlockNum MainChain::get_block_progress() const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return data_model_.highest_block_number();
}

std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
std::vector<BlockHeader> headers;

data_model_.for_last_n_headers(limit, [&headers](BlockHeader&& header) {
Expand All @@ -406,18 +464,22 @@ std::vector<BlockHeader> MainChain::get_last_headers(uint64_t limit) const {
}

std::optional<BlockNum> MainChain::get_block_number(Hash header_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return data_model_.read_block_number(header_hash);
}

bool MainChain::is_ancestor(BlockId supposed_parent, BlockId block) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return extends(block, supposed_parent);
}

bool MainChain::extends_last_fork_choice(BlockNum height, Hash hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
return extends({height, hash}, last_fork_choice_);
}

bool MainChain::extends(BlockId block, BlockId supposed_parent) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
while (block.number > supposed_parent.number) {
auto header = get_header(block.number, block.hash);
if (!header) return false;
Expand All @@ -431,22 +493,26 @@ bool MainChain::extends(BlockId block, BlockId supposed_parent) const {
}

bool MainChain::is_finalized_canonical(Hash block_hash) const {
TransactionHandler tx_handler{tx_, db_access_, node_settings_.keep_db_txn_open};
auto header = get_header(block_hash);
if (!header) return false;
if (header->number > last_fork_choice_.number) return false;
auto canonical_hash_at_same_height = interim_canonical_chain_.get_hash(header->number);
return canonical_hash_at_same_height == block_hash;
}

// protected, no txn handling required
bool MainChain::is_canonical(BlockNum block_height, const Hash& block_hash) const {
// Check if specified block already exists as canonical block
return interim_canonical_chain_.get_hash(block_height) == block_hash;
}

// protected, no txn handling required
bool MainChain::is_canonical_head_ancestor(const Hash& block_hash) const {
return interim_canonical_chain_.has(block_hash) && interim_canonical_chain_.current_head().hash != block_hash;
}

// protected, no txn handling required
void MainChain::forward(BlockNum head_height, const Hash& head_hash) {
// update canonical up to header_hash
interim_canonical_chain_.update_up_to(head_height, head_hash);
Expand Down Expand Up @@ -488,6 +554,7 @@ void MainChain::forward(BlockNum head_height, const Hash& head_hash) {
interim_head_status_ = verify_result;
}

// protected, no txn handling required
void MainChain::unwind(BlockNum unwind_point) {
const auto unwind_result = pipeline_.unwind(tx_, unwind_point);
success_or_throw(unwind_result); // unwind must complete with success
Expand Down
3 changes: 2 additions & 1 deletion silkworm/node/stagedsync/forks/main_chain.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class MainChain {

void open(); // needed to circumvent mdbx threading model limitations
void close();
void abort();

// extension
void insert_block(const Block&);
Expand Down Expand Up @@ -98,7 +99,7 @@ class MainChain {

boost::asio::io_context& io_context_;
NodeSettings& node_settings_;
db::RWAccess db_access_;
mutable db::RWAccess db_access_;
mutable db::RWTxnManaged tx_;
db::DataModel data_model_;
bool is_first_sync_{true};
Expand Down
Loading

0 comments on commit 6542d38

Please sign in to comment.