From 5172201d64b84b8b9bf9f32d50d54dfaf8433899 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Tue, 8 Oct 2019 20:21:59 +0000 Subject: [PATCH] Reproduce data loss with concurrent memtable flush and GC Signed-off-by: Yi Wu --- CMakeLists.txt | 4 +-- src/blob_gc_job.cc | 2 ++ src/blob_gc_picker.cc | 6 ++++ src/db_impl.cc | 5 +++ src/db_impl_files.cc | 1 + src/titan_db_test.cc | 77 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 93 insertions(+), 2 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2418486cc..7a8d36e3d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -5,11 +5,11 @@ enable_language(C) find_package(Git) if (NOT ROCKSDB_GIT_REPO) - set(ROCKSDB_GIT_REPO "https://github.com/pingcap/rocksdb.git") + set(ROCKSDB_GIT_REPO "https://github.com/yiwu-arbug/rocksdb.git") endif() if (NOT ROCKSDB_GIT_BRANCH) - set(ROCKSDB_GIT_BRANCH "6.4.tikv") + set(ROCKSDB_GIT_BRANCH "flush_demo_6.4") endif() if (NOT DEFINED ROCKSDB_DIR) diff --git a/src/blob_gc_job.cc b/src/blob_gc_job.cc index 89a85de7c..5200f1168 100644 --- a/src/blob_gc_job.cc +++ b/src/blob_gc_job.cc @@ -247,6 +247,7 @@ Status BlobGCJob::DoSample(const BlobFileMeta* file, bool* selected) { } Status BlobGCJob::DoRunGC() { + printf("do run gc\n"); Status s; std::unique_ptr gc_iter; @@ -415,6 +416,7 @@ Status BlobGCJob::DiscardEntry(const Slice& key, const BlobIndex& blob_index, if (!s.ok() && !s.IsNotFound()) { return s; } + printf("%s is blob %d\n", key.ToString().c_str(), is_blob_index); // count read bytes for checking LSM entry metrics_.bytes_read += key.size() + index_entry.size(); if (s.IsNotFound() || !is_blob_index) { diff --git a/src/blob_gc_picker.cc b/src/blob_gc_picker.cc index bdf88afd1..8a21385c5 100644 --- a/src/blob_gc_picker.cc +++ b/src/blob_gc_picker.cc @@ -26,7 +26,9 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( bool stop_picking = false; bool maybe_continue_next_time = false; uint64_t next_gc_size = 0; + printf("picker\n"); for (auto& gc_score : blob_storage->gc_score()) { + printf("%lu %lf\n", gc_score.file_number, gc_score.score); if (gc_score.score < cf_options_.blob_file_discardable_ratio) { break; } @@ -37,6 +39,7 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( // or this file had been GCed ROCKS_LOG_INFO(db_options_.info_log, "Blob file %" PRIu64 " no need gc", blob_file->file_number()); + printf("no need gc\n"); continue; } if (!stop_picking) { @@ -71,6 +74,7 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( return nullptr; } // if there is only one small file to merge, no need to perform + /* if (blob_files.size() == 1 && blob_files[0]->file_size() <= cf_options_.merge_small_file_threshold && blob_files[0]->gc_mark() == false && @@ -78,6 +82,8 @@ std::unique_ptr BasicBlobGCPicker::PickBlobGC( cf_options_.blob_file_discardable_ratio) { return nullptr; } + */ + printf("pick %lu files\n", blob_files.size()); return std::unique_ptr(new BlobGC( std::move(blob_files), std::move(cf_options_), maybe_continue_next_time)); diff --git a/src/db_impl.cc b/src/db_impl.cc index da1dac1aa..1a14f3e51 100644 --- a/src/db_impl.cc +++ b/src/db_impl.cc @@ -544,6 +544,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, bool is_blob_index = false; s = db_impl_->GetImpl(options, handle, key, value, nullptr /*value_found*/, nullptr /*read_callback*/, &is_blob_index); + printf("get status %s\n", s.ToString().c_str()); if (!s.ok() || !is_blob_index) return s; StopWatch get_sw(env_, stats_.get(), BLOB_DB_GET_MICROS); @@ -553,6 +554,7 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options, s = index.DecodeFrom(value); assert(s.ok()); if (!s.ok()) return s; + printf("get from file %lu\n", index.file_number); BlobRecord record; PinnableSlice buffer; @@ -959,6 +961,7 @@ bool TitanDBImpl::GetIntProperty(ColumnFamilyHandle* column_family, } void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { + printf("completed flush job id %d\n", flush_job_info.job_id); const auto& tps = flush_job_info.table_properties; auto ucp_iter = tps.user_collected_properties.find( BlobFileSizeCollector::kPropertiesName); @@ -1004,9 +1007,11 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) { ROCKS_LOG_INFO(db_options_.info_log, "OnFlushCompleted[%d]: output blob file %" PRIu64 ".", flush_job_info.job_id, file->file_number()); + printf("complete file %lu\n", file_number); file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted); } } + TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished"); } void TitanDBImpl::OnCompactionCompleted( diff --git a/src/db_impl_files.cc b/src/db_impl_files.cc index fd1f427b5..282992dbb 100644 --- a/src/db_impl_files.cc +++ b/src/db_impl_files.cc @@ -20,6 +20,7 @@ Status TitanDBImpl::PurgeObsoleteFilesImpl() { candidate_files.end()); for (const auto& candidate_file : candidate_files) { + printf("purge file %s\n", candidate_file.c_str()); ROCKS_LOG_INFO(db_options_.info_log, "Titan deleting obsolete file [%s]", candidate_file.c_str()); Status delete_status = env_->DeleteFile(candidate_file); diff --git a/src/titan_db_test.cc b/src/titan_db_test.cc index c002843da..a5ca48d45 100644 --- a/src/titan_db_test.cc +++ b/src/titan_db_test.cc @@ -2,8 +2,10 @@ #include #include +#include "db/db_impl/db_impl.h" #include "file/filename.h" #include "rocksdb/utilities/debug.h" +#include "port/port.h" #include "test_util/sync_point.h" #include "test_util/testharness.h" #include "util/random.h" @@ -1072,6 +1074,81 @@ TEST_F(TitanDBTest, GCAfterDropCF) { Close(); } +TEST_F(TitanDBTest, GCBeforeFlushCommit) { + port::Mutex mu; + port::CondVar cv(&mu); + bool first_flush_pending = false; + bool gc_finished = false; + DBImpl* db_impl = nullptr; + + SyncPoint::GetInstance()->LoadDependency({{ + "TitanDBImpl::OnFlushCompleted:Finished", + "TitanDBTest::GCBeforeFlushCommit:1"}}); + SyncPoint::GetInstance()->SetCallBack( + "FlushJob::BeforeInstallResults", + [&](void*) { + MutexLock l(&mu); + if (first_flush_pending) { + // We are in the second flush. + return; + } + auto* db_mutex = db_impl->mutex(); + db_mutex->Unlock(); + first_flush_pending = true; + cv.SignalAll(); + while (!gc_finished) { + cv.Wait(); + } + db_mutex->Lock(); + }); + + options_.create_if_missing = true; + // Setting max_flush_jobs = max_background_jobs / 4 = 2. + options_.max_background_jobs = 8; + options_.max_write_buffer_number = 4; + options_.min_blob_size = 0; + options_.merge_small_file_threshold = 1024 * 1024; + options_.disable_background_gc = true; + Open(); + + db_impl = reinterpret_cast(db_->GetRootDB()); + SyncPoint::GetInstance()->EnableProcessing(); + + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v")); + // t1 will wait for the second flush complete before install super version. + auto t1 = port::Thread([&]() { + // flush_opts.wait = true + ASSERT_OK(db_->Flush(FlushOptions())); + }); + { + MutexLock l(&mu); + while (!first_flush_pending) { + cv.Wait(); + } + } + // In the second flush we check if memtable has been committed, and signal + // the first flush to proceed. + ASSERT_OK(db_->Put(WriteOptions(), "bar", "v")); + FlushOptions flush_opts; + flush_opts.wait = false; + ASSERT_OK(db_->Flush(flush_opts)); + TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:1"); + ASSERT_OK(db_impl_->TEST_StartGC(db_->DefaultColumnFamily()->GetID())); + ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles()); + { + MutexLock l(&mu); + gc_finished = true; + cv.SignalAll(); + } + t1.join(); + // Check value after memtable committed. + std::string value; + ASSERT_OK(db_->Get(ReadOptions(), "bar", &value)); + ASSERT_EQ("v", value); + SyncPoint::GetInstance()->DisableProcessing(); + SyncPoint::GetInstance()->ClearAllCallBacks(); +} + } // namespace titandb } // namespace rocksdb