Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduced Replication #402

Merged
merged 13 commits into from
Sep 20, 2013
120 changes: 0 additions & 120 deletions ebin/riak_repl.app

This file was deleted.

47 changes: 47 additions & 0 deletions src/riak_repl.app.src
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
% -*- mode: erlang -*-
{application,
riak_repl,
[{description, "Enterprise replication for Riak"},
{id, "riak_repl"},
{vsn, "1.4.2"},
{applications, [kernel,
stdlib,
sasl,
crypto,
ssl,
riak_core,
riak_kv,
ranch]},
{registered, [riak_repl_connector_sup,
riak_repl_leader,
riak_repl_stats,
riak_rep_sup]},
{mod, {riak_repl_app, []}},
{env, [
%% milliseconds to wait after checking all listeners
{client_retry_timeout, 30000},
%% milliseconds to wait for successfull connect
{client_connect_timeout, 15000},

{fullsync_on_connect, true},
% minutes
{fullsync_interval, 360},
{data_root, "data/riak_repl"},
{merkle_bufsize, 1048576},
%% bytes
{server_max_pending, 5},
{client_ack_frequency, 5},
{queue_size, 104857600},
{fullsync_strategies, [keylist, syncv1]},
{min_get_workers, 5},
{max_get_workers, 100},
{min_put_workers, 5},
{max_put_workers, 100},
%% whether to issue gets directly against the vnode
{vnode_gets, true},
%% How many fullsync diff objects to send before needing an
%% ACK from the client. Setting this too high will clog your
%% TCP buffers.
{diff_batch_size, 100}
]}
]}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was riak_repl.app.src changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It did not exist before, so if one needs to add/remove a module to the repl application, you need to update ebin/riak_repl.app. By adding src/riak_repl.app.src the app file is automatically generated and updated as modules are added/removed.

The fact that I needed to update the .app file has bit me on other features before (cascading realtime), and this was the straw that broke the camel's back. My investigations indicated there was no good reason the for the .app file to be static, so I made it dynamically generated.

15 changes: 8 additions & 7 deletions src/riak_repl2_rt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
%% Copyright (c) 2007-2012 Basho Technologies, Inc. All Rights Reserved.
-module(riak_repl2_rt).

%% @doc Realtime replication
%% @doc Realtime replication
%%
%% High level responsibility...
%%
Expand Down Expand Up @@ -98,7 +98,7 @@ ensure_rt(WantEnabled0, WantStarted0) ->
_ ->
%% Do enables/starts first to capture maximum amount of rtq

%% Create a registration to begin queuing, rtsource_sup:ensure_started
%% Create a registration to begin queuing, rtsource_sup:ensure_started
%% will bring up an rtsource process that will re-register
[riak_repl2_rtq:register(Remote) || Remote <- ToEnable],
[riak_repl2_rtsource_conn_sup:enable(Remote) || Remote <- ToStart],
Expand Down Expand Up @@ -129,13 +129,14 @@ register_remote_locator() ->
register_sink(Pid) ->
gen_server:call(?SERVER, {register_sink, Pid}, infinity).

%% Get list of sink pids
%% Get list of sink pids
%% TODO: Remove this once rtsink_sup is working right
get_sink_pids() ->
gen_server:call(?SERVER, get_sink_pids, infinity).

%% Realtime replication post-commit hook
postcommit(RObj) ->
lager:debug("maybe a mutate happened?~n ~p", [RObj]),
case riak_repl_util:repl_helper_send_realtime(RObj, riak_client:new(node(), undefined))++[RObj] of
%% always put the objects onto the shared queue in the new format; we'll
%% down-convert if we have to before sending them to the RT sinks (based
Expand Down Expand Up @@ -165,13 +166,13 @@ handle_call(status, _From, State = #state{sinks = SinkPids}) ->
riak_repl2_rtsource_conn:status(Pid, Timeout)
catch
_:_ ->
{Remote, Pid, unavailable}
{Remote, Pid, unavailable}
end || {Remote, Pid} <- riak_repl2_rtsource_conn_sup:enabled()],
Sinks = [try
riak_repl2_rtsink_conn:status(Pid, Timeout)
catch
_:_ ->
{will_be_remote_name, Pid, unavailable}
{will_be_remote_name, Pid, unavailable}
end || Pid <- SinkPids],
Status = [{enabled, enabled()},
{started, started()},
Expand All @@ -190,7 +191,7 @@ handle_cast(_Msg, State) ->
%% TODO: log unknown message
{noreply, State}.

handle_info({'DOWN', _MRef, process, SinkPid, _Reason},
handle_info({'DOWN', _MRef, process, SinkPid, _Reason},
State = #state{sinks = Sinks}) ->
%%TODO: Check how ranch logs sink process death
Sinks2 = Sinks -- [SinkPid],
Expand All @@ -199,7 +200,7 @@ handle_info(Msg, State) ->
%%TODO: Log unhandled message - e.g. timed out status result
lager:warning("unhandled message - e.g. timed out status result: ~p", Msg),
{noreply, State}.

terminate(_Reason, _State) ->
ok.

Expand Down
23 changes: 22 additions & 1 deletion src/riak_repl_console.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
add_block_provider_redirect/1,
show_block_provider_redirect/1,
show_local_cluster_id/1,
delete_block_provider_redirect/1
delete_block_provider_redirect/1,
full_objects/1
]).

add_listener(Params) ->
Expand Down Expand Up @@ -601,6 +602,26 @@ show_local_cluster_id([]) ->
io_lib:format("~p", [riak_core_ring:cluster_name(Ring)])),
io:format("local cluster id: ~p~n", [ClusterId]).

full_objects([]) ->
Value = riak_core_metadata:get({riak_repl, reduced_n}, full_objects, [{default, always}]),
io:format("full_objects value = ~p~n", [Value]),
Value;

full_objects(["never"]) ->
riak_core_metadata:put({riak_repl, reduced_n}, full_objects, never),
full_objects([]);

full_objects(["always"]) ->
riak_core_metadata:put({riak_repl, reduced_n}, full_objects, always),
full_objects([]);

full_objects([M]) ->
NewVal = erlang:list_to_integer(M),
riak_core_metadata:put({riak_repl, reduced_n}, full_objects, NewVal),
full_objects([]).

%% helper functions

parse_ip_and_maybe_port(String, Hostname) ->
case string:tokens(String, ":") of
[IPStr, PortStr] ->
Expand Down
Loading