From 078c89d16b986648d40029e4be708fe56ccbbc8f Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 18 Oct 2022 13:52:31 -0500 Subject: [PATCH 1/4] ra_SUITE: Fix indentation for process_command/3 call --- test/ra_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index ac67c9ff..61dbe7d0 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -300,7 +300,7 @@ process_command(Config) -> [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), {simple, fun erlang:'+'/2, 9}), - {ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), + {ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), terminate_cluster(Cluster). pipeline_command(Config) -> From 0b8d9989bdb22ddc1052276d1f6e7445dc9833e6 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 18 Oct 2022 13:52:53 -0500 Subject: [PATCH 2/4] ra_SUITE: Underscore unused binding --- test/ra_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index 61dbe7d0..f2a1e731 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -842,7 +842,7 @@ wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout) case get_gen_statem_status(Ref) of ExpectedStatus -> ok; - OtherStatus when Timeout >= 0 -> + _OtherStatus when Timeout >= 0 -> timer:sleep(500), wait_for_gen_statem_status(Ref, ExpectedStatus, Timeout - 500); OtherStatus -> From 20ba89cd4c87270060b5bb81681bf4a9980bf684 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Tue, 18 Oct 2022 13:53:38 -0500 Subject: [PATCH 3/4] Validate command reply modes `ra_server:command_reply_mode()` was not previously validated on the `ra_server`-side. If a command were sent with an unknown reply mode, the Ra server would never reply or notify the caller. Sending an unknown reply mode might happen when a client sends a new command to a Ra server which is operating on an old version of Ra. This change validates reply modes within the `ra_server`, replying with an error tuple when an invalid reply mode is passed. --- src/ra_server_proc.erl | 83 ++++++++++++++++++++++++++++++------------ test/ra_SUITE.erl | 17 +++++++++ 2 files changed, 76 insertions(+), 24 deletions(-) diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index 90825689..adec4611 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -354,32 +354,54 @@ leader(EventType, {leader_cast, Msg}, State) -> leader(EventType, Msg, State); leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, #state{server_state = ServerState0} = State0) -> - %% normal priority commands are written immediately - Cmd = make_command(CmdType, EventType, Data, ReplyMode), - {leader, ServerState, Effects} = - ra_server:handle_leader({command, Cmd}, ServerState0), - {State, Actions} = - ?HANDLE_EFFECTS(Effects, EventType, - State0#state{server_state = ServerState}), - {keep_state, State, Actions}; + case validate_reply_mode(ReplyMode) of + ok -> + %% normal priority commands are written immediately + Cmd = make_command(CmdType, EventType, Data, ReplyMode), + {leader, ServerState, Effects} = + ra_server:handle_leader({command, Cmd}, ServerState0), + {State, Actions} = + ?HANDLE_EFFECTS(Effects, EventType, + State0#state{server_state = ServerState}), + {keep_state, State, Actions}; + Error -> + case EventType of + {call, From} -> + {keep_state, State0, [{reply, From, Error}]}; + _ -> + {keep_state, State0, []} + end + end; leader(EventType, {command, low, {CmdType, Data, ReplyMode}}, #state{delayed_commands = Delayed} = State0) -> - %% cache the low priority command until the flush_commands message arrives - - Cmd = make_command(CmdType, EventType, Data, ReplyMode), - %% if there are no prior delayed commands - %% (and thus no action queued to do so) - %% queue a state timeout to flush them - %% We use a cast to ourselves instead of a zero timeout as we want to - %% get onto the back of the erlang mailbox not just the current gen_statem - %% event buffer. - case queue:is_empty(Delayed) of - true -> - ok = gen_statem:cast(self(), flush_commands); - false -> - ok - end, - {keep_state, State0#state{delayed_commands = queue:in(Cmd, Delayed)}, []}; + case validate_reply_mode(ReplyMode) of + ok -> + %% cache the low priority command until the flush_commands message + %% arrives + + Cmd = make_command(CmdType, EventType, Data, ReplyMode), + %% if there are no prior delayed commands + %% (and thus no action queued to do so) + %% queue a state timeout to flush them + %% We use a cast to ourselves instead of a zero timeout as we want + %% to get onto the back of the erlang mailbox not just the current + %% gen_statem event buffer. + case queue:is_empty(Delayed) of + true -> + ok = gen_statem:cast(self(), flush_commands); + false -> + ok + end, + State = State0#state{delayed_commands = queue:in(Cmd, Delayed)}, + {keep_state, State, []}; + Error -> + case EventType of + {call, From} -> + {keep_state, State0, [{reply, From, Error}]}; + _ -> + {keep_state, State0, []} + end + end; leader(EventType, {aux_command, Cmd}, State0) -> {_, ServerState, Effects} = ra_server:handle_aux(?FUNCTION_NAME, EventType, Cmd, State0#state.server_state), @@ -1543,6 +1565,19 @@ read_chunks_and_send_rpc(RPC0, Res1 end. +validate_reply_mode(after_log_append) -> + ok; +validate_reply_mode(await_consensus) -> + ok; +validate_reply_mode({notify, Correlation, Pid}) + when (is_integer(Correlation) orelse is_reference(Correlation)) andalso + is_pid(Pid) -> + ok; +validate_reply_mode(noreply) -> + ok; +validate_reply_mode(ReplyMode) -> + {error, {invalid_reply_mode, ReplyMode}}. + make_command(Type, {call, From}, Data, Mode) -> Ts = erlang:system_time(millisecond), {Type, #{from => From, ts => Ts}, Data, Mode}; diff --git a/test/ra_SUITE.erl b/test/ra_SUITE.erl index f2a1e731..813c4632 100644 --- a/test/ra_SUITE.erl +++ b/test/ra_SUITE.erl @@ -15,6 +15,10 @@ -define(PROCESS_COMMAND_TIMEOUT, 6000). -define(SYS, default). +%% The dialyzer catches that the given reply mode is not included in the +%% `ra_server:command_reply_mode()' type: +-dialyzer({nowarn_function, [process_command_with_unknown_reply_mode/1]}). + all() -> [ {group, tests} @@ -29,6 +33,7 @@ all_tests() -> start_servers, server_recovery, process_command, + process_command_with_unknown_reply_mode, pipeline_command, pipeline_command_reject, pipeline_command_2_forwards_to_leader, @@ -303,6 +308,18 @@ process_command(Config) -> {ok, 14, _Leader} = ra:process_command(A, 5, ?PROCESS_COMMAND_TIMEOUT), terminate_cluster(Cluster). +process_command_with_unknown_reply_mode(Config) -> + [A, _B, _C] = Cluster = + start_local_cluster(3, ?config(test_name, Config), + {simple, fun erlang:'+'/2, 9}), + Command = 5, + ReplyMode = bad_reply_mode, + RaCommand = {'$usr', Command, ReplyMode}, + ?assertEqual({error, {invalid_reply_mode, ReplyMode}}, + ra_server_proc:command(A, RaCommand, + ?PROCESS_COMMAND_TIMEOUT)), + terminate_cluster(Cluster). + pipeline_command(Config) -> [A, _B, _C] = Cluster = start_local_cluster(3, ?config(test_name, Config), From be203b9cd2a19a442a39413ec57581ec411260b2 Mon Sep 17 00:00:00 2001 From: Michael Davis Date: Wed, 19 Oct 2022 09:54:41 -0500 Subject: [PATCH 4/4] Count commands received with invalid reply-modes --- src/ra.hrl | 5 ++++- src/ra_server_proc.erl | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index 14503d48..2d6856f5 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -253,6 +253,7 @@ -define(C_RA_SRV_AER_RECEIVED_FOLLOWER_EMPTY, ?C_RA_LOG_RESERVED + 17). -define(C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, ?C_RA_LOG_RESERVED + 18). -define(C_RA_SRV_LOCAL_QUERIES, ?C_RA_LOG_RESERVED + 19). +-define(C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, ?C_RA_LOG_RESERVED + 20). -define(RA_SRV_COUNTER_FIELDS, @@ -294,7 +295,9 @@ {term_and_voted_for_updates, ?C_RA_SRV_TERM_AND_VOTED_FOR_UPDATES, counter, "Total number of updates of term and voted for"}, {local_queries, ?C_RA_SRV_LOCAL_QUERIES, counter, - "Total number of local queries"} + "Total number of local queries"}, + {invalid_reply_mode_commands, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, counter, + "Total number of commands received with an invalid reply-mode"} ]). -define(RA_COUNTER_FIELDS, ?RA_LOG_COUNTER_FIELDS ++ ?RA_SRV_COUNTER_FIELDS). diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index adec4611..72de91d1 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -353,7 +353,7 @@ leader(EventType, {local_call, Msg}, State) -> leader(EventType, {leader_cast, Msg}, State) -> leader(EventType, Msg, State); leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, - #state{server_state = ServerState0} = State0) -> + #state{conf = Conf, server_state = ServerState0} = State0) -> case validate_reply_mode(ReplyMode) of ok -> %% normal priority commands are written immediately @@ -365,6 +365,7 @@ leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, State0#state{server_state = ServerState}), {keep_state, State, Actions}; Error -> + ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), case EventType of {call, From} -> {keep_state, State0, [{reply, From, Error}]}; @@ -373,7 +374,7 @@ leader(EventType, {command, normal, {CmdType, Data, ReplyMode}}, end end; leader(EventType, {command, low, {CmdType, Data, ReplyMode}}, - #state{delayed_commands = Delayed} = State0) -> + #state{conf = Conf, delayed_commands = Delayed} = State0) -> case validate_reply_mode(ReplyMode) of ok -> %% cache the low priority command until the flush_commands message @@ -395,6 +396,7 @@ leader(EventType, {command, low, {CmdType, Data, ReplyMode}}, State = State0#state{delayed_commands = queue:in(Cmd, Delayed)}, {keep_state, State, []}; Error -> + ok = incr_counter(Conf, ?C_RA_SRV_INVALID_REPLY_MODE_COMMANDS, 1), case EventType of {call, From} -> {keep_state, State0, [{reply, From, Error}]};