Skip to content

Commit

Permalink
Update src/riak_kv_mrc_pipe.erl
Browse files Browse the repository at this point in the history
Reduce phase must be started on live node.
  • Loading branch information
dreyk authored and beerriot committed Feb 2, 2013
1 parent 480077a commit 58040e8
Showing 1 changed file with 15 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/riak_kv_mrc_pipe.erl
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,12 @@ mr2pipe_phases([]) ->
mr2pipe_phases(Query) ->
%% now() is used as a random hash to choose which vnode to collect
%% the reduce inputs
Now = now(),
ConstantHashCookie = const_hash_cookie(),

%% first convert phase
QueryT = list_to_tuple(Query),
Numbered = lists:zip(Query, lists:seq(0, length(Query)-1)),
Fittings0 = lists:flatten([mr2pipe_phase(P,I,Now,QueryT) ||
Fittings0 = lists:flatten([mr2pipe_phase(P,I,ConstantHashCookie,QueryT) ||
{P,I} <- Numbered]),

%% clean up naive 'keep' translationg
Expand Down Expand Up @@ -460,8 +460,7 @@ query_type(Idx, QueryT) ->
-spec reduce2pipe(reduce_query_fun(), term(), boolean(),
Index :: integer(), ConstantHashSeed :: term()) ->
[ riak_pipe:fitting_spec() ].
reduce2pipe(FunSpec, Arg, Keep, I, ConstHashCookie) ->
Hash = chash:key_of(ConstHashCookie),
reduce2pipe(FunSpec, Arg, Keep, I, Hash) ->
[#fitting_spec{name={reduce,I},
module=riak_kv_w_reduce,
arg={rct,
Expand Down Expand Up @@ -1038,3 +1037,15 @@ compat_fun(66856669, 6, Fun) ->
%% dunno
compat_fun(_, _, _) ->
error.

const_hash_cookie()->
Random = chash:key_of(now()),
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
NodeCount = length(riak_core_ring:all_members(Ring)),
case riak_core_apl:get_primary_apl(Random,NodeCount,riak_pipe) of
[{{Partition, _Node},_Type}|_]->
riak_pipe_vnode:hash_for_partition(Partition);
_->
%%TODO: May be return some error?
Random
end.

0 comments on commit 58040e8

Please sign in to comment.