From cbc4aeb00f1d81433043a93704bde92e6170e42c Mon Sep 17 00:00:00 2001 From: battlmonstr Date: Fri, 6 Sep 2024 08:29:55 +0200 Subject: [PATCH] db: snapshots seeding (#2302) --- silkworm/db/snapshot_merger.cpp | 6 ++ silkworm/db/snapshot_merger.hpp | 12 +++- silkworm/db/snapshot_sync.cpp | 54 ++++++++++++++--- silkworm/db/snapshot_sync.hpp | 8 +++ silkworm/db/snapshots/bittorrent/client.cpp | 15 +++-- .../db/snapshots/bittorrent/client_test.cpp | 15 ++++- .../db/snapshots/bittorrent/torrent_file.cpp | 19 ++++++ .../db/snapshots/bittorrent/torrent_file.hpp | 3 + silkworm/db/snapshots/config.cpp | 60 +++++++++++++------ silkworm/db/snapshots/config.hpp | 28 +++------ silkworm/db/snapshots/config_test.cpp | 2 +- silkworm/db/snapshots/snapshot_path.hpp | 2 +- silkworm/db/snapshots/snapshot_repository.hpp | 3 +- silkworm/db/snapshots/snapshot_size.hpp | 25 ++++++++ 14 files changed, 195 insertions(+), 57 deletions(-) create mode 100644 silkworm/db/snapshots/snapshot_size.hpp diff --git a/silkworm/db/snapshot_merger.cpp b/silkworm/db/snapshot_merger.cpp index 196bf3da4f..f447ff2882 100644 --- a/silkworm/db/snapshot_merger.cpp +++ b/silkworm/db/snapshot_merger.cpp @@ -146,6 +146,12 @@ void SnapshotMerger::commit(std::shared_ptr result) { for (auto& merged_bundle : merged_bundles) { schedule_bundle_cleanup(*merged_bundle); } + + on_snapshot_merged_signal_(bundle.block_range()); +} + +boost::signals2::scoped_connection SnapshotMerger::on_snapshot_merged(std::function callback) { + return on_snapshot_merged_signal_.connect(std::move(callback)); } Task SnapshotMerger::cleanup() { diff --git a/silkworm/db/snapshot_merger.hpp b/silkworm/db/snapshot_merger.hpp index 5674ca7efe..9e063d14f6 100644 --- a/silkworm/db/snapshot_merger.hpp +++ b/silkworm/db/snapshot_merger.hpp @@ -16,8 +16,15 @@ #pragma once +#include + +#include + +#include + #include "data_migration.hpp" #include "snapshots/snapshot_repository.hpp" +#include "snapshots/snapshot_size.hpp" namespace silkworm::db { @@ -29,9 +36,11 @@ class SnapshotMerger : public DataMigration { : snapshots_(snapshots), tmp_dir_path_(std::move(tmp_dir_path)) {} + boost::signals2::scoped_connection on_snapshot_merged(std::function callback); + private: static constexpr size_t kBatchSize = 10; - static constexpr size_t kMaxSnapshotSize = 100'000; + static constexpr size_t kMaxSnapshotSize = snapshots::kMaxMergerSnapshotSize; const char* name() const override { return "SnapshotMerger"; } std::unique_ptr next_command() override; @@ -42,6 +51,7 @@ class SnapshotMerger : public DataMigration { snapshots::SnapshotRepository& snapshots_; std::filesystem::path tmp_dir_path_; + boost::signals2::signal on_snapshot_merged_signal_; }; } // namespace silkworm::db diff --git a/silkworm/db/snapshot_sync.cpp b/silkworm/db/snapshot_sync.cpp index 71d4ab80da..02942da982 100644 --- a/silkworm/db/snapshot_sync.cpp +++ b/silkworm/db/snapshot_sync.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include #include #include @@ -86,6 +87,10 @@ Task SnapshotSync::setup_and_run() { co_await setup(); + [[maybe_unused]] auto snapshot_merged_subscription = snapshot_merger_.on_snapshot_merged([this](BlockNumRange range) { + this->seed_frozen_bundle(range); + }); + co_await ( snapshot_freezer_.run_loop() && snapshot_merger_.run_loop()); @@ -114,6 +119,8 @@ Task SnapshotSync::setup() { // Set snapshot repository into snapshot-aware database access db::DataModel::set_snapshot_repository(&repository_); + seed_frozen_local_snapshots(); + std::scoped_lock lock{setup_done_mutex_}; setup_done_ = true; setup_done_cond_var_.notify_all(); @@ -164,7 +171,7 @@ Task SnapshotSync::download_snapshots() { auto log_added = [](const std::filesystem::path& snapshot_file) { SILK_TRACE << "SnapshotSync: download started for: " << snapshot_file.filename().string(); }; - const auto added_connection = client_.added_subscription.connect(log_added); + boost::signals2::scoped_connection added_subscription{client_.added_subscription.connect(log_added)}; size_t completed = 0; auto log_stats = [&](lt::span counters) { @@ -186,7 +193,7 @@ Task SnapshotSync::download_snapshots() { SILK_TRACE << "SnapshotSync: counters dump [" << counters_dump << "]"; } }; - const auto stats_connection = client_.stats_subscription.connect(log_stats); + boost::signals2::scoped_connection stats_subscription{client_.stats_subscription.connect(log_stats)}; auto executor = co_await boost::asio::this_coro::executor; // make the buffer bigger so that try_send always succeeds in case of duplicate files (see snapshot_set below) @@ -194,7 +201,7 @@ Task SnapshotSync::download_snapshots() { auto log_completed = [&](const std::filesystem::path& snapshot_file) { completed_channel.try_send(snapshot_file); }; - const auto completed_connection = client_.completed_subscription.connect(log_completed); + boost::signals2::scoped_connection completed_subscription{client_.completed_subscription.connect(log_completed)}; for (const auto& preverified_snapshot : snapshot_config.preverified_snapshots()) { SILK_TRACE << "SnapshotSync: adding info hash for preverified: " << preverified_snapshot.file_name; @@ -218,10 +225,6 @@ Task SnapshotSync::download_snapshots() { } SILK_INFO << "SnapshotSync: sync completed: [" << num_snapshots << "/" << num_snapshots << "]"; - - added_connection.disconnect(); - completed_connection.disconnect(); - stats_connection.disconnect(); } Task SnapshotSync::build_missing_indexes() { @@ -266,6 +269,43 @@ Task SnapshotSync::build_missing_indexes() { SILK_INFO << "SnapshotSync: built missing indexes"; } +void SnapshotSync::seed_frozen_local_snapshots() { + for (auto& bundle_ptr : repository_.view_bundles()) { + auto& bundle = *bundle_ptr; + bool is_frozen = bundle.block_range().size() >= kMaxMergerSnapshotSize; + bool is_preverified = bundle.block_to() <= snapshots_config_.max_block_number() + 1; + if (is_frozen && !is_preverified) { + seed_bundle(bundle); + } + } +} + +void SnapshotSync::seed_frozen_bundle(BlockNumRange range) { + bool is_frozen = range.size() >= kMaxMergerSnapshotSize; + auto bundle = repository_.find_bundle(range.start); + if (bundle && (bundle->block_range() == range) && is_frozen) { + seed_bundle(*bundle); + } +} + +void SnapshotSync::seed_bundle(SnapshotBundle& bundle) { + for (auto& path : bundle.snapshot_paths()) { + seed_snapshot(path); + } +} + +void SnapshotSync::seed_snapshot(const SnapshotPath& path) { + std::filesystem::path torrent_path = path.path().concat(".torrent"); + auto torrent_file = + std::filesystem::exists(torrent_path) + ? bittorrent::TorrentFile{torrent_path} + : bittorrent::TorrentFile::from_source_file(path.path()); + if (!std::filesystem::exists(torrent_path)) { + torrent_file.save(torrent_path); + } + client_.add_info_hash(path.path().filename().string(), torrent_file.info_hash()); +} + void SnapshotSync::update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function& is_stopping) { update_block_headers(txn, max_block_available, is_stopping); update_block_bodies(txn, max_block_available); diff --git a/silkworm/db/snapshot_sync.hpp b/silkworm/db/snapshot_sync.hpp index 5aa0973965..8ad2653c7f 100644 --- a/silkworm/db/snapshot_sync.hpp +++ b/silkworm/db/snapshot_sync.hpp @@ -34,6 +34,8 @@ #include #include #include +#include +#include #include #include #include @@ -61,6 +63,12 @@ class SnapshotSync { Task setup_and_run(); Task setup(); Task build_missing_indexes(); + + void seed_frozen_local_snapshots(); + void seed_frozen_bundle(BlockNumRange range); + void seed_bundle(snapshots::SnapshotBundle& bundle); + void seed_snapshot(const snapshots::SnapshotPath& path); + void update_database(db::RWTxn& txn, BlockNum max_block_available, const std::function& is_stopping); void update_block_headers(db::RWTxn& txn, BlockNum max_block_available, const std::function& is_stopping); void update_block_bodies(db::RWTxn& txn, BlockNum max_block_available); diff --git a/silkworm/db/snapshots/bittorrent/client.cpp b/silkworm/db/snapshots/bittorrent/client.cpp index 3244f1ba02..d0c398a61c 100644 --- a/silkworm/db/snapshots/bittorrent/client.cpp +++ b/silkworm/db/snapshots/bittorrent/client.cpp @@ -44,15 +44,20 @@ namespace fs = std::filesystem; using namespace std::chrono_literals; std::vector BitTorrentClient::load_file(const fs::path& filename) { - std::ifstream input_file_stream{filename, std::ios_base::binary}; - input_file_stream.unsetf(std::ios_base::skipws); - return {std::istream_iterator(input_file_stream), std::istream_iterator()}; + if (!std::filesystem::exists(filename)) return {}; + std::ifstream input_file_stream{filename, std::ios::binary | std::ios::ate}; + input_file_stream.exceptions(std::ios::failbit | std::ios::badbit); + std::streamsize file_size = input_file_stream.tellg(); + std::vector contents(static_cast(file_size)); + input_file_stream.seekg(0); + input_file_stream.read(contents.data(), file_size); + return contents; } void BitTorrentClient::save_file(const fs::path& filename, const std::vector& data) { SILK_TRACE << "Save #data=" << data.size() << " in file: " << filename; - std::ofstream output_file_stream{filename, std::ios_base::binary}; - output_file_stream.unsetf(std::ios_base::skipws); + std::ofstream output_file_stream{filename, std::ios::binary | std::ios::trunc}; + output_file_stream.exceptions(std::ios::failbit | std::ios::badbit); output_file_stream.write(data.data(), static_cast(data.size())); } diff --git a/silkworm/db/snapshots/bittorrent/client_test.cpp b/silkworm/db/snapshots/bittorrent/client_test.cpp index 17455434d9..502ab41ea5 100644 --- a/silkworm/db/snapshots/bittorrent/client_test.cpp +++ b/silkworm/db/snapshots/bittorrent/client_test.cpp @@ -105,6 +105,14 @@ static inline std::vector test_resume_data() { return resume_data; } +TEST_CASE("BitTorrentClient::load_file", "[silkworm][snapshot][bittorrent]") { + TemporaryDirectory tmp_dir; + const auto path = tmp_dir.path() / "test.resume"; + const auto resume_data = test_resume_data(); + BitTorrentClientForTest::save_file(path, resume_data); + CHECK(BitTorrentClientForTest::load_file(path) == resume_data); +} + TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent]") { TemporaryDirectory tmp_dir; BitTorrentSettings settings; @@ -123,16 +131,17 @@ TEST_CASE("BitTorrentClient::BitTorrentClient", "[silkworm][snapshot][bittorrent SECTION("nonempty resume dir") { const auto resume_dir_path = settings.repository_path / BitTorrentClient::kResumeDirName; std::filesystem::create_directories(resume_dir_path); + const auto ignored_file{resume_dir_path / "a.txt"}; BitTorrentClientForTest::save_file(ignored_file, std::vector{}); + const auto empty_resume_file{resume_dir_path / "a.resume"}; BitTorrentClientForTest::save_file(empty_resume_file, std::vector{}); - const auto invalid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"}; - std::vector invalid_resume_data{}; - BitTorrentClientForTest::save_file(invalid_resume_file, invalid_resume_data); + const auto valid_resume_file{resume_dir_path / "83112dec4bec180cff67e01d6345c88c3134fd26.resume"}; std::vector resume_data{test_resume_data()}; BitTorrentClientForTest::save_file(valid_resume_file, resume_data); + CHECK_NOTHROW(BitTorrentClient{settings}); } } diff --git a/silkworm/db/snapshots/bittorrent/torrent_file.cpp b/silkworm/db/snapshots/bittorrent/torrent_file.cpp index c4130f909a..3c93dc47bc 100644 --- a/silkworm/db/snapshots/bittorrent/torrent_file.cpp +++ b/silkworm/db/snapshots/bittorrent/torrent_file.cpp @@ -16,7 +16,9 @@ #include "torrent_file.hpp" +#include #include +#include #include #include @@ -37,6 +39,10 @@ TorrentFile::TorrentFile(ByteView data) : params_(lt::load_torrent_buffer(byte_view_to_str_span(data))) { } +TorrentFile::TorrentFile(const std::filesystem::path& path) + : params_(lt::load_torrent_file(path.string())) { +} + TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date) { lt::file_storage storage; lt::create_flags_t flags = lt::create_torrent::v1_only; @@ -57,10 +63,23 @@ TorrentFile TorrentFile::from_source_file(const std::filesystem::path& source_fi return TorrentFile{string_view_to_byte_view(data)}; } +std::string TorrentFile::info_hash() const { + std::stringstream stream; + stream << params_.ti->info_hashes().get_best(); + return stream.str(); +} + Bytes TorrentFile::to_bytes() const { std::string data; lt::bencode(std::back_inserter(data), lt::write_torrent_file(params_)); return string_to_bytes(data); } +void TorrentFile::save(const std::filesystem::path& path) { + Bytes data = to_bytes(); + std::ofstream file{path, std::ios::binary | std::ios::trunc}; + file.exceptions(std::ios::failbit | std::ios::badbit); + file << byte_view_to_string_view(data); +} + } // namespace silkworm::snapshots::bittorrent diff --git a/silkworm/db/snapshots/bittorrent/torrent_file.hpp b/silkworm/db/snapshots/bittorrent/torrent_file.hpp index dc643f1b8e..f5b1aade10 100644 --- a/silkworm/db/snapshots/bittorrent/torrent_file.hpp +++ b/silkworm/db/snapshots/bittorrent/torrent_file.hpp @@ -31,11 +31,14 @@ namespace silkworm::snapshots::bittorrent { class TorrentFile { public: TorrentFile(ByteView data); + TorrentFile(const std::filesystem::path& path); static TorrentFile from_source_file(const std::filesystem::path& source_file_path, std::time_t creation_date = 0); const lt::add_torrent_params& params() const { return params_; } + std::string info_hash() const; Bytes to_bytes() const; + void save(const std::filesystem::path& path); private: lt::add_torrent_params params_; diff --git a/silkworm/db/snapshots/config.cpp b/silkworm/db/snapshots/config.cpp index f7973eb2ca..994d52dc24 100644 --- a/silkworm/db/snapshots/config.cpp +++ b/silkworm/db/snapshots/config.cpp @@ -16,33 +16,49 @@ #include "config.hpp" -#include -#include +#include #include -#include -#include +#include + +#include "config/bor_mainnet.hpp" +#include "config/mainnet.hpp" +#include "config/mumbai.hpp" +#include "config/sepolia.hpp" +#include "snapshot_path.hpp" +#include "snapshot_size.hpp" namespace silkworm::snapshots { +inline constexpr SmallMap> kKnownConfigGeneratedEntries{ + {*kKnownChainNameToId.find("mainnet"sv), {kMainnetSnapshots.data(), kMainnetSnapshots.size()}}, + {*kKnownChainNameToId.find("sepolia"sv), {kSepoliaSnapshots.data(), kSepoliaSnapshots.size()}}, + {*kKnownChainNameToId.find("bor-mainnet"sv), {kBorMainnetSnapshots.data(), kBorMainnetSnapshots.size()}}, + {*kKnownChainNameToId.find("mumbai"sv), {kMumbaiSnapshots.data(), kMumbaiSnapshots.size()}}, +}; + Config Config::lookup_known_config(ChainId chain_id) { - const auto config = kKnownSnapshotConfigs.find(chain_id); - if (!config) { + const auto entries_ptr = kKnownConfigGeneratedEntries.find(chain_id); + if (!entries_ptr) { return Config{PreverifiedList{}}; } - return Config{PreverifiedList(config->begin(), config->end())}; + + PreverifiedList entries(entries_ptr->begin(), entries_ptr->end()); + entries = remove_unsupported_snapshots(entries); + + return Config{std::move(entries)}; } -Config::Config(PreverifiedList preverified_snapshots) - : preverified_snapshots_(std::move(preverified_snapshots)), max_block_number_(compute_max_block()) { - remove_unsupported_snapshots(); +Config::Config(PreverifiedList entries) + : entries_(std::move(entries)), + max_block_number_(compute_max_block(entries_)) { } -BlockNum Config::compute_max_block() { +BlockNum Config::compute_max_block(const PreverifiedList& entries) { BlockNum max_block{0}; - for (const auto& preverified_entry : preverified_snapshots_) { - const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{preverified_entry.file_name}); + for (const auto& entry : entries) { + const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name}); if (!snapshot_path) continue; if (!snapshot_path->is_segment()) continue; if (snapshot_path->type() != SnapshotType::headers) continue; @@ -53,16 +69,26 @@ BlockNum Config::compute_max_block() { return max_block > 0 ? max_block - 1 : 0; } -void Config::remove_unsupported_snapshots() { +PreverifiedList Config::remove_unsupported_snapshots(const PreverifiedList& entries) { static constexpr std::array kUnsupportedSnapshotNameTokens = { "accessor/"sv, "domain/"sv, "history/"sv, "idx/"sv, "manifest.txt"sv, "salt-blocks.txt"sv, "salt-state.txt"sv, "blobsidecars.seg"sv}; + PreverifiedList results = entries; + // Check if a snapshot contains any of unsupported tokens - std::erase_if(preverified_snapshots_, [&](const auto& snapshot) { - return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&snapshot](const auto& token) { - return boost::algorithm::contains(snapshot.file_name, token); + std::erase_if(results, [&](const auto& entry) { + return std::any_of(kUnsupportedSnapshotNameTokens.begin(), kUnsupportedSnapshotNameTokens.end(), [&entry](const auto& token) { + return boost::algorithm::contains(entry.file_name, token); }); }); + + // Exclude small snapshots + std::erase_if(results, [&](const auto& entry) { + const auto snapshot_path = SnapshotPath::parse(std::filesystem::path{entry.file_name}); + return !snapshot_path.has_value() || (snapshot_path->block_range().size() < kMaxMergerSnapshotSize); + }); + + return results; } } // namespace silkworm::snapshots diff --git a/silkworm/db/snapshots/config.hpp b/silkworm/db/snapshots/config.hpp index 94deb5b798..df13da6058 100644 --- a/silkworm/db/snapshots/config.hpp +++ b/silkworm/db/snapshots/config.hpp @@ -16,17 +16,12 @@ #pragma once -#include -#include #include #include -#include -#include -#include -#include -#include -#include +#include + +#include "entry.hpp" namespace silkworm::snapshots { @@ -36,24 +31,17 @@ class Config { public: static Config lookup_known_config(ChainId chain_id); - explicit Config(PreverifiedList preverified_snapshots); + explicit Config(PreverifiedList entries); - [[nodiscard]] const PreverifiedList& preverified_snapshots() const { return preverified_snapshots_; } + [[nodiscard]] const PreverifiedList& preverified_snapshots() const { return entries_; } [[nodiscard]] BlockNum max_block_number() const { return max_block_number_; } private: - BlockNum compute_max_block(); - void remove_unsupported_snapshots(); + static BlockNum compute_max_block(const PreverifiedList& entries); + static PreverifiedList remove_unsupported_snapshots(const PreverifiedList& entries); - PreverifiedList preverified_snapshots_; + PreverifiedList entries_; BlockNum max_block_number_; }; -inline constexpr SmallMap> kKnownSnapshotConfigs{ - {*kKnownChainNameToId.find("mainnet"sv), {kMainnetSnapshots.data(), kMainnetSnapshots.size()}}, - {*kKnownChainNameToId.find("sepolia"sv), {kSepoliaSnapshots.data(), kSepoliaSnapshots.size()}}, - {*kKnownChainNameToId.find("bor-mainnet"sv), {kBorMainnetSnapshots.data(), kBorMainnetSnapshots.size()}}, - {*kKnownChainNameToId.find("mumbai"sv), {kMumbaiSnapshots.data(), kMumbaiSnapshots.size()}}, -}; - } // namespace silkworm::snapshots diff --git a/silkworm/db/snapshots/config_test.cpp b/silkworm/db/snapshots/config_test.cpp index 96efd3dc91..b76559bb69 100644 --- a/silkworm/db/snapshots/config_test.cpp +++ b/silkworm/db/snapshots/config_test.cpp @@ -33,7 +33,7 @@ TEST_CASE("Config::lookup_known_config", "[silkworm][snapshot][config]") { } SECTION("mainnet") { - constexpr std::size_t kMaxBlockNumber{20'461'000}; + constexpr std::size_t kMaxBlockNumber{20'400'000}; const auto mainnet_snapshot_config = Config::lookup_known_config(1); CHECK(mainnet_snapshot_config.max_block_number() == kMaxBlockNumber - 1); } diff --git a/silkworm/db/snapshots/snapshot_path.hpp b/silkworm/db/snapshots/snapshot_path.hpp index a34efdc39a..5a3e6bc02f 100644 --- a/silkworm/db/snapshots/snapshot_path.hpp +++ b/silkworm/db/snapshots/snapshot_path.hpp @@ -63,8 +63,8 @@ class SnapshotPath { [[nodiscard]] uint8_t version() const { return version_; } [[nodiscard]] BlockNum block_from() const { return block_from_; } - [[nodiscard]] BlockNum block_to() const { return block_to_; } + [[nodiscard]] BlockNumRange block_range() const { return BlockNumRange{block_from_, block_to_}; } [[nodiscard]] SnapshotType type() const { return type_; } diff --git a/silkworm/db/snapshots/snapshot_repository.hpp b/silkworm/db/snapshots/snapshot_repository.hpp index 4a8d4b3f6f..cfcc8fcbf2 100644 --- a/silkworm/db/snapshots/snapshot_repository.hpp +++ b/silkworm/db/snapshots/snapshot_repository.hpp @@ -108,10 +108,9 @@ class SnapshotRepository { } [[nodiscard]] std::pair, std::shared_ptr> find_segment(SnapshotType type, BlockNum number) const; - - private: std::shared_ptr find_bundle(BlockNum number) const; + private: [[nodiscard]] SnapshotPathList get_segment_files() const { return get_files(kSegmentExtension); } diff --git a/silkworm/db/snapshots/snapshot_size.hpp b/silkworm/db/snapshots/snapshot_size.hpp new file mode 100644 index 0000000000..82a4537a29 --- /dev/null +++ b/silkworm/db/snapshots/snapshot_size.hpp @@ -0,0 +1,25 @@ +/* + Copyright 2024 The Silkworm Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +#pragma once + +#include + +namespace silkworm::snapshots { + +static constexpr size_t kMaxMergerSnapshotSize = 100'000; + +} // namespace silkworm::snapshots