diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index 59936cff..c4f58f23 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -20,6 +20,8 @@ // // ------------------------------------------------------------------- +#include + #include #include #include @@ -157,23 +159,108 @@ static ERL_NIF_TERM slice_to_binary(ErlNifEnv* env, leveldb::Slice s) return result; } +/** struct for grabbing eleveldb environment options via fold + * ... then loading said options into eleveldb_priv_data + */ +struct EleveldbOptions +{ + int m_EleveldbThreads; + int m_LeveldbImmThreads; + int m_LeveldbBGWriteThreads; + int m_LeveldbOverlapThreads; + int m_LeveldbGroomingThreads; + + int m_TotalMemPercent; + int m_TotalMem; + + bool m_LimitedDeveloper; + + EleveldbOptions() + : m_EleveldbThreads(71), + m_LeveldbImmThreads(0), m_LeveldbBGWriteThreads(0), + m_LeveldbOverlapThreads(0), m_LeveldbGroomingThreads(0), + m_TotalMemPercent(0), m_TotalMem(0), + m_LimitedDeveloper(false) + {}; + + void Dump() + { + syslog(LOG_ERR, " m_EleveldbThreads: %d\n", m_EleveldbThreads); + syslog(LOG_ERR, " m_LeveldbImmThreads: %d\n", m_LeveldbImmThreads); + syslog(LOG_ERR, " m_LeveldbBGWriteThreads: %d\n", m_LeveldbBGWriteThreads); + syslog(LOG_ERR, " m_LeveldbOverlapThreads: %d\n", m_LeveldbOverlapThreads); + syslog(LOG_ERR, " m_LeveldbGroomingThreads: %d\n", m_LeveldbGroomingThreads); + + syslog(LOG_ERR, " m_TotalMemPercent: %d\n", m_TotalMemPercent); + syslog(LOG_ERR, " m_TotalMem: %d\n", m_TotalMem); + syslog(LOG_ERR, " m_LimitedDeveloper: %s\n", (m_LimitedDeveloper ? "true" : "false")); + } // Dump +}; // struct EleveldbOptions -/* Module-level private data: */ + +/** Module-level private data: + * singleton instance held by erlang and passed on API calls + */ class eleveldb_priv_data { - eleveldb_priv_data(const eleveldb_priv_data&); // nocopy - eleveldb_priv_data& operator=(const eleveldb_priv_data&); // nocopyassign +public: + EleveldbOptions m_Opts; + eleveldb::eleveldb_thread_pool thread_pool; - public: - eleveldb::eleveldb_thread_pool thread_pool; + explicit eleveldb_priv_data(EleveldbOptions & Options) + : m_Opts(Options), thread_pool(Options.m_EleveldbThreads) + {} + +private: + eleveldb_priv_data(); // no default constructor + eleveldb_priv_data(const eleveldb_priv_data&); // nocopy + eleveldb_priv_data& operator=(const eleveldb_priv_data&); // nocopyassign - eleveldb_priv_data(const size_t n_write_threads) - : thread_pool(n_write_threads) - {} }; +ERL_NIF_TERM parse_init_option(ErlNifEnv* env, ERL_NIF_TERM item, EleveldbOptions& opts) +{ + int arity; + const ERL_NIF_TERM* option; + if (enif_get_tuple(env, item, &arity, &option)) + { + if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM) + { + unsigned long memory_sz; + if (enif_get_ulong(env, option[1], &memory_sz)) + { + if (memory_sz != 0) + { + opts.m_TotalMem = memory_sz; + } + } + } + else if (option[0] == eleveldb::ATOM_TOTAL_LEVELDB_MEM_PERCENT) + { + unsigned long memory_sz; + if (enif_get_ulong(env, option[1], &memory_sz)) + { + if (0 < memory_sz && memory_sz <= 100) + { + // this gets noticed later and applied against gCurrentTotalMemory + opts.m_TotalMemPercent = memory_sz; + } + } + } + else if (option[0] == eleveldb::ATOM_LIMITED_DEVELOPER_MEM) + { + if (option[1] == eleveldb::ATOM_TRUE) + opts.m_LimitedDeveloper = true; + else + opts.m_LimitedDeveloper = false; + } + } + + return eleveldb::ATOM_OK; +} + ERL_NIF_TERM parse_open_option(ErlNifEnv* env, ERL_NIF_TERM item, leveldb::Options& opts) { int arity; @@ -399,13 +486,20 @@ async_open( // convert total_leveldb_mem to byte count if it arrived as percent // This happens now because there is no guarantee as to when the total_memory // value would be read relative to total_leveldb_mem_percent in the option fold - if (0 < opts->total_leveldb_mem && opts->total_leveldb_mem<=100) - opts->total_leveldb_mem=(opts->total_leveldb_mem * gCurrentTotalMemory)/100; + uint64_t use_memory; + + // start with all memory + use_memory=gCurrentTotalMemory; + + // adjust to specific memory size + if (0!=priv.m_Opts.m_TotalMem) + use_memory=gCurrentTotalMemory; - // it could be that we are only activating internal databases (or they come up first) - // give them all RAM knowing flexcache will reduce it to 20% or so - if (0 == opts->total_leveldb_mem && opts->is_internal_db) - opts->total_leveldb_mem = gCurrentTotalMemory; + if (0 < priv.m_Opts.m_TotalMemPercent && priv.m_Opts.m_TotalMemPercent<=100) + use_memory=(opts->total_leveldb_mem * use_memory)/100; // integer math for percentate + + opts->total_leveldb_mem=use_memory; + opts->limited_developer_mem=priv.m_Opts.m_LimitedDeveloper; eleveldb::WorkTask *work_item = new eleveldb::OpenTask(env, caller_ref, db_name, opts); @@ -554,7 +648,7 @@ async_iterator( // Parse out the read options leveldb::ReadOptions *opts = new leveldb::ReadOptions; fold(env, options_ref, parse_read_option, *opts); - + opts->fill_cache=true; eleveldb::WorkTask *work_item = new eleveldb::IterTask(env, caller_ref, db_ptr.get(), keys_only, opts); @@ -954,67 +1048,16 @@ static void on_unload(ErlNifEnv *env, void *priv_data) static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) try { - *priv_data = 0; + int ret_val; + + ret_val=0; + *priv_data = NULL; // inform erlang of our two resource types eleveldb::DbObject::CreateDbObjectType(env); eleveldb::ItrObject::CreateItrObjectType(env); - /* Gather local initialization data: */ - struct _local - { - int n_threads; - - _local() - : n_threads(0) - {} - } local; - - /* Seed our private data with appropriate values: */ - if(!enif_is_list(env, load_info)) - throw std::invalid_argument("on_load::load_info"); - - ERL_NIF_TERM load_info_head; - - while(0 != enif_get_list_cell(env, load_info, &load_info_head, &load_info)) - { - int arity = 0; - ERL_NIF_TERM *tuple_data; - - // Pick out "{write_threads, N}": - if(enif_get_tuple(env, load_info_head, &arity, const_cast(&tuple_data))) - { - if(2 != arity) - continue; - - unsigned int atom_len; - if(0 == enif_get_atom_length(env, tuple_data[0], &atom_len, ERL_NIF_LATIN1)) - continue; - - const unsigned int atom_max = 128; - char atom[atom_max]; - if((atom_len + 1) != static_cast(enif_get_atom(env, tuple_data[0], atom, atom_max, ERL_NIF_LATIN1))) - continue; - - if(0 != strncmp(atom, "write_threads", atom_max)) - continue; - - // We have a setting, now peek at the parameter: - if(0 == enif_get_int(env, tuple_data[1], &local.n_threads)) - throw std::invalid_argument("on_load::n_threads"); - - if(0 >= local.n_threads || eleveldb::N_THREADS_MAX < static_cast(local.n_threads)) - throw std::range_error("on_load::n_threads"); - } - } - - /* Spin up the thread pool, set up all private data: */ - eleveldb_priv_data *priv = new eleveldb_priv_data(local.n_threads); - - *priv_data = priv; - - // Initialize common atoms - +// must initialize atoms before processing options #define ATOM(Id, Value) { Id = enif_make_atom(env, Value); } ATOM(eleveldb::ATOM_OK, "ok"); ATOM(eleveldb::ATOM_ERROR, "error"); @@ -1062,7 +1105,29 @@ try #undef ATOM - return 0; + + // read options that apply to global eleveldb environment + if(enif_is_list(env, load_info)) + { + EleveldbOptions load_options; + + fold(env, load_info, parse_init_option, load_options); + + /* Spin up the thread pool, set up all private data: */ + eleveldb_priv_data *priv = new eleveldb_priv_data(load_options); + + *priv_data = priv; + + } // if + + else + { + // anything non-zero is "fail" + ret_val=1; + } // else + // Initialize common atoms + + return ret_val; } diff --git a/src/eleveldb.erl b/src/eleveldb.erl index 36b44a52..78fe85d0 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -66,10 +66,10 @@ -spec init() -> ok | {error, any()}. init() -> - NumWriteThreads = case os:getenv("ELEVELDB_N_WRITE_THREADS") of - false -> 71; % must be a prime number - N -> erlang:list_to_integer(N) % exception on bad value - end, +%% NumWriteThreads = case os:getenv("ELEVELDB_N_WRITE_THREADS") of +%% false -> 71; % must be a prime number +%% N -> erlang:list_to_integer(N) % exception on bad value +%% end, SoName = case code:priv_dir(?MODULE) of {error, bad_name} -> case code:which(?MODULE) of @@ -81,7 +81,7 @@ init() -> Dir -> filename:join(Dir, "eleveldb") end, - erlang:load_nif(SoName, [{write_threads,NumWriteThreads}]). + erlang:load_nif(SoName, application:get_all_env(eleveldb)). -type open_options() :: [{create_if_missing, boolean()} | {error_if_exists, boolean()} |