Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mark blob files not needed by any memtables/SSTs obsolete #6032

Closed
wants to merge 12 commits into from
237 changes: 191 additions & 46 deletions utilities/blob_db/blob_db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cinttypes>
#include <iomanip>
#include <memory>
#include <sstream>

#include "db/blob_index.h"
#include "db/db_impl/db_impl.h"
Expand Down Expand Up @@ -206,6 +207,8 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {

InitializeBlobFileToSstMapping(live_files);

MarkUnreferencedBlobFilesObsoleteDuringOpen();

if (!disable_auto_compactions) {
s = db_->EnableAutoCompaction(*handles);
if (!s.ok()) {
Expand Down Expand Up @@ -288,8 +291,9 @@ Status BlobDBImpl::OpenAllBlobFiles() {
next_file_number_.store(*file_numbers.rbegin() + 1);
}

std::string blob_file_list;
std::string obsolete_file_list;
std::ostringstream blob_file_oss;
std::ostringstream live_imm_oss;
std::ostringstream obsolete_file_oss;

for (auto& file_number : file_numbers) {
std::shared_ptr<BlobFile> blob_file = std::make_shared<BlobFile>(
Expand All @@ -300,11 +304,12 @@ Status BlobDBImpl::OpenAllBlobFiles() {
Status read_metadata_status = blob_file->ReadMetadata(env_, env_options_);
if (read_metadata_status.IsCorruption()) {
// Remove incomplete file.
ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
if (!obsolete_file_list.empty()) {
obsolete_file_list.append(", ");
if (!obsolete_files_.empty()) {
obsolete_file_oss << ", ";
}
obsolete_file_list.append(ToString(file_number));
obsolete_file_oss << file_number;

ObsoleteBlobFile(blob_file, 0 /*obsolete_seq*/, false /*update_size*/);
continue;
} else if (!read_metadata_status.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log,
Expand All @@ -316,20 +321,33 @@ Status BlobDBImpl::OpenAllBlobFiles() {

total_blob_size_ += blob_file->GetFileSize();

if (!blob_files_.empty()) {
blob_file_oss << ", ";
}
blob_file_oss << file_number;

blob_files_[file_number] = blob_file;
if (!blob_file_list.empty()) {
blob_file_list.append(", ");

if (!blob_file->HasTTL()) {
if (!live_imm_non_ttl_blob_files_.empty()) {
live_imm_oss << ", ";
}
live_imm_oss << file_number;

live_imm_non_ttl_blob_files_[file_number] = blob_file;
}
blob_file_list.append(ToString(file_number));
}

ROCKS_LOG_INFO(db_options_.info_log,
"Found %" ROCKSDB_PRIszt " blob files: %s", blob_files_.size(),
blob_file_list.c_str());
blob_file_oss.str().c_str());
ROCKS_LOG_INFO(
db_options_.info_log, "Found %" ROCKSDB_PRIszt " non-TTL blob files: %s",
live_imm_non_ttl_blob_files_.size(), live_imm_oss.str().c_str());
ROCKS_LOG_INFO(db_options_.info_log,
"Found %" ROCKSDB_PRIszt
" incomplete or corrupted blob files: %s",
obsolete_files_.size(), obsolete_file_list.c_str());
obsolete_files_.size(), obsolete_file_oss.str().c_str());
return s;
}

Expand Down Expand Up @@ -462,6 +480,83 @@ void BlobDBImpl::ProcessCompactionJobInfo(const CompactionJobInfo& info) {
LinkSstToBlobFile(output.file_number, output.oldest_blob_file_number);
}
}

MarkUnreferencedBlobFilesObsolete();
}

// Marks `blob_file` obsolete as of `obsolete_seq` once no SST file is linked
// to it any longer.
//
// Preconditions (asserted): `blob_file` is non-null, has no TTL, is
// immutable, and garbage collection is enabled in the BlobDB options.
//
// Returns true if the file is obsolete when the call returns (either it
// already was, or it has just been marked), and false if it still has linked
// SST files and was therefore left alone.
bool BlobDBImpl::MarkBlobFileObsoleteIfNeeded(
    const std::shared_ptr<BlobFile>& blob_file, SequenceNumber obsolete_seq) {
  assert(blob_file);
  assert(!blob_file->HasTTL());
  assert(blob_file->Immutable());
  assert(bdb_options_.enable_garbage_collection);

  // FIFO eviction could have marked this file obsolete already; in that case
  // there is nothing left to do and the file counts as handled.
  if (!blob_file->Obsolete()) {
    const bool still_referenced = !blob_file->GetLinkedSstFiles().empty();
    if (still_referenced) {
      return false;
    }

    ROCKS_LOG_INFO(db_options_.info_log,
                   "Blob file %" PRIu64 " is no longer needed, marking obsolete",
                   blob_file->BlobFileNumber());

    ObsoleteBlobFile(blob_file, obsolete_seq, /* update_size */ true);
  }

  return true;
}

// Walks the live immutable non-TTL blob files from the front of
// live_imm_non_ttl_blob_files_ and removes every entry that is obsolete or
// that `mark_if_needed` manages to mark obsolete.
//
// `mark_if_needed` is called with the blob file's shared_ptr and must return
// true if the file is obsolete afterwards, false otherwise. The walk stops at
// the first file for which it returns false, i.e. as soon as a blob file
// that still has linked SSTs is found; that file and everything after it
// stay in the container.
template <class Functor>
void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteImpl(Functor mark_if_needed) {
  assert(bdb_options_.enable_garbage_collection);

  auto& candidates = live_imm_non_ttl_blob_files_;

  // The loop step erases the entry just examined; `break` skips that step,
  // so the first file that cannot be marked obsolete is kept.
  for (auto iter = candidates.begin(); iter != candidates.end();
       iter = candidates.erase(iter)) {
    const auto& file = iter->second;
    assert(file);
    assert(file->BlobFileNumber() == iter->first);
    assert(!file->HasTTL());
    assert(file->Immutable());

    // Small optimization: Obsolete() does an atomic read, so this check is
    // done before invoking the functor (which may take the file's mutex).
    const bool already_obsolete = file->Obsolete();
    if (!already_obsolete && !mark_if_needed(file)) {
      break;
    }
  }
}

void BlobDBImpl::MarkUnreferencedBlobFilesObsolete() {
const SequenceNumber obsolete_seq = GetLatestSequenceNumber();

WriteLock lock(&mutex_);

MarkUnreferencedBlobFilesObsoleteImpl(
[=](const std::shared_ptr<BlobFile>& blob_file) {
WriteLock file_lock(&blob_file->mutex_);
return MarkBlobFileObsoleteIfNeeded(blob_file, obsolete_seq);
});
}

void BlobDBImpl::MarkUnreferencedBlobFilesObsoleteDuringOpen() {
MarkUnreferencedBlobFilesObsoleteImpl(
[=](const std::shared_ptr<BlobFile>& blob_file) {
return MarkBlobFileObsoleteIfNeeded(blob_file, /* obsolete_seq */ 0);
});
}

void BlobDBImpl::CloseRandomAccessLocked(
Expand Down Expand Up @@ -1041,11 +1136,12 @@ Status BlobDBImpl::CheckSizeAndEvictBlobFiles(uint64_t blob_size,
WriteLock file_lock(&blob_file->mutex_);
if (blob_file->Obsolete()) {
// File already obsoleted by someone else.
assert(blob_file->Immutable());
continue;
}
// FIFO eviction can evict open blob files.
if (!blob_file->Immutable()) {
Status s = CloseBlobFile(blob_file, false /*need_lock*/);
Status s = CloseBlobFile(blob_file);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1380,8 +1476,16 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
ROCKS_LOG_INFO(db_options_.info_log, "Number of open files %" ROCKSDB_PRIszt,
open_ttl_files_.size());

for (auto bfile : open_ttl_files_) {
assert(!bfile->Immutable());
for (const auto& blob_file : open_ttl_files_) {
(void)blob_file;
assert(!blob_file->Immutable());
}

for (const auto& pair : live_imm_non_ttl_blob_files_) {
const auto& blob_file = pair.second;
(void)blob_file;
assert(!blob_file->HasTTL());
assert(blob_file->Immutable());
}

uint64_t now = EpochNow();
Expand Down Expand Up @@ -1423,58 +1527,72 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
return std::make_pair(true, -1);
}

Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile,
bool need_lock) {
assert(bfile != nullptr);
Status BlobDBImpl::CloseBlobFile(std::shared_ptr<BlobFile> bfile) {
assert(bfile);
assert(!bfile->Immutable());
assert(!bfile->Obsolete());
write_mutex_.AssertHeld();
Status s;

ROCKS_LOG_INFO(db_options_.info_log,
"Closing blob file %" PRIu64 ". Path: %s",
bfile->BlobFileNumber(), bfile->PathName().c_str());
{
std::unique_ptr<WriteLock> lock;
if (need_lock) {
lock.reset(new WriteLock(&mutex_));
}

if (bfile->HasTTL()) {
size_t erased __attribute__((__unused__));
erased = open_ttl_files_.erase(bfile);
} else if (bfile == open_non_ttl_file_) {
open_non_ttl_file_ = nullptr;
}
}

if (!bfile->closed_.load()) {
std::unique_ptr<WriteLock> file_lock;
if (need_lock) {
file_lock.reset(new WriteLock(&bfile->mutex_));
}
s = bfile->WriteFooterAndCloseLocked();
}
const Status s = bfile->WriteFooterAndCloseLocked();

if (s.ok()) {
total_blob_size_ += BlobLogFooter::kSize;
} else {
bfile->MarkImmutable();

ROCKS_LOG_ERROR(db_options_.info_log,
"Failed to close blob file %" PRIu64 "with error: %s",
bfile->BlobFileNumber(), s.ToString().c_str());
}

if (bfile->HasTTL()) {
size_t erased __attribute__((__unused__));
erased = open_ttl_files_.erase(bfile);
} else {
if (bfile == open_non_ttl_file_) {
open_non_ttl_file_ = nullptr;
}

const uint64_t blob_file_number = bfile->BlobFileNumber();
auto it = live_imm_non_ttl_blob_files_.lower_bound(blob_file_number);
assert(it == live_imm_non_ttl_blob_files_.end() ||
it->first != blob_file_number);
live_imm_non_ttl_blob_files_.insert(
it, std::map<uint64_t, std::shared_ptr<BlobFile>>::value_type(
blob_file_number, bfile));
}

return s;
}

// Closes `bfile` once its size has reached bdb_options_.blob_file_size.
// The caller must hold write_mutex_. Returns OK without doing anything if the
// file is still below the size limit or is already immutable; otherwise
// returns the status of CloseBlobFile().
Status BlobDBImpl::CloseBlobFileIfNeeded(std::shared_ptr<BlobFile>& bfile) {
  write_mutex_.AssertHeld();

  // atomic read
  const bool reached_size_limit =
      bfile->GetFileSize() >= bdb_options_.blob_file_size;
  if (!reached_size_limit) {
    return Status::OK();
  }

  WriteLock lock(&mutex_);
  WriteLock file_lock(&bfile->mutex_);

  // An obsolete file must also be immutable.
  assert(!bfile->Obsolete() || bfile->Immutable());

  // Re-check under the locks: only a file that is still mutable is closed.
  if (!bfile->Immutable()) {
    return CloseBlobFile(bfile);
  }

  return Status::OK();
}

void BlobDBImpl::ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
SequenceNumber obsolete_seq,
bool update_size) {
assert(blob_file->Immutable());
assert(!blob_file->Obsolete());

// Should hold write lock of mutex_ or during DB open.
blob_file->MarkObsolete(obsolete_seq);
obsolete_files_.push_back(blob_file);
Expand Down Expand Up @@ -1545,15 +1663,23 @@ std::pair<bool, int64_t> BlobDBImpl::EvictExpiredFiles(bool aborted) {
SequenceNumber seq = GetLatestSequenceNumber();
{
MutexLock l(&write_mutex_);
WriteLock lock(&mutex_);
for (auto& blob_file : process_files) {
WriteLock file_lock(&blob_file->mutex_);
if (!blob_file->Immutable()) {
CloseBlobFile(blob_file, false /*need_lock*/);
}

// Need to double check if the file is obsolete.
if (!blob_file->Obsolete()) {
ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
if (blob_file->Obsolete()) {
assert(blob_file->Immutable());
continue;
}

if (!blob_file->Immutable()) {
CloseBlobFile(blob_file);
}

assert(blob_file->Immutable());

ObsoleteBlobFile(blob_file, seq, true /*update_size*/);
}
}

Expand Down Expand Up @@ -1918,6 +2044,8 @@ Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
if (newfile != nullptr) {
{
MutexLock l(&write_mutex_);
WriteLock lock(&mutex_);
WriteLock file_lock(&newfile->mutex_);
CloseBlobFile(newfile);
}
total_blob_size_ += newfile->file_size_;
Expand Down Expand Up @@ -2093,8 +2221,12 @@ Status BlobDBImpl::TEST_GetBlobValue(const Slice& key, const Slice& index_entry,
}

void BlobDBImpl::TEST_AddDummyBlobFile(uint64_t blob_file_number) {
blob_files_[blob_file_number] = std::make_shared<BlobFile>(
this, blob_dir_, blob_file_number, db_options_.info_log.get());
auto blob_file = std::make_shared<BlobFile>(this, blob_dir_, blob_file_number,
db_options_.info_log.get());
blob_file->MarkImmutable();

blob_files_[blob_file_number] = blob_file;
live_imm_non_ttl_blob_files_[blob_file_number] = blob_file;
}

std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
Expand All @@ -2106,6 +2238,16 @@ std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
return blob_files;
}

std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetLiveImmNonTTLFiles()
const {
ReadLock l(&mutex_);
std::vector<std::shared_ptr<BlobFile>> live_imm_non_ttl_files;
for (const auto& pair : live_imm_non_ttl_blob_files_) {
live_imm_non_ttl_files.emplace_back(pair.second);
}
return live_imm_non_ttl_files;
}

std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetObsoleteFiles()
const {
ReadLock l(&mutex_);
Expand All @@ -2122,6 +2264,9 @@ void BlobDBImpl::TEST_DeleteObsoleteFiles() {

// Test helper: closes `bfile` via CloseBlobFile() after acquiring, in order,
// write_mutex_, the write lock on mutex_, and the write lock on the blob
// file's own mutex. Returns the status reported by CloseBlobFile().
Status BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
  MutexLock l(&write_mutex_);
  WriteLock lock(&mutex_);
  WriteLock file_lock(&bfile->mutex_);

  Status close_status = CloseBlobFile(bfile);
  return close_status;
}

Expand Down
Loading