Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce a configurable limit to message size, reduce default to 128 MiB #1812

Merged
merged 5 commits into from
Jan 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ define PROJECT_ENV
{vhost_restart_strategy, continue},
%% {global, prefetch count}
{default_consumer_prefetch, {false, 0}},
{channel_queue_cleanup_interval, 60000}
{channel_queue_cleanup_interval, 60000},
%% Default max message size is 128 MB
{max_message_size, 134217728}
]
endef

Expand Down
8 changes: 8 additions & 0 deletions priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,9 @@ end}.
}.


{mapping, "msx_message_size", "rabbit.max_message_size",
[{datatype, integer}, {validators, ["less_then_512MB"]}]}.

%% Customising Socket Options.
%%
%% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for
Expand Down Expand Up @@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 2147483648
end}.

{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0",
fun(Size) when is_integer(Size) ->
Size > 0 andalso Size < 536870912
end}.

{validator, "less_than_1", "Flooat is not beetween 0 and 1",
fun(Float) when is_float(Float) ->
Float > 0 andalso Float < 1
Expand Down
58 changes: 40 additions & 18 deletions src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
-export([get_vhost/1, get_user/1]).
%% For testing
-export([build_topic_variable_map/3]).
-export([list_queue_states/1]).
-export([list_queue_states/1, get_max_message_size/0]).

%% Mgmt HTTP API refactor
-export([handle_method/5]).
Expand Down Expand Up @@ -158,7 +158,9 @@
delivery_flow,
interceptor_state,
queue_states,
queue_cleanup_timer
queue_cleanup_timer,
%% Message content size limit
max_message_size
}).

-define(QUEUE, lqueue).
Expand Down Expand Up @@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
_ ->
Limiter0
end,
MaxMessageSize = get_max_message_size(),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
Expand Down Expand Up @@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
reply_consumer = none,
delivery_flow = Flow,
interceptor_state = undefined,
queue_states = #{}},
queue_states = #{},
max_message_size = MaxMessageSize},
State1 = State#ch{
interceptor_state = rabbit_channel_interceptor:init(State)},
State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer),
Expand Down Expand Up @@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) ->

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

-spec get_max_message_size() -> non_neg_integer().

get_max_message_size() ->
case application:get_env(rabbit, max_message_size) of
{ok, MS} when is_integer(MS) ->
erlang:min(MS, ?MAX_MSG_SIZE);
_ ->
?MAX_MSG_SIZE
end.

%%---------------------------------------------------------------------------

reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}.
Expand Down Expand Up @@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct,
extract_topic_variable_map_from_amqp_params(_) ->
#{}.

check_msg_size(Content) ->
check_msg_size(Content, MaxMessageSize) ->
Size = rabbit_basic:maybe_gc_large_msg(Content),
case Size > ?MAX_MSG_SIZE of
true -> precondition_failed("message size ~B larger than max size ~B",
[Size, ?MAX_MSG_SIZE]);
false -> ok
case Size of
S when S > MaxMessageSize ->
ErrorMessage = case MaxMessageSize of
?MAX_MSG_SIZE ->
"message size ~B is larger than max size ~B";
_ ->
"message size ~B is larger than configured max size ~B"
end,
precondition_failed(ErrorMessage,
[Size, MaxMessageSize]);
_ -> ok
end.

check_vhost_queue_limit(#resource{name = QueueName}, VHost) ->
Expand Down Expand Up @@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username} = User,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid}) ->
check_msg_size(Content),
Content, State = #ch{virtual_host = VHostPath,
tx = Tx,
channel = ChannelNum,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username} = User,
conn_name = ConnName,
delivery_flow = Flow,
conn_pid = ConnPid,
max_message_size = MaxMessageSize}) ->
check_msg_size(Content, MaxMessageSize),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, User),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
Expand Down
77 changes: 76 additions & 1 deletion test/unit_inbroker_parallel_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

-define(TIMEOUT_LIST_OPS_PASS, 5000).
-define(TIMEOUT, 30000).
-define(TIMEOUT_CHANNEL_EXCEPTION, 5000).

-define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>).

Expand Down Expand Up @@ -60,10 +61,16 @@ groups() ->
topic_matching,
{queue_max_length, [], [
{max_length_simple, [], MaxLengthTests},
{max_length_mirrored, [], MaxLengthTests}]}
{max_length_mirrored, [], MaxLengthTests}]},
max_message_size
]}
].

suite() ->
[
{timetrap, {seconds, 30}}
].

%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
Expand Down Expand Up @@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) ->
_ -> ok
end.

gen_binary_mb(N) ->
B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>,
<< B1M || _ <- lists:seq(1, N) >>.

assert_channel_alive(Ch) ->
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>},
#amqp_msg{payload = <<"HI">>}).

assert_channel_fail_max_size(Ch, Monitor) ->
receive
{'DOWN', Monitor, process, Ch,
{shutdown,
{server_initiated_close, 406, _Error}}} ->
ok
after ?TIMEOUT_CHANNEL_EXCEPTION ->
error({channel_exception_expected, max_message_size})
end.

max_message_size(Config) ->
Binary2M = gen_binary_mb(2),
Binary4M = gen_binary_mb(4),
Binary6M = gen_binary_mb(6),
Binary10M = gen_binary_mb(10),

Size2Mb = 1024 * 1024 * 2,
Size2Mb = byte_size(Binary2M),

rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]),

{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),

%% Binary is whithin the max size limit
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
%% The channel process is alive
assert_channel_alive(Ch),

Monitor = monitor(process, Ch),
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}),
assert_channel_fail_max_size(Ch, Monitor),

%% increase the limit
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]),

{_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),

amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}),
assert_channel_alive(Ch1),

amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}),
assert_channel_alive(Ch1),

amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}),
assert_channel_alive(Ch1),

Monitor1 = monitor(process, Ch1),
amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}),
assert_channel_fail_max_size(Ch1, Monitor1),

%% increase beyond the hard limit
rabbit_ct_broker_helpers:rpc(Config, 0,
application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]),
Val = rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_channel, get_max_message_size, []),

?assertEqual(?MAX_MSG_SIZE, Val).

%% ---------------------------------------------------------------------------
%% rabbitmqctl helpers.
%% ---------------------------------------------------------------------------
Expand Down