Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mv flexcache4 #75

Merged
merged 6 commits into from
Oct 21, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 137 additions & 72 deletions c_src/eleveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
//
// -------------------------------------------------------------------

#include <syslog.h>

#include <new>
#include <set>
#include <stack>
Expand Down Expand Up @@ -157,23 +159,108 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, leveldb::Slice s)
return result;
}

/** struct for grabbing eleveldb environment options via fold
* ... then loading said options into eleveldb_priv_data
*/
struct EleveldbOptions
{
int m_EleveldbThreads;
int m_LeveldbImmThreads;
int m_LeveldbBGWriteThreads;
int m_LeveldbOverlapThreads;
int m_LeveldbGroomingThreads;

int m_TotalMemPercent;
int m_TotalMem;

bool m_LimitedDeveloper;

EleveldbOptions()
: m_EleveldbThreads(71),
m_LeveldbImmThreads(0), m_LeveldbBGWriteThreads(0),
m_LeveldbOverlapThreads(0), m_LeveldbGroomingThreads(0),
m_TotalMemPercent(0), m_TotalMem(0),
m_LimitedDeveloper(false)
{};

void Dump()
{
syslog(LOG_ERR, " m_EleveldbThreads: %d\n", m_EleveldbThreads);
syslog(LOG_ERR, " m_LeveldbImmThreads: %d\n", m_LeveldbImmThreads);
syslog(LOG_ERR, " m_LeveldbBGWriteThreads: %d\n", m_LeveldbBGWriteThreads);
syslog(LOG_ERR, " m_LeveldbOverlapThreads: %d\n", m_LeveldbOverlapThreads);
syslog(LOG_ERR, " m_LeveldbGroomingThreads: %d\n", m_LeveldbGroomingThreads);

syslog(LOG_ERR, " m_TotalMemPercent: %d\n", m_TotalMemPercent);
syslog(LOG_ERR, " m_TotalMem: %d\n", m_TotalMem);

syslog(LOG_ERR, " m_LimitedDeveloper: %s\n", (m_LimitedDeveloper ? "true" : "false"));
} // Dump
}; // struct EleveldbOptions

/* Module-level private data: */

/** Module-level private data:
* singleton instance held by erlang and passed on API calls
*/
class eleveldb_priv_data
{
eleveldb_priv_data(const eleveldb_priv_data&); // nocopy
eleveldb_priv_data& operator=(const eleveldb_priv_data&); // nocopyassign
public:
EleveldbOptions m_Opts;
eleveldb::eleveldb_thread_pool thread_pool;

public:
eleveldb::eleveldb_thread_pool thread_pool;
explicit eleveldb_priv_data(EleveldbOptions & Options)
: m_Opts(Options), thread_pool(Options.m_EleveldbThreads)
{}

private:
eleveldb_priv_data(); // no default constructor
eleveldb_priv_data(const eleveldb_priv_data&); // nocopy
eleveldb_priv_data& operator=(const eleveldb_priv_data&); // nocopyassign

eleveldb_priv_data(const size_t n_write_threads)
: thread_pool(n_write_threads)
{}
};


ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOptions& opts)
{
int arity;
const ERL_NIF_TERM* option;
if (enif_get_tuple(env, item, &arity, &option))
{
if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM)
{
unsigned long memory_sz;
if (enif_get_ulong(env, option[1], &memory_sz))
{
if (memory_sz != 0)
{
opts.m_TotalMem = memory_sz;
}
}
}
else if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM_PERCENT)
{
unsigned long memory_sz;
if (enif_get_ulong(env, option[1], &memory_sz))
{
if (0 < memory_sz && memory_sz <= 100)
{
// this gets noticed later and applied against gCurrentTotalMemory
opts.m_TotalMemPercent = memory_sz;
}
}
}
else if (option[0] == eleveldb::ATOM_LIMITED_DEVELOPER_MEM)
{
if (option[1] == eleveldb::ATOM_TRUE)
opts.m_LimitedDeveloper = true;
else
opts.m_LimitedDeveloper = false;
}
}

