From 03fc22ddd9d0b47fb735f36c4c5421a7baeb2ad9 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Thu, 27 Dec 2018 19:26:37 +0400 Subject: [PATCH 1/4] Introduce a configurable limit to message size. Add `max_message_size` configuration to configure limit in bytes. If message is bigger - channel exception will be thrown. Default limit is 128MB. There is still a hard limit of 521MB. [#161983593] --- Makefile | 4 +- priv/schema/rabbit.schema | 8 +++ src/rabbit_channel.erl | 10 +++- test/unit_inbroker_parallel_SUITE.erl | 79 +++++++++++++++++++++++++++ 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index e26f32d89f29..094affc2df1e 100644 --- a/Makefile +++ b/Makefile @@ -130,7 +130,9 @@ define PROJECT_ENV {vhost_restart_strategy, continue}, %% {global, prefetch count} {default_consumer_prefetch, {false, 0}}, - {channel_queue_cleanup_interval, 60000} + {channel_queue_cleanup_interval, 60000}, + %% Default max message size is 128 MB + {max_message_size, 134217728} ] endef diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 5c6078a413e6..5c62daaf8a02 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -554,6 +554,9 @@ end}. }. +{mapping, "msx_message_size", "rabbit.max_message_size", + [{datatype, integer}, {validators, ["less_then_512MB"]}]}. + %% Customising Socket Options. %% %% See (http://www.erlang.org/doc/man/inet.html#setopts-2) for @@ -1361,6 +1364,11 @@ fun(Size) when is_integer(Size) -> Size > 0 andalso Size < 2147483648 end}. +{validator, "less_then_512MB", "Max message size should be less than 512MB and gre than 0", +fun(Size) when is_integer(Size) -> + Size > 0 andalso Size < 536870912 +end}. + {validator, "less_than_1", "Flooat is not beetween 0 and 1", fun(Float) when is_float(Float) -> Float > 0 andalso Float < 1 diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b0652854..f626f059a02f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -990,7 +990,15 @@ check_msg_size(Content) -> case Size > ?MAX_MSG_SIZE of true -> precondition_failed("message size ~B larger than max size ~B", [Size, ?MAX_MSG_SIZE]); - false -> ok + false -> + case application:get_env(rabbit, max_message_size) of + {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize -> + precondition_failed("message size ~B larger than" + " configured max size ~B", + [Size, MaxSize]); + + _ -> ok + end end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d8031ce6d781..a97da8a715ac 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -58,6 +58,7 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, + max_message_size, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, {max_length_mirrored, [], MaxLengthTests}]} @@ -1299,6 +1300,84 @@ sync_mirrors(QName, Config) -> _ -> ok end. +gen_binary_mb(N) -> + B1M = << <<"_">> || _ <- lists:seq(1, 1024 * 1024) >>, + << B1M || _ <- lists:seq(1, N) >>. + +assert_channel_alive(Ch) -> + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, + #amqp_msg{payload = <<"HI">>}). + +assert_channel_fail_max_size(Ch, Monitor, ExpectedException) -> + receive + {'DOWN', Monitor, process, Ch, + {shutdown, + {server_initiated_close, 406, Exception}}} -> + ?assertMatch(Exception, ExpectedException) + after 100000 -> + error({channel_exception_expected, max_message_size}) + end. + +max_message_size(Config) -> + Binary128M = gen_binary_mb(128), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + %% Default message size is 128MB + Size128Mb = 1024 * 1024 * 128, + Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0, + application, get_env, [rabbit, max_message_size, undefined]), + + Size128Mb = byte_size(Binary128M), + %% Binary is whithin the max size limit + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + %% Channel process is alive + assert_channel_alive(Ch), + + Monitor = monitor(process, Ch), + %% This publish should cause a channel exception + BinaryBiggerThan128M = <<"_", Binary128M/binary>>, + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + ct:pal("Assert channel error 128"), + ExpectedException = <<"PRECONDITION_FAILED - message size ", + (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary, + " larger than configured max size ", + (integer_to_binary(Size128Mb))/binary>>, + assert_channel_fail_max_size(Ch, Monitor, ExpectedException), + + + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + + %% Set a bigger message size + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + assert_channel_alive(Ch1), + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + assert_channel_alive(Ch1), + + %% Set message size above 512MB. + %% The actual limit will be 512MB + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), + + Binary512M = << Binary128M/binary, Binary128M/binary, + Binary128M/binary, Binary128M/binary>>, + + BinaryBiggerThan512M = <<"_", Binary512M/binary>>, + + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), + assert_channel_alive(Ch1), + + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), + ct:pal("Assert channel error 512"), + ExpectedException1 = <<"PRECONDITION_FAILED - message size ", + (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, + " larger than max size ", + (integer_to_binary(byte_size(Binary512M)))/binary>>, + assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1). + %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. %% --------------------------------------------------------------------------- From 7858e4499d679c7399e7593c34b06d55982abec4 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Fri, 28 Dec 2018 17:14:25 +0400 Subject: [PATCH 2/4] Move application environment read to the channel intialisation. This can save us some reductions by avoiding ets read. [#161983593] --- src/rabbit_channel.erl | 59 +++++++++++++++------------ test/unit_inbroker_parallel_SUITE.erl | 19 +++++---- 2 files changed, 45 insertions(+), 33 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f626f059a02f..1579383f2596 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +481,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -985,20 +994,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> - case application:get_env(rabbit, max_message_size) of - {ok, MaxSize} when is_integer(MaxSize) andalso Size > MaxSize -> - precondition_failed("message size ~B larger than" - " configured max size ~B", - [Size, MaxSize]); - - _ -> ok - end + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B larger than max size ~B"; + _ -> + "message size ~B larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1172,16 +1180,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index a97da8a715ac..94fcc3a177ee 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1343,13 +1343,12 @@ max_message_size(Config) -> (integer_to_binary(Size128Mb))/binary>>, assert_channel_fail_max_size(Ch, Monitor, ExpectedException), - - {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - %% Set a bigger message size rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), assert_channel_alive(Ch1), @@ -1361,22 +1360,26 @@ max_message_size(Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), + %% Need a new channel for changes to take effect + rabbit_ct_client_helpers:close_channel(Ch1), + Ch2 = rabbit_ct_client_helpers:open_channel(Config), + Binary512M = << Binary128M/binary, Binary128M/binary, Binary128M/binary, Binary128M/binary>>, BinaryBiggerThan512M = <<"_", Binary512M/binary>>, - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), - assert_channel_alive(Ch1), + amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), + assert_channel_alive(Ch2), - Monitor1 = monitor(process, Ch1), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), + Monitor2 = monitor(process, Ch2), + amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), ct:pal("Assert channel error 512"), ExpectedException1 = <<"PRECONDITION_FAILED - message size ", (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, " larger than max size ", (integer_to_binary(byte_size(Binary512M)))/binary>>, - assert_channel_fail_max_size(Ch1, Monitor1, ExpectedException1). + assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1). %% --------------------------------------------------------------------------- %% rabbitmqctl helpers. From 2cd21ee0e87cfd646b753cfbb8de32b50773bd68 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 2 Jan 2019 00:49:36 +0300 Subject: [PATCH 3/4] Open the channel after max length limit is configured --- test/unit_inbroker_parallel_SUITE.erl | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 94fcc3a177ee..126f3b9083bd 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1320,16 +1320,18 @@ assert_channel_fail_max_size(Ch, Monitor, ExpectedException) -> max_message_size(Config) -> Binary128M = gen_binary_mb(128), - {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + %% Default message size is 128MB Size128Mb = 1024 * 1024 * 128, + Size128Mb = byte_size(Binary128M), + Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0, application, get_env, [rabbit, max_message_size, undefined]), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Size128Mb = byte_size(Binary128M), %% Binary is whithin the max size limit amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), - %% Channel process is alive + %% The channel process is alive assert_channel_alive(Ch), Monitor = monitor(process, Ch), From b1b995ca9f7eb45c90cbfb31194c81af3e798321 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 2 Jan 2019 02:36:56 +0300 Subject: [PATCH 4/4] Rework message size limit test * Use smaller messages for tests * No need to publish a message above the hard limit, use a helper (these are unit tests) * Wording --- src/rabbit_channel.erl | 23 ++++--- test/unit_inbroker_parallel_SUITE.erl | 89 ++++++++++++--------------- 2 files changed, 54 insertions(+), 58 deletions(-) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1579383f2596..eeae24719335 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -443,12 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, - MaxMessageSize = case application:get_env(rabbit, max_message_size) of - {ok, MS} when is_integer(MS) -> - erlang:min(MS, ?MAX_MSG_SIZE); - _ -> - ?MAX_MSG_SIZE - end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -802,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -1000,9 +1005,9 @@ check_msg_size(Content, MaxMessageSize) -> S when S > MaxMessageSize -> ErrorMessage = case MaxMessageSize of ?MAX_MSG_SIZE -> - "message size ~B larger than max size ~B"; + "message size ~B is larger than max size ~B"; _ -> - "message size ~B larger than configured max size ~B" + "message size ~B is larger than configured max size ~B" end, precondition_failed(ErrorMessage, [Size, MaxMessageSize]); diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 126f3b9083bd..c0a4ba53a398 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -25,6 +25,7 @@ -define(TIMEOUT_LIST_OPS_PASS, 5000). -define(TIMEOUT, 30000). +-define(TIMEOUT_CHANNEL_EXCEPTION, 5000). -define(CLEANUP_QUEUE_NAME, <<"cleanup-queue">>). @@ -58,13 +59,18 @@ groups() -> set_disk_free_limit_command, set_vm_memory_high_watermark_command, topic_matching, - max_message_size, {queue_max_length, [], [ {max_length_simple, [], MaxLengthTests}, - {max_length_mirrored, [], MaxLengthTests}]} + {max_length_mirrored, [], MaxLengthTests}]}, + max_message_size ]} ]. +suite() -> + [ + {timetrap, {seconds, 30}} + ]. + %% ------------------------------------------------------------------- %% Testsuite setup/teardown. %% ------------------------------------------------------------------- @@ -1308,80 +1314,65 @@ assert_channel_alive(Ch) -> amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = <<"HI">>}). -assert_channel_fail_max_size(Ch, Monitor, ExpectedException) -> +assert_channel_fail_max_size(Ch, Monitor) -> receive {'DOWN', Monitor, process, Ch, {shutdown, - {server_initiated_close, 406, Exception}}} -> - ?assertMatch(Exception, ExpectedException) - after 100000 -> + {server_initiated_close, 406, _Error}}} -> + ok + after ?TIMEOUT_CHANNEL_EXCEPTION -> error({channel_exception_expected, max_message_size}) end. max_message_size(Config) -> - Binary128M = gen_binary_mb(128), + Binary2M = gen_binary_mb(2), + Binary4M = gen_binary_mb(4), + Binary6M = gen_binary_mb(6), + Binary10M = gen_binary_mb(10), + + Size2Mb = 1024 * 1024 * 2, + Size2Mb = byte_size(Binary2M), - %% Default message size is 128MB - Size128Mb = 1024 * 1024 * 128, - Size128Mb = byte_size(Binary128M), + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 3]), - Size128Mb = rabbit_ct_broker_helpers:rpc(Config, 0, - application, get_env, [rabbit, max_message_size, undefined]), {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), %% Binary is whithin the max size limit - amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), %% The channel process is alive assert_channel_alive(Ch), Monitor = monitor(process, Ch), - %% This publish should cause a channel exception - BinaryBiggerThan128M = <<"_", Binary128M/binary>>, - amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), - ct:pal("Assert channel error 128"), - ExpectedException = <<"PRECONDITION_FAILED - message size ", - (integer_to_binary(byte_size(BinaryBiggerThan128M)))/binary, - " larger than configured max size ", - (integer_to_binary(Size128Mb))/binary>>, - assert_channel_fail_max_size(Ch, Monitor, ExpectedException), - - %% Set a bigger message size + amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary4M}), + assert_channel_fail_max_size(Ch, Monitor), + + %% increase the limit rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, max_message_size, 1024 * 1024 * 256]), + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 8]), {_, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary128M}), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary2M}), assert_channel_alive(Ch1), - amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan128M}), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary4M}), assert_channel_alive(Ch1), - %% Set message size above 512MB. - %% The actual limit will be 512MB - rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, max_message_size, 1024 * 1024 * 515]), - - %% Need a new channel for changes to take effect - rabbit_ct_client_helpers:close_channel(Ch1), - Ch2 = rabbit_ct_client_helpers:open_channel(Config), - - Binary512M = << Binary128M/binary, Binary128M/binary, - Binary128M/binary, Binary128M/binary>>, + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary6M}), + assert_channel_alive(Ch1), - BinaryBiggerThan512M = <<"_", Binary512M/binary>>, + Monitor1 = monitor(process, Ch1), + amqp_channel:call(Ch1, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary10M}), + assert_channel_fail_max_size(Ch1, Monitor1), - amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = Binary512M}), - assert_channel_alive(Ch2), + %% increase beyond the hard limit + rabbit_ct_broker_helpers:rpc(Config, 0, + application, set_env, [rabbit, max_message_size, 1024 * 1024 * 600]), + Val = rabbit_ct_broker_helpers:rpc(Config, 0, + rabbit_channel, get_max_message_size, []), - Monitor2 = monitor(process, Ch2), - amqp_channel:call(Ch2, #'basic.publish'{routing_key = <<"nope">>}, #amqp_msg{payload = BinaryBiggerThan512M}), - ct:pal("Assert channel error 512"), - ExpectedException1 = <<"PRECONDITION_FAILED - message size ", - (integer_to_binary(byte_size(BinaryBiggerThan512M)))/binary, - " larger than max size ", - (integer_to_binary(byte_size(Binary512M)))/binary>>, - assert_channel_fail_max_size(Ch2, Monitor2, ExpectedException1). + ?assertEqual(?MAX_MSG_SIZE, Val). %% --------------------------------------------------------------------------- %% rabbitmqctl helpers.