Skip to content

Commit

Permalink
Merge pull request #2802 from rabbitmq/rabbitmq-server-2798
Browse files Browse the repository at this point in the history
Allow dynamic Shovels specify queue x-arguments when topology is declared

(cherry picked from commit 7725db9)

Conflicts:
	deps/rabbit/src/rabbit_amqqueue.erl
  • Loading branch information
michaelklishin committed Feb 10, 2021
1 parent d9260bb commit b4504dc
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 43 deletions.
110 changes: 88 additions & 22 deletions deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
-export([rebalance/3]).
-export([collect_info_all/2]).

-export([is_policy_applicable/2]).
-export([is_policy_applicable/2, declare_args/0]).

%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
Expand Down Expand Up @@ -872,98 +872,164 @@ declare_args() ->
{<<"x-queue-mode">>, fun check_queue_mode/2},
{<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2},
{<<"x-queue-type">>, fun check_queue_type/2},
{<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}].
{<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}].

consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
{<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}].

check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
true -> ok;
false -> {error, {unacceptable_type, Type}}
end.
false -> {error, rabbit_misc:format("expected integer, got ~p", [Type])}
end;
check_int_arg(Val, _) when is_integer(Val) ->
ok;
check_int_arg(_Val, _) ->
{error, {unacceptable_type, "expected integer"}}.

check_bool_arg({bool, _}, _) -> ok;
check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
check_bool_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
check_bool_arg(true, _) -> ok;
check_bool_arg(false, _) -> ok;
check_bool_arg(_Val, _) -> {error, {unacceptable_type, "expected boolean"}}.

check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
Error -> Error
end;
check_non_neg_int_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
Error -> Error
end.

check_expires_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end;
check_expires_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end.

check_message_ttl_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end;
check_message_ttl_arg(Val, Args) ->
case check_int_arg(Val, Args) of
ok -> rabbit_misc:check_expiry(Val);
Error -> Error
end.

check_max_priority_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
ok -> {error, {max_value_exceeded, Val}};
Error -> Error
end;
check_max_priority_arg(Val, Args) ->
case check_non_neg_int_arg(Val, Args) of
ok when Val =< ?MAX_SUPPORTED_PRIORITY -> ok;
ok -> {error, {max_value_exceeded, Val}};
Error -> Error
end.

check_single_active_consumer_arg({Type, Val}, Args) ->
case check_bool_arg({Type, Val}, Args) of
ok -> ok;
Error -> Error
end.
check_bool_arg({Type, Val}, Args);
check_single_active_consumer_arg(Val, Args) ->
check_bool_arg(Val, Args).

check_default_quorum_initial_group_size_arg({Type, Val}, Args) ->
check_initial_cluster_size_arg({Type, Val}, Args) ->
case check_non_neg_int_arg({Type, Val}, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
Error -> Error
end;
check_initial_cluster_size_arg(Val, Args) ->
case check_non_neg_int_arg(Val, Args) of
ok when Val == 0 -> {error, {value_zero, Val}};
ok -> ok;
Error -> Error
end.

%% Note that the validity of x-dead-letter-exchange is already verified
%% by rabbit_channel's queue.declare handler.
check_dlxname_arg({longstr, _}, _) -> ok;
check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}.
check_dlxname_arg({Type, _}, _) -> {error, {unacceptable_type, Type}};
check_dlxname_arg(Val, _) when is_list(Val) or is_binary(Val) -> ok;
check_dlxname_arg(_Val, _) -> {error, {unacceptable_type, "expected a string (valid exchange name)"}}.

