Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add command reply modes for replying from a given member #314

Merged
merged 2 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 48 additions & 8 deletions src/ra.erl
Original file line number Diff line number Diff line change
Expand Up @@ -754,18 +754,54 @@ overview(System) ->
%% If there is no majority of Ra servers online, this function will return
%% a timeout.
%%
%% When `TimeoutOrOptions' is a map, it supports the following option keys:
%% <ul>
%% <li>`timeout': the time to wait before returning `{timeout, ServerId}'</li>
%% <li>`reply_from': the node which should reply to the command call. The
%% default value is `leader'. If the option is `local' or a `member' and a
%% local node or the given member is not available, the command may be
%% processed successfully but the caller may not receive a response, timing out
%% instead. The following values are supported for `reply_from':
%% <ul>
%% <li>`leader': the cluster leader replies.</li>
%% <li>`local': a member on the some node as the caller replies.</li>
%% <li>`{member, ServerId}': the member for the given {@link ra_server_id()}
%% replies.</li>
%% </ul></li>
%% </ul>
%%
%% @param ServerId the server id to send the command to
%% @param Command an arbitrary term that the state machine can handle
%% @param Timeout the time to wait before returning {timeout, ServerId}
%% @param TimeoutOrOptions the time to wait before returning
%% `{timeout, ServerId}', or a map of options.
%% @end
-spec process_command(ServerId :: ra_server_id() | [ra_server_id()],
Command :: term(),
Timeout :: timeout()) ->
{ok, Reply :: term(), Leader :: ra_server_id()} |
-spec process_command(ServerId, Command, TimeoutOrOptions) ->
{ok, Reply, Leader} |
{error, term()} |
{timeout, ra_server_id()}.
process_command(ServerId, Cmd, Timeout) ->
ra_server_proc:command(ServerId, usr(Cmd, await_consensus), Timeout).
{timeout, ra_server_id()}
when
ServerId :: ra_server_id() | [ra_server_id()],
Command :: term(),
TimeoutOrOptions :: timeout() | Options,
Options :: #{timeout => timeout(),
reply_from => leader | local | {member, ra_server_id()}},
Reply :: term(),
Leader :: ra_server_id().
process_command(ServerId, Command, Timeout)
when Timeout =:= infinity orelse is_integer(Timeout) ->
process_command(ServerId, Command, #{timeout => Timeout});
process_command(ServerId, Command, Options) when is_map(Options) ->
Timeout = maps:get(timeout, Options, ?DEFAULT_TIMEOUT),
ReplyFrom = case Options of
#{reply_from := local} ->
local;
#{reply_from := {member, Member}} ->
{member, Member};
_ ->
leader
end,
ReplyMode = {await_consensus, #{reply_from => ReplyFrom}},
ra_server_proc:command(ServerId, usr(Command, ReplyMode), Timeout).

%% @doc Same as `process_command/3' with the default timeout of 5000 ms.
%% @param ServerId the server id to send the command to
Expand Down Expand Up @@ -1051,6 +1087,10 @@ register_external_log_reader({_, Node} = ServerId)

%% internal

-spec usr(UserCommand, ReplyMode) -> Command when
UserCommand :: term(),
ReplyMode :: ra_server:command_reply_mode(),
Command :: {ra_server:command_type(), UserCommand, ReplyMode}.
usr(Data, Mode) ->
{'$usr', Data, Mode}.

Expand Down
3 changes: 3 additions & 0 deletions src/ra.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@

-type states() :: leader | follower | candidate | await_condition.

%% A member of the cluster from which replies should be sent.
-type ra_reply_from() :: leader | local | {member, ra_server_id()}.

-define(RA_PROTO_VERSION, 1).
%% the protocol version should be incremented whenever extensions need to be
%% done to the core protocol records (below). It is only ever exchanged by the
Expand Down
22 changes: 22 additions & 0 deletions src/ra_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@

-type command_priority() :: normal | low.

-type command_reply_options() :: #{reply_from => ra_reply_from()}.

-type command_reply_mode() :: after_log_append |
await_consensus |
{await_consensus,
command_reply_options()} |
{notify,
command_correlation(), pid()} |
noreply.
Expand Down Expand Up @@ -1572,6 +1576,12 @@ filter_follower_effects(Effects) ->
[C | Acc];
({log, _, _, _Opts} = C, Acc) ->
[C | Acc];
({reply, _, _, leader}, Acc) ->
Acc;
({reply, _, _, _} = C, Acc) ->
%% If the reply-from is not `leader', the follower
%% might be the replier.
[C | Acc];
({monitor, _ProcOrNode, Comp, _} = C, Acc)
when Comp =/= machine ->
%% only machine monitors should not be emitted
Expand Down Expand Up @@ -2307,6 +2317,18 @@ add_reply(_, '$ra_no_reply', _, Effects, Notifys) ->
{Effects, Notifys};
add_reply(#{from := From}, Reply, await_consensus, Effects, Notifys) ->
{[{reply, From, {wrap_reply, Reply}} | Effects], Notifys};
add_reply(#{from := From}, Reply,
{await_consensus, Options}, Effects, Notifys) ->
Replier = case Options of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels a bit strange to have a list but only evaluate the first item. I wonder if perhaps we should skip the list for now or use a map perhaps?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched to a map - if we need to add more options in the future I think it will be a better choice, and it's easier to work with than the list here since these options are mutually exclusive 👍

#{reply_from := local} ->
local;
#{reply_from := {member, Member}} ->
{member, Member};
_ ->
leader
end,
ReplyEffect = {reply, From, {wrap_reply, Reply}, Replier},
{[ReplyEffect | Effects], Notifys};
add_reply(_, Reply, {notify, Corr, Pid},
Effects, Notifys) ->
% notify are casts and thus have to include their own pid()
Expand Down
59 changes: 58 additions & 1 deletion src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,29 @@ handle_effect(_, {cast, To, Msg}, _, State, Actions) ->
%% TODO: handle send failure
_ = gen_cast(To, Msg, State),
{State, Actions};
handle_effect(RaftState, {reply, {Pid, _Tag} = From, Reply, Replier}, _,
State, Actions) ->
case Replier of
leader ->
ok = gen_statem:reply(From, Reply);
local ->
case can_execute_locally(RaftState, node(Pid), State) of
true ->
ok = gen_statem:reply(From, Reply);
false ->
ok
end;
{member, Member} ->
case can_execute_on_member(RaftState, Member, State) of
true ->
ok = gen_statem:reply(From, Reply);
false ->
ok
end;
_ ->
ok
end,
{State, Actions};
handle_effect(_, {reply, From, Reply}, _, State, Actions) ->
% reply directly
ok = gen_statem:reply(From, Reply),
Expand Down Expand Up @@ -1571,15 +1594,40 @@ validate_reply_mode(after_log_append) ->
ok;
validate_reply_mode(await_consensus) ->
ok;
validate_reply_mode({await_consensus, Options}) when is_map(Options) ->
validate_reply_mode_options(Options);
validate_reply_mode({notify, Correlation, Pid})
when (is_integer(Correlation) orelse is_reference(Correlation)) andalso
when (is_integer(Correlation) orelse
is_reference(Correlation)) andalso
is_pid(Pid) ->
ok;
validate_reply_mode({notify, Correlation, Pid, Options})
when (is_integer(Correlation) orelse
is_reference(Correlation)) andalso
is_pid(Pid) andalso
is_map(Options) ->
validate_reply_mode_options(Options);
validate_reply_mode(noreply) ->
ok;
validate_reply_mode(ReplyMode) ->
{error, {invalid_reply_mode, ReplyMode}}.

validate_reply_mode_options(Options) when is_map(Options) ->
maps:fold(fun (Key, Value, ok) ->
case {Key, Value} of
{reply_from, local} ->
ok;
{reply_from, {member, _}} ->
ok;
{reply_from, leader} ->
ok;
{_, _} ->
{error, {unknown_option, Key, Value}}
end;
(_Key, _Value, Error) ->
Error
end, ok, Options).

make_command(Type, {call, From}, Data, Mode) ->
Ts = erlang:system_time(millisecond),
{Type, #{from => From, ts => Ts}, Data, Mode};
Expand Down Expand Up @@ -1651,6 +1699,15 @@ can_execute_locally(RaftState, TargetNode, State) ->
false
end.

can_execute_on_member(_RaftState, Member,
#state{server_state = #{cfg := #cfg{id = Member}}}) ->
true;
can_execute_on_member(leader, Member, State) ->
Members = do_state_query(members, State#state.server_state),
not lists:member(Member, Members);
can_execute_on_member(_RaftState, _Member, _State) ->
false.

handle_node_status_change(Node, Status, InfoList, RaftState,
#state{monitors = Monitors0,
server_state = ServerState0} = State0) ->
Expand Down
102 changes: 102 additions & 0 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ all_tests() ->
start_servers,
server_recovery,
process_command,
process_command_reply_from_local,
process_command_reply_from_member,
process_command_with_unknown_reply_mode,
pipeline_command,
pipeline_command_reject,
Expand Down Expand Up @@ -308,6 +310,73 @@ process_command(Config) ->
{ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT),
terminate_cluster(Cluster).

process_command_reply_from_local(Config) ->
PrivDir = filename:join(?config(priv_dir, Config),
?config(test_name, Config)),
Options = #{reply_from => local,
timeout => ?PROCESS_COMMAND_TIMEOUT},

Cluster = start_remote_cluster(3, PrivDir, local_command_cluster,
add_machine()),
{ok, _, Leader} = ra:members(hd(Cluster)),
{_, FollowerNode} = Follower =
hd([Member || Member <- Cluster, Member =/= Leader]),

%% The leader will reply if no node in the cluster is local to the caller.
{ok, 5, _} = ra:process_command(Leader, 5, Options),

%% The reply will come from the follower.
{ok, 10, _} = rpc:call(FollowerNode,
ra, process_command, [Leader, 5, Options]),

ct:pal("stopping member: ~p", [Follower]),
ra:stop_server(?SYS, Follower),

%% The server is stopped so the command is not handled.
?assertEqual({error, noproc},
ra:process_command(Follower, 5, Options)),
{ok, {_, 10}, _} = ra:leader_query(Leader, fun(State) -> State end),

%% The local member can't reply to the command request since it is stopped.
?assertMatch({timeout, _},
rpc:call(FollowerNode,
ra, process_command, [Leader, 5, Options])),

terminate_cluster(Cluster).

process_command_reply_from_member(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
{simple, fun erlang:'+'/2, 9}),

{ok, _, Leader} = ra:members(A),
Follower = hd([Member || Member <- Cluster, Member =/= Leader]),
Options = #{reply_from => {member, Follower},
timeout => ?PROCESS_COMMAND_TIMEOUT},

{ok, 14, _Leader} = ra:process_command(A, 5, Options),

ct:pal("stopping member: ~p", [Follower]),
ra:stop_server(?SYS, Follower),

%% The process is no longer alive so the command is not handled.
?assertEqual({error, noproc}, ra:process_command(Follower, 5, Options)),
{ok, {_, 14}, _} = ra:leader_query(Leader, fun(State) -> State end),

%% The command is successfully handled on the leader but the member is
%% not available to reply to the caller.
?assertMatch({timeout, _}, ra:process_command(Leader, 5, Options)),
{ok, {_, 19}, _} = ra:leader_query(Leader, fun(State) -> State end),

%% If the given member is not part of the cluster then the reply is
%% performed by the leader.
{ok, 24, _} =
ra:process_command(Leader, 5,
#{reply_from => {member, does_not_exist},
timeout => ?PROCESS_COMMAND_TIMEOUT}),

terminate_cluster(Cluster).

process_command_with_unknown_reply_mode(Config) ->
[A, _B, _C] = Cluster =
start_local_cluster(3, ?config(test_name, Config),
Expand Down Expand Up @@ -974,3 +1043,36 @@ gather_applied(Acc, Timeout) ->
Acc
end.

start_remote_cluster(Num, PrivDir, ClusterName, Machine) ->
Nodes = [begin
Name = "node" ++ erlang:integer_to_list(N),
Node = start_peer(Name, PrivDir),
{ClusterName, Node}
end || N <- lists:seq(1, Num)],
{ok, _, Failed} = ra:start_cluster(default, ClusterName, Machine, Nodes),
?assertEqual([], Failed),
Nodes.

start_peer(Name, PrivDir) ->
Dir0 = filename:join(PrivDir, Name),
Dir = "'\"" ++ Dir0 ++ "\"'",
Host = get_current_host(),
Pa = string:join(["-pa" | search_paths()] ++ ["-s ra -ra data_dir", Dir],
" "),
ct:pal("starting peer node ~s on host ~s for node ~s with ~s",
[Name, Host, node(), Pa]),
{ok, S} = slave:start_link(Host, Name, Pa),
_ = rpc:call(S, ra, start, []),
ok = ct_rpc:call(S, logger, set_primary_config,
[level, all]),
S.

get_current_host() ->
NodeStr = atom_to_list(node()),
Host = re:replace(NodeStr, "^[^@]+@", "", [{return, list}]),
list_to_atom(Host).

search_paths() ->
Ld = code:lib_dir(),
lists:filter(fun (P) -> string:prefix(P, Ld) =:= nomatch end,
code:get_path()).