From 778bac1f9547d6920d3ade7bd33c3e165dc7ba36 Mon Sep 17 00:00:00 2001 From: mocchira Date: Wed, 28 Feb 2018 16:58:44 +0900 Subject: [PATCH] Add first_n function --- c_src/eleveldb.cc | 31 +++++++++++++++++++++++++++++++ c_src/eleveldb.h | 1 + c_src/workitems.cc | 42 ++++++++++++++++++++++++++++++++++++++++++ c_src/workitems.h | 20 ++++++++++++++++++++ src/eleveldb.erl | 29 +++++++++++++++++++++++++++-- 5 files changed, 121 insertions(+), 2 deletions(-) diff --git a/c_src/eleveldb.cc b/c_src/eleveldb.cc index a453a8e5..f62a0355 100644 --- a/c_src/eleveldb.cc +++ b/c_src/eleveldb.cc @@ -81,6 +81,7 @@ static ErlNifFunc nif_funcs[] = {"async_iterator_move", 3, eleveldb::async_iterator_move}, {"async_count", 2, eleveldb::async_count}, + {"async_first_n", 3, eleveldb::async_first_n}, {"property_cache", 2, eleveldb::property_cache}, {"property_cache_get", 1, eleveldb::property_cache_get}, @@ -1061,6 +1062,36 @@ async_count( return submit_to_thread_queue(work_item, env, caller_ref); } // async_count +ERL_NIF_TERM +async_first_n( + ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + const ERL_NIF_TERM& caller_ref = argv[0]; + const ERL_NIF_TERM& dbh_ref = argv[1]; + const ERL_NIF_TERM& number_of_recs_ref = argv[2]; + unsigned long number_of_recs = 0; + + ReferencePtr db_ptr; + + db_ptr.assign(DbObject::RetrieveDbObject(env, dbh_ref)); + + if(NULL==db_ptr.get() || 0!=db_ptr->GetCloseRequested()) + { + return enif_make_badarg(env); + } + + // likely useless + if(NULL == db_ptr->m_Db) + return send_reply(env, caller_ref, error_einval(env)); + + if (enif_get_ulong(env, number_of_recs_ref, &number_of_recs) == 0) + return enif_make_badarg(env); + + eleveldb::WorkTask *work_item = new eleveldb::FirstNTask(env, caller_ref, db_ptr, number_of_recs); + return submit_to_thread_queue(work_item, env, caller_ref); +} // async_first_n ERL_NIF_TERM async_close( diff --git a/c_src/eleveldb.h b/c_src/eleveldb.h index a3e7b052..f86c239d 100644 --- a/c_src/eleveldb.h +++ b/c_src/eleveldb.h @@ -52,6 +52,7 @@ ERL_NIF_TERM async_iterator_move(ErlNifEnv* env, int argc, const ERL_NIF_TERM ar ERL_NIF_TERM async_iterator_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); ERL_NIF_TERM async_count(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +ERL_NIF_TERM async_first_n(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); } // namespace eleveldb diff --git a/c_src/workitems.cc b/c_src/workitems.cc index 25174f17..9708cfc8 100644 --- a/c_src/workitems.cc +++ b/c_src/workitems.cc @@ -623,6 +623,48 @@ CountTask::DoWork() return work_result(local_env(), ATOM_OK, enif_make_uint64(local_env_, cnt)); } +/** + * FirstNTask functions + */ + +FirstNTask::FirstNTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + unsigned long _number_of_recs) + : WorkTask(_caller_env, _caller_ref, _db_handle), + _number_of_recs(_number_of_recs) +{} + +FirstNTask::~FirstNTask() {} + +work_result +FirstNTask::DoWork() +{ + + ERL_NIF_TERM tail = enif_make_list(local_env(), 0); + ERL_NIF_TERM head; + uint64_t cnt = 0; + + leveldb::Iterator* it = m_DbPtr->m_Db->NewIterator(leveldb::ReadOptions()); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + // add a record as a tuble to the returned list + head = enif_make_tuple2(local_env(), + slice_to_binary(local_env(), it->key()), + slice_to_binary(local_env(), it->value())); + tail = enif_make_list_cell(local_env(), head, tail); + cnt++; + // Once the iterator reaches the Nth record, exit the loop. + if (cnt == _number_of_recs) break; + } + leveldb::Status status = it->status(); + delete it; + if(!status.ok()){ + return work_result(local_env(), ATOM_ERROR, status); + } + + return work_result(local_env(), ATOM_OK, tail); +} + } // namespace eleveldb diff --git a/c_src/workitems.h b/c_src/workitems.h index e65d2d7e..50cd4f4b 100644 --- a/c_src/workitems.h +++ b/c_src/workitems.h @@ -353,6 +353,26 @@ class CountTask : public WorkTask }; // class CountTask +/** + * Background object for async first_n, + */ + +class FirstNTask : public WorkTask +{ +protected: + unsigned long _number_of_recs; + +public: + FirstNTask(ErlNifEnv *_caller_env, + ERL_NIF_TERM _caller_ref, + DbObjectPtr_t & _db_handle, + unsigned long _number_of_recs); + + virtual ~FirstNTask(); + + virtual work_result DoWork(); + +}; // class FirstNTask } // namespace eleveldb diff --git a/src/eleveldb.erl b/src/eleveldb.erl index e5d60f34..25cd91f4 100644 --- a/src/eleveldb.erl +++ b/src/eleveldb.erl @@ -43,7 +43,8 @@ iterator_move/2, iterator_close/1]). --export([count/1]). +-export([count/1, + first_n/2]). -export([property_cache/2, property_cache_get/1, @@ -264,6 +265,16 @@ count(Ref) -> async_count(_CallerRef, _Ref) -> erlang:nif_error({error, not_loaded}). +%% Retrieving the first N records in the database and return it. +-spec first_n(db_ref(), pos_integer()) -> {ok, list({binary(), binary()})} | {error, any()}. +first_n(Ref, N) -> + CallerRef = make_ref(), + async_first_n(CallerRef, Ref, N), + ?WAIT_FOR_REPLY(CallerRef). + +async_first_n(_CallerRef, _Ref, _N) -> + erlang:nif_error({error, not_loaded}). + -type fold_fun() :: fun(({Key::binary(), Value::binary()}, any()) -> any()). %% Fold over the keys and values in the database @@ -576,6 +587,8 @@ test_open(TestDir) -> ?assertEqual({ok, <<"123">>}, ?MODULE:get(Ref, <<"abc">>, [])), ?assertEqual(not_found, ?MODULE:get(Ref, <<"def">>, [])), ?assertEqual({ok, 1}, ?MODULE:count(Ref)), + ?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 1)), + ?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 100)), assert_close(Ref). test_open_many(TestDir, HowMany) -> @@ -598,7 +611,9 @@ test_open_many(TestDir, HowMany) -> lists:foreach( fun({Ref, Key, Val}) -> ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])), - ?assertEqual({ok, 1}, ?MODULE:count(Ref)) + ?assertEqual({ok, 1}, ?MODULE:count(Ref)), + ?assertEqual({ok, [{Key, Val}]}, ?MODULE:first_n(Ref, 1)), + ?assertEqual({ok, [{Key, Val}]}, ?MODULE:first_n(Ref, 100)) end, WorkSet), lists:foreach(fun assert_close/1, [R || {R, _, _} <- WorkSet]). @@ -616,6 +631,8 @@ test_fold(TestDir) -> [{<<"abc">>, <<"123">>}, {<<"def">>, <<"456">>}, {<<"hij">>, <<"789">>}], lists:reverse(?MODULE:fold(Ref, fun accumulate/2, [], []))), ?assertEqual({ok, 3}, ?MODULE:count(Ref)), + ?assertEqual({ok, [{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 1)), + ?assertEqual({ok, [{<<"hij">>, <<"789">>},{<<"def">>, <<"456">>},{<<"abc">>, <<"123">>}]}, ?MODULE:first_n(Ref, 100)), assert_close(Ref). test_fold_keys(TestDir) -> @@ -727,6 +744,14 @@ run_load(TestDir, IntSeq) -> ?assertEqual({ok, Val}, ?MODULE:get(Ref, Key, [])) end, KVOut), ?assertEqual({ok, length(IntSeq)}, ?MODULE:count(Ref)), + {ok, Records1} = ?MODULE:first_n(Ref, 1), + ?assertEqual(1, length(Records1)), + {ok, Records2} = ?MODULE:first_n(Ref, 10), + ?assertEqual(10, length(Records2)), + {ok, Records3} = ?MODULE:first_n(Ref, length(IntSeq)), + ?assertEqual(length(IntSeq), length(Records3)), + {ok, Records4} = ?MODULE:first_n(Ref, length(IntSeq) + 999), + ?assertEqual(length(IntSeq), length(Records4)), assert_close(Ref). %% ===================================================================