
Riak Replication Support

Get leaders across the cluster

rp(riak_core_util:rpc_every_member_ann(riak_core_cluster_mgr, get_leader, [], 5000)).

Pretty print JSON stats

curl -q http://localhost:10018/riak-repl/stats | python -mjson.tool
curl -q http://localhost:8091/riak-repl/stats | python -mjson.tool

Quickly collect rtq sizes across a cluster

rp(rpc:multicall(erlang, apply, [fun() -> {node(), proplists:get_value(bytes, riak_repl2_rtq:status())} end, []])).
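
The same multicall/apply trick returns the full per-node status proplist when the byte count alone isn't enough:

rp(rpc:multicall(erlang, apply, [fun() -> {node(), riak_repl2_rtq:status()} end, []])).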

Check the status of the leader process

rp(sys:get_status(riak_repl2_leader_gs)).

Set the leader if it's undefined

riak_core_cluster_mgr:set_leader(riak_repl2_leader:leader_node(), undefined).

or

riak_core_cluster_mgr:set_leader('riak@hostname.com', undefined).
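
A guarded variant, if you'd rather only intervene when get_leader/0 (the same call used in the rpc example at the top of this page) actually returns undefined:

case riak_core_cluster_mgr:get_leader() of
    undefined -> riak_core_cluster_mgr:set_leader(riak_repl2_leader:leader_node(), undefined);
    Leader    -> {already_set, Leader}
end.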

Disable cluster member fun

To disable updating the list of remote IPs:

rpc:multicall(erlang, apply, [fun() -> riak_core_cluster_mgr:register_save_cluster_members_fun(fun(_ClusterName, _Members) -> ok end) end, []]).

To re-enable:

rpc:multicall(erlang, apply,
    [fun() ->
         riak_core_cluster_mgr:register_save_cluster_members_fun(
             fun(ClusterName, Members) ->
                 riak_core_ring_manager:ring_trans(
                     fun riak_repl_ring:set_clusterIpAddrs/2,
                     {ClusterName, Members})
             end)
     end, []]).

Kill leaders on every node

(possibly forcing a re-election)

[exit(P, kill) || P <- element(1, rpc:multicall(riak_repl2_leader, helper_pid, []))].

%%% V2 REPLICATION
Kill = fun() -> exit(whereis(riak_repl_leader_gs), kill) end,
riak_core_util:rpc_every_member_ann(erlang, apply, [Kill, []]).
%% Note, the registered name is different than the module name
%% gs = "gen_server"

Get Cluster Manager Status

rp(sys:get_status(riak_core_cluster_manager)).
%% same check, run on the repl leader node
rp(rpc:call(riak_repl2_leader:leader_node(), sys, get_status, [riak_core_cluster_manager])).

Get Connection Manager Status

%% One node
rp(sys:get_status(riak_core_connection_manager)).

%% All nodes
rp(rpc:multicall(sys, get_status, [riak_core_connection_manager])).

Check realtime sink distribution (on sink clusters)

rpc:multicall(supervisor, which_children, [riak_repl2_rtsink_conn_sup]).
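
If you only care about the per-node counts rather than the full child specs, the same call reduces to:

rp(rpc:multicall(erlang, apply, [fun() -> {node(), length(supervisor:which_children(riak_repl2_rtsink_conn_sup))} end, []])).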

Remove dirty IPs from the Ring

riak_core_ring_manager:ring_trans(
    fun(Ring, Name) ->
        CleanAddrs = [{IP, Port} || {IP, Port} <- riak_repl_ring:get_clusterIpAddrs(Ring, Name),
                                    is_list(IP), is_integer(Port)],
        riak_repl_ring:set_clusterIpAddrs(Ring, {Name, CleanAddrs})
    end, "sink_clustername").
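
To confirm the cleanup took, read the same ring metadata back (get_clusterIpAddrs/2 is the call already used inside the transform above; the cluster name is a placeholder):

{ok, Ring} = riak_core_ring_manager:get_my_ring(),
rp(riak_repl_ring:get_clusterIpAddrs(Ring, "sink_clustername")).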

Disable AAE at runtime (not specific to repl)

riak_kv_entropy_manager:disable().
riak_kv_entropy_manager:cancel_exchanges().
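
To turn AAE back on afterwards, use the matching enable/0 call in riak_kv_entropy_manager:

riak_kv_entropy_manager:enable().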

Dig out locators

%% Note: this digs into the raw sys:get_status/1 tuple layout, which can
%% change between Erlang/OTP and riak_core releases.
rp(rpc:multicall(erlang, apply, [fun() -> {node(), element(5, element(2, hd(element(2, lists:nth(3, lists:nth(5, element(4, sys:get_status(riak_core_cluster_manager))))))))} end, []])).

Get the status of the rtq if it's timing out

io:format("~s~n", [element(2,process_info(whereis(riak_repl2_rtq), backtrace))]).

Count RT source connections

rp(fun() -> 
    {RTSs, Failed} = riak_core_util:rpc_every_member_ann(riak_repl2_rt, status, [], 10000),
    Active = lists:flatten([[{Node, list_to_binary(proplists:get_value(source, Sink))} || Sink <- proplists:get_value(sinks, RTS)] || {Node, RTS} <- RTSs]),
    Sources = lists:usort([Source || {_N, Source} <- Active]),
    Acc0 = orddict:from_list([{{N, Source}, 0} || {N, _} <- RTSs, Source <- Sources]),
    lists:foldl(fun(X, Acc) -> orddict:update_counter(X, 1, Acc) end, Acc0, Active)
end()).

Analyze RT source connections

c/o @lordnull

rr(riak_repl2_rtsource_conn),
Enabled = riak_repl2_rtsource_conn_sup:enabled(),
lists:map(fun({Remote, Pid}) ->
    io:format("==== ~p @ ~p ====~n", [Remote, Pid]),
    RawStatus = sys:get_status(Pid, 240000),
    rp(RawStatus),
    {status, _Pid, _Module, ProcessData} = RawStatus,
    [_PDict, _, _, _, [_, _, OtherData]] = ProcessData,
    {data, [{"State", State}]} = OtherData,
    HelperPid = State#state.helper_pid,
    case HelperPid of
        undefined ->
            io:format("~p at ~p does not have a helperPid~n", [Remote, Pid]),
            {Remote, Pid, {error, nohelper}};
        _ ->
            try sys:get_status(HelperPid, 240000) of
                HelperStatus ->
                    rp(HelperStatus),
                    {Remote, Pid, HelperStatus}
            catch
                W:Y ->
                    io:format("could not get status for ~p due to ~p:~p~n", [HelperPid, W,Y]),
                    {Remote, Pid, {{W,Y}, HelperPid}}
            end
    end
end, Enabled).

Clean up postcommit hooks set in custom bucket property

Prior to the fix for #588, a custom bucket property set for a disabled replication system would not be removed by the fixup. This script clears the repl postcommit hooks out of any custom buckets.

riak_core_ring_manager:ring_trans(
  fun(Ring, _) ->
          Buckets = riak_core_ring:get_buckets(Ring),
          ClearReplPostCommitFun =
              fun(Bucket, AccRing) ->
                      BProps = riak_core_bucket:get_bucket(Bucket, AccRing),
                      PostCommit =
                          case proplists:get_value(postcommit, BProps, []) of
                              {struct, _}=X ->
                                  [X];
                              X ->
                                  X
                          end,
                      CleanPostCommit = PostCommit --
                          [{struct,[{<<"mod">>,<<"riak_repl_leader">>},
                                    {<<"fun">>,<<"postcommit">>}]},
                           {struct,[{<<"mod">>,<<"riak_repl2_rt">>},
                                    {<<"fun">>,<<"postcommit">>}]}],
                      NewBProps = lists:keystore(postcommit, 1, BProps, 
                                                      {postcommit, CleanPostCommit}),
                      riak_core_ring:update_meta({bucket, Bucket}, NewBProps, AccRing)
              end,
          NewRing0 = lists:foldl(ClearReplPostCommitFun, Ring, Buckets),
          NewRing = riak_core_ring:update_member_meta(node(), NewRing0, node(),
                                                      unused, now()),
          {new_ring, NewRing}
  end, undefined).
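
To spot-check a single bucket after the transform (the bucket name below is a placeholder):

rp(proplists:get_value(postcommit, riak_core_bucket:get_bucket(<<"mybucket">>))).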

Finding a long-running riak_repl2_rtq function

At Open X, EE 1.4.2 plus patches was underperforming and could not keep up with their hourly ETL load (millions of IP addresses). The sinks were only lightly loaded, and the source had little to no data backed up in its TCP buffers, indicating that neither network latency nor RT sink performance was the problem.

After a few failed attempts by the team to diagnose the problem, Jon Meredith led a live debugging session with the Open X admin and found the root cause in a few minutes, using the steps below:

Run a few basic commands to see what is going on
rp(process_info(whereis(riak_repl2_rtq), garbage_collection)).
rp(sys:get_status(riak_repl2_rtq)).

When these didn't reveal anything alarming...

Check to see what the rtq is doing
io:format("~s\n", [element(2, process_info(whereis(riak_repl2_rtq), backtrace))]).

This showed us the following stack trace:

0x00007fc3c43f1240 Return addr 0x0000000003dc6658 (riak_repl2_rtq:'-clear_non_deliverables/3-lc$^0/1-0-'/3 + 520)
y(0)     {c,"riak-tq.prod.ca",11350544,11350545,0,2773400,1,undefined,true}
 
0x00007fc3c43f1250 Return addr 0x0000000003dc63c8 (riak_repl2_rtq:'-clear_non_deliverables/3-fun-0-'/3 + 400)
y(0)     {c,"riak-tq.prod.lc",11350544,11350545,0,2774077,1,undefined,true}
 
0x00007fc3c43f1260 Return addr 0x00007fc3d8d4ba18 (lists:foldl/3 + 120)
y(0)     []
y(1)     []
y(2)     11406509
 
0x00007fc3c43f1280 Return addr 0x00007fc3d8ffe548 (ets:do_foldl/4 + 264)
y(0)     #Fun<riak_repl2_rtq.8.100495617>
y(1)     []
 
0x00007fc3c43f1298 Return addr 0x00007fc3d8ffe338 (ets:foldl/3 + 264)
y(0)     11406509
y(1)     36700404
y(2)     #Fun<riak_repl2_rtq.8.100495617>

Timing clear_non_deliverables/3 revealed it was taking 200+ ms per invocation:

Load the following beam in basho-patches:

https://dl.dropboxusercontent.com/u/2596739/timeit.beam

Then run:

m(timeit).
timeit:timeit(riak_repl2_rtq, clear_non_deliverables, 3).
timer:sleep(5000).
dbg:stop_clear().

Since this function is called on every RT replication ack, it capped replication at a maximum of about 5 objects per second.
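
The arithmetic behind that ceiling is just the per-call latency inverted:

%% one clear_non_deliverables call per ack, at ~200 ms per call
1000 / 200.  %% => 5.0 acks, and therefore objects, per second at best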

Conclusion

It was found that clear_non_deliverables was doing a fold over the entire RT queue ETS table on every call. The final resolution was to remove the function from the ack path: it was essentially doing garbage collection and was not necessary in the replication call path.
