Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mv write sizing #132

Merged
merged 3 commits into from
May 8, 2014
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,16 @@ Status BuildTable(const std::string& dbname,
std::string fname = TableFileName(options, meta->number, meta->level);
if (iter->Valid()) {
WritableFile* file;
s = env->NewWritableFile(fname, &file);
size_t map_size;

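// Large write buffers (over 10 MB): map two thirds of the buffer size, a
// little more than half, hoping the table takes two writes ... not three.
// Smaller buffers instead get a map 1.2 times the buffer size.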
if (10*1024*1024 < options.write_buffer_size)
map_size=(options.write_buffer_size/6)*4;
else
map_size=(options.write_buffer_size*12)/10; // integer multiply 1.2

s = env->NewWritableFile(fname, &file, map_size);
if (!s.ok()) {
return s;
}
Expand Down
4 changes: 3 additions & 1 deletion db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,9 @@ TEST(CorruptionTest, NewFileErrorDuringWrite) {
const int num = 3 + (Options().write_buffer_size / kValueSize);
std::string value_storage;
Status s;
for (int i = 0; s.ok() && i < num; i++) {
for (int i = 0;
s.ok() && i < num && 0==env_.num_writable_file_errors_;
i++) {
WriteBatch batch;
batch.Put("a", Value(100, &value_storage));
s = db_->Write(WriteOptions(), &batch);
Expand Down
2 changes: 1 addition & 1 deletion db/db_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -908,7 +908,7 @@ class Benchmark {
char fname[100];
snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
WritableFile* file;
Status s = Env::Default()->NewWritableFile(fname, &file);
Status s = Env::Default()->NewWritableFile(fname, &file, 2<<20);
if (!s.ok()) {
fprintf(stderr, "%s\n", s.ToString().c_str());
return;
Expand Down
26 changes: 15 additions & 11 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,14 @@ Options SanitizeOptions(const std::string& dbname,
ClipToRange(&result.block_size, 1<<10, 4<<20);

if (src.limited_developer_mem)
{
gMapSize=2*1024*1024L;
if (2*1024*1024L < result.write_buffer_size) // let unit tests be smaller
result.write_buffer_size=2*1024*1024L;
} // if

// Alternate, more generic means to change gMapSize: a nonzero mmap_size
// option replaces it.
if (0!=src.mmap_size)
gMapSize=src.mmap_size;

if (gMapSize < result.write_buffer_size) // let unit tests be smaller
result.write_buffer_size=gMapSize;

// Validate tiered storage options
tiered_dbname=MakeTieredDbname(dbname, result);
Expand All @@ -155,7 +158,6 @@ Options SanitizeOptions(const std::string& dbname,
result.block_cache = block_cache;
}


return result;
}

Expand Down Expand Up @@ -248,7 +250,7 @@ Status DBImpl::NewDB() {

const std::string manifest = DescriptorFileName(dbname_, 1);
WritableFile* file;
Status s = env_->NewWritableFile(manifest, &file);
Status s = env_->NewWritableFile(manifest, &file, 4*1024L);
if (!s.ok()) {
return s;
}
Expand Down Expand Up @@ -1108,7 +1110,7 @@ Status DBImpl::OpenCompactionOutputFile(

// Make the output file
std::string fname = TableFileName(options_, file_number, compact->compaction->level()+1);
Status s = env_->NewWritableFile(fname, &compact->outfile);
Status s = env_->NewWritableFile(fname, &compact->outfile, gMapSize);
if (s.ok()) {
Options options;
options=options_;
Expand Down Expand Up @@ -1214,9 +1216,9 @@ DBImpl::MaybeRaiseBlockSize(
file_data_size=versions_->MaxFileSizeForLevel(CompactionStuff.level());
keys_per_file=file_data_size / avg_value_size;

if (75000 < keys_per_file)
if (300000 < keys_per_file)
{
keys_per_file = 75000;
keys_per_file = 300000;
file_data_size = avg_value_size * keys_per_file;
} // if

Expand Down Expand Up @@ -1862,9 +1864,11 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
uint64_t new_log_number = versions_->NewFileNumber();

WritableFile* lfile = NULL;
gPerfCounters->Inc(ePerfWriteNewMem);
s = env_->NewWriteOnlyFile(LogFileName(dbname_, new_log_number), &lfile);
s = env_->NewWriteOnlyFile(LogFileName(dbname_, new_log_number), &lfile,
options_.env->RecoveryMmapSize(&options_));
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
Expand Down Expand Up @@ -2042,7 +2046,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile;
s = options.env->NewWriteOnlyFile(LogFileName(dbname, new_log_number),
&lfile);
&lfile, options.env->RecoveryMmapSize(&options));
if (s.ok()) {
edit.SetLogNumber(new_log_number);
impl->logfile_ = lfile;
Expand Down
4 changes: 2 additions & 2 deletions db/db_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class SpecialEnv : public EnvWrapper {
count_random_reads_ = false;
}

Status NewWritableFile(const std::string& f, WritableFile** r) {
Status NewWritableFile(const std::string& f, WritableFile** r, size_t map_size) {
class SSTableFile : public WritableFile {
private:
SpecialEnv* env_;
Expand Down Expand Up @@ -105,7 +105,7 @@ class SpecialEnv : public EnvWrapper {
return Status::IOError("simulated write error");
}

Status s = target()->NewWritableFile(f, r);
Status s = target()->NewWritableFile(f, r, 2<<20);
if (s.ok()) {
if (strstr(f.c_str(), ".sst") != NULL) {
*r = new SSTableFile(this, *r);
Expand Down
2 changes: 1 addition & 1 deletion db/repair.cc
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,7 @@ class Repairer {
Status WriteDescriptor() {
std::string tmp = TempFileName(dbname_, 1);
WritableFile* file;
Status status = env_->NewWritableFile(tmp, &file);
Status status = env_->NewWritableFile(tmp, &file, 4096);
if (!status.ok()) {
return status;
}
Expand Down
4 changes: 2 additions & 2 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
assert(descriptor_file_ == NULL);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_, 4*1024L);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
Expand Down Expand Up @@ -1853,7 +1853,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key, size_t key_count) {
// to meet file open speed goals
else
{
ret_flag=(75000<key_count);
ret_flag=(300000<key_count);
} // else
} // if

Expand Down
2 changes: 1 addition & 1 deletion helpers/memenv/memenv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class InMemoryEnv : public EnvWrapper {
}

virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) {
WritableFile** result, size_t) {
MutexLock lock(&mutex_);
if (file_map_.find(fname) != file_map_.end()) {
DeleteFileInternal(fname);
Expand Down
10 changes: 5 additions & 5 deletions helpers/memenv/memenv_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ TEST(MemEnvTest, Basics) {
ASSERT_EQ(0, children.size());

// Create a file.
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file));
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file, 2<<20));
delete writable_file;

// Check that the file exists.
Expand All @@ -53,7 +53,7 @@ TEST(MemEnvTest, Basics) {
ASSERT_EQ("f", children[0]);

// Write to the file.
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file));
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file, 2<<20));
ASSERT_OK(writable_file->Append("abc"));
delete writable_file;

Expand Down Expand Up @@ -98,7 +98,7 @@ TEST(MemEnvTest, ReadWrite) {

ASSERT_OK(env_->CreateDir(dbname + ""));

ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file));
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file, 2<<20));
ASSERT_OK(writable_file->Append("hello "));
ASSERT_OK(writable_file->Append("world"));
delete writable_file;
Expand Down Expand Up @@ -145,7 +145,7 @@ TEST(MemEnvTest, Misc) {
ASSERT_TRUE(!test_dir.empty());

WritableFile* writable_file;
ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file));
ASSERT_OK(env_->NewWritableFile("/a/b", &writable_file, 2<<20));

// These are no-ops, but we test they return success.
ASSERT_OK(writable_file->Sync());
Expand All @@ -167,7 +167,7 @@ TEST(MemEnvTest, LargeWrite) {
}

WritableFile* writable_file;
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file));
ASSERT_OK(env_->NewWritableFile(dbname + "/f", &writable_file, 2<<20));
ASSERT_OK(writable_file->Append("foo"));
ASSERT_OK(writable_file->Append(write_data));
delete writable_file;
Expand Down
30 changes: 19 additions & 11 deletions include/leveldb/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@

namespace leveldb {

class AppendableFile;
class FileLock;
class Options;
class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
class WritableFile;
class AppendableFile;

class Env {
public:
Expand Down Expand Up @@ -74,7 +75,8 @@ class Env {
//
// The returned file will only be accessed by one thread at a time.
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) = 0;
WritableFile** result,
size_t map_size) = 0;

// Riak specific:
// Derived from NewWritableFile. One change: if the file exists,
Expand All @@ -85,7 +87,8 @@ class Env {
//
// The returned file will only be accessed by one thread at a time.
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result) = 0;
WritableFile** result,
size_t map_size) = 0;

// Riak specific:
// Allows for virtualized version of NewWritableFile that enables write
Expand All @@ -94,8 +97,9 @@ class Env {
//
// The returned file will only be accessed by one thread at a time.
virtual Status NewWriteOnlyFile(const std::string& fname,
WritableFile** result)
{return(NewWritableFile(fname, result));};
WritableFile** result,
size_t map_size)
{return(NewWritableFile(fname, result, map_size));};

// Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0;
Expand Down Expand Up @@ -179,6 +183,10 @@ class Env {
// Riak specific: Get the object that is tracking various software counters.
// This default returns the gPerfCounters pointer; the method is virtual, so
// a derived Env can return a different object.
virtual PerformanceCounters * GetPerformanceCounters() {return(gPerfCounters);};

// Riak specific: Request size of recovery memory map, potentially using
// Options data for the decision. This default ignores its argument and
// returns 2 Mbytes (2*1024*1024), which is Google's original size.
// The method is virtual, so a derived Env can return a different size.
virtual size_t RecoveryMmapSize(const struct Options *) const {return(2*1024*1024L);};

private:
// No copying allowed
Env(const Env&);
Expand Down Expand Up @@ -333,14 +341,14 @@ class EnvWrapper : public Env {
Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) {
return target_->NewRandomAccessFile(f, r);
}
Status NewWritableFile(const std::string& f, WritableFile** r) {
return target_->NewWritableFile(f, r);
Status NewWritableFile(const std::string& f, WritableFile** r, size_t s=0) {
return target_->NewWritableFile(f, r, s);
}
Status NewAppendableFile(const std::string& f, WritableFile** r) {
return target_->NewAppendableFile(f, r);
Status NewAppendableFile(const std::string& f, WritableFile** r, size_t s=0) {
return target_->NewAppendableFile(f, r, s);
}
Status NewWriteOnlyFile(const std::string& f, WritableFile** r) {
return target_->NewWriteOnlyFile(f, r);
Status NewWriteOnlyFile(const std::string& f, WritableFile** r, size_t s=0) {
return target_->NewWriteOnlyFile(f, r, s);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
Expand Down
3 changes: 3 additions & 0 deletions include/leveldb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ struct Options {
// Default: false
bool limited_developer_mem;

// The size of each mmapped file; choose 0 for the default (20M)
uint64_t mmap_size;

// Riak option to adjust aggressive delete behavior.
// - zero disables aggressive delete
// - positive value indicates how many deletes must exist
Expand Down
2 changes: 1 addition & 1 deletion table/format.cc
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ Status ReadBlock(RandomAccessFile* file,

// create / append file to hold removed blocks
new_name+="/BLOCKS.bad";
s2=options.GetEnv()->NewAppendableFile(new_name, &bad_file);
s2=options.GetEnv()->NewAppendableFile(new_name, &bad_file, 4*1024);
if (s2.ok())
{
// need a try/catch
Expand Down
6 changes: 4 additions & 2 deletions util/cache2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,14 +432,16 @@ DoubleCache::DoubleCache(
: m_FileCache(NULL), m_BlockCache(NULL),
m_IsInternalDB(options.is_internal_db), m_PlentySpace(true),
m_Overhead(0), m_TotalAllocation(0),
m_FileTimeout(4*24*60*60), // default is 4 days
m_FileTimeout(10*24*60*60), // default is 10 days
m_BlockCacheThreshold(options.block_cache_threshold),
m_SizeCachedFiles(0)
{
// fixed allocation for recovery log and info LOG: 20M each
// (with 64 or more open databases, this is a serious number)
// and fixed allocation for two write buffers
m_Overhead=options.write_buffer_size*2 + gMapSize*2;

m_Overhead=options.write_buffer_size*2
+ options.env->RecoveryMmapSize(&options) + 4096;
m_TotalAllocation=gFlexCache.GetDBCacheCapacity(m_IsInternalDB);

if (m_Overhead < m_TotalAllocation)
Expand Down
2 changes: 1 addition & 1 deletion util/env.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ static Status DoWriteStringToFile(Env* env, const Slice& data,
const std::string& fname,
bool should_sync) {
WritableFile* file;
Status s = env->NewWritableFile(fname, &file);
Status s = env->NewWritableFile(fname, &file, 4*1024L);
if (!s.ok()) {
return s;
}
Expand Down
Loading