diff --git a/src/riak_kv_crdt.erl b/src/riak_kv_crdt.erl index 11739da755..1c20040e82 100644 --- a/src/riak_kv_crdt.erl +++ b/src/riak_kv_crdt.erl @@ -31,7 +31,7 @@ -ifdef(TEST). -ifdef(EQC). --export([test_split/0, test_split/1]). +-compile(export_all). -include_lib("eqc/include/eqc.hrl"). -endif. -include_lib("eunit/include/eunit.hrl"). @@ -190,11 +190,11 @@ update_crdt(Dict, Actor, ?CRDT_OP{mod=Mod, op=Ops, ctx=OpCtx}) when Mod==?MAP_TY error -> orddict:store(Mod, {undefined, to_record(Mod, InitialVal)}, Dict); {ok, {Meta, LocalCRDT=?CRDT{value=LocalReplica}}} -> - Merged = Mod:merge(InitialVal, LocalReplica), - case Mod:update(PostOps, Actor, Merged) of + case Mod:update(PostOps, Actor, LocalReplica) of {error, _}=E -> E; {ok, NewVal} -> - orddict:store(Mod, {Meta, LocalCRDT?CRDT{value=NewVal}}, Dict) + Merged = Mod:merge(InitialVal, NewVal), + orddict:store(Mod, {Meta, LocalCRDT?CRDT{value=Merged}}, Dict) end end end. @@ -238,8 +238,6 @@ fetch_with_default(Mod, Dict) -> %% essentially as a No Op (for example, and update to a nested Map %% that has no operations. While this does not affect correctness, it %% is not ideal. -split_ops(Ops) when is_list(Ops) -> - split_ops(Ops, [], []); split_ops({AddOp, _}=Op) when AddOp == add; AddOp == add_all -> {{update, []}, {update, [Op]}}; @@ -247,23 +245,33 @@ split_ops({RemOp, _}=Op) when RemOp == remove; RemOp == remove_all -> {{update, [Op]}, {update, []}}; split_ops({update, Ops}) -> - {Pre, Post} = split_ops(Ops), + %% Map or Set update list + {Pre, Post} = split_ops(Ops, [], []), {{update, Pre}, {update, Post}}. split_ops([], Pre, Post) -> {lists:reverse(Pre), lists:reverse(Post)}; split_ops([{remove, _Key}=Op | Rest], Pre, Post) -> split_ops(Rest, [Op | Pre], Post); -split_ops([{remove_all, _ELems}=Op | Rest], Pre, Post) -> +split_ops([{remove_all, _Elems}=Op | Rest], Pre, Post) -> split_ops(Rest, [Op | Pre], Post); -split_ops([{update, _Key, {remove, _}}=Op | Rest], Pre, Post) -> - split_ops(Rest, [Op | Pre], Post); -split_ops([{update, {_Name, ?MAP_TYPE}=Key, Op} | Rest], Pre0, Post0) -> - {Pre, Post} = split_ops(Op), - split_ops(Rest, [{update, Key, Pre} | Pre0], [{update, Key, Post} | Post0]); +split_ops([{update, Key, {remove, _}}=Op | Rest], Pre, Post) -> + split_ops(Rest, [Op | Pre], [{add, Key} | Post]); +split_ops([{update, Key, {remove_all, _}}=Op | Rest], Pre, Post) -> + split_ops(Rest, [Op | Pre], [{add, Key} | Post]); +split_ops([{update, {_Name, _}=Key, {update, Ops}} | Rest], Pre0, Post0) when is_list(Ops) -> + {Pre, Post} = split_ops(Ops,[], []), + Pre1 = maybe_prepend(Key, Pre, Pre0), + Post1 = maybe_prepend(Key, Post, Post0), + split_ops(Rest, Pre1, [{add, Key} | Post1]); split_ops([Op | Rest], Pre, Post) -> split_ops(Rest, Pre, [Op | Post]). + +maybe_prepend(_Key, [], Acc) -> + Acc; +maybe_prepend(Key, Ops, Acc) -> + [{update,Key,{update, Ops}} | Acc]. %% This uses an exported but marked INTERNAL %% function of `riak_object:set_contents' to preserve %% non-crdt sibling values and Metadata @@ -417,6 +425,46 @@ get_context(Type, Value) -> %% =================================================================== -ifdef(TEST). +pre_post_test() -> + M = riak_dt_map:new(), + {ok, M2} = riak_dt_map:update({update, [{update, {<<"s">>, riak_dt_orswot}, {add_all, [<<"a">>, <<"b">>, <<"c">>]}}]}, a, M), + Context = M2, + {ok, M3} = riak_dt_map:update({update, [{update, {<<"s">>, riak_dt_orswot}, {add, [<<"d">>]}}]}, a, M2), + {ok, M4} = riak_dt_map:update({update, [{remove, {<<"s">>, riak_dt_orswot}}]}, a, M3), + Op = {update, [{update, {<<"s">>, riak_dt_orswot}, {remove, <<"a">>}}]}, + {Pre, Post} = split_ops(Op), + ?debugFmt("Pre ~p~n post ~p~n", [Pre,Post]), + {ok, PreMerge} = riak_dt_map:update(Pre, a, Context), + Local = M4, + {ok, Local2} = riak_dt_map:update(Post, a, Local), + PostMerge = riak_dt_map:merge(PreMerge, Local2), + ?debugFmt("local ~p~n", [PostMerge]), + %% Set field should be present, since it's a field update (an + %% element remove) "concurrent" with field removal. + %% Expect the field to be present, and it's value to be [b,c] + Set = riak_dt_map:value({get, {<<"s">>, riak_dt_orswot}}, PostMerge), + ?assertNot(error == Set), + ?assertEqual([<<"b">>, <<"c">>], lists:sort(Set)). + +pre_post_empty_test() -> + M = riak_dt_map:new(), + {ok, M2} = riak_dt_map:update({update, [{update, {<<"s">>, riak_dt_orswot}, {add_all, [<<"a">>, <<"b">>, <<"c">>]}}]}, a, M), + Context = M2, + Op = {update, [{update, {<<"s">>, riak_dt_orswot}, {remove, <<"a">>}}]}, + {Pre, Post} = split_ops(Op), + ?debugFmt("Pre ~p~n post ~p~n", [Pre,Post]), + {ok, PreMerge} = riak_dt_map:update(Pre, a, Context), + Local = M, + {ok, Local2} = riak_dt_map:update(Post, a, Local), + PostMerge = riak_dt_map:merge(PreMerge, Local2), + ?debugFmt("local ~p~n", [PostMerge]), + %% Set field should be present, since it's a field update (an + %% element remove) "concurrent" with field removal. + %% Expect the field to be present, and it's value to be [b,c] + Set = riak_dt_map:value({get, {<<"s">>, riak_dt_orswot}}, PostMerge), + ?assertNot(error == Set), + ?assertEqual([<<"b">>, <<"c">>], lists:sort(Set)). + -ifdef(EQC). -define(QC_OUT(P), eqc:on_output(fun(Str, Args) -> @@ -444,13 +492,11 @@ prop_split() -> io:format("Split Pre ~p~n", [Pre]), io:format("Split Post ~p~n", [Post]) end, - collect(length(AgOps), - collect(depth(AgOps, 0), + collect(with_title("operation length"), length(AgOps), + collect(with_title("operation depth"), depth(AgOps, 0), conjunction([ {pre, only_rem_ops(Pre)}, - {pre_depth, equals(depth(Pre, 0), depth(AgOps, 0))}, - {post, only_add_ops(Post)}, - {post_depth, equals(depth(Post, 0), depth(AgOps, 0))} + {post, only_add_ops(Post)} ]))) ) end). @@ -489,13 +535,17 @@ only_rem_ops({update, Ops}) -> only_rem_ops(Ops) -> only_type_ops(fun is_rem_op/1, Ops). -is_add_op({update, {_Name, ?MAP_TYPE}, {update, Ops}}) -> +is_add_op({update, {_Name, _}, {update, Ops}}) -> only_add_ops(Ops); is_add_op(Op) -> not is_rem_op(Op). -is_rem_op({update, {_Name, ?MAP_TYPE}, {update, Ops}}) -> +is_rem_op({update, {_Name, _}, {update, Ops}}) -> only_rem_ops(Ops); +is_rem_op({update, {_Name, _}, {remove, _E}}) -> + true; +is_rem_op({update, {_Name, _}, {remove_all, _Es}}) -> + true; is_rem_op({remove, _Key}) -> true; is_rem_op({remove_all, _Elems}) ->