Fix a bug with atomic_flush that causes DB to stuck after a flush failure (facebook#11872)

Summary:
With atomic_flush=true, a flush job with younger memtables waits for older memtables to be installed before installing its own memtables. If the flush for the older memtables fails, auto-recovery starts a resume thread, which can become stuck waiting for all background work to finish (including the flush for the younger memtables). If a non-recovery flush starts at this point, it can make the situation worse: it fails due to the background error but never rolls back its memtables (see https://github.com/facebook/rocksdb/blob/269478ee4618283cd6d710fdfea9687157a259c1/db/db_impl/db_impl_compaction_flush.cc#L725). This prevents any future flush from picking the old memtables.

A more detailed repro is in unit test.
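
To make the stuck state concrete, here is a minimal, self-contained C++ sketch of the pick/rollback bookkeeping described in the summary. It is a toy model, not RocksDB code: `ToyMemTable`, `ToyMemTableList`, and `PickableAfterFailedFlush` are hypothetical names, and the only idea taken from the summary is that a picked memtable stays unavailable to later flushes until it is rolled back.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stand-in for an immutable memtable (hypothetical, not rocksdb::MemTable).
struct ToyMemTable {
  int id;
  bool flush_in_progress;
};

// Toy stand-in for the list of immutable memtables of one column family.
class ToyMemTableList {
 public:
  explicit ToyMemTableList(int n) {
    for (int i = 0; i < n; ++i) mems_.push_back(ToyMemTable{i, false});
  }

  // Marks every memtable that is not already being flushed as picked and
  // returns the indices that were picked by this call.
  std::vector<std::size_t> PickMemtablesToFlush() {
    std::vector<std::size_t> picked;
    for (std::size_t i = 0; i < mems_.size(); ++i) {
      if (!mems_[i].flush_in_progress) {
        mems_[i].flush_in_progress = true;
        picked.push_back(i);
      }
    }
    return picked;
  }

  // Clears the picked flag so that a later flush can pick these memtables.
  void RollbackMemtableFlush(const std::vector<std::size_t>& picked) {
    for (std::size_t i : picked) {
      assert(mems_[i].flush_in_progress);
      mems_[i].flush_in_progress = false;
    }
  }

 private:
  std::vector<ToyMemTable> mems_;
};

// Simulates a flush that picks two memtables and then fails, followed by a
// second flush. Returns how many memtables the second flush is able to pick.
std::size_t PickableAfterFailedFlush(bool rollback_on_failure) {
  ToyMemTableList imm(2);
  std::vector<std::size_t> picked = imm.PickMemtablesToFlush();
  if (rollback_on_failure) {
    imm.RollbackMemtableFlush(picked);
  }
  return imm.PickMemtablesToFlush().size();
}
```

In this model, `PickableAfterFailedFlush(false)` leaves nothing for the second flush to pick, while `PickableAfterFailedFlush(true)` makes both memtables available again.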

This PR fixes the issue by:
1. Ensuring we roll back memtables if an atomic flush fails due to a background error.
2. Aborting atomic flushes that are waiting for older memtables to be installed when there is a background error.
3. Not scheduling non-recovery flushes when there is a background error that stops background work.
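
The condition that decides whether a flush is dropped can be sketched as a small predicate. This is an illustration under assumptions, not the RocksDB implementation: `ToyFlushReason`, `ToyErrorState`, `IsRecoveryFlush`, and `ShouldAbortFlush` are hypothetical names. Only the shape of the condition (a background error is set, background work is stopped, and the flush is not a recovery flush) follows the checks added in this commit's diff.

```cpp
// Hypothetical subset of flush reasons, for illustration only.
enum class ToyFlushReason {
  kWriteBufferFull,
  kErrorRecovery,
  kErrorRecoveryRetryFlush,
};

// Hypothetical snapshot of the error handler state a flush would consult.
struct ToyErrorState {
  bool has_bg_error;     // a background error is currently set
  bool bg_work_stopped;  // the error is severe enough to stop background work
};

// Recovery flushes must be allowed to run even while an error is set,
// because they are what clears the error.
inline bool IsRecoveryFlush(ToyFlushReason reason) {
  return reason == ToyFlushReason::kErrorRecovery ||
         reason == ToyFlushReason::kErrorRecoveryRetryFlush;
}

// Returns true when a flush should give up before picking any memtable.
inline bool ShouldAbortFlush(const ToyErrorState& state,
                             ToyFlushReason reason) {
  return state.has_bg_error && state.bg_work_stopped &&
         !IsRecoveryFlush(reason);
}
```

Bailing out before any memtable is picked means there is nothing to roll back on this path, which leaves every memtable available for the recovery flush.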

There was another issue with atomic_flush=true where the DB can hang during DB close; see facebook#11867 for details. The fix in this PR, specifically fix 2 above, should be enough to resolve it too.
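
Fix 2 amounts to re-checking for a background error each time the waiting flush is about to block. A minimal sketch of such a wait loop follows, assuming a single-threaded toy model in which a callback stands in for blocking on a condition variable. `WaitResult`, `ToyInstallState`, `WaitToInstall`, and the two `Simulate...` helpers are hypothetical names and do not appear in RocksDB.

```cpp
// Outcome of waiting for older memtables to be installed.
enum class WaitResult { kReadyToInstall, kAbortedOnBgError };

// Hypothetical view of the state a waiting flush inspects.
struct ToyInstallState {
  bool older_memtables_pending;
  bool bg_work_stopped;
  bool bg_error_set;
};

// `wait` stands in for blocking on a condition variable; it may mutate
// `state` to model what other threads did while this flush was asleep.
template <typename WaitFn>
WaitResult WaitToInstall(ToyInstallState& state, bool flush_for_recovery,
                         WaitFn wait) {
  while (state.older_memtables_pending) {
    // A non-recovery flush gives up instead of blocking once background
    // work has been stopped by an error; otherwise the thread waiting for
    // all background work to finish would wait on this flush forever.
    if (!flush_for_recovery && state.bg_work_stopped && state.bg_error_set) {
      return WaitResult::kAbortedOnBgError;
    }
    wait(state);
  }
  return WaitResult::kReadyToInstall;
}

// Scenario: the flush for the older memtables fails while we are waiting.
WaitResult SimulateOlderFlushFails() {
  ToyInstallState state{true, false, false};
  return WaitToInstall(state, /*flush_for_recovery=*/false,
                       [](ToyInstallState& s) {
                         s.bg_work_stopped = true;
                         s.bg_error_set = true;
                       });
}

// Scenario: the flush for the older memtables installs successfully.
WaitResult SimulateOlderFlushSucceeds() {
  ToyInstallState state{true, false, false};
  return WaitToInstall(state, /*flush_for_recovery=*/false,
                       [](ToyInstallState& s) {
                         s.older_memtables_pending = false;
                       });
}
```

In the first scenario the waiter returns `kAbortedOnBgError` on the next loop iteration; in the second it returns `kReadyToInstall`. A caller that aborts would still need to roll back the memtables it picked, which is the job of fix 1.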

Pull Request resolved: facebook#11872

Test Plan: new unit test.

Reviewed By: jowlyzhang

Differential Revision: D49556867

Pulled By: cbi42

fbshipit-source-id: 4a0210ff28a8552a99ece7fbb0f574fd24b4da3f
cbi42 committed Sep 25, 2023
1 parent d6ae3a9 commit 25fd29b
Showing 8 changed files with 210 additions and 24 deletions.
89 changes: 88 additions & 1 deletion db/db_flush_test.cc
@@ -3358,18 +3358,24 @@ TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {

SyncPoint::GetInstance()->EnableProcessing();
SyncPoint::GetInstance()->LoadDependency(
{{"RecoverFromRetryableBGIOError:RecoverSuccess",
{{"Let error recovery start",
"RecoverFromRetryableBGIOError:BeforeStart"},
{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});

ASSERT_OK(Put(Key(1), "val1"));
// trigger bg flush0 for mem0
ASSERT_OK(Put(Key(2), "val2"));
// Not checking status since this wait can finish before flush starts.
dbfull()->TEST_WaitForFlushMemTable().PermitUncheckedError();

// trigger bg flush1 for mem1, should see bg error and abort
// before picking a memtable to flush
ASSERT_OK(Put(Key(3), "val3"));
ASSERT_NOK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_EQ(0, NumTableFilesAtLevel(0));

TEST_SYNC_POINT("Let error recovery start");
TEST_SYNC_POINT("Wait for error recover");
// Recovery flush writes 2 memtables together into 1 file.
ASSERT_EQ(1, NumTableFilesAtLevel(0));
@@ -3379,6 +3385,87 @@ TEST_F(DBFlushTest, NonAtomicNormalFlushAbortWhenBGError) {
SyncPoint::GetInstance()->DisableProcessing();
}

TEST_F(DBFlushTest, DBStuckAfterAtomicFlushError) {
// Test for a bug with atomic flush where DB can become stuck
// after a flush error. A repro timeline:
//
// Start Flush0 for mem0
// Start Flush1 for mem1
// Now Flush1 will wait for Flush0 to install mem0
// Flush0 finishes with retryable IOError, rollbacks mem0
// Resume starts and waits for background job to finish, i.e., Flush1
// Fill memtable again, trigger Flush2 for mem0
// Flush2 will get error status, and not rollback mem0, see code in
// https://github.com/facebook/rocksdb/blob/b927ba5936216861c2c35ab68f50ba4a78e65747/db/db_impl/db_impl_compaction_flush.cc#L725
//
// DB is stuck since mem0 can never be picked now
//
// The fix is to rollback mem0 in Flush2, and let Flush1 also abort upon
// background error besides waiting for older memtables to be installed.
// The recovery flush in this case should pick up all memtables
// and write them to a single L0 file.
Options opts = CurrentOptions();
opts.atomic_flush = true;
opts.memtable_factory.reset(test::NewSpecialSkipListFactory(1));
opts.max_write_buffer_number = 64;
opts.max_background_flushes = 4;
env_->SetBackgroundThreads(4, Env::HIGH);
DestroyAndReopen(opts);

std::atomic_int flush_count = 0;
SyncPoint::GetInstance()->ClearAllCallBacks();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table:s", [&](void* s_ptr) {
int c = flush_count.fetch_add(1);
if (c == 0) {
Status* s = (Status*)(s_ptr);
IOStatus io_error = IOStatus::IOError("injected foobar");
io_error.SetRetryable(true);
*s = io_error;
TEST_SYNC_POINT("Let flush for mem1 start");
// Wait for Flush1 to start waiting to install flush result
TEST_SYNC_POINT("Wait for flush for mem1");
}
});
SyncPoint::GetInstance()->LoadDependency(
{{"Let flush for mem1 start", "Flush for mem1"},
{"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV",
"Wait for flush for mem1"},
{"RecoverFromRetryableBGIOError:BeforeStart",
"Wait for resume to start"},
{"Recovery should continue here",
"RecoverFromRetryableBGIOError:BeforeStart2"},
{"RecoverFromRetryableBGIOError:RecoverSuccess",
"Wait for error recover"}});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put(Key(1), "val1"));
// trigger Flush0 for mem0
ASSERT_OK(Put(Key(2), "val2"));

// trigger Flush1 for mem1
TEST_SYNC_POINT("Flush for mem1");
ASSERT_OK(Put(Key(3), "val3"));

// Wait until resume started to schedule another flush
TEST_SYNC_POINT("Wait for resume to start");
// This flush should not be scheduled due to bg error
ASSERT_OK(Put(Key(4), "val4"));

// TEST_WaitForBackgroundWork() returns background error
// after all background work is done.
ASSERT_NOK(dbfull()->TEST_WaitForBackgroundWork());
// Flush should abort and not writing any table
ASSERT_EQ(0, NumTableFilesAtLevel(0));

// Wait until this flush is done.
TEST_SYNC_POINT("Recovery should continue here");
TEST_SYNC_POINT("Wait for error recover");
// error recovery can schedule new flushes, but should not
// encounter error
ASSERT_OK(dbfull()->TEST_WaitForBackgroundWork());
ASSERT_EQ(1, NumTableFilesAtLevel(0));
}
} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
26 changes: 23 additions & 3 deletions db/db_impl/db_impl.cc
@@ -472,10 +472,30 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
s = Status::ShutdownInProgress();
}
if (s.ok()) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
// Since we drop all non-recovery flush requests during recovery,
// and new memtable may fill up during recovery,
// schedule one more round of flush.
FlushOptions flush_opts;
flush_opts.allow_write_stall = false;
flush_opts.wait = false;
Status status = FlushAllColumnFamilies(
flush_opts, FlushReason::kCatchUpAfterErrorRecovery);
if (!status.ok()) {
// FlushAllColumnFamilies internally should take care of setting
// background error if needed.
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"The catch up flush after successful recovery failed [%s]",
s.ToString().c_str());
}
// FlushAllColumnFamilies releases and re-acquires mutex.
if (shutdown_initiated_) {
s = Status::ShutdownInProgress();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
SchedulePendingCompaction(cfd);
}
MaybeScheduleFlushOrCompaction();
}
MaybeScheduleFlushOrCompaction();
}

// Wake up any waiters - in this case, it could be the shutdown thread
104 changes: 87 additions & 17 deletions db/db_impl/db_impl_compaction_flush.cc
@@ -285,16 +285,20 @@ Status DBImpl::FlushMemTableToOutputFile(
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:BeforePickMemtables");
// Exit a flush due to bg error should not set bg error again.
bool skip_set_bg_error = false;
if (s.ok() && flush_reason != FlushReason::kErrorRecovery &&
flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
!error_handler_.GetBGError().ok()) {
if (s.ok() && !error_handler_.GetBGError().ok() &&
error_handler_.IsBGWorkStopped() &&
flush_reason != FlushReason::kErrorRecovery &&
flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
// Error recovery in progress, should not pick memtable which excludes
// them from being picked up by recovery flush.
// This ensures that when bg error is set, no new flush can pick
// memtables.
skip_set_bg_error = true;
s = error_handler_.GetBGError();
assert(!s.ok());
ROCKS_LOG_BUFFER(log_buffer,
"[JOB %d] Skip flush due to background error %s",
job_context->job_id, s.ToString().c_str());
}

if (s.ok()) {
@@ -572,6 +576,21 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
pick_status.push_back(false);
}

bool flush_for_recovery =
bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecoveryRetryFlush;
bool skip_set_bg_error = false;

if (s.ok() && !error_handler_.GetBGError().ok() &&
error_handler_.IsBGWorkStopped() && !flush_for_recovery) {
s = error_handler_.GetBGError();
skip_set_bg_error = true;
assert(!s.ok());
ROCKS_LOG_BUFFER(log_buffer,
"[JOB %d] Skip flush due to background error %s",
job_context->job_id, s.ToString().c_str());
}

if (s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
jobs[i]->PickMemTable();
@@ -636,7 +655,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
}
}
} else {
} else if (!skip_set_bg_error) {
// When `skip_set_bg_error` is true, no memtable is picked so
// there is no need to call Cancel() or RollbackMemtableFlush().
//
// Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop.
// Have to cancel the flush jobs that have NOT executed because we need to
@@ -692,10 +714,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
};

bool resuming_from_bg_err =
error_handler_.IsDBStopped() ||
(bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
bg_flush_args[0].flush_reason_ ==
FlushReason::kErrorRecoveryRetryFlush);
error_handler_.IsDBStopped() || flush_for_recovery;
while ((!resuming_from_bg_err || error_handler_.GetRecoveryError().ok())) {
std::pair<Status, bool> res = wait_to_install_func();

@@ -706,15 +725,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = res.first;
break;
} else if (!res.second) {
// we are the oldest immutable memtable
break;
}
// We are not the oldest immutable memtable
TEST_SYNC_POINT_CALLBACK(
"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitCV", &res);
//
// If bg work is stopped, recovery thread first calls
// WaitForBackgroundWork() before proceeding to flush for recovery. This
// flush can block WaitForBackgroundWork() while waiting for recovery
// flush to install result. To avoid this deadlock, we should abort here
// if there is background error.
if (!flush_for_recovery && error_handler_.IsBGWorkStopped() &&
!error_handler_.GetBGError().ok()) {
s = error_handler_.GetBGError();
assert(!s.ok());
break;
}
atomic_flush_install_cv_.Wait();

resuming_from_bg_err =
error_handler_.IsDBStopped() ||
(bg_flush_args[0].flush_reason_ == FlushReason::kErrorRecovery ||
bg_flush_args[0].flush_reason_ ==
FlushReason::kErrorRecoveryRetryFlush);
resuming_from_bg_err = error_handler_.IsDBStopped() || flush_for_recovery;
}

if (!resuming_from_bg_err) {
@@ -730,6 +761,17 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// installation.
s = error_handler_.GetRecoveryError();
}
// Since we are not installing these memtables, need to rollback
// to allow future flush job to pick up these memtables.
if (!s.ok()) {
for (int i = 0; i != num_cfs; ++i) {
assert(exec_status[i].first);
assert(exec_status[i].second.ok());
auto& mems = jobs[i]->GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(
mems, /*rollback_succeeding_memtables=*/false);
}
}
}

if (s.ok()) {
@@ -833,7 +875,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

// Need to undo atomic flush if something went wrong, i.e. s is not OK and
// it is not because of CF drop.
if (!s.ok() && !s.IsColumnFamilyDropped()) {
if (!s.ok() && !s.IsColumnFamilyDropped() && !skip_set_bg_error) {
if (log_io_s.ok()) {
// Error while writing to MANIFEST.
// In fact, versions_->io_status() can also be the result of renaming
@@ -2239,9 +2281,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
WaitForPendingWrites();

if (flush_reason != FlushReason::kErrorRecoveryRetryFlush &&
flush_reason != FlushReason::kCatchUpAfterErrorRecovery &&
(!cfd->mem()->IsEmpty() || !cached_recoverable_state_empty_.load())) {
// Note that, when flush reason is kErrorRecoveryRetryFlush, during the
// auto retry resume, we want to avoid creating new small memtables.
// If flush reason is kCatchUpAfterErrorRecovery, we try to flush any new
// memtable that filled up during recovery, and we also want to avoid
// switching memtable to create small memtables.
// Therefore, SwitchMemtable will not be called. Also, since ResumeImpl
// will iterate through all the CFs and call FlushMemtable during auto
// retry resume, it is possible that in some CFs,
@@ -2432,7 +2478,8 @@ Status DBImpl::AtomicFlushMemTables(

for (auto cfd : cfds) {
if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
flush_reason == FlushReason::kErrorRecoveryRetryFlush) {
flush_reason == FlushReason::kErrorRecoveryRetryFlush ||
flush_reason == FlushReason::kCatchUpAfterErrorRecovery) {
continue;
}
cfd->Ref();
@@ -2700,6 +2747,11 @@ void DBImpl::MaybeScheduleFlushOrCompaction() {
// There has been a hard error and this call is not part of the recovery
// sequence. Bail out here so we don't get into an endless loop of
// scheduling BG work which will again call this function
//
// Note that a non-recovery flush can still be scheduled if
// error_handler_.IsRecoveryInProgress() returns true. We rely on
// BackgroundCallFlush() to check flush reason and drop non-recovery
// flushes.
return;
} else if (shutting_down_.load(std::memory_order_acquire)) {
// DB is being deleted; no more background compactions
@@ -3029,6 +3081,24 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
// This cfd is already referenced
FlushRequest flush_req = PopFirstFromFlushQueue();
FlushReason flush_reason = flush_req.flush_reason;
if (!error_handler_.GetBGError().ok() && error_handler_.IsBGWorkStopped() &&
flush_reason != FlushReason::kErrorRecovery &&
flush_reason != FlushReason::kErrorRecoveryRetryFlush) {
// Stop non-recovery flush when bg work is stopped
// Note that we drop the flush request here.
// Recovery thread should schedule further flushes after bg error
// is cleared.
status = error_handler_.GetBGError();
assert(!status.ok());
ROCKS_LOG_BUFFER(log_buffer,
"[JOB %d] Abort flush due to background error %s",
job_context->job_id, status.ToString().c_str());
*reason = flush_reason;
for (auto item : flush_req.cfd_to_max_mem_id_to_persist) {
item.first->UnrefAndTryDelete();
}
return status;
}
if (!immutable_db_options_.atomic_flush &&
ShouldRescheduleFlushRequestToRetainUDT(flush_req)) {
assert(flush_req.cfd_to_max_mem_id_to_persist.size() == 1);
@@ -3171,9 +3241,9 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
ROCKS_LOG_ERROR(immutable_db_options_.info_log,
"Waiting after background flush error: %s"
"[JOB %d] Waiting after background flush error: %s"
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
job_context.job_id, s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(immutable_db_options_.info_log);
immutable_db_options_.clock->SleepForMicroseconds(1000000);
1 change: 1 addition & 0 deletions db/error_handler.cc
@@ -668,6 +668,7 @@ const Status& ErrorHandler::StartRecoverFromRetryableBGIOError(
// mutex is released.
void ErrorHandler::RecoverFromRetryableBGIOError() {
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart");
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart2");
InstrumentedMutexLock l(db_mutex_);
if (end_recovery_) {
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_,
5 changes: 4 additions & 1 deletion db/flush_job.cc
@@ -79,6 +79,8 @@ const char* GetFlushReasonString(FlushReason flush_reason) {
return "Error Recovery Retry Flush";
case FlushReason::kWalFull:
return "WAL Full";
case FlushReason::kCatchUpAfterErrorRecovery:
return "Catch Up After Error Recovery";
default:
return "Invalid";
}
@@ -311,7 +313,8 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, FileMetaData* file_meta,
if (!db_options_.atomic_flush &&
flush_reason_ != FlushReason::kErrorRecovery &&
flush_reason_ != FlushReason::kErrorRecoveryRetryFlush &&
error_handler && !error_handler->GetBGError().ok()) {
error_handler && !error_handler->GetBGError().ok() &&
error_handler->IsBGWorkStopped()) {
cfd_->imm()->RollbackMemtableFlush(
mems_, /*rollback_succeeding_memtables=*/!db_options_.atomic_flush);
s = error_handler->GetBGError();
5 changes: 3 additions & 2 deletions db/memtable_list.cc
@@ -438,7 +438,6 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
TEST_SYNC_POINT("RollbackMemtableFlush");
AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_ROLLBACK);
assert(!mems.empty());
#ifndef NDEBUG
for (MemTable* m : mems) {
assert(m->flush_in_progress_);
@@ -483,7 +482,9 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
num_flush_not_started_++;
}
}
imm_flush_needed.store(true, std::memory_order_release);
if (!mems.empty()) {
imm_flush_needed.store(true, std::memory_order_release);
}
}

// Try record a successful flush in the manifest file. It might just return