diff --git a/db/db_impl.cc b/db/db_impl.cc index 67cceadd3a8..fa611f9439f 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -701,6 +701,11 @@ Status DBImpl::FlushWAL(bool sync) { if (!s.ok()) { ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s", s.ToString().c_str()); + // If there is a filesystem error, we should set it globally to prevent + // future writes. + WriteStatusCheck(s); + // Whether sync or not, we should abort the rest of the function on error. + return s; } if (!sync) { ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false"); diff --git a/db/db_impl.h b/db/db_impl.h index 4c51d9fe52f..6d24d53d0c1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -220,6 +220,7 @@ class DBImpl : public DB { virtual Status Flush(const FlushOptions& options, ColumnFamilyHandle* column_family) override; virtual Status FlushWAL(bool sync) override; + bool TEST_WALBufferIsEmpty(); virtual Status SyncWAL() override; virtual SequenceNumber GetLatestSequenceNumber() const override; @@ -874,7 +875,7 @@ class DBImpl : public DB { size_t seq_inc); // Used by WriteImpl to update bg_error_ if paranoid check is enabled. - void WriteCallbackStatusCheck(const Status& status); + void WriteStatusCheck(const Status& status); // Used by WriteImpl to update bg_error_ in case of memtable insert error. 
void MemTableInsertStatusCheck(const Status& memtable_insert_status); diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index c1faccb2909..dfe64f933cc 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -50,6 +50,9 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { "[JOB %d] Syncing log #%" PRIu64, job_context->job_id, log->get_log_number()); s = log->file()->Sync(immutable_db_options_.use_fsync); + if (!s.ok()) { + break; + } } if (s.ok()) { s = directories_.GetWalDir()->Fsync(); diff --git a/db/db_impl_debug.cc b/db/db_impl_debug.cc index 9d87f5c29a2..d4e47d86866 100644 --- a/db/db_impl_debug.cc +++ b/db/db_impl_debug.cc @@ -25,6 +25,12 @@ void DBImpl::TEST_SwitchWAL() { SwitchWAL(&write_context); } +bool DBImpl::TEST_WALBufferIsEmpty() { + InstrumentedMutexLock wl(&log_write_mutex_); + log::Writer* cur_log_writer = logs_.back().writer; + return cur_log_writer->TEST_BufferIsEmpty(); +} + int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes( ColumnFamilyHandle* column_family) { ColumnFamilyData* cfd; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 047a17b21fd..6a3863ba748 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -1071,7 +1071,8 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, new_log_number, new log::Writer( std::move(file_writer), new_log_number, - impl->immutable_db_options_.recycle_log_file_num > 0)); + impl->immutable_db_options_.recycle_log_file_num > 0, + impl->immutable_db_options_.manual_wal_flush)); } // set column family handles @@ -1187,6 +1188,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, if (s.ok()) { ROCKS_LOG_INFO(impl->immutable_db_options_.info_log, "DB pointer %p", impl); LogFlush(impl->immutable_db_options_.info_log); + assert(impl->TEST_WALBufferIsEmpty()); + // If the assert above fails then we need to FlushWAL before returning + // control back to the user. 
if (!persist_options_status.ok()) { s = Status::IOError( "DB::Open() failed --- Unable to persist Options file", diff --git a/db/db_impl_write.cc b/db/db_impl_write.cc index 9f5a80faad3..c8bdf31ef3c 100644 --- a/db/db_impl_write.cc +++ b/db/db_impl_write.cc @@ -339,7 +339,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(status); + WriteStatusCheck(status); } if (need_log_sync) { @@ -462,7 +462,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options, } if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(w.status); + WriteStatusCheck(w.status); } if (need_log_sync) { @@ -623,7 +623,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, PERF_TIMER_START(write_pre_and_post_process_time); if (!w.CallbackFailed()) { - WriteCallbackStatusCheck(status); + WriteStatusCheck(status); } if (status.ok()) { for (auto* writer : write_group) { @@ -647,7 +647,7 @@ Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options, return status; } -void DBImpl::WriteCallbackStatusCheck(const Status& status) { +void DBImpl::WriteStatusCheck(const Status& status) { // Is setting bg_error_ enough here? This will at least stop // compaction and fail any further writes. if (immutable_db_options_.paranoid_checks && !status.ok() && diff --git a/db/db_write_test.cc b/db/db_write_test.cc index 1a27f470ec7..200397681c8 100644 --- a/db/db_write_test.cc +++ b/db/db_write_test.cc @@ -50,6 +50,7 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { std::atomic leader_count{0}; std::vector threads; mock_env->SetFilesystemActive(false); + // Wait until all threads linked to write threads, to make sure // all threads join the same batch group. 
SyncPoint::GetInstance()->SetCallBack( @@ -68,7 +69,13 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { threads.push_back(port::Thread( [&](int index) { // All threads should fail. - ASSERT_FALSE(Put("key" + ToString(index), "value").ok()); + auto res = Put("key" + ToString(index), "value"); + if (options.manual_wal_flush) { + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); }, i)); } @@ -80,6 +87,46 @@ TEST_P(DBWriteTest, IOErrorOnWALWritePropagateToWriteThreadFollower) { Close(); } +TEST_P(DBWriteTest, ManualWalFlushInEffect) { + Options options = GetOptions(); + Reopen(options); + // try the 1st WAL created during open + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); + // try the 2nd wal created during SwitchWAL + dbfull()->TEST_SwitchWAL(); + ASSERT_TRUE(Put("key" + ToString(0), "value").ok()); + ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty()); + ASSERT_TRUE(dbfull()->FlushWAL(false).ok()); + ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty()); +} + +TEST_P(DBWriteTest, IOErrorOnWALWriteTriggersReadOnlyMode) { + std::unique_ptr mock_env( + new FaultInjectionTestEnv(Env::Default())); + Options options = GetOptions(); + options.env = mock_env.get(); + Reopen(options); + for (int i = 0; i < 2; i++) { + // Forcibly fail WAL write for the first Put only. 
Subsequent Puts should + // fail due to read-only mode + mock_env->SetFilesystemActive(i != 0); + auto res = Put("key" + ToString(i), "value"); + if (options.manual_wal_flush && i == 0) { + // even with manual_wal_flush the 2nd Put should return error because of + // the read-only mode + ASSERT_TRUE(res.ok()); + // we should see fs error when we do the flush + res = dbfull()->FlushWAL(false); + } + ASSERT_FALSE(res.ok()); + } + // Close before mock_env destruct. + Close(); +} + INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, testing::Values(DBTestBase::kDefault, DBTestBase::kConcurrentWALWrites, diff --git a/db/log_writer.cc b/db/log_writer.cc index a767f19160c..c31adbec5f0 100644 --- a/db/log_writer.cc +++ b/db/log_writer.cc @@ -57,9 +57,11 @@ Status Writer::AddRecord(const Slice& slice) { // Fill the trailer (literal below relies on kHeaderSize and // kRecyclableHeaderSize being <= 11) assert(header_size <= 11); - dest_->Append( - Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", - static_cast(leftover))); + s = dest_->Append(Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00", + static_cast(leftover))); + if (!s.ok()) { + break; + } } block_offset_ = 0; } @@ -90,6 +92,8 @@ Status Writer::AddRecord(const Slice& slice) { return s; } +bool Writer::TEST_BufferIsEmpty() { return dest_->TEST_BufferIsEmpty(); } + Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) { assert(n <= 0xffff); // Must fit in two bytes diff --git a/db/log_writer.h b/db/log_writer.h index 143ad2674de..abd7977b94f 100644 --- a/db/log_writer.h +++ b/db/log_writer.h @@ -85,6 +85,8 @@ class Writer { Status WriteBuffer(); + bool TEST_BufferIsEmpty(); + private: unique_ptr dest_; size_t block_offset_; // Current offset in block diff --git a/options/options_helper.cc b/options/options_helper.cc index b93f3454bb5..4235bc635dc 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -124,6 +124,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& 
immutable_db_options, immutable_db_options.allow_ingest_behind; options.preserve_deletes = immutable_db_options.preserve_deletes; + options.two_write_queues = immutable_db_options.two_write_queues; + options.manual_wal_flush = immutable_db_options.manual_wal_flush; return options; } diff --git a/util/file_reader_writer.h b/util/file_reader_writer.h index 9bc3b9437c3..5870a55cdf0 100644 --- a/util/file_reader_writer.h +++ b/util/file_reader_writer.h @@ -187,6 +187,8 @@ class WritableFileWriter { bool use_direct_io() { return writable_file_->use_direct_io(); } + bool TEST_BufferIsEmpty() { return buf_.CurrentSize() == 0; } + private: // Used when os buffering is OFF and we are writing // DMA such as in Direct I/O mode diff --git a/util/file_util.cc b/util/file_util.cc index 8a1adf2bd78..446a9200492 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -62,8 +62,7 @@ Status CopyFile(Env* env, const std::string& source, } size -= slice.size(); } - dest_writer->Sync(use_fsync); - return Status::OK(); + return dest_writer->Sync(use_fsync); } // Utility function to create a file with the provided contents diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index bd0f6a4393b..933e13df703 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -2606,7 +2606,6 @@ TEST_P(TransactionTest, ColumnFamiliesTest) { TEST_P(TransactionTest, ColumnFamiliesTest2) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3146,7 +3145,6 @@ TEST_P(TransactionTest, Rollback) { TEST_P(TransactionTest, LockLimitTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3252,7 +3250,6 @@ TEST_P(TransactionTest, LockLimitTest) { TEST_P(TransactionTest, IteratorTest) { WriteOptions write_options; ReadOptions read_options, 
snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -3433,7 +3430,6 @@ TEST_P(TransactionTest, DisableIndexingTest) { TEST_P(TransactionTest, SavepointTest) { WriteOptions write_options; ReadOptions read_options, snapshot_read_options; - TransactionOptions txn_options; string value; Status s; @@ -4688,7 +4684,6 @@ TEST_P(TransactionTest, ClearSnapshotTest) { TEST_P(TransactionTest, ToggleAutoCompactionTest) { Status s; - TransactionOptions txn_options; ColumnFamilyHandle *cfa, *cfb; ColumnFamilyOptions cf_options;