Skip to content

Commit

Permalink
Fix background atomic flush hang
Browse files Browse the repository at this point in the history
  • Loading branch information
jowlyzhang committed Sep 21, 2023
1 parent 089070c commit d2d8e2a
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 2 deletions.
110 changes: 110 additions & 0 deletions db/db_flush_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3150,6 +3150,116 @@ TEST_P(DBAtomicFlushTest, BgThreadNoWaitAfterManifestError) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Test fix for this bug:
// A chain of events in a closing scenario would make Close() call hang when
// atomic_flush = true.
// The factors to reproduce the hang:
// 1) An earlier flush job encounters a retryable error, as a result, fails to
// install flush result, and at the same time, spawns a recovery thread.
// 2) A concurrently running flush job only flushing newer memtables
// successfully finish flushing and waiting to install results.
// 3) Shutting down DB, the recovery thread hangs because of this logic:
// Close() -> EndAutoRecovery() -> WaitForBackgroundWork() waits for
// background flush thread in 2).
TEST_P(DBAtomicFlushTest, BgRecoveryThreadNoWaitDuringShutdown) {
bool atomic_flush = GetParam();
if (!atomic_flush) {
return;
}
auto fault_injection_env = std::make_shared<FaultInjectionTestEnv>(env_);
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.atomic_flush = true;
options.env = fault_injection_env.get();
// Set a larger value than default so that RocksDB can schedule concurrent
// background flush threads.
options.max_background_jobs = 8;
options.max_write_buffer_number = 8;
CreateAndReopenWithCF({"pikachu"}, options);

assert(2 == handles_.size());

WriteOptions write_opts;
write_opts.disableWAL = true;

ASSERT_OK(Put(0, "a", "v_0_a", write_opts));

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();

SyncPoint::GetInstance()->LoadDependency({
{"DBAtomicFlushTest::BgThr2::WaitToInstallResults",
"DBAtomicFlushTest::MainThr::BeforeClose"},
});

std::thread::id bg_flush_thr1, bg_flush_thr2;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void*) {
if (bg_flush_thr1 == std::thread::id()) {
bg_flush_thr1 = std::this_thread::get_id();
} else if (bg_flush_thr2 == std::thread::id()) {
bg_flush_thr2 = std::this_thread::get_id();
}
});

SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables",
[&](void*) {
if (std::this_thread::get_id() != bg_flush_thr1) {
return;
}
ASSERT_OK(Put(0, "a", "v_1_a", write_opts));

// Kick off flush job2 within the background flush thread1 to
// make sure the newer flush job only picks mem1
FlushOptions flush_opts;
flush_opts.wait = false;
dbfull()->TEST_UnlockMutex();
ASSERT_OK(dbfull()->Flush(flush_opts, handles_));
dbfull()->TEST_LockMutex();
});

SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table::AfterFileSync", [&](void* arg) {
if (std::this_thread::get_id() == bg_flush_thr1) {
auto* ptr = reinterpret_cast<Status*>(arg);
assert(ptr);
IOStatus io_status = IOStatus::IOError("Injected retryable failure");
io_status.SetRetryable(true);
*ptr = io_status;
}
});

int called = 0;
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::AtomicFlushMemTablesToOutputFiles:WaitToCommit", [&](void* arg) {
if (std::this_thread::get_id() == bg_flush_thr2) {
const auto* ptr = reinterpret_cast<std::pair<Status, bool>*>(arg);
assert(ptr);
if (0 == called) {
ASSERT_OK(ptr->first);
ASSERT_TRUE(ptr->second);
} else if (1 == called) {
TEST_SYNC_POINT("DBAtomicFlushTest::BgThr2::WaitToInstallResults");
} else {
ASSERT_TRUE(ptr->first.IsShutdownInProgress());
ASSERT_FALSE(ptr->second);
}
++called;
}
});
SyncPoint::GetInstance()->EnableProcessing();

// Kick off flush job 1 to flush mem0. This flush job encounters the injected
// retryable IO error, finishes with IO error, and spawns a recovery thread.
ASSERT_TRUE(dbfull()->Flush(FlushOptions(), handles_).IsIOError());

TEST_SYNC_POINT("DBAtomicFlushTest::MainThr::BeforeClose");
Close();
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_P(DBAtomicFlushTest, NoWaitWhenWritesStopped) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
Expand Down
1 change: 1 addition & 0 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ Status DBImpl::CloseHelper() {
// continuing with the shutdown
mutex_.Lock();
shutdown_initiated_ = true;
atomic_flush_install_cv_.SignalAll();
error_handler_.CancelErrorRecovery();
while (error_handler_.IsRecoveryInProgress()) {
bg_cv_.Wait();
Expand Down
7 changes: 5 additions & 2 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
pick_status[i] = true;
}
}

TEST_SYNC_POINT_CALLBACK(
"DBImpl::AtomicFlushMemTablesToOutputFiles:AfterPickMemTables", nullptr);
if (s.ok()) {
assert(switched_to_mempurge.size() ==
static_cast<long unsigned int>(num_cfs));
Expand Down Expand Up @@ -647,7 +648,8 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// Something went wrong elsewhere, we cannot count on waiting for our
// turn to write/sync to MANIFEST or CURRENT. Just return.
return std::make_pair(versions_->io_status(), false);
} else if (shutting_down_.load(std::memory_order_acquire)) {
} else if (shutdown_initiated_.load(std::memory_order_acquire) ||
shutting_down_.load(std::memory_order_acquire)) {
return std::make_pair(Status::ShutdownInProgress(), false);
}
bool ready = true;
Expand Down Expand Up @@ -3193,6 +3195,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
bg_flush_scheduled_--;
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();

atomic_flush_install_cv_.SignalAll();
bg_cv_.SignalAll();
// IMPORTANT: there should be no code after calling SignalAll. This call may
Expand Down
2 changes: 2 additions & 0 deletions db/error_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeStart");
InstrumentedMutexLock l(db_mutex_);
if (end_recovery_) {
recovery_in_prog_ = false;
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_,
Status::ShutdownInProgress(),
db_mutex_);
Expand All @@ -683,6 +684,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
// Recover from the retryable error. Create a separate thread to do it.
while (resume_count > 0) {
if (end_recovery_) {
recovery_in_prog_ = false;
EventHelpers::NotifyOnErrorRecoveryEnd(db_options_.listeners, bg_error_,
Status::ShutdownInProgress(),
db_mutex_);
Expand Down
1 change: 1 addition & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1003,6 +1003,7 @@ Status FlushJob::WriteLevel0Table() {
DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
}
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table::AfterFileSync", &s);
db_mutex_->Lock();
}
base_->Unref();
Expand Down

0 comments on commit d2d8e2a

Please sign in to comment.