diff --git a/src/riak_kv_app.erl b/src/riak_kv_app.erl index 498dd561e2..b20a0f8098 100644 --- a/src/riak_kv_app.erl +++ b/src/riak_kv_app.erl @@ -187,6 +187,9 @@ start(_Type, _StartArgs) -> [enabled, disabled], disabled), + % get/put mutator is supported, but not all nodes may have it + riak_core_capability:register({riak_kv, mutators}, [true, false], false), + HealthCheckOn = app_helper:get_env(riak_kv, enable_health_checks, false), %% Go ahead and mark the riak_kv service as up in the node watcher. %% The riak_core_ring_handler blocks until all vnodes have been started diff --git a/src/riak_kv_mutator.erl b/src/riak_kv_mutator.erl new file mode 100644 index 0000000000..499d283542 --- /dev/null +++ b/src/riak_kv_mutator.erl @@ -0,0 +1,247 @@ +%% ------------------------------------------------------------------- +%% +%% riak_kv_mutators - Storage and retrieval for get/put mutation +%% functions +%% +%% Copyright (c) 2013 Basho Technologies, Inc. All Rights Reserved. +%% +%% This file is provided to you under the Apache License, +%% Version 2.0 (the "License"); you may not use this file +%% except in compliance with the License. You may obtain +%% a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, +%% software distributed under the License is distributed on an +%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +%% KIND, either express or implied. See the License for the +%% specific language governing permissions and limitations +%% under the License. +%% +%% ------------------------------------------------------------------- + +%% @doc There are circumstances where the object stored on disk is not +%% the object to return; and there are times the object written to the +%% data storage backend is not meant to be the object given. An +%% example would be storing only meta data for an object on a remote +%% cluster. This module is an interface to register mutators that will +%% be run. +%% +%% This doubles as a behavior defining module for the mutators. +%% +%% == Words of Warning == +%% +%% Once an object has been stored with a given mutator list, attempting to +%% retreive that object will require those mutators. If the mutators done't +%% exists or have changed in a non-backwards compatible way, you can expect +%% to have at worst a crash, or at best a corrupted object. +%% +%% == Callbacks == +%% +%% A mutator callback must implement 2 function: mutate_put/5 and mutate_get/1. +%% +%% mutate_put(MetaData, Value, ExposedMeta, +%% FullObject, BucketProperties) -> Result +%% +%% Types: +%% ``` +%% MetaData = dict() +%% Value = term() +%% ExposedMeta = dict() +%% FullObject = riak_object:riak_object() +%% BucketProperties = orddict:orddict() +%% Result = {NewMeta, NewValue, NewExposedMeta} +%% NewMeta = dict() +%% NewValue = term() +%% NewExposedMeta = dict()''' +%% +%% The mutate_put callback is called for each metadata/value pair a riak_object +%% has. The return value of NewMeta and NewValue are used by the storage backend +%% while the NewExposedMeta is used for the client return where NewMeta would +%% normally. The NewExposedMeta is merged with the NewMeta to generate the +%% exposed metadata; if the same key is found, the NewExposedMeta value is used. +%% +%% The mutations are run in the same process as the vnode. +%% +%% mutate_get(Object) -> Result +%% +%% Types: +%% ``` +%% Object = riak_object:riak_object() +%% Result = riak_object:riak_object() | 'notfound' +%% ''' +%% Take the object from storage and reverse whatever mutation was applied. Note +%% the bucket properties are not part of this callback, so if some data is +%% important to reverse a mutation, it must be put in the metadata by the +%% `mutate_put' function. Also note how the entire object is given as opposed to +%% simply a metadata/value pair. Care must be taken not to corrupt the object. +%% +%% A return of ``'notfound''' stops the mutator chain and returns immediately. This +%% provides an escape hatch of sorts; if the mutator cannot reverse the mutation +%% effectively, return ``'notfound'''. + +-module(riak_kv_mutator). + +-export([register/1, register/2, unregister/1]). +-export([get/0]). +-export([mutate_put/2, mutate_get/1]). + +-callback mutate_put(Meta :: dict(), Value :: any(), ExposedMeta :: dict(), FullObject :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {dict(), any(), dict()}. +-callback mutate_get(FullObject :: riak_object:riak_object()) -> riak_object:riak_object() | 'notfound'. + +-define(DEFAULT_PRIORITY, 0). + +%% @doc Register the given module as a mutator with the default priority of 0. +%% @see register/2 +-spec register(Module :: atom()) -> 'ok'. +register(Module) -> + ?MODULE:register(Module, ?DEFAULT_PRIORITY). + +%% @doc Register a module as a mutator with the given priority. Modules with +%% equal priority are done in default (alphabetical) order. A module +%% can only be registered once. When there is a conflict (two different +%% lists), those lists are merged. Note that if an object is stored with +%% a mutator, that mutator is used on retrieval. If that mutator code is +%% removed or changed in a backwards incompatible manner, at best the +%% object will be corrupted; at worst it will cause a crash. +-spec register(Module :: atom(), Priority :: term()) -> 'ok'. +register(Module, Priority) -> + Modifier = fun + (undefined) -> + [{Priority, Module}]; + (Values) -> + Values2 = merge_values(Values), + insert_mutator(Module, Priority, Values2) + end, + riak_core_metadata:put({riak_kv, mutators}, list, Modifier). + +%% @doc Remove a module from the mutator list. Removing a mutator from the +%% list does not remove the mutator from use for retreiving objects. Any +%% object that was stored while the mutator was registered will use that +%% mutator on get. Thus, if the code for a mutator is not available or +%% was changed in a non-backwards compatible way, at best one can expect +%% corrupt objects, at worst a crash. +-spec unregister(Module :: atom()) -> 'ok'. +unregister(Module) -> + Modifier = fun + (undefined) -> + []; + (Values) -> + Values2 = merge_values(Values), + lists:keydelete(Module, 2, Values2) + end, + riak_core_metadata:put({riak_kv, mutators}, list, Modifier, []). + +%% @doc Retrieve the list of mutators in the order to apply them when doing a +%% a put mutation. To get the order when doing a get mutation, reverse the list. +-spec get() -> [atom()]. +get() -> + Resolver = fun + ('$deleted', '$deleted') -> + []; + ('$deleted', Values) -> + Values; + (Values, '$deleted') -> + Values; + (Values, Values) -> + Values; + (Values1, Values2) -> + merge_values([Values1, Values2]) + end, + ModulesAndPriors = riak_core_metadata:get({riak_kv, mutators}, list, [{default, []}, {resolver, Resolver}]), + Modules = [M || {_P, M} <- ModulesAndPriors], + {ok, Modules}. + +%% @doc Unmutate an object after retrieval from storage. When an object is +%% mutated, the mutators applied are put into the object's metadata. +%% If the mutator does not exist anymore or has changed in a backwards +%% incompatible manner, at best there will be corrupt objects, at worst +%% a crash. +-spec mutate_get(Object :: riak_object:riak_object()) -> riak_object:riak_object(). +mutate_get(Object) -> + [Meta | _] = riak_object:get_metadatas(Object), + case dict:find(mutators_applied, Meta) of + error -> + Object; + {ok, Applied} -> + DeMutateOrder = lists:reverse(Applied), + mutate_get(Object, DeMutateOrder) + end. + +mutate_get(Object, []) -> + Object; +mutate_get(Object, [Mutator | Tail]) -> + % so event though the mutate_put callback has to return + % {Meta, Value, Exposed} values, the get callback gets away with just + % giving the object? This is to avoid complicated interaction with + % notfound return. + case Mutator:mutate_get(Object) of + notfound -> + notfound; + Object2 -> + mutate_get(Object2, Tail) + end. + +%% @doc Mutate an object in preparation to storage, returning a tuple of the +%% object to store and the object to return to the client. For each sibling +%% the object has {Meta, Value} pair, each mutator is called with a copy +%% that iteration's Meta used as the exposed meta." Later mutators are +%% given the results of previous mutators. Once all mutations are complete, +%% two {@link riak_object:riak_object()}s are returned. The first is what +%% is to be stored, while the second has the exposed meta set with the +%% orginal value(s). +-spec mutate_put(Object :: riak_object:riak_object(), BucketProps :: orddict:orddict()) -> {riak_object:riak_object(), riak_object:riak_object()}. +mutate_put(Object, BucketProps) -> + Contents = riak_object:get_contents(Object), + {ok, Modules} = ?MODULE:get(), + MetasValuesRevealeds = lists:map(fun({InMeta, InValue}) -> + {InMeta1, InValue1, InRevealed} = lists:foldl(fun(Module, {InInMeta, InInValue, InInRevealed}) -> + % why not just give the riak_object? because of a warning + % in riak_object stating that set_contents is for internal + % use only. Hopefully this qualifies. + Module:mutate_put(InInMeta, InInValue, InInRevealed, Object, BucketProps) + end, {InMeta, InValue, dict:new()}, Modules), + InMeta2 = dict:store(mutators_applied, Modules, InMeta1), + {InMeta2, InValue1, InRevealed} + end, Contents), + Contents2 = [{M,V} || {M,V,_R} <- MetasValuesRevealeds], + Mutated = riak_object:set_contents(Object, Contents2), + FakedContents = lists:map(fun({InMeta, InContent, InRevealed}) -> + FixedMeta = dict:merge(fun(_Key, _NotMutated, MutatedVal) -> + MutatedVal + end, InMeta, InRevealed), + {FixedMeta, InContent} + end, MetasValuesRevealeds), + Faked = riak_object:set_contents(Object, FakedContents), + {Mutated, Faked}. + +merge_values([]) -> + []; + +merge_values(Values) -> + case lists:filter(fun erlang:is_list/1, Values) of + [] -> + []; + [Head | Tail] -> + merge_values(Tail, Head) + end. + +merge_values([], Acc) -> + Acc; + +merge_values([{Priority, Module} = Mutator | Tail], Acc) -> + Acc2 = case lists:keyfind(Module, 2, Acc) of + false -> + ordsets:add_element(Mutator, Acc); + {P1, _} when P1 < Priority -> + Acc; + Else -> + ordsets:add_element(Mutator, ordsets:del_element(Else)) + end, + merge_values(Tail, Acc2). + +insert_mutator(Module, Priority, Mutators) -> + Mutators2 = lists:keydelete(Module, 2, Mutators), + ordsets:add_element({Priority, Module}, Mutators2). + diff --git a/src/riak_kv_vnode.erl b/src/riak_kv_vnode.erl index 10c1105c06..4940928a2c 100644 --- a/src/riak_kv_vnode.erl +++ b/src/riak_kv_vnode.erl @@ -1104,15 +1104,17 @@ perform_put({true, Obj}, modstate=ModState}=State, #putargs{returnbody=RB, bkey={Bucket, Key}, + bprops=BProps, reqid=ReqID, index_specs=IndexSpecs}) -> - case encode_and_put(Obj, Mod, Bucket, Key, IndexSpecs, ModState) of + {Obj2, Fake} = riak_kv_mutator:mutate_put(Obj, BProps), + case encode_and_put(Obj2, Mod, Bucket, Key, IndexSpecs, ModState) of {{ok, UpdModState}, EncodedVal} -> update_hashtree(Bucket, Key, EncodedVal, State), ?INDEX(Obj, put, Idx), case RB of true -> - Reply = {dw, Idx, Obj, ReqID}; + Reply = {dw, Idx, Fake, ReqID}; false -> Reply = {dw, Idx, ReqID} end; @@ -1211,7 +1213,12 @@ do_get(_Sender, BKey, ReqID, do_get_term({Bucket, Key}, Mod, ModState) -> case do_get_object(Bucket, Key, Mod, ModState) of {ok, Obj, _UpdModState} -> - {ok, Obj}; + case riak_kv_mutator:mutate_get(Obj) of + notfound -> + {error, notfound}; + Obj2 -> + {ok, Obj2} + end; %% @TODO Eventually it would be good to %% make the use of not_found or notfound %% consistent throughout the code. @@ -1504,7 +1511,7 @@ get_hashtree_token() -> -spec max_hashtree_tokens() -> pos_integer(). max_hashtree_tokens() -> app_helper:get_env(riak_kv, - anti_entropy_max_async, + anti_entropy_max_async, ?DEFAULT_HASHTREE_TOKENS). %% @private @@ -1654,7 +1661,7 @@ handoff_data_encoding_method() -> %% its encoding method, but if that fails we use the legacy zlib and protocol buffer decoding: decode_binary_object(BinaryObject) -> try binary_to_term(BinaryObject) of - { Method, BinObj } -> + { Method, BinObj } -> case Method of encode_raw -> {B, K, Val} = BinObj, BKey = {B, K}, @@ -1670,7 +1677,7 @@ decode_binary_object(BinaryObject) -> %% An exception means we have a legacy handoff object: catch _:_ -> do_zlib_decode(BinaryObject) - end. + end. do_zlib_decode(BinaryObject) -> DecodedObject = zlib:unzip(BinaryObject), @@ -1863,11 +1870,13 @@ list_buckets_test_() -> application:start(folsom), riak_core_stat_cache:start_link(), riak_kv_stat:register_stats(), + riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]), Env end, fun(Env) -> riak_core_ring_manager:cleanup_ets(test), riak_core_stat_cache:stop(), + riak_kv_test_util:stop_process(riak_core_metadata_manager), application:stop(folsom), application:stop(sasl), [application:unset_env(riak_kv, K) || @@ -1919,6 +1928,7 @@ list_buckets_test_i(BackendMod) -> filter_keys_test() -> riak_core_ring_manager:setup_ets(test), clean_test_dirs(), + riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]), {S, B, K} = backend_with_known_key(riak_kv_memory_backend), Caller1 = new_result_listener(keys), handle_coverage(?KV_LISTKEYS_REQ{bucket=B, @@ -1939,6 +1949,7 @@ filter_keys_test() -> ?assertEqual({ok, []}, results_from_listener(Caller3)), riak_core_ring_manager:cleanup_ets(test), + riak_kv_test_util:stop_process(riak_core_metadata_manager), flush_msgs(). %% include bitcask.hrl for HEADER_SIZE macro @@ -1948,6 +1959,7 @@ filter_keys_test() -> %% preparation for a write prevents the write from going through. bitcask_badcrc_test() -> riak_core_ring_manager:setup_ets(test), + riak_core_metadata_manager:start_link([{data_dir, "kv_vnode_test_meta"}]), clean_test_dirs(), {S, B, K} = backend_with_known_key(riak_kv_bitcask_backend), DataDir = filename:join(bitcask_test_dir(), "0"), @@ -1964,6 +1976,7 @@ bitcask_badcrc_test() -> {raw, 456, self()}, S), riak_core_ring_manager:cleanup_ets(test), + riak_kv_test_util:stop_process(riak_core_metadata_manager), flush_msgs(). diff --git a/test/fsm_eqc_util.erl b/test/fsm_eqc_util.erl index 6e7691b55d..fa968ff2d3 100644 --- a/test/fsm_eqc_util.erl +++ b/test/fsm_eqc_util.erl @@ -44,7 +44,7 @@ non_blank_string() -> %% with utf8 conversion. lower_char() -> choose(16#20, 16#7f). - + vclock() -> ?LET(VclockSym, vclock_sym(), eval(VclockSym)). @@ -80,10 +80,10 @@ maybe_tombstone() -> %% brother sister otherbrother %% \ | / %% current -%% +%% lineage() -> elements([current, ancestor, brother, sister, otherbrother]). - + merge(ancestor, Lineage) -> Lineage; % order should match Clocks list in riak_objects merge(Lineage, ancestor) -> Lineage; % as last modified is used as tie breaker with merge(_, current) -> current; % allow_mult=false @@ -111,9 +111,9 @@ partvals() -> not_empty(fsm_eqc_util:longer_list(2, partval())). %% Generate 5 riak objects with the same bkey -%% +%% riak_objects() -> - ?LET({{Bucket,Key},AncestorVclock0,Tombstones}, + ?LET({{Bucket,Key},AncestorVclock0,Tombstones}, {noshrink(bkey()),vclock(),vector(5, maybe_tombstone())}, begin AncestorVclock = vclock:increment(<<"dad">>, AncestorVclock0), @@ -144,7 +144,7 @@ add_tombstone(Obj) -> [{M,V}] = riak_object:get_contents(Obj), NewM = dict:store(<<"X-Riak-Deleted">>, true, M), riak_object:set_contents(Obj, [{NewM, V}]). - + some_up_node_status(NumNodes) -> at_least_one_up(nodes_status(NumNodes)). @@ -186,6 +186,7 @@ start_mock_servers() -> start_fake_get_put_monitor(), riak_core_stat_cache:start_link(), riak_kv_stat:register_stats(), + riak_core_metadata_manager:start_link([{data_dir, "fsm_eqc_test_data"}]), riak_core_ring_events:start_link(), riak_core_node_watcher_events:start_link(), riak_core_node_watcher:start_link(), @@ -194,6 +195,7 @@ start_mock_servers() -> cleanup_mock_servers() -> stop_fake_get_put_monitor(), + riak_kv_test_util:stop_process(riak_core_metadata_manager), application:stop(folsom), application:stop(riak_core). diff --git a/test/riak_kv_mutator_tests.erl b/test/riak_kv_mutator_tests.erl new file mode 100644 index 0000000000..e1028c830e --- /dev/null +++ b/test/riak_kv_mutator_tests.erl @@ -0,0 +1,274 @@ +-module(riak_kv_mutator_tests). + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +-export([mutate_put/5, mutate_get/1]). + +functionality_test_() -> + {foreach, fun() -> + purge_data_dir(), + {ok, MetadataManagerPid} = riak_core_metadata_manager:start_link([{data_dir, "test_data"}]), + {ok, HashtreePid} = riak_core_metadata_hashtree:start_link("test_data"), + {HashtreePid, MetadataManagerPid} + end, + fun({HashtreePid, MetadataManagerPid}) -> + unlink(MetadataManagerPid), + exit(MetadataManagerPid, kill), + unlink(HashtreePid), + exit(HashtreePid, kill), + Mon = erlang:monitor(process, MetadataManagerPid), + receive + {'DOWN', Mon, process, MetadataManagerPid, _Why} -> + ok + end, + Mon2 = erlang:monitor(process, HashtreePid), + receive + {'DOWN', Mon2, process, HashtreePid, _AlsoWhy} -> + ok + end, + purge_data_dir() + end, [ + + fun(_) -> {"register a mutator", fun() -> + Got = riak_kv_mutator:register(fake_module), + ?assertEqual(ok, Got) + end} end, + + fun(_) -> {"retrieve mutators", fun() -> + ok = riak_kv_mutator:register(fake_module), + ok = riak_kv_mutator:register(fake_module_2), + Got = riak_kv_mutator:get(), + ?assertEqual({ok, [fake_module, fake_module_2]}, Got) + end} end, + + fun(_) -> {"retrieve an empty list of mutators", fun() -> + Got = riak_kv_mutator:get(), + ?assertEqual({ok, []}, Got) + end} end, + + fun(_) -> {"unregister", fun() -> + ok = riak_kv_mutator:register(fake_module), + Got1 = riak_kv_mutator:unregister(fake_module), + ?assertEqual(ok, Got1), + Got2 = riak_kv_mutator:get(), + ?assertEqual({ok, []}, Got2) + end} end, + + fun(_) -> {"mutator list ordered set", fun() -> + Mods = [a,z,c,b], + [riak_kv_mutator:register(M) || M <- Mods], + Got1 = riak_kv_mutator:get(), + Expected = ordsets:from_list(Mods), + ?assertEqual({ok, Expected}, Got1) + end} end, + + fun(_) -> {"register a mutator with a priority", fun() -> + Got1 = riak_kv_mutator:register(fake_module, 3), + ?assertEqual(ok, Got1), + Got2 = riak_kv_mutator:get(), + ?assertEqual({ok, [fake_module]}, Got2) + end} end, + + fun(_) -> {"register a mutator twice, differing priorities", fun() -> + ok = riak_kv_mutator:register(fake_module, 3), + ok = riak_kv_mutator:register(fake_module, 7), + Got = riak_kv_mutator:get(), + ?assertEqual({ok, [fake_module]}, Got) + end} end, + + fun(_) -> {"priority determines mutator order", fun() -> + ok = riak_kv_mutator:register(fake_module, 7), + ok = riak_kv_mutator:register(fake_module_2, 2), + Got = riak_kv_mutator:get(), + ?assertEqual({ok, [fake_module_2, fake_module]}, Got) + end} end, + + fun(_) -> {"mutate a put", fun() -> + Object = riak_object:new(<<"bucket">>, <<"key">>, <<"original_data">>, dict:from_list([{<<"mutations">>, 0}])), + riak_kv_mutator:register(?MODULE), + {FullMutate, MetaMutates} = riak_kv_mutator:mutate_put(Object, [{<<"bucket_prop">>, <<"bprop">>}]), + ExpectedVal = <<"mutatedbprop">>, + ExpectedMetaMutations = 1, + ?assertEqual(ExpectedVal, riak_object:get_value(FullMutate)), + ?assertEqual(ExpectedMetaMutations, dict:fetch(<<"mutations">>, riak_object:get_metadata(MetaMutates))) + end} end, + + fun(_) -> {"do not mutate on get if not mutated on put", fun() -> + Data = <<"original_data">>, + Object = riak_object:new(<<"bucket">>, <<"key">>, Data, dict:from_list([{<<"mutations">>, 0}])), + riak_kv_mutator:register(?MODULE), + Got = riak_kv_mutator:mutate_get(Object), + ?assertEqual(Data, riak_object:get_value(Got)), + ?assertEqual(0, dict:fetch(<<"mutations">>, riak_object:get_metadata(Got))) + + end} end, + + fun(_) -> {"mutate a get", fun() -> + riak_kv_mutator:register(?MODULE), + Object = riak_object:new(<<"bucket">>, <<"key">>, <<"original_data">>, dict:from_list([{<<"mutations">>, 0}])), + {Object2, _MetaObject} = riak_kv_mutator:mutate_put(Object, [{<<"bucket_prop">>, <<"warble">>}]), + Object3 = riak_kv_mutator:mutate_get(Object2), + ExpectedVal = <<"mutatedwarble">>, + ExpectedMetaMutations = 2, + ?assertEqual(ExpectedVal, riak_object:get_value(Object3)), + ?assertEqual(ExpectedMetaMutations, dict:fetch(<<"mutations">>, riak_object:get_metadata(Object3))) + end} end, + + fun(_) -> {"get mutations are reversed order from put mutators", fun() -> + meck:new(m1, [non_strict]), + meck:new(m2, [non_strict]), + meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} -> + N * 2; + _ -> + 7 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + MetaChanges1 = dict:store(<<"mutations">>, Mutations, MetaChanges), + {Meta2, Value, MetaChanges1} + end), + meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} -> + N + 3; + _ -> + 20 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + MetaChanges1 = dict:store(<<"mutations">>, Mutations, MetaChanges), + {Meta2, Value, MetaChanges1} + end), + meck:expect(m1, mutate_get, fun(Object) -> + MetaValues = riak_object:get_contents(Object), + MetaValues2 = lists:map(fun({Meta, Value}) -> + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} -> + N div 2; + _ -> + 9 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + {Meta2, Value} + end, MetaValues), + riak_object:set_contents(Object, MetaValues2) + end), + meck:expect(m2, mutate_get, fun(Obj) -> + Contents = riak_object:get_contents(Obj), + Contents2 = lists:map(fun({Meta, Value}) -> + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} -> + N - 3; + _ -> + 77 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + {Meta2, Value} + end, Contents), + riak_object:set_contents(Obj, Contents2) + end), + riak_kv_mutator:register(m1), + riak_kv_mutator:register(m2), + Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])), + {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []), + Obj3 = riak_kv_mutator:mutate_get(Obj2), + Meta = riak_object:get_metadata(Obj3), + ?assertEqual({ok, 11}, dict:find(<<"mutations">>, Meta)), + meck:unload([m1,m2]) + end} end, + + fun(_) -> {"notfound is a valid return for a get mutator", fun() -> + meck:new(m1, [non_strict]), + meck:new(m2, [non_strict]), + meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + {Meta, Value, MetaChanges} + end), + meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + {Meta, Value, MetaChanges} + end), + meck:expect(m1, mutate_get, fun(Object) -> + Object + end), + meck:expect(m2, mutate_get, fun(_Obj) -> + notfound + end), + riak_kv_mutator:register(m1), + riak_kv_mutator:register(m2), + Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])), + {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []), + Obj3 = riak_kv_mutator:mutate_get(Obj2), + ?assertEqual(notfound, Obj3), + meck:unload([m1,m2]) + end} end, + + fun(_) -> {"notfound prevents other mutators being called", fun() -> + meck:new(m1, [non_strict]), + meck:new(m2, [non_strict]), + meck:expect(m1, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + {Meta, Value, MetaChanges} + end), + meck:expect(m2, mutate_put, fun(Meta, Value, MetaChanges, _Object, _Props) -> + {Meta, Value, MetaChanges} + end), + meck:expect(m1, mutate_get, fun(Object) -> + Contents = riak_object:get_contents(Object), + Contents2 = lists:map(fun({Meta, Value}) -> + Meta2 = dict:store(mutated, true, Meta), + {Meta2, Value} + end, Contents), + riak_object:set_contents(Object, Contents2) + end), + meck:expect(m2, mutate_get, fun(_Obj) -> + notfound + end), + riak_kv_mutator:register(m1), + riak_kv_mutator:register(m2), + Obj = riak_object:new(<<"bucket">>, <<"key">>, <<"data">>, dict:from_list([{<<"mutations">>, 11}])), + {Obj2, _Faked} = riak_kv_mutator:mutate_put(Obj, []), + Obj3 = riak_kv_mutator:mutate_get(Obj2), + ?assertEqual(notfound, Obj3), + meck:unload([m1,m2]) + end} end + + ]}. + +purge_data_dir() -> + {ok, CWD} = file:get_cwd(), + DataDir = filename:join(CWD, "test_data"), + DataFiles = filename:join([DataDir, "*"]), + [file:delete(File) || File <- filelib:wildcard(DataFiles)], + file:del_dir(DataDir). + +mutate_put(Meta, _Value, ExposedMeta, _Object, BucketProps) -> + NewVal = case proplists:get_value(<<"bucket_prop">>, BucketProps) of + BProp when is_binary(BProp) -> + <<"mutated", BProp/binary>>; + _ -> + <<"mutated">> + end, + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} when is_integer(N) -> + N + 1; + _ -> + 0 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + RevealedMeta2 = dict:store(<<"mutations">>, Mutations, ExposedMeta), + {Meta2, NewVal, RevealedMeta2}. + +mutate_get(Object) -> + MetaValues = riak_object:get_contents(Object), + MetaValues2 = lists:map(fun({Meta, Value}) -> + Mutations = case dict:find(<<"mutations">>, Meta) of + {ok, N} when is_integer(N) -> + N + 1; + _ -> + 0 + end, + Meta2 = dict:store(<<"mutations">>, Mutations, Meta), + {Meta2, Value} + end, MetaValues), + riak_object:set_contents(Object, MetaValues2). + +-endif. \ No newline at end of file