From b6027ece28cb69d87cfec82ffbfb5f2e26339b42 Mon Sep 17 00:00:00 2001 From: David Ansari Date: Fri, 13 Dec 2024 17:17:50 +0100 Subject: [PATCH] Fix dead lettering crash Fixes #12933 The assumption that `x-last-death-*` annotations must have been set whenever the `deaths` annotation is set was wrong. Reproducation steps, Option 1: 1. In v3.13.7, dead letter a message from Q1 to Q2 (both can be classic queues). 2. Re-publish the message including its x-death header from Q2 back to Q1. (RabbitMQ 3.13.7 will interpret this x-death header and set the deaths annotation.) 3. Upgrade to v4.0.4 4. Dead letter the message from Q1 to Q2 will cause the following crash: ``` crasher: initial call: rabbit_amqqueue_process:init/1 pid: <0.577.0> registered_name: [] exception exit: {{badkey,<<"x-last-death-exchange">>}, [{mc,record_death,4,[{file,"mc.erl"},{line,410}]}, {rabbit_dead_letter,publish,5, [{file,"rabbit_dead_letter.erl"},{line,38}]}, {rabbit_amqqueue_process,'-dead_letter_msgs/4-fun-0-', 7, [{file,"rabbit_amqqueue_process.erl"},{line,1060}]}, {rabbit_variable_queue,'-ackfold/4-fun-0-',3, [{file,"rabbit_variable_queue.erl"},{line,655}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}, {rabbit_variable_queue,ackfold,4, [{file,"rabbit_variable_queue.erl"},{line,652}]}, {rabbit_priority_queue,ackfold,4, [{file,"rabbit_priority_queue.erl"},{line,309}]}, {rabbit_amqqueue_process, '-dead_letter_rejected_msgs/3-fun-0-',5, [{file,"rabbit_amqqueue_process.erl"}, {line,1038}]}]} ``` Reproduction steps, Option 2: 1. Run a 4.0.4 / 3.13.7 mixed version cluster where both queues Q1 and Q2 are hosted on the 4.0.4 node. 2. Send a message to Q1 which dead letters to Q2. 3. Re-publish a message with the x-death AMQP 0.9.1 header from Q2 to Q1. However, this time make sure to publish to the 3.13.7 node which forwards this message to Q1 on the 4.0.4 node. 4. Subsequently dead lettering this message from Q1 to Q2 (happening on the 4.0.4 node) will also cause the crash. The modified test case in this commit was able to repro this crash via Option 2 in the mixed version cluster tests on the `v4.0.x` branch. --- deps/rabbit/src/mc.erl | 6 ++--- deps/rabbit/test/dead_lettering_SUITE.erl | 32 ++++++++++++----------- 2 files changed, 20 insertions(+), 18 deletions(-) diff --git a/deps/rabbit/src/mc.erl b/deps/rabbit/src/mc.erl index b3c51dca3976..1ddc32d6a36d 100644 --- a/deps/rabbit/src/mc.erl +++ b/deps/rabbit/src/mc.erl @@ -433,9 +433,9 @@ record_death(Reason, SourceQueue, [{Key, NewDeath} | Deaths0] end end, - Anns0#{<<"x-last-death-reason">> := ReasonBin, - <<"x-last-death-queue">> := SourceQueue, - <<"x-last-death-exchange">> := Exchange, + Anns0#{<<"x-last-death-reason">> => ReasonBin, + <<"x-last-death-queue">> => SourceQueue, + <<"x-last-death-exchange">> => Exchange, deaths := Deaths}; _ -> Deaths = case Env of diff --git a/deps/rabbit/test/dead_lettering_SUITE.erl b/deps/rabbit/test/dead_lettering_SUITE.erl index b793cb3abebd..e867f77f9a09 100644 --- a/deps/rabbit/test/dead_lettering_SUITE.erl +++ b/deps/rabbit/test/dead_lettering_SUITE.erl @@ -1327,7 +1327,8 @@ dead_letter_headers_should_be_appended_for_each_event(Config) -> dead_letter_headers_should_not_be_appended_for_republish(Config) -> %% here we (re-)publish a message with the DL headers already set - {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + {Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + {Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1), Args = ?config(queue_args, Config), Durable = ?config(queue_durable, Config), QName = ?config(queue_name, Config), @@ -1335,44 +1336,45 @@ dead_letter_headers_should_not_be_appended_for_republish(Config) -> DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}, {<<"x-dead-letter-routing-key">>, longstr, DlxName}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}), P = <<"msg1">>, %% Publish message - publish(Ch, QName, [P]), + publish(Ch0, QName, [P]), wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), - [DTag] = consume(Ch, QName, [P]), + [DTag] = consume(Ch0, QName, [P]), wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, - multiple = false, - requeue = false}), + amqp_channel:cast(Ch0, #'basic.nack'{delivery_tag = DTag, + multiple = false, + requeue = false}), wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]), {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P, props = #'P_basic'{headers = Headers1}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DlxName}), + amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}), {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), + amqp_channel:cast(Ch0, #'basic.ack'{delivery_tag = DTag1}), wait_for_messages(Config, [[DlxName, <<"0">>, <<"0">>, <<"0">>]]), - #'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + #'queue.delete_ok'{} = amqp_channel:call(Ch0, #'queue.delete'{queue = QName}), DeadLetterArgs1 = DeadLetterArgs ++ [{<<"x-message-ttl">>, long, 1}], - #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}), - publish(Ch, QName, [P], Headers1), + publish(Ch1, QName, [P], Headers1), wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]), {#'basic.get_ok'{}, #amqp_msg{payload = P, props = #'P_basic'{headers = Headers2}}} = - amqp_channel:call(Ch, #'basic.get'{queue = DlxName}), + amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}), {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)), - ok = rabbit_ct_client_helpers:close_connection(Conn). + ok = rabbit_ct_client_helpers:close_connection(Conn0), + ok = rabbit_ct_client_helpers:close_connection(Conn1). %% Dead-lettering a message modifies its headers: %% the exchange name is replaced with that of the latest dead-letter exchange,