Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge CRDTs inside riak_object merge #758

Merged
merged 1 commit into from
Dec 13, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions src/riak_kv_crdt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
-export([update/3, merge/1, value/2, new/3,
supported/1, to_mod/1, from_mod/1]).
-export([to_binary/2, to_binary/1, from_binary/1]).
-export([log_merge_errors/4, meta/2, merge_value/2]).

-include("riak_kv_wm_raw.hrl").
-include_lib("riak_kv_types.hrl").
Expand Down Expand Up @@ -103,6 +104,12 @@ merge_object(RObj) ->
maybe_log_sibling_crdts(Bucket, Key, CRDTs),
{CRDTs, NonCRDTSiblings}.

%% @doc log any accumulated merge errors
-spec log_merge_errors(riak_object:bucket(), riak_object:key(), crdts(), list()) -> ok.
log_merge_errors(Bucket, Key, CRDTs, Errors) ->
log_errors(Bucket, Key, Errors),
maybe_log_sibling_crdts(Bucket, Key, CRDTs).

log_errors(_, _, []) ->
ok;
log_errors(Bucket, Key, Errors) ->
Expand All @@ -124,9 +131,8 @@ merge_contents(Contents) ->
{orddict:new(), [], []},
Contents).

%% @private worker for `merge_contents/1' if the content is a CRDT,
%% de-binary it, merge it and store the most merged value in the
%% accumulator dictionary.
%% @doc if the content is a CRDT, de-binary it, merge it and store the
%% most merged value in the accumulator dictionary.
-spec merge_value(ro_content(), {crdts(), ro_contents()}) ->
{crdts(), ro_contents()}.
merge_value({MD, <<?TAG:8/integer, Version:8/integer, CRDTBin/binary>>=Content},
Expand Down Expand Up @@ -293,7 +299,12 @@ merge_meta(CType, Meta1, Meta2) ->
end,
%% Make sure the content type is
%% up-to-date
dict:store(?MD_CTYPE, CType, Meta).
drop_the_dot(dict:store(?MD_CTYPE, CType, Meta)).

%% @private Never keep a dot for CRDTs, we want all values to survive
%% a riak_obect:merge/2
drop_the_dot(Dict) ->
dict:erase(riak_object:dot_key(), Dict).

lastmod(Meta) ->
dict:fetch(?MD_LASTMOD, Meta).
Expand Down
4 changes: 1 addition & 3 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1186,9 +1186,7 @@ handle_crdt(_, undefined, _VId, RObj) ->
handle_crdt(true, CRDTOp, VId, RObj) ->
riak_kv_crdt:update(RObj, VId, CRDTOp);
handle_crdt(false, _CRDTOp, _Vid, RObj) ->
%% non co-ord put that was a CRDT operation? Merge the values if
%% there are siblings 'cos that is the point of CRDTs: no siblings
riak_kv_crdt:merge(RObj).
RObj.

perform_put({fail, _, _}=Reply, State, _PutArgs) ->
{Reply, State};
Expand Down
119 changes: 85 additions & 34 deletions src/riak_object.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
%% -type bkey() :: {bucket(), key()}.
-type value() :: term().

%% Used by the merge function to accumulate contents
-record(merge_acc, {
drop=orddict:new() :: orddict:orddict(),
keep=ordsets:new() :: ordset:ordset(),
crdt=orddict:new() :: orddict:orddict(),
error=[] :: list(binary())
}).

-record(r_content, {
metadata :: dict(),
value :: term()
Expand All @@ -51,6 +59,8 @@
}).
-opaque riak_object() :: #r_object{}.

-type merge_acc() :: #merge_acc{}.
-type r_content() :: #r_content{}.
-type index_op() :: add | remove.
-type index_value() :: integer() | binary().
-type binary_version() :: v0 | v1.
Expand Down Expand Up @@ -82,6 +92,7 @@
-export([update_last_modified/1]).
-export([strict_descendant/2]).
-export([assign_dot/2]).
-export([dot_key/0]).

%% @doc Constructor for new riak objects.
-spec new(Bucket::bucket(), Key::key(), Value::value()) -> riak_object().
Expand Down Expand Up @@ -260,11 +271,8 @@ compare_content_dates(C1,C2) ->
-spec merge(riak_object(), riak_object()) -> riak_object().
merge(OldObject, NewObject) ->
NewObj1 = apply_updates(NewObject),
OldObject#r_object{contents=merge_contents(NewObject#r_object.contents,
OldObject#r_object.contents,
NewObj1#r_object.vclock,
OldObject#r_object.vclock,
dvv_enabled()),
Contents = merge_contents(NewObject, OldObject, dvv_enabled()),
OldObject#r_object{contents=Contents,
vclock=vclock:merge([OldObject#r_object.vclock,
NewObj1#r_object.vclock]),
updatemetadata=dict:store(clean, true, dict:new()),
Expand All @@ -273,50 +281,85 @@ merge(OldObject, NewObject) ->

%% @doc Merge the r_objects contents by converting the inner dict to
%% a list, ensuring a sane order, and merging into a unique list.
merge_contents(NewContents, OldContents, _NewClock, _OldClock, false) ->
OldContents1 = lists:map(fun convert_to_dict_list/1, OldContents),
NewContents1 = lists:map(fun convert_to_dict_list/1, NewContents),
Result = lists:umerge(lists:usort(NewContents1),
lists:usort(OldContents1)),
merge_contents(NewObject, OldObject, false) ->
OldContents = lists:map(fun convert_to_dict_list/1, OldObject#r_object.contents),
NewContents = lists:map(fun convert_to_dict_list/1, NewObject#r_object.contents),
Result = lists:umerge(lists:usort(NewContents),
lists:usort(OldContents)),
lists:map(fun convert_from_dict_list/1, Result);
merge_contents(NewContents, OldContents, NewClock, OldClock, true) ->
{MaybeDropContents, OldContents1} = lists:foldl(fun(C, A) ->
fold_contents(C, A, NewClock)
end,
{orddict:new(), []},
OldContents),
{_DropContents, NewContents1} = lists:foldl(fun(C, A) ->
fold_contents(C, A, OldClock)
end,
{MaybeDropContents, []},
NewContents),

Result = lists:umerge(lists:usort(NewContents1),
lists:usort(OldContents1)),
lists:map(fun convert_from_dict_list/1, Result).


fold_contents({r_content, Dict, _Value}=C0, {DropContents, KeepContents}, Clock) ->
%% @private with DVV enabled, use event dots in sibling metadata to
%% remove dominated siblings and stop fake concurrency that causes
%% sibling explsion. Also, since every sibling is iterated over (some
%% twice!) why not merge CRDTs here, too?
merge_contents(NewObject, OldObject, true) ->
Bucket = bucket(NewObject),
Key = key(NewObject),
MergeAcc0 = prune_object_siblings(OldObject, vclock(NewObject)),
MergeAcc = prune_object_siblings(NewObject, vclock(OldObject), MergeAcc0),
#merge_acc{crdt=CRDT, error=Error} = MergeAcc,
riak_kv_crdt:log_merge_errors(Bucket, Key, CRDT, Error),
merge_acc_to_contents(MergeAcc).

%% @private de-duplicates, removes dominated siblings, merges CRDTs
-spec prune_object_siblings(riak_object(), vclock:vclock()) -> merge_acc().
prune_object_siblings(Object, Clock) ->
prune_object_siblings(Object, Clock, #merge_acc{}).

-spec prune_object_siblings(riak_object(), vclock:vclock(), merge_acc()) -> merge_acc().
prune_object_siblings(Object, Clock, MergeAcc) ->
lists:foldl(fun(Content, Acc) ->
fold_contents(Content, Acc, Clock)
end,
MergeAcc,
Object#r_object.contents).

%% @private called once for each content entry for each object being
%% merged. It's job is to drop siblings that are causally dominated,
%% remove duplicate values, and merge CRDT values down to a single
%% value.
-spec fold_contents(r_content(), merge_acc(), vclock:vclock()) -> merge_acc().
fold_contents({r_content, Dict, Value}=C0, MergeAcc, Clock) ->
#merge_acc{drop=Drop, keep=Keep, crdt=CRDT, error=Error} = MergeAcc,
case get_dot(Dict) of
{ok, Dot} ->
case {vclock:descends(Clock, [Dot]), orddict:is_key(Dot, DropContents)} of
case {vclock:descends(Clock, [Dot]), orddict:is_key(Dot, Drop)} of
{true, true} ->
%% Dot present on both sides, keep it
Content = convert_to_dict_list(C0),
{DropContents, [Content | KeepContents]};
MergeAcc#merge_acc{keep=ordsets:add_element(Content, Keep)};
{true, false} ->
%% Dominated dot, add it to the drop list
{orddict:store(Dot, C0, DropContents), KeepContents};
MergeAcc#merge_acc{drop=orddict:store(Dot, C0, Drop)};
{false, _} ->
%% Not dominated, keep it
Content = convert_to_dict_list(C0),
{DropContents, [Content | KeepContents]}
MergeAcc#merge_acc{keep=ordsets:add_element(Content, Keep)}
end;
undefined ->
Content = convert_to_dict_list(C0),
{DropContents, [Content | KeepContents]}
%% CRDTs and legacy data don't have dots, which are you?
case riak_kv_crdt:merge_value({Dict, Value}, {CRDT, [], []}) of
{CRDT, [], [E]} ->
MergeAcc#merge_acc{error=[E | Error]};
{CRDT, [{Dict, Value}], []} ->
Content = convert_to_dict_list(C0),
MergeAcc#merge_acc{keep=ordsets:add_element(Content, Keep)};
{CRDT2, [], []} ->
MergeAcc#merge_acc{crdt=CRDT2}
end
end.

%% @private Transform a `merge_acc()' to a list of `r_content()'
-spec merge_acc_to_contents(merge_acc()) -> list(r_content()).
merge_acc_to_contents(MergeAcc) ->
#merge_acc{keep=Keep, crdt=CRDTs} = MergeAcc,
Keep2 = lists:map(fun convert_from_dict_list/1, ordsets:to_list(Keep)),
orddict:fold(fun(_Type, {Meta, CRDT}, Acc) ->
[{r_content, riak_kv_crdt:meta(Meta, CRDT),
riak_kv_crdt:to_binary(CRDT)} | Acc]
end,
Keep2,
CRDTs).

