diff --git a/db/db_impl.cc b/db/db_impl.cc index 200b520f..30fdc0a7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -158,7 +158,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), manual_compaction_(NULL), - level0_good(true) + level0_good(true), + throttle_end(0) { mem_->Ref(); has_imm_.Release_Store(NULL); @@ -1180,8 +1181,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { size_t entry_count; entry_count=compact->num_entries + compact->builder->NumEntries(); - // imm_micros intentional NOT removed from time calculation, - // gives better measure of overall activity / write overhead + // every so often see if priority needs to change if (1==(entry_count % 1000) && 1000WriteThrottleUsec(bg_compaction_scheduled_); } // release MutexLock l(&mutex_) + // throttle on exit to reduce possible reordering if (0!=throttle) { - int count; + uint64_t now, remaining_wait, new_end, batch_wait; + int batch_count; + /// slowing each call down sequentially MutexLock l(&throttle_mutex_); + // server may have been busy since previous write, + // use only the remaining time as throttle + now=env_->NowMicros(); + + if (now < throttle_end) + { + + remaining_wait=throttle_end - now; + env_->SleepForMicroseconds(remaining_wait); + new_end=now+remaining_wait+throttle; + + gPerfCounters->Add(ePerfDebug0, remaining_wait); + } // if + else + { + remaining_wait=0; + new_end=now + throttle; + } // else + // throttle is per key write, how many in batch? - count=(NULL!=my_batch ? WriteBatchInternal::Count(my_batch) : 1); - env_->SleepForMicroseconds(throttle * count); - gPerfCounters->Add(ePerfDebug0, throttle * count); + batch_count=(NULL!=my_batch ? 
WriteBatchInternal::Count(my_batch) : 1); + if (0 < batch_count) // unclear if Count() could return zero + --batch_count; + batch_wait=throttle * batch_count; + + // only wait on batch if extends beyond potential wait period + if (now + remaining_wait < throttle_end + batch_wait) + { + remaining_wait=throttle_end + batch_wait - (now + remaining_wait); + env_->SleepForMicroseconds(remaining_wait); + new_end +=remaining_wait; + + gPerfCounters->Add(ePerfDebug0, remaining_wait); + } // if + + throttle_end=new_end; } // if + // throttle not needed, kill off old wait time + else if (0!=throttle_end) + { + throttle_end=0; + } // else if + return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 0b994a2d..08394f61 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -188,6 +188,8 @@ class DBImpl : public DB { // hint to background thread when level0 is backing up volatile bool level0_good; + volatile uint64_t throttle_end; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/util/env_posix.cc b/util/env_posix.cc index 23cfbe12..b00d4276 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -693,9 +693,18 @@ class PosixEnv : public Env { } virtual uint64_t NowMicros() { +#if _POSIX_TIMERS >= 200801L + struct timespec ts; + + // this is rumored to be faster than gettimeofday(), + // and sometimes shift less ... someday use CLOCK_MONOTONIC_RAW + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000 + ts.tv_nsec/1000; +#else struct timeval tv; gettimeofday(&tv, NULL); return static_cast(tv.tv_sec) * 1000000 + tv.tv_usec; +#endif } virtual void SleepForMicroseconds(int micros) {