-
Notifications
You must be signed in to change notification settings - Fork 182
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
1.4.2 turner #97
1.4.2 turner #97
Changes from all commits
a6d8657
48b607c
a7adf44
a5b2f71
67f5063
a54904b
905d8e7
5a39395
b1e6e33
8b5c6bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,8 +33,10 @@ | |
#include "table/merger.h" | ||
#include "table/two_level_iterator.h" | ||
#include "util/coding.h" | ||
#include "util/hot_threads.h" | ||
#include "util/logging.h" | ||
#include "util/mutexlock.h" | ||
#include "util/thread_tasks.h" | ||
#include "util/throttle.h" | ||
#include "leveldb/perf_count.h" | ||
|
||
|
@@ -158,7 +160,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) | |
tmp_batch_(new WriteBatch), | ||
bg_compaction_scheduled_(false), | ||
manual_compaction_(NULL), | ||
level0_good(true) | ||
level0_good(true), | ||
running_compactions_(0) | ||
{ | ||
mem_->Ref(); | ||
has_imm_.Release_Store(NULL); | ||
|
@@ -177,7 +180,7 @@ DBImpl::~DBImpl() { | |
// Wait for background work to finish | ||
mutex_.Lock(); | ||
shutting_down_.Release_Store(this); // Any non-NULL value is ok | ||
while (bg_compaction_scheduled_) { | ||
while (IsCompactionScheduled()) { | ||
bg_cv_.Wait(); | ||
} | ||
mutex_.Unlock(); | ||
|
@@ -243,30 +246,36 @@ void DBImpl::MaybeIgnoreError(Status* s) const { | |
} | ||
} | ||
|
||
void DBImpl::DeleteObsoleteFiles() { | ||
// Make a set of all of the live files | ||
std::set<uint64_t> live = pending_outputs_; | ||
versions_->AddLiveFiles(&live); | ||
|
||
// prune the database root directory | ||
std::vector<std::string> filenames; | ||
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose | ||
for (size_t i = 0; i < filenames.size(); i++) { | ||
KeepOrDelete(filenames[i], -1, live); | ||
} // for | ||
|
||
// prune the table file directories | ||
for (int level=0; level<config::kNumLevels; ++level) | ||
void DBImpl::DeleteObsoleteFiles() | ||
{ | ||
// Only run this routine when down to one | ||
// simultaneous compaction | ||
if (RunningCompactionCount()<2) | ||
{ | ||
std::string dirname; | ||
// Make a set of all of the live files | ||
std::set<uint64_t> live = pending_outputs_; | ||
versions_->AddLiveFiles(&live); | ||
|
||
filenames.clear(); | ||
dirname=MakeDirName2(dbname_, level, "sst"); | ||
env_->GetChildren(dirname, &filenames); // Ignoring errors on purpose | ||
// prune the database root directory | ||
std::vector<std::string> filenames; | ||
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose | ||
for (size_t i = 0; i < filenames.size(); i++) { | ||
KeepOrDelete(filenames[i], level, live); | ||
KeepOrDelete(filenames[i], -1, live); | ||
} // for | ||
|
||
// prune the table file directories | ||
for (int level=0; level<config::kNumLevels; ++level) | ||
{ | ||
std::string dirname; | ||
|
||
filenames.clear(); | ||
dirname=MakeDirName2(dbname_, level, "sst"); | ||
env_->GetChildren(dirname, &filenames); // Ignoring errors on purpose | ||
for (size_t i = 0; i < filenames.size(); i++) { | ||
KeepOrDelete(filenames[i], level, live); | ||
} // for | ||
} // for | ||
} // for | ||
} // if | ||
} | ||
|
||
void | ||
|
@@ -480,7 +489,7 @@ void DBImpl::CheckCompactionState() | |
int level; | ||
|
||
// wait out executing compaction (Wait gives mutex to compactions) | ||
if (bg_compaction_scheduled_) | ||
if (IsCompactionScheduled()) | ||
bg_cv_.Wait(); | ||
|
||
for (level=0, need_compaction=false; | ||
|
@@ -500,7 +509,7 @@ void DBImpl::CheckCompactionState() | |
} //if | ||
} // for | ||
|
||
} while(bg_compaction_scheduled_ && need_compaction); | ||
} while(IsCompactionScheduled() && need_compaction); | ||
|
||
if (log_flag) | ||
Log(options_.info_log, "Cleanup compactions completed ... DB::Open continuing"); | ||
|
@@ -610,15 +619,15 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, | |
return status; | ||
} | ||
|
||
Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, | ||
Status DBImpl::WriteLevel0Table(volatile MemTable* mem, VersionEdit* edit, | ||
Version* base) { | ||
mutex_.AssertHeld(); | ||
const uint64_t start_micros = env_->NowMicros(); | ||
FileMetaData meta; | ||
meta.number = versions_->NewFileNumber(); | ||
meta.level = 0; | ||
pending_outputs_.insert(meta.number); | ||
Iterator* iter = mem->NewIterator(); | ||
Iterator* iter = ((MemTable *)mem)->NewIterator(); | ||
SequenceNumber smallest_snapshot; | ||
Log(options_.info_log, "Level-0 table #%llu: started", | ||
(unsigned long long) meta.number); | ||
|
@@ -769,7 +778,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { | |
|
||
MutexLock l(&mutex_); | ||
while (!manual.done) { | ||
while (manual_compaction_ != NULL || bg_compaction_scheduled_) { | ||
while (manual_compaction_ != NULL || IsCompactionScheduled()) { | ||
bg_cv_.Wait(); | ||
} | ||
manual_compaction_ = &manual; | ||
|
@@ -810,7 +819,7 @@ void DBImpl::MaybeScheduleCompaction() { | |
// writing of memory to disk: high priority | ||
if (NULL!=imm_) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this seems to be a clause with nothing in it anymore. Should it be removed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You are correct on both points. I left the dead code in place in case something goes wrong and I need to know quicker "how it used to work". This intent applies to the extra logging messages added, but should go away once Turner is "happy". |
||
{ | ||
push=true; | ||
// push=true; | ||
} // if | ||
|
||
// merge level 0s to level 1 | ||
|
@@ -847,7 +856,7 @@ void DBImpl::MaybeScheduleCompaction() { | |
// Already scheduled | ||
} else if (shutting_down_.Acquire_Load()) { | ||
// DB is being deleted; no more background compactions | ||
} else if (imm_ == NULL && | ||
} else if (//imm_ == NULL && | ||
manual_compaction_ == NULL && | ||
!versions_->NeedsCompaction()) { | ||
// No work to be done | ||
|
@@ -863,7 +872,10 @@ void DBImpl::BGWork(void* db) { | |
|
||
void DBImpl::BackgroundCall() { | ||
MutexLock l(&mutex_); | ||
assert(bg_compaction_scheduled_); | ||
++running_compactions_; | ||
Log(options_.info_log, "Background compact start: %u", running_compactions_); | ||
|
||
assert(IsCompactionScheduled()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Minor note: this assertion should probably be changed to only check for There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. no. Today, bg_compaction_scheduled_ is a flag for any compaction BUT imm compaction. This routine should only be called for a non-imm compaction AND it should only be called when bg_compaction_scheduled_ is set. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with you. This assertion isn't checking that case. It's checking There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. argh. I misread everything. assertion needs to change. |
||
if (!shutting_down_.Acquire_Load()) { | ||
Status s = BackgroundCompaction(); | ||
if (!s.ok()) { | ||
|
@@ -880,31 +892,81 @@ void DBImpl::BackgroundCall() { | |
} | ||
} | ||
bg_compaction_scheduled_ = false; | ||
--running_compactions_; | ||
Log(options_.info_log, "Background compact done: %u", running_compactions_); | ||
|
||
// Previous compaction may have produced too many files in a level, | ||
// so reschedule another compaction if needed. | ||
if (!options_.is_repair) | ||
MaybeScheduleCompaction(); | ||
bg_cv_.SignalAll(); | ||
|
||
} | ||
|
||
|
||
void | ||
DBImpl::BackgroundImmCompactCall() { | ||
MutexLock l(&mutex_); | ||
++running_compactions_; | ||
Log(options_.info_log, "Background imm start: %u", running_compactions_); | ||
assert(NULL != imm_); | ||
if (!shutting_down_.Acquire_Load()) { | ||
Status s = CompactMemTable(); | ||
if (!s.ok()) { | ||
// Wait a little bit before retrying background compaction in | ||
// case this is an environmental problem and we do not want to | ||
// chew up resources for failed compactions for the duration of | ||
// the problem. | ||
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error | ||
Log(options_.info_log, "Waiting after background imm compaction error: %s", | ||
s.ToString().c_str()); | ||
mutex_.Unlock(); | ||
env_->SleepForMicroseconds(1000000); | ||
mutex_.Lock(); | ||
} | ||
} | ||
// IsCompactionScheduled() = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note sure if this is commented out old code, or a comment. If a comment, I believe it is incorrect -- we could still have a running concurrent compaction (eg. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is one of the places where "dead code" should be cleared. I left the dead line as a reminder of the sequence used in the original function. Honestly it took two, maybe three, passes at this function to get it proper for simultaneous compactions ... with this being the imm compaction. The dead code is still a reminder if we find another issue at customer site next week. Will be cleared by the time this goes to 2.0. The imm compaction's "flag" is whether or not DBImpl::imm_ is set or not. |
||
--running_compactions_; | ||
Log(options_.info_log, "Background imm done: %u", running_compactions_); | ||
|
||
// Previous compaction may have produced too many files in a level, | ||
// so reschedule another compaction if needed. | ||
if (!options_.is_repair) | ||
MaybeScheduleCompaction(); | ||
|
||
// shutdown is waiting for this imm_ to clear | ||
if (shutting_down_.Acquire_Load()) { | ||
|
||
// must abandon data in memory ... hope recovery log works | ||
imm_->Unref(); | ||
imm_ = NULL; | ||
has_imm_.Release_Store(NULL); | ||
} // if | ||
|
||
bg_cv_.SignalAll(); | ||
} | ||
|
||
|
||
|
||
Status DBImpl::BackgroundCompaction() { | ||
Status status; | ||
|
||
mutex_.AssertHeld(); | ||
|
||
#if 0 | ||
if (imm_ != NULL) { | ||
pthread_rwlock_rdlock(&gThreadLock0); | ||
status=CompactMemTable(); | ||
pthread_rwlock_unlock(&gThreadLock0); | ||
return status; | ||
} | ||
#endif | ||
|
||
Compaction* c; | ||
bool is_manual = (manual_compaction_ != NULL); | ||
InternalKey manual_end; | ||
if (is_manual) { | ||
ManualCompaction* m = manual_compaction_; | ||
ManualCompaction* m = (ManualCompaction *) manual_compaction_; | ||
c = versions_->CompactRange(m->level, m->begin, m->end); | ||
m->done = (c == NULL); | ||
if (c != NULL) { | ||
|
@@ -971,7 +1033,7 @@ Status DBImpl::BackgroundCompaction() { | |
} | ||
|
||
if (is_manual) { | ||
ManualCompaction* m = manual_compaction_; | ||
ManualCompaction* m = (ManualCompaction *)manual_compaction_; | ||
if (!status.ok()) { | ||
m->done = true; | ||
} | ||
|
@@ -1144,9 +1206,11 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { | |
for (; input->Valid() && !shutting_down_.Acquire_Load(); ) | ||
{ | ||
// Prioritize compaction work ... every 100 keys | ||
#if 0 | ||
if (NULL==compact->builder | ||
|| 0==(compact->builder->NumEntries() % 100)) | ||
imm_micros+=PrioritizeWork(is_level0_compaction); | ||
#endif | ||
|
||
Slice key = input->key(); | ||
if (compact->builder != NULL | ||
|
@@ -1339,7 +1403,7 @@ struct IterState { | |
port::Mutex* mu; | ||
Version* version; | ||
MemTable* mem; | ||
MemTable* imm; | ||
volatile MemTable* imm; | ||
}; | ||
|
||
static void CleanupIteratorState(void* arg1, void* arg2) { | ||
|
@@ -1364,7 +1428,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, | |
list.push_back(mem_->NewIterator()); | ||
mem_->Ref(); | ||
if (imm_ != NULL) { | ||
list.push_back(imm_->NewIterator()); | ||
list.push_back(((MemTable *)imm_)->NewIterator()); | ||
imm_->Ref(); | ||
} | ||
versions_->current()->AddIterators(options, &list); | ||
|
@@ -1412,7 +1476,7 @@ Status DBImpl::Get(const ReadOptions& options, | |
} | ||
|
||
MemTable* mem = mem_; | ||
MemTable* imm = imm_; | ||
volatile MemTable* imm = imm_; | ||
Version* current = versions_->current(); | ||
mem->Ref(); | ||
if (imm != NULL) imm->Ref(); | ||
|
@@ -1429,7 +1493,7 @@ Status DBImpl::Get(const ReadOptions& options, | |
if (mem->Get(lkey, value, &s)) { | ||
// Done | ||
gPerfCounters->Inc(ePerfGetMem); | ||
} else if (imm != NULL && imm->Get(lkey, value, &s)) { | ||
} else if (imm != NULL && ((MemTable *)imm)->Get(lkey, value, &s)) { | ||
// Done | ||
gPerfCounters->Inc(ePerfGetImm); | ||
} else { | ||
|
@@ -1551,7 +1615,7 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { | |
gPerfCounters->Inc(ePerfApiWrite); | ||
|
||
// protect use of versions_ ... still within scope of mutex_ lock | ||
throttle=versions_->WriteThrottleUsec(bg_compaction_scheduled_); | ||
throttle=versions_->WriteThrottleUsec(IsCompactionScheduled()); | ||
} // release MutexLock l(&mutex_) | ||
|
||
// throttle on exit to reduce possible reordering | ||
|
@@ -1627,6 +1691,10 @@ Status DBImpl::MakeRoomForWrite(bool force) { | |
bool allow_delay = !force; | ||
Status s; | ||
|
||
if (force) | ||
Log(options_.info_log, | ||
"\"force\" parameter passed to MakeRoomForWrite"); | ||
|
||
// hint to background compaction. | ||
level0_good=(versions_->NumLevelFiles(0) < (int)config::kL0_CompactionTrigger); | ||
|
||
|
@@ -1673,6 +1741,9 @@ Status DBImpl::MakeRoomForWrite(bool force) { | |
Log(options_.info_log, "running...\n"); | ||
} else { | ||
// Attempt to switch to a new memtable and trigger compaction of old | ||
Log(options_.info_log, "Level-0 created at %llu with threshold of %llu", | ||
(long long unsigned)mem_->ApproximateMemoryUsage(), | ||
(long long unsigned)options_.write_buffer_size); | ||
assert(versions_->PrevLogNumber() == 0); | ||
uint64_t new_log_number = versions_->NewFileNumber(); | ||
WritableFile* lfile = NULL; | ||
|
@@ -1689,7 +1760,12 @@ Status DBImpl::MakeRoomForWrite(bool force) { | |
logfile_number_ = new_log_number; | ||
log_ = new log::Writer(lfile); | ||
imm_ = mem_; | ||
has_imm_.Release_Store(imm_); | ||
has_imm_.Release_Store((MemTable*)imm_); | ||
if (NULL!=imm_) | ||
{ | ||
ThreadTask * task=new ImmWriteTask(this); | ||
gImmThreads->Submit(task); | ||
} | ||
mem_ = new MemTable(internal_comparator_); | ||
mem_->Ref(); | ||
force = false; // Do not force another compaction if have room | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's up with the -1 here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The github differencing logic is piecing stuff from high and low. The real change is that I inserted "if (RunningCompactionCount()<2)", then tabbed everything else in the function in by one tab. Old (red) line 248 is now new (green) line 256. Move 7 lines down in red, then in green ... both have same "KeepOrDelete(filenames[i], -1, live)". Move 19 lines down from the starting point. You will see "KeepOrDelete(filenames[i], level, live);" in both.