1.4.2 turner #97

Closed
wants to merge 10 commits
17 changes: 17 additions & 0 deletions db/builder.cc
@@ -23,6 +23,11 @@ Status BuildTable(const std::string& dbname,
FileMetaData* meta,
SequenceNumber smallest_snapshot) {
Status s;
size_t keys_seen, keys_retired;

keys_seen=0;
keys_retired=0;

meta->file_size = 0;
iter->SeekToFirst();

@@ -39,13 +44,18 @@ Status BuildTable(const std::string& dbname,
TableBuilder* builder = new TableBuilder(options, file);
meta->smallest.DecodeFrom(iter->key());
for (; iter->Valid(); iter->Next()) {
++keys_seen;
Slice key = iter->key();
if (!retire(key))
{
meta->largest.DecodeFrom(key);
builder->Add(key, iter->value());
++meta->num_entries;
} // if
else
{
++keys_retired;
} // else
}

// Finish and check for builder errors
@@ -88,6 +98,13 @@ Status BuildTable(const std::string& dbname,

if (s.ok() && meta->file_size > 0) {
// Keep it
if (0!=keys_retired)
{
Log(options.info_log, "Level-0 table #%llu: %llu keys seen, %llu keys retired",
(unsigned long long) meta->number,
(unsigned long long) keys_seen,
(unsigned long long) keys_retired);
} // if
} else {
env->DeleteFile(fname);
}
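The retire() predicate used in the loop above is not defined in this hunk. Purely as a hypothetical illustration of its contract (return true and the key is skipped and counted as "retired" instead of being added to the new table), such a filter might look like:

```cpp
// Hypothetical illustration only: retire() is not shown in this diff,
// and the real policy in this branch may be entirely different.
#include "db/dbformat.h"   // ParsedInternalKey, ParseInternalKey, kTypeDeletion

namespace leveldb {

// assumed cutoff, purely for the sketch
static const SequenceNumber kIllustrativeExpirySeq = 0;

static bool retire(const Slice& key)
{
    ParsedInternalKey parsed;
    if (!ParseInternalKey(key, &parsed))
        return false;                          // keep anything unparsable

    // example policy: drop sufficiently old deletion tombstones
    return parsed.type == kTypeDeletion
        && parsed.sequence < kIllustrativeExpirySeq;
}

}  // namespace leveldb
```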
152 changes: 114 additions & 38 deletions db/db_impl.cc
@@ -33,8 +33,10 @@
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/hot_threads.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/thread_tasks.h"
#include "util/throttle.h"
#include "leveldb/perf_count.h"

@@ -158,7 +160,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false),
manual_compaction_(NULL),
level0_good(true)
level0_good(true),
running_compactions_(0)
{
mem_->Ref();
has_imm_.Release_Store(NULL);
@@ -177,7 +180,7 @@ DBImpl::~DBImpl() {
// Wait for background work to finish
mutex_.Lock();
shutting_down_.Release_Store(this); // Any non-NULL value is ok
while (bg_compaction_scheduled_) {
while (IsCompactionScheduled()) {
bg_cv_.Wait();
}
mutex_.Unlock();
@@ -243,30 +246,36 @@ void DBImpl::MaybeIgnoreError(Status* s) const {
}
}

void DBImpl::DeleteObsoleteFiles() {
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);

// prune the database root directory
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
for (size_t i = 0; i < filenames.size(); i++) {
KeepOrDelete(filenames[i], -1, live);
} // for

// prune the table file directories
for (int level=0; level<config::kNumLevels; ++level)
void DBImpl::DeleteObsoleteFiles()
{
// Only run this routine when down to one
// simultaneous compaction
if (RunningCompactionCount()<2)
{
std::string dirname;
// Make a set of all of the live files
std::set<uint64_t> live = pending_outputs_;
versions_->AddLiveFiles(&live);

filenames.clear();
dirname=MakeDirName2(dbname_, level, "sst");
env_->GetChildren(dirname, &filenames); // Ignoring errors on purpose
// prune the database root directory
std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
for (size_t i = 0; i < filenames.size(); i++) {
KeepOrDelete(filenames[i], level, live);
KeepOrDelete(filenames[i], -1, live);
Contributor: What's up with the -1 here?

Contributor (author): The github differencing logic is piecing stuff together from high and low. The real change is that I inserted "if (RunningCompactionCount()<2)", then tabbed everything else in the function in by one tab. Old (red) line 248 is now new (green) line 256. Move 7 lines down in red, then in green ... both have the same "KeepOrDelete(filenames[i], -1, live)". Move 19 lines down from the starting point. You will see "KeepOrDelete(filenames[i], level, live);" in both.

} // for

// prune the table file directories
for (int level=0; level<config::kNumLevels; ++level)
{
std::string dirname;

filenames.clear();
dirname=MakeDirName2(dbname_, level, "sst");
env_->GetChildren(dirname, &filenames); // Ignoring errors on purpose
for (size_t i = 0; i < filenames.size(); i++) {
KeepOrDelete(filenames[i], level, live);
} // for
} // for
} // for
} // if
}
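RunningCompactionCount() is referenced by the new guard above, but its body is not part of this diff. Given the running_compactions_ counter maintained below in BackgroundCall() and BackgroundImmCompactCall(), it presumably reduces to something like the following (assumed sketch, not the actual implementation):

```cpp
// Assumed sketch only; the helper itself is not shown in this diff.
unsigned int DBImpl::RunningCompactionCount()
{
    mutex_.AssertHeld();            // counter is mutated under mutex_
    return running_compactions_;    // incremented/decremented by the
                                    // background compaction entry points
}
```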

void
@@ -480,7 +489,7 @@ void DBImpl::CheckCompactionState()
int level;

// wait out executing compaction (Wait gives mutex to compactions)
if (bg_compaction_scheduled_)
if (IsCompactionScheduled())
bg_cv_.Wait();

for (level=0, need_compaction=false;
@@ -500,7 +509,7 @@
} //if
} // for

} while(bg_compaction_scheduled_ && need_compaction);
} while(IsCompactionScheduled() && need_compaction);

if (log_flag)
Log(options_.info_log, "Cleanup compactions completed ... DB::Open continuing");
@@ -610,15 +619,15 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
return status;
}

Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
Status DBImpl::WriteLevel0Table(volatile MemTable* mem, VersionEdit* edit,
Version* base) {
mutex_.AssertHeld();
const uint64_t start_micros = env_->NowMicros();
FileMetaData meta;
meta.number = versions_->NewFileNumber();
meta.level = 0;
pending_outputs_.insert(meta.number);
Iterator* iter = mem->NewIterator();
Iterator* iter = ((MemTable *)mem)->NewIterator();
SequenceNumber smallest_snapshot;
Log(options_.info_log, "Level-0 table #%llu: started",
(unsigned long long) meta.number);
@@ -769,7 +778,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {

MutexLock l(&mutex_);
while (!manual.done) {
while (manual_compaction_ != NULL || bg_compaction_scheduled_) {
while (manual_compaction_ != NULL || IsCompactionScheduled()) {
bg_cv_.Wait();
}
manual_compaction_ = &manual;
@@ -810,7 +819,7 @@ void DBImpl::MaybeScheduleCompaction() {
// writing of memory to disk: high priority
if (NULL!=imm_)
Contributor: This seems to be a clause with nothing in it anymore. Should it be removed?

Contributor (author): You are correct on both points. I left the dead code in place in case something goes wrong and I need to know quickly "how it used to work". The same intent applies to the extra logging messages added, but it should all go away once Turner is "happy".

{
push=true;
// push=true;
} // if

// merge level 0s to level 1
@@ -847,7 +856,7 @@
// Already scheduled
} else if (shutting_down_.Acquire_Load()) {
// DB is being deleted; no more background compactions
} else if (imm_ == NULL &&
} else if (//imm_ == NULL &&
manual_compaction_ == NULL &&
!versions_->NeedsCompaction()) {
// No work to be done
@@ -863,7 +872,10 @@ void DBImpl::BGWork(void* db) {

void DBImpl::BackgroundCall() {
MutexLock l(&mutex_);
assert(bg_compaction_scheduled_);
++running_compactions_;
Log(options_.info_log, "Background compact start: %u", running_compactions_);

assert(IsCompactionScheduled());
Contributor: Minor note: this assertion should probably be changed to only check bg_compaction_scheduled_, as the other condition that makes IsCompactionScheduled() true is a non-NULL imm_, but imm compaction isn't handled by BackgroundCall anymore.

Contributor (author): No. Today, bg_compaction_scheduled_ is a flag for any compaction BUT imm compaction. This routine should only be called for a non-imm compaction AND it should only be called when bg_compaction_scheduled_ is set.

Contributor: I agree with you. This assertion isn't checking that case. It's checking IsCompactionScheduled(), which is true when bg_compaction_scheduled_ OR imm_ != NULL. If we somehow end up in this function when bg_compaction_scheduled_ is false but imm_ != NULL, the assertion should fire. Obviously this doesn't affect things in production, but it seems helpful to have a proper assertion to help with future debugging in later code changes.

Contributor (author): Argh. I misread everything. The assertion needs to change.

if (!shutting_down_.Acquire_Load()) {
Status s = BackgroundCompaction();
if (!s.ok()) {
@@ -880,31 +892,81 @@ void DBImpl::BackgroundCall() {
}
}
bg_compaction_scheduled_ = false;
--running_compactions_;
Log(options_.info_log, "Background compact done: %u", running_compactions_);

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
if (!options_.is_repair)
MaybeScheduleCompaction();
bg_cv_.SignalAll();

}
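Per the assertion thread above, IsCompactionScheduled() is true when either a background compaction is queued or an imm_ write is pending. The helper itself is not shown in this diff; a minimal assumed sketch:

```cpp
// Assumed sketch, following the review discussion above:
// true when a background compaction is queued OR an imm_ write is pending.
bool DBImpl::IsCompactionScheduled()
{
    mutex_.AssertHeld();
    return bg_compaction_scheduled_ || NULL != imm_;
}
```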


void
DBImpl::BackgroundImmCompactCall() {
MutexLock l(&mutex_);
++running_compactions_;
Log(options_.info_log, "Background imm start: %u", running_compactions_);
assert(NULL != imm_);
if (!shutting_down_.Acquire_Load()) {
Status s = CompactMemTable();
if (!s.ok()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
// chew up resources for failed compactions for the duration of
// the problem.
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
Log(options_.info_log, "Waiting after background imm compaction error: %s",
s.ToString().c_str());
mutex_.Unlock();
env_->SleepForMicroseconds(1000000);
mutex_.Lock();
}
}
// IsCompactionScheduled() = false;
Contributor: Not sure if this is commented-out old code or a comment. If a comment, I believe it is incorrect -- we could still have a running concurrent compaction (e.g. bg_compaction_scheduled_).

Contributor (author): This is one of the places where "dead code" should be cleared. I left the dead line as a reminder of the sequence used in the original function. Honestly it took two, maybe three, passes at this function to get it right for simultaneous compactions ... with this being the imm compaction. The dead code is still a reminder if we find another issue at the customer site next week. It will be cleared by the time this goes to 2.0. The imm compaction's "flag" is whether or not DBImpl::imm_ is set.

--running_compactions_;
Log(options_.info_log, "Background imm done: %u", running_compactions_);

// Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed.
if (!options_.is_repair)
MaybeScheduleCompaction();

// shutdown is waiting for this imm_ to clear
if (shutting_down_.Acquire_Load()) {

// must abandon data in memory ... hope recovery log works
imm_->Unref();
imm_ = NULL;
has_imm_.Release_Store(NULL);
} // if

bg_cv_.SignalAll();
}



Status DBImpl::BackgroundCompaction() {
Status status;

mutex_.AssertHeld();

#if 0
if (imm_ != NULL) {
pthread_rwlock_rdlock(&gThreadLock0);
status=CompactMemTable();
pthread_rwlock_unlock(&gThreadLock0);
return status;
}
#endif

Compaction* c;
bool is_manual = (manual_compaction_ != NULL);
InternalKey manual_end;
if (is_manual) {
ManualCompaction* m = manual_compaction_;
ManualCompaction* m = (ManualCompaction *) manual_compaction_;
c = versions_->CompactRange(m->level, m->begin, m->end);
m->done = (c == NULL);
if (c != NULL) {
@@ -971,7 +1033,7 @@ Status DBImpl::BackgroundCompaction() {
}

if (is_manual) {
ManualCompaction* m = manual_compaction_;
ManualCompaction* m = (ManualCompaction *)manual_compaction_;
if (!status.ok()) {
m->done = true;
}
@@ -1144,9 +1206,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
for (; input->Valid() && !shutting_down_.Acquire_Load(); )
{
// Prioritize compaction work ... every 100 keys
#if 0
if (NULL==compact->builder
|| 0==(compact->builder->NumEntries() % 100))
imm_micros+=PrioritizeWork(is_level0_compaction);
#endif

Slice key = input->key();
if (compact->builder != NULL
@@ -1339,7 +1403,7 @@ struct IterState {
port::Mutex* mu;
Version* version;
MemTable* mem;
MemTable* imm;
volatile MemTable* imm;
};

static void CleanupIteratorState(void* arg1, void* arg2) {
@@ -1364,7 +1428,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
list.push_back(mem_->NewIterator());
mem_->Ref();
if (imm_ != NULL) {
list.push_back(imm_->NewIterator());
list.push_back(((MemTable *)imm_)->NewIterator());
imm_->Ref();
}
versions_->current()->AddIterators(options, &list);
@@ -1412,7 +1476,7 @@ Status DBImpl::Get(const ReadOptions& options,
}

MemTable* mem = mem_;
MemTable* imm = imm_;
volatile MemTable* imm = imm_;
Version* current = versions_->current();
mem->Ref();
if (imm != NULL) imm->Ref();
@@ -1429,7 +1493,7 @@
if (mem->Get(lkey, value, &s)) {
// Done
gPerfCounters->Inc(ePerfGetMem);
} else if (imm != NULL && imm->Get(lkey, value, &s)) {
} else if (imm != NULL && ((MemTable *)imm)->Get(lkey, value, &s)) {
// Done
gPerfCounters->Inc(ePerfGetImm);
} else {
@@ -1551,7 +1615,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
gPerfCounters->Inc(ePerfApiWrite);

// protect use of versions_ ... still within scope of mutex_ lock
throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_);
throttle=versions_->WriteThrottleUsec(IsCompactionScheduled());
} // release MutexLock l(&mutex_)

// throttle on exit to reduce possible reordering
@@ -1627,6 +1691,10 @@ Status DBImpl::MakeRoomForWrite(bool force) {
bool allow_delay = !force;
Status s;

if (force)
Log(options_.info_log,
"\"force\" parameter passed to MakeRoomForWrite");

// hint to background compaction.
level0_good=(versions_->NumLevelFiles(0) < (int)config::kL0_CompactionTrigger);

@@ -1673,6 +1741,9 @@
Log(options_.info_log, "running...\n");
} else {
// Attempt to switch to a new memtable and trigger compaction of old
Log(options_.info_log, "Level-0 created at %llu with threshold of %llu",
(long long unsigned)mem_->ApproximateMemoryUsage(),
(long long unsigned)options_.write_buffer_size);
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
@@ -1689,7 +1760,12 @@
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
imm_ = mem_;
has_imm_.Release_Store(imm_);
has_imm_.Release_Store((MemTable*)imm_);
if (NULL!=imm_)
{
ThreadTask * task=new ImmWriteTask(this);
gImmThreads->Submit(task);
}
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
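ImmWriteTask and gImmThreads come from the newly included util/thread_tasks.h and util/hot_threads.h and are not shown in this diff. As an assumption about their shape, the task most likely just routes back into BackgroundImmCompactCall() on a dedicated imm-write thread pool, roughly:

```cpp
// Rough sketch only: assumed shape of ImmWriteTask from util/thread_tasks.h.
// The actual ThreadTask interface in this branch may differ.
class ImmWriteTask : public ThreadTask
{
public:
    explicit ImmWriteTask(DBImpl* db) : m_DBImpl(db) {}

    // Runs on one of gImmThreads' worker threads; hands the
    // memtable-to-Level-0 write back to the existing DBImpl entry point.
    virtual void operator()() { m_DBImpl->BackgroundImmCompactCall(); }

protected:
    DBImpl* m_DBImpl;
};
```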