Skip to content

Commit

Permalink
Fix potential to add duplicated consumer (apache#15051)
Browse files Browse the repository at this point in the history
It's because of this issue apache#13787.
Then diving into the codes, I find that if the client tries to subscribe multiple times over a short period of time, it is possible to have more than one consumer at the same dispatcher. just like below:
```
for ( long requestId = 1; i < 5; i++ ){
  ByteBuf request1 = Commands.newSubscribe(topic, subscription, consumerId, requestId , getSubType(),
          priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
          conf.isReplicateSubscriptionState(),
          InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
          startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
          // Use the current epoch to subscribe.
          conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
  cnx.sendRequestWithId(request1, requestId).thenRun(() -> {});
}
```

The root cause is below snippet:
https://github.com/apache/pulsar/blob/c2c05c49aff1ebc7b2b7a1d5bd547c33211e4479/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java#L994-L1021
If the consumer1 comes and is not done, then the same consumer2(it's the same with consumer1) comes, it may remove the prior consumer1(line 1015), but consumer1 add to subscription success in the end, Then the same cusumer3 comes, and it succeed, and will cause the same consumer to add duplicated.

The right way to remove consumer (line 1015) is when the `existingConsumerFuture` is completedExceptionally.

Even though the Java client couldn't occur the above behavior, other clients may not. So it's better to handle `subscribe` correctly on the broker side.

Modify the process execution sequence to improve stability

(cherry picked from commit 7bf495a)
  • Loading branch information
poorbarcode authored and codelipenghui committed May 20, 2022
1 parent e3add83 commit 9dead56
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -992,32 +992,31 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
}

if (existingConsumerFuture != null) {
if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
return null;
} else {
if (!existingConsumerFuture.isDone()){
// There was an early request to create a consumer with same consumerId. This can happen
// when
// client timeout is lower the broker timeouts. We need to wait until the previous
// consumer
// creation request either complete or fails.
log.warn("[{}][{}][{}] Consumer with id is already present on the connection,"
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
ServerError error = null;
if (!existingConsumerFuture.isDone()) {
error = ServerError.ServiceNotReady;
} else {
error = getErrorCode(existingConsumerFuture);
consumers.remove(consumerId, existingConsumerFuture);
}
commandSender.sendErrorResponse(requestId, error,
+ " consumerId={}", remoteAddress, topicName, subscriptionName, consumerId);
commandSender.sendErrorResponse(requestId, ServerError.ServiceNotReady,
"Consumer is already present on the connection");
return null;
} else if (existingConsumerFuture.isCompletedExceptionally()){
ServerError error = getErrorCodeWithErrorLog(existingConsumerFuture, true,
String.format("Consumer subscribe failure. remoteAddress: %s, subscription: %s",
remoteAddress, subscriptionName));
consumers.remove(consumerId, existingConsumerFuture);
commandSender.sendErrorResponse(requestId, error,
"Consumer that failed is already present on the connection");
} else {
Consumer consumer = existingConsumerFuture.getNow(null);
log.info("[{}] Consumer with the same id is already created:"
+ " consumerId={}, consumer={}",
remoteAddress, consumerId, consumer);
commandSender.sendSuccessResponse(requestId);
}
return null;
}

boolean createTopicIfDoesNotExist = forceTopicCreation
Expand Down Expand Up @@ -2711,13 +2710,23 @@ public void cancelPublishBufferLimiting() {
}

private <T> ServerError getErrorCode(CompletableFuture<T> future) {
return getErrorCodeWithErrorLog(future, false, null);
}

private <T> ServerError getErrorCodeWithErrorLog(CompletableFuture<T> future, boolean logIfError,
String errorMessageIfLog) {
ServerError error = ServerError.UnknownError;
try {
future.getNow(null);
} catch (Exception e) {
if (e.getCause() instanceof BrokerServiceException) {
error = BrokerServiceException.getClientErrorCode(e.getCause());
}
if (logIfError){
String finalErrorMessage = StringUtils.isNotBlank(errorMessageIfLog)
? errorMessageIfLog : "Unknown Error";
log.error(finalErrorMessage, e);
}
}
return error;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand Down Expand Up @@ -107,10 +109,13 @@
import org.apache.pulsar.common.protocol.Commands.ChecksumType;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -1827,4 +1832,146 @@ public void testTopicIsNotReady() throws Exception {

channel.finish();
}

@Test
public void testNeverDelayConsumerFutureWhenNotFail() throws Exception{
// Mock ServerCnx.field: consumers
ConcurrentLongHashMap.Builder mapBuilder = Mockito.mock(ConcurrentLongHashMap.Builder.class);
Mockito.when(mapBuilder.expectedItems(Mockito.anyInt())).thenReturn(mapBuilder);
Mockito.when(mapBuilder.concurrencyLevel(Mockito.anyInt())).thenReturn(mapBuilder);
ConcurrentLongHashMap consumers = Mockito.mock(ConcurrentLongHashMap.class);
Mockito.when(mapBuilder.build()).thenReturn(consumers);
ArgumentCaptor<Long> ignoreArgumentCaptor = ArgumentCaptor.forClass(Long.class);
final ArgumentCaptor<CompletableFuture> deleteTimesMark = ArgumentCaptor.forClass(CompletableFuture.class);
Mockito.when(consumers.remove(ignoreArgumentCaptor.capture())).thenReturn(true);
Mockito.when(consumers.remove(ignoreArgumentCaptor.capture(), deleteTimesMark.capture())).thenReturn(true);
// case1: exists existingConsumerFuture, already complete or delay done after execute 'isDone()' many times
// case2: exists existingConsumerFuture, delay complete after execute 'isDone()' many times
// Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
// Try a delay of 3 stages. The simulation is successful after repeated judgments.
for(AtomicInteger futureWillDoneAfterDelayTimes = new AtomicInteger(1);
futureWillDoneAfterDelayTimes.intValue() <= 3;
futureWillDoneAfterDelayTimes.incrementAndGet()){
final AtomicInteger futureCallTimes = new AtomicInteger();
final Consumer mockConsumer = Mockito.mock(Consumer.class);
CompletableFuture existingConsumerFuture = new CompletableFuture<Consumer>(){

private boolean complete;

// delay complete after execute 'isDone()' many times
@Override
public boolean isDone() {
if (complete) {
return true;
}
int executeIsDoneCommandTimes = futureCallTimes.incrementAndGet();
return executeIsDoneCommandTimes >= futureWillDoneAfterDelayTimes.intValue();
}

// if trig "getNow()", then complete
@Override
public Consumer get(){
complete = true;
return mockConsumer;
}

// if trig "get()", then complete
@Override
public Consumer get(long timeout, TimeUnit unit){
complete = true;
return mockConsumer;
}

// if trig "get()", then complete
@Override
public Consumer getNow(Consumer ifAbsent){
complete = true;
return mockConsumer;
}

// never fail
public boolean isCompletedExceptionally(){
return false;
}
};
Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
// do test: delay complete after execute 'isDone()' many times
// Why is the design so complicated, see: https://github.com/apache/pulsar/pull/15051
try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
// Inject consumers to ServerCnx
theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
// reset channels( serverChannel, clientChannel )
resetChannel();
setChannelConnected();
// auth check disable
doReturn(false).when(brokerService).isAuthenticationEnabled();
doReturn(false).when(brokerService).isAuthorizationEnabled();
// do subscribe
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
Object responseObj = getResponse();
Predicate<Object> responseAssert = obj -> {
if (responseObj instanceof CommandSuccess) {
return true;
}
if (responseObj instanceof CommandError) {
CommandError commandError = (CommandError) responseObj;
return ServerError.ServiceNotReady == commandError.getError();
}
return false;
};
// assert no consumer-delete event occur
assertFalse(deleteTimesMark.getAllValues().contains(existingConsumerFuture));
// assert without another error occur
assertTrue(responseAssert.test(responseAssert));
// Server will not close the connection
assertTrue(channel.isOpen());
channel.finish();
}
}
// case3: exists existingConsumerFuture, already complete and exception
CompletableFuture existingConsumerFuture = Mockito.mock(CompletableFuture.class);
Mockito.when(consumers.putIfAbsent(Mockito.anyLong(), Mockito.any())).thenReturn(existingConsumerFuture);
// make consumerFuture delay finish
Mockito.when(existingConsumerFuture.isDone()).thenReturn(true);
// when sync get return, future will return success value.
Mockito.when(existingConsumerFuture.get()).thenThrow(new NullPointerException());
Mockito.when(existingConsumerFuture.get(Mockito.anyLong(), Mockito.any())).
thenThrow(new NullPointerException());
Mockito.when(existingConsumerFuture.isCompletedExceptionally()).thenReturn(true);
Mockito.when(existingConsumerFuture.getNow(Mockito.any())).thenThrow(new NullPointerException());
try (MockedStatic<ConcurrentLongHashMap> theMock = Mockito.mockStatic(ConcurrentLongHashMap.class)) {
// Inject consumers to ServerCnx
theMock.when(ConcurrentLongHashMap::newBuilder).thenReturn(mapBuilder);
// reset channels( serverChannel, clientChannel )
resetChannel();
setChannelConnected();
// auth check disable
doReturn(false).when(brokerService).isAuthenticationEnabled();
doReturn(false).when(brokerService).isAuthorizationEnabled();
// do subscribe
ByteBuf clientCommand = Commands.newSubscribe(successTopicName, //
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
"test" /* consumer name */, 0 /* avoid reseting cursor */);
channel.writeInbound(clientCommand);
Object responseObj = getResponse();
Predicate<Object> responseAssert = obj -> {
if (responseObj instanceof CommandError) {
CommandError commandError = (CommandError) responseObj;
return ServerError.ServiceNotReady != commandError.getError();
}
return false;
};
// assert error response
assertTrue(responseAssert.test(responseAssert));
// assert consumer-delete event occur
assertEquals(1L,
deleteTimesMark.getAllValues().stream().filter(f -> f == existingConsumerFuture).count());
// Server will not close the connection
assertTrue(channel.isOpen());
channel.finish();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ public void testTransactionBufferRecoverThrowException() throws Exception {
doThrow(new RuntimeException("test")).when(reader).hasMoreEvents();
// check reader close topic
checkCloseTopic(pulsarClient, transactionBufferSnapshotServiceOriginal,
transactionBufferSnapshotService, originalTopic, field, producer);
transactionBufferSnapshotService, originalTopic, field);
doReturn(true).when(reader).hasMoreEvents();

// mock reader can't read snapshot fail throw PulsarClientException
Expand Down

0 comments on commit 9dead56

Please sign in to comment.