Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mas d34 leveled.i465 stopfold #467

Merged
merged 3 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(LOADING_BATCH, 200).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
Expand Down
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
]}.

{deps, [
{lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
{lz4, ".*", {git, "https://github.com/OpenRiak/erlang-lz4", {branch, "openriak-3.4"}}},
{zstd, ".*", {git, "https://github.com/OpenRiak/zstd-erlang", {branch, "openriak-3.2"}}},
{eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
]}.

Expand Down
1 change: 1 addition & 0 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
maybe_compress/2,
create_value_for_journal/3,
revert_value_from_journal/1,
revert_value_from_journal/2,
generate_ledgerkv/5,
get_size/2,
get_keyandobjhash/2,
Expand Down
104 changes: 63 additions & 41 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ handle_call({fold,
Folder =
fun() ->
fold_from_sequence(
StartSQN,
StartSQN,
State#state.journal_sqn,
{FilterFun, InitAccFun, FoldFun},
Acc,
Manifest
Expand Down Expand Up @@ -1240,80 +1241,101 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->



-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
-> any().
-spec fold_from_sequence(
non_neg_integer(),
pos_integer(),
{fun(), fun(), fun()},
any(),
list()) -> any().
%% @doc
%%
%% Scan from the starting sequence number to the end of the Journal. Apply
%% the FilterFun as it scans over the CDB file to build up a Batch of relevant
%% objects - and then apply the FoldFun to the batch once the batch is
%% complete
%%
%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest
%% Inputs - MinSQN, JournalSQN, FoldFuns, OverallAccumulator, Inker's Manifest
%%
%% The fold loops over all the CDB files in the Manifest. Each file is looped
%% over in batches using foldfile_between_sequence/7. The batch is a range of
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
%% files
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
fold_from_sequence(
MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
{NextMinSQN, Acc0} =
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
),
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(
MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, we can skip it if the
% next file also has a LowSQN below the minimum
{NextMinSQN, Acc0} =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
_ ->
{MinSQN, Acc}
end,
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).

foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
foldfile_between_sequence(
MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},

case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
when AccMinSQN >= JournalSQN ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
foldfile_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN)
foldfile_between_sequence(
NextSQN,
NextSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN
)
end.


sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
lists:foldl(
fun(FN, Acc) ->
Expand Down
18 changes: 9 additions & 9 deletions src/leveled_runner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
FilterFun =
fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
{SQN, InkTag, LedgerKey} = JKey,
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
{?INKT_STND, {B, K}} ->
% Ignore tombstones and non-matching Tags and Key changes
% objects.
Expand All @@ -335,14 +335,14 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
_ ->
{VBin, _VSize} = ExtractFun(JVal),
{Obj, _IdxSpecs} =
leveled_codec:revert_value_from_journal(VBin),
ToLoop =
case SQN of
MaxSQN -> stop;
_ -> loop
end,
{ToLoop,
{MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
leveled_codec:revert_value_from_journal(
VBin,
true
),
{
case SQN of MaxSQN -> stop; _ -> loop end,
{MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}
}
end;
_ ->
{loop, Acc}
Expand Down
59 changes: 59 additions & 0 deletions test/end_to_end/riak_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
fetchclocks_modifiedbetween/1,
crossbucket_aae/1,
handoff/1,
handoff_close/1,
dollar_bucket_index/1,
dollar_key_index/1,
bigobject_memorycheck/1,
Expand All @@ -22,6 +23,7 @@ all() -> [
fetchclocks_modifiedbetween,
crossbucket_aae,
handoff,
handoff_close,
dollar_bucket_index,
dollar_key_index,
bigobject_memorycheck,
Expand Down Expand Up @@ -1697,6 +1699,63 @@ dollar_key_index(_Config) ->
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().

%% @doc Regression test for the sqn_order fold stop condition: an object
%% fold taken in sqn_order should stop once it reaches the journal SQN as
%% it stood when the fold snapshot was taken, instead of continuing into
%% objects written after the snapshot.
%% The test loads 10% of the keys, starts (but does not yet run) a
%% sqn_order object fold, loads the remaining 90%, then runs the fold and
%% asserts that (a) only the pre-snapshot objects were visited and (b) the
%% fold completed promptly after touching its last object.
handoff_close(_Config) ->
RootPath = testutil:reset_filestructure(),
% Total key population; only KeyCount div 10 are loaded pre-snapshot
KeyCount = 500000,
Bucket = {<<"BType">>, <<"BName">>},
StartOpts1 =
[
{root_path, RootPath},
% Large enough that the whole test fits in one journal file
{max_journalobjectcount, KeyCount + 1},
{max_pencillercachesize, 12000},
{sync_strategy, testutil:sync_strategy()}
],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
% First 10% of keys - loaded before the fold snapshot is taken
ObjList1 =
testutil:generate_objects(
KeyCount div 10,
{fixed_binary, 1}, [],
crypto:strong_rand_bytes(512),
fun() -> [] end,
Bucket
),
% Remaining 90% of keys - loaded only after the snapshot exists
ObjList2 =
testutil:generate_objects(
KeyCount - (KeyCount div 10),
{fixed_binary, KeyCount div 10 + 1}, [],
crypto:strong_rand_bytes(512),
fun() -> [] end,
Bucket
),
testutil:riakload(Bookie1, ObjList1),
% Record a timestamp per visited object, so we can check both how many
% objects the fold saw and when it last did any work
FoldObjectsFun =
fun(_, _, _, Acc) ->
[os:timestamp()|Acc]
end,
% Snapshot is taken here (snap_prefold assumed true); the fold itself
% runs later when Runner() is called
{async, Runner} =
leveled_bookie:book_objectfold(
Bookie1,
?RIAK_TAG,
{FoldObjectsFun, []},
true,
sqn_order
),
% Load the bulk of the data after the snapshot - the fold must not
% visit any of these objects
testutil:riakload(Bookie1, ObjList2),
TSList = Runner(),
QueryCompletionTime = os:timestamp(),
% TSList is accumulated head-first, so hd/1 is the last object touched
LastTS = hd(TSList),
io:format(
"Found ~w objects with Last TS ~w completion time ~w~n",
[length(TSList), LastTS, QueryCompletionTime]
),
% Only the pre-snapshot objects should have been folded over
true = KeyCount div 10 == length(TSList),
% The fold should return promptly after its last object, i.e. it must
% not spend time scanning the post-snapshot portion of the journal
TimeSinceLastObjectTouchedMS =
timer:now_diff(QueryCompletionTime, LastTS) div 1000,
true = TimeSinceLastObjectTouchedMS < 1000,
leveled_bookie:book_destroy(Bookie1),
testutil:reset_filestructure().


%% @doc test that the riak specific $bucket indexes can be iterated
%% using leveled's existing folders
dollar_bucket_index(_Config) ->
Expand Down
3 changes: 2 additions & 1 deletion test/end_to_end/testutil.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ end_per_suite(_Config) ->
ok = logger:set_primary_config(level, notice),
ok = logger:set_handler_config(default, level, all),
ok = logger:set_handler_config(cth_log_redirect, level, all),

% 10s delay to allow for any delete_pending files to close without crashing
timer:sleep(10000),
ok.

riak_object(Bucket, Key, Value, MetaData) ->
Expand Down
Loading