From 63a105a481c9b8fd489a3a5b28c475b771cf05dd Mon Sep 17 00:00:00 2001 From: anand76 Date: Thu, 21 Mar 2024 12:29:35 -0700 Subject: [PATCH] Enable recycle_log_file_num option for point in time recovery (#12403) Summary: This option was previously disabled due to a bug in the recovery logic. The recovery code in `DBImpl::RecoverLogFiles` couldn't tell if an EoF reported by the log reader was really an EoF or a possible corruption that made a record look like an old log record. To fix this, the log reader now explicitly reports when it encounters what looks like an old record. The recovery code treats it as a possible corruption, and uses the next sequence number in the WAL to determine if it should continue replaying the WAL. This PR also fixes a couple of bugs that log file recycling exposed in the backup and checkpoint path. Pull Request resolved: https://github.com/facebook/rocksdb/pull/12403 Test Plan: 1. Add new unit tests to verify behavior upon corruption 2. Re-enable disabled tests for verifying recycling behavior Reviewed By: ajkr Differential Revision: D54544824 Pulled By: anand1976 fbshipit-source-id: 12f5ce39bd6bc0d63b0bc6432dc4db510e0e802a --- db/db_filesnapshot.cc | 7 +- db/db_impl/db_impl.cc | 6 +- db/db_impl/db_impl_compaction_flush.cc | 6 +- db/db_impl/db_impl_open.cc | 15 +++- db/db_impl/db_impl_write.cc | 9 +- db/db_wal_test.cc | 12 +-- db/db_write_test.cc | 89 +++++++++++++++++++ db/log_reader.cc | 10 +++ db/log_reader.h | 3 + tools/db_crashtest.py | 4 + .../recycle_logs_point_in_time_recovery.md | 1 + 11 files changed, 146 insertions(+), 16 deletions(-) create mode 100644 unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 711c55c9e6b..cbc2db14f24 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -390,8 +390,11 @@ Status DBImpl::GetLiveFilesStorageInfo( info.file_number = live_wal_files[i]->LogNumber(); info.file_type = kWalFile; info.size = live_wal_files[i]->SizeFileBytes(); - // Only last should need to be trimmed - info.trim_to_size = (i + 1 == wal_size); + // Trim the log either if its the last one, or log file recycling is + // enabled. In the latter case, a hard link doesn't prevent the file + // from being renamed and recycled. So we need to copy it instead. + info.trim_to_size = (i + 1 == wal_size) || + (immutable_db_options_.recycle_log_file_num > 0); if (opts.include_checksum_info) { info.file_checksum_func_name = kUnknownFileChecksumFuncName; info.file_checksum = kUnknownFileChecksum; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index ef3ce78b469..f7e6f9692b2 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -1757,7 +1757,11 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir, wal.GetPreSyncSize() > 0) { synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize())); } - if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) { + // Check if the file has been closed, i.e wal.writer->file() == nullptr + // which can happen if log recycling is enabled, or if all the data in + // the log has been synced + if (wal.writer->file() == nullptr || + wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) { // Fully synced logs_to_free_.push_back(wal.ReleaseWriter()); it = logs_.erase(it); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index 8845a2a4d7e..15b0377d427 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -163,8 +163,10 @@ IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options, if (error_recovery_in_prog) { log->file()->reset_seen_error(); } - // TODO: plumb Env::IOActivity, Env::IOPriority - io_s = log->Close(WriteOptions()); + // Normally the log file is closed when purging obsolete file, but if + // log recycling is enabled, the log file is closed here so that it + // can be reused. + io_s = log->Close(write_options); if (!io_s.ok()) { break; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index d2591b6e92c..786abb74f24 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -104,7 +104,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src, if (result.recycle_log_file_num && (result.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords || - result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery || result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) { // - kTolerateCorruptedTailRecords is inconsistent with recycle log file // feature. WAL recycling expects recovery success upon encountering a @@ -1086,6 +1085,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, Logger* info_log; const char* fname; Status* status; // nullptr if immutable_db_options_.paranoid_checks==false + bool* old_log_record; void Corruption(size_t bytes, const Status& s) override { ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s", (status == nullptr ? "(ignoring error) " : ""), fname, @@ -1094,10 +1094,19 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, *status = s; } } + + void OldLogRecord(size_t bytes) override { + if (old_log_record != nullptr) { + *old_log_record = true; + } + ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled", + fname, static_cast(bytes)); + } }; mutex_.AssertHeld(); Status status; + bool old_log_record = false; std::unordered_map version_edits; // no need to refcount because iteration is under mutex for (auto cfd : *versions_->GetColumnFamilySet()) { @@ -1188,6 +1197,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, reporter.env = env_; reporter.info_log = immutable_db_options_.info_log.get(); reporter.fname = fname.c_str(); + reporter.old_log_record = &old_log_record; if (!immutable_db_options_.paranoid_checks || immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords) { @@ -1335,7 +1345,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } } - if (!status.ok()) { + if (!status.ok() || old_log_record) { if (status.IsNotSupported()) { // We should not treat NotSupported as corruption. It is rather a clear // sign that we are processing a WAL that is produced by an incompatible @@ -1360,6 +1370,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } // We should ignore the error but not continue replaying status = Status::OK(); + old_log_record = false; stop_replay_for_corruption = true; corrupted_wal_number = wal_number; if (corrupted_wal_found != nullptr) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index ee103a57a9c..5f18e01d24e 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -220,6 +220,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.protection_bytes_per_key != 8) { return Status::InvalidArgument( "`WriteOptions::protection_bytes_per_key` must be zero or eight"); + } else if (write_options.disableWAL && + immutable_db_options_.recycle_log_file_num > 0) { + return Status::InvalidArgument( + "WriteOptions::disableWAL option is not supported if " + "DBOptions::recycle_log_file_num > 0"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. @@ -2173,8 +2178,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_write_mutex_.Unlock(); } uint64_t recycle_log_number = 0; + // If file deletion is disabled, don't recycle logs since it'll result in + // the file getting renamed if (creating_new_log && immutable_db_options_.recycle_log_file_num && - !log_recycle_files_.empty()) { + !log_recycle_files_.empty() && IsFileDeletionsEnabled()) { recycle_log_number = log_recycle_files_.front(); } uint64_t new_log_number = diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 490ab2742dc..91070e298b6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1123,15 +1123,13 @@ TEST_F(DBWALTest, PreallocateBlock) { } #endif // !(defined NDEBUG) || !defined(OS_WIN) -TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) { - // TODO(ajkr): Disabled until WAL recycling is fixed for - // `kPointInTimeRecovery`. - +TEST_F(DBWALTest, FullPurgePreservesRecycledLog) { // For github issue #1303 for (int i = 0; i < 2; ++i) { Options options = CurrentOptions(); options.create_if_missing = true; options.recycle_log_file_num = 2; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; if (i != 0) { options.wal_dir = alternative_wal_dir_; } @@ -1162,16 +1160,14 @@ TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) { } } -TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) { - // TODO(ajkr): Disabled until WAL recycling is fixed for - // `kPointInTimeRecovery`. - +TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) { // Ensures full purge cannot delete a WAL while it's in the process of being // recycled. In particular, we force the full purge after a file has been // chosen for reuse, but before it has been renamed. for (int i = 0; i < 2; ++i) { Options options = CurrentOptions(); options.recycle_log_file_num = 1; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; if (i != 0) { options.wal_dir = alternative_wal_dir_; } diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 792ec305a65..f464a3036b8 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -821,6 +821,95 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) { ASSERT_LE(bytes_num, 1024 * 100); } +void CorruptLogFile(Env* env, Options& options, std::string log_path, + uint64_t log_num, int record_num) { + std::shared_ptr fs = env->GetFileSystem(); + std::unique_ptr file_reader; + Status status; + { + std::unique_ptr file; + status = fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr); + ASSERT_EQ(status, IOStatus::OK()); + file_reader.reset(new SequentialFileReader(std::move(file), log_path)); + } + std::unique_ptr reader(new log::Reader( + nullptr, std::move(file_reader), nullptr, false, log_num)); + std::string scratch; + Slice record; + uint64_t record_checksum; + for (int i = 0; i < record_num; ++i) { + ASSERT_TRUE(reader->ReadRecord(&record, &scratch, options.wal_recovery_mode, + &record_checksum)); + } + uint64_t rec_start = reader->LastRecordOffset(); + reader.reset(); + { + std::unique_ptr file; + status = fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr); + ASSERT_EQ(status, IOStatus::OK()); + uint32_t bad_lognum = 0xff; + ASSERT_EQ(file->Write( + rec_start + 7, + Slice(reinterpret_cast(&bad_lognum), sizeof(uint32_t)), + IOOptions(), nullptr), + IOStatus::OK()); + ASSERT_OK(file->Close(IOOptions(), nullptr)); + file.reset(); + } +} + +TEST_P(DBWriteTest, RecycleLogTest) { + Options options = GetOptions(); + options.recycle_log_file_num = 0; + options.avoid_flush_during_recovery = true; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + + Reopen(options); + ASSERT_OK(Put(Key(1), "val1")); + ASSERT_OK(Put(Key(2), "val1")); + + uint64_t latest_log_num = 0; + std::unique_ptr log_file; + ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); + latest_log_num = log_file->LogNumber(); + Reopen(options); + ASSERT_OK(Put(Key(3), "val3")); + + // Corrupt second entry of first log + std::string log_path = LogFileName(dbname_, latest_log_num); + CorruptLogFile(env_, options, log_path, latest_log_num, 2); + + Reopen(options); + ASSERT_EQ(Get(Key(1)), "val1"); + ASSERT_EQ(Get(Key(2)), "NOT_FOUND"); + ASSERT_EQ(Get(Key(3)), "NOT_FOUND"); +} + +TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) { + Options options = GetOptions(); + options.recycle_log_file_num = 0; + options.avoid_flush_during_recovery = true; + options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery; + + CreateAndReopenWithCF({"pikachu"}, options); + ASSERT_OK(Put(1, Key(1), "val1")); + ASSERT_OK(Put(0, Key(2), "val2")); + + uint64_t latest_log_num = 0; + std::unique_ptr log_file; + ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file)); + latest_log_num = log_file->LogNumber(); + ASSERT_OK(Flush(1)); + ASSERT_OK(Put(1, Key(3), "val3")); + + // Corrupt second entry of first log + std::string log_path = LogFileName(dbname_, latest_log_num); + CorruptLogFile(env_, options, log_path, latest_log_num, 2); + + ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options), + Status::Corruption()); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/db/log_reader.cc b/db/log_reader.cc index 48380a735c5..da979a1ee1e 100644 --- a/db/log_reader.cc +++ b/db/log_reader.cc @@ -258,6 +258,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch, // writing a physical record but before completing the next; don't // treat it as a corruption, just ignore the entire logical record. scratch->clear(); + } else { + if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { + ReportOldLogRecord(scratch->size()); + } } return false; } @@ -405,6 +409,12 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) { } } +void Reader::ReportOldLogRecord(size_t bytes) { + if (reporter_ != nullptr) { + reporter_->OldLogRecord(bytes); + } +} + bool Reader::ReadMore(size_t* drop_size, int* error) { if (!eof_ && !read_error_) { // Last read was a full read, so this is a trailer to skip diff --git a/db/log_reader.h b/db/log_reader.h index 697d1b5d58c..6e4eded0916 100644 --- a/db/log_reader.h +++ b/db/log_reader.h @@ -45,6 +45,8 @@ class Reader { // Some corruption was detected. "size" is the approximate number // of bytes dropped due to the corruption. virtual void Corruption(size_t bytes, const Status& status) = 0; + + virtual void OldLogRecord(size_t /*bytes*/) {} }; // Create a reader that will return log records from "*file". @@ -202,6 +204,7 @@ class Reader { // buffer_ must be updated to remove the dropped bytes prior to invocation. void ReportCorruption(size_t bytes, const char* reason); void ReportDrop(size_t bytes, const Status& reason); + void ReportOldLogRecord(size_t bytes); void InitCompression(const CompressionTypeRecord& compression_record); diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index d4204d182ac..3fbd5729c2c 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -771,6 +771,10 @@ def finalize_and_sanitize(src_params): # disable atomic flush. if dest_params["test_best_efforts_recovery"] == 0: dest_params["disable_wal"] = 0 + if dest_params.get("disable_wal") == 1: + # disableWAL and recycle_log_file_num options are not mutually + # compatible at the moment + dest_params["recycle_log_file_num"] = 0 return dest_params diff --git a/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md b/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md new file mode 100644 index 00000000000..d11ba21efa7 --- /dev/null +++ b/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md @@ -0,0 +1 @@ +Re-enable the recycle_log_file_num option in DBOptions for kPointInTimeRecovery WAL recovery mode, which was previously disabled due to a bug in the recovery logic. This option is incompatible with WriteOptions::disableWAL. A Status::InvalidArgument() will be returned if disableWAL is specified.