diff --git a/src/yz_kv.erl b/src/yz_kv.erl index 8cfe7213..e7b97031 100644 --- a/src/yz_kv.erl +++ b/src/yz_kv.erl @@ -26,6 +26,10 @@ -include_lib("riak_core/include/riak_core_bucket_type.hrl"). -include("yokozuna.hrl"). +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. + -type write_reason() :: delete | handoff | put | anti_entropy. @@ -82,7 +86,7 @@ get(C, Bucket, Key) -> Other end. -%% @doc calculates the hash of a riak object, returns binary +%% @doc calculates the hash of a riak object, returns binary -spec hash_object(riak_object:riak_object()) -> binary(). hash_object(Obj) -> Vclock = riak_object:vclock(Obj), @@ -252,8 +256,7 @@ index(_, delete, _, P, BKey, ShortPL, Index) -> ok = yz_solr:delete(Index, [{bkey, BKey}]), ok = update_hashtree(delete, P, ShortPL, BKey), ok; - -index(Obj, _Reason, Ring, P, BKey, ShortPL, Index) -> +index(Obj, Reason, Ring, P, BKey, ShortPL, Index) -> case riak_object:get_values(Obj) of [notfound] -> ok = index(Obj, delete, Ring, P, BKey, ShortPL, Index); @@ -263,8 +266,8 @@ index(Obj, _Reason, Ring, P, BKey, ShortPL, Index) -> LP = yz_cover:logical_partition(LI, P), Hash = hash_object(Obj), Docs = yz_doc:make_docs(Obj, Hash, ?INT_TO_BIN(LFPN), ?INT_TO_BIN(LP)), - DelOp = cleanup(length(Docs), {Obj, BKey, LP}), - ok = yz_solr:index(Index, Docs, DelOp), + ok = yz_solr:index(Index, Docs, delete_operation(Obj, Reason, Docs, + BKey, LP)), ok = update_hashtree({insert, Hash}, P, ShortPL, BKey) end. @@ -460,3 +463,133 @@ is_owner_or_future_owner(P, Node, Ring) -> -spec is_service_up(atom(), node()) -> boolean(). is_service_up(Service, Node) -> lists:member(Service, riak_core_node_watcher:services(Node)). + +%% @private +%% +%% @doc Check if object has 2.0 CRDT datatype entry or property for +%% strong consistency. +-spec is_datatype_or_consistent(obj()) -> boolean()|{error, _}. +is_datatype_or_consistent(Obj) -> + Bucket = riak_object:bucket(Obj), + case riak_core_bucket:get_bucket(Bucket) of + BProps when is_list(BProps) -> + is_datatype(BProps) orelse lists:member({consistent, true}, BProps); + {error, _}=Err -> + Err + end. + +%% @private +%% +%% @doc Check if Bucket Properties contain CRDT datatype. +-spec is_datatype(riak_kv_bucket:props()) -> boolean(). +is_datatype(BProps) -> + Type = proplists:get_value(datatype, BProps), + Mod = riak_kv_crdt:to_mod(Type), + riak_kv_crdt:supported(Mod). + +%% @private +%% +%% @doc Set yz_solr:index delete operation(s) on write_reason. +-spec delete_operation(obj(), put|handoff|anti_entropy, [doc()], bkey(), lp()) -> + []|[{id, _}]|[{siblings, _}]. +delete_operation(Obj, put, Docs, BKey, LP) -> + case is_datatype_or_consistent(Obj) of + true -> []; + false -> cleanup(length(Docs), {Obj, BKey, LP}) + end; +delete_operation(Obj, _Reason, Docs, BKey, LP) -> + cleanup(length(Docs), {Obj, BKey, LP}). + + +%%%=================================================================== +%%% Tests +%%%=================================================================== + +-ifdef(TEST). + +is_datatype_or_consistent_test_() -> +{setup, + fun() -> + meck:new(riak_core_capability, []), + meck:expect(riak_core_capability, get, + fun({riak_core, bucket_types}) -> true; + (X) -> meck:passthrough([X]) end), + meck:expect(riak_core_capability, get, + fun({riak_kv, crdt}, []) -> + [pncounter,riak_dt_pncounter,riak_dt_orswot, + riak_dt_map]; + (X, Y) -> meck:passthrough([X, Y]) end), + application:load(riak_core), + application:set_env(riak_core, default_bucket_props, []), + riak_core_ring_events:start_link(), + riak_core_ring_manager:start_link(test), + riak_core_claimant:start_link(), + riak_core_metadata_manager:start_link([]), + riak_core_ring_manager:setup_ets(test), + riak_core_metadata_hashtree:start_link(), + ok + end, + fun(_) -> + process_flag(trap_exit, true), + riak_core_ring_manager:cleanup_ets(test), + catch application:stop(riak_core), + catch(riak_core_metadata_hashtree:stop()), + catch(riak_core_claimant:stop()), + catch(riak_core_ring_manager:stop()), + catch(exit(whereis(riak_core_ring_events), shutdown)), + application:unset_env(riak_core, default_bucket_props), + meck:unload(riak_core_capability) + end, + [ + ?_test(begin + Bucket1 = <<"bucket">>, + BucketType = <<"type">>, + Bucket2 = {BucketType, <<"bucket2">>}, + riak_core_bucket:set_bucket(Bucket1, [{consistent, true}]), + riak_core_bucket_type:create(BucketType, [{consistent, true}]), + riak_core_bucket_type:activate(BucketType), + TypeProps = riak_core_bucket_type:get(BucketType), + ?assert(proplists:get_value(consistent, TypeProps)), + ?assertEqual([{consistent,true}, {name, Bucket1}], + riak_core_bucket:get_bucket(Bucket1)), + BTProps = riak_core_bucket:get_bucket(Bucket2), + ?assert(proplists:get_value(consistent, BTProps)), + ?assertEqual(Bucket2, proplists:get_value(name, BTProps)), + [?assert(is_datatype_or_consistent(riak_object:new(B, K, V))) + || {B, K, V} <- [{Bucket1, <<"k1">>, hi}, + {Bucket2, <<"k2">>, hey}]] + end), + ?_test(begin + BucketType1 = <<"counters">>, + BucketType2 = <<"maps">>, + Bucket1 = {BucketType1, <<"crdt">>}, + Bucket2 = {BucketType2, <<"crdtz">>}, + riak_core_bucket_type:create(BucketType1, [{datatype, counter}]), + riak_core_bucket_type:activate(BucketType1), + riak_core_bucket_type:create(BucketType2, [{datatype, map}]), + riak_core_bucket_type:activate(BucketType2), + BTProps1 = riak_core_bucket:get_bucket(Bucket1), + BTProps2 = riak_core_bucket:get_bucket(Bucket2), + ?assertEqual(counter, proplists:get_value(datatype, BTProps1)), + ?assertEqual(map, proplists:get_value(datatype, BTProps2)), + [?assert(is_datatype_or_consistent(riak_object:new(B, K, V))) + || {B, K, V} <- [{Bucket1, <<"k1">>, hi}, + {Bucket2, <<"k2">>, hey}]] + end), + ?_test(begin + Bucket1 = <<"buckety">>, + BucketType = <<"typey">>, + Bucket2 = {BucketType, <<"bucketjumpy">>}, + riak_core_bucket:set_bucket(Bucket1, []), + riak_core_bucket_type:create(BucketType, []), + riak_core_bucket_type:activate(BucketType), + ?assertEqual([{name, Bucket1}], + riak_core_bucket:get_bucket(Bucket1)), + BTProps = riak_core_bucket:get_bucket(Bucket2), + ?assertEqual(Bucket2, proplists:get_value(name, BTProps)), + [?assertNot(is_datatype_or_consistent(riak_object:new(B, K, V))) + || {B, K, V} <- [{Bucket1, <<"k1">>, hi}, + {Bucket2, <<"k2">>, hey}]] + end)]}. + +-endif.