diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc
index 711c55c9e6b..cbc2db14f24 100644
--- a/db/db_filesnapshot.cc
+++ b/db/db_filesnapshot.cc
@@ -390,8 +390,11 @@ Status DBImpl::GetLiveFilesStorageInfo(
       info.file_number = live_wal_files[i]->LogNumber();
       info.file_type = kWalFile;
       info.size = live_wal_files[i]->SizeFileBytes();
-      // Only last should need to be trimmed
-      info.trim_to_size = (i + 1 == wal_size);
+      // Trim the log either if it's the last one, or if log file recycling is
+      // enabled. In the latter case, a hard link doesn't prevent the file
+      // from being renamed and recycled, so we need to copy it instead.
+      info.trim_to_size = (i + 1 == wal_size) ||
+                          (immutable_db_options_.recycle_log_file_num > 0);
       if (opts.include_checksum_info) {
         info.file_checksum_func_name = kUnknownFileChecksumFuncName;
         info.file_checksum = kUnknownFileChecksum;
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index ef3ce78b469..f7e6f9692b2 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -1757,7 +1757,11 @@ void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
         wal.GetPreSyncSize() > 0) {
       synced_wals->AddWal(wal.number, WalMetadata(wal.GetPreSyncSize()));
     }
-    if (wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
+    // Check if the file has been closed, i.e. wal.writer->file() == nullptr,
+    // which can happen if log recycling is enabled, or if all the data in
+    // the log has been synced
+    if (wal.writer->file() == nullptr ||
+        wal.GetPreSyncSize() == wal.writer->file()->GetFlushedSize()) {
       // Fully synced
       logs_to_free_.push_back(wal.ReleaseWriter());
       it = logs_.erase(it);
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index 8845a2a4d7e..15b0377d427 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -163,8 +163,10 @@ IOStatus DBImpl::SyncClosedLogs(const WriteOptions& write_options,
       if (error_recovery_in_prog) {
        log->file()->reset_seen_error();
       }
-      // TODO: plumb Env::IOActivity, Env::IOPriority
-      io_s = log->Close(WriteOptions());
+      // Normally the log file is closed when purging obsolete files, but if
+      // log recycling is enabled, the log file is closed here so that it
+      // can be reused.
+      io_s = log->Close(write_options);
       if (!io_s.ok()) {
         break;
       }
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index d2591b6e92c..786abb74f24 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -104,7 +104,6 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
   if (result.recycle_log_file_num &&
       (result.wal_recovery_mode ==
            WALRecoveryMode::kTolerateCorruptedTailRecords ||
-       result.wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery ||
        result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
     // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
     //   feature. WAL recycling expects recovery success upon encountering a
@@ -1086,6 +1085,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
     Logger* info_log;
     const char* fname;
     Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
+    bool* old_log_record;
     void Corruption(size_t bytes, const Status& s) override {
       ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
                      (status == nullptr ? "(ignoring error) " : ""), fname,
"(ignoring error) " : ""), fname, @@ -1094,10 +1094,19 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, *status = s; } } + + void OldLogRecord(size_t bytes) override { + if (old_log_record != nullptr) { + *old_log_record = true; + } + ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled", + fname, static_cast(bytes)); + } }; mutex_.AssertHeld(); Status status; + bool old_log_record = false; std::unordered_map version_edits; // no need to refcount because iteration is under mutex for (auto cfd : *versions_->GetColumnFamilySet()) { @@ -1188,6 +1197,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, reporter.env = env_; reporter.info_log = immutable_db_options_.info_log.get(); reporter.fname = fname.c_str(); + reporter.old_log_record = &old_log_record; if (!immutable_db_options_.paranoid_checks || immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords) { @@ -1335,7 +1345,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } } - if (!status.ok()) { + if (!status.ok() || old_log_record) { if (status.IsNotSupported()) { // We should not treat NotSupported as corruption. It is rather a clear // sign that we are processing a WAL that is produced by an incompatible @@ -1360,6 +1370,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } // We should ignore the error but not continue replaying status = Status::OK(); + old_log_record = false; stop_replay_for_corruption = true; corrupted_wal_number = wal_number; if (corrupted_wal_found != nullptr) { diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index ee103a57a9c..5f18e01d24e 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -220,6 +220,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, write_options.protection_bytes_per_key != 8) { return Status::InvalidArgument( "`WriteOptions::protection_bytes_per_key` must be zero or eight"); + } else if (write_options.disableWAL && + immutable_db_options_.recycle_log_file_num > 0) { + return Status::InvalidArgument( + "WriteOptions::disableWAL option is not supported if " + "DBOptions::recycle_log_file_num > 0"); } // TODO: this use of operator bool on `tracer_` can avoid unnecessary lock // grabs but does not seem thread-safe. @@ -2173,8 +2178,10 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { log_write_mutex_.Unlock(); } uint64_t recycle_log_number = 0; + // If file deletion is disabled, don't recycle logs since it'll result in + // the file getting renamed if (creating_new_log && immutable_db_options_.recycle_log_file_num && - !log_recycle_files_.empty()) { + !log_recycle_files_.empty() && IsFileDeletionsEnabled()) { recycle_log_number = log_recycle_files_.front(); } uint64_t new_log_number = diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index 490ab2742dc..91070e298b6 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1123,15 +1123,13 @@ TEST_F(DBWALTest, PreallocateBlock) { } #endif // !(defined NDEBUG) || !defined(OS_WIN) -TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) { - // TODO(ajkr): Disabled until WAL recycling is fixed for - // `kPointInTimeRecovery`. 
-
+TEST_F(DBWALTest, FullPurgePreservesRecycledLog) {
   // For github issue #1303
   for (int i = 0; i < 2; ++i) {
     Options options = CurrentOptions();
     options.create_if_missing = true;
     options.recycle_log_file_num = 2;
+    options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
     if (i != 0) {
       options.wal_dir = alternative_wal_dir_;
     }
@@ -1162,16 +1160,14 @@ TEST_F(DBWALTest, DISABLED_FullPurgePreservesRecycledLog) {
   }
 }

-TEST_F(DBWALTest, DISABLED_FullPurgePreservesLogPendingReuse) {
-  // TODO(ajkr): Disabled until WAL recycling is fixed for
-  // `kPointInTimeRecovery`.
-
+TEST_F(DBWALTest, FullPurgePreservesLogPendingReuse) {
   // Ensures full purge cannot delete a WAL while it's in the process of being
   // recycled. In particular, we force the full purge after a file has been
   // chosen for reuse, but before it has been renamed.
   for (int i = 0; i < 2; ++i) {
     Options options = CurrentOptions();
     options.recycle_log_file_num = 1;
+    options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
     if (i != 0) {
       options.wal_dir = alternative_wal_dir_;
     }
diff --git a/db/db_write_test.cc b/db/db_write_test.cc
index 792ec305a65..f464a3036b8 100644
--- a/db/db_write_test.cc
+++ b/db/db_write_test.cc
@@ -821,6 +821,95 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
   ASSERT_LE(bytes_num, 1024 * 100);
 }

+void CorruptLogFile(Env* env, Options& options, std::string log_path,
+                    uint64_t log_num, int record_num) {
+  std::shared_ptr<FileSystem> fs = env->GetFileSystem();
+  std::unique_ptr<SequentialFileReader> file_reader;
+  Status status;
+  {
+    std::unique_ptr<FSSequentialFile> file;
+    status = fs->NewSequentialFile(log_path, FileOptions(), &file, nullptr);
+    ASSERT_EQ(status, IOStatus::OK());
+    file_reader.reset(new SequentialFileReader(std::move(file), log_path));
+  }
+  std::unique_ptr<log::Reader> reader(new log::Reader(
+      nullptr, std::move(file_reader), nullptr, false, log_num));
+  std::string scratch;
+  Slice record;
+  uint64_t record_checksum;
+  for (int i = 0; i < record_num; ++i) {
+    ASSERT_TRUE(reader->ReadRecord(&record, &scratch, options.wal_recovery_mode,
+                                   &record_checksum));
+  }
+  uint64_t rec_start = reader->LastRecordOffset();
+  reader.reset();
+  {
+    std::unique_ptr<FSRandomRWFile> file;
+    status = fs->NewRandomRWFile(log_path, FileOptions(), &file, nullptr);
+    ASSERT_EQ(status, IOStatus::OK());
+    uint32_t bad_lognum = 0xff;
+    ASSERT_EQ(file->Write(
+                  rec_start + 7,
+                  Slice(reinterpret_cast<char*>(&bad_lognum), sizeof(uint32_t)),
+                  IOOptions(), nullptr),
+              IOStatus::OK());
+    ASSERT_OK(file->Close(IOOptions(), nullptr));
+    file.reset();
+  }
+}
+
+TEST_P(DBWriteTest, RecycleLogTest) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  Reopen(options);
+  ASSERT_OK(Put(Key(1), "val1"));
+  ASSERT_OK(Put(Key(2), "val1"));
+
+  uint64_t latest_log_num = 0;
+  std::unique_ptr<LogFile> log_file;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
+  latest_log_num = log_file->LogNumber();
+  Reopen(options);
+  ASSERT_OK(Put(Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  std::string log_path = LogFileName(dbname_, latest_log_num);
+  CorruptLogFile(env_, options, log_path, latest_log_num, 2);
+
+  Reopen(options);
+  ASSERT_EQ(Get(Key(1)), "val1");
+  ASSERT_EQ(Get(Key(2)), "NOT_FOUND");
+  ASSERT_EQ(Get(Key(3)), "NOT_FOUND");
+}
+
+TEST_P(DBWriteTest, RecycleLogTestCFAheadOfWAL) {
+  Options options = GetOptions();
+  options.recycle_log_file_num = 0;
+  options.avoid_flush_during_recovery = true;
+  options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
+
+  CreateAndReopenWithCF({"pikachu"}, options);
+  ASSERT_OK(Put(1, Key(1), "val1"));
+  ASSERT_OK(Put(0, Key(2), "val2"));
+
+  uint64_t latest_log_num = 0;
+  std::unique_ptr<LogFile> log_file;
+  ASSERT_OK(dbfull()->GetCurrentWalFile(&log_file));
+  latest_log_num = log_file->LogNumber();
+  ASSERT_OK(Flush(1));
+  ASSERT_OK(Put(1, Key(3), "val3"));
+
+  // Corrupt second entry of first log
+  std::string log_path = LogFileName(dbname_, latest_log_num);
+  CorruptLogFile(env_, options, log_path, latest_log_num, 2);
+
+  ASSERT_EQ(TryReopenWithColumnFamilies({"default", "pikachu"}, options),
+            Status::Corruption());
+}
+
 INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
                         testing::Values(DBTestBase::kDefault,
                                         DBTestBase::kConcurrentWALWrites,
diff --git a/db/log_reader.cc b/db/log_reader.cc
index 48380a735c5..da979a1ee1e 100644
--- a/db/log_reader.cc
+++ b/db/log_reader.cc
@@ -258,6 +258,10 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch,
           // writing a physical record but before completing the next; don't
           // treat it as a corruption, just ignore the entire logical record.
           scratch->clear();
+        } else {
+          if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) {
+            ReportOldLogRecord(scratch->size());
+          }
         }
         return false;
       }
@@ -405,6 +409,12 @@ void Reader::ReportDrop(size_t bytes, const Status& reason) {
   }
 }

+void Reader::ReportOldLogRecord(size_t bytes) {
+  if (reporter_ != nullptr) {
+    reporter_->OldLogRecord(bytes);
+  }
+}
+
 bool Reader::ReadMore(size_t* drop_size, int* error) {
   if (!eof_ && !read_error_) {
     // Last read was a full read, so this is a trailer to skip
diff --git a/db/log_reader.h b/db/log_reader.h
index 697d1b5d58c..6e4eded0916 100644
--- a/db/log_reader.h
+++ b/db/log_reader.h
@@ -45,6 +45,8 @@ class Reader {
     // Some corruption was detected. "size" is the approximate number
     // of bytes dropped due to the corruption.
     virtual void Corruption(size_t bytes, const Status& status) = 0;
+
+    virtual void OldLogRecord(size_t /*bytes*/) {}
   };

   // Create a reader that will return log records from "*file".
@@ -202,6 +204,7 @@ class Reader {
   // buffer_ must be updated to remove the dropped bytes prior to invocation.
   void ReportCorruption(size_t bytes, const char* reason);
   void ReportDrop(size_t bytes, const Status& reason);
+  void ReportOldLogRecord(size_t bytes);

   void InitCompression(const CompressionTypeRecord& compression_record);

diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index d4204d182ac..3fbd5729c2c 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -771,6 +771,10 @@ def finalize_and_sanitize(src_params):
         # disable atomic flush.
         if dest_params["test_best_efforts_recovery"] == 0:
             dest_params["disable_wal"] = 0
+    if dest_params.get("disable_wal") == 1:
+        # The disableWAL and recycle_log_file_num options are not
+        # compatible with each other at the moment
+        dest_params["recycle_log_file_num"] = 0

     return dest_params

diff --git a/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md b/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md
new file mode 100644
index 00000000000..d11ba21efa7
--- /dev/null
+++ b/unreleased_history/bug_fixes/recycle_logs_point_in_time_recovery.md
@@ -0,0 +1 @@
+Re-enable the recycle_log_file_num option in DBOptions for the kPointInTimeRecovery WAL recovery mode, which was previously disabled due to a bug in the recovery logic. This option is incompatible with WriteOptions::disableWAL; a Status::InvalidArgument() is returned if disableWAL is specified.
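
For context, the following minimal sketch (not part of the patch) illustrates the user-visible behavior described in the release note above, assuming the public RocksDB C++ API. The `/tmp/recycle_demo` path and the key/value names are illustrative only.

```cpp
// Sketch: WAL recycling together with kPointInTimeRecovery is accepted again,
// while writes that disable the WAL are rejected when recycling is enabled.
#include <cassert>
#include <iostream>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  // Allowed again by this patch: SanitizeOptions() no longer clears
  // recycle_log_file_num when kPointInTimeRecovery is selected.
  options.recycle_log_file_num = 2;
  options.wal_recovery_mode = rocksdb::WALRecoveryMode::kPointInTimeRecovery;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/recycle_demo", &db);
  assert(s.ok());

  // Normal WAL-backed writes work as usual.
  s = db->Put(rocksdb::WriteOptions(), "k1", "v1");
  assert(s.ok());

  // Disabling the WAL is rejected while log recycling is enabled; WriteImpl()
  // now returns InvalidArgument for this combination.
  rocksdb::WriteOptions no_wal;
  no_wal.disableWAL = true;
  s = db->Put(no_wal, "k2", "v2");
  std::cout << s.ToString() << std::endl;
  assert(s.IsInvalidArgument());

  delete db;
  return 0;
}
```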