From 3af40cf1b11819a4dea9ade0200c9b68066c355d Mon Sep 17 00:00:00 2001 From: yiwu-arbug Date: Tue, 29 Oct 2019 23:00:50 -0700 Subject: [PATCH] Fix OnFlushCompleted fired before flush result write to MANIFEST (#5908) (#127) Summary: When there are concurrent flush jobs on the same CF, `OnFlushCompleted` can be called before the flush result is installed to the LSM. Fix the issue by passing `FlushJobInfo` through `MemTable`, so that the thread that commits the flush result can fetch the `FlushJobInfo` and fire `OnFlushCompleted` on behalf of the thread that actually wrote the SST. Fix https://github.com/facebook/rocksdb/issues/5892 Pull Request resolved: https://github.com/facebook/rocksdb/pull/5908 Test Plan: Add new test. The test will fail without the fix. Differential Revision: D17916144 Pulled By: riversand963 fbshipit-source-id: e18df67d9533b5baee52ae3605026cdeb05cbe10 Signed-off-by: Yi Wu --- db/db_flush_test.cc | 95 ++++++++++++++++++++++++++++++++++ db/db_impl.h | 8 +-- db/db_impl_compaction_flush.cc | 77 ++++++++++++--------------- db/flush_job.cc | 26 +++++++++- db/flush_job.h | 18 ++++++- db/flush_job_test.cc | 12 ++--- db/memtable.h | 16 ++++++ db/memtable_list.cc | 11 +++- db/memtable_list.h | 5 +- db/memtable_list_test.cc | 6 ++- include/rocksdb/listener.h | 4 +- 11 files changed, 214 insertions(+), 64 deletions(-) diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index 8a4d8fc63a1..1a333cddd25 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -7,8 +7,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. 
+#include + +#include "db/db_impl.h" #include "db/db_test_util.h" +#include "port/port.h" #include "port/stack_trace.h" +#include "util/cast_util.h" #include "util/fault_injection_test_env.h" #include "util/sync_point.h" @@ -290,6 +295,96 @@ TEST_F(DBFlushTest, ManualFlushFailsInReadOnlyMode) { Close(); } +#ifndef ROCKSDB_LITE +TEST_F(DBFlushTest, FireOnFlushCompletedAfterCommittedResult) { + class TestListener : public EventListener { + public: + void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { + // There's only one key in each flush. + ASSERT_EQ(info.smallest_seqno, info.largest_seqno); + ASSERT_NE(0, info.smallest_seqno); + if (info.smallest_seqno == seq1) { + // First flush completed + ASSERT_FALSE(completed1); + completed1 = true; + CheckFlushResultCommitted(db, seq1); + } else { + // Second flush completed + ASSERT_FALSE(completed2); + completed2 = true; + ASSERT_EQ(info.smallest_seqno, seq2); + CheckFlushResultCommitted(db, seq2); + } + } + + void CheckFlushResultCommitted(DB* db, SequenceNumber seq) { + DBImpl* db_impl = static_cast_with_check(db); + InstrumentedMutex* mutex = db_impl->mutex(); + mutex->Lock(); + auto* cfd = + reinterpret_cast(db->DefaultColumnFamily()) + ->cfd(); + ASSERT_LT(seq, cfd->imm()->current()->GetEarliestSequenceNumber()); + mutex->Unlock(); + } + + std::atomic seq1{0}; + std::atomic seq2{0}; + std::atomic completed1{false}; + std::atomic completed2{false}; + }; + std::shared_ptr listener = std::make_shared(); + + SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::FlushMemTable:AfterScheduleFlush", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"}, + {"DBImpl::FlushMemTableToOutputFile:Finish", + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitSecond"}}); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::WriteLevel0Table", [&listener](void* arg) { + // Wait for the second flush finished, out of mutex. 
+ auto* mems = reinterpret_cast*>(arg); + if (mems->front()->GetEarliestSequenceNumber() == listener->seq1 - 1) { + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:" + "WaitSecond"); + } + }); + + Options options = CurrentOptions(); + options.create_if_missing = true; + options.listeners.push_back(listener); + // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options.max_background_jobs = 8; + // Allow 2 immutable memtables. + options.max_write_buffer_number = 3; + Reopen(options); + SyncPoint::GetInstance()->EnableProcessing(); + ASSERT_OK(Put("foo", "v")); + listener->seq1 = db_->GetLatestSequenceNumber(); + // t1 will wait for the second flush complete before committing flush result. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + // Wait for first flush scheduled. + TEST_SYNC_POINT( + "DBFlushTest::FireOnFlushCompletedAfterCommittedResult:WaitFirst"); + // The second flush will exit early without commit its result. The work + // is delegated to the first flush. 
+ ASSERT_OK(Put("bar", "v")); + listener->seq2 = db_->GetLatestSequenceNumber(); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + t1.join(); + ASSERT_TRUE(listener->completed1); + ASSERT_TRUE(listener->completed2); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} +#endif // !ROCKSDB_LITE + TEST_P(DBAtomicFlushTest, ManualAtomicFlush) { Options options = CurrentOptions(); options.create_if_missing = true; diff --git a/db/db_impl.h b/db/db_impl.h index 61bec147a17..6c6a30283c2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -794,11 +794,11 @@ class DBImpl : public DB { void NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + int job_id); - void NotifyOnFlushCompleted(ColumnFamilyData* cfd, FileMetaData* file_meta, - const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop); + void NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + std::list>* flush_jobs_info); void NotifyOnCompactionBegin(ColumnFamilyData* cfd, Compaction *c, const Status &st, diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index ea37205a415..4a34c4d6cd1 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -141,8 +141,7 @@ Status DBImpl::FlushMemTableToOutputFile( #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. - NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id, - flush_job.GetTableProperties()); + NotifyOnFlushBegin(cfd, &file_meta, mutable_cf_options, job_context->job_id); #endif // ROCKSDB_LITE Status s; @@ -190,8 +189,8 @@ Status DBImpl::FlushMemTableToOutputFile( if (s.ok()) { #ifndef ROCKSDB_LITE // may temporarily unlock and lock the mutex. 
- NotifyOnFlushCompleted(cfd, &file_meta, mutable_cf_options, - job_context->job_id, flush_job.GetTableProperties()); + NotifyOnFlushCompleted(cfd, mutable_cf_options, + flush_job.GetCommittedFlushJobsInfo()); auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); if (sfm) { @@ -209,6 +208,7 @@ Status DBImpl::FlushMemTableToOutputFile( } #endif // ROCKSDB_LITE } + TEST_SYNC_POINT("DBImpl::FlushMemTableToOutputFile:Finish"); return s; } @@ -274,7 +274,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( snapshot_checker = DisableGCSnapshotChecker::Instance(); } autovector distinct_output_dirs; - std::vector jobs; + std::vector> jobs; std::vector all_mutable_cf_options; int num_cfs = static_cast(cfds.size()); all_mutable_cf_options.reserve(num_cfs); @@ -299,15 +299,15 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.emplace_back(*cfd->GetLatestMutableCFOptions()); const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.back(); const uint64_t* max_memtable_id = &(bg_flush_args[i].max_memtable_id_); - jobs.emplace_back( + jobs.emplace_back(new FlushJob( dbname_, cfds[i], immutable_db_options_, mutable_cf_options, max_memtable_id, env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), data_dir, GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats, - false /* sync_output_directory */, false /* write_manifest */); - jobs.back().PickMemTable(); + false /* sync_output_directory */, false /* write_manifest */)); + jobs.back()->PickMemTable(); } std::vector file_meta(num_cfs); @@ -319,7 +319,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( const MutableCFOptions& mutable_cf_options = all_mutable_cf_options.at(i); // may temporarily unlock and lock the mutex. 
NotifyOnFlushBegin(cfds[i], &file_meta[i], mutable_cf_options, - job_context->job_id, jobs[i].GetTableProperties()); + job_context->job_id); } #endif /* !ROCKSDB_LITE */ @@ -341,7 +341,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // TODO (yanqin): parallelize jobs with threads. for (int i = 1; i != num_cfs; ++i) { exec_status[i].second = - jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]); + jobs[i]->Run(&logs_with_prep_tracker_, &file_meta[i]); exec_status[i].first = true; } if (num_cfs > 1) { @@ -350,8 +350,10 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( TEST_SYNC_POINT( "DBImpl::AtomicFlushMemTablesToOutputFiles:SomeFlushJobsComplete:2"); } + assert(exec_status.size() > 0); + assert(!file_meta.empty()); exec_status[0].second = - jobs[0].Run(&logs_with_prep_tracker_, &file_meta[0]); + jobs[0]->Run(&logs_with_prep_tracker_, &file_meta[0]); exec_status[0].first = true; Status error_status; @@ -386,7 +388,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( auto wait_to_install_func = [&]() { bool ready = true; for (size_t i = 0; i != cfds.size(); ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (cfds[i]->IsDropped()) { // If the column family is dropped, then do not wait. 
continue; @@ -427,7 +429,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( autovector mutable_cf_options_list; autovector tmp_file_meta; for (int i = 0; i != num_cfs; ++i) { - const auto& mems = jobs[i].GetMemTables(); + const auto& mems = jobs[i]->GetMemTables(); if (!cfds[i]->IsDropped() && !mems.empty()) { tmp_cfds.emplace_back(cfds[i]); mems_list.emplace_back(&mems); @@ -463,12 +465,13 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( #ifndef ROCKSDB_LITE auto sfm = static_cast( immutable_db_options_.sst_file_manager.get()); + assert(all_mutable_cf_options.size() == static_cast(num_cfs)); for (int i = 0; i != num_cfs; ++i) { if (cfds[i]->IsDropped()) { continue; } - NotifyOnFlushCompleted(cfds[i], &file_meta[i], all_mutable_cf_options[i], - job_context->job_id, jobs[i].GetTableProperties()); + NotifyOnFlushCompleted(cfds[i], all_mutable_cf_options[i], + jobs[i]->GetCommittedFlushJobsInfo()); if (sfm) { std::string file_path = MakeTableFileName( cfds[i]->ioptions()->cf_paths[0].path, file_meta[i].fd.GetNumber()); @@ -492,12 +495,12 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // unref the versions. 
for (int i = 0; i != num_cfs; ++i) { if (!exec_status[i].first) { - jobs[i].Cancel(); + jobs[i]->Cancel(); } } for (int i = 0; i != num_cfs; ++i) { if (exec_status[i].first && exec_status[i].second.ok()) { - auto& mems = jobs[i].GetMemTables(); + auto& mems = jobs[i]->GetMemTables(); cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber()); } @@ -511,7 +514,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { + int job_id) { #ifndef ROCKSDB_LITE if (immutable_db_options_.listeners.size() == 0U) { return; @@ -542,7 +545,6 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.triggered_writes_stop = triggered_writes_stop; info.smallest_seqno = file_meta->fd.smallest_seqno; info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; info.flush_reason = cfd->GetFlushReason(); for (auto listener : immutable_db_options_.listeners) { listener->OnFlushBegin(this, info); @@ -556,15 +558,14 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, (void)file_meta; (void)mutable_cf_options; (void)job_id; - (void)prop; #endif // ROCKSDB_LITE } -void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, - FileMetaData* file_meta, - const MutableCFOptions& mutable_cf_options, - int job_id, TableProperties prop) { +void DBImpl::NotifyOnFlushCompleted( + ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, + std::list>* flush_jobs_info) { #ifndef ROCKSDB_LITE + assert(flush_jobs_info != nullptr); if (immutable_db_options_.listeners.size() == 0U) { return; } @@ -581,34 +582,22 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, // release lock while notifying events mutex_.Unlock(); { - FlushJobInfo info; - info.cf_id = cfd->GetID(); - info.cf_name = cfd->GetName(); - // TODO(yhchiang): make db_paths 
dynamic in case flush does not - // go to L0 in the future. - info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, - file_meta->fd.GetNumber()); - info.thread_id = env_->GetThreadID(); - info.job_id = job_id; - info.triggered_writes_slowdown = triggered_writes_slowdown; - info.triggered_writes_stop = triggered_writes_stop; - info.smallest_seqno = file_meta->fd.smallest_seqno; - info.largest_seqno = file_meta->fd.largest_seqno; - info.table_properties = prop; - info.flush_reason = cfd->GetFlushReason(); - for (auto listener : immutable_db_options_.listeners) { - listener->OnFlushCompleted(this, info); + for (auto& info : *flush_jobs_info) { + info->triggered_writes_slowdown = triggered_writes_slowdown; + info->triggered_writes_stop = triggered_writes_stop; + for (auto listener : immutable_db_options_.listeners) { + listener->OnFlushCompleted(this, *info); + } } + flush_jobs_info->clear(); } mutex_.Lock(); // no need to signal bg_cv_ as it will be signaled at the end of the // flush process. 
#else (void)cfd; - (void)file_meta; (void)mutable_cf_options; - (void)job_id; - (void)prop; + (void)flush_jobs_info; #endif // ROCKSDB_LITE } diff --git a/db/flush_job.cc b/db/flush_job.cc index 8769c849e46..462b3e88992 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -237,7 +237,7 @@ Status FlushJob::Run(LogsWithPrepTracker* prep_tracker, s = cfd_->imm()->TryInstallMemtableFlushResults( cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, - log_buffer_); + log_buffer_, &committed_flush_jobs_info_); } if (s.ok() && file_meta != nullptr) { @@ -379,7 +379,7 @@ Status FlushJob::WriteLevel0Table() { if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { s = output_file_directory_->Fsync(); } - TEST_SYNC_POINT("FlushJob::WriteLevel0Table"); + TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); db_mutex_->Lock(); } base_->Unref(); @@ -397,6 +397,10 @@ Status FlushJob::WriteLevel0Table() { meta_.fd.smallest_seqno, meta_.fd.largest_seqno, meta_.marked_for_compaction); } +#ifndef ROCKSDB_LITE + // Piggyback FlushJobInfo on the first first flushed memtable. 
+ mems_[0]->SetFlushJobInfo(GetFlushJobInfo()); +#endif // !ROCKSDB_LITE // Note that here we treat flush as level 0 compaction in internal stats InternalStats::CompactionStats stats(CompactionReason::kFlush, 1); @@ -410,4 +414,22 @@ Status FlushJob::WriteLevel0Table() { return s; } +#ifndef ROCKSDB_LITE +std::unique_ptr FlushJob::GetFlushJobInfo() const { + db_mutex_->AssertHeld(); + std::unique_ptr info(new FlushJobInfo); + info->cf_id = cfd_->GetID(); + info->cf_name = cfd_->GetName(); + info->file_path = MakeTableFileName(cfd_->ioptions()->cf_paths[0].path, + meta_.fd.GetNumber()); + info->thread_id = db_options_.env->GetThreadID(); + info->job_id = job_context_->job_id; + info->smallest_seqno = meta_.fd.smallest_seqno; + info->largest_seqno = meta_.fd.largest_seqno; + info->table_properties = table_properties_; + info->flush_reason = cfd_->GetFlushReason(); + return info; +} +#endif // !ROCKSDB_LITE + } // namespace rocksdb diff --git a/db/flush_job.h b/db/flush_job.h index d993e410d1f..37f736d0f9e 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -11,10 +11,11 @@ #include #include #include +#include #include +#include #include #include -#include #include "db/column_family.h" #include "db/dbformat.h" @@ -33,6 +34,7 @@ #include "port/port.h" #include "rocksdb/db.h" #include "rocksdb/env.h" +#include "rocksdb/listener.h" #include "rocksdb/memtablerep.h" #include "rocksdb/transaction_log.h" #include "table/scoped_arena_iterator.h" @@ -78,14 +80,22 @@ class FlushJob { Status Run(LogsWithPrepTracker* prep_tracker = nullptr, FileMetaData* file_meta = nullptr); void Cancel(); - TableProperties GetTableProperties() const { return table_properties_; } const autovector& GetMemTables() const { return mems_; } +#ifndef ROCKSDB_LITE + std::list>* GetCommittedFlushJobsInfo() { + return &committed_flush_jobs_info_; + } +#endif // !ROCKSDB_LITE + private: void ReportStartedFlush(); void ReportFlushInputSize(const autovector& mems); void RecordFlushIOStats(); Status 
WriteLevel0Table(); +#ifndef ROCKSDB_LITE + std::unique_ptr GetFlushJobInfo() const; +#endif // !ROCKSDB_LITE const std::string& dbname_; ColumnFamilyData* cfd_; @@ -130,6 +140,10 @@ class FlushJob { // In this case, only after all flush jobs succeed in flush can RocksDB // commit to the MANIFEST. const bool write_manifest_; + // The current flush job can commit flush result of a concurrent flush job. + // We collect FlushJobInfo of all jobs committed by current job and fire + // OnFlushCompleted for them. + std::list> committed_flush_jobs_info_; // Variables below are set by PickMemTable(): FileMetaData meta_; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 1f7bc7b845b..86166a616ff 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -294,17 +294,17 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { EventLogger event_logger(db_options_.info_log.get()); SnapshotChecker* snapshot_checker = nullptr; // not relevant - std::vector flush_jobs; + std::vector> flush_jobs; k = 0; for (auto cfd : all_cfds) { std::vector snapshot_seqs; - flush_jobs.emplace_back( + flush_jobs.emplace_back(new FlushJob( dbname_, cfd, db_options_, *cfd->GetLatestMutableCFOptions(), &memtable_ids[k], env_options_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, kMaxSequenceNumber, snapshot_checker, &job_context, nullptr, nullptr, nullptr, kNoCompression, db_options_.statistics.get(), &event_logger, true, - false /* sync_output_directory */, false /* write_manifest */); + false /* sync_output_directory */, false /* write_manifest */)); k++; } HistogramData hist; @@ -313,12 +313,12 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { file_metas.reserve(flush_jobs.size()); mutex_.Lock(); for (auto& job : flush_jobs) { - job.PickMemTable(); + job->PickMemTable(); } for (auto& job : flush_jobs) { FileMetaData meta; // Run will release and re-acquire mutex - ASSERT_OK(job.Run(nullptr /**/, &meta)); + ASSERT_OK(job->Run(nullptr /**/, &meta)); 
file_metas.emplace_back(meta); } autovector file_meta_ptrs; @@ -327,7 +327,7 @@ TEST_F(FlushJobTest, FlushMemtablesMultipleColumnFamilies) { } autovector*> mems_list; for (size_t i = 0; i != all_cfds.size(); ++i) { - const auto& mems = flush_jobs[i].GetMemTables(); + const auto& mems = flush_jobs[i]->GetMemTables(); mems_list.push_back(&mems); } autovector mutable_cf_options_list; diff --git a/db/memtable.h b/db/memtable.h index 6ce28961ea6..d0f74c66c7e 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -31,6 +31,7 @@ namespace rocksdb { +struct FlushJobInfo; class Mutex; class MemTableIterator; class MergeContext; @@ -396,6 +397,16 @@ class MemTable { flush_in_progress_ = in_progress; } +#ifndef ROCKSDB_LITE + void SetFlushJobInfo(std::unique_ptr&& info) { + flush_job_info_ = std::move(info); + } + + std::unique_ptr ReleaseFlushJobInfo() { + return std::move(flush_job_info_); + } +#endif // !ROCKSDB_LITE + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -474,6 +485,11 @@ class MemTable { // writes with sequence number smaller than seq are flushed. SequenceNumber atomic_flush_seqno_; +#ifndef ROCKSDB_LITE + // Flush job info of the current memtable. 
+ std::unique_ptr flush_job_info_; +#endif // !ROCKSDB_LITE + // Returns a heuristic flush decision bool ShouldFlushNow() const; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 9397dbc7e00..c5abac76584 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -322,7 +322,8 @@ Status MemTableList::TryInstallMemtableFlushResults( const autovector& mems, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer) { + LogBuffer* log_buffer, + std::list>* committed_flush_jobs_info) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); mu->AssertHeld(); @@ -380,6 +381,14 @@ Status MemTableList::TryInstallMemtableFlushResults( cfd->GetName().c_str(), m->file_number_); edit_list.push_back(&m->edit_); memtables_to_flush.push_back(m); +#ifndef ROCKSDB_LITE + std::unique_ptr info = m->ReleaseFlushJobInfo(); + if (info != nullptr) { + committed_flush_jobs_info->push_back(std::move(info)); + } +#else + (void)committed_flush_jobs_info; +#endif // !ROCKSDB_LITE } batch_count++; } diff --git a/db/memtable_list.h b/db/memtable_list.h index b56ad4932c4..f70428ca0d2 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -33,6 +33,8 @@ class InstrumentedMutex; class MergeIteratorBuilder; class MemTableList; +struct FlushJobInfo; + // keeps a list of immutable memtables in a vector. the list is immutable // if refcount is bigger than one. It is used as a state for Get() and // Iterator code paths @@ -227,7 +229,8 @@ class MemTableList { const autovector& m, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, autovector* to_delete, Directory* db_directory, - LogBuffer* log_buffer); + LogBuffer* log_buffer, + std::list>* committed_flush_jobs_info); // New memtables are inserted at the front of the list. 
// Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index f0f4b0bb0cb..351bdbfa171 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -117,9 +117,11 @@ class MemTableListTest : public testing::Test { // Create dummy mutex. InstrumentedMutex mutex; InstrumentedMutexLock l(&mutex); - return list->TryInstallMemtableFlushResults( + std::list> flush_jobs_info; + Status s = list->TryInstallMemtableFlushResults( cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, - file_num, to_delete, nullptr, &log_buffer); + file_num, to_delete, nullptr, &log_buffer, &flush_jobs_info); + return s; } // Calls MemTableList::InstallMemtableFlushResults() and sets up all diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 7c2c26785c3..22706b59de7 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -457,8 +457,8 @@ class EventListener { #else -class EventListener { -}; +class EventListener {}; +struct FlushJobInfo {}; #endif // ROCKSDB_LITE