
[Bug] Subscription consumption stuck on consumer reconnect #21199

Closed
1 of 2 tasks
ghost opened this issue Sep 18, 2023 · 11 comments · Fixed by #23352 · May be fixed by nborisov/pulsar#2
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@ghost

ghost commented Sep 18, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

Pulsar broker: 2.8.4
Java Pulsar client: 2.8.4

Minimal reproduce step

Non-partitioned topic. Batching is disabled on both the producer and the consumer. No acknowledgment timeout. There are 5 subscriptions, each with 12 consumers.

One consumer of one subscription fails to process a message and does not ack it.
After such a failure, I give that consumer one more minute to process other messages and ack the ones that succeed. After the minute, I recreate the consumer and try to reprocess the failed messages, which would help if the error was transient.
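For clarity, a minimal sketch of this reproduce loop with the Java client, assuming a Key_Shared subscription (mentioned in a later comment); the service URL, topic name, and process() handler are placeholders, not taken from the actual setup:

    import java.util.concurrent.TimeUnit;

    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;
    import org.apache.pulsar.client.api.PulsarClientException;
    import org.apache.pulsar.client.api.SubscriptionType;

    public class StuckSubscriptionRepro {

        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                    .build();

            Consumer<byte[]> consumer = newConsumer(client);
            while (true) {
                Message<byte[]> msg = consumer.receive();
                try {
                    process(msg);                 // business logic, may throw
                    consumer.acknowledge(msg);    // ack only on success
                } catch (Exception e) {
                    // The failed message is left unacked. Simplified here as a
                    // plain sleep: wait a minute, then recreate the consumer so
                    // the unacked backlog is redelivered.
                    TimeUnit.MINUTES.sleep(1);
                    consumer.close();
                    consumer = newConsumer(client);
                }
            }
        }

        private static Consumer<byte[]> newConsumer(PulsarClient client) throws PulsarClientException {
            return client.newConsumer()
                    .topic("persistent://public/default/topic-A")  // placeholder topic
                    .subscriptionName("stuckSub")                  // name taken from the attached stats
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .subscribe();
        }

        private static void process(Message<byte[]> msg) {
            // placeholder for the real handler
        }
    }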

What did you expect to see?

I expected the subscription backlog to keep being consumed, both by the consumer with the one failed message and by the other 11 consumers.

What did you see instead?

If a consumer fails to process one message, processing of all other messages, including those with other keys, stalls as well, and so do the other 11 consumers of the subscription.
All other subscriptions on the topic and their consumers continue processing as expected.

As a symptom, the stuck subscription reports "waitingReadOp" : false and "subscriptionHavePendingRead" : false, while the other subscriptions have these fields set to true.

stats.txt
stats-internal.txt
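For reference, a sketch of how these stats can be pulled with the Java admin client (equivalent to pulsar-admin topics stats / stats-internal, which produced the attached files); the service URL and topic name are placeholders, and the fields mentioned above appear in the returned structures:

    import org.apache.pulsar.client.admin.PulsarAdmin;
    import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
    import org.apache.pulsar.common.policies.data.TopicStats;

    public class DumpTopicStats {
        public static void main(String[] args) throws Exception {
            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl("http://localhost:8080")   // placeholder admin URL
                    .build()) {
                String topic = "persistent://public/default/topic-A";  // placeholder topic

                // Same data as `pulsar-admin topics stats` and `stats-internal`.
                TopicStats stats = admin.topics().getStats(topic);
                PersistentTopicInternalStats internal = admin.topics().getInternalStats(topic);

                System.out.println(stats);
                System.out.println(internal);
            }
        }
    }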

Anything else?

The message rate is about 50 messages per second. The same scenario at a rate of just a few messages per minute (1, 2, or 5) works as expected, so I believe there might be a race condition.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@ghost ghost added the type/bug The PR fixed a bug or issue reported a bug label Sep 18, 2023
@ghost
Author

ghost commented Sep 18, 2023

Based on my testing, the bug reproduces only when the subscription has multiple consumers. With a single consumer, everything works as expected.

@Technoboy-
Contributor

[image: topic stats screenshot]
I see there are many unacked messages there, and a dump file from the owner broker is also needed.

@ghost
Author

ghost commented Sep 19, 2023

The name of the field is "numberOfEntriesSinceFirstNotAckedMessage" (apparently readPosition - markDeletePosition). It sounds like it counts both successfully processed and unprocessed messages since the first error.
Also, if you look at stats, it shows subscriptions.stuckSub.unackedMessages: 2, which is not many, I believe.

@ghost
Author

ghost commented Sep 19, 2023

@Technoboy- , what do you mean by "the owner broker dump file"? broker.conf?

@ghost
Author

ghost commented Sep 19, 2023

I've just tested the unblockStuckSubscriptionEnabled=true flag, and it doesn't help.

@ghost

ghost commented Oct 18, 2023

In-depth description of this issue.
https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1697029481847159

We observe strange behaviour when a message is not acknowledged due to a processing error.
We have the following setup (a configuration sketch with the Java client follows the list):
  • 14 consumers listen to a non-partitioned Key_Shared topic A;
  • if any of them encounters a corrupted message, it restarts after a short delay, so for a short time there are only 13 consumers;
  • consumers are also disconnected in a similar fashion during rolling restarts;
  • batching is disabled on the producer and on the consumer (enableBatchIndexAcknowledgment = false).
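A minimal sketch of that client configuration, assuming the Java client; the URLs, topic, and subscription names are placeholders:

    import org.apache.pulsar.client.api.*;

    public class SetupSketch {
        public static void main(String[] args) throws Exception {
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")   // placeholder URL
                    .build();

            // Producer with batching disabled.
            Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/topic-A")
                    .enableBatching(false)
                    .create();

            // Each of the 14 consumers: Key_Shared subscription,
            // batch index acknowledgment disabled.
            Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/topic-A")
                    .subscriptionName("sub-A")
                    .subscriptionType(SubscriptionType.Key_Shared)
                    .enableBatchIndexAcknowledgment(false)
                    .subscribe();

            // The real application would now run its produce/consume loops
            // with these two objects.
        }
    }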
The flow is:
1. the producer sends a corrupted message to the topic (message 1);
2. the producer sends correct messages to the topic (message 2 and message 3);
3. consumer 1 fails to process the corrupted message 1;
4. the corrupted message 1 is recorded for replay (PersistentDispatcherMultipleConsumers#redeliveryMessages);
5. consumer 1 keeps processing further messages for some time (e.g. message 2 is processed successfully);
6. after some time, consumer 1 stops;
7. consumer 2 picks up the corrupted message 1 from the replay set (PersistentDispatcherMultipleConsumers#readMoreEntries) and fails too;
8. consumer 1 spins up again and becomes a recently joined consumer (PersistentStickyKeyDispatcherMultipleConsumers#recentlyJoinedConsumers);
9. consumer 1 must wait for message 1 to be acknowledged by someone before it is removed from the recently joined consumers (PersistentStickyKeyDispatcherMultipleConsumers#removeConsumersFromRecentJoinedConsumers) and can receive message 3;
10. steps 4-9 repeat for all consumers until message 1 is acknowledged, so none of the restarted consumers receives any messages.

We have checked the code, and this behaviour looks intentional; we suppose it is implemented this way to avoid breaking ordering (a simplified sketch of the gate follows below). Negatively acknowledging message 1, rather than not acknowledging it at all, doesn't change the behaviour either.
As a result, not acking (or nacking) message 1, followed by a restart of consumer 1, leads to full blocking of the topic.
So, the questions are: Is this a known drawback of Pulsar, or can we treat this behaviour as a bug and fix it? Are you aware of any workaround?
If there is a message in the replay set and we restart consumers (rolling restart or some business logic), how do we prevent the indefinite loop of retrying the corrupted message?
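To make steps 8 and 9 concrete, here is a heavily simplified sketch of how we understand the recently-joined-consumer gate in PersistentStickyKeyDispatcherMultipleConsumers. This is an illustration only, not the actual Pulsar source; positions are modeled as plain longs instead of (ledgerId, entryId) positions:

    import java.util.LinkedHashMap;
    import java.util.Map;

    class RecentlyJoinedGateSketch {
        // consumer name -> read position at the time the consumer joined
        private final Map<String, Long> recentlyJoinedConsumers = new LinkedHashMap<>();

        void consumerJoined(String consumer, long currentReadPosition) {
            // Step 8: a restarted consumer re-joins and is recorded together
            // with the dispatcher's current position.
            recentlyJoinedConsumers.put(consumer, currentReadPosition);
        }

        boolean canDispatch(String consumer, long messagePosition) {
            // Step 9: a recently joined consumer may only receive messages
            // published before it joined, to preserve per-key ordering.
            Long joinPosition = recentlyJoinedConsumers.get(consumer);
            return joinPosition == null || messagePosition <= joinPosition;
        }

        void onMarkDeleteAdvanced(long markDeletePosition) {
            // A consumer leaves the gate only once everything up to its join
            // position has been acknowledged.
            recentlyJoinedConsumers.values().removeIf(joinPos -> joinPos <= markDeletePosition);
        }
    }

Under this model, as long as message 1 stays unacked the mark-delete position cannot advance, so every restarted consumer stays gated and the subscription appears stuck, which matches what we observe.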

@ethqunzhong
Contributor

Can normal consumption be restored after unloading the topic or restarting the owner broker?

@lhotari
Member

lhotari commented Aug 21, 2024

@equanz @codelipenghui Do you think that the PIP-282 changes in #21406 and #21953 address this issue?

@equanz
Contributor

equanz commented Aug 21, 2024

Do you think that PIP-282 changes in #21953 address this issue?

I think not.

In the #21199 (comment) case:
After reconnecting, the consumer will be put into recentlyJoinedConsumers with the lastSentPosition (maybe message 2), so the consumer can only receive messages <= message 2.
#21953

    if (readType == ReadType.Replay) {
        Position minLastSentPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
        if (minLastSentPositionForRecentJoinedConsumer != null
                && minLastSentPositionForRecentJoinedConsumer.compareTo(maxLastSentPosition) < 0) {
            maxLastSentPosition = minLastSentPositionForRecentJoinedConsumer;
        }
    }
    // Here, the consumer is one that has recently joined, so we can only send messages that were
    // published before it has joined.
    for (int i = 0; i < maxMessages; i++) {
        if ((entries.get(i)).compareTo(maxLastSentPosition) > 0) {
            // We have already crossed the divider line. All messages in the list are now
            // newer than what we can currently dispatch to this consumer
            return i;
        }
    }

I cannot conclude that this behavior is incorrect (isn't it part of the feature that preserves ordering?).

@lhotari
Member

lhotari commented Sep 7, 2024

It seems that this issue might be addressed by the PIP-282 changes (#21953) together with other PRs: #23226 (merged) and #23231 (in progress).
A remaining challenge is the lost-acknowledgements issue, which I'll follow up on later; it is reported as #22709, but that issue doesn't yet contain much analysis. #23261 is related. Lost acknowledgements will also cause problems, and that issue needs to be resolved as well.

@lhotari
Member

lhotari commented Sep 14, 2024

I've created PIP-379: Key_Shared Draining Hashes for Improved Message Ordering as a proposal to address such issues.
