Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Unblock stuck Key_Shared subscription after consumer reconnect #21396

Conversation

ghost
Copy link

@ghost ghost commented Oct 19, 2023

Fixes #21199

Motivation

There's strange behaviour while not acknowledging a message due to a processing error.

The setup is:

  • 14 consumers listen to a non-partitioned key-shared topic A;
  • in case any of them encounters a corrupted message, it restarts in some time. During some short time there is 13 consumers;
  • also consumers are disconnected in similar fashion while performing rolling restart;
  • batches are disabled on the producer and on the consumer (enableBatchIndexAcknowledgment = false).

The flow is:

  1. producer sends a corrupted message to the topic with (message 1);
  2. producer sends correct messages to the topic (message 2 and message 3);
  3. consumer 1 fails to process the corrupted message (message 1);
  4. the corrupted message 1 is recorded for replay (PersistentDispatcherMultipleConsumers#redeliveryMessages);
  5. consumer 1 proceeds processing messages further for some time (e.g. the message 2 is processed successfully);
  6. in some time consumer 1 stops;
  7. consumer 2 picks up the corrupted message 1 from the replay set (PersistentDispatcherMultipleConsumers#readMoreEntries) and fails too;
  8. consumer 1 spins up again and becomes a recently joined consumer (PersistentStickyKeyDispatcherMultipleConsumers#recentlyJoinedConsumers);
  9. consumer 1 waits for message 1 to be acknowledged by anyone in order to be removed from recently joined consumers (PersistentStickyKeyDispatcherMultipleConsumers#removeConsumersFromRecentJoinedConsumers), so that it would be able to receive message 3;
  10. steps 4-9 are repeated for all consumers until message 1 to be acknowledged, so none of the restarted consumers receives any messages.

As a result, not acknowledging message 1 for message 1, followed by restart of consumer 1 from the cluster, leads to full blocking of the topic.

I checked the code and I suppose it is implemented this way in order to prevent breaking the order.

Modifications

I used MessageRedeliveryController in order to track if the not-acked message has not yet been sent to a consumer. Until it is not, I block sending other messages with the same key hash. After the not-acked message is sent, the other message are also allowed to be sent.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: nborisov#1

@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Oct 19, 2023
@ghost ghost force-pushed the unblock_stuck_keyshared_after_consumer_connect branch from 67a2b08 to 573700a Compare October 19, 2023 10:20
Comment on lines 269 to 271
entriesWithSameKey.stream()
.filter(entryWithTheSameKey -> !entriesForC.contains(entryWithTheSameKey))
.forEach(entryToReplay -> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

btw. In the Pulsar code base, the Java Streams API is avoided in performance hotspots to reduce GC pressure. I'm not sure if that helps in practice, but that's one reason why plain for loops are preferred. :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good note, refactored.

@liudezhi2098
Copy link
Contributor

Will this change affect the ordering of consumption in the Key_Shared subscription mode?

@ghost
Copy link
Author

ghost commented Nov 10, 2023

the changes could break messages ordering. closing the MR

@ghost ghost closed this Nov 10, 2023
@ghost ghost deleted the unblock_stuck_keyshared_after_consumer_connect branch November 10, 2023 19:04
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-required Your PR changes impact docs and you will update later. ready-to-test
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug] Subscription consumption stuck on consumer reconnect
3 participants