[fix][broker] Fix potential to add duplicated consumer #15051
Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?
Thanks for your explanation. LGTM
The key problem was that the original code could delete a CompletableFuture that was still running; its logic was not rigorous enough. This PR solves that.
Good catch.
Please improve the error message.
I was wondering about the same thing, whether this is just a refactoring. This does fix a race condition in providing the error message. It's possible that the CompletableFuture completes after it has been checked the first time.
In that case, the old code might have returned a wrong error code and removed the future.
@poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?
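The race described in this comment can be illustrated with a small standalone sketch. It is not the `ServerCnx` code: the class name `FutureStateSnapshot`, the `State` enum, and `classify` are hypothetical names chosen for illustration. The idea is to derive the decision from one pass over the future's state, so that a future completing between two separate checks cannot be misread as failed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration, not code from the Pulsar broker.
public class FutureStateSnapshot {
    enum State { IN_PROGRESS, COMPLETED, FAILED }

    // Reads isDone() exactly once. If the future is not done, report
    // IN_PROGRESS (a stale answer here is still safe: the caller just retries).
    // Once isDone() has returned true the state is terminal, so the
    // isCompletedExceptionally() check that follows cannot change under us.
    static State classify(CompletableFuture<?> future) {
        if (!future.isDone()) {
            return State.IN_PROGRESS;
        }
        return future.isCompletedExceptionally() ? State.FAILED : State.COMPLETED;
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(classify(pending)); // IN_PROGRESS

        pending.complete("consumer-1");
        System.out.println(classify(pending)); // COMPLETED

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("subscribe failed"));
        System.out.println(classify(failed)); // FAILED
    }
}
```

With this shape, only the `FAILED` branch would look up an error code and remove the future; `IN_PROGRESS` would map to a ServiceNotReady style reply.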
This PR fixes the case where duplicate consumers get registered when something goes wrong during consumer operations. You can reproduce the problem as follows (the code is in the attachment):
ConsumerImpl.java.txt
I'm sorry I didn't explain clearly what I meant. Please explain how this PR prevents adding duplicate consumers. I don't see a change in behavior for that particular detail. I do acknowledge that this PR makes sense to fix a race where the consumer gets removed when it shouldn't. (I explained that in my previous comment). However, I don't see how this prevents the adding of duplicate consumers that appears in #14970.
It's just a guess at one of the possible causes of the #14970 problem. Because there is only one instance in
After reviewing the code for the second possibility, I found a case that could trigger it and fixed it. There may be other problems that can reproduce #14970.
I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented. As mentioned before, this PR is useful since it fixes a race condition and returns ServiceNotReady in the case that the future completes after the check has been made (as explained in #15051 (comment)).
Please be aware of this: apache/pulsar-test-infra#28 (comment) There shouldn't be a need to close & re-open the PR to rerun failed jobs. Please read the above explanations to understand what some failures are about. They don't need to clear off to be able to merge the PR.
Sorry, I clicked the wrong button.
@lhotari
Good catch! Could you please add a test for this, to prevent it from being changed back later?
LGTM. Great work! Please add a unit test to prevent regression.
/pulsarbot run-failure-checks
/pulsarbot run-failure-checks
@codelipenghui Could you take a look?
/pulsarbot run-failure-checks
It's because of issue #13787.
Digging into the code, I found that if the client tries to subscribe multiple times over a short period of time, it is possible to end up with more than one consumer on the same dispatcher, as in the following:
```
// Send the same subscribe command several times, each with a new request ID.
for (long requestId = 1; requestId < 5; requestId++) {
    ByteBuf request1 = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
            priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
            conf.isReplicateSubscriptionState(),
            InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
            startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
            // Use the current epoch to subscribe.
            conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
    cnx.sendRequestWithId(request1, requestId).thenRun(() -> {});
}
```
The root cause is the snippet below:
https://github.com/apache/pulsar/blob/c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L994-L1021
If consumer1 arrives and its future is not yet done, and then consumer2 (identical to consumer1) arrives, the broker may remove the earlier consumer1 (line 1015), even though consumer1 is eventually added to the subscription successfully. When consumer3 (again identical) arrives, it succeeds too, so the same consumer is added twice.
The right time to remove the consumer (line 1015) is when `existingConsumerFuture` has completed exceptionally.
Even though the Java client cannot trigger the above behavior, other clients might. So it's better to handle `subscribe` correctly on the broker side.
Modify the process execution sequence to improve stability
(cherry picked from commit 7bf495a)
@poorbarcode Could you migrate this PR to branch-2.8? It looks like the Mockito version of branch-2.8 doesn't support the creation of static mocks.
ok. see #16826
Squash merge branch 'fix_mutiple_consumers' into '2.8.1' Fixes #<xyz> ### Motivation --bug=100319271 Fix the duplicate consumer name problem; cherry-pick apache#15051 TAPD: --bug=100319271
Motivation
It's because of issue #13787.
Digging into the code, I found that if the client tries to subscribe multiple times over a short period of time, for example by sending the same subscribe command several times in a loop with different request IDs, it is possible to end up with more than one consumer on the same dispatcher.
The root cause is the snippet below:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
Lines 994 to 1021 in c2c05c4
If consumer1 arrives and its future is not yet done, and then consumer2 (identical to consumer1) arrives, the broker may remove the earlier consumer1 (line 1015), even though consumer1 is eventually added to the subscription successfully. When consumer3 (again identical) arrives, it succeeds too, so the same consumer is added twice.
The right time to remove the consumer (line 1015) is when `existingConsumerFuture` has completed exceptionally.
Even though the Java client cannot trigger the above behavior, other clients might. So it's better to handle `subscribe` correctly on the broker side.
Modifications
Modify the process execution sequence to improve stability
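As a rough illustration of the intended ordering, here is a minimal standalone sketch, not the actual `ServerCnx` change. The class `SubscribeDedupSketch`, the `Reply` enum, and `handleSubscribe` are hypothetical names, and the map of futures only stands in for the broker's per-connection consumer registry. An in-flight future is never removed, and a future is removed only after it has completed exceptionally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of a per-connection consumer registry, not Pulsar code.
public class SubscribeDedupSketch {
    enum Reply { ACCEPTED, ALREADY_SUBSCRIBED, SERVICE_NOT_READY, PREVIOUS_ATTEMPT_FAILED }

    private final ConcurrentMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();

    Reply handleSubscribe(long consumerId, CompletableFuture<String> newAttempt) {
        CompletableFuture<String> existing = consumers.putIfAbsent(consumerId, newAttempt);
        if (existing == null) {
            return Reply.ACCEPTED; // first request for this consumer ID
        }
        if (!existing.isDone()) {
            // An earlier request is still in flight: keep it and ask the client to retry.
            return Reply.SERVICE_NOT_READY;
        }
        if (existing.isCompletedExceptionally()) {
            // Only a failed attempt is removed, and only if it is still the mapped value.
            consumers.remove(consumerId, existing);
            return Reply.PREVIOUS_ATTEMPT_FAILED;
        }
        return Reply.ALREADY_SUBSCRIBED;
    }

    int registered() {
        return consumers.size();
    }

    public static void main(String[] args) {
        SubscribeDedupSketch cnx = new SubscribeDedupSketch();
        CompletableFuture<String> consumer1 = new CompletableFuture<>();

        System.out.println(cnx.handleSubscribe(1L, consumer1));                 // ACCEPTED
        System.out.println(cnx.handleSubscribe(1L, new CompletableFuture<>())); // SERVICE_NOT_READY
        consumer1.complete("consumer-1"); // consumer1 eventually succeeds
        System.out.println(cnx.handleSubscribe(1L, new CompletableFuture<>())); // ALREADY_SUBSCRIBED
        System.out.println(cnx.registered());                                   // 1
    }
}
```

The `main` method replays the consumer1, consumer2, consumer3 sequence from the motivation: because the in-flight entry survives the second request, the third request finds the completed consumer instead of registering a duplicate.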
Verifying this change
Documentation
no-need-doc
doc-not-needed