From 43bc71fef678eb5480d6ee010cc8875bd5618be1 Mon Sep 17 00:00:00 2001 From: Yu Zhang Date: Tue, 10 Sep 2024 13:23:13 -0700 Subject: [PATCH] Add an internal API MemTableList::GetEditForDroppingCurrentVersion (#13001) Summary: Prepare this internal API to be used by atomic data replacement. The main purpose of this API is to get a `VersionEdit` to mark the entire current `MemTableListVersion` as dropped. Flush needs the similar functionality when installing results, so that logic is refactored into a util function `GetDBRecoveryEditForObsoletingMemTables` to be shared by flush and this internal API. To test this internal API, flush's result installation is redirected to use this API when it is flushing all the immutable MemTables in debug mode. It should achieve the exact same results, just with a duplicated `VersionEdit::log_number` field that doesn't upsets the recovery logic. Pull Request resolved: https://github.com/facebook/rocksdb/pull/13001 Test Plan: Existing tests Reviewed By: pdillinger Differential Revision: D62309591 Pulled By: jowlyzhang fbshipit-source-id: e25914d9a2e281c25ab7ee31a66eaf6adfae4b88 --- db/db_impl/db_impl.h | 8 ++++ db/db_impl/db_impl_files.cc | 32 ++++++++++++++ db/memtable_list.cc | 83 ++++++++++++++++++++++++------------- db/memtable_list.h | 6 +++ 4 files changed, 100 insertions(+), 29 deletions(-) diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index 7206d85e149..1b3bafaae0c 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -2963,6 +2963,14 @@ DBOptions SanitizeOptions(const std::string& db, const DBOptions& src, CompressionType GetCompressionFlush(const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options); +// Return a VersionEdit for the DB's recovery when the `memtables` of the +// specified column family are obsolete. Specifically, the min log number to +// keep, and the WAL files that can be deleted. +VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker); + // Return the earliest log file to keep after the memtable flush is // finalized. // `cfd_to_flush` is the column family whose memtable (specified in diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index 0db7293682d..bb0ff3985a9 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -722,6 +722,38 @@ void DBImpl::DeleteObsoleteFiles() { mutex_.Lock(); } +VersionEdit GetDBRecoveryEditForObsoletingMemTables( + VersionSet* vset, const ColumnFamilyData& cfd, + const autovector& edit_list, + const autovector& memtables, LogsWithPrepTracker* prep_tracker) { + VersionEdit wal_deletion_edit; + uint64_t min_wal_number_to_keep = 0; + assert(edit_list.size() > 0); + if (vset->db_options()->allow_2pc) { + // Note that if mempurge is successful, the edit_list will + // not be applicable (contains info of new min_log number to keep, + // and level 0 file path of SST file created during normal flush, + // so both pieces of information are irrelevant after a successful + // mempurge operation). + min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( + vset, cfd, edit_list, memtables, prep_tracker); + + // We piggyback the information of earliest log file to keep in the + // manifest entry for the last file flushed. + } else { + min_wal_number_to_keep = + PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list); + } + + wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep); + if (vset->db_options()->track_and_verify_wals_in_manifest) { + if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { + wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep); + } + } + return wal_deletion_edit; +} + uint64_t FindMinPrepLogReferencedByMemTable( VersionSet* vset, const autovector& memtables_to_flush) { uint64_t min_log = 0; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 3675a280b93..c3612656e24 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -582,37 +582,28 @@ Status MemTableList::TryInstallMemtableFlushResults( // TODO(myabandeh): Not sure how batch_count could be 0 here. if (batch_count > 0) { - uint64_t min_wal_number_to_keep = 0; - assert(edit_list.size() > 0); - if (vset->db_options()->allow_2pc) { - // Note that if mempurge is successful, the edit_list will - // not be applicable (contains info of new min_log number to keep, - // and level 0 file path of SST file created during normal flush, - // so both pieces of information are irrelevant after a successful - // mempurge operation). - min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( - vset, *cfd, edit_list, memtables_to_flush, prep_tracker); - - // We piggyback the information of earliest log file to keep in the - // manifest entry for the last file flushed. + VersionEdit edit; +#ifdef ROCKSDB_ASSERT_STATUS_CHECKED + if (memtables_to_flush.size() == memlist.size()) { + // TODO(yuzhangyu): remove this testing code once the + // `GetEditForDroppingCurrentVersion` API is used by the atomic data + // replacement. This function can get the same edits for wal related + // fields, and some duplicated fields as contained already in edit_list + // for column family's recovery. + edit = GetEditForDroppingCurrentVersion(cfd, vset, prep_tracker); } else { - min_wal_number_to_keep = - PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list); - } - - VersionEdit wal_deletion; - wal_deletion.SetMinLogNumberToKeep(min_wal_number_to_keep); - if (vset->db_options()->track_and_verify_wals_in_manifest) { - if (min_wal_number_to_keep > - vset->GetWalSet().GetMinWalNumberToKeep()) { - wal_deletion.DeleteWalsBefore(min_wal_number_to_keep); - } - TEST_SYNC_POINT_CALLBACK( - "MemTableList::TryInstallMemtableFlushResults:" - "AfterComputeMinWalToKeep", - nullptr); + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); } - edit_list.push_back(&wal_deletion); +#else + edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_flush, prep_tracker); +#endif // ROCKSDB_ASSERT_STATUS_CHECKED + TEST_SYNC_POINT_CALLBACK( + "MemTableList::TryInstallMemtableFlushResults:" + "AfterComputeMinWalToKeep", + nullptr); + edit_list.push_back(&edit); const auto manifest_write_cb = [this, cfd, batch_count, log_buffer, to_delete, mu](const Status& status) { @@ -1026,4 +1017,38 @@ void MemTableList::RemoveOldMemTables(uint64_t log_number, ResetTrimHistoryNeeded(); } +VersionEdit MemTableList::GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const { + assert(cfd); + auto& memlist = current_->memlist_; + if (memlist.empty()) { + return VersionEdit(); + } + + uint64_t max_next_log_number = 0; + autovector edit_list; + autovector memtables_to_drop; + for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + MemTable* m = *it; + memtables_to_drop.push_back(m); + max_next_log_number = std::max(m->GetNextLogNumber(), max_next_log_number); + } + + // Check the obsoleted MemTables' impact on WALs related to DB's recovery (min + // log number to keep, a delta of WAL files to delete). + VersionEdit edit_with_log_number; + edit_with_log_number.SetPrevLogNumber(0); + edit_with_log_number.SetLogNumber(max_next_log_number); + edit_list.push_back(&edit_with_log_number); + VersionEdit edit = GetDBRecoveryEditForObsoletingMemTables( + vset, *cfd, edit_list, memtables_to_drop, prep_tracker); + + // Set fields related to the column family's recovery. + edit.SetColumnFamily(cfd->GetID()); + edit.SetPrevLogNumber(0); + edit.SetLogNumber(max_next_log_number); + return edit; +} + } // namespace ROCKSDB_NAMESPACE diff --git a/db/memtable_list.h b/db/memtable_list.h index 218701e0b3b..dd439de5590 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -447,6 +447,12 @@ class MemTableList { void RemoveOldMemTables(uint64_t log_number, autovector* to_delete); + // This API is only used by atomic date replacement. To get an edit for + // dropping the current `MemTableListVersion`. + VersionEdit GetEditForDroppingCurrentVersion( + const ColumnFamilyData* cfd, VersionSet* vset, + LogsWithPrepTracker* prep_tracker) const; + private: friend Status InstallMemtableAtomicFlushResults( const autovector* imm_lists,