[fix][broker] Fix potential to add duplicated consumer #15051

Merged
merged 1 commit into from
May 18, 2022

Conversation

poorbarcode
Contributor

@poorbarcode poorbarcode commented Apr 6, 2022

Motivation

This change was prompted by issue #13787.
While digging into the code, I found that if a client tries to subscribe multiple times within a short period, more than one consumer can end up on the same dispatcher, for example:

for (long requestId = 1; requestId < 5; requestId++) {
  ByteBuf request1 = Commands.newSubscribe(topic, subscription, consumerId, requestId , getSubType(),
          priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
          conf.isReplicateSubscriptionState(),
          InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
          startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
          // Use the current epoch to subscribe.
          conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
  cnx.sendRequestWithId(request1, requestId).thenRun(() -> {});
}

The root cause is below snippet:

if (existingConsumerFuture != null) {
    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
        Consumer consumer = existingConsumerFuture.getNow(null);
        log.info("[{}] Consumer with the same id is already created:"
                + " consumerId={}, consumer={}",
                remoteAddress, consumerId, consumer);
        commandSender.sendSuccessResponse(requestId);
        return null;
    } else {
        // There was an early request to create a consumer with same consumerId. This can happen
        // when client timeout is lower the broker timeouts. We need to wait until the previous
        // consumer creation request either complete or fails.
        log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
                + " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
        ServerError error = null;
        if (!existingConsumerFuture.isDone()) {
            error = ServerError.ServiceNotReady;
        } else {
            error = getErrorCode(existingConsumerFuture);
            consumers.remove(consumerId, existingConsumerFuture);
        }
        commandSender.sendErrorResponse(requestId, error,
                "Consumer is already present on the connection");
        return null;
    }
}

Suppose the subscribe request for consumer1 arrives and its future is not yet done. A second request for the same consumer (consumer2, identical to consumer1) then arrives and may remove consumer1's future (line 1015), even though consumer1 is eventually added to the subscription successfully. When a third request for the same consumer (consumer3) arrives, it finds no existing future and also succeeds, so the same consumer is added twice.

The consumer should be removed (line 1015) only when existingConsumerFuture has completed exceptionally.

Even though the Java client cannot trigger the behavior above, other clients might, so it is better to handle subscribe correctly on the broker side.
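
The sequence described above can be modelled with a small, self-contained Java sketch. It is not the broker code: `SubscribeRaceSketch`, `Reply`, `oldCheck`, `fixedCheck`, and the `window` hook are hypothetical names introduced for illustration, and the consumer is reduced to a `String`. The `window` hook stands in for an asynchronous completion that lands between two reads of the future's state.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class SubscribeRaceSketch {
    enum Reply { SUCCESS, SERVICE_NOT_READY, ERROR_AND_REMOVED }

    // Old shape: the future's state is read twice, and it may complete in between.
    static Reply oldCheck(ConcurrentMap<Long, CompletableFuture<String>> consumers, long consumerId,
                          CompletableFuture<String> existing, Runnable window) {
        if (existing.isDone() && !existing.isCompletedExceptionally()) {
            return Reply.SUCCESS;
        }
        window.run(); // hypothetical hook: the pending future completes here
        if (!existing.isDone()) {
            return Reply.SERVICE_NOT_READY;
        }
        // Reached for failed futures, but also for one that succeeded inside the window.
        consumers.remove(consumerId, existing);
        return Reply.ERROR_AND_REMOVED;
    }

    // Fixed shape (assumption based on the rule stated above): remove only a failed future.
    static Reply fixedCheck(ConcurrentMap<Long, CompletableFuture<String>> consumers, long consumerId,
                            CompletableFuture<String> existing, Runnable window) {
        if (!existing.isDone()) {
            window.run(); // a completion after this read no longer affects this request
            return Reply.SERVICE_NOT_READY;
        }
        if (existing.isCompletedExceptionally()) {
            consumers.remove(consumerId, existing);
            return Reply.ERROR_AND_REMOVED;
        }
        return Reply.SUCCESS;
    }

    // Runs one duplicate subscribe against a pending future that succeeds inside the window,
    // and reports whether the original registration is still in the map afterwards.
    static boolean registrationSurvives(boolean useFixed) {
        ConcurrentMap<Long, CompletableFuture<String>> consumers = new ConcurrentHashMap<>();
        long consumerId = 1L;
        CompletableFuture<String> pending = new CompletableFuture<>();
        consumers.put(consumerId, pending);
        Runnable window = () -> pending.complete("consumer-1");
        if (useFixed) {
            fixedCheck(consumers, consumerId, pending, window);
        } else {
            oldCheck(consumers, consumerId, pending, window);
        }
        return consumers.get(consumerId) == pending;
    }

    public static void main(String[] args) {
        System.out.println("old logic keeps registration:   " + registrationSurvives(false));
        System.out.println("fixed logic keeps registration: " + registrationSurvives(true));
    }
}
```

With the old shape, `registrationSurvives(false)` returns `false`: the successfully completed future is removed from the map, which is what lets a later request register the same consumer again. With the fixed shape it returns `true`.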

Modifications

Modify the process execution sequence to improve stability

Verifying this change

  • Make sure that the change passes the CI checks.

Documentation

  • no-need-doc

  • doc-not-needed

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Apr 6, 2022
@Technoboy- Technoboy- changed the title [fix] [broker] make ServerCnx.handleSubscribe thread safety [fix] [broker] Fix ServerCnx.handleSubscribe add duplicated consumer issue Apr 7, 2022
@Technoboy- Technoboy- changed the title [fix] [broker] Fix ServerCnx.handleSubscribe add duplicated consumer issue [fix] [broker] Fix ServerCnx.handleSubscribe potentially add duplicated consumer issue Apr 7, 2022
@Technoboy- Technoboy- changed the title [fix] [broker] Fix ServerCnx.handleSubscribe potentially add duplicated consumer issue [fix] [broker] Fix ServerCnx.handleSubscribe add potentially duplicated consumer issue Apr 7, 2022
@poorbarcode poorbarcode changed the title [fix] [broker] Fix ServerCnx.handleSubscribe add potentially duplicated consumer issue [fix] [broker] Fix potential risk: Command-Subscribe add duplicated consumer Apr 7, 2022
Member

@RobertIndie RobertIndie left a comment


Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?

@RobertIndie RobertIndie added type/bug The PR fixed a bug or issue reported a bug area/broker labels Apr 7, 2022
@Technoboy- Technoboy- changed the title [fix] [broker] Fix potential risk: Command-Subscribe add duplicated consumer [fix][broker] Fix potential to add duplicated consumer Apr 7, 2022
Member

@RobertIndie RobertIndie left a comment


Thanks for your explanation. LGTM

@poorbarcode
Contributor Author

poorbarcode commented Apr 8, 2022

Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?

The key problem was that a CompletableFuture that is still running could be deleted. The logic of the original code was not rigorous enough, and this PR fixes that.

Member

@lhotari lhotari left a comment


Good catch.

Please improve the error message.

@poorbarcode poorbarcode force-pushed the fix/15050 branch 2 times, most recently from 7e7f914 to 34b9a14 Compare April 8, 2022 04:30
@lhotari
Member

lhotari commented Apr 8, 2022

Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?

The key problem was that a CompletableFuture that is still running could be deleted. The logic of the original code was not rigorous enough, and this PR fixes that.

I was wondering about the same thing, whether this is just a refactoring. This does fix a race condition in providing the error message. It's possible that the CompletableFuture completes after it has been checked in the first place.

                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {

In that case, the old code might have provided a wrong error code and removed the future.

                        if (!existingConsumerFuture.isDone()) {
                            error = ServerError.ServiceNotReady;
                        } else {
                            error = getErrorCode(existingConsumerFuture);
                            consumers.remove(consumerId, existingConsumerFuture);
                        }

@poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?

@poorbarcode
Contributor Author

poorbarcode commented Apr 8, 2022

Seems that this PR only enhances the exception handling. Could you provide more information on how this PR fixed the duplicated consumer issue?

The key problem was that a CompletableFuture that is still running could be deleted. The logic of the original code was not rigorous enough, and this PR fixes that.

I was wondering about the same thing, whether this is just a refactoring. This does fix a race condition in providing the error message. It's possible that the CompletableFuture completes after it has been checked. In that case, the old code might have provided a wrong error code.

@poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?

@lhotari @RobertIndie

This PR fixes the following: when something goes wrong during consumer operations, the same consumer can be registered more than once.

You can reproduce the problem like this (The code is in the attachment):

  • Overwrite ConsumerImpl.java with the attached file. This change executes the Subscribe command several times in a short period.
  • Overwrite ServerCnx.java with the attached file. This change controls the execution order of multiple threads, raising the probability of hitting the problem to 100%.
  • Add SubscribeProcessController.java from the attached file. It is a multi-thread execution-order controller.

ConsumerImpl.java.txt
ServerCnx.java.txt
SubscribeProcessController.java.txt
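
The attached files are not reproduced in this conversation, so the following is only a guess at what an execution-order controller of this kind could look like. `OrderControllerSketch`, `StepGate`, and the step names are hypothetical and do not come from the attachment. The sketch uses `CountDownLatch` checkpoints to force a future to complete exactly between two reads of its state on another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class OrderControllerSketch {
    /** Hypothetical helper: named one-shot checkpoints that threads can signal and wait on. */
    static final class StepGate {
        private final ConcurrentHashMap<String, CountDownLatch> steps = new ConcurrentHashMap<>();

        private CountDownLatch latch(String step) {
            return steps.computeIfAbsent(step, k -> new CountDownLatch(1));
        }

        void reached(String step) {
            latch(step).countDown();
        }

        void awaitStep(String step) {
            try {
                if (!latch(step).await(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("timed out waiting for step " + step);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for step " + step, e);
            }
        }
    }

    /** Returns {doneAtFirstCheck, doneAtSecondCheck} as seen by the checking thread. */
    static boolean[] observeAcrossWindow() {
        StepGate gate = new StepGate();
        CompletableFuture<String> consumerFuture = new CompletableFuture<>();
        boolean[] seen = new boolean[2];

        // Completes the future only after the first check has happened.
        Thread completer = new Thread(() -> {
            gate.awaitStep("first-check-done");
            consumerFuture.complete("consumer-1");
            gate.reached("future-completed");
        });
        completer.start();

        seen[0] = consumerFuture.isDone();   // first read: still pending
        gate.reached("first-check-done");
        gate.awaitStep("future-completed");
        seen[1] = consumerFuture.isDone();   // second read: completed in between
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining completer", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        boolean[] seen = observeAcrossWindow();
        System.out.println("done at first check: " + seen[0] + ", done at second check: " + seen[1]);
    }
}
```

Forcing the order this way turns a timing-dependent interleaving into a repeatable one, which is the effect the reproduction steps describe.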

@lhotari
Member

lhotari commented Apr 8, 2022

@poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?

@lhotari @RobertIndie

This PR fixes the following: when something goes wrong during consumer operations, the same consumer can be registered more than once.

You can reproduce the problem like this (The code is in the attachment):

I'm sorry I didn't explain clearly what I meant. Please explain how this PR prevents adding duplicate consumers. I don't see a change in behavior for that particular detail. I do acknowledge that this PR makes sense to fix a race where the consumer gets removed when it shouldn't. (I explained that in my previous comment). However, I don't see how this prevents the adding of duplicate consumers that appears in #14970.

@poorbarcode poorbarcode closed this Apr 8, 2022
@poorbarcode poorbarcode reopened this Apr 8, 2022
@poorbarcode
Contributor Author

@poorbarcode I don't think that this PR prevents duplicated consumers. Could you explain how it achieves that?

@lhotari @RobertIndie
This PR fixes the following: when something goes wrong during consumer operations, the same consumer can be registered more than once.
You can reproduce the problem like this (The code is in the attachment):

I'm sorry I didn't explain clearly what I meant. Please explain how this PR prevents adding duplicate consumers. I don't see a change in behavior for that particular detail. I do acknowledge that this PR makes sense to fix a race where the consumer gets removed when it shouldn't. (I explained that in my previous comment). However, I don't see how this prevents the adding of duplicate consumers that appears in #14970.

It's just a guess at one possible explanation for why the #14970 problem happened.

Because there is only one instance in consumerSet, the two consumer instances must have the same address and consumerId, so there are only two possibilities:

  • The consumer restarts and uses the same SocketAddress.
  • One consumer instance executes the Subscribe command several times in a short period.

After reviewing the code for the second possibility, I found a case where it can happen and fixed it.

There may be other problems that can also reproduce the #14970 problem.

@lhotari
Member

lhotari commented Apr 8, 2022

It's just a guess at one possible explanation for why the #14970 problem happened.

Because there is only one instance in consumerSet, the two consumer instances must have the same address and consumerId, so there are only two possibilities:

  • The consumer restarts and uses the same SocketAddress.
  • One consumer instance executes the Subscribe command several times in a short period.

After reviewing the code for the second possibility, I found a case where it can happen and fixed it.

There may be other problems that can also reproduce the #14970 problem.

I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented.

As mentioned before, this PR is useful since it fixes a race condition and returns ServiceNotReady in the case that the future completes after the check has been made (as explained in #15051 (comment)).

@lhotari
Member

lhotari commented Apr 8, 2022

poorbarcode reopened this 1 hour ago

/pulsarbot run-failure-checks

Please be aware of this: apache/pulsar-test-infra#28 (comment)

There shouldn't be a need to close & re-open the PR to rerun failed jobs. Please read the above explanations to understand what some failures are about. They don't need to clear off to be able to merge the PR.

@poorbarcode
Contributor Author

poorbarcode commented Apr 8, 2022

/pulsarbot run-failure-checks

Please be aware of this: apache/pulsar-test-infra#28 (comment)

There shouldn't be a need to close & re-open the PR to rerun failed jobs. Please read the above explanations to understand what some failures are about. They don't need to clear off to be able to merge the PR.

Sorry, I clicked the wrong button.

@poorbarcode
Contributor Author

poorbarcode commented Apr 8, 2022

I don't think that this PR fixes that issue. ServerCnx is tied to a single TCP/IP connection and even without the changes in this PR, duplicates on the same connection are prevented.

@lhotari
Maybe you overlooked the asynchronously executed code in ServerCnx.handleSubscribe. Without the asynchronous execution you would be right: "all tasks run on the same eventLoop". So I still think I'm right that this PR fixes one of the possible causes of the #14970 problem.
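
The point about asynchronous execution can be illustrated with a generic Java sketch. This is not code from ServerCnx: `CallbackThreadSketch`, the thread names, and the `"topic-ready"` value are hypothetical. It shows that a continuation registered from a single-threaded "event loop" runs on whichever thread completes the future, so work chained onto an asynchronous step is not guaranteed to stay on the event loop.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {
    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns {thread that registered the callback, thread that ran the callback}. */
    static String[] registerAndRun() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "async-worker"));
        try {
            CountDownLatch callbackRegistered = new CountDownLatch(1);
            CompletableFuture<String> ranOn = new CompletableFuture<>();

            // The asynchronous step finishes on the worker thread, after the callback exists.
            CompletableFuture<String> asyncStep = CompletableFuture.supplyAsync(() -> {
                awaitQuietly(callbackRegistered);
                return "topic-ready";
            }, worker);

            // The handler runs on the event loop but only registers a continuation.
            String registeredOn = eventLoop.submit(() -> {
                asyncStep.thenRun(() -> ranOn.complete(Thread.currentThread().getName()));
                callbackRegistered.countDown();
                return Thread.currentThread().getName();
            }).get();

            return new String[] {registeredOn, ranOn.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = registerAndRun();
        System.out.println("registered on: " + threads[0] + ", ran on: " + threads[1]);
    }
}
```

Here the callback is registered on `event-loop` but runs on `async-worker`, because non-async dependents of a `CompletableFuture` run on the thread that completes it when they were attached before completion.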

Contributor

@congbobo184 congbobo184 left a comment


Good catch! Could you please add a test to prevent this from being changed back later?

Contributor

@gaoran10 gaoran10 left a comment


LGTM. Great work! Please add a unit test to prevent regression.

@poorbarcode
Contributor Author

/pulsarbot run-failure-checks

@poorbarcode
Contributor Author

/pulsarbot run-failure-checks

@poorbarcode
Contributor Author

@codelipenghui Could you take a look ?

@poorbarcode
Contributor Author

/pulsarbot run-failure-checks

@congbobo184 congbobo184 merged commit 7bf495a into apache:master May 18, 2022
codelipenghui pushed a commit that referenced this pull request May 20, 2022
(cherry picked from commit 7bf495a)
@poorbarcode poorbarcode deleted the fix/15050 branch May 23, 2022 11:39
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request May 23, 2022
(cherry picked from commit 7bf495a)
(cherry picked from commit 9dead56)
mattisonchao pushed a commit that referenced this pull request May 25, 2022
(cherry picked from commit 7bf495a)
@mattisonchao mattisonchao added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label May 25, 2022
@BewareMyPower
Contributor

@poorbarcode Could you migrate this PR to branch-2.8? It looks like the Mockito version of branch-2.8 doesn't support the creation of static mocks.

org.mockito.exceptions.base.MockitoException: 
The used MockMaker PowerMockMaker does not support the creation of static mocks

Mockito's inline mock maker supports static mocks based on the Instrumentation API.
You can simply enable this mock mode, by placing the 'mockito-inline' artifact where you are currently using 'mockito-core'.
Note that Mockito's inline mock maker is not supported on Android.

	at org.apache.pulsar.broker.service.ServerCnxTest.testNeverDelayConsumerFutureWhenNotFail(ServerCnxTest.java:1921)

@poorbarcode
Contributor Author

poorbarcode commented Jul 27, 2022

@poorbarcode Could you migrate this PR to branch-2.8? It looks like the Mockito version of branch-2.8 doesn't support the creation of static mocks.

ok. see #16826

BewareMyPower pushed a commit that referenced this pull request Jul 28, 2022
…16826)

### Motivation

see #15051

There are conflicts when cherry-picking PR #15051 (branch-2.8 has no `mock static` support), so I created a separate PR to fix branch-2.8.

### Modifications

Swap the execution order of duplicate validation and maximum validation
@BewareMyPower BewareMyPower added cherry-picked/branch-2.8 Archived: 2.8 is end of life release/2.8.4 and removed release/2.8.4 labels Jul 28, 2022
aloyszhang pushed a commit to aloyszhang/pulsar that referenced this pull request Aug 5, 2022
Squash merge branch 'fix_mutiple_consumers' into '2.8.1'
Fixes #<xyz>

### Motivation
--bug=100319271 Fix the duplicate consumer name issue; cherry-pick apache#15051

TAPD: --bug=100319271
@BewareMyPower BewareMyPower removed cherry-picked/branch-2.8 Archived: 2.8 is end of life release/2.8.4 labels Sep 14, 2022
Labels
area/broker cherry-picked/branch-2.9 Archived: 2.9 is end of life cherry-picked/branch-2.10 doc-not-needed Your PR changes do not impact docs release/2.9.3 release/2.10.1 type/bug The PR fixed a bug or issue reported a bug
10 participants