Skip to content

Commit

Permalink
Fix dead lettering crash
Browse files Browse the repository at this point in the history
Fixes #12933

The assumption that `x-last-death-*` annotations must have been set
whenever the `deaths` annotation is set was wrong.

Reproducation steps, Option 1:
1. In v3.13.7, dead letter a message from Q1 to Q2 (both can be classic queues).
2. Re-publish the message including its x-death header from Q2 back to Q1.
(RabbitMQ 3.13.7 will interpret this x-death header and set the deaths annotation.)
3. Upgrade to v4.0.4
4. Dead letter the message from Q1 to Q2 will cause the following crash:
```
crasher:
  initial call: rabbit_amqqueue_process:init/1
  pid: <0.577.0>
  registered_name: []
  exception exit: {{badkey,<<"x-last-death-exchange">>},
                   [{mc,record_death,4,[{file,"mc.erl"},{line,410}]},
                    {rabbit_dead_letter,publish,5,
                        [{file,"rabbit_dead_letter.erl"},{line,38}]},
                    {rabbit_amqqueue_process,'-dead_letter_msgs/4-fun-0-',
                        7,
                        [{file,"rabbit_amqqueue_process.erl"},{line,1060}]},
                    {rabbit_variable_queue,'-ackfold/4-fun-0-',3,
                        [{file,"rabbit_variable_queue.erl"},{line,655}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,2146}]},
                    {rabbit_variable_queue,ackfold,4,
                        [{file,"rabbit_variable_queue.erl"},{line,652}]},
                    {rabbit_priority_queue,ackfold,4,
                        [{file,"rabbit_priority_queue.erl"},{line,309}]},
                    {rabbit_amqqueue_process,
                        '-dead_letter_rejected_msgs/3-fun-0-',5,
                        [{file,"rabbit_amqqueue_process.erl"},
                         {line,1038}]}]}
```

Reproduction steps, Option 2:
1. Run a 4.0.4 / 3.13.7 mixed version cluster where both queues Q1 and Q2
   are hosted on the 4.0.4 node.
2. Send a message to Q1 which dead letters to Q2.
3. Re-publish a message with the x-death AMQP 0.9.1 header from Q2 to
   Q1. However, this time make sure to publish to the 3.13.7 node which
   forwards this message to Q1 on the 4.0.4 node.
4. Subsequently dead lettering this message from Q1 to Q2 (happening on
   the 4.0.4 node) will also cause the crash.

The modified test case in this commit was able to repro this crash via
Option 2 in the mixed version cluster tests on the `v4.0.x` branch.
  • Loading branch information
ansd committed Dec 13, 2024
1 parent fdfbaf8 commit b6027ec
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
6 changes: 3 additions & 3 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -433,9 +433,9 @@ record_death(Reason, SourceQueue,
[{Key, NewDeath} | Deaths0]
end
end,
Anns0#{<<"x-last-death-reason">> := ReasonBin,
<<"x-last-death-queue">> := SourceQueue,
<<"x-last-death-exchange">> := Exchange,
Anns0#{<<"x-last-death-reason">> => ReasonBin,
<<"x-last-death-queue">> => SourceQueue,
<<"x-last-death-exchange">> => Exchange,
deaths := Deaths};
_ ->
Deaths = case Env of
Expand Down
32 changes: 17 additions & 15 deletions deps/rabbit/test/dead_lettering_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1327,52 +1327,54 @@ dead_letter_headers_should_be_appended_for_each_event(Config) ->

dead_letter_headers_should_not_be_appended_for_republish(Config) ->
%% here we (re-)publish a message with the DL headers already set
{Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
{Conn0, Ch0} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
{Conn1, Ch1} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 1),
Args = ?config(queue_args, Config),
Durable = ?config(queue_durable, Config),
QName = ?config(queue_name, Config),
DlxName = ?config(queue_name_dlx, Config),

DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>},
{<<"x-dead-letter-routing-key">>, longstr, DlxName}],
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = DlxName, arguments = Args, durable = Durable}),

P = <<"msg1">>,

%% Publish message
publish(Ch, QName, [P]),
publish(Ch0, QName, [P]),
wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
[DTag] = consume(Ch, QName, [P]),
[DTag] = consume(Ch0, QName, [P]),
wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
multiple = false,
requeue = false}),
amqp_channel:cast(Ch0, #'basic.nack'{delivery_tag = DTag,
multiple = false,
requeue = false}),
wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
{#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
props = #'P_basic'{headers = Headers1}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),
{array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death1, <<"reason">>)),

amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
amqp_channel:cast(Ch0, #'basic.ack'{delivery_tag = DTag1}),

wait_for_messages(Config, [[DlxName, <<"0">>, <<"0">>, <<"0">>]]),

#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = QName}),
#'queue.delete_ok'{} = amqp_channel:call(Ch0, #'queue.delete'{queue = QName}),
DeadLetterArgs1 = DeadLetterArgs ++ [{<<"x-message-ttl">>, long, 1}],
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),
#'queue.declare_ok'{} = amqp_channel:call(Ch0, #'queue.declare'{queue = QName, arguments = DeadLetterArgs1 ++ Args, durable = Durable}),

publish(Ch, QName, [P], Headers1),
publish(Ch1, QName, [P], Headers1),

wait_for_messages(Config, [[DlxName, <<"1">>, <<"1">>, <<"0">>]]),
{#'basic.get_ok'{}, #amqp_msg{payload = P,
props = #'P_basic'{headers = Headers2}}} =
amqp_channel:call(Ch, #'basic.get'{queue = DlxName}),
amqp_channel:call(Ch0, #'basic.get'{queue = DlxName}),

{array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death2, <<"reason">>)),
ok = rabbit_ct_client_helpers:close_connection(Conn).
ok = rabbit_ct_client_helpers:close_connection(Conn0),
ok = rabbit_ct_client_helpers:close_connection(Conn1).

%% Dead-lettering a message modifies its headers:
%% the exchange name is replaced with that of the latest dead-letter exchange,
Expand Down

0 comments on commit b6027ec

Please sign in to comment.