Skip to content

Commit

Permalink
Fix Titan missing blob issue (#97)
Browse files Browse the repository at this point in the history
Cherry-picking the following patches for fixing #93 
```
0a3f87a 2019-10-15 yiwu@pingcap.com     Temp fix for data loss caused by concurrent flush (#96)
8ac5003 2019-10-16 zbk602423539@gmail.. merge BackgroundGC with TEST_StartGC (#94)
b9915d9 2019-10-07 zbk602423539@gmail.. check nullptr (#91)
```
  • Loading branch information
yiwu-arbug authored Oct 16, 2019
1 parent ab0743c commit 43225d4
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 90 deletions.
4 changes: 2 additions & 2 deletions src/blob_gc_picker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ std::unique_ptr<BlobGC> BasicBlobGCPicker::PickBlobGC(
}

bool BasicBlobGCPicker::CheckBlobFile(BlobFileMeta* blob_file) const {
assert(blob_file != nullptr &&
assert(blob_file == nullptr ||
blob_file->file_state() != BlobFileMeta::FileState::kInit);
if (blob_file != nullptr &&
if (blob_file == nullptr ||
blob_file->file_state() != BlobFileMeta::FileState::kNormal)
return false;

Expand Down
21 changes: 16 additions & 5 deletions src/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,12 +553,17 @@ Status TitanDBImpl::GetImpl(const ReadOptions& options,
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock();

{
if (storage) {
StopWatch read_sw(env_, stats_.get(), BLOB_DB_BLOB_FILE_READ_MICROS);
s = storage->Get(options, index, &record, &buffer);
RecordTick(stats_.get(), BLOB_DB_NUM_KEYS_READ);
RecordTick(stats_.get(), BLOB_DB_BLOB_FILE_BYTES_READ,
index.blob_handle.size);
} else {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return Status::NotFound(
"Column family id: " + std::to_string(handle->GetID()) + " not Found.");
}
if (s.IsCorruption()) {
ROCKS_LOG_ERROR(db_options_.info_log,
Expand Down Expand Up @@ -625,15 +630,20 @@ Iterator* TitanDBImpl::NewIteratorImpl(
auto cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(handle)->cfd();

mutex_.Lock();
auto storage = blob_file_set_->GetBlobStorage(handle->GetID());
auto storage = blob_file_set_->GetBlobStorage(handle->GetID()).lock();
mutex_.Unlock();

if (!storage) {
ROCKS_LOG_ERROR(db_options_.info_log,
"Column family id:%" PRIu32 " not Found.", handle->GetID());
return nullptr;
}

std::unique_ptr<ArenaWrappedDBIter> iter(db_impl_->NewIteratorImpl(
options, cfd, options.snapshot->GetSequenceNumber(),
nullptr /*read_callback*/, true /*allow_blob*/, true /*allow_refresh*/));
return new TitanDBIterator(options, storage.lock().get(), snapshot,
std::move(iter), env_, stats_.get(),
db_options_.info_log.get());
return new TitanDBIterator(options, storage.get(), snapshot, std::move(iter),
env_, stats_.get(), db_options_.info_log.get());
}

Status TitanDBImpl::NewIterators(
Expand Down Expand Up @@ -984,6 +994,7 @@ void TitanDBImpl::OnFlushCompleted(const FlushJobInfo& flush_job_info) {
file->FileStateTransit(BlobFileMeta::FileEvent::kFlushCompleted);
}
}
TEST_SYNC_POINT("TitanDBImpl::OnFlushCompleted:Finished");
}

void TitanDBImpl::OnCompactionCompleted(
Expand Down
2 changes: 1 addition & 1 deletion src/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class TitanDBImpl : public TitanDB {

static void BGWorkGC(void* db);
void BackgroundCallGC();
Status BackgroundGC(LogBuffer* log_buffer);
Status BackgroundGC(LogBuffer* log_buffer, uint32_t column_family_id);

void PurgeObsoleteFiles();
Status PurgeObsoleteFilesImpl();
Expand Down
123 changes: 41 additions & 82 deletions src/db_impl_gc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ void TitanDBImpl::BGWorkGC(void* db) {
}

void TitanDBImpl::BackgroundCallGC() {
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get());
TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeGCRunning");

{
MutexLock l(&mutex_);
assert(bg_gc_scheduled_ > 0);
Expand All @@ -41,13 +39,17 @@ void TitanDBImpl::BackgroundCallGC() {
bg_gc_running_++;

TEST_SYNC_POINT("TitanDBImpl::BackgroundCallGC:BeforeBackgroundGC");
BackgroundGC(&log_buffer);

{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL,
db_options_.info_log.get());
BackgroundGC(&log_buffer, column_family_id);
{
mutex_.Unlock();
log_buffer.FlushBufferToLog();
LogFlush(db_options_.info_log.get());
mutex_.Lock();
}
}

bg_gc_running_--;
Expand All @@ -67,36 +69,35 @@ void TitanDBImpl::BackgroundCallGC() {
}
}

Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer,
uint32_t column_family_id) {
mutex_.AssertHeld();
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);

std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;
Status s;
if (!gc_queue_.empty()) {
uint32_t column_family_id = PopFirstFromGCQueue();
std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
stats_.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}

std::shared_ptr<BlobStorage> blob_storage;
// Skip CFs that have been dropped.
if (!blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
blob_storage = blob_file_set_->GetBlobStorage(column_family_id).lock();
} else {
TEST_SYNC_POINT_CALLBACK("TitanDBImpl::BackgroundGC:CFDropped", nullptr);
ROCKS_LOG_BUFFER(log_buffer, "GC skip dropped colum family [%s].",
cf_info_[column_family_id].name.c_str());
}
if (blob_storage != nullptr) {
const auto& cf_options = blob_storage->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options,
stats_.get());
blob_gc = blob_gc_picker->PickBlobGC(blob_storage.get());

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}
}

Expand All @@ -106,6 +107,12 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
// Nothing to do
ROCKS_LOG_BUFFER(log_buffer, "Titan GC nothing to do");
} else {
{
mutex_.Unlock();
auto* cfd = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh.get())->cfd();
db_impl_->WaitForFlushMemTable(cfd);
mutex_.Lock();
}
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), log_buffer, &shuting_down_,
Expand Down Expand Up @@ -146,7 +153,6 @@ Status TitanDBImpl::BackgroundGC(LogBuffer* log_buffer) {
return s;
}

// TODO(yiwu): merge with BackgroundGC().
Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
// BackgroundCallGC
Status s;
Expand All @@ -160,54 +166,7 @@ Status TitanDBImpl::TEST_StartGC(uint32_t column_family_id) {
bg_gc_running_++;
bg_gc_scheduled_++;

// BackgroudGC
StopWatch gc_sw(env_, stats_.get(), BLOB_DB_GC_MICROS);

std::unique_ptr<BlobGC> blob_gc;
std::unique_ptr<ColumnFamilyHandle> cfh;

if (blob_file_set_->IsColumnFamilyObsolete(column_family_id)) {
return Status::ShutdownInProgress(
"Column Family has been dropped before GC.");
}
auto bs = blob_file_set_->GetBlobStorage(column_family_id).lock().get();
const auto& cf_options = bs->cf_options();
std::shared_ptr<BlobGCPicker> blob_gc_picker =
std::make_shared<BasicBlobGCPicker>(db_options_, cf_options, nullptr);
blob_gc = blob_gc_picker->PickBlobGC(bs);

if (blob_gc) {
cfh = db_impl_->GetColumnFamilyHandleUnlocked(column_family_id);
assert(column_family_id == cfh->GetID());
blob_gc->SetColumnFamily(cfh.get());
}

if (UNLIKELY(!blob_gc)) {
ROCKS_LOG_BUFFER(&log_buffer, "Titan GC nothing to do");
} else {
BlobGCJob blob_gc_job(blob_gc.get(), db_, &mutex_, db_options_, env_,
env_options_, blob_manager_.get(),
blob_file_set_.get(), &log_buffer, &shuting_down_,
stats_.get());
s = blob_gc_job.Prepare();

if (s.ok()) {
mutex_.Unlock();
s = blob_gc_job.Run();
mutex_.Lock();
}

if (s.ok()) {
s = blob_gc_job.Finish();
}

blob_gc->ReleaseGcFiles();
}

if (!s.ok()) {
ROCKS_LOG_WARN(db_options_.info_log, "Titan GC error: %s",
s.ToString().c_str());
}
s = BackgroundGC(&log_buffer, column_family_id);

{
mutex_.Unlock();
Expand Down
105 changes: 105 additions & 0 deletions src/titan_db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#include "blob_file_iterator.h"
#include "blob_file_reader.h"
#include "blob_file_size_collector.h"
#include "db/db_impl.h"
#include "db_impl.h"
#include "db_iter.h"
#include "monitoring/statistics.h"
#include "port/port.h"
#include "rocksdb/utilities/debug.h"
#include "titan/db.h"
#include "titan_fault_injection_test_env.h"
Expand Down Expand Up @@ -152,6 +154,10 @@ class TitanDBTest : public testing::Test {
return db_impl_->blob_file_set_->GetBlobStorage(cf_handle->GetID());
}

ColumnFamilyHandle* GetColumnFamilyHandle(uint32_t cf_id) {
return db_impl_->db_impl_->GetColumnFamilyHandleUnlocked(cf_id).release();
}

void VerifyDB(const std::map<std::string, std::string>& data,
ReadOptions ropts = ReadOptions()) {
db_impl_->PurgeObsoleteFiles();
Expand Down Expand Up @@ -518,6 +524,35 @@ TEST_F(TitanDBTest, DropColumnFamily) {
Close();
}

TEST_F(TitanDBTest, DestroyColumnFamilyHandle) {
Open();
const uint64_t kNumCF = 3;
for (uint64_t i = 1; i <= kNumCF; i++) {
AddCF(std::to_string(i));
}
const uint64_t kNumEntries = 10;
std::map<std::string, std::string> data;
for (uint64_t i = 1; i <= kNumEntries; i++) {
Put(i, &data);
}
VerifyDB(data);
Flush();
VerifyDB(data);

// Destroy column families handle, check whether GC skips the column families.
for (auto& handle : cf_handles_) {
auto cf_id = handle->GetID();
db_->DestroyColumnFamilyHandle(handle);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
}
cf_handles_.clear();
VerifyDB(data);

Reopen();
VerifyDB(data);
Close();
}

TEST_F(TitanDBTest, DeleteFilesInRange) {
Open();

Expand Down Expand Up @@ -1071,6 +1106,76 @@ TEST_F(TitanDBTest, GCAfterDropCF) {
Close();
}

TEST_F(TitanDBTest, GCBeforeFlushCommit) {
std::atomic<bool> is_first_flush{true};
DBImpl* db_impl = nullptr;

SyncPoint::GetInstance()->LoadDependency(
{{"TitanDBTest::GCBeforeFlushCommit:PauseInstall",
"TitanDBTest::GCBeforeFlushCommit:WaitFlushPause"},
{"TitanDBImpl::OnFlushCompleted:Finished",
"TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush"}});
SyncPoint::GetInstance()->SetCallBack("FlushJob::InstallResults", [&](void*) {
if (is_first_flush) {
is_first_flush = false;
} else {
// skip waiting for the second flush.
return;
}
auto* db_mutex = db_impl->mutex();
db_mutex->Unlock();
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:PauseInstall");
Env::Default()->SleepForMicroseconds(1000 * 1000); // 1s
db_mutex->Lock();
});

options_.create_if_missing = true;
// Setting max_flush_jobs = max_background_jobs / 4 = 2.
options_.max_background_jobs = 8;
options_.max_write_buffer_number = 4;
options_.min_blob_size = 0;
options_.merge_small_file_threshold = 1024 * 1024;
options_.disable_background_gc = true;
Open();
uint32_t cf_id = db_->DefaultColumnFamily()->GetID();

db_impl = reinterpret_cast<DBImpl*>(db_->GetRootDB());
SyncPoint::GetInstance()->EnableProcessing();

ASSERT_OK(db_->Put(WriteOptions(), "foo", "v"));
// t1 will wait for the second flush complete before install super version.
auto t1 = port::Thread([&]() {
// flush_opts.wait = true
ASSERT_OK(db_->Flush(FlushOptions()));
});
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitFlushPause");
// In the second flush we check if memtable has been committed, and signal
// the first flush to proceed.
ASSERT_OK(db_->Put(WriteOptions(), "bar", "v"));
FlushOptions flush_opts;
flush_opts.wait = false;
ASSERT_OK(db_->Flush(flush_opts));
TEST_SYNC_POINT("TitanDBTest::GCBeforeFlushCommit:WaitSecondFlush");
// Set GC mark to force GC select the file.
auto blob_storage = GetBlobStorage().lock();
std::map<uint64_t, std::weak_ptr<BlobFileMeta>> blob_files;
blob_storage->ExportBlobFiles(blob_files);
ASSERT_EQ(2, blob_files.size());
auto second_file = blob_files.rbegin()->second.lock();
second_file->set_gc_mark(true);
ASSERT_OK(db_impl_->TEST_StartGC(cf_id));
ASSERT_OK(db_impl_->TEST_PurgeObsoleteFiles());
t1.join();
// Check value after memtable committed.
std::string value;
// Before fixing the issue, this call will return
// Corruption: Missing blob file error.
ASSERT_OK(db_->Get(ReadOptions(), "bar", &value));
ASSERT_EQ("v", value);
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
}

} // namespace titandb
} // namespace rocksdb

Expand Down

0 comments on commit 43225d4

Please sign in to comment.