%% @private Get the dot from the passed metadata dict
%% (if present and valid).
-spec get_dot(dict()) -> {ok, vclock:dot()} | undefined.
Expand All @@ -336,6 +379,8 @@ get_dot(Dict) ->
%% @doc Convert a r_content's inner dictionary to a list, and sort it to
%% ensure we can do comparsions between r_contents.
convert_to_dict_list({r_content, Dict, Value}) ->
{r_content, lists:usort(dict:to_list(Dict)), Value};
convert_to_dict_list({Dict, Value}) ->
{r_content, lists:usort(dict:to_list(Dict)), Value}.

%% @doc Convert a r_content's inner dictionary from a list to a dict.
Expand Down Expand Up @@ -486,12 +531,18 @@ assign_dot(Object, Dot) ->
throw({error, invalid_dot})
end.

%% @doc the binary key used for the `dot` in metadata.
-spec dot_key() -> binary().
dot_key() ->
?DOT.

%% @private assign the dot to the value only if DVV is enabled. Only
%% call with a valid dot. Only assign dot when there is a single value
%% in contents.
-spec assign_dot(riak_object(), vclock:dot(), boolean()) -> riak_object().
assign_dot(Object, Dot, true) ->
#r_object{contents=[C=#r_content{metadata=Meta0}]} = Object,
%% Dot must actually be a vclock, allbeit it one with a single entry
Object#r_object{contents=[C#r_content{metadata=dict:store(?DOT, Dot, Meta0)}]};
assign_dot(Object, _Dot, _DVVEnabled) ->
Object.
Expand Down