return eleveldb::ATOM_OK;
}

ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Options& opts)
{
int arity;
Expand Down Expand Up @@ -399,13 +486,20 @@ async_open(
// convert total_leveldb_mem to byte count if it arrived as percent
// This happens now because there is no guarantee as to when the total_memory
// value would be read relative to total_leveldb_mem_percent in the option fold
if (0 < opts->total_leveldb_mem && opts->total_leveldb_mem<=100)
opts->total_leveldb_mem=(opts->total_leveldb_mem * gCurrentTotalMemory)/100;
uint64_t use_memory;

// start with all memory
use_memory=gCurrentTotalMemory;

// adjust to specific memory size
if (0!=priv.m_Opts.m_TotalMem)
use_memory=gCurrentTotalMemory;

// it could be that we are only activating internal databases (or they come up first)
// give them all RAM knowing flexcache will reduce it to 20% or so
if (0 == opts->total_leveldb_mem && opts->is_internal_db)
opts->total_leveldb_mem = gCurrentTotalMemory;
if (0 < priv.m_Opts.m_TotalMemPercent && priv.m_Opts.m_TotalMemPercent<=100)
use_memory=(opts->total_leveldb_mem * use_memory)/100; // integer math for percentate

opts->total_leveldb_mem=use_memory;
opts->limited_developer_mem=priv.m_Opts.m_LimitedDeveloper;

eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref,
db_name, opts);
Expand Down Expand Up @@ -554,7 +648,7 @@ async_iterator(
// Parse out the read options
leveldb::ReadOptions *opts = new leveldb::ReadOptions;
fold(env, options_ref, parse_read_option, *opts);

opts->fill_cache=true;
eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref,
db_ptr.get(), keys_only, opts);

Expand Down Expand Up @@ -954,67 +1048,16 @@ static void on_unload(ErlNifEnv *env, void *priv_data)
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
try
{
*priv_data = 0;
int ret_val;

ret_val=0;
*priv_data = NULL;

// inform erlang of our two resource types
eleveldb::DbObject::CreateDbObjectType(env);
eleveldb::ItrObject::CreateItrObjectType(env);

/* Gather local initialization data: */
struct _local
{
int n_threads;

_local()
: n_threads(0)
{}
} local;

/* Seed our private data with appropriate values: */
if(!enif_is_list(env, load_info))
throw std::invalid_argument("on_load::load_info");

ERL_NIF_TERM load_info_head;

while(0 != enif_get_list_cell(env, load_info, &load_info_head, &load_info))
{
int arity = 0;
ERL_NIF_TERM *tuple_data;

// Pick out "{write_threads, N}":
if(enif_get_tuple(env, load_info_head, &arity, const_cast<const ERL_NIF_TERM **>(&tuple_data)))
{
if(2 != arity)
continue;

unsigned int atom_len;
if(0 == enif_get_atom_length(env, tuple_data[0], &atom_len, ERL_NIF_LATIN1))
continue;

const unsigned int atom_max = 128;
char atom[atom_max];
if((atom_len + 1) != static_cast<unsigned int>(enif_get_atom(env, tuple_data[0], atom, atom_max, ERL_NIF_LATIN1)))
continue;

if(0 != strncmp(atom, "write_threads", atom_max))
continue;

// We have a setting, now peek at the parameter:
if(0 == enif_get_int(env, tuple_data[1], &local.n_threads))
throw std::invalid_argument("on_load::n_threads");

if(0 >= local.n_threads || eleveldb::N_THREADS_MAX < static_cast<size_t>(local.n_threads))
throw std::range_error("on_load::n_threads");
}
}

/* Spin up the thread pool, set up all private data: */
eleveldb_priv_data *priv = new eleveldb_priv_data(local.n_threads);

*priv_data = priv;

// Initialize common atoms

// must initialize atoms before processing options
#define ATOM(Id, Value) { Id = enif_make_atom(env, Value); }
ATOM(eleveldb::ATOM_OK, "ok");
ATOM(eleveldb::ATOM_ERROR, "error");
Expand Down Expand Up @@ -1062,7 +1105,29 @@ try

#undef ATOM

return 0;

// read options that apply to global eleveldb environment
if(enif_is_list(env, load_info))
{
EleveldbOptions load_options;

fold(env, load_info, parse_init_option, load_options);

/* Spin up the thread pool, set up all private data: */
eleveldb_priv_data *priv = new eleveldb_priv_data(load_options);

*priv_data = priv;

} // if

else
{
// anything non-zero is "fail"
ret_val=1;
} // else
// Initialize common atoms

return ret_val;
}


Expand Down
10 changes: 5 additions & 5 deletions src/eleveldb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@

-spec init() -> ok | {error, any()}.
init() ->
NumWriteThreads = case os:getenv("ELEVELDB_N_WRITE_THREADS") of
false -> 71; % must be a prime number
N -> erlang:list_to_integer(N) % exception on bad value
end,
%% NumWriteThreads = case os:getenv("ELEVELDB_N_WRITE_THREADS") of
%% false -> 71; % must be a prime number
%% N -> erlang:list_to_integer(N) % exception on bad value
%% end,
SoName = case code:priv_dir(?MODULE) of
{error, bad_name} ->
case code:which(?MODULE) of
Expand All @@ -81,7 +81,7 @@ init() ->
Dir ->
filename:join(Dir, "eleveldb")
end,
erlang:load_nif(SoName, [{write_threads,NumWriteThreads}]).
erlang:load_nif(SoName, application:get_all_env(eleveldb)).

-type open_options() :: [{create_if_missing, boolean()} |
{error_if_exists, boolean()} |
Expand Down