Added Mutators to support changing object on read/write #661

Merged: 26 commits, Oct 4, 2013
Changes from 24 commits
9153e4c
Step one in reduced replications: a hook to mutate.
lordnull Aug 20, 2013
a62a352
Refactored to use the new metadata system.
lordnull Aug 21, 2013
e3832e5
Added test for when no mutators ever registered.
lordnull Aug 21, 2013
6369d64
Made get mutation dependant on a put mutation.
lordnull Aug 21, 2013
4a23881
Gets mutators no longer get bucket properties.
lordnull Aug 21, 2013
087bd33
Hooked meta data into get/put path and tests.
lordnull Aug 22, 2013
13b1dac
Fixed mutator mutation order.
lordnull Aug 23, 2013
1580dfa
added priorities to mutators.
lordnull Aug 26, 2013
9fb7fa5
Fixed typo
lordnull Aug 30, 2013
fc4ca3c
Refactored / changed mutator to be more resilient.
lordnull Sep 3, 2013
22b9ed8
Fixed vnode call to mutator system.
lordnull Sep 3, 2013
548974b
refactored get mutation again.
lordnull Sep 9, 2013
b9c9a7f
Get mutators can now (legitimately) return notfound
lordnull Sep 9, 2013
3355c87
Allowed mutators to use notfound as a response on get.
lordnull Sep 10, 2013
727a8b2
Much documentation for mutation.
lordnull Sep 10, 2013
f89adf0
Fixed resolver for mutators.
lordnull Sep 11, 2013
f93f53d
Removed eunit test hrl inclusion
lordnull Sep 23, 2013
4255be2
Added comment about conflict resultion on mutators
lordnull Sep 23, 2013
d072919
Fixed typo.
lordnull Sep 23, 2013
7595fa9
Added comments and documentation
lordnull Sep 23, 2013
ac0795a
Added capability declaration
lordnull Sep 25, 2013
95c67c4
Fixed setup/teardown to reflect metadata requirements
lordnull Sep 26, 2013
6f26412
Fixed grammatical errors in docs.
lordnull Oct 3, 2013
2d3dd83
Added warning about removing mutator code.
lordnull Oct 3, 2013
c52a1e6
Only doing a sort on mutation in worst case
lordnull Oct 3, 2013
4a45469
Fixed typo
lordnull Oct 3, 2013
3 changes: 3 additions & 0 deletions src/riak_kv_app.erl
@@ -187,6 +187,9 @@ start(_Type, _StartArgs) ->
[enabled, disabled],
disabled),

% get/put mutator is supported, but not all nodes may have it
riak_core_capability:register({riak_kv, mutators}, [true, false], false),
Contributor
This capability is not being queried anywhere that I can see.

Contributor Author
It is used in repl before attempting to install the reduced repl mutator: https://github.com/basho/riak_repl/blob/develop/src/riak_repl_sup.erl#L20


HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false),
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
240 changes: 240 additions & 0 deletions src/riak_kv_mutator.erl
@@ -0,0 +1,240 @@
%% -------------------------------------------------------------------
%%
%% riak_kv_mutator - Storage and retrieval for get/put mutation
%% functions
%%
%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% @doc There are circumstances where the object stored on disk is not
%% the object to return; and there are times the object written to the
%% data storage backend is not meant to be the object given. An
%% example would be storing only meta data for an object on a remote
%% cluster. This module is an interface to register mutators that will
%% be run.
%%
%% This doubles as a behavior defining module for the mutators.
%%
%% == Words of Warning ==
%%
%% Once an object has been stored with a given mutator list, attempting to
%% retrieve that object will require those mutators. If the mutators don't
%% exist or have changed in a non-backwards compatible way, you can expect
%% at best a corrupted object, or at worst a crash.
%%
%% == Callbacks ==
%%
%% A mutator callback must implement two functions: mutate_put/5 and mutate_get/1.
%%
%% <code><b>mutate_put(MetaData, Value, ExposedMeta,
%% FullObject, BucketProperties) -> Result</b></code>
%%
%% Types:
Contributor
I think edoc can generate all this from dialyzer -type and -callback specs now.

Contributor Author
Looking into this (and trying it as well), edoc lists the functions but doesn't give type information.

Contributor
I think if you did like -type metadata() :: dict(). and then used metadata() it'd show them better?

Contributor Author
I think we need to define better. I'd love to be able to have it automatically use the correct types, but that's a ways out. Adding links to types is a next step, but would it be better inline (in the code element) or in the expanded area?

%% ```
%% MetaData = dict()
%% Value = term()
%% ExposedMeta = dict()
%% FullObject = riak_object:riak_object()
%% BucketProperties = orddict:orddict()
%% Result = {NewMeta, NewValue, NewExposedMeta}
%% NewMeta = dict()
%% NewValue = term()
%% NewExposedMeta = dict()'''
%%
%% The mutate_put callback is called for each metadata/value pair a riak_object
%% has. The returned NewMeta and NewValue are what the storage backend stores,
%% while NewExposedMeta is used in the reply to the client, where NewMeta would
%% normally be used. NewExposedMeta is merged with NewMeta to generate the
%% exposed metadata; if the same key is found in both, the NewExposedMeta
%% value is used.
%%
%% The mutations are run in the same process as the vnode.
%%
%% <code><b>mutate_get(Object) -> Result</b></code>
%%
%% Types:
%% ```
%% Object = riak_object:riak_object()
%% Result = riak_object:riak_object() | 'notfound'
%% '''
%% Take the object from storage and reverse whatever mutation was applied. Note
%% the bucket properties are not part of this callback, so if some data is
%% important to reverse a mutation, it must be put in the metadata by the
%% `mutate_put' function. Also note how the entire object is given as opposed to
%% simply a metadata/value pair. Care must be taken not to corrupt the object.
%%
%% A return of ``'notfound''' stops the mutator chain and returns immediately. This
%% provides an escape hatch of sorts; if the mutator cannot reverse the mutation
%% effectively, return ``'notfound'''.

-module(riak_kv_mutator).

-export([register/1, register/2, unregister/1]).
-export([get/0]).
-export([mutate_put/2, mutate_get/1]).

-callback mutate_put(Meta :: dict(), Value :: any(), ExposedMeta :: dict(), FullObject :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {dict(), any(), dict()}.
-callback mutate_get(FullObject :: riak_object:riak_object()) -> riak_object:riak_object() | 'notfound'.

-define(DEFAULT_PRIORITY, 0).

%% @doc Register the given module as a mutator with the default priority of 0.
%% @see register/2
-spec register(Module :: atom()) -> 'ok'.
register(Module) ->
?MODULE:register(Module, ?DEFAULT_PRIORITY).

%% @doc Register a module as a mutator with the given priority. Modules with
%% equal priority are done in default (alphabetical) order. A module
%% can only be registered once. When there is a conflict (two different
%% lists), those lists are merged. Note that if an object is stored with
%% a mutator, that mutator is used on retrieval. If that mutator code is
%% removed or changed in a backwards incompatible manner, at best the
%% object will be corrupted; at worst it will cause a crash.
-spec register(Module :: atom(), Priority :: term()) -> 'ok'.
register(Module, Priority) ->
Modifier = fun
Contributor
It'd be nice to comment what resolution strategy this is trying to implement.

(undefined) ->
[{Module, Priority}];
(Values) ->
Values2 = merge_values(Values),
orddict:store(Module, Priority, Values2)
end,
riak_core_metadata:put({riak_kv, mutators}, list, Modifier).

%% @doc Remove a module from the mutator list. Removing a mutator from the
%% list does not remove the mutator from use for retrieving objects. Any
%% object that was stored while the mutator was registered will use that
%% mutator on get. Thus, if the code for a mutator is not available or
%% was changed in a non-backwards compatible way, at best one can expect
%% corrupt objects, at worst a crash.
-spec unregister(Module :: atom()) -> 'ok'.
unregister(Module) ->
Modifier = fun
(undefined) ->
[];
(Values) ->
Values2 = merge_values(Values),
orddict:erase(Module, Values2)
end,
riak_core_metadata:put({riak_kv, mutators}, list, Modifier, []).

%% @doc Retrieve the list of mutators in the order to apply them when doing
%% a put mutation. To get the order when doing a get mutation, reverse the list.
-spec get() -> {ok, [atom()]}.
get() ->
Resolver = fun
('$deleted', '$deleted') ->
Contributor
A note for @jrwest: it seems that we should have the metadata tombstone in a public header or is there a way for resolvers not to use this atom directly?

Contributor
hrm thats a good point. we certainly want to expose tombstones to resolvers (or at least have the option to) but leaking this is a bit nasty. a macro would probably be quickest, but i wonder if there is a better way...

Contributor
for this code specifically, does it even call riak_core_metadata:delete? If not, don't really need to have these here.

Contributor
Well no, but I think it's worth handling anyway for our convenience at the very least. I used delete while testing it when the format was changed, for example.

[];
('$deleted', Values) ->
Values;
(Values, '$deleted') ->
Values;
(Values1, Values2) ->
merge_values([Values1, Values2])
end,
ModulesAndPriors = riak_core_metadata:get({riak_kv, mutators}, list, [{default, []}, {resolver, Resolver}]),
Flipped = [{P, M} || {M, P} <- ModulesAndPriors],
Sorted = lists:sort(Flipped),
Contributor
A sort in the critical path could become an issue with multiple mutators. I know we are not planning to add those soon, but it seems that the resolution and merge functions could take care of this directly and store values in sorted order, no?
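
One way to read this suggestion, as a minimal sketch rather than what the PR does: sort once when the lists are merged and keep the result in application order, so the read path only strips the priorities. The module and function names below (`mutator_order_sketch`, `merge_sorted/2`, `modules/1`) are hypothetical; the conflict rule mirrors `merge_fun/3` from this diff.

```erlang
-module(mutator_order_sketch).
-export([merge_sorted/2, modules/1]).

%% Hypothetical helper: merge two [{Module, Priority}] lists and return
%% the result already sorted by priority, then by module name.
merge_sorted(A, B) ->
    Merged = orddict:merge(fun keep_lower/3,
                           orddict:from_list(A),
                           orddict:from_list(B)),
    lists:sort(fun by_priority/2, Merged).

%% Same conflict rule as merge_fun/3 in the diff: the lower priority wins.
keep_lower(_Module, P1, P2) when P1 < P2 -> P1;
keep_lower(_Module, _P1, P2) -> P2.

%% Ordering used for application order: priority first, module name second.
by_priority({M1, P1}, {M2, P2}) ->
    {P1, M1} =< {P2, M2}.

%% Read path: no sort needed, just drop the priorities.
modules(Sorted) ->
    [M || {M, _P} <- Sorted].
```

For example, `mutator_order_sketch:modules(mutator_order_sketch:merge_sorted([{b, 0}, {a, 5}], [{a, 1}, {c, 0}]))` yields `[b, c, a]`. A priority-sorted list is no longer a valid orddict keyed by module, so under this assumption `register/2` and `unregister/1` would need `lists:keystore/4` and `lists:keydelete/3` in place of `orddict:store/3` and `orddict:erase/2`.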

Modules = [M || {_P, M} <- Sorted],
{ok, Modules}.

%% @doc Unmutate an object after retrieval from storage. When an object is
%% mutated, the mutators applied are put into the object's metadata.
%% If the mutator does not exist anymore or has changed in a backwards
%% incompatible manner, at best there will be corrupt objects, at worst
%% a crash.
-spec mutate_get(Object :: riak_object:riak_object()) -> riak_object:riak_object() | 'notfound'.
mutate_get(Object) ->
[Meta | _] = riak_object:get_metadatas(Object),
Member
Is this right? What about siblings with no mutators in their metadata being higher up the list?

case dict:find(mutators_applied, Meta) of
error ->
Object;
{ok, Applied} ->
DeMutateOrder = lists:reverse(Applied),
mutate_get(Object, DeMutateOrder)
end.

mutate_get(Object, []) ->
Object;
mutate_get(Object, [Mutator | Tail]) ->
% Even though the mutate_put callback has to return
% {Meta, Value, Exposed} values, the get callback gets away with just
% taking and returning the whole object. This is to avoid complicated
% interaction with the notfound return.
case Mutator:mutate_get(Object) of
Contributor
Notice that this makes it possible to invalidate all the data in the cluster if a mutator stored in object metadata is removed from the code. Users of this feature will have to be careful about some downgrade scenarios where data is written with a mutator in the list that then disappears on downgrade.

Contributor Author
Do we just leave this to documentation, or should there be some insurance put in the code here?

Contributor
A note in the documentation would be enough for now. It's good to flag the worst-case scenarios.
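
For reference, the kind of "insurance" asked about above could look roughly like the following. This is a hedged sketch, not part of the PR: the module name `mutator_guard_sketch` and its functions are hypothetical, and it only checks that a mutator module can be loaded and exports `mutate_get/1`. It cannot detect a mutator whose code changed in an incompatible way.

```erlang
-module(mutator_guard_sketch).
-export([can_unmutate/1, missing/1, mutate_get/1]).

%% True when Module can be loaded and exports mutate_get/1.
can_unmutate(Module) when is_atom(Module) ->
    case code:ensure_loaded(Module) of
        {module, Module} ->
            erlang:function_exported(Module, mutate_get, 1);
        {error, _Reason} ->
            false
    end.

%% Return the mutators from an applied list that cannot be undone here.
missing(Applied) ->
    [M || M <- Applied, not can_unmutate(M)].

%% Toy identity callback, present only so this module can check itself.
mutate_get(Object) ->
    Object.
```

A caller could run `missing/1` on the `mutators_applied` list before walking the chain and decide whether to return an error instead of crashing on an undefined function.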

notfound ->
notfound;
Object2 ->
mutate_get(Object2, Tail)
end.

%% @doc Mutate an object in preparation for storage, returning a tuple of the
%% object to store and the object to return to the client. For each sibling
%% ({Meta, Value} pair) the object has, each mutator is called with a copy of
%% that iteration's Meta used as the exposed meta. Later mutators are
%% given the results of previous mutators. Once all mutations are complete,
%% two {@link riak_object:riak_object()}s are returned. The first is what
%% is to be stored, while the second has the exposed meta set with the
%% original value(s).
-spec mutate_put(Object :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {riak_object:riak_object(), riak_object:riak_object()}.
mutate_put(Object, BucketProps) ->
Contents = riak_object:get_contents(Object),
{ok, Modules} = ?MODULE:get(),
MetasValuesRevealeds = lists:map(fun({InMeta, InValue}) ->
Contributor
So for each sibling in the object, we apply all the mutators, then replace the contents of the object with the result?

Contributor
Really, more comments here would be nice.

Contributor Author
Hopefully 5934e3b will address this particular issue.
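
To make the last step of this function concrete, here is a small standalone sketch of how the exposed metadata is folded back over the stored metadata, using the same `dict:merge/3` rule as the diff (on a key clash the exposed value wins). The module name `exposed_meta_sketch` and the sample keys and values are made up for illustration.

```erlang
-module(exposed_meta_sketch).
-export([expose/2, demo/0]).

%% Merge stored metadata with exposed metadata. When both dicts contain
%% the same key, the exposed value is kept, as in the FixedMeta merge.
expose(StoredMeta, ExposedMeta) ->
    dict:merge(fun(_Key, _Stored, Exposed) -> Exposed end,
               StoredMeta, ExposedMeta).

%% Illustrative data only: a mutator rewrote the content type for storage
%% but wants the client to keep seeing the original one.
demo() ->
    Stored = dict:from_list([{<<"content-type">>, "application/octet-stream"},
                             {mutators_applied, [toy]}]),
    Exposed = dict:from_list([{<<"content-type">>, "text/plain"}]),
    lists:sort(dict:to_list(expose(Stored, Exposed))).
```

Keys that appear only in the stored metadata, such as `mutators_applied`, still show up in the merged result.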

{InMeta1, InValue1, InRevealed} = lists:foldl(fun(Module, {InInMeta, InInValue, InInRevealed}) ->
% why not just give the riak_object? because of a warning
% in riak_object stating that set_contents is for internal
% use only. Hopefully this qualifies.
Module:mutate_put(InInMeta, InInValue, InInRevealed, Object, BucketProps)
end, {InMeta, InValue, dict:new()}, Modules),
InMeta2 = dict:store(mutators_applied, Modules, InMeta1),
{InMeta2, InValue1, InRevealed}
end, Contents),
Contents2 = [{M,V} || {M,V,_R} <- MetasValuesRevealeds],
Mutated = riak_object:set_contents(Object, Contents2),
FakedContents = lists:map(fun({InMeta, InContent, InRevealed}) ->
FixedMeta = dict:merge(fun(_Key, _NotMutated, MutatedVal) ->
MutatedVal
end, InMeta, InRevealed),
{FixedMeta, InContent}
end, MetasValuesRevealeds),
Faked = riak_object:set_contents(Object, FakedContents),
{Mutated, Faked}.

merge_values([]) ->
[];

merge_values(Values) ->
case lists:filter(fun erlang:is_list/1, Values) of
[] ->
[];
[Head | Tail] ->
merge_values(Tail, Head)
end.

merge_values([], Acc) ->
Acc;

merge_values([Head | Tail], Acc) ->
Acc2 = orddict:merge(fun merge_fun/3, Acc, Head),
merge_values(Tail, Acc2).

merge_fun(_Key, P1, P2) when P1 < P2 ->
P1;
merge_fun(_Key, _P1, P2) ->
P2.
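
Taken together, the module above applies mutators in priority order on put and undoes them in reverse order on get, stopping early on `notfound`. The sketch below models only that control flow with plain values and `{PutFun, GetFun}` pairs in place of riak objects and callback modules; everything in it (`mutator_chain_sketch`, the toy mutators) is hypothetical.

```erlang
-module(mutator_chain_sketch).
-export([apply_put/2, apply_get/2, demo/0]).

%% Apply each toy mutator's put fun in list order; later mutators see
%% the result of earlier ones.
apply_put(Value, Mutators) ->
    lists:foldl(fun({Put, _Get}, Acc) -> Put(Acc) end, Value, Mutators).

%% Undo in the reverse of the order used on put.
apply_get(Stored, Mutators) ->
    undo(Stored, lists:reverse(Mutators)).

undo(Value, []) ->
    Value;
undo(Value, [{_Put, Get} | Rest]) ->
    case Get(Value) of
        notfound -> notfound;          % escape hatch: stop the chain
        Value2 -> undo(Value2, Rest)
    end.

demo() ->
    Reverse = {fun lists:reverse/1, fun lists:reverse/1},
    Tag = {fun(V) -> [$! | V] end,
           fun([$! | V]) -> V; (_) -> notfound end},
    Stored = apply_put("abc", [Reverse, Tag]),
    {Stored, apply_get(Stored, [Reverse, Tag])}.
```

`mutator_chain_sketch:demo()` returns `{"!cba", "abc"}`: the value is reversed, then tagged on put, and untagged, then reversed again on get. Feeding an untagged value to `apply_get/2` with the same list returns `notfound`, since the tag mutator cannot reverse its mutation.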
25 changes: 19 additions & 6 deletions src/riak_kv_vnode.erl
@@ -1104,15 +1104,17 @@ perform_put({true, Obj},
modstate=ModState}=State,
#putargs{returnbody=RB,
bkey={Bucket, Key},
bprops=BProps,
reqid=ReqID,
index_specs=IndexSpecs}) ->
case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of
{Obj2, Fake} = riak_kv_mutator:mutate_put(Obj, BProps),
case encode_and_put(Obj2, Mod, Bucket, Key, IndexSpecs, ModState) of
{{ok, UpdModState}, EncodedVal} ->
update_hashtree(Bucket, Key, EncodedVal, State),
?INDEX(Obj, put, Idx),
case RB of
true ->
Reply = {dw, Idx, Obj, ReqID};
Reply = {dw, Idx, Fake, ReqID};
false ->
Reply = {dw, Idx, ReqID}
end;
@@ -1211,7 +1213,12 @@ do_get(_Sender, BKey, ReqID,
do_get_term({Bucket, Key}, Mod, ModState) ->
case do_get_object(Bucket, Key, Mod, ModState) of
{ok, Obj, _UpdModState} ->
{ok, Obj};
case riak_kv_mutator:mutate_get(Obj) of
notfound ->
{error, notfound};
Obj2 ->
{ok, Obj2}
end;
%% @TODO Eventually it would be good to
%% make the use of not_found or notfound
%% consistent throughout the code.
@@ -1504,7 +1511,7 @@ get_hashtree_token() ->
-spec max_hashtree_tokens() -> pos_integer().
max_hashtree_tokens() ->
app_helper:get_env(riak_kv,
anti_entropy_max_async,
anti_entropy_max_async,
?DEFAULT_HASHTREE_TOKENS).

%% @private
@@ -1654,7 +1661,7 @@ handoff_data_encoding_method() ->
%% its encoding method, but if that fails we use the legacy zlib and protocol buffer decoding:
decode_binary_object(BinaryObject) ->
try binary_to_term(BinaryObject) of
{ Method, BinObj } ->
{ Method, BinObj } ->
case Method of
encode_raw -> {B, K, Val} = BinObj,
BKey = {B, K},
@@ -1670,7 +1677,7 @@ decode_binary_object(BinaryObject) ->
%% An exception means we have a legacy handoff object:
catch
_:_ -> do_zlib_decode(BinaryObject)
end.
end.

do_zlib_decode(BinaryObject) ->
DecodedObject = zlib:unzip(BinaryObject),
@@ -1863,11 +1870,13 @@ list_buckets_test_() ->
application:start(folsom),
riak_core_stat_cache:start_link(),
riak_kv_stat:register_stats(),
riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
Env
end,
fun(Env) ->
riak_core_ring_manager:cleanup_ets(test),
riak_core_stat_cache:stop(),
riak_kv_test_util:stop_process(riak_core_metadata_manager),
application:stop(folsom),
application:stop(sasl),
[application:unset_env(riak_kv, K) ||
@@ -1919,6 +1928,7 @@ list_buckets_test_i(BackendMod) ->
filter_keys_test() ->
riak_core_ring_manager:setup_ets(test),
clean_test_dirs(),
riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
{S, B, K} = backend_with_known_key(riak_kv_memory_backend),
Caller1 = new_result_listener(keys),
handle_coverage(?KV_LISTKEYS_REQ{bucket=B,
@@ -1939,6 +1949,7 @@ filter_keys_test() ->
?assertEqual({ok, []}, results_from_listener(Caller3)),

riak_core_ring_manager:cleanup_ets(test),
riak_kv_test_util:stop_process(riak_core_metadata_manager),
flush_msgs().

%% include bitcask.hrl for HEADER_SIZE macro
@@ -1948,6 +1959,7 @@ filter_keys_test() ->
%% preparation for a write prevents the write from going through.
bitcask_badcrc_test() ->
riak_core_ring_manager:setup_ets(test),
riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
clean_test_dirs(),
{S, B, K} = backend_with_known_key(riak_kv_bitcask_backend),
DataDir = filename:join(bitcask_test_dir(), "0"),
@@ -1964,6 +1976,7 @@ bitcask_badcrc_test() ->
{raw, 456, self()},
S),
riak_core_ring_manager:cleanup_ets(test),
riak_kv_test_util:stop_process(riak_core_metadata_manager),
flush_msgs().

