diff --git a/Makefile b/Makefile index e26f32d89f29..094affc2df1e 100644 --- a/Makefile +++ b/Makefile @@ -130,7 +130,9 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000} + {channel_queue_cleanup_interval, 60000}, + %% Default max message size is 128 MB + {max_message_size, 134217728} ] endef diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 8e5c6164c9c4..ef3dafd11665 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -554,6 +554,9 @@ end}. }. +{mapping, "msx_message_size", "rabbit.max_message_size", + [{datatype, integer}, {validators, ["less_then_512MB"]}]}. + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) -> Size > 0 andalso Size < 2147483648 end}. +{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0", +fun(Size) when is_integer(Size) -> + Size > 0 andalso Size < 536870912 +end}. + {validator, "less_than_1", "Flooat is not beetween 0 and 1", fun(Float) when is_float(Float) -> Float > 0 andalso Float < 1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b0652854..eeae24719335 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> ok + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B is larger than max size ~B"; + _ -> + "message size ~B is larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d8031ce6d781..c0a4ba53a398 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -25,6 +25,7 @@ -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). +-define(TIMEOUT_CHANNEL_EXCEPTION, 5000). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). @@ -60,10 +61,16 @@ groups() -> topic_matching, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]} + {max_length_mirrored, [], MaxLengthTests}]}, + max_message_size ]} ]. +suite() -> + [ + {timetrap, {seconds, 30}} + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -1299,6 +1306,74 @@ sync_mirrors(QName, Config) -> _ -> ok end. +gen_binary_mb(N) -> + B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>, + << B1M || _ <- lists:seq(1, N) >>. + +assert_channel_alive(Ch) -> + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, + #amqp_msg{payload = <<"HI">>}). + +assert_channel_fail_max_size(Ch, Monitor) -> + receive + {'DOWN', Monitor, process, Ch, + {shutdown, + {server_initiated_close, 406, _Error}}} -> + ok + after ?TIMEOUT_CHANNEL_EXCEPTION -> + error({channel_exception_expected, max_message_size}) + end. + +max_message_size(Config) -> + Binary2M = gen_binary_mb(2), + Binary4M = gen_binary_mb(4), + Binary6M = gen_binary_mb(6), + Binary10M = gen_binary_mb(10), + + Size2Mb = 1024 * 1024 * 2, + Size2Mb = byte_size(Binary2M), + + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]), + + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + %% Binary is whithin the max size limit + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), + %% The channel process is alive + assert_channel_alive(Ch), + + Monitor = monitor(process, Ch), + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), + assert_channel_fail_max_size(Ch, Monitor), + + %% increase the limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]), + + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}), + assert_channel_alive(Ch1), + + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}), + assert_channel_fail_max_size(Ch1, Monitor1), + + %% increase beyond the hard limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]), + Val = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_channel, get_max_message_size, []), + + ?assertEqual(?MAX_MSG_SIZE, Val). + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% ---------------------------------------------------------------------------