diff --git a/src/riak_kv_app.erl b/src/riak_kv_app.erl
index 498dd561e2..b20a0f8098 100644
--- a/src/riak_kv_app.erl
+++ b/src/riak_kv_app.erl
@@ -187,6 +187,9 @@ start(_Type, _StartArgs) ->
[enabled, disabled],
disabled),
+ % get/put mutator is supported, but not all nodes may have it
+ riak_core_capability:register({riak_kv, mutators}, [true, false], false),
+
HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false),
%% Go ahead and mark the riak_kv service as up in the node watcher.
%% The riak_core_ring_handler blocks until all vnodes have been started
diff --git a/src/riak_kv_mutator.erl b/src/riak_kv_mutator.erl
new file mode 100644
index 0000000000..499d283542
--- /dev/null
+++ b/src/riak_kv_mutator.erl
@@ -0,0 +1,247 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_kv_mutators - Storage and retrieval for get/put mutation
+%% functions
+%%
+%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc There are circumstances where the object stored on disk is not
+%% the object to return; and there are times the object written to the
+%% data storage backend is not meant to be the object given. An
+%% example would be storing only meta data for an object on a remote
+%% cluster. This module is an interface to register mutators that will
+%% be run.
+%%
+%% This doubles as a behavior defining module for the mutators.
+%%
+%% == Words of Warning ==
+%%
+%% Once an object has been stored with a given mutator list, attempting to
+%% retreive that object will require those mutators. If the mutators done't
+%% exists or have changed in a non-backwards compatible way, you can expect
+%% to have at worst a crash, or at best a corrupted object.
+%%
+%% == Callbacks ==
+%%
+%% A mutator callback must implement 2 function: mutate_put/5 and mutate_get/1.
+%%
+%% mutate_put(MetaData, Value, ExposedMeta,
+%% FullObject, BucketProperties) -> Result
+%%
+%% Types:
+%% ```
+%% MetaData = dict()
+%% Value = term()
+%% ExposedMeta = dict()
+%% FullObject = riak_object:riak_object()
+%% BucketProperties = orddict:orddict()
+%% Result = {NewMeta, NewValue, NewExposedMeta}
+%% NewMeta = dict()
+%% NewValue = term()
+%% NewExposedMeta = dict()'''
+%%
+%% The mutate_put callback is called for each metadata/value pair a riak_object
+%% has. The return value of NewMeta and NewValue are used by the storage backend
+%% while the NewExposedMeta is used for the client return where NewMeta would
+%% normally. The NewExposedMeta is merged with the NewMeta to generate the
+%% exposed metadata; if the same key is found, the NewExposedMeta value is used.
+%%
+%% The mutations are run in the same process as the vnode.
+%%
+%% mutate_get(Object) -> Result
+%%
+%% Types:
+%% ```
+%% Object = riak_object:riak_object()
+%% Result = riak_object:riak_object() | 'notfound'
+%% '''
+%% Take the object from storage and reverse whatever mutation was applied. Note
+%% the bucket properties are not part of this callback, so if some data is
+%% important to reverse a mutation, it must be put in the metadata by the
+%% `mutate_put' function. Also note how the entire object is given as opposed to
+%% simply a metadata/value pair. Care must be taken not to corrupt the object.
+%%
+%% A return of ``'notfound''' stops the mutator chain and returns immediately. This
+%% provides an escape hatch of sorts; if the mutator cannot reverse the mutation
+%% effectively, return ``'notfound'''.
+
+-module(riak_kv_mutator).
+
+-export([register/1, register/2, unregister/1]).
+-export([get/0]).
+-export([mutate_put/2, mutate_get/1]).
+
+-callback mutate_put(Meta :: dict(), Value :: any(), ExposedMeta :: dict(), FullObject :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {dict(), any(), dict()}.
+-callback mutate_get(FullObject :: riak_object:riak_object()) -> riak_object:riak_object() | 'notfound'.
+
+-define(DEFAULT_PRIORITY, 0).
+
+%% @doc Register the given module as a mutator with the default priority of 0.
+%% @see register/2
+-spec register(Module :: atom()) -> 'ok'.
+register(Module) ->
+ ?MODULE:register(Module, ?DEFAULT_PRIORITY).
+
+%% @doc Register a module as a mutator with the given priority. Modules with
+%% equal priority are done in default (alphabetical) order. A module
+%% can only be registered once. When there is a conflict (two different
+%% lists), those lists are merged. Note that if an object is stored with
+%% a mutator, that mutator is used on retrieval. If that mutator code is
+%% removed or changed in a backwards incompatible manner, at best the
+%% object will be corrupted; at worst it will cause a crash.
+-spec register(Module :: atom(), Priority :: term()) -> 'ok'.
+register(Module, Priority) ->
+ Modifier = fun
+ (undefined) ->
+ [{Priority, Module}];
+ (Values) ->
+ Values2 = merge_values(Values),
+ insert_mutator(Module, Priority, Values2)
+ end,
+ riak_core_metadata:put({riak_kv, mutators}, list, Modifier).
+
+%% @doc Remove a module from the mutator list. Removing a mutator from the
+%% list does not remove the mutator from use for retreiving objects. Any
+%% object that was stored while the mutator was registered will use that
+%% mutator on get. Thus, if the code for a mutator is not available or
+%% was changed in a non-backwards compatible way, at best one can expect
+%% corrupt objects, at worst a crash.
+-spec unregister(Module :: atom()) -> 'ok'.
+unregister(Module) ->
+ Modifier = fun
+ (undefined) ->
+ [];
+ (Values) ->
+ Values2 = merge_values(Values),
+ lists:keydelete(Module, 2, Values2)
+ end,
+ riak_core_metadata:put({riak_kv, mutators}, list, Modifier, []).
+
+%% @doc Retrieve the list of mutators in the order to apply them when doing a
+%% a put mutation. To get the order when doing a get mutation, reverse the list.
+-spec get() -> [atom()].
+get() ->
+ Resolver = fun
+ ('$deleted', '$deleted') ->
+ [];
+ ('$deleted', Values) ->
+ Values;
+ (Values, '$deleted') ->
+ Values;
+ (Values, Values) ->
+ Values;
+ (Values1, Values2) ->
+ merge_values([Values1, Values2])
+ end,
+ ModulesAndPriors = riak_core_metadata:get({riak_kv, mutators}, list, [{default, []}, {resolver, Resolver}]),
+ Modules = [M || {_P, M} <- ModulesAndPriors],
+ {ok, Modules}.
+
+%% @doc Unmutate an object after retrieval from storage. When an object is
+%% mutated, the mutators applied are put into the object's metadata.
+%% If the mutator does not exist anymore or has changed in a backwards
+%% incompatible manner, at best there will be corrupt objects, at worst
+%% a crash.
+-spec mutate_get(Object :: riak_object:riak_object()) -> riak_object:riak_object().
+mutate_get(Object) ->
+ [Meta | _] = riak_object:get_metadatas(Object),
+ case dict:find(mutators_applied, Meta) of
+ error ->
+ Object;
+ {ok, Applied} ->
+ DeMutateOrder = lists:reverse(Applied),
+ mutate_get(Object, DeMutateOrder)
+ end.
+
+mutate_get(Object, []) ->
+ Object;
+mutate_get(Object, [Mutator | Tail]) ->
+ % so event though the mutate_put callback has to return
+ % {Meta, Value, Exposed} values, the get callback gets away with just
+ % giving the object? This is to avoid complicated interaction with
+ % notfound return.
+ case Mutator:mutate_get(Object) of
+ notfound ->
+ notfound;
+ Object2 ->
+ mutate_get(Object2, Tail)
+ end.
+
+%% @doc Mutate an object in preparation to storage, returning a tuple of the
+%% object to store and the object to return to the client. For each sibling
+%% the object has {Meta, Value} pair, each mutator is called with a copy
+%% that iteration's Meta used as the exposed meta." Later mutators are
+%% given the results of previous mutators. Once all mutations are complete,
+%% two {@link riak_object:riak_object()}s are returned. The first is what
+%% is to be stored, while the second has the exposed meta set with the
+%% orginal value(s).
+-spec mutate_put(Object :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {riak_object:riak_object(), riak_object:riak_object()}.
+mutate_put(Object, BucketProps) ->
+ Contents = riak_object:get_contents(Object),
+ {ok, Modules} = ?MODULE:get(),
+ MetasValuesRevealeds = lists:map(fun({InMeta, InValue}) ->
+ {InMeta1, InValue1, InRevealed} = lists:foldl(fun(Module, {InInMeta, InInValue, InInRevealed}) ->
+ % why not just give the riak_object? because of a warning
+ % in riak_object stating that set_contents is for internal
+ % use only. Hopefully this qualifies.
+ Module:mutate_put(InInMeta, InInValue, InInRevealed, Object, BucketProps)
+ end, {InMeta, InValue, dict:new()}, Modules),
+ InMeta2 = dict:store(mutators_applied, Modules, InMeta1),
+ {InMeta2, InValue1, InRevealed}
+ end, Contents),
+ Contents2 = [{M,V} || {M,V,_R} <- MetasValuesRevealeds],
+ Mutated = riak_object:set_contents(Object, Contents2),
+ FakedContents = lists:map(fun({InMeta, InContent, InRevealed}) ->
+ FixedMeta = dict:merge(fun(_Key, _NotMutated, MutatedVal) ->
+ MutatedVal
+ end, InMeta, InRevealed),
+ {FixedMeta, InContent}
+ end, MetasValuesRevealeds),
+ Faked = riak_object:set_contents(Object, FakedContents),
+ {Mutated, Faked}.
+
+merge_values([]) ->
+ [];
+
+merge_values(Values) ->
+ case lists:filter(fun erlang:is_list/1, Values) of
+ [] ->
+ [];
+ [Head | Tail] ->
+ merge_values(Tail, Head)
+ end.
+
+merge_values([], Acc) ->
+ Acc;
+
+merge_values([{Priority, Module} = Mutator | Tail], Acc) ->
+ Acc2 = case lists:keyfind(Module, 2, Acc) of
+ false ->
+ ordsets:add_element(Mutator, Acc);
+ {P1, _} when P1 < Priority ->
+ Acc;
+ Else ->
+ ordsets:add_element(Mutator, ordsets:del_element(Else))
+ end,
+ merge_values(Tail, Acc2).
+
+insert_mutator(Module, Priority, Mutators) ->
+ Mutators2 = lists:keydelete(Module, 2, Mutators),
+ ordsets:add_element({Priority, Module}, Mutators2).
+
diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl
index 10c1105c06..4940928a2c 100644
--- a/src/riak_kv_vnode.erl
+++ b/src/riak_kv_vnode.erl
@@ -1104,15 +1104,17 @@ perform_put({true, Obj},
modstate=ModState}=State,
#putargs{returnbody=RB,
bkey={Bucket, Key},
+ bprops=BProps,
reqid=ReqID,
index_specs=IndexSpecs}) ->
- case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of
+ {Obj2, Fake} = riak_kv_mutator:mutate_put(Obj, BProps),
+ case encode_and_put(Obj2, Mod, Bucket, Key, IndexSpecs, ModState) of
{{ok, UpdModState}, EncodedVal} ->
update_hashtree(Bucket, Key, EncodedVal, State),
?INDEX(Obj, put, Idx),
case RB of
true ->
- Reply = {dw, Idx, Obj, ReqID};
+ Reply = {dw, Idx, Fake, ReqID};
false ->
Reply = {dw, Idx, ReqID}
end;
@@ -1211,7 +1213,12 @@ do_get(_Sender, BKey, ReqID,
do_get_term({Bucket, Key}, Mod, ModState) ->
case do_get_object(Bucket, Key, Mod, ModState) of
{ok, Obj, _UpdModState} ->
- {ok, Obj};
+ case riak_kv_mutator:mutate_get(Obj) of
+ notfound ->
+ {error, notfound};
+ Obj2 ->
+ {ok, Obj2}
+ end;
%% @TODO Eventually it would be good to
%% make the use of not_found or notfound
%% consistent throughout the code.
@@ -1504,7 +1511,7 @@ get_hashtree_token() ->
-spec max_hashtree_tokens() -> pos_integer().
max_hashtree_tokens() ->
app_helper:get_env(riak_kv,
- anti_entropy_max_async,
+ anti_entropy_max_async,
?DEFAULT_HASHTREE_TOKENS).
%% @private
@@ -1654,7 +1661,7 @@ handoff_data_encoding_method() ->
%% its encoding method, but if that fails we use the legacy zlib and protocol buffer decoding:
decode_binary_object(BinaryObject) ->
try binary_to_term(BinaryObject) of
- { Method, BinObj } ->
+ { Method, BinObj } ->
case Method of
encode_raw -> {B, K, Val} = BinObj,
BKey = {B, K},
@@ -1670,7 +1677,7 @@ decode_binary_object(BinaryObject) ->
%% An exception means we have a legacy handoff object:
catch
_:_ -> do_zlib_decode(BinaryObject)
- end.
+ end.
do_zlib_decode(BinaryObject) ->
DecodedObject = zlib:unzip(BinaryObject),
@@ -1863,11 +1870,13 @@ list_buckets_test_() ->
application:start(folsom),
riak_core_stat_cache:start_link(),
riak_kv_stat:register_stats(),
+ riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
Env
end,
fun(Env) ->
riak_core_ring_manager:cleanup_ets(test),
riak_core_stat_cache:stop(),
+ riak_kv_test_util:stop_process(riak_core_metadata_manager),
application:stop(folsom),
application:stop(sasl),
[application:unset_env(riak_kv, K) ||
@@ -1919,6 +1928,7 @@ list_buckets_test_i(BackendMod) ->
filter_keys_test() ->
riak_core_ring_manager:setup_ets(test),
clean_test_dirs(),
+ riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
{S, B, K} = backend_with_known_key(riak_kv_memory_backend),
Caller1 = new_result_listener(keys),
handle_coverage(?KV_LISTKEYS_REQ{bucket=B,
@@ -1939,6 +1949,7 @@ filter_keys_test() ->
?assertEqual({ok, []}, results_from_listener(Caller3)),
riak_core_ring_manager:cleanup_ets(test),
+ riak_kv_test_util:stop_process(riak_core_metadata_manager),
flush_msgs().
%% include bitcask.hrl for HEADER_SIZE macro
@@ -1948,6 +1959,7 @@ filter_keys_test() ->
%% preparation for a write prevents the write from going through.
bitcask_badcrc_test() ->
riak_core_ring_manager:setup_ets(test),
+ riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]),
clean_test_dirs(),
{S, B, K} = backend_with_known_key(riak_kv_bitcask_backend),
DataDir = filename:join(bitcask_test_dir(), "0"),
@@ -1964,6 +1976,7 @@ bitcask_badcrc_test() ->
{raw, 456, self()},
S),
riak_core_ring_manager:cleanup_ets(test),
+ riak_kv_test_util:stop_process(riak_core_metadata_manager),
flush_msgs().
diff --git a/test/fsm_eqc_util.erl b/test/fsm_eqc_util.erl
index 6e7691b55d..fa968ff2d3 100644
--- a/test/fsm_eqc_util.erl
+++ b/test/fsm_eqc_util.erl
@@ -44,7 +44,7 @@ non_blank_string() ->
%% with utf8 conversion.
lower_char() ->
choose(16#20, 16#7f).
-
+
vclock() ->
?LET(VclockSym, vclock_sym(), eval(VclockSym)).
@@ -80,10 +80,10 @@ maybe_tombstone() ->
%% brother sister otherbrother
%% \ | /
%% current
-%%
+%%
lineage() ->
elements([current, ancestor, brother, sister, otherbrother]).
-
+
merge(ancestor, Lineage) -> Lineage; % order should match Clocks list in riak_objects
merge(Lineage, ancestor) -> Lineage; % as last modified is used as tie breaker with
merge(_, current) -> current; % allow_mult=false
@@ -111,9 +111,9 @@ partvals() ->
not_empty(fsm_eqc_util:longer_list(2, partval())).
%% Generate 5 riak objects with the same bkey
-%%
+%%
riak_objects() ->
- ?LET({{Bucket,Key},AncestorVclock0,Tombstones},
+ ?LET({{Bucket,Key},AncestorVclock0,Tombstones},
{noshrink(bkey()),vclock(),vector(5, maybe_tombstone())},
begin
AncestorVclock = vclock:increment(<<"dad">>, AncestorVclock0),
@@ -144,7 +144,7 @@ add_tombstone(Obj) ->
[{M,V}] = riak_object:get_contents(Obj),
NewM = dict:store(<<"X-Riak-Deleted">>, true, M),
riak_object:set_contents(Obj, [{NewM, V}]).
-
+
some_up_node_status(NumNodes) ->
at_least_one_up(nodes_status(NumNodes)).
@@ -186,6 +186,7 @@ start_mock_servers() ->
start_fake_get_put_monitor(),
riak_core_stat_cache:start_link(),
riak_kv_stat:register_stats(),
+ riak_core_metadata_manager:start_link([{data_dir, "fsm_eqc_test_data"}]),
riak_core_ring_events:start_link(),
riak_core_node_watcher_events:start_link(),
riak_core_node_watcher:start_link(),
@@ -194,6 +195,7 @@ start_mock_servers() ->
cleanup_mock_servers() ->
stop_fake_get_put_monitor(),
+ riak_kv_test_util:stop_process(riak_core_metadata_manager),
application:stop(folsom),
application:stop(riak_core).
diff --git a/test/riak_kv_mutator_tests.erl b/test/riak_kv_mutator_tests.erl
new file mode 100644
index 0000000000..e1028c830e
--- /dev/null
+++ b/test/riak_kv_mutator_tests.erl
@@ -0,0 +1,274 @@
+-module(riak_kv_mutator_tests).
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+
+-export([mutate_put/5, mutate_get/1]).
+
+functionality_test_() ->
+ {foreach, fun() ->
+ purge_data_dir(),
+ {ok, MetadataManagerPid} = riak_core_metadata_manager:start_link([{data_dir, "test_data"}]),
+ {ok, HashtreePid} = riak_core_metadata_hashtree:start_link("test_data"),
+ {HashtreePid, MetadataManagerPid}
+ end,
+ fun({HashtreePid, MetadataManagerPid}) ->
+ unlink(MetadataManagerPid),
+ exit(MetadataManagerPid, kill),
+ unlink(HashtreePid),
+ exit(HashtreePid, kill),
+ Mon = erlang:monitor(process, MetadataManagerPid),
+ receive
+ {'DOWN', Mon, process, MetadataManagerPid, _Why} ->
+ ok
+ end,
+ Mon2 = erlang:monitor(process, HashtreePid),
+ receive
+ {'DOWN', Mon2, process, HashtreePid, _AlsoWhy} ->
+ ok
+ end,
+ purge_data_dir()
+ end, [
+
+ fun(_) -> {"register a mutator", fun() ->
+ Got = riak_kv_mutator:register(fake_module),
+ ?assertEqual(ok, Got)
+ end} end,
+
+ fun(_) -> {"retrieve mutators", fun() ->
+ ok = riak_kv_mutator:register(fake_module),
+ ok = riak_kv_mutator:register(fake_module_2),
+ Got = riak_kv_mutator:get(),
+ ?assertEqual({ok, [fake_module, fake_module_2]}, Got)
+ end} end,
+
+ fun(_) -> {"retrieve an empty list of mutators", fun() ->
+ Got = riak_kv_mutator:get(),
+ ?assertEqual({ok, []}, Got)
+ end} end,
+
+ fun(_) -> {"unregister", fun() ->
+ ok = riak_kv_mutator:register(fake_module),
+ Got1 = riak_kv_mutator:unregister(fake_module),
+ ?assertEqual(ok, Got1),
+ Got2 = riak_kv_mutator:get(),
+ ?assertEqual({ok, []}, Got2)
+ end} end,
+
+ fun(_) -> {"mutator list ordered set", fun() ->
+ Mods = [a,z,c,b],
+ [riak_kv_mutator:register(M) || M <- Mods],
+ Got1 = riak_kv_mutator:get(),
+ Expected = ordsets:from_list(Mods),
+ ?assertEqual({ok, Expected}, Got1)
+ end} end,
+
+ fun(_) -> {"register a mutator with a priority", fun() ->
+ Got1 = riak_kv_mutator:register(fake_module, 3),
+ ?assertEqual(ok, Got1),
+ Got2 = riak_kv_mutator:get(),
+ ?assertEqual({ok, [fake_module]}, Got2)
+ end} end,
+
+ fun(_) -> {"register a mutator twice, differing priorities", fun() ->
+ ok = riak_kv_mutator:register(fake_module, 3),
+ ok = riak_kv_mutator:register(fake_module, 7),
+ Got = riak_kv_mutator:get(),
+ ?assertEqual({ok, [fake_module]}, Got)
+ end} end,
+
+ fun(_) -> {"priority determines mutator order", fun() ->
+ ok = riak_kv_mutator:register(fake_module, 7),
+ ok = riak_kv_mutator:register(fake_module_2, 2),
+ Got = riak_kv_mutator:get(),
+ ?assertEqual({ok, [fake_module_2, fake_module]}, Got)
+ end} end,
+
+ fun(_) -> {"mutate a put", fun() ->
+ Object = riak_object:new(<<"bucket">>, <<"key">>, <<"original_data">>, dict:from_list([{<<"mutations">>, 0}])),
+ riak_kv_mutator:register(?MODULE),
+ {FullMutate, MetaMutates} = riak_kv_mutator:mutate_put(Object, [{<<"bucket_prop">>, <<"bprop">>}]),
+ ExpectedVal = <<"mutatedbprop">>,
+ ExpectedMetaMutations = 1,
+ ?assertEqual(ExpectedVal, riak_object:get_value(FullMutate)),
+ ?assertEqual(ExpectedMetaMutations, dict:fetch(<<"mutations">>, riak_object:get_metadata(MetaMutates)))
+ end} end,
+
+ fun(_) -> {"do not mutate on get if not mutated on put", fun() ->
+ Data = <<"original_data">>,
+ Object = riak_object:new(<<"bucket">>, <<"key">>, Data, dict:from_list([{<<"mutations">>, 0}])),
+ riak_kv_mutator:register(?MODULE),
+ Got = riak_kv_mutator:mutate_get(Object),
+ ?assertEqual(Data, riak_object:get_value(Got)),
+ ?assertEqual(0, dict:fetch(<<"mutations">>, riak_object:get_metadata(Got)))
+
+ end} end,
+
+ fun(_) -> {"mutate a get", fun() ->
+ riak_kv_mutator:register(?MODULE),
+ Object = riak_object:new(<<"bucket">>, <<"key">>, <<"original_data">>, dict:from_list([{<<"mutations">>, 0}])),
+ {Object2, _MetaObject} = riak_kv_mutator:mutate_put(Object, [{<<"bucket_prop">>, <<"warble">>}]),
+ Object3 = riak_kv_mutator:mutate_get(Object2),
+ ExpectedVal = <<"mutatedwarble">>,
+ ExpectedMetaMutations = 2,
+ ?assertEqual(ExpectedVal, riak_object:get_value(Object3)),
+ ?assertEqual(ExpectedMetaMutations, dict:fetch(<<"mutations">>, riak_object:get_metadata(Object3)))
+ end} end,
+
+ fun(_) -> {"get mutations are reversed order from put mutators", fun() ->
+ meck:new(m1, [non_strict]),
+ meck:new(m2, [non_strict]),
+ meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} ->
+ N * 2;
+ _ ->
+ 7
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ MetaChanges1 = dict:store(<<"mutations">>, Mutations, MetaChanges),
+ {Meta2, Value, MetaChanges1}
+ end),
+ meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} ->
+ N + 3;
+ _ ->
+ 20
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ MetaChanges1 = dict:store(<<"mutations">>, Mutations, MetaChanges),
+ {Meta2, Value, MetaChanges1}
+ end),
+ meck:expect(m1, mutate_get, fun(Object) ->
+ MetaValues = riak_object:get_contents(Object),
+ MetaValues2 = lists:map(fun({Meta, Value}) ->
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} ->
+ N div 2;
+ _ ->
+ 9
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ {Meta2, Value}
+ end, MetaValues),
+ riak_object:set_contents(Object, MetaValues2)
+ end),
+ meck:expect(m2, mutate_get, fun(Obj) ->
+ Contents = riak_object:get_contents(Obj),
+ Contents2 = lists:map(fun({Meta, Value}) ->
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} ->
+ N - 3;
+ _ ->
+ 77
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ {Meta2, Value}
+ end, Contents),
+ riak_object:set_contents(Obj, Contents2)
+ end),
+ riak_kv_mutator:register(m1),
+ riak_kv_mutator:register(m2),
+ Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])),
+ {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []),
+ Obj3 = riak_kv_mutator:mutate_get(Obj2),
+ Meta = riak_object:get_metadata(Obj3),
+ ?assertEqual({ok, 11}, dict:find(<<"mutations">>, Meta)),
+ meck:unload([m1,m2])
+ end} end,
+
+ fun(_) -> {"notfound is a valid return for a get mutator", fun() ->
+ meck:new(m1, [non_strict]),
+ meck:new(m2, [non_strict]),
+ meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ {Meta, Value, MetaChanges}
+ end),
+ meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ {Meta, Value, MetaChanges}
+ end),
+ meck:expect(m1, mutate_get, fun(Object) ->
+ Object
+ end),
+ meck:expect(m2, mutate_get, fun(_Obj) ->
+ notfound
+ end),
+ riak_kv_mutator:register(m1),
+ riak_kv_mutator:register(m2),
+ Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])),
+ {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []),
+ Obj3 = riak_kv_mutator:mutate_get(Obj2),
+ ?assertEqual(notfound, Obj3),
+ meck:unload([m1,m2])
+ end} end,
+
+ fun(_) -> {"notfound prevents other mutators being called", fun() ->
+ meck:new(m1, [non_strict]),
+ meck:new(m2, [non_strict]),
+ meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ {Meta, Value, MetaChanges}
+ end),
+ meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) ->
+ {Meta, Value, MetaChanges}
+ end),
+ meck:expect(m1, mutate_get, fun(Object) ->
+ Contents = riak_object:get_contents(Object),
+ Contents2 = lists:map(fun({Meta, Value}) ->
+ Meta2 = dict:store(mutated, true, Meta),
+ {Meta2, Value}
+ end, Contents),
+ riak_object:set_contents(Object, Contents2)
+ end),
+ meck:expect(m2, mutate_get, fun(_Obj) ->
+ notfound
+ end),
+ riak_kv_mutator:register(m1),
+ riak_kv_mutator:register(m2),
+ Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])),
+ {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []),
+ Obj3 = riak_kv_mutator:mutate_get(Obj2),
+ ?assertEqual(notfound, Obj3),
+ meck:unload([m1,m2])
+ end} end
+
+ ]}.
+
+purge_data_dir() ->
+ {ok, CWD} = file:get_cwd(),
+ DataDir = filename:join(CWD, "test_data"),
+ DataFiles = filename:join([DataDir, "*"]),
+ [file:delete(File) || File <- filelib:wildcard(DataFiles)],
+ file:del_dir(DataDir).
+
+mutate_put(Meta, _Value, ExposedMeta, _Object, BucketProps) ->
+ NewVal = case proplists:get_value(<<"bucket_prop">>, BucketProps) of
+ BProp when is_binary(BProp) ->
+ <<"mutated", BProp/binary>>;
+ _ ->
+ <<"mutated">>
+ end,
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} when is_integer(N) ->
+ N + 1;
+ _ ->
+ 0
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ RevealedMeta2 = dict:store(<<"mutations">>, Mutations, ExposedMeta),
+ {Meta2, NewVal, RevealedMeta2}.
+
+mutate_get(Object) ->
+ MetaValues = riak_object:get_contents(Object),
+ MetaValues2 = lists:map(fun({Meta, Value}) ->
+ Mutations = case dict:find(<<"mutations">>, Meta) of
+ {ok, N} when is_integer(N) ->
+ N + 1;
+ _ ->
+ 0
+ end,
+ Meta2 = dict:store(<<"mutations">>, Mutations, Meta),
+ {Meta2, Value}
+ end, MetaValues),
+ riak_object:set_contents(Object, MetaValues2).
+
+-endif.
\ No newline at end of file