db: snapshots seeding #2302

Merged · 2 commits · Sep 6, 2024
6 changes: 6 additions & 0 deletions silkworm/db/snapshot_merger.cpp
@@ -146,6 +146,12 @@ void SnapshotMerger::commit(std::shared_ptr<DataMigrationResult> result) {
for (auto& merged_bundle : merged_bundles) {
schedule_bundle_cleanup(*merged_bundle);
}

on_snapshot_merged_signal_(bundle.block_range());
}

boost::signals2::scoped_connection SnapshotMerger::on_snapshot_merged(std::function<void(BlockNumRange)> callback) {
return on_snapshot_merged_signal_.connect(std::move(callback));
}

Task<void> SnapshotMerger::cleanup() {
12 changes: 11 additions & 1 deletion silkworm/db/snapshot_merger.hpp
@@ -16,8 +16,15 @@

#pragma once

#include <functional>

#include <boost/signals2.hpp>

#include <silkworm/core/common/base.hpp>

#include "data_migration.hpp"
#include "snapshots/snapshot_repository.hpp"
#include "snapshots/snapshot_size.hpp"

namespace silkworm::db {

@@ -29,9 +36,11 @@ class SnapshotMerger : public DataMigration {
: snapshots_(snapshots),
tmp_dir_path_(std::move(tmp_dir_path)) {}

boost::signals2::scoped_connection on_snapshot_merged(std::function<void(BlockNumRange)> callback);

private:
static constexpr size_t kBatchSize = 10;
static constexpr size_t kMaxSnapshotSize = 100'000;
static constexpr size_t kMaxSnapshotSize = snapshots::kMaxMergerSnapshotSize;

const char* name() const override { return "SnapshotMerger"; }
std::unique_ptr<DataMigrationCommand> next_command() override;
Expand All @@ -42,6 +51,7 @@ class SnapshotMerger : public DataMigration {

snapshots::SnapshotRepository& snapshots_;
std::filesystem::path tmp_dir_path_;
boost::signals2::signal<void(BlockNumRange)> on_snapshot_merged_signal_;
};

} // namespace silkworm::db
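For context, a minimal sketch of how a consumer might hold on to the subscription returned by the new `on_snapshot_merged` API (the observer class and its wiring are hypothetical; the actual subscriber added by this PR is `SnapshotSync::setup_and_run` below):

```cpp
#include <boost/signals2.hpp>

#include <silkworm/db/snapshot_merger.hpp>

// Hypothetical consumer: keep the returned scoped_connection alive for as long as
// the callback should fire; it disconnects automatically when the observer is destroyed.
class MergedBundleObserver {
  public:
    explicit MergedBundleObserver(silkworm::db::SnapshotMerger& merger)
        : subscription_{merger.on_snapshot_merged([](silkworm::BlockNumRange range) {
              // react to the freshly merged bundle, e.g. re-seed it starting at range.start
          })} {}

  private:
    boost::signals2::scoped_connection subscription_;
};
```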
54 changes: 47 additions & 7 deletions silkworm/db/snapshot_sync.cpp
@@ -29,6 +29,7 @@
#include <silkworm/db/blocks/headers/header_snapshot.hpp>
#include <silkworm/db/mdbx/etl_mdbx_collector.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshots/bittorrent/torrent_file.hpp>
#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/db/stages.hpp>
#include <silkworm/infra/common/ensure.hpp>
@@ -86,6 +87,10 @@ Task<void> SnapshotSync::setup_and_run() {

co_await setup();

[[maybe_unused]] auto snapshot_merged_subscription = snapshot_merger_.on_snapshot_merged([this](BlockNumRange range) {
this->seed_frozen_bundle(range);
});

co_await (
snapshot_freezer_.run_loop() &&
snapshot_merger_.run_loop());
@@ -114,6 +119,8 @@ Task<void> SnapshotSync::setup() {
// Set snapshot repository into snapshot-aware database access
db::DataModel::set_snapshot_repository(&repository_);

seed_frozen_local_snapshots();

std::scoped_lock lock{setup_done_mutex_};
setup_done_ = true;
setup_done_cond_var_.notify_all();
@@ -164,7 +171,7 @@ Task<void> SnapshotSync::download_snapshots() {
auto log_added = [](const std::filesystem::path& snapshot_file) {
SILK_TRACE << "SnapshotSync: download started for: " << snapshot_file.filename().string();
};
const auto added_connection = client_.added_subscription.connect(log_added);
boost::signals2::scoped_connection added_subscription{client_.added_subscription.connect(log_added)};

size_t completed = 0;
auto log_stats = [&](lt::span<const int64_t> counters) {
@@ -186,15 +193,15 @@
SILK_TRACE << "SnapshotSync: counters dump [" << counters_dump << "]";
}
};
const auto stats_connection = client_.stats_subscription.connect(log_stats);
boost::signals2::scoped_connection stats_subscription{client_.stats_subscription.connect(log_stats)};

auto executor = co_await boost::asio::this_coro::executor;
// make the buffer bigger so that try_send always succeeds in case of duplicate files (see snapshot_set below)
concurrency::Channel<std::filesystem::path> completed_channel{executor, num_snapshots * 2};
auto log_completed = [&](const std::filesystem::path& snapshot_file) {
completed_channel.try_send(snapshot_file);
};
const auto completed_connection = client_.completed_subscription.connect(log_completed);
boost::signals2::scoped_connection completed_subscription{client_.completed_subscription.connect(log_completed)};

for (const auto& preverified_snapshot : snapshot_config.preverified_snapshots()) {
SILK_TRACE << "SnapshotSync: adding info hash for preverified: " << preverified_snapshot.file_name;
@@ -218,10 +225,6 @@
}

SILK_INFO << "SnapshotSync: sync completed: [" << num_snapshots << "/" << num_snapshots << "]";

added_connection.disconnect();
completed_connection.disconnect();
stats_connection.disconnect();
}

Task<void> SnapshotSync::build_missing_indexes() {
@@ -266,6 +269,43 @@ Task<void> SnapshotSync::build_missing_indexes() {
SILK_INFO << "SnapshotSync: built missing indexes";
}

void SnapshotSync::seed_frozen_local_snapshots() {
for (auto& bundle_ptr : repository_.view_bundles()) {
auto& bundle = *bundle_ptr;
bool is_frozen = bundle.block_range().size() >= kMaxMergerSnapshotSize;
bool is_preverified = bundle.block_to() <= snapshots_config_.max_block_number() + 1;
if (is_frozen && !is_preverified) {
seed_bundle(bundle);
}
}
}

void SnapshotSync::seed_frozen_bundle(BlockNumRange range) {
bool is_frozen = range.size() >= kMaxMergerSnapshotSize;
auto bundle = repository_.find_bundle(range.start);
if (bundle && (bundle->block_range() == range) && is_frozen) {
seed_bundle(*bundle);
}
}

void SnapshotSync::seed_bundle(SnapshotBundle& bundle) {
for (auto& path : bundle.snapshot_paths()) {
seed_snapshot(path);
}
}

void SnapshotSync::seed_snapshot(const SnapshotPath& path) {
std::filesystem::path torrent_path = path.path().concat(".torrent");
auto torrent_file =
std::filesystem::exists(torrent_path)
? bittorrent::TorrentFile{torrent_path}
: bittorrent::TorrentFile::from_source_file(path.path());
if (!std::filesystem::exists(torrent_path)) {
torrent_file.save(torrent_path);
}
client_.add_info_hash(path.path().filename().string(), torrent_file.info_hash());
}

void SnapshotSync::update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping) {
update_block_headers(txn, max_block_available, is_stopping);
update_block_bodies(txn, max_block_available);
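The `download_snapshots` changes above replace manually disconnected signal connections with `boost::signals2::scoped_connection`. A small standalone sketch of the RAII behavior that makes the trailing `disconnect()` calls unnecessary:

```cpp
#include <iostream>

#include <boost/signals2.hpp>

int main() {
    boost::signals2::signal<void(int)> sig;
    {
        // The subscription lives only inside this scope...
        boost::signals2::scoped_connection subscription{
            sig.connect([](int value) { std::cout << "got " << value << "\n"; })};
        sig(1);  // prints "got 1"
    }            // ...and disconnects automatically here, without an explicit disconnect()
    sig(2);      // prints nothing: the slot is already gone
}
```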
8 changes: 8 additions & 0 deletions silkworm/db/snapshot_sync.hpp
@@ -34,6 +34,8 @@
#include <silkworm/db/snapshot_merger.hpp>
#include <silkworm/db/snapshots/bittorrent/client.hpp>
#include <silkworm/db/snapshots/config.hpp>
#include <silkworm/db/snapshots/snapshot_bundle.hpp>
#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/db/snapshots/snapshot_repository.hpp>
#include <silkworm/db/snapshots/snapshot_settings.hpp>
#include <silkworm/db/stage_scheduler.hpp>
@@ -61,6 +63,12 @@ class SnapshotSync {
Task<void> setup_and_run();
Task<void> setup();
Task<void> build_missing_indexes();

void seed_frozen_local_snapshots();
void seed_frozen_bundle(BlockNumRange range);
void seed_bundle(snapshots::SnapshotBundle& bundle);
void seed_snapshot(const snapshots::SnapshotPath& path);

void update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping);
void update_block_headers(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping);
void update_block_bodies(db::RWTxn& txn, BlockNum max_block_available);
15 changes: 10 additions & 5 deletions silkworm/db/snapshots/bittorrent/client.cpp
@@ -44,15 +44,20 @@ namespace fs = std::filesystem;
using namespace std::chrono_literals;

std::vector<char> BitTorrentClient::load_file(const fs::path& filename) {
std::ifstream input_file_stream{filename, std::ios_base::binary};
input_file_stream.unsetf(std::ios_base::skipws);
return {std::istream_iterator<char>(input_file_stream), std::istream_iterator<char>()};
if (!std::filesystem::exists(filename)) return {};
std::ifstream input_file_stream{filename, std::ios::binary | std::ios::ate};
input_file_stream.exceptions(std::ios::failbit | std::ios::badbit);
std::streamsize file_size = input_file_stream.tellg();
std::vector<char> contents(static_cast<size_t>(file_size));
input_file_stream.seekg(0);
input_file_stream.read(contents.data(), file_size);
return contents;
}

void BitTorrentClient::save_file(const fs::path& filename, const std::vector<char>& data) {
SILK_TRACE << "Save #data=" << data.size() << " in file: " << filename;
std::ofstream output_file_stream{filename, std::ios_base::binary};
output_file_stream.unsetf(std::ios_base::skipws);
std::ofstream output_file_stream{filename, std::ios::binary | std::ios::trunc};
output_file_stream.exceptions(std::ios::failbit | std::ios::badbit);
output_file_stream.write(data.data(), static_cast<std::streamsize>(data.size()));
}

15 changes: 12 additions & 3 deletions silkworm/db/snapshots/bittorrent/client_test.cpp
@@ -105,6 +105,14 @@ static inline std::vector<char> test_resume_data() {
return resume_data;
}

TEST_CASE("BitTorrentClient::load_file", "[silkworm][snapshot][bittorrent]") {
TemporaryDirectory tmp_dir;
const auto path = tmp_dir.path() / "test.resume";
const auto resume_data = test_resume_data();
BitTorrentClientForTest::save_file(path, resume_data);
CHECK(BitTorrentClientForTest::load_file(path) == resume_data);
}

TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent]") {
TemporaryDirectory tmp_dir;
BitTorrentSettings settings;
@@ -123,16 +131,17 @@ TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent]") {
SECTION("nonempty resume dir") {
const auto resume_dir_path = settings.repository_path / BitTorrentClient::kResumeDirName;
std::filesystem::create_directories(resume_dir_path);

const auto ignored_file{resume_dir_path / "a.txt"};
BitTorrentClientForTest::save_file(ignored_file, std::vector<char>{});

const auto empty_resume_file{resume_dir_path / "a.resume"};
BitTorrentClientForTest::save_file(empty_resume_file, std::vector<char>{});
const auto invalid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"};
std::vector<char> invalid_resume_data{};
BitTorrentClientForTest::save_file(invalid_resume_file, invalid_resume_data);

const auto valid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"};
std::vector<char> resume_data{test_resume_data()};
BitTorrentClientForTest::save_file(valid_resume_file, resume_data);

CHECK_NOTHROW(BitTorrentClient{settings});
}
}
19 changes: 19 additions & 0 deletions silkworm/db/snapshots/bittorrent/torrent_file.cpp
@@ -16,7 +16,9 @@

#include "torrent_file.hpp"

#include <fstream>
#include <iterator>
#include <sstream>
#include <string>

#include <libtorrent/create_torrent.hpp>
@@ -37,6 +39,10 @@ TorrentFile::TorrentFile(ByteView data)
: params_(lt::load_torrent_buffer(byte_view_to_str_span(data))) {
}

TorrentFile::TorrentFile(const std::filesystem::path& path)
: params_(lt::load_torrent_file(path.string())) {
}

TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date) {
lt::file_storage storage;
lt::create_flags_t flags = lt::create_torrent::v1_only;
@@ -57,10 +63,23 @@ TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date) {
return TorrentFile{string_view_to_byte_view(data)};
}

std::string TorrentFile::info_hash() const {
std::stringstream stream;
stream << params_.ti->info_hashes().get_best();
return stream.str();
}

Bytes TorrentFile::to_bytes() const {
std::string data;
lt::bencode(std::back_inserter(data), lt::write_torrent_file(params_));
return string_to_bytes(data);
}

void TorrentFile::save(const std::filesystem::path& path) {
Bytes data = to_bytes();
std::ofstream file{path, std::ios::binary | std::ios::trunc};
file.exceptions(std::ios::failbit | std::ios::badbit);
file << byte_view_to_string_view(data);
}

} // namespace silkworm::snapshots::bittorrent
3 changes: 3 additions & 0 deletions silkworm/db/snapshots/bittorrent/torrent_file.hpp
@@ -31,11 +31,14 @@ namespace silkworm::snapshots::bittorrent {
class TorrentFile {
public:
TorrentFile(ByteView data);
TorrentFile(const std::filesystem::path& path);

static TorrentFile from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date = 0);

const lt::add_torrent_params& params() const { return params_; }
std::string info_hash() const;
Bytes to_bytes() const;
void save(const std::filesystem::path& path);

private:
lt::add_torrent_params params_;
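As a usage illustration, here is a minimal sketch of the `TorrentFile` round trip that `seed_snapshot` relies on (the helper function and segment path are hypothetical): reuse an existing `.torrent` descriptor when present, otherwise build one from the data file and persist it, then read the info hash:

```cpp
#include <filesystem>
#include <string>

#include <silkworm/db/snapshots/bittorrent/torrent_file.hpp>

using namespace silkworm::snapshots::bittorrent;

// Hypothetical helper mirroring SnapshotSync::seed_snapshot
std::string torrent_info_hash_for(const std::filesystem::path& segment_path) {
    std::filesystem::path torrent_path = segment_path;
    torrent_path += ".torrent";

    // Load an existing descriptor, or create one from the source file
    TorrentFile torrent_file =
        std::filesystem::exists(torrent_path)
            ? TorrentFile{torrent_path}
            : TorrentFile::from_source_file(segment_path);
    if (!std::filesystem::exists(torrent_path)) {
        torrent_file.save(torrent_path);
    }
    return torrent_file.info_hash();
}
```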
60 changes: 43 additions & 17 deletions silkworm/db/snapshots/config.cpp
@@ -16,33 +16,49 @@

#include "config.hpp"

#include <map>
#include <utility>
#include <span>

#include <boost/algorithm/string.hpp>

#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/core/common/small_map.hpp>

#include "config/bor_mainnet.hpp"
#include "config/mainnet.hpp"
#include "config/mumbai.hpp"
#include "config/sepolia.hpp"
#include "snapshot_path.hpp"
#include "snapshot_size.hpp"

namespace silkworm::snapshots {

inline constexpr SmallMap<ChainId, std::span<const Entry>> kKnownConfigGeneratedEntries{
{*kKnownChainNameToId.find("mainnet"sv), {kMainnetSnapshots.data(), kMainnetSnapshots.size()}},
{*kKnownChainNameToId.find("sepolia"sv), {kSepoliaSnapshots.data(), kSepoliaSnapshots.size()}},
{*kKnownChainNameToId.find("bor-mainnet"sv), {kBorMainnetSnapshots.data(), kBorMainnetSnapshots.size()}},
{*kKnownChainNameToId.find("mumbai"sv), {kMumbaiSnapshots.data(), kMumbaiSnapshots.size()}},
};

Config Config::lookup_known_config(ChainId chain_id) {
const auto config = kKnownSnapshotConfigs.find(chain_id);
if (!config) {
const auto entries_ptr = kKnownConfigGeneratedEntries.find(chain_id);
if (!entries_ptr) {
return Config{PreverifiedList{}};
}
return Config{PreverifiedList(config->begin(), config->end())};

PreverifiedList entries(entries_ptr->begin(), entries_ptr->end());
entries = remove_unsupported_snapshots(entries);

return Config{std::move(entries)};
}

Config::Config(PreverifiedList preverified_snapshots)
: preverified_snapshots_(std::move(preverified_snapshots)), max_block_number_(compute_max_block()) {
remove_unsupported_snapshots();
Config::Config(PreverifiedList entries)
: entries_(std::move(entries)),
max_block_number_(compute_max_block(entries_)) {
}

BlockNum Config::compute_max_block() {
BlockNum Config::compute_max_block(const PreverifiedList& entries) {
BlockNum max_block{0};
for (const auto& preverified_entry : preverified_snapshots_) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{preverified_entry.file_name});
for (const auto& entry : entries) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name});
if (!snapshot_path) continue;
if (!snapshot_path->is_segment()) continue;
if (snapshot_path->type() != SnapshotType::headers) continue;
@@ -53,16 +69,26 @@ BlockNum Config::compute_max_block() {
return max_block > 0 ? max_block - 1 : 0;
}

void Config::remove_unsupported_snapshots() {
PreverifiedList Config::remove_unsupported_snapshots(const PreverifiedList& entries) {
static constexpr std::array kUnsupportedSnapshotNameTokens = {
"accessor/"sv, "domain/"sv, "history/"sv, "idx/"sv, "manifest.txt"sv, "salt-blocks.txt"sv, "salt-state.txt"sv, "blobsidecars.seg"sv};

PreverifiedList results = entries;

// Check if a snapshot contains any of unsupported tokens
std::erase_if(preverified_snapshots_, [&](const auto& snapshot) {
return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&snapshot](const auto& token) {
return boost::algorithm::contains(snapshot.file_name, token);
std::erase_if(results, [&](const auto& entry) {
return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&entry](const auto& token) {
return boost::algorithm::contains(entry.file_name, token);
});
});

// Exclude small snapshots
std::erase_if(results, [&](const auto& entry) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name});
return !snapshot_path.has_value() || (snapshot_path->block_range().size() < kMaxMergerSnapshotSize);
});

return results;
}

} // namespace silkworm::snapshots
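For reference, a minimal sketch of how the reworked lookup is consumed (the chain id value and the inspection helper are assumptions): filtering now happens before the `Config` is constructed, so `preverified_snapshots()` only lists supported, full-size segments:

```cpp
#include <silkworm/db/snapshots/config.hpp>

using namespace silkworm::snapshots;

// Hypothetical helper; chain id 1 is assumed to map to "mainnet" in kKnownChainNameToId
void inspect_mainnet_snapshot_config() {
    Config config = Config::lookup_known_config(/*chain_id=*/1);
    for (const auto& entry : config.preverified_snapshots()) {
        // entry.file_name names a supported .seg snapshot covering at least
        // kMaxMergerSnapshotSize blocks (smaller ones were filtered out above)
        (void)entry;
    }
    const auto max_block = config.max_block_number();  // highest block covered by headers snapshots
    (void)max_block;
}
```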