Skip to content

Commit

Permalink
Mas d34 leveled.i465 stopfold (#467)
Browse files Browse the repository at this point in the history
* Test and fix - issue with folding beyond JournalSQN

The test previously failed: even on a fast machine the fold continued for 5s beyond the last object found.

With the change to reduce the batch size, and to stop when a batch goes beyond the JournalSQN - success, with well under 100ms spent folding after the last object is discovered.

* Wait after suite for delete_pending to close

#462

* Avoid processing key changes in object fold runner

As the key changes are going to be discarded
  • Loading branch information
martinsumner authored Jan 15, 2025
1 parent fe9710b commit 03a2092
Show file tree
Hide file tree
Showing 7 changed files with 137 additions and 54 deletions.
2 changes: 1 addition & 1 deletion include/leveled.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
-define(MAX_SSTSLOTS, 256).
-define(MAX_MERGEBELOW, 24).
-define(LOADING_PAUSE, 1000).
-define(LOADING_BATCH, 1000).
-define(LOADING_BATCH, 200).
-define(CACHE_SIZE_JITTER, 25).
-define(JOURNAL_SIZE_JITTER, 20).
-define(LONG_RUNNING, 1000000).
Expand Down
4 changes: 2 additions & 2 deletions rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
]}.

{deps, [
{lz4, ".*", {git, "https://github.com/nhs-riak/erlang-lz4", {branch, "nhse-develop-3.4"}}},
{zstd, ".*", {git, "https://github.com/nhs-riak/zstd-erlang", {branch, "nhse-develop"}}},
{lz4, ".*", {git, "https://github.com/OpenRiak/erlang-lz4", {branch, "openriak-3.4"}}},
{zstd, ".*", {git, "https://github.com/OpenRiak/zstd-erlang", {branch, "openriak-3.2"}}},
{eqwalizer_support, {git_subdir, "https://github.com/OpenRiak/eqwalizer.git", {branch, "openriak-3.4"}, "eqwalizer_support"}}
]}.

Expand Down
1 change: 1 addition & 0 deletions src/leveled_codec.erl
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
maybe_compress/2,
create_value_for_journal/3,
revert_value_from_journal/1,
revert_value_from_journal/2,
generate_ledgerkv/5,
get_size/2,
get_keyandobjhash/2,
Expand Down
104 changes: 63 additions & 41 deletions src/leveled_inker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,8 @@ handle_call({fold,
Folder =
fun() ->
fold_from_sequence(
StartSQN,
StartSQN,
State#state.journal_sqn,
{FilterFun, InitAccFun, FoldFun},
Acc,
Manifest
Expand Down Expand Up @@ -1240,80 +1241,101 @@ start_new_activejournal(SQN, RootPath, CDBOpts) ->



-spec fold_from_sequence(integer(), {fun(), fun(), fun()}, any(), list())
-> any().
-spec fold_from_sequence(
non_neg_integer(),
pos_integer(),
{fun(), fun(), fun()},
any(),
list()) -> any().
%% @doc
%%
%% Scan from the starting sequence number to the end of the Journal. Apply
%% the FilterFun as it scans over the CDB file to build up a Batch of relevant
%% objects - and then apply the FoldFun to the batch once the batch is
%% complete
%%
%% Inputs - MinSQN, FoldFuns, OverallAccumulator, Inker's Manifest
%% Inputs - MinSQN, JournalSQN, FoldFuns, OverallAccumulator, Inker's Manifest
%%
%% The fold loops over all the CDB files in the Manifest. Each file is looped
%% over in batches using foldfile_between_sequence/7. The batch is a range of
%% sequence numbers (so the batch size may be << ?LOADING_BATCH) in compacted
%% files
fold_from_sequence(_MinSQN, _FoldFuns, Acc, []) ->
fold_from_sequence(_MinSQN, _JournalSQN, _FoldFuns, Acc, []) ->
Acc;
fold_from_sequence(MinSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
{NextMinSQN, Acc0} = foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN),
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(MinSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
fold_from_sequence(
MinSQN, JournalSQN, FoldFuns, Acc, [{LowSQN, FN, Pid, _LK}|Rest])
when LowSQN >= MinSQN ->
{NextMinSQN, Acc0} =
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
),
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest);
fold_from_sequence(
MinSQN, JournalSQN, FoldFuns, Acc, [{_LowSQN, FN, Pid, _LK}|Rest]) ->
% If this file has a LowSQN less than the minimum, we can skip it if the
% next file also has a LowSQN below the minimum
{NextMinSQN, Acc0} =
case Rest of
[] ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
[{NextSQN, _NxtFN, _NxtPid, _NxtLK}|_Rest] when NextSQN > MinSQN ->
foldfile_between_sequence(MinSQN,
MinSQN + ?LOADING_BATCH,
FoldFuns,
Acc,
Pid,
undefined,
FN);
foldfile_between_sequence(
MinSQN,
MinSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
Acc,
Pid,
undefined,
FN
);
_ ->
{MinSQN, Acc}
end,
fold_from_sequence(NextMinSQN, FoldFuns, Acc0, Rest).
fold_from_sequence(NextMinSQN, JournalSQN, FoldFuns, Acc0, Rest).

foldfile_between_sequence(MinSQN, MaxSQN, FoldFuns,
Acc, CDBpid, StartPos, FN) ->
foldfile_between_sequence(
MinSQN, MaxSQN, JournalSQN, FoldFuns, Acc, CDBpid, StartPos, FN) ->
{FilterFun, InitAccFun, FoldFun} = FoldFuns,
InitBatchAcc = {MinSQN, MaxSQN, InitAccFun(FN, MinSQN)},

case leveled_cdb:cdb_scan(CDBpid, FilterFun, InitBatchAcc, StartPos) of
{eof, {AccMinSQN, _AccMaxSQN, BatchAcc}} ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{_LastPosition, {AccMinSQN, _AccMaxSQN, BatchAcc}}
when AccMinSQN >= JournalSQN ->
{AccMinSQN, FoldFun(BatchAcc, Acc)};
{LastPosition, {_AccMinSQN, _AccMaxSQN, BatchAcc}} ->
UpdAcc = FoldFun(BatchAcc, Acc),
NextSQN = MaxSQN + 1,
foldfile_between_sequence(NextSQN,
NextSQN + ?LOADING_BATCH,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN)
foldfile_between_sequence(
NextSQN,
NextSQN + ?LOADING_BATCH,
JournalSQN,
FoldFuns,
UpdAcc,
CDBpid,
LastPosition,
FN
)
end.


sequencenumbers_fromfilenames(Filenames, Regex, IntName) ->
lists:foldl(
fun(FN, Acc) ->
Expand Down
18 changes: 9 additions & 9 deletions src/leveled_runner.erl
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
FilterFun =
fun(JKey, JVal, _Pos, Acc, ExtractFun) ->
{SQN, InkTag, LedgerKey} = JKey,
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
case {InkTag, leveled_codec:from_ledgerkey(Tag, LedgerKey)} of
{?INKT_STND, {B, K}} ->
% Ignore tombstones and non-matching Tags and Key changes
% objects.
Expand All @@ -335,14 +335,14 @@ foldobjects_allkeys(SnapFun, Tag, FoldObjectsFun, sqn_order) ->
_ ->
{VBin, _VSize} = ExtractFun(JVal),
{Obj, _IdxSpecs} =
leveled_codec:revert_value_from_journal(VBin),
ToLoop =
case SQN of
MaxSQN -> stop;
_ -> loop
end,
{ToLoop,
{MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}}
leveled_codec:revert_value_from_journal(
VBin,
true
),
{
case SQN of MaxSQN -> stop; _ -> loop end,
{MinSQN, MaxSQN, [{B, K, SQN, Obj}|BatchAcc]}
}
end;
_ ->
{loop, Acc}
Expand Down
59 changes: 59 additions & 0 deletions test/end_to_end/riak_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
fetchclocks_modifiedbetween/1,
crossbucket_aae/1,
handoff/1,
handoff_close/1,
dollar_bucket_index/1,
dollar_key_index/1,
bigobject_memorycheck/1,
Expand All @@ -22,6 +23,7 @@ all() -> [
fetchclocks_modifiedbetween,
crossbucket_aae,
handoff,
handoff_close,
dollar_bucket_index,
dollar_key_index,
bigobject_memorycheck,
Expand Down Expand Up @@ -1697,6 +1699,63 @@ dollar_key_index(_Config) ->
ok = leveled_bookie:book_close(Bookie1),
testutil:reset_filestructure().

%% @doc Regression test for folding beyond the JournalSQN (issue #465):
%% start a sqn_order object fold over a snapshot taken after only 10% of the
%% keys are loaded, load the remaining 90% while the fold runs, and confirm
%% the fold returns promptly after touching its last visible object rather
%% than continuing to scan the journal for the concurrently-written keys.
handoff_close(_Config) ->
RootPath = testutil:reset_filestructure(),
KeyCount = 500000,
Bucket = {<<"BType">>, <<"BName">>},
%% max_journalobjectcount of KeyCount + 1 keeps all objects in a single
%% active journal file for the duration of the test
StartOpts1 =
[
{root_path, RootPath},
{max_journalobjectcount, KeyCount + 1},
{max_pencillercachesize, 12000},
{sync_strategy, testutil:sync_strategy()}
],
{ok, Bookie1} = leveled_bookie:book_start(StartOpts1),
%% First batch: 10% of the keyspace, loaded before the fold snapshot
ObjList1 =
testutil:generate_objects(
KeyCount div 10,
{fixed_binary, 1}, [],
crypto:strong_rand_bytes(512),
fun() -> [] end,
Bucket
),
%% Second batch: the remaining 90%, keyed to follow on from the first
%% batch; loaded only after the fold has been set up
ObjList2 =
testutil:generate_objects(
KeyCount - (KeyCount div 10),
{fixed_binary, KeyCount div 10 + 1}, [],
crypto:strong_rand_bytes(512),
fun() -> [] end,
Bucket
),
testutil:riakload(Bookie1, ObjList1),
%% Record a timestamp per object visited, so the time of the last
%% object touched by the fold can be compared with fold completion
FoldObjectsFun =
fun(_, _, _, Acc) ->
[os:timestamp()|Acc]
end,
%% Snapshot taken here (true => snapshot) - the fold should see only
%% ObjList1, folding in sequence number order
{async, Runner} =
leveled_bookie:book_objectfold(
Bookie1,
?RIAK_TAG,
{FoldObjectsFun, []},
true,
sqn_order
),
%% Load the second batch while the fold is runnable, so the journal
%% grows beyond the snapshot's JournalSQN
testutil:riakload(Bookie1, ObjList2),
TSList = Runner(),
QueryCompletionTime = os:timestamp(),
%% Timestamps were consed onto the accumulator, so the head is the
%% most recently visited object
LastTS = hd(TSList),
io:format(
"Found ~w objects with Last TS ~w completion time ~w~n",
[length(TSList), LastTS, QueryCompletionTime]
),
%% Only the pre-snapshot 10% of objects should have been folded over
true = KeyCount div 10 == length(TSList),
%% The fold must complete within 1s of touching its last object -
%% previously it spent ~5s scanning past the JournalSQN (issue #465)
TimeSinceLastObjectTouchedMS =
timer:now_diff(QueryCompletionTime, LastTS) div 1000,
true = TimeSinceLastObjectTouchedMS < 1000,
leveled_bookie:book_destroy(Bookie1),
testutil:reset_filestructure().


%% @doc test that the riak specific $bucket indexes can be iterated
%% using leveled's existing folders
dollar_bucket_index(_Config) ->
Expand Down
3 changes: 2 additions & 1 deletion test/end_to_end/testutil.erl
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ end_per_suite(_Config) ->
ok = logger:set_primary_config(level, notice),
ok = logger:set_handler_config(default, level, all),
ok = logger:set_handler_config(cth_log_redirect, level, all),

% 10s delay to allow any delete_pending files to close without crashing
timer:sleep(10000),
ok.

riak_object(Bucket, Key, Value, MetaData) ->
Expand Down

0 comments on commit 03a2092

Please sign in to comment.