Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

snapshots: refactor SnapshotRepository #2056

Merged
merged 16 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/capi/execute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <silkworm/capi/silkworm.h>
#include <silkworm/db/access_layer.hpp>
#include <silkworm/db/mdbx/mdbx.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshots/repository.hpp>
#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/common/log.hpp>
Expand Down Expand Up @@ -150,8 +151,8 @@ std::vector<SilkwormChainSnapshot> collect_all_snapshots(SnapshotRepository& sna
std::vector<SilkwormBodiesSnapshot> bodies_snapshot_sequence;
std::vector<SilkwormTransactionsSnapshot> transactions_snapshot_sequence;

snapshot_repository.view_bundles(
[&](const SnapshotBundle& bundle) {
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles()) {
{
{
SilkwormHeadersSnapshot raw_headers_snapshot{
.segment{
Expand Down Expand Up @@ -202,8 +203,8 @@ std::vector<SilkwormChainSnapshot> collect_all_snapshots(SnapshotRepository& sna
};
transactions_snapshot_sequence.push_back(raw_transactions_snapshot);
}
return true;
});
}
}

ensure(headers_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid header snapshot count");
ensure(bodies_snapshot_sequence.size() == snapshot_repository.bundles_count(), "invalid body snapshot count");
Expand Down Expand Up @@ -436,7 +437,7 @@ int main(int argc, char* argv[]) {
int status_code = -1;
if (settings.execute_blocks_settings) {
// Execute specified block range using Silkworm API library
SnapshotRepository repository{snapshot_settings};
SnapshotRepository repository{snapshot_settings, std::make_unique<db::SnapshotBundleFactoryImpl>()};
repository.reopen_folder();
status_code = execute_blocks(handle, *settings.execute_blocks_settings, repository, data_dir);
} else if (settings.build_indexes_settings) {
Expand Down
4 changes: 3 additions & 1 deletion cmd/dev/check_changes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <silkworm/core/types/evmc_bytes32.hpp>
#include <silkworm/db/access_layer.hpp>
#include <silkworm/db/buffer.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshots/repository.hpp>
#include <silkworm/infra/common/directories.hpp>
#include <silkworm/infra/common/log.hpp>
Expand Down Expand Up @@ -109,7 +110,8 @@ int main(int argc, char* argv[]) {
throw std::runtime_error("Unable to retrieve chain config");
}

snapshots::SnapshotRepository repository;
auto snapshot_bundle_factory = std::make_unique<db::SnapshotBundleFactoryImpl>();
snapshots::SnapshotRepository repository{snapshots::SnapshotSettings{}, std::move(snapshot_bundle_factory)};
repository.reopen_folder();
db::DataModel::set_snapshot_repository(&repository);
db::DataModel access_layer{txn};
Expand Down
120 changes: 64 additions & 56 deletions cmd/dev/snapshots.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <silkworm/core/types/address.hpp>
#include <silkworm/core/types/block_body_for_storage.hpp>
#include <silkworm/core/types/evmc_bytes32.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshot_sync.hpp>
#include <silkworm/db/snapshots/bittorrent/client.hpp>
#include <silkworm/db/snapshots/body_index.hpp>
Expand Down Expand Up @@ -227,44 +228,48 @@ void decode_segment(const SnapSettings& settings, int repetitions) {
SILK_INFO << "Decode snapshot elapsed: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
}

static std::unique_ptr<SnapshotBundleFactory> bundle_factory() {
return std::make_unique<silkworm::db::SnapshotBundleFactoryImpl>();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving this factory function into snapshot_bundle_factory_impl.hpp, so that we can use it everywhere and have a one-liner creation for SnapshotRepository as below:

SnapshotRepository snapshot_repo{settings, bundle_factory()};


void count_bodies(const SnapSettings& settings, int repetitions) {
SnapshotRepository snapshot_repo{settings};
SnapshotRepository snapshot_repo{settings, bundle_factory()};
snapshot_repo.reopen_folder();
std::chrono::time_point start{std::chrono::steady_clock::now()};
int num_bodies{0};
uint64_t num_txns{0};
for (int i{0}; i < repetitions; ++i) {
const bool success = snapshot_repo.for_each_body([&](BlockNum number, const BlockBodyForStorage& b) -> bool {
// If *system transactions* should not be counted, skip first and last tx in block body
const auto base_txn_id{settings.skip_system_txs ? b.base_txn_id + 1 : b.base_txn_id};
const auto txn_count{settings.skip_system_txs && b.txn_count >= 2 ? b.txn_count - 2 : b.txn_count};
SILK_DEBUG << "Body number: " << number << " base_txn_id: " << base_txn_id << " txn_count: " << txn_count
<< " #ommers: " << b.ommers.size();
num_bodies++;
num_txns += txn_count;
return true;
});
ensure(success, "count_bodies: for_each_body failed");
for (const SnapshotBundle& bundle : snapshot_repo.view_bundles()) {
for (const BlockBodyForStorage& b : BodySnapshotReader{bundle.body_snapshot}) {
// If *system transactions* should not be counted, skip first and last tx in block body
const auto base_txn_id{settings.skip_system_txs ? b.base_txn_id + 1 : b.base_txn_id};
const auto txn_count{settings.skip_system_txs && b.txn_count >= 2 ? b.txn_count - 2 : b.txn_count};
SILK_TRACE << "Body number: " << num_bodies << " base_txn_id: " << base_txn_id << " txn_count: " << txn_count
<< " #ommers: " << b.ommers.size();
num_bodies++;
num_txns += txn_count;
}
}
}
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
const auto duration = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
SILK_INFO << "How many bodies: " << num_bodies << " txs: " << num_txns << " duration: " << duration << " msec";
}

void count_headers(const SnapSettings& settings, int repetitions) {
SnapshotRepository snapshot_repo{settings};
SnapshotRepository snapshot_repo{settings, bundle_factory()};
snapshot_repo.reopen_folder();
std::chrono::time_point start{std::chrono::steady_clock::now()};
int count{0};
for (int i{0}; i < repetitions; ++i) {
const bool success = snapshot_repo.for_each_header([&count](const BlockHeader& h) -> bool {
++count;
if (h.number % 50'000 == 0) {
SILK_INFO << "Header number: " << h.number << " hash: " << to_hex(h.hash());
for (const SnapshotBundle& bundle : snapshot_repo.view_bundles()) {
for (const BlockHeader& h : HeaderSnapshotReader{bundle.header_snapshot}) {
++count;
if (h.number % 50'000 == 0) {
SILK_INFO << "Header number: " << h.number << " hash: " << to_hex(h.hash());
}
}
return true;
});
ensure(success, "count_headers: for_each_header failed");
}
}
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
SILK_INFO << "How many headers: " << count << " duration: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
Expand Down Expand Up @@ -405,16 +410,17 @@ void lookup_header_by_hash(const SnapSettings& settings) {

std::optional<SnapshotPath> matching_snapshot;
std::optional<BlockHeader> matching_header;
SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
snapshot_repository.reopen_folder();
snapshot_repository.view_header_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
const auto header = HeaderFindByHashQuery{snapshot.snapshot, snapshot.index}.exec(*hash);
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::headers);
const auto header = HeaderFindByHashQuery{snapshot_and_index}.exec(*hash);
if (header) {
matching_header = header;
matching_snapshot = snapshot.snapshot.path();
matching_snapshot = snapshot_and_index.snapshot.path();
break;
}
return header.has_value();
});
}
if (matching_snapshot) {
SILK_INFO << "Lookup header hash: " << hash->to_hex() << " found in: " << matching_snapshot->filename();
if (matching_header && settings.print) {
Expand All @@ -433,16 +439,16 @@ void lookup_header_by_number(const SnapSettings& settings) {
SILK_INFO << "Lookup header number: " << block_number;
std::chrono::time_point start{std::chrono::steady_clock::now()};

SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
snapshot_repository.reopen_folder();
const auto header_snapshot{snapshot_repository.find_header_segment(block_number)};
if (header_snapshot) {
const auto header = HeaderFindByBlockNumQuery{header_snapshot->snapshot, header_snapshot->index}.exec(block_number);
const auto snapshot_and_index = snapshot_repository.find_segment(SnapshotType::headers, block_number);
if (snapshot_and_index) {
const auto header = HeaderFindByBlockNumQuery{*snapshot_and_index}.exec(block_number);
ensure(header.has_value(),
[&]() { return "lookup_header_by_number: " + std::to_string(block_number) + " NOT found in " + header_snapshot->snapshot.path().filename(); });
SILK_INFO << "Lookup header number: " << block_number << " found in: " << header_snapshot->snapshot.path().filename();
[&]() { return "lookup_header_by_number: " + std::to_string(block_number) + " NOT found in " + snapshot_and_index->snapshot.path().filename(); });
SILK_INFO << "Lookup header number: " << block_number << " found in: " << snapshot_and_index->snapshot.path().filename();
if (settings.print) {
print_header(*header, header_snapshot->snapshot.path().filename());
print_header(*header, snapshot_and_index->snapshot.path().filename());
}
} else {
SILK_WARN << "Lookup header number: " << block_number << " NOT found";
Expand Down Expand Up @@ -479,7 +485,7 @@ void lookup_body_in_one(const SnapSettings& settings, BlockNum block_number, con
Index idx_body_number{snapshot_path->index_file()};
idx_body_number.reopen_index();

const auto body = BodyFindByBlockNumQuery{body_snapshot, idx_body_number}.exec(block_number);
const auto body = BodyFindByBlockNumQuery{{body_snapshot, idx_body_number}}.exec(block_number);
if (body) {
SILK_INFO << "Lookup body number: " << block_number << " found in: " << body_snapshot.path().filename();
if (settings.print) {
Expand All @@ -493,18 +499,18 @@ void lookup_body_in_one(const SnapSettings& settings, BlockNum block_number, con
}

void lookup_body_in_all(const SnapSettings& settings, BlockNum block_number) {
SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
snapshot_repository.reopen_folder();

std::chrono::time_point start{std::chrono::steady_clock::now()};
const auto body_snapshot{snapshot_repository.find_body_segment(block_number)};
if (body_snapshot) {
const auto body = BodyFindByBlockNumQuery{body_snapshot->snapshot, body_snapshot->index}.exec(block_number);
const auto snapshot_and_index = snapshot_repository.find_segment(SnapshotType::bodies, block_number);
if (snapshot_and_index) {
const auto body = BodyFindByBlockNumQuery{*snapshot_and_index}.exec(block_number);
ensure(body.has_value(),
[&]() { return "lookup_body: " + std::to_string(block_number) + " NOT found in " + body_snapshot->snapshot.path().filename(); });
SILK_INFO << "Lookup body number: " << block_number << " found in: " << body_snapshot->snapshot.path().filename();
[&]() { return "lookup_body: " + std::to_string(block_number) + " NOT found in " + snapshot_and_index->snapshot.path().filename(); });
SILK_INFO << "Lookup body number: " << block_number << " found in: " << snapshot_and_index->snapshot.path().filename();
if (settings.print) {
print_body(*body, body_snapshot->snapshot.path().filename());
print_body(*body, snapshot_and_index->snapshot.path().filename());
}
} else {
SILK_WARN << "Lookup body number: " << block_number << " NOT found";
Expand Down Expand Up @@ -585,7 +591,7 @@ void lookup_txn_by_hash_in_one(const SnapSettings& settings, const Hash& hash, c
Index idx_txn_hash{snapshot_path->index_file()};
idx_txn_hash.reopen_index();

const auto transaction = TransactionFindByHashQuery{tx_snapshot, idx_txn_hash}.exec(hash);
const auto transaction = TransactionFindByHashQuery{{tx_snapshot, idx_txn_hash}}.exec(hash);
if (transaction) {
SILK_INFO << "Lookup txn hash: " << hash.to_hex() << " found in: " << tx_snapshot.path().filename();
if (settings.print) {
Expand All @@ -600,21 +606,22 @@ void lookup_txn_by_hash_in_one(const SnapSettings& settings, const Hash& hash, c
}

void lookup_txn_by_hash_in_all(const SnapSettings& settings, const Hash& hash) {
SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
snapshot_repository.reopen_folder();

std::optional<SnapshotPath> matching_snapshot;
std::chrono::time_point start{std::chrono::steady_clock::now()};
snapshot_repository.view_tx_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
const auto transaction = TransactionFindByHashQuery{snapshot.snapshot, snapshot.index}.exec(hash);
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::transactions);
const auto transaction = TransactionFindByHashQuery{snapshot_and_index}.exec(hash);
if (transaction) {
matching_snapshot = snapshot.snapshot.path();
matching_snapshot = snapshot_and_index.snapshot.path();
if (settings.print) {
print_txn(*transaction, matching_snapshot->path().filename());
}
break;
}
return transaction.has_value();
});
}
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
SILK_INFO << "Lookup txn elapsed: " << duration_as<std::chrono::microseconds>(elapsed) << " usec";
if (matching_snapshot) {
Expand Down Expand Up @@ -648,7 +655,7 @@ void lookup_txn_by_id_in_one(const SnapSettings& settings, uint64_t txn_id, cons
Index idx_txn_hash{snapshot_path->index_file()};
idx_txn_hash.reopen_index();

const auto transaction = TransactionFindByIdQuery{tx_snapshot, idx_txn_hash}.exec(txn_id);
const auto transaction = TransactionFindByIdQuery{{tx_snapshot, idx_txn_hash}}.exec(txn_id);
if (transaction) {
SILK_INFO << "Lookup txn ID: " << txn_id << " found in: " << tx_snapshot.path().filename();
if (settings.print) {
Expand All @@ -663,21 +670,22 @@ void lookup_txn_by_id_in_one(const SnapSettings& settings, uint64_t txn_id, cons
}

void lookup_txn_by_id_in_all(const SnapSettings& settings, uint64_t txn_id) {
SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
snapshot_repository.reopen_folder();

std::optional<SnapshotPath> matching_snapshot;
std::chrono::time_point start{std::chrono::steady_clock::now()};
snapshot_repository.view_tx_segments([&](SnapshotRepository::SnapshotAndIndex snapshot) -> bool {
const auto transaction = TransactionFindByIdQuery{snapshot.snapshot, snapshot.index}.exec(txn_id);
for (const SnapshotBundle& bundle : snapshot_repository.view_bundles_reverse()) {
auto snapshot_and_index = bundle.snapshot_and_index(SnapshotType::transactions);
const auto transaction = TransactionFindByIdQuery{snapshot_and_index}.exec(txn_id);
if (transaction) {
matching_snapshot = snapshot.snapshot.path();
matching_snapshot = snapshot_and_index.snapshot.path();
if (settings.print) {
print_txn(*transaction, matching_snapshot->path().filename());
}
break;
}
return transaction.has_value();
});
}
std::chrono::duration elapsed{std::chrono::steady_clock::now() - start};
SILK_INFO << "Lookup txn elapsed: " << duration_as<std::chrono::milliseconds>(elapsed) << " msec";
if (matching_snapshot) {
Expand Down Expand Up @@ -708,7 +716,7 @@ void lookup_transaction(const SnapSettings& settings) {

void sync(const SnapSettings& settings) {
std::chrono::time_point start{std::chrono::steady_clock::now()};
SnapshotRepository snapshot_repository{settings};
SnapshotRepository snapshot_repository{settings, bundle_factory()};
db::SnapshotSync snapshot_sync{&snapshot_repository, kMainnetConfig};
std::vector<std::string> snapshot_file_names;
if (settings.snapshot_file_name) {
Expand Down
4 changes: 3 additions & 1 deletion silkworm/capi/silkworm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include <silkworm/core/execution/execution.hpp>
#include <silkworm/db/access_layer.hpp>
#include <silkworm/db/buffer.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshots/body_index.hpp>
#include <silkworm/db/snapshots/header_index.hpp>
#include <silkworm/db/snapshots/index.hpp>
Expand Down Expand Up @@ -206,7 +207,8 @@ SILKWORM_EXPORT int silkworm_init(SilkwormHandle* handle, const struct SilkwormS
log::init(log_settings);
log::Info{"Silkworm build info", log_args_for_version()}; // NOLINT(*-unused-raii)

auto snapshot_repository = std::make_unique<snapshots::SnapshotRepository>();
auto snapshot_bundle_factory = std::make_unique<db::SnapshotBundleFactoryImpl>();
auto snapshot_repository = std::make_unique<snapshots::SnapshotRepository>(snapshots::SnapshotSettings{}, std::move(snapshot_bundle_factory));
db::DataModel::set_snapshot_repository(snapshot_repository.get());

// NOLINTNEXTLINE(bugprone-unhandled-exception-at-new)
Expand Down
Loading
Loading