Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java Client] Remove data race in MultiTopicsConsumerImpl to ensure correct message order #12456

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.client.api;

import static org.mockito.ArgumentMatchers.any;
Expand All @@ -24,16 +25,24 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Cleanup;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.ClientBuilderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -70,6 +79,7 @@ protected PulsarClient createNewPulsarClient(ClientBuilder clientBuilder) throws
// method calls on the interface.
Mockito.withSettings().defaultAnswer(AdditionalAnswers.delegatesTo(internalExecutorService)));
}

@Override
public ExecutorService getInternalExecutorService() {
return internalExecutorServiceDelegate;
Expand Down Expand Up @@ -119,4 +129,69 @@ public void testMultiTopicsConsumerCloses() throws Exception {
verify(internalExecutorServiceDelegate, times(0))
.schedule(any(Runnable.class), anyLong(), any());
}

// test that reproduces the issue that PR https://github.com/apache/pulsar/pull/12456 fixes
// where MultiTopicsConsumerImpl has a data race that causes out-of-order delivery of messages
@Test
public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
throws PulsarAdminException, PulsarClientException, ExecutionException, InterruptedException,
TimeoutException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

Producer<Long>[] producers = new Producer[numPartitions];

for (int i = 0; i < numPartitions; i++) {
producers[i] = pulsarClient.newProducer(Schema.INT64)
// produce to each partition directly so that order can be maintained in sending
.topic(topicName + "-partition-" + i)
.enableBatching(true)
.maxPendingMessages(30000)
.maxPendingMessagesAcrossPartitions(60000)
.batchingMaxMessages(10000)
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(4 * 1024 * 1024)
.blockIfQueueFull(true)
.create();
}

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
// consume on the partitioned topic
.topic(topicName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.receiverQueueSize(numMessages)
.subscriptionName(methodName)
.subscribe();

// produce sequence numbers to each partition topic
long sequenceNumber = 1L;
for (int i = 0; i < numMessages; i++) {
for (Producer<Long> producer : producers) {
producer.newMessage()
.value(sequenceNumber)
.sendAsync();
}
sequenceNumber++;
}
for (Producer<Long> producer : producers) {
producer.close();
}

// receive and validate sequences in the partitioned topic
Map<String, AtomicLong> receivedSequences = new HashMap<>();
int receivedCount = 0;
while (receivedCount < numPartitions * numMessages) {
Message<Long> message = consumer.receiveAsync().get(5, TimeUnit.SECONDS);
consumer.acknowledge(message);
receivedCount++;
AtomicLong receivedSequenceCounter =
receivedSequences.computeIfAbsent(message.getTopicName(), k -> new AtomicLong(1L));
Assert.assertEquals(message.getValue().longValue(), receivedSequenceCounter.getAndIncrement());
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -427,8 +427,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
}
if (message != null) {
} else {
messageProcessed(message);
result.complete(beforeConsume(message));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
}

private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
consumer.receiveAsync().thenAccept(message -> {
consumer.receiveAsync().thenAcceptAsync(message -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
Expand All @@ -260,16 +260,16 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
// or if any consumer is already paused (to create fair chance for already paused consumers)
pausedConsumers.add(consumer);

// Since we din't get a mutex, the condition on the incoming queue might have changed after
// Since we didn't get a mutex, the condition on the incoming queue might have changed after
// we have paused the current consumer. We need to re-check in order to avoid this consumer
// from getting stalled.
resumeReceivingFromPausedConsumersIfNeeded();
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
internalPinnedExecutor.execute(() -> receiveMessageFromConsumer(consumer));
// Call receiveAsync() if the incoming queue is not full. Because this block is run with
// thenAcceptAsync, there is no chance for recursion that would lead to stack overflow.
receiveMessageFromConsumer(consumer);
}
}).exceptionally(ex -> {
}, internalPinnedExecutor).exceptionally(ex -> {
if (ex instanceof PulsarClientException.AlreadyClosedException
|| ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
// ignore the exception that happens when the consumer is closed
Expand All @@ -281,6 +281,7 @@ private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
});
}

// Must be called from the internalPinnedExecutor thread
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(consumer.getTopic(),
Expand Down Expand Up @@ -409,17 +410,19 @@ protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
michaeljmarshall marked this conversation as resolved.
Show resolved Hide resolved
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
internalPinnedExecutor.execute(() -> {
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
});
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -165,7 +166,7 @@ public void testReceiveAsyncCanBeCancelled() {
// given
MultiTopicsConsumerImpl<byte[]> consumer = createMultiTopicsConsumer();
CompletableFuture<Message<byte[]>> future = consumer.receiveAsync();
assertTrue(consumer.hasNextPendingReceive());
Awaitility.await().untilAsserted(() -> assertTrue(consumer.hasNextPendingReceive()));
// when
future.cancel(true);
// then
Expand Down