Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the way we handle removes with a context #761

Merged
merged 5 commits into from
Dec 18, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 70 additions & 20 deletions src/riak_kv_crdt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

-ifdef(TEST).
-ifdef(EQC).
-export([test_split/0, test_split/1]).
-compile(export_all).
-include_lib("eqc/include/eqc.hrl").
-endif.
-include_lib("eunit/include/eunit.hrl").
Expand Down Expand Up @@ -190,11 +190,11 @@ update_crdt(Dict, Actor, ?CRDT_OP{mod=Mod, op=Ops, ctx=OpCtx}) when Mod==?MAP_TY
error ->
orddict:store(Mod, {undefined, to_record(Mod, InitialVal)}, Dict);
{ok, {Meta, LocalCRDT=?CRDT{value=LocalReplica}}} ->
Merged = Mod:merge(InitialVal, LocalReplica),
case Mod:update(PostOps, Actor, Merged) of
case Mod:update(PostOps, Actor, LocalReplica) of
{error, _}=E -> E;
{ok, NewVal} ->
orddict:store(Mod, {Meta, LocalCRDT?CRDT{value=NewVal}}, Dict)
Merged = Mod:merge(InitialVal, NewVal),
orddict:store(Mod, {Meta, LocalCRDT?CRDT{value=Merged}}, Dict)
end
end
end.
Expand Down Expand Up @@ -238,32 +238,40 @@ fetch_with_default(Mod, Dict) ->
%% essentially as a No Op (for example, an update to a nested Map
%% that has no operations). While this does not affect correctness, it
%% is not ideal.
split_ops(Ops) when is_list(Ops) ->
split_ops(Ops, [], []);
split_ops({AddOp, _}=Op) when AddOp == add;
AddOp == add_all ->
{{update, []}, {update, [Op]}};
split_ops({RemOp, _}=Op) when RemOp == remove;
RemOp == remove_all ->
{{update, [Op]}, {update, []}};
split_ops({update, Ops}) ->
{Pre, Post} = split_ops(Ops),
%% Map or Set update list
{Pre, Post} = split_ops(Ops, [], []),
{{update, Pre}, {update, Post}}.

split_ops([], Pre, Post) ->
{lists:reverse(Pre), lists:reverse(Post)};
split_ops([{remove, _Key}=Op | Rest], Pre, Post) ->
split_ops(Rest, [Op | Pre], Post);
split_ops([{remove_all, _ELems}=Op | Rest], Pre, Post) ->
split_ops([{remove_all, _Elems}=Op | Rest], Pre, Post) ->
split_ops(Rest, [Op | Pre], Post);
split_ops([{update, _Key, {remove, _}}=Op | Rest], Pre, Post) ->
split_ops(Rest, [Op | Pre], Post);
split_ops([{update, {_Name, ?MAP_TYPE}=Key, Op} | Rest], Pre0, Post0) ->
{Pre, Post} = split_ops(Op),
split_ops(Rest, [{update, Key, Pre} | Pre0], [{update, Key, Post} | Post0]);
split_ops([{update, Key, {remove, _}}=Op | Rest], Pre, Post) ->
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, could this be solved by ensuring that any nested update (whether add or remove) in the map results in an increment to the clock and dot? I guess I'm just concerned that this is a larger issue with riak_dt_map.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain the issue?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't riak_dt_map already increment its clock and apply the dot to fields that contain nested removes? I guess that's my question.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but the remove is applied to the context, so the increment is applied to the context. If the replica's local copy has already incremented past that point for the field in question, the update loses. That is the purpose of the patch: to ensure that the field update is also applied in the adds list.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So to answer your question: "Yes" and this is how we ensure that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aha, I think I see now.

split_ops(Rest, [Op | Pre], [{add, Key} | Post]);
split_ops([{update, Key, {remove_all, _}}=Op | Rest], Pre, Post) ->
split_ops(Rest, [Op | Pre], [{add, Key} | Post]);
split_ops([{update, {_Name, _}=Key, {update, Ops}} | Rest], Pre0, Post0) when is_list(Ops) ->
{Pre, Post} = split_ops(Ops,[], []),
Pre1 = maybe_prepend(Key, Pre, Pre0),
Post1 = maybe_prepend(Key, Post, Post0),
split_ops(Rest, Pre1, [{add, Key} | Post1]);
split_ops([Op | Rest], Pre, Post) ->
split_ops(Rest, Pre, [Op | Post]).


%% Prepend a nested `{update, Key, {update, Ops}}' operation onto the
%% accumulator `Acc', unless `Ops' is the empty list, in which case
%% `Acc' is returned untouched so that no empty nested update is
%% emitted.
maybe_prepend(Key, Ops, Acc) ->
    case Ops of
        [] ->
            Acc;
        _ ->
            [{update, Key, {update, Ops}} | Acc]
    end.
%% This uses an exported but marked INTERNAL
%% function of `riak_object:set_contents' to preserve
%% non-crdt sibling values and Metadata
Expand Down Expand Up @@ -417,6 +425,46 @@ get_context(Type, Value) ->
%% ===================================================================
-ifdef(TEST).

%% An element remove inside a set field, sent with a context, is split
%% into pre (context) and post (local) operations. Here the local
%% replica has moved past the context: it added to the set field and
%% then removed the field altogether. After applying the pre ops to the
%% context, the post ops to the local value, and merging the two, the
%% field is expected to be present with the value [b, c].
pre_post_test() ->
    SetField = {<<"s">>, riak_dt_orswot},
    Fresh = riak_dt_map:new(),
    %% The context: a map whose set field holds a, b and c.
    {ok, Context} = riak_dt_map:update(
                      {update, [{update, SetField, {add_all, [<<"a">>, <<"b">>, <<"c">>]}}]},
                      a, Fresh),
    %% The local replica continues from the context: one more add to
    %% the set, then removal of the whole field.
    {ok, Grown} = riak_dt_map:update(
                    {update, [{update, SetField, {add, [<<"d">>]}}]},
                    a, Context),
    {ok, Local} = riak_dt_map:update({update, [{remove, SetField}]}, a, Grown),
    %% The incoming operation: remove element a from the set field.
    {Pre, Post} = split_ops({update, [{update, SetField, {remove, <<"a">>}}]}),
    ?debugFmt("Pre ~p~n post ~p~n", [Pre,Post]),
    {ok, PreMerge} = riak_dt_map:update(Pre, a, Context),
    {ok, LocalUpdated} = riak_dt_map:update(Post, a, Local),
    PostMerge = riak_dt_map:merge(PreMerge, LocalUpdated),
    ?debugFmt("local ~p~n", [PostMerge]),
    %% The field update is "concurrent" with the field removal, so the
    %% field must still be there, holding exactly b and c.
    Set = riak_dt_map:value({get, SetField}, PostMerge),
    ?assertNot(error == Set),
    ?assertEqual([<<"b">>, <<"c">>], lists:sort(Set)).

%% Same split/apply/merge sequence as a context-carrying element
%% remove, but the local replica is the freshly created, empty map:
%% only the context holds the set field. After applying the pre ops to
%% the context, the post ops to the empty local value, and merging the
%% two, the field is expected to be present with the value [b, c].
pre_post_empty_test() ->
    SetField = {<<"s">>, riak_dt_orswot},
    Local = riak_dt_map:new(),
    %% The context: a map whose set field holds a, b and c.
    {ok, Context} = riak_dt_map:update(
                      {update, [{update, SetField, {add_all, [<<"a">>, <<"b">>, <<"c">>]}}]},
                      a, Local),
    %% The incoming operation: remove element a from the set field.
    {Pre, Post} = split_ops({update, [{update, SetField, {remove, <<"a">>}}]}),
    ?debugFmt("Pre ~p~n post ~p~n", [Pre,Post]),
    {ok, PreMerge} = riak_dt_map:update(Pre, a, Context),
    {ok, LocalUpdated} = riak_dt_map:update(Post, a, Local),
    PostMerge = riak_dt_map:merge(PreMerge, LocalUpdated),
    ?debugFmt("local ~p~n", [PostMerge]),
    %% The set field must be present after the merge, holding exactly
    %% b and c.
    Set = riak_dt_map:value({get, SetField}, PostMerge),
    ?assertNot(error == Set),
    ?assertEqual([<<"b">>, <<"c">>], lists:sort(Set)).

-ifdef(EQC).
-define(QC_OUT(P),
eqc:on_output(fun(Str, Args) ->
Expand Down Expand Up @@ -444,13 +492,11 @@ prop_split() ->
io:format("Split Pre ~p~n", [Pre]),
io:format("Split Post ~p~n", [Post])
end,
collect(length(AgOps),
collect(depth(AgOps, 0),
collect(with_title("operation length"), length(AgOps),
collect(with_title("operation depth"), depth(AgOps, 0),
conjunction([
{pre, only_rem_ops(Pre)},
{pre_depth, equals(depth(Pre, 0), depth(AgOps, 0))},
{post, only_add_ops(Post)},
{post_depth, equals(depth(Post, 0), depth(AgOps, 0))}
{post, only_add_ops(Post)}
])))
)
end).
Expand Down Expand Up @@ -489,13 +535,17 @@ only_rem_ops({update, Ops}) ->
only_rem_ops(Ops) ->
only_type_ops(fun is_rem_op/1, Ops).

is_add_op({update, {_Name, ?MAP_TYPE}, {update, Ops}}) ->
is_add_op({update, {_Name, _}, {update, Ops}}) ->
only_add_ops(Ops);
is_add_op(Op) ->
not is_rem_op(Op).

is_rem_op({update, {_Name, ?MAP_TYPE}, {update, Ops}}) ->
is_rem_op({update, {_Name, _}, {update, Ops}}) ->
only_rem_ops(Ops);
is_rem_op({update, {_Name, _}, {remove, _E}}) ->
true;
is_rem_op({update, {_Name, _}, {remove_all, _Es}}) ->
true;
is_rem_op({remove, _Key}) ->
true;
is_rem_op({remove_all, _Elems}) ->
Expand Down