Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Fix NotifyOnFlushCompleted() for atomic flush (#8585) #34

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -466,20 +466,27 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
autovector<const autovector<MemTable*>*> mems_list;
autovector<const MutableCFOptions*> mutable_cf_options_list;
autovector<FileMetaData*> tmp_file_meta;
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i != num_cfs; ++i) {
const auto& mems = jobs[i]->GetMemTables();
if (!cfds[i]->IsDropped() && !mems.empty()) {
tmp_cfds.emplace_back(cfds[i]);
mems_list.emplace_back(&mems);
mutable_cf_options_list.emplace_back(&all_mutable_cf_options[i]);
tmp_file_meta.emplace_back(&file_meta[i]);
#ifndef ROCKSDB_LITE
committed_flush_jobs_info.emplace_back(
jobs[i]->GetCommittedFlushJobsInfo());
#endif //! ROCKSDB_LITE
}
}

s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, tmp_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, tmp_file_meta,
&job_context->memtables_to_free, directories_.GetDbDir(), log_buffer);
committed_flush_jobs_info, &job_context->memtables_to_free,
directories_.GetDbDir(), log_buffer);
}

if (s.ok()) {
Expand Down
10 changes: 9 additions & 1 deletion db/flush_job_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -388,10 +388,18 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) {
for (auto cfd : all_cfds) {
mutable_cf_options_list.push_back(cfd->GetLatestMutableCFOptions());
}
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
#ifndef ROCKSDB_LITE
for (auto& job : flush_jobs) {
committed_flush_jobs_info.push_back(job->GetCommittedFlushJobsInfo());
}
#endif //! ROCKSDB_LITE

Status s = InstallMemtableAtomicFlushResults(
nullptr /* imm_lists */, all_cfds, mutable_cf_options_list, mems_list,
versions_.get(), &mutex_, file_meta_ptrs, &job_context.memtables_to_free,
versions_.get(), &mutex_, file_meta_ptrs,
committed_flush_jobs_info, &job_context.memtables_to_free,
nullptr /* db_directory */, nullptr /* log_buffer */);
ASSERT_OK(s);

Expand Down
56 changes: 31 additions & 25 deletions db/listener_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,32 +350,38 @@ TEST_F(EventListenerTest, MultiCF) {
#ifdef ROCKSDB_USING_THREAD_STATUS
options.enable_thread_tracking = true;
#endif // ROCKSDB_USING_THREAD_STATUS
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = {
"pikachu", "ilya", "muromec", "dobrynia",
"nikitich", "alyosha", "popovich"};
CreateAndReopenWithCF(cf_names, options);

ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}
for (auto atomic_flush : {false, true}) {
options.atomic_flush = atomic_flush;
options.create_if_missing = true;
DestroyAndReopen(options);
TestFlushListener* listener = new TestFlushListener(options.env, this);
options.listeners.emplace_back(listener);
options.table_properties_collector_factories.push_back(
std::make_shared<TestPropertiesCollectorFactory>());
std::vector<std::string> cf_names = {"pikachu", "ilya", "muromec",
"dobrynia", "nikitich", "alyosha",
"popovich"};
CreateAndReopenWithCF(cf_names, options);

ASSERT_OK(Put(1, "pikachu", std::string(90000, 'p')));
ASSERT_OK(Put(2, "ilya", std::string(90000, 'i')));
ASSERT_OK(Put(3, "muromec", std::string(90000, 'm')));
ASSERT_OK(Put(4, "dobrynia", std::string(90000, 'd')));
ASSERT_OK(Put(5, "nikitich", std::string(90000, 'n')));
ASSERT_OK(Put(6, "alyosha", std::string(90000, 'a')));
ASSERT_OK(Put(7, "popovich", std::string(90000, 'p')));
for (int i = 1; i < 8; ++i) {
ASSERT_OK(Flush(i));
ASSERT_EQ(listener->flushed_dbs_.size(), i);
ASSERT_EQ(listener->flushed_column_family_names_.size(), i);
}

// make sure callback functions are called in the right order
for (size_t i = 0; i < cf_names.size(); i++) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
// make sure callback functions are called in the right order
for (size_t i = 0; i < cf_names.size(); i++) {
ASSERT_EQ(listener->flushed_dbs_[i], db_);
ASSERT_EQ(listener->flushed_column_family_names_[i], cf_names[i]);
}
Close();
}
}

Expand Down
13 changes: 13 additions & 0 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,8 @@ Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_metas,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater(
Expand All @@ -666,6 +668,17 @@ Status InstallMemtableAtomicFlushResults(
(*mems_list[k])[i]->SetFlushCompleted(true);
(*mems_list[k])[i]->SetFileNumber(file_metas[k]->fd.GetNumber());
}
#ifndef ROCKSDB_LITE
if (committed_flush_jobs_info[k]) {
assert(!mems_list[k]->empty());
assert((*mems_list[k])[0]);
std::unique_ptr<FlushJobInfo> flush_job_info =
(*mems_list[k])[0]->ReleaseFlushJobInfo();
committed_flush_jobs_info[k]->push_back(std::move(flush_job_info));
}
#else //! ROCKSDB_LITE
(void)committed_flush_jobs_info;
#endif // ROCKSDB_LITE
}

Status s;
Expand Down
6 changes: 6 additions & 0 deletions db/memtable_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ class MemTableListVersion {
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -375,6 +377,8 @@ class MemTableList {
const autovector<const autovector<MemTable*>*>& mems_list,
VersionSet* vset, InstrumentedMutex* mu,
const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);

Expand Down Expand Up @@ -417,6 +421,8 @@ extern Status InstallMemtableAtomicFlushResults(
const autovector<const MutableCFOptions*>& mutable_cf_options_list,
const autovector<const autovector<MemTable*>*>& mems_list, VersionSet* vset,
InstrumentedMutex* mu, const autovector<FileMetaData*>& file_meta,
const autovector<std::list<std::unique_ptr<FlushJobInfo>>*>&
committed_flush_jobs_info,
autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
} // namespace rocksdb
11 changes: 10 additions & 1 deletion db/memtable_list_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,20 @@ class MemTableListTest : public testing::Test {
for (auto& meta : file_metas) {
file_meta_ptrs.push_back(&meta);
}
std::vector<std::list<std::unique_ptr<FlushJobInfo>>>
committed_flush_jobs_info_storage(cf_ids.size());
autovector<std::list<std::unique_ptr<FlushJobInfo>>*>
committed_flush_jobs_info;
for (int i = 0; i < static_cast<int>(cf_ids.size()); ++i) {
committed_flush_jobs_info.push_back(
&committed_flush_jobs_info_storage[i]);
}

InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex);
return InstallMemtableAtomicFlushResults(
&lists, cfds, mutable_cf_options_list, mems_list, &versions, &mutex,
file_meta_ptrs, to_delete, nullptr, &log_buffer);
file_meta_ptrs, committed_flush_jobs_info, to_delete, nullptr, &log_buffer);
}
};

Expand Down