Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add first_n function #2

Merged
merged 1 commit into from
Feb 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions c_src/eleveldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ static ErlNifFunc nif_funcs[] =
{"async_iterator_move", 3, eleveldb::async_iterator_move},

{"async_count", 2, eleveldb::async_count},
{"async_first_n", 3, eleveldb::async_first_n},

{"property_cache", 2, eleveldb::property_cache},
{"property_cache_get", 1, eleveldb::property_cache_get},
Expand Down Expand Up @@ -1061,6 +1062,36 @@ async_count(
return submit_to_thread_queue(work_item, env, caller_ref);
} // async_count

ERL_NIF_TERM
async_first_n(
ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[])
{
const ERL_NIF_TERM& caller_ref = argv[0];
const ERL_NIF_TERM& dbh_ref = argv[1];
const ERL_NIF_TERM& number_of_recs_ref = argv[2];
unsigned long number_of_recs = 0;

ReferencePtr<DbObject> db_ptr;

db_ptr.assign(DbObject::RetrieveDbObject(env, dbh_ref));

if(NULL==db_ptr.get() || 0!=db_ptr->GetCloseRequested())
{
return enif_make_badarg(env);
}

// likely useless
if(NULL == db_ptr->m_Db)
return send_reply(env, caller_ref, error_einval(env));

if (enif_get_ulong(env, number_of_recs_ref, &number_of_recs) == 0)
return enif_make_badarg(env);

eleveldb::WorkTask *work_item = new eleveldb::FirstNTask(env, caller_ref, db_ptr, number_of_recs);
return submit_to_thread_queue(work_item, env, caller_ref);
} // async_first_n

ERL_NIF_TERM
async_close(
Expand Down
1 change: 1 addition & 0 deletions c_src/eleveldb.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM ar
ERL_NIF_TERM async_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM async_count(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);
ERL_NIF_TERM async_first_n(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]);

} // namespace eleveldb

Expand Down
42 changes: 42 additions & 0 deletions c_src/workitems.cc
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,48 @@ CountTask::DoWork()
return work_result(local_env(), ATOM_OK, enif_make_uint64(local_env_, cnt));
}

/**
* FirstNTask functions
*/

FirstNTask::FirstNTask(ErlNifEnv *_caller_env,
ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle,
unsigned long _number_of_recs)
: WorkTask(_caller_env, _caller_ref, _db_handle),
_number_of_recs(_number_of_recs)
{}

FirstNTask::~FirstNTask() {}

work_result
FirstNTask::DoWork()
{

ERL_NIF_TERM tail = enif_make_list(local_env(), 0);
ERL_NIF_TERM head;
uint64_t cnt = 0;

leveldb::Iterator* it = m_DbPtr->m_Db->NewIterator(leveldb::ReadOptions());
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// add a record as a tuble to the returned list
head = enif_make_tuple2(local_env(),
slice_to_binary(local_env(), it->key()),
slice_to_binary(local_env(), it->value()));
tail = enif_make_list_cell(local_env(), head, tail);
cnt++;
// Once the iterator reaches the Nth record, exit the loop.
if (cnt == _number_of_recs) break;
}
leveldb::Status status = it->status();
delete it;
if(!status.ok()){
return work_result(local_env(), ATOM_ERROR, status);
}

return work_result(local_env(), ATOM_OK, tail);
}

} // namespace eleveldb


20 changes: 20 additions & 0 deletions c_src/workitems.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,26 @@ class CountTask : public WorkTask

}; // class CountTask

/**
* Background object for async first_n,
*/

class FirstNTask : public WorkTask
{
protected:
unsigned long _number_of_recs;

public:
FirstNTask(ErlNifEnv *_caller_env,
ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle,
unsigned long _number_of_recs);

virtual ~FirstNTask();

virtual work_result DoWork();

}; // class FirstNTask


} // namespace eleveldb
Expand Down
29 changes: 27 additions & 2 deletions src/eleveldb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
iterator_move/2,
iterator_close/1]).

-export([count/1]).
-export([count/1,
first_n/2]).

-export([property_cache/2,
property_cache_get/1,
Expand Down Expand Up @@ -264,6 +265,16 @@ count(Ref) ->
async_count(_CallerRef, _Ref) ->
erlang:nif_error({error, not_loaded}).

%% Retrieving the first N records in the database and return it.
-spec first_n(db_ref(), pos_integer()) -> {ok, list({binary(), binary()})} | {error, any()}.
first_n(Ref, N) ->
CallerRef = make_ref(),
async_first_n(CallerRef, Ref, N),
?WAIT_FOR_REPLY(CallerRef).

async_first_n(_CallerRef, _Ref, _N) ->
erlang:nif_error({error, not_loaded}).

-type fold_fun() :: fun(({Key::binary(), Value::binary()}, any()) -> any()).

%% Fold over the keys and values in the database
Expand Down Expand Up @@ -576,6 +587,8 @@ test_open(TestDir) ->
?assertEqual({ok, <<"123">>}, ?MODULE:get(Ref, <<"abc">>, [])),
?assertEqual(not_found, ?MODULE:get(Ref, <<"def">>, [])),
?assertEqual({ok, 1}, ?MODULE:count(Ref)),
?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 1)),
?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 100)),
assert_close(Ref).

test_open_many(TestDir, HowMany) ->
Expand All @@ -598,7 +611,9 @@ test_open_many(TestDir, HowMany) ->
lists:foreach(
fun({Ref, Key, Val}) ->
?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])),
?assertEqual({ok, 1}, ?MODULE:count(Ref))
?assertEqual({ok, 1}, ?MODULE:count(Ref)),
?assertEqual({ok, [{Key, Val}]}, ?MODULE:first_n(Ref, 1)),
?assertEqual({ok, [{Key, Val}]}, ?MODULE:first_n(Ref, 100))
end, WorkSet),
lists:foreach(fun assert_close/1, [R || {R, _, _} <- WorkSet]).

Expand All @@ -616,6 +631,8 @@ test_fold(TestDir) ->
[{<<"abc">>, <<"123">>}, {<<"def">>, <<"456">>}, {<<"hij">>, <<"789">>}],
lists:reverse(?MODULE:fold(Ref, fun accumulate/2, [], []))),
?assertEqual({ok, 3}, ?MODULE:count(Ref)),
?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 1)),
?assertEqual({ok, [{<<"hij">>, <<"789">>},{<<"def">>, <<"456">>},{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 100)),
assert_close(Ref).

test_fold_keys(TestDir) ->
Expand Down Expand Up @@ -727,6 +744,14 @@ run_load(TestDir, IntSeq) ->
?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, []))
end, KVOut),
?assertEqual({ok, length(IntSeq)}, ?MODULE:count(Ref)),
{ok, Records1} = ?MODULE:first_n(Ref, 1),
?assertEqual(1, length(Records1)),
{ok, Records2} = ?MODULE:first_n(Ref, 10),
?assertEqual(10, length(Records2)),
{ok, Records3} = ?MODULE:first_n(Ref, length(IntSeq)),
?assertEqual(length(IntSeq), length(Records3)),
{ok, Records4} = ?MODULE:first_n(Ref, length(IntSeq) + 999),
?assertEqual(length(IntSeq), length(Records4)),
assert_close(Ref).

%% ===================================================================
Expand Down