Skip to content

Commit

Permalink
Fix NotifyOnFlushCompleted() for atomic flush (facebook#8585) (facebo…
Browse files Browse the repository at this point in the history
…ok#34)

Summary:
PR facebook#5908 added `flush_jobs_info_` to `FlushJob` to make sure
`OnFlushCompleted()` is called after committing flush results to
MANIFEST. However, `flush_jobs_info_` is not updated in atomic
flush, causing `NotifyOnFlushCompleted()` to skip `OnFlushCompleted()`.

This PR fixes this, in a similar way to facebook#5908 that handles regular flush.

Pull Request resolved: facebook#8585

Test Plan: make check

Reviewed By: jay-zhuang

Differential Revision: D29913720

Pulled By: riversand963

fbshipit-source-id: 4ff023c98372fa2c93188d4a5c8a4e9ffa0f4dda

Co-authored-by: Yanqin Jin <yanqin@fb.com>
  • Loading branch information
empiredan and riversand963 authored Dec 23, 2021
1 parent af8729e commit ef29819
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 28 deletions.
9 changes: 8 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,20 +466,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]);
#ifndef ROCKSDB_LITE
committed_flush_jobs_info.emplace_back(
jobs[i]->GetCommittedFlushJobsInfo());
#endif //! ROCKSDB_LITE
}
}

s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
committed_flush_jobs_info, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
}

if (s.ok()) {
Expand Down
10 changes: 9 additions & 1 deletion db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,18 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
for (auto cfd : all_cfds) {
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
}
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
#ifndef ROCKSDB_LITE
for (auto& job : flush_jobs) {
committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
}
#endif //! ROCKSDB_LITE

Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
versions_.get(), &mutex_, file_meta_ptrs,
committed_flush_jobs_info, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);

Expand Down
56 changes: 31 additions & 25 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,32 +350,38 @@ TEST_F(EventListenerTest, MultiCF) {
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, options);

ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}
for (auto atomic_flush : {false, true}) {
options.atomic_flush = atomic_flush;
options.create_if_missing = true;
DestroyAndReopen(options);
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
"dobrynia", "nikitich", "alyosha",
"popovich"};
CreateAndReopenWithCF(cf_names, options);

ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}

// make sure callback functions are called in the right order
for (size_t i = 0; i < cf_names.size(); i++) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
// make sure callback functions are called in the right order
for (size_t i = 0; i < cf_names.size(); i++) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
}
Close();
}
}

Expand Down
13 changes: 13 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
Expand All @@ -666,6 +668,17 @@ Status InstallMemtableAtomicFlushResults(
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
}
#ifndef ROCKSDB_LITE
if (committed_flush_jobs_info[k]) {
assert(!mems_list[k]->empty());
assert((*mems_list[k])[0]);
std::unique_ptr<FlushJobInfo> flush_job_info =
(*mems_list[k])[0]->ReleaseFlushJobInfo();
committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
}
#else //! ROCKSDB_LITE
(void)committed_flush_jobs_info;
#endif // ROCKSDB_LITE
}

Status s;
Expand Down
6 changes: 6 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class MemTableListVersion {
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -375,6 +377,8 @@ class MemTableList {
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -417,6 +421,8 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
} // namespace rocksdb
11 changes: 10 additions & 1 deletion db/memtable_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,20 @@ class MemTableListTest : public testing::Test {
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
committed_flush_jobs_info_storage(cf_ids.size());
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
committed_flush_jobs_info.push_back(
&committed_flush_jobs_info_storage[i]);
}

InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_meta_ptrs, to_delete, nullptr, &log_buffer);
file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
}
};

Expand Down

0 comments on commit ef29819

Please sign in to comment.