Skip to content

Commit

Permalink
FML: fix tombstone merging bug when K & K's tombstone are in same fileid
Browse files Browse the repository at this point in the history
Originally found with bitcask_pulse, I deconstructed the test case to
help understand what was happening: the new EUnit test is
new_20131217_a_test_.

As a result of the puts, key #13 is written 3x to fileid #1 (normal,
tombstone, normal) and 1x to fileid #2 (normal @ the very beginning
of the file).  The merge creates fileid #3 and copies only the
tombstone (the normal entry isn't copied because it is out-of-date).
Before the close, the internal keydir contains the correct info
about key #13, but after the close and re-open, we see key #13's
entries: normal (and most recent) in fileid 32, and tombstone in
fileid #3, oops.

The fix is to remove all of the merge input fileids from the set of fileids
that will survive/exist after the merge is finished.
  • Loading branch information
slfritchie committed Dec 17, 2013
1 parent afc5c3f commit 1712650
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 4 deletions.
15 changes: 11 additions & 4 deletions src/bitcask.erl
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
merge_lock,
merge_start,
max_file_size,
all_fileids :: set(),
after_fileids :: set(),
input_files,
out_file,
merged_files,
Expand Down Expand Up @@ -641,7 +641,14 @@ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
AllFilesIds = [FileId || {FileId, _LiveCount, _TotalCount, _LiveBytes,
_TotalBytes, _OldestTstamp,
_NewestTstamp} <- Fstats],
AllFilesSet = sets:from_list(AllFilesIds),
InFilesIds = [F#filestate.tstamp || F <- InFiles],
InExpiredFilesIds = [F#filestate.tstamp || F <- InExpiredFiles],
%% AllFilesSet contains the list of fileids that will remain
%% after this merge is finished (but does not include fileids that
%% are created by the merge). It is possible that a key K has an
%% entry in fileid F and then a later tombstone also in F. We
%% do not want the merge to carry forward that tombstone.
AllFilesSet = sets:from_list(AllFilesIds -- (InFilesIds++InExpiredFilesIds)),

%% Initialize the other keydirs we need.
{ok, DelKeyDir} = bitcask_nifs:keydir_new(),
Expand All @@ -650,7 +657,7 @@ merge1(Dirname, Opts, FilesToMerge, ExpiredFiles) ->
State = #mstate { dirname = Dirname,
merge_lock = Lock,
max_file_size = get_opt(max_file_size, Opts),
all_fileids = AllFilesSet,
after_fileids = AllFilesSet,
input_files = InFiles,
merge_start = MergeStart,
out_file = fresh, % will be created when needed
Expand Down Expand Up @@ -1138,7 +1145,7 @@ merge_single_entry(K, V, Tstamp, FileId, {_, _, Offset, _} = Pos, State) ->
true ->
State2 = case V of
<<?TOMBSTONE2_STR, DeadFileId:32, _DeadOffset:64>> ->
case sets:is_element(DeadFileId, State#mstate.all_fileids) of
case sets:is_element(DeadFileId, State#mstate.after_fileids) of
true ->
inner_merge_write(K, V, Tstamp, FileId, Offset,
false, State);
Expand Down
90 changes: 90 additions & 0 deletions src/bitcask_merge_delete.erl
Original file line number Diff line number Diff line change
Expand Up @@ -291,4 +291,94 @@ change_open_regression_body() ->
{bummer, unexpected_failure, Else}
end.

new_20131217_a_test_() ->
{timeout, 300, ?_assertEqual(ok, new_20131217_a_body())}.

new_20131217_a_body() ->
TestDir = token:get_name(),
MOD = ?MODULE,
V1 = <<"v">>,
V2 = <<"v22">>,
V3 = <<"v33">>,
V1017_expected = [{1,<<"v33">>}, {2,<<"v33">>}, {3,<<"v33">>},
{4,<<"v33">>}, {5,<<"v33">>}, {6,<<"v33">>},
{7,<<"v33">>}, {8,<<"v33">>}, {9,<<"v33">>},
{10,<<"v33">>}, {11,<<"v33">>}, {12,<<"v33">>},
{13,<<"v33">>}, {14,<<"v33">>}, {15,<<"v33">>},
{16,<<"v22">>}, {17,<<"v22">>}, {18,<<"v22">>},
{19,<<"v22">>}, {20,<<"v22">>}, {21,<<"v22">>}],

_Var1 = erlang:apply(MOD,incr_clock,[]),
_Var2 = erlang:apply(MOD,bc_open,[TestDir]),
_Var3 = erlang:apply(MOD,puts,[_Var2,{1,13},V1]),
_Var10 = erlang:apply(MOD,delete,[_Var2,13]),
not_found = get(_Var2, 13), %not from EQC
_Var14 = erlang:apply(MOD,puts,[_Var2,{1,21},V2]),
{ok, V2} = get(_Var2, 13), %not from EQC
_Var18 = erlang:apply(MOD,puts,[_Var2,{1,15},V3]),
{ok, V3} = get(_Var2, 13), %not from EQC
timer:sleep(1234), %not from EQC
{ok, V3} = get(_Var2, 13), %not from EQC
_Var24 = erlang:apply(MOD,fork_merge,[_Var2, TestDir]),
{ok, V3} = get(_Var2, 13), %not from EQC
timer:sleep(1235), %not from EQC
{ok, V3} = get(_Var2, 13), %not from EQC
_Var27 = erlang:apply(MOD,bc_close,[_Var2]),
_Var28 = erlang:apply(MOD,incr_clock,[]),
_Var106 = erlang:apply(MOD,bc_open,[TestDir]),
{ok, V3} = get(_Var106, 13), %not from EQC
_Var1017 = erlang:apply(MOD,fold,[_Var106]),
{ok, V3} = get(_Var106, 13), %not from EQC
?assertEqual(V1017_expected, lists:sort(_Var1017)),
os:cmd("rm -rf " ++ TestDir),
ok.

-define(NUM_KEYS, 50).
-define(FILE_SIZE, 1000).

bc_open(Dir) ->
bitcask:open(Dir, [read_write, {max_file_size, ?FILE_SIZE}, {open_timeout, 1234}]).

nice_key(K) ->
list_to_binary(io_lib:format("kk~2.2.0w", [K])).

un_nice_key(<<"kk", Num:2/binary>>) ->
list_to_integer(binary_to_list(Num)).

get(H, K) ->
bitcask:get(H, nice_key(K)).

put(H, K, V) ->
ok = bitcask:put(H, nice_key(K), V).

puts(H, {K1, K2}, V) ->
case lists:usort([ put(H, K, V) || K <- lists:seq(K1, K2) ]) of
[ok] -> ok;
Other -> Other
end.

delete(H, K) ->
ok = bitcask:delete(H, nice_key(K)).

fork_merge(H, Dir) ->
case bitcask:needs_merge(H) of
{true, Files} -> catch bitcask_merge_worker:merge(Dir, [], Files);
false -> not_needed
end.

incr_clock() ->
bitcask_time:test__incr_fudge(1).

bc_close(H) ->
ok = bitcask:close(H).

fold_keys(H) ->
bitcask:fold_keys(H, fun(#bitcask_entry{key = KBin}, Ks) -> [un_nice_key(KBin)|Ks] end, []).

fold(H) ->
bitcask:fold(H, fun(KBin, V, Acc) -> [{un_nice_key(KBin),V}|Acc] end, []).

%% 37> io:format("~w.\n", [C76]).
%% [[{set,{var,1},{call,bitcask_pulse,incr_clock,[]}},{set,{var,2},{call,bitcask_pulse,bc_open,[true]}},{set,{var,3},{call,bitcask_pulse,puts,[{var,2},{1,13},<<0>>]}},{set,{var,10},{call,bitcask_pulse,delete,[{var,2},13]}},{set,{var,14},{call,bitcask_pulse,puts,[{var,2},{1,21},<<0,0,0>>]}},{set,{var,18},{call,bitcask_pulse,puts,[{var,2},{1,15},<<0,0,0>>]}},{set,{var,24},{call,bitcask_pulse,fork_merge,[{var,2}]}},{set,{var,27},{call,bitcask_pulse,bc_close,[{var,2}]}},{set,{var,28},{call,bitcask_pulse,incr_clock,[]}},{set,{var,40},{call,bitcask_pulse,fork,[[{init,{state,undefined,false,false,[]}},{set,{not_var,6},{not_call,bitcask_pulse,bc_open,[false]}},{set,{not_var,17},{not_call,bitcask_pulse,fold,[{not_var,6}]}}]]}}],{99742,1075,90258},[{events,[]}]].

-endif. %% TEST
37 changes: 37 additions & 0 deletions src/foo.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
-module(foo).
-compile(export_all).

-define(FILE_SIZE, 1000).
-define(BITCASK, "test-bitcask-dir").

t() ->
os:cmd("rm -rf " ++ ?BITCASK),
BC = bc_open(true),
puts(BC, {1,22},<<>>),
puts(BC, {1,11},<<0>>),
puts(BC,{1,22},<<>>),
spawn(fun() -> fork_merge(BC), io:format("fork_merge done!\n") end),
timer:sleep(1000),
io:format("t pid ~p is done\n", [self()]).

bc_open(Writer) ->
case Writer of
true -> catch bitcask:open(?BITCASK, [read_write, {max_file_size, ?FILE_SIZE}, {open_timeout, 1234}]);
false -> catch bitcask:open(?BITCASK, [{open_timeout, 1234}])
end.

puts(H, {K1, K2}, V) ->
case lists:usort([ put(H, K, V) || K <- lists:seq(K1, K2) ]) of
[ok] -> ok;
Other -> Other
end.

put(H, K, V) ->
ok = bitcask:put(H, <<K:32>>, V).

fork_merge(H) ->
case bitcask:needs_merge(H) of
{true, Files} -> catch io:format("Yo, gonna merge!\n"), bitcask_merge_worker:merge(?BITCASK, [], Files);
false -> not_needed
end.

0 comments on commit 1712650

Please sign in to comment.