Fix OnFlushCompleted fired before flush result write to MANIFEST (facebook#5908) (#127) (#130)

Summary:
When there are concurrent flush jobs on the same CF, `OnFlushCompleted` can be called before the flush result is installed into the LSM. Fix the issue by passing `FlushJobInfo` through the `MemTable`, so that the thread that commits the flush result can fetch the `FlushJobInfo` and fire `OnFlushCompleted` on behalf of the thread that actually wrote the SST.
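In outline (a sketch for orientation, not code from this commit; only SetFlushJobInfo appears in the diff below, while ReleaseFlushJobInfo and the member name are assumed counterparts): the flushing thread attaches the FlushJobInfo to the memtable it flushed, and whichever thread later commits the result to the MANIFEST takes it back and fires the listeners.

    #include <memory>
    #include <utility>

    #include "rocksdb/listener.h"  // rocksdb::FlushJobInfo

    // Illustrative stand-in for the relevant part of the real rocksdb::MemTable.
    class MemTableSketch {
     public:
      // Set by the thread that wrote the SST (see FlushJob::WriteLevel0Table).
      void SetFlushJobInfo(std::unique_ptr<rocksdb::FlushJobInfo> info) {
        flush_job_info_ = std::move(info);
      }
      // Taken by the thread that installs the flush result into the LSM; the
      // collected infos surface through FlushJob::GetCommittedFlushJobsInfo(),
      // which DBImpl::NotifyOnFlushCompleted consumes after the MANIFEST write.
      std::unique_ptr<rocksdb::FlushJobInfo> ReleaseFlushJobInfo() {
        return std::move(flush_job_info_);
      }

     private:
      std::unique_ptr<rocksdb::FlushJobInfo> flush_job_info_;
    };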

Fix facebook#5892
Pull Request resolved: facebook#5908

Test Plan: Add new test. The test will fail without the fix.

Differential Revision: D17916144

Pulled By: riversand963

fbshipit-source-id: e18df67d9533b5baee52ae3605026cdeb05cbe10
Signed-off-by: Yi Wu <yiwu@pingcap.com>
yiwu-arbug authored and Connor1996 committed Nov 19, 2019
1 parent d21d976 commit f6f91c1
Showing 11 changed files with 214 additions and 64 deletions.
95 changes: 95 additions & 0 deletions db/db_flush_test.cc
@@ -7,8 +7,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.

#include <atomic>

#include "db/db_impl.h"
#include "db/db_test_util.h"
#include "port/port.h"
#include "port/stack_trace.h"
#include "util/cast_util.h"
#include "util/fault_injection_test_env.h"
#include "util/sync_point.h"

@@ -290,6 +295,96 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) {
Close();
}

#ifndef ROCKSDB_LITE
TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) {
class TestListener : public EventListener {
public:
void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
// There's only one key in each flush.
ASSERT_EQ(info.smallest_seqno, info.largest_seqno);
ASSERT_NE(0, info.smallest_seqno);
if (info.smallest_seqno == seq1) {
// First flush completed
ASSERT_FALSE(completed1);
completed1 = true;
CheckFlushResultCommitted(db, seq1);
} else {
// Second flush completed
ASSERT_FALSE(completed2);
completed2 = true;
ASSERT_EQ(info.smallest_seqno, seq2);
CheckFlushResultCommitted(db, seq2);
}
}

void CheckFlushResultCommitted(DB* db, SequenceNumber seq) {
DBImpl* db_impl = static_cast_with_check<DBImpl>(db);
InstrumentedMutex* mutex = db_impl->mutex();
mutex->Lock();
auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db->DefaultColumnFamily())
->cfd();
ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber());
mutex->Unlock();
}

std::atomic<SequenceNumber> seq1{0};
std::atomic<SequenceNumber> seq2{0};
std::atomic<bool> completed1{false};
std::atomic<bool> completed2{false};
};
std::shared_ptr<TestListener> listener = std::make_shared<TestListener>();

SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FlushMemTable:AfterScheduleFlush",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"},
{"DBImpl::FlushMemTableToOutputFile:Finish",
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}});
SyncPoint::GetInstance()->SetCallBack(
"FlushJob::WriteLevel0Table", [&listener](void* arg) {
// Wait for the second flush to finish, outside the mutex.
auto* mems = reinterpret_cast<autovector<MemTable*>*>(arg);
if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) {
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:"
"WaitSecond");
}
});

Options options = CurrentOptions();
options.create_if_missing = true;
options.listeners.push_back(listener);
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options.max_background_jobs = 8;
// Allow 2 immutable memtables.
options.max_write_buffer_number = 3;
Reopen(options);
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(Put("foo", "v"));
listener->seq1 = db_->GetLatestSequenceNumber();
// t1 will wait for the second flush to complete before committing the flush result.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
// Wait for the first flush to be scheduled.
TEST_SYNC_POINT(
"DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst");
// The second flush will exit early without committing its result. The work
// is delegated to the first flush.
ASSERT_OK(Put("bar", "v"));
listener->seq2 = db_->GetLatestSequenceNumber();
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
t1.join();
ASSERT_TRUE(listener->completed1);
ASSERT_TRUE(listener->completed2);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}
#endif // !ROCKSDB_LITE
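For context, here is a minimal application-side listener (illustrative only, not part of this commit) that relies on the guarantee the test above exercises: because OnFlushCompleted now fires only after the flush result is committed to the MANIFEST, the file reported in FlushJobInfo is already part of the live LSM when the callback runs.

    #include <cstdio>
    #include <vector>

    #include "rocksdb/db.h"
    #include "rocksdb/listener.h"

    class FlushLogger : public rocksdb::EventListener {
     public:
      void OnFlushCompleted(rocksdb::DB* db,
                            const rocksdb::FlushJobInfo& info) override {
        std::vector<rocksdb::LiveFileMetaData> files;
        db->GetLiveFilesMetaData(&files);  // already reflects the committed flush
        std::fprintf(stderr, "flush job %d wrote %s (%zu live files)\n",
                     info.job_id, info.file_path.c_str(), files.size());
      }
    };

It would be registered the usual way, e.g. options.listeners.push_back(std::make_shared<FlushLogger>());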

TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
8 changes: 4 additions & 4 deletions db/db_impl.h
@@ -794,11 +794,11 @@ class DBImpl : public DB {

void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
int job_id);

void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop);
void NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info);

void NotifyOnCompactionBegin(ColumnFamilyData* cfd,
Compaction *c, const Status &st,
77 changes: 33 additions & 44 deletions db/db_impl_compaction_flush.cc
@@ -141,8 +141,7 @@ Status DBImpl::FlushMemTableToOutputFile(

#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id,
flush_job.GetTableProperties());
NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id);
#endif // ROCKSDB_LITE

Status s;
@@ -190,8 +189,8 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) {
#ifndef ROCKSDB_LITE
// may temporarily unlock and lock the mutex.
NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options,
job_context->job_id, flush_job.GetTableProperties());
NotifyOnFlushCompleted(cfd, mutable_cf_options,
flush_job.GetCommittedFlushJobsInfo());
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
if (sfm) {
@@ -209,6 +208,7 @@ }
}
#endif // ROCKSDB_LITE
}
TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish");
return s;
}

@@ -274,7 +274,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
snapshot_checker = DisableGCSnapshotChecker::Instance();
}
autovector<Directory*> distinct_output_dirs;
std::vector<FlushJob> jobs;
std::vector<std::unique_ptr<FlushJob>> jobs;
std::vector<MutableCFOptions> all_mutable_cf_options;
int num_cfs = static_cast<int>(cfds.size());
all_mutable_cf_options.reserve(num_cfs);
@@ -299,15 +299,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions());
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back();
const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_);
jobs.emplace_back(
jobs.emplace_back(new FlushJob(
dbname_, cfds[i], immutable_db_options_, mutable_cf_options,
max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_,
&shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, job_context, log_buffer, directories_.GetDbDir(),
data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
stats_, &event_logger_, mutable_cf_options.report_bg_io_stats,
false /* sync_output_directory */, false /* write_manifest */);
jobs.back().PickMemTable();
false /* sync_output_directory */, false /* write_manifest */));
jobs.back()->PickMemTable();
}

std::vector<FileMetaData> file_meta(num_cfs);
@@ -319,7 +319,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i);
// may temporarily unlock and lock the mutex.
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options,
job_context->job_id, jobs[i].GetTableProperties());
job_context->job_id);
}
#endif /* !ROCKSDB_LITE */

@@ -341,7 +341,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// TODO (yanqin): parallelize jobs with threads.
for (int i = 1; i != num_cfs; ++i) {
exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
}
if (num_cfs > 1) {
@@ -350,8 +350,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
TEST_SYNC_POINT(
"DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2");
}
assert(exec_status.size() > 0);
assert(!file_meta.empty());
exec_status[0].second =
jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]);
jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]);
exec_status[0].first = true;

Status error_status;
@@ -386,7 +388,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
auto wait_to_install_func = [&]() {
bool ready = true;
for (size_t i = 0; i != cfds.size(); ++i) {
const auto& mems = jobs[i].GetMemTables();
const auto& mems = jobs[i]->GetMemTables();
if (cfds[i]->IsDropped()) {
// If the column family is dropped, then do not wait.
continue;
@@ -427,7 +429,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i].GetMemTables();
const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
@@ -463,12 +465,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
#ifndef ROCKSDB_LITE
auto sfm = static_cast<SstFileManagerImpl*>(
immutable_db_options_.sst_file_manager.get());
assert(all_mutable_cf_options.size() == static_cast<size_t>(num_cfs));
for (int i = 0; i != num_cfs; ++i) {
if (cfds[i]->IsDropped()) {
continue;
}
NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i],
job_context->job_id, jobs[i].GetTableProperties());
NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i],
jobs[i]->GetCommittedFlushJobsInfo());
if (sfm) {
std::string file_path = MakeTableFileName(
cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber());
@@ -492,12 +495,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// unref the versions.
for (int i = 0; i != num_cfs; ++i) {
if (!exec_status[i].first) {
jobs[i].Cancel();
jobs[i]->Cancel();
}
}
for (int i = 0; i != num_cfs; ++i) {
if (exec_status[i].first && exec_status[i].second.ok()) {
auto& mems = jobs[i].GetMemTables();
auto& mems = jobs[i]->GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
@@ -511,7 +514,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(

void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
int job_id) {
#ifndef ROCKSDB_LITE
if (immutable_db_options_.listeners.size() == 0U) {
return;
@@ -542,7 +545,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushBegin(this, info);
@@ -556,15 +558,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
#endif // ROCKSDB_LITE
}

void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
FileMetaData* file_meta,
const MutableCFOptions& mutable_cf_options,
int job_id, TableProperties prop) {
void DBImpl::NotifyOnFlushCompleted(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
std::list<std::unique_ptr<FlushJobInfo>>* flush_jobs_info) {
#ifndef ROCKSDB_LITE
assert(flush_jobs_info != nullptr);
if (immutable_db_options_.listeners.size() == 0U) {
return;
}
@@ -581,34 +582,22 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
// release lock while notifying events
mutex_.Unlock();
{
FlushJobInfo info;
info.cf_id = cfd->GetID();
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
info.triggered_writes_slowdown = triggered_writes_slowdown;
info.triggered_writes_stop = triggered_writes_stop;
info.smallest_seqno = file_meta->fd.smallest_seqno;
info.largest_seqno = file_meta->fd.largest_seqno;
info.table_properties = prop;
info.flush_reason = cfd->GetFlushReason();
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, info);
for (auto& info : *flush_jobs_info) {
info->triggered_writes_slowdown = triggered_writes_slowdown;
info->triggered_writes_stop = triggered_writes_stop;
for (auto listener : immutable_db_options_.listeners) {
listener->OnFlushCompleted(this, *info);
}
}
flush_jobs_info->clear();
}
mutex_.Lock();
// no need to signal bg_cv_ as it will be signaled at the end of the
// flush process.
#else
(void)cfd;
(void)file_meta;
(void)mutable_cf_options;
(void)job_id;
(void)prop;
(void)flush_jobs_info;
#endif // ROCKSDB_LITE
}

26 changes: 24 additions & 2 deletions db/flush_job.cc
@@ -237,7 +237,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
s = cfd_->imm()->TryInstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_);
log_buffer_, &committed_flush_jobs_info_);
}

if (s.ok() && file_meta != nullptr) {
Expand Down Expand Up @@ -379,7 +379,7 @@ Status FlushJob::WriteLevel0Table() {
if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) {
s = output_file_directory_->Fsync();
}
TEST_SYNC_POINT("FlushJob::WriteLevel0Table");
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_);
db_mutex_->Lock();
}
base_->Unref();
Expand All @@ -397,6 +397,10 @@ Status FlushJob::WriteLevel0Table() {
meta_.fd.smallest_seqno, meta_.fd.largest_seqno,
meta_.marked_for_compaction);
}
#ifndef ROCKSDB_LITE
// Piggyback FlushJobInfo on the first flushed memtable.
mems_[0]->SetFlushJobInfo(GetFlushJobInfo());
#endif // !ROCKSDB_LITE

// Note that here we treat flush as level 0 compaction in internal stats
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
@@ -410,4 +414,22 @@ Status FlushJob::WriteLevel0Table() {
return s;
}

#ifndef ROCKSDB_LITE
std::unique_ptr<FlushJobInfo> FlushJob::GetFlushJobInfo() const {
db_mutex_->AssertHeld();
std::unique_ptr<FlushJobInfo> info(new FlushJobInfo);
info->cf_id = cfd_->GetID();
info->cf_name = cfd_->GetName();
info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path,
meta_.fd.GetNumber());
info->thread_id = db_options_.env->GetThreadID();
info->job_id = job_context_->job_id;
info->smallest_seqno = meta_.fd.smallest_seqno;
info->largest_seqno = meta_.fd.largest_seqno;
info->table_properties = table_properties_;
info->flush_reason = cfd_->GetFlushReason();
return info;
}
#endif // !ROCKSDB_LITE

} // namespace rocksdb
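The MemTable accessors that carry the piggybacked info are outside this excerpt. Below is a rough sketch, under stated assumptions, of how the committing thread could collect the infos while installing the flushed memtables; the real logic lives in MemTableList::TryInstallMemtableFlushResults (called from FlushJob::Run above), and ReleaseFlushJobInfo is an assumed counterpart to the SetFlushJobInfo call shown in WriteLevel0Table.

    #include <list>
    #include <memory>
    #include <utility>

    #include "db/memtable.h"
    #include "rocksdb/listener.h"
    #include "util/autovector.h"

    // Gather the FlushJobInfo piggybacked on each flushed memtable so that
    // DBImpl::NotifyOnFlushCompleted fires OnFlushCompleted only after the
    // corresponding VersionEdit has been written to the MANIFEST.
    void CollectCommittedFlushJobsInfo(
        const rocksdb::autovector<rocksdb::MemTable*>& flushed_mems,
        std::list<std::unique_ptr<rocksdb::FlushJobInfo>>* committed_flush_jobs_info) {
      for (rocksdb::MemTable* m : flushed_mems) {
        auto info = m->ReleaseFlushJobInfo();  // assumed accessor name
        if (info != nullptr) {
          committed_flush_jobs_info->push_back(std::move(info));
        }
      }
    }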