Skip to content

Commit

Permalink
db: snapshots seeding (#2302)
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr authored Sep 6, 2024
1 parent 1b24907 commit cbc4aeb
Show file tree
Hide file tree
Showing 14 changed files with 195 additions and 57 deletions.
6 changes: 6 additions & 0 deletions silkworm/db/snapshot_merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ void SnapshotMerger::commit(std::shared_ptr<DataMigrationResult> result) {
for (auto& merged_bundle : merged_bundles) {
schedule_bundle_cleanup(*merged_bundle);
}

on_snapshot_merged_signal_(bundle.block_range());
}

boost::signals2::scoped_connection SnapshotMerger::on_snapshot_merged(std::function<void(BlockNumRange)> callback) {
return on_snapshot_merged_signal_.connect(std::move(callback));
}

Task<void> SnapshotMerger::cleanup() {
Expand Down
12 changes: 11 additions & 1 deletion silkworm/db/snapshot_merger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,15 @@

#pragma once

#include <functional>

#include <boost/signals2.hpp>

#include <silkworm/core/common/base.hpp>

#include "data_migration.hpp"
#include "snapshots/snapshot_repository.hpp"
#include "snapshots/snapshot_size.hpp"

namespace silkworm::db {

Expand All @@ -29,9 +36,11 @@ class SnapshotMerger : public DataMigration {
: snapshots_(snapshots),
tmp_dir_path_(std::move(tmp_dir_path)) {}

boost::signals2::scoped_connection on_snapshot_merged(std::function<void(BlockNumRange)> callback);

private:
static constexpr size_t kBatchSize = 10;
static constexpr size_t kMaxSnapshotSize = 100'000;
static constexpr size_t kMaxSnapshotSize = snapshots::kMaxMergerSnapshotSize;

const char* name() const override { return "SnapshotMerger"; }
std::unique_ptr<DataMigrationCommand> next_command() override;
Expand All @@ -42,6 +51,7 @@ class SnapshotMerger : public DataMigration {

snapshots::SnapshotRepository& snapshots_;
std::filesystem::path tmp_dir_path_;
boost::signals2::signal<void(BlockNumRange)> on_snapshot_merged_signal_;
};

} // namespace silkworm::db
54 changes: 47 additions & 7 deletions silkworm/db/snapshot_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <silkworm/db/blocks/headers/header_snapshot.hpp>
#include <silkworm/db/mdbx/etl_mdbx_collector.hpp>
#include <silkworm/db/snapshot_bundle_factory_impl.hpp>
#include <silkworm/db/snapshots/bittorrent/torrent_file.hpp>
#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/db/stages.hpp>
#include <silkworm/infra/common/ensure.hpp>
Expand Down Expand Up @@ -86,6 +87,10 @@ Task<void> SnapshotSync::setup_and_run() {

co_await setup();

[[maybe_unused]] auto snapshot_merged_subscription = snapshot_merger_.on_snapshot_merged([this](BlockNumRange range) {
this->seed_frozen_bundle(range);
});

co_await (
snapshot_freezer_.run_loop() &&
snapshot_merger_.run_loop());
Expand Down Expand Up @@ -114,6 +119,8 @@ Task<void> SnapshotSync::setup() {
// Set snapshot repository into snapshot-aware database access
db::DataModel::set_snapshot_repository(&repository_);

seed_frozen_local_snapshots();

std::scoped_lock lock{setup_done_mutex_};
setup_done_ = true;
setup_done_cond_var_.notify_all();
Expand Down Expand Up @@ -164,7 +171,7 @@ Task<void> SnapshotSync::download_snapshots() {
auto log_added = [](const std::filesystem::path& snapshot_file) {
SILK_TRACE << "SnapshotSync: download started for: " << snapshot_file.filename().string();
};
const auto added_connection = client_.added_subscription.connect(log_added);
boost::signals2::scoped_connection added_subscription{client_.added_subscription.connect(log_added)};

size_t completed = 0;
auto log_stats = [&](lt::span<const int64_t> counters) {
Expand All @@ -186,15 +193,15 @@ Task<void> SnapshotSync::download_snapshots() {
SILK_TRACE << "SnapshotSync: counters dump [" << counters_dump << "]";
}
};
const auto stats_connection = client_.stats_subscription.connect(log_stats);
boost::signals2::scoped_connection stats_subscription{client_.stats_subscription.connect(log_stats)};

auto executor = co_await boost::asio::this_coro::executor;
// make the buffer bigger so that try_send always succeeds in case of duplicate files (see snapshot_set below)
concurrency::Channel<std::filesystem::path> completed_channel{executor, num_snapshots * 2};
auto log_completed = [&](const std::filesystem::path& snapshot_file) {
completed_channel.try_send(snapshot_file);
};
const auto completed_connection = client_.completed_subscription.connect(log_completed);
boost::signals2::scoped_connection completed_subscription{client_.completed_subscription.connect(log_completed)};

for (const auto& preverified_snapshot : snapshot_config.preverified_snapshots()) {
SILK_TRACE << "SnapshotSync: adding info hash for preverified: " << preverified_snapshot.file_name;
Expand All @@ -218,10 +225,6 @@ Task<void> SnapshotSync::download_snapshots() {
}

SILK_INFO << "SnapshotSync: sync completed: [" << num_snapshots << "/" << num_snapshots << "]";

added_connection.disconnect();
completed_connection.disconnect();
stats_connection.disconnect();
}

Task<void> SnapshotSync::build_missing_indexes() {
Expand Down Expand Up @@ -266,6 +269,43 @@ Task<void> SnapshotSync::build_missing_indexes() {
SILK_INFO << "SnapshotSync: built missing indexes";
}

void SnapshotSync::seed_frozen_local_snapshots() {
for (auto& bundle_ptr : repository_.view_bundles()) {
auto& bundle = *bundle_ptr;
bool is_frozen = bundle.block_range().size() >= kMaxMergerSnapshotSize;
bool is_preverified = bundle.block_to() <= snapshots_config_.max_block_number() + 1;
if (is_frozen && !is_preverified) {
seed_bundle(bundle);
}
}
}

void SnapshotSync::seed_frozen_bundle(BlockNumRange range) {
bool is_frozen = range.size() >= kMaxMergerSnapshotSize;
auto bundle = repository_.find_bundle(range.start);
if (bundle && (bundle->block_range() == range) && is_frozen) {
seed_bundle(*bundle);
}
}

void SnapshotSync::seed_bundle(SnapshotBundle& bundle) {
for (auto& path : bundle.snapshot_paths()) {
seed_snapshot(path);
}
}

void SnapshotSync::seed_snapshot(const SnapshotPath& path) {
std::filesystem::path torrent_path = path.path().concat(".torrent");
auto torrent_file =
std::filesystem::exists(torrent_path)
? bittorrent::TorrentFile{torrent_path}
: bittorrent::TorrentFile::from_source_file(path.path());
if (!std::filesystem::exists(torrent_path)) {
torrent_file.save(torrent_path);
}
client_.add_info_hash(path.path().filename().string(), torrent_file.info_hash());
}

void SnapshotSync::update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping) {
update_block_headers(txn, max_block_available, is_stopping);
update_block_bodies(txn, max_block_available);
Expand Down
8 changes: 8 additions & 0 deletions silkworm/db/snapshot_sync.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include <silkworm/db/snapshot_merger.hpp>
#include <silkworm/db/snapshots/bittorrent/client.hpp>
#include <silkworm/db/snapshots/config.hpp>
#include <silkworm/db/snapshots/snapshot_bundle.hpp>
#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/db/snapshots/snapshot_repository.hpp>
#include <silkworm/db/snapshots/snapshot_settings.hpp>
#include <silkworm/db/stage_scheduler.hpp>
Expand Down Expand Up @@ -61,6 +63,12 @@ class SnapshotSync {
Task<void> setup_and_run();
Task<void> setup();
Task<void> build_missing_indexes();

void seed_frozen_local_snapshots();
void seed_frozen_bundle(BlockNumRange range);
void seed_bundle(snapshots::SnapshotBundle& bundle);
void seed_snapshot(const snapshots::SnapshotPath& path);

void update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping);
void update_block_headers(db::RWTxn& txn, BlockNum max_block_available, const std::function<bool()>& is_stopping);
void update_block_bodies(db::RWTxn& txn, BlockNum max_block_available);
Expand Down
15 changes: 10 additions & 5 deletions silkworm/db/snapshots/bittorrent/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@ namespace fs = std::filesystem;
using namespace std::chrono_literals;

std::vector<char> BitTorrentClient::load_file(const fs::path& filename) {
std::ifstream input_file_stream{filename, std::ios_base::binary};
input_file_stream.unsetf(std::ios_base::skipws);
return {std::istream_iterator<char>(input_file_stream), std::istream_iterator<char>()};
if (!std::filesystem::exists(filename)) return {};
std::ifstream input_file_stream{filename, std::ios::binary | std::ios::ate};
input_file_stream.exceptions(std::ios::failbit | std::ios::badbit);
std::streamsize file_size = input_file_stream.tellg();
std::vector<char> contents(static_cast<size_t>(file_size));
input_file_stream.seekg(0);
input_file_stream.read(contents.data(), file_size);
return contents;
}

void BitTorrentClient::save_file(const fs::path& filename, const std::vector<char>& data) {
SILK_TRACE << "Save #data=" << data.size() << " in file: " << filename;
std::ofstream output_file_stream{filename, std::ios_base::binary};
output_file_stream.unsetf(std::ios_base::skipws);
std::ofstream output_file_stream{filename, std::ios::binary | std::ios::trunc};
output_file_stream.exceptions(std::ios::failbit | std::ios::badbit);
output_file_stream.write(data.data(), static_cast<std::streamsize>(data.size()));
}

Expand Down
15 changes: 12 additions & 3 deletions silkworm/db/snapshots/bittorrent/client_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,14 @@ static inline std::vector<char> test_resume_data() {
return resume_data;
}

TEST_CASE("BitTorrentClient::load_file", "[silkworm][snapshot][bittorrent]") {
TemporaryDirectory tmp_dir;
const auto path = tmp_dir.path() / "test.resume";
const auto resume_data = test_resume_data();
BitTorrentClientForTest::save_file(path, resume_data);
CHECK(BitTorrentClientForTest::load_file(path) == resume_data);
}

TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent]") {
TemporaryDirectory tmp_dir;
BitTorrentSettings settings;
Expand All @@ -123,16 +131,17 @@ TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent
SECTION("nonempty resume dir") {
const auto resume_dir_path = settings.repository_path / BitTorrentClient::kResumeDirName;
std::filesystem::create_directories(resume_dir_path);

const auto ignored_file{resume_dir_path / "a.txt"};
BitTorrentClientForTest::save_file(ignored_file, std::vector<char>{});

const auto empty_resume_file{resume_dir_path / "a.resume"};
BitTorrentClientForTest::save_file(empty_resume_file, std::vector<char>{});
const auto invalid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"};
std::vector<char> invalid_resume_data{};
BitTorrentClientForTest::save_file(invalid_resume_file, invalid_resume_data);

const auto valid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"};
std::vector<char> resume_data{test_resume_data()};
BitTorrentClientForTest::save_file(valid_resume_file, resume_data);

CHECK_NOTHROW(BitTorrentClient{settings});
}
}
Expand Down
19 changes: 19 additions & 0 deletions silkworm/db/snapshots/bittorrent/torrent_file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

#include "torrent_file.hpp"

#include <fstream>
#include <iterator>
#include <sstream>
#include <string>

#include <libtorrent/create_torrent.hpp>
Expand All @@ -37,6 +39,10 @@ TorrentFile::TorrentFile(ByteView data)
: params_(lt::load_torrent_buffer(byte_view_to_str_span(data))) {
}

TorrentFile::TorrentFile(const std::filesystem::path& path)
: params_(lt::load_torrent_file(path.string())) {
}

TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date) {
lt::file_storage storage;
lt::create_flags_t flags = lt::create_torrent::v1_only;
Expand All @@ -57,10 +63,23 @@ TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_fi
return TorrentFile{string_view_to_byte_view(data)};
}

std::string TorrentFile::info_hash() const {
std::stringstream stream;
stream << params_.ti->info_hashes().get_best();
return stream.str();
}

Bytes TorrentFile::to_bytes() const {
std::string data;
lt::bencode(std::back_inserter(data), lt::write_torrent_file(params_));
return string_to_bytes(data);
}

void TorrentFile::save(const std::filesystem::path& path) {
Bytes data = to_bytes();
std::ofstream file{path, std::ios::binary | std::ios::trunc};
file.exceptions(std::ios::failbit | std::ios::badbit);
file << byte_view_to_string_view(data);
}

} // namespace silkworm::snapshots::bittorrent
3 changes: 3 additions & 0 deletions silkworm/db/snapshots/bittorrent/torrent_file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ namespace silkworm::snapshots::bittorrent {
class TorrentFile {
public:
TorrentFile(ByteView data);
TorrentFile(const std::filesystem::path& path);

static TorrentFile from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date = 0);

const lt::add_torrent_params& params() const { return params_; }
std::string info_hash() const;
Bytes to_bytes() const;
void save(const std::filesystem::path& path);

private:
lt::add_torrent_params params_;
Expand Down
60 changes: 43 additions & 17 deletions silkworm/db/snapshots/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,49 @@

#include "config.hpp"

#include <map>
#include <utility>
#include <span>

#include <boost/algorithm/string.hpp>

#include <silkworm/db/snapshots/snapshot_path.hpp>
#include <silkworm/infra/common/log.hpp>
#include <silkworm/core/common/small_map.hpp>

#include "config/bor_mainnet.hpp"
#include "config/mainnet.hpp"
#include "config/mumbai.hpp"
#include "config/sepolia.hpp"
#include "snapshot_path.hpp"
#include "snapshot_size.hpp"

namespace silkworm::snapshots {

inline constexpr SmallMap<ChainId, std::span<const Entry>> kKnownConfigGeneratedEntries{
{*kKnownChainNameToId.find("mainnet"sv), {kMainnetSnapshots.data(), kMainnetSnapshots.size()}},
{*kKnownChainNameToId.find("sepolia"sv), {kSepoliaSnapshots.data(), kSepoliaSnapshots.size()}},
{*kKnownChainNameToId.find("bor-mainnet"sv), {kBorMainnetSnapshots.data(), kBorMainnetSnapshots.size()}},
{*kKnownChainNameToId.find("mumbai"sv), {kMumbaiSnapshots.data(), kMumbaiSnapshots.size()}},
};

Config Config::lookup_known_config(ChainId chain_id) {
const auto config = kKnownSnapshotConfigs.find(chain_id);
if (!config) {
const auto entries_ptr = kKnownConfigGeneratedEntries.find(chain_id);
if (!entries_ptr) {
return Config{PreverifiedList{}};
}
return Config{PreverifiedList(config->begin(), config->end())};

PreverifiedList entries(entries_ptr->begin(), entries_ptr->end());
entries = remove_unsupported_snapshots(entries);

return Config{std::move(entries)};
}

Config::Config(PreverifiedList preverified_snapshots)
: preverified_snapshots_(std::move(preverified_snapshots)), max_block_number_(compute_max_block()) {
remove_unsupported_snapshots();
Config::Config(PreverifiedList entries)
: entries_(std::move(entries)),
max_block_number_(compute_max_block(entries_)) {
}

BlockNum Config::compute_max_block() {
BlockNum Config::compute_max_block(const PreverifiedList& entries) {
BlockNum max_block{0};
for (const auto& preverified_entry : preverified_snapshots_) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{preverified_entry.file_name});
for (const auto& entry : entries) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name});
if (!snapshot_path) continue;
if (!snapshot_path->is_segment()) continue;
if (snapshot_path->type() != SnapshotType::headers) continue;
Expand All @@ -53,16 +69,26 @@ BlockNum Config::compute_max_block() {
return max_block > 0 ? max_block - 1 : 0;
}

void Config::remove_unsupported_snapshots() {
PreverifiedList Config::remove_unsupported_snapshots(const PreverifiedList& entries) {
static constexpr std::array kUnsupportedSnapshotNameTokens = {
"accessor/"sv, "domain/"sv, "history/"sv, "idx/"sv, "manifest.txt"sv, "salt-blocks.txt"sv, "salt-state.txt"sv, "blobsidecars.seg"sv};

PreverifiedList results = entries;

// Check if a snapshot contains any of unsupported tokens
std::erase_if(preverified_snapshots_, [&](const auto& snapshot) {
return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&snapshot](const auto& token) {
return boost::algorithm::contains(snapshot.file_name, token);
std::erase_if(results, [&](const auto& entry) {
return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&entry](const auto& token) {
return boost::algorithm::contains(entry.file_name, token);
});
});

// Exclude small snapshots
std::erase_if(results, [&](const auto& entry) {
const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name});
return !snapshot_path.has_value() || (snapshot_path->block_range().size() < kMaxMergerSnapshotSize);
});

return results;
}

} // namespace silkworm::snapshots
Loading

0 comments on commit cbc4aeb

Please sign in to comment.