From 8e48dc0ac526b9c240c0293f10eaaf40090397a1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 5 Sep 2024 16:35:26 +0800 Subject: [PATCH] [fix][client] Fix concurrent lookup with properties might have different results --- .../client/api/LookupPropertiesTest.java | 43 +++++++++++++++++++ .../client/impl/BinaryProtoLookupService.java | 20 +++++---- 2 files changed, 55 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java index cb8b2d1e526af..768dc29731f49 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/LookupPropertiesTest.java @@ -19,23 +19,30 @@ package org.apache.pulsar.client.api; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; +import java.util.stream.IntStream; import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.MultiBrokerBaseTest; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl; +import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.namespace.LookupOptions; +import org.apache.pulsar.client.impl.LookupTopicResult; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.client.impl.ProducerImpl; import org.apache.pulsar.client.impl.PulsarClientImpl; import org.apache.pulsar.common.naming.ServiceUnitId; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.FutureUtil; import org.testng.Assert; import org.testng.annotations.Test; @@ -72,6 +79,7 @@ private static ServiceConfiguration addCustomConfigs(ServiceConfiguration config @Test public void testLookupProperty() throws Exception { + admin.namespaces().unload("public/default"); final var topic = "test-lookup-property"; admin.topics().createPartitionedTopic(topic, 16); @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder() @@ -89,7 +97,35 @@ public void testLookupProperty() throws Exception { Assert.assertEquals(port, additionalBrokers.get(0).getBrokerListenPort().orElseThrow()); } + @Test + public void testConcurrentLookupProperties() throws Exception { + @Cleanup final var client = (PulsarClientImpl) PulsarClient.builder() + .serviceUrl(pulsar.getBrokerServiceUrl()) + .build(); + final var futures = new ArrayList>(); + BrokerIdAwareLoadManager.clientIdList.clear(); + + final var clientIdList = IntStream.range(0, 10).mapToObj(i -> "key-" + i).toList(); + for (var clientId : clientIdList) { + client.getConfiguration().setLookupProperties(Collections.singletonMap(CLIENT_KEY, clientId)); + futures.add(client.getLookup().getBroker(TopicName.get("test-concurrent-lookup-properties"))); + client.getConfiguration().setLookupProperties(Collections.emptyMap()); + } + FutureUtil.waitForAll(futures).get(); + Assert.assertEquals(clientIdList, BrokerIdAwareLoadManager.clientIdList); + } + public static class BrokerIdAwareLoadManager extends ExtensibleLoadManagerImpl { + + static final List clientIdList = Collections.synchronizedList(new ArrayList<>()); + + @Override + public CompletableFuture> assign(Optional topic, + ServiceUnitId serviceUnit, LookupOptions options) { + getClientId(options).ifPresent(clientIdList::add); + return super.assign(topic, serviceUnit, options); + } + @Override public CompletableFuture> selectAsync(ServiceUnitId bundle, Set excludeBrokerSet, LookupOptions options) { @@ -106,5 +142,12 @@ public CompletableFuture> selectAsync(ServiceUnitId bundle, Set .orElseGet(() -> super.selectAsync(bundle, excludeBrokerSet, options)); }); } + + private static Optional getClientId(LookupOptions options) { + if (options.getProperties() == null) { + return Optional.empty(); + } + return Optional.ofNullable(options.getProperties().get(CLIENT_KEY)); + } } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9dd04acce7ee3..b45d6e9f6a80a 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -24,6 +24,7 @@ import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -32,6 +33,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.SchemaSerializationException; import org.apache.pulsar.client.impl.metrics.LatencyHistogram; @@ -60,7 +62,7 @@ public class BinaryProtoLookupService implements LookupService { private final String listenerName; private final int maxLookupRedirects; - private final ConcurrentHashMap> + private final ConcurrentHashMap>, CompletableFuture> lookupInProgress = new ConcurrentHashMap<>(); private final ConcurrentHashMap> @@ -118,10 +120,12 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException { public CompletableFuture getBroker(TopicName topicName) { long startTime = System.nanoTime(); final MutableObject newFutureCreated = new MutableObject<>(); + final Pair> key = Pair.of(topicName, + client.getConfiguration().getLookupProperties()); try { - return lookupInProgress.computeIfAbsent(topicName, tpName -> { - CompletableFuture newFuture = - findBroker(serviceNameResolver.resolveHost(), false, topicName, 0); + return lookupInProgress.computeIfAbsent(key, tpName -> { + CompletableFuture newFuture = findBroker(serviceNameResolver.resolveHost(), false, + topicName, 0, key.getRight()); newFutureCreated.setValue(newFuture); newFuture.thenRun(() -> { @@ -135,7 +139,7 @@ public CompletableFuture getBroker(TopicName topicName) { } finally { if (newFutureCreated.getValue() != null) { newFutureCreated.getValue().whenComplete((v, ex) -> { - lookupInProgress.remove(topicName, newFutureCreated.getValue()); + lookupInProgress.remove(key, newFutureCreated.getValue()); }); } } @@ -167,7 +171,7 @@ public CompletableFuture getPartitionedTopicMetadata( } private CompletableFuture findBroker(InetSocketAddress socketAddress, - boolean authoritative, TopicName topicName, final int redirectCount) { + boolean authoritative, TopicName topicName, final int redirectCount, Map properties) { CompletableFuture addressFuture = new CompletableFuture<>(); if (maxLookupRedirects > 0 && redirectCount > maxLookupRedirects) { @@ -179,7 +183,7 @@ private CompletableFuture findBroker(InetSocketAddress socket client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId, - client.getConfiguration().getLookupProperties()); + properties); clientCnx.newLookup(request, requestId).whenComplete((r, t) -> { if (t != null) { // lookup failed @@ -204,7 +208,7 @@ private CompletableFuture findBroker(InetSocketAddress socket // (2) redirect to given address if response is: redirect if (r.redirect) { - findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1) + findBroker(responseBrokerAddress, r.authoritative, topicName, redirectCount + 1, properties) .thenAccept(addressFuture::complete) .exceptionally((lookupException) -> { Throwable cause = FutureUtil.unwrapCompletionException(lookupException);