check_dlxrk_arg({longstr, _}, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_dlxrk_arg(Val, Args) when is_binary(Val) ->
case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of
undefined -> {error, routing_key_but_no_dlx_defined};
_ -> ok
end;
check_dlxrk_arg(_Val, _Args) ->
{error, {unacceptable_type, "expected a string"}}.

-define(KNOWN_OVERFLOW_MODES, [<<"drop-head">>, <<"reject-publish">>, <<"reject-publish-dlx">>]).
check_overflow({longstr, Val}, _Args) ->
case lists:member(Val, [<<"drop-head">>,
<<"reject-publish">>,
<<"reject-publish-dlx">>]) of
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
true -> ok;
false -> {error, invalid_overflow}
end;
check_overflow({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_overflow(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_OVERFLOW_MODES) of
true -> ok;
false -> {error, invalid_overflow}
end;
check_overflow(_Val, _Args) ->
{error, invalid_overflow}.

-define(KNOWN_QUEUE_MODES, [<<"default">>, <<"lazy">>]).
check_queue_mode({longstr, Val}, _Args) ->
case lists:member(Val, [<<"default">>, <<"lazy">>]) of
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, invalid_queue_mode}
false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
end;
check_queue_mode({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_queue_mode(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_QUEUE_MODES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue mode '~s'", [Val])}
end;
check_queue_mode(_Val, _Args) ->
{error, invalid_queue_mode}.

-define(KNOWN_QUEUE_TYPES, [<<"classic">>, <<"quorum">>]).
check_queue_type({longstr, Val}, _Args) ->
case lists:member(Val, [<<"classic">>, <<"quorum">>]) of
case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
true -> ok;
false -> {error, invalid_queue_type}
false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
end;
check_queue_type({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
{error, {unacceptable_type, Type}};
check_queue_type(Val, _Args) when is_binary(Val) ->
case lists:member(Val, ?KNOWN_QUEUE_TYPES) of
true -> ok;
false -> {error, rabbit_misc:format("unsupported queue type '~s'", [Val])}
end;
check_queue_type(_Val, _Args) ->
{error, invalid_queue_type}.

-spec list() -> [amqqueue:amqqueue()].

Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/src/rabbit_parameter_validation.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,20 @@ regex(Name, Term) ->
proplist(Name, Constraints, Term) when is_list(Term) ->
{Results, Remainder}
= lists:foldl(
fun ({Key, Fun, Needed}, {Results0, Term0}) ->
%% if the optional/mandatory flag is not provided in a constraint tuple,
%% assume 'optional'
fun ({Key, Fun}, {Results0, Term0}) ->
case lists:keytake(Key, 1, Term0) of
{value, {Key, Value}, Term1} ->
{[Fun(Key, Value) | Results0],
Term1};
{value, {Key, Type, Value}, Term1} ->
{[Fun(Key, Type, Value) | Results0],
Term1};
false ->
{Results0, Term0}
end;
({Key, Fun, Needed}, {Results0, Term0}) ->
case {lists:keytake(Key, 1, Term0), Needed} of
{{value, {Key, Value}, Term1}, _} ->
{[Fun(Key, Value) | Results0],
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_shovel/src/rabbit_shovel_behaviour.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
ack_mode => ack_mode(),
atom() => term()}.

-export_type([state/0, source_config/0, dest_config/0, uri/0]).
-export_type([state/0, source_config/0, dest_config/0, uri/0, tag/0]).

-callback parse(binary(), {source | destination, Conf :: proplists:proplist()}) ->
source_config() | dest_config().
Expand Down
48 changes: 29 additions & 19 deletions deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ amqp10_src_validation(_Def, User) ->

amqp091_src_validation(_Def, User) ->
[
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"src-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2,optional},
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue-args">>, fun validate_queue_args/2, optional},
{<<"prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
%% a deprecated pre-3.7 setting
{<<"delete-after">>, fun validate_delete_after/2, optional},
%% currently used multi-protocol friend name, introduced in 3.7
Expand Down Expand Up @@ -151,6 +152,7 @@ amqp091_dest_validation(_Def, User) ->
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue-args">>, fun validate_queue_args/2, optional},
{<<"add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
Expand Down Expand Up @@ -206,6 +208,11 @@ validate_delete_after(Name, Term) ->
{error, "~s should be number, \"never\" or \"queue-length\", actually was "
"~p", [Name, Term]}.

validate_queue_args(Name, Term0) ->
Term = rabbit_data_coercion:to_proplist(Term0),

rabbit_parameter_validation:proplist(Name, rabbit_amqqueue:declare_args(), Term).

validate_amqp10_map(Name, Terms0) ->
Terms = rabbit_data_coercion:to_proplist(Terms0),
Str = fun rabbit_parameter_validation:binary/2,
Expand Down Expand Up @@ -292,14 +299,15 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
}.

parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
DestURIs = get_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestURIs = get_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
DestDeclFun = fun (Conn, _Ch) ->
case DestQ of
none -> ok;
_ -> ensure_queue(Conn, DestQ)
_ -> ensure_queue(Conn, DestQ, rabbit_misc:to_amqp_table(DestQArgs))
end
end,
{X, Key} = case DestQ of
Expand Down Expand Up @@ -371,10 +379,11 @@ parse_amqp10_source(Def) ->
prefetch_count => PrefetchCount}, Headers}.

parse_amqp091_source(Def) ->
SrcURIs = get_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcURIs = get_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>), %% [1]
SrcQ = pget(<<"src-queue">>, Def, none),
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {fun (_Conn, Ch) ->
Expand All @@ -385,7 +394,7 @@ parse_amqp091_source(Def) ->
end, <<>>, [{<<"src-exchange">>, SrcX},
{<<"src-exchange-key">>, SrcXKey}]};
_ -> {fun (Conn, _Ch) ->
ensure_queue(Conn, SrcQ)
ensure_queue(Conn, SrcQ, rabbit_misc:to_amqp_table(SrcQArgs))
end, SrcQ, [{<<"src-queue">>, SrcQ}]}
end,
DeleteAfter = pget(<<"src-delete-after">>, Def,
Expand Down Expand Up @@ -416,15 +425,16 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
translate_ack_mode(<<"no-ack">>) -> no_ack.

ensure_queue(Conn, Queue) ->
ensure_queue(Conn, Queue, XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
passive = true})
catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
{ok, Ch2} = amqp_connection:open_channel(Conn),
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
durable = true}),
amqp_channel:call(Ch2, #'queue.declare'{queue = Queue,
durable = true,
arguments = XArgs}),
catch amqp_channel:close(Ch2)

after
Expand Down
8 changes: 8 additions & 0 deletions deps/rabbitmq_shovel/src/rabbit_shovel_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ terminate({shutdown, restart}, State = #state{name = Name}) ->
{terminated, "needed a restart"}),
close_connections(State),
ok;
terminate({{shutdown, {server_initiated_close, Code, Reason}}, _}, State = #state{name = Name}) ->
rabbit_log_shovel:error("Shovel ~s is stopping: one of its connections closed "
"with code ~b, reason: ~s",
[human_readable_name(Name), Code, Reason]),
rabbit_shovel_status:report(State#state.name, State#state.type,
{terminated, "needed a restart"}),
close_connections(State),
ok;
terminate(Reason, State = #state{name = Name}) ->
rabbit_log_shovel:error("Shovel ~s is stopping, reason: ~p", [human_readable_name(Name), Reason]),
rabbit_shovel_status:report(State#state.name, State#state.type,
Expand Down
15 changes: 15 additions & 0 deletions deps/rabbitmq_shovel/test/dynamic_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ groups() ->
[
{non_parallel_tests, [], [
simple,
quorum_queues,
set_properties_using_proplist,
set_properties_using_map,
set_empty_properties_using_proplist,
Expand Down Expand Up @@ -80,6 +81,20 @@ simple(Config) ->
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).

quorum_queues(Config) ->
with_ch(Config,
fun (Ch) ->
shovel_test_utils:set_param(
Config,
<<"test">>, [
{<<"src-queue">>, <<"src">>},
{<<"dest-queue">>, <<"dest">>},
{<<"src-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}},
{<<"dest-queue-args">>, #{<<"x-queue-type">> => <<"quorum">>}}
]),
publish_expect(Ch, <<>>, <<"src">>, <<"dest">>, <<"hello">>)
end).

set_properties_using_map(Config) ->
with_ch(Config,
fun (Ch) ->
Expand Down

0 comments on commit b4504dc

Please sign in to comment.