Mv dynamic block size #111

Merged: 10 commits, Dec 10, 2013
179 changes: 170 additions & 9 deletions db/db_impl.cc
@@ -7,6 +7,7 @@
#include <time.h>
#include <algorithm>
#include <errno.h>
#include <math.h>
#include <set>
#include <string>
#include <stdint.h>
@@ -176,8 +177,10 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
manual_compaction_(NULL),
level0_good(true),
throttle_end(0),
running_compactions_(0),
block_size_changed_(0), last_low_mem_(0)
{
current_block_size_=options_.block_size;
DBList()->AddDB(this, options_.is_internal_db);

mem_->Ref();
@@ -671,6 +674,7 @@ Status DBImpl::WriteLevel0Table(volatile MemTable* mem, VersionEdit* edit,
// no compression for level 0.
local_options=options_;
local_options.compression=kNoCompression;
local_options.block_size=current_block_size_;
s = BuildTable(dbname_, env_, local_options, user_comparator(), table_cache_, iter, &meta, smallest_snapshot);

mutex_.Lock();
@@ -838,7 +842,7 @@ Status DBImpl::TEST_CompactMemTable() {
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();

if (!shutting_down_.Acquire_Load())
{
if (NULL==manual_compaction_)
{
@@ -1067,7 +1071,9 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
delete compact;
}

Status DBImpl::OpenCompactionOutputFile(
    CompactionState* compact,
    size_t sample_value_size) {
assert(compact != NULL);
assert(compact->builder == NULL);
uint64_t file_number;
@@ -1087,11 +1093,166 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
std::string fname = TableFileName(dbname_, file_number, compact->compaction->level()+1);
Status s = env_->NewWritableFile(fname, &compact->outfile);
if (s.ok()) {
Options options;
options=options_;
options.block_size=current_block_size_;

// consider larger block size if option enabled (block_size_steps!=0)
// and low on file cache space
if (0!=options.block_size_steps)
{
uint64_t now;

now=env_->NowMicros();

if (!double_cache.GetPlentySpace())
{
// keep track of last time there was lack of space.
// use info in block below to revert block_size
last_low_mem_=now;

// do not make changes often, a multi file compaction
// could raise more than one step (5 min)
if (block_size_changed_+(5*60*1000000L) < now)
{
size_t old_size=current_block_size_;
Contributor: The temporary variable old_size can be removed if MaybeRaiseBlockSize doesn't mutate current_block_size_. The mutation can then happen inside the modified if (options.block_size != current_block_size_) block. It also prevents an extra conditional in MaybeRaiseBlockSize.

Contributor (author): I declared current_block_size_ as a volatile because Riak 2.0 allows simultaneous compactions on independent levels. I believe this function is already protected by the database-level mutex, but I am too lazy to guarantee that today, and not willing to risk it in the future: I have burned myself once already on "knowing how locks worked" in the code, then changed a code path and broke the assumption elsewhere. These values are not critical, so I do not believe two threads working through here at the same time will interact horribly or leave values "wrong"; hence I am intentionally not thread-protecting them actively. This lazy threading model also avoids creating yet another mutex. Any mutex that is actually used by two threads simultaneously causes a disproportionate performance loss related to Erlang schedulers; laziness pays off greatly in the shadow of Erlang.

Contributor: Ok, makes sense I think. If it gets changed underneath, you want to keep doing the calculation with the temporary.

options.block_size=MaybeRaiseBlockSize(*compact->compaction, sample_value_size);

// did size change?
if (options.block_size!=old_size)
{
gPerfCounters->Inc(ePerfDebug0);
block_size_changed_=now;
} // if
} // if

} // if

// has system's memory been ok for a while now
else if (last_low_mem_+double_cache.GetFileTimeout()*1000000L < now)
{
// reset size to original, data could have been deleted and/or old
// files no longer need cache space
current_block_size_=options_.block_size;
} // else if

} // if
compact->builder = new TableBuilder(options, compact->outfile);
} // if

return s;
}
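The timing rules above can be modeled in isolation: raise at most one step per five minutes while the file cache is short on space, and revert to the configured size once memory has been fine for longer than the file timeout. A minimal sketch under those assumptions; the names (SizeClock, MayRaise, ShouldRevert) are hypothetical and this is not the PR's code:

```cpp
#include <cassert>
#include <cstdint>

// Condensed model of the raise/revert timing in OpenCompactionOutputFile.
const uint64_t kFiveMinutes = 5 * 60 * 1000000ULL;   // microseconds

struct SizeClock {
    uint64_t block_size_changed = 0;  // NowMicros() of last raise
    uint64_t last_low_mem = 0;        // NowMicros() of last low-memory event

    // returns true when a raise is allowed at time `now`
    bool MayRaise(bool plenty_space, uint64_t now) {
        if (plenty_space) return false;
        last_low_mem = now;           // remember lack of space
        if (block_size_changed + kFiveMinutes < now) {
            block_size_changed = now; // throttle: one raise per five minutes
            return true;
        }
        return false;
    }

    // returns true when the size should revert to the configured default
    bool ShouldRevert(bool plenty_space, uint64_t file_timeout_us, uint64_t now) {
        return plenty_space && (last_low_mem + file_timeout_us < now);
    }
};
```

A multi-file compaction therefore raises at most one step per five-minute window, matching the comment in the patch.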


Contributor: It'd be nice to have tests showing what block sizes we get from running this function against various values for the inputs, to ensure we don't hit any pathological cases. Graphs would help. Alternatively, or in conjunction with this, graphs of block size changes vs disk activity during performance tests would probably be informative. I'm sure you've thought of this and have all sorts of tuning tests planned, just adding some emphasis.

Contributor (author): Sent a graphic image of the max block size plane to Andrew offline. No holes / spikes. Similarly constructed a spreadsheet with file sizes from 800 to 800M and key sizes from 10 to 1080; the resulting max block size matrix was a plane without spikes / holes.

Contributor: Yep, things look good here.

size_t
DBImpl::MaybeRaiseBlockSize(
    Compaction & CompactionStuff,
    size_t SampleValueSize)
{
size_t new_block_size, tot_user_data, tot_index_keys, avg_value_size,
avg_key_size, avg_block_size;

// start with most recent dynamic sizing
new_block_size=current_block_size_;

//
// 1. Get estimates for key values. Zero implies unable to estimate.
// (As the formula is tuned, some of the values become unused ... apologies.)
CompactionStuff.CalcInputStats(*table_cache_);
tot_user_data=CompactionStuff.TotalUserDataSize();
tot_index_keys=CompactionStuff.TotalIndexKeys();
avg_value_size=CompactionStuff.AverageValueSize();
avg_key_size=CompactionStuff.AverageKeySize();
avg_block_size=CompactionStuff.AverageBlockSize();

// CalcInputStats does not have second source for avg_value_size.
// Use size of next key.
if (0==avg_value_size)
avg_value_size=SampleValueSize;

Log(options_.info_log,
"Block stats used %zd user data, %zd index keys, %zd avg value, %zd avg key, %zd avg block",
tot_user_data, tot_index_keys, avg_value_size, avg_key_size, avg_block_size);

//
// 2. Define boundaries of block size steps. Calculate
// "next step"
//
if (0!=tot_user_data && 0!=tot_index_keys && 0!=avg_value_size
&& 0!=avg_key_size && 0!=avg_block_size)
{
size_t high_size, low_size, cur_size, increment, file_data_size, keys_per_file;

// 2a. Highest block size:
Contributor: Should probably break this out into a method. I think there should be a method for calculating high_size and low_size, as well as a sanity-check function, to better split this code up instead of having it all in one large method.

Contributor (author): Hmm, that is a stylistic thing. Based upon how this was developed and tuned, I prefer to keep it this way.

// (sqrt()/sqrt() stuff is from first derivative to minimize
// total read size of one block plus file metadata)

// limited by keys or filesize? (pretend metadata is zero, i love pretend games)
file_data_size=versions_->MaxFileSizeForLevel(CompactionStuff.level());
keys_per_file=file_data_size / avg_value_size;

if (75000 < keys_per_file)
{
keys_per_file = 75000;
file_data_size = avg_value_size * keys_per_file;
} // if

high_size=(size_t)((double)file_data_size / (sqrt(file_data_size)/sqrt(avg_key_size)));

// 2b. Lowest block size: largest of given block size or average value size
// because large values are one block
if (avg_value_size < options_.block_size)
low_size=options_.block_size;
else
low_size=avg_value_size;

// 2c. Current block size: compaction can skew numbers in files
// without counters, use current dynamic block_size in that case
if (options_.block_size < avg_block_size)
cur_size=avg_block_size;
else
cur_size=current_block_size_;

// safety check values to eliminate negatives
if (low_size <= high_size)
{
size_t cur_step;

increment=(high_size - low_size)/options_.block_size_steps;

// adjust old, too low stuff
if (low_size < cur_size)
cur_step=(cur_size - low_size)/increment;
else
cur_step=0;

// move to next step, but not over the top step
if (cur_step < (size_t)options_.block_size_steps)
++cur_step;
else
cur_step=options_.block_size_steps;

//
// 3. Set new block size to next higher step
//
new_block_size=low_size + increment * cur_step;

Log(options_.info_log,
"Block size selected %zd block size, %zd cur, %zd low, %zd high, %zd inc, %zd step",
new_block_size, cur_size, low_size, high_size, increment, cur_step);

// This is not thread safe, but not worthy of mutex either
if (current_block_size_ < new_block_size)
current_block_size_ = new_block_size;
} // if
} // if

return(new_block_size);

} // DBImpl::MaybeRaiseBlockSize
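The step arithmetic above can be exercised standalone. The sketch below is a hypothetical reduction, not the PR's code: the formulas mirror the patch (high = file_size / (sqrt(file_size)/sqrt(avg_key)), which simplifies to sqrt(file_size * avg_key); low = max(configured block size, avg value size); climb one integer step per call), and I have added a guard for increment == 0, a case the original leaves unchecked:

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>

// Hypothetical standalone version of MaybeRaiseBlockSize's step math.
size_t NextBlockSize(size_t file_data_size, size_t avg_value_size,
                     size_t avg_key_size, size_t configured_block_size,
                     size_t cur_size, int steps) {
    // limited by keys or filesize? cap keys per file at 75000, as the patch does
    size_t keys_per_file = file_data_size / avg_value_size;
    if (75000 < keys_per_file) {
        keys_per_file = 75000;
        file_data_size = avg_value_size * keys_per_file;
    }

    // highest block size: sqrt(file_data_size * avg_key_size)
    size_t high = (size_t)((double)file_data_size /
        (std::sqrt((double)file_data_size) / std::sqrt((double)avg_key_size)));

    // lowest block size: large values are one block each
    size_t low = std::max(configured_block_size, avg_value_size);

    if (high < low || steps <= 0)
        return cur_size;                         // safety: no negative increment

    size_t increment = (high - low) / (size_t)steps;
    size_t step = (increment != 0 && low < cur_size)
                      ? (cur_size - low) / increment
                      : 0;
    if (step < (size_t)steps) ++step;            // next step up...
    else step = (size_t)steps;                   // ...but never past the top

    return low + increment * step;
}
```

Successive calls with the returned size fed back as cur_size climb the ladder one increment at a time until the top step is reached.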


Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
Iterator* input) {
assert(compact != NULL);
@@ -1219,7 +1380,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
if (!drop) {
// Open output file if necessary
if (compact->builder == NULL) {
status = OpenCompactionOutputFile(compact, input->value().size() + key.size());
if (!status.ok()) {
break;
}
@@ -1992,10 +2153,10 @@ DBImpl::VerifyLevels()
} // VerifyLevels


bool
DBImpl::IsCompactionScheduled()
{
mutex_.AssertHeld();
bool flag(false);
for (int level=0; level< config::kNumLevels && !flag; ++level)
flag=versions_->IsCompactionSubmitted(level);
6 changes: 5 additions & 1 deletion db/db_impl.h
@@ -118,7 +118,8 @@ class DBImpl : public DB {
Status DoCompactionWork(CompactionState* compact);
int64_t PrioritizeWork(bool IsLevel0);

Status OpenCompactionOutputFile(CompactionState* compact, size_t sample_value_size);
size_t MaybeRaiseBlockSize(Compaction & CompactionStuff, size_t SampleValueSize);
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
Status InstallCompactionResults(CompactionState* compact);

@@ -204,6 +205,9 @@

volatile uint64_t throttle_end;
volatile uint32_t running_compactions_;
volatile size_t current_block_size_; // last dynamic block size computed
volatile uint64_t block_size_changed_; // NowMicros() when block size computed
volatile uint64_t last_low_mem_; // NowMicros() when low memory last seen

// accessor to new, dynamic block_cache
Cache * block_cache() {return(double_cache.GetBlockCache());};
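The volatile members above tie back to the review discussion: current_block_size_ is deliberately volatile rather than mutex-protected, with benign races accepted. A minimal sketch of that pattern with hypothetical names (MaybeRaise, current_block_size), not the PR's code: snapshot the volatile into a local once, compute with the snapshot, and publish only a raise — if two threads race, the last writer wins and the value stays advisory:

```cpp
#include <cassert>
#include <cstddef>

// Advisory, intentionally unsynchronized shared value.
volatile size_t current_block_size = 4096;

size_t MaybeRaise(size_t candidate) {
    size_t local = current_block_size;   // read the volatile exactly once
    if (local < candidate) {
        current_block_size = candidate;  // unsynchronized publish, by design
        local = candidate;
    }
    return local;                        // computation continues on the snapshot
}
```

This avoids adding another mutex, which the author notes is disproportionately expensive under Erlang's schedulers.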
10 changes: 6 additions & 4 deletions db/table_cache.cc
@@ -164,8 +164,10 @@ void TableCache::Evict(uint64_t file_number, bool is_overlapped) {
* Riak specific routine to return table statistic ONLY if table metadata
* already within cache ... otherwise return 0.
*/
uint64_t
TableCache::GetStatisticValue(
    uint64_t file_number,
    unsigned Index)
{
uint64_t ret_val;
char buf[sizeof(file_number)];
@@ -176,7 +178,7 @@ TableCache::GetStatisticValue(uint64_t file_number, unsigned Index)
Slice key(buf, sizeof(buf));
handle = cache_->Lookup(key);

if (NULL != handle)
{
TableAndFile * tf;

@@ -186,7 +188,7 @@ TableCache::GetStatisticValue(uint64_t file_number, unsigned Index)
} // if

return(ret_val);

} // TableCache::GetStatisticValue

} // namespace leveldb
2 changes: 2 additions & 0 deletions db/table_cache.h
@@ -59,6 +59,8 @@ class TableCache {

Cache* TEST_GetInternalCache() {return(cache_);};

void Release(Cache::Handle * handle) {cache_->Release(handle);};

private:
Env* const env_;
const std::string dbname_;
2 changes: 1 addition & 1 deletion db/version_edit.h
@@ -19,7 +19,7 @@ struct FileMetaData {
// int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
uint64_t num_entries; // count of values in .sst file, only valid during table build
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table
int level;