diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index dcc0e961275bd..05491d9c281c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -263,6 +263,7 @@ public class PulsarService implements AutoCloseable, ShutdownService { private final ExecutorProvider brokerClientSharedExternalExecutorProvider; private final ScheduledExecutorProvider brokerClientSharedScheduledExecutorProvider; private final Timer brokerClientSharedTimer; + private final ExecutorProvider brokerClientSharedLookupExecutorProvider; private MetricsGenerator metricsGenerator; private final PulsarBrokerOpenTelemetry openTelemetry; @@ -388,6 +389,8 @@ public PulsarService(ServiceConfiguration config, new ScheduledExecutorProvider(1, "broker-client-shared-scheduled-executor"); this.brokerClientSharedTimer = new HashedWheelTimer(new DefaultThreadFactory("broker-client-shared-timer"), 1, TimeUnit.MILLISECONDS); + this.brokerClientSharedLookupExecutorProvider = + new ScheduledExecutorProvider(1, "broker-client-shared-lookup-executor"); // here in the constructor we don't have the offloader scheduler yet this.offloaderStats = LedgerOffloaderStats.create(false, false, null, 0); @@ -696,6 +699,7 @@ public CompletableFuture closeAsync() { brokerClientSharedExternalExecutorProvider.shutdownNow(); brokerClientSharedInternalExecutorProvider.shutdownNow(); brokerClientSharedScheduledExecutorProvider.shutdownNow(); + brokerClientSharedLookupExecutorProvider.shutdownNow(); brokerClientSharedTimer.stop(); monotonicSnapshotClock.close(); @@ -1687,6 +1691,7 @@ public PulsarClientImpl createClientImpl(ClientConfigurationData clientConf) .internalExecutorProvider(brokerClientSharedInternalExecutorProvider) .externalExecutorProvider(brokerClientSharedExternalExecutorProvider) .scheduledExecutorProvider(brokerClientSharedScheduledExecutorProvider) + .lookupExecutorProvider(brokerClientSharedLookupExecutorProvider) .build(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index b45d6e9f6a80a..795cdc6d69383 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; @@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -58,9 +60,11 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; + private final ExecutorService lookupPinnedExecutor; + private final boolean createdLookupPinnedExecutor; private final ConcurrentHashMap>, CompletableFuture> lookupInProgress = new ConcurrentHashMap<>(); @@ -73,23 +77,43 @@ public class BinaryProtoLookupService implements LookupService { private final LatencyHistogram histoGetSchema; private final LatencyHistogram histoListTopics; + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + boolean useTls, + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, null, useTls, scheduleExecutor); + } + + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, + String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, null); } public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor, + ExecutorService lookupPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; @@ -103,6 +127,15 @@ public BinaryProtoLookupService(PulsarClientImpl client, histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "metadata").build()); histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); + + if (lookupPinnedExecutor == null) { + this.createdLookupPinnedExecutor = true; + this.lookupPinnedExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); + } else { + this.createdLookupPinnedExecutor = false; + this.lookupPinnedExecutor = lookupPinnedExecutor; + } } @Override @@ -180,7 +213,7 @@ private CompletableFuture findBroker(InetSocketAddress socket return addressFuture; } - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newLookup(topicName.toString(), listenerName, authoritative, requestId, properties); @@ -247,7 +280,7 @@ private CompletableFuture findBroker(InetSocketAddress socket } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { addressFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -260,7 +293,7 @@ private CompletableFuture getPartitionedTopicMetadata( long startTime = System.nanoTime(); CompletableFuture partitionFuture = new CompletableFuture<>(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { boolean finalAutoCreationEnabled = metadataAutoCreationEnabled; if (!metadataAutoCreationEnabled && !clientCnx.isSupportsGetPartitionedMetadataWithoutAutoCreation()) { if (useFallbackForNonPIP344Brokers) { @@ -301,7 +334,7 @@ private CompletableFuture getPartitionedTopicMetadata( } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(connectionException -> { + }, lookupPinnedExecutor).exceptionally(connectionException -> { partitionFuture.completeExceptionally(FutureUtil.unwrapCompletionException(connectionException)); return null; }); @@ -324,7 +357,7 @@ public CompletableFuture> getSchema(TopicName topicName, by return schemaFuture; } InetSocketAddress socketAddress = serviceNameResolver.resolveHost(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.ofNullable(BytesSchemaVersion.of(version))); @@ -340,7 +373,7 @@ public CompletableFuture> getSchema(TopicName topicName, by } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally(ex -> { + }, lookupPinnedExecutor).exceptionally(ex -> { schemaFuture.completeExceptionally(FutureUtil.unwrapCompletionException(ex)); return null; }); @@ -385,7 +418,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, String topicsHash) { long startTime = System.nanoTime(); - client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> { + client.getCnxPool().getConnection(socketAddress).thenAcceptAsync(clientCnx -> { long requestId = client.newRequestId(); ByteBuf request = Commands.newGetTopicsOfNamespaceRequest( namespace.toString(), requestId, mode, topicsPattern, topicsHash); @@ -404,7 +437,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, } client.getCnxPool().releaseConnection(clientCnx); }); - }).exceptionally((e) -> { + }, lookupPinnedExecutor).exceptionally((e) -> { long nextDelay = Math.min(backoff.next(), remainingTime.get()); if (nextDelay <= 0) { getTopicsResultFuture.completeExceptionally( @@ -414,7 +447,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -428,7 +461,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (createdLookupPinnedExecutor && lookupPinnedExecutor != null && !lookupPinnedExecutor.isShutdown()) { + lookupPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e0d4bf35f8a22..603844eeb786e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -113,6 +113,7 @@ public class PulsarClientImpl implements PulsarClient { private final boolean createdExecutorProviders; private final boolean createdScheduledProviders; + private final boolean createdLookupProviders; private LookupService lookup; private Map urlLookupMap = new ConcurrentHashMap<>(); private final ConnectionPool cnxPool; @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private final ExecutorProvider lookupExecutorProvider; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -163,29 +165,39 @@ public SchemaInfoProvider load(String topicName) { private TransactionCoordinatorClientImpl tcClient; public PulsarClientImpl(ClientConfigurationData conf) throws PulsarClientException { - this(conf, null, null, null, null, null, null); + this(conf, null, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) throws PulsarClientException { - this(conf, eventLoopGroup, null, null, null, null, null); + this(conf, eventLoopGroup, null, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, null, null, null, null); + this(conf, eventLoopGroup, cnxPool, null, null, null, null, null); } public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool cnxPool, Timer timer) throws PulsarClientException { - this(conf, eventLoopGroup, cnxPool, timer, null, null, null); + this(conf, eventLoopGroup, cnxPool, timer, null, null, null, null); + } + + public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, + Timer timer, ExecutorProvider externalExecutorProvider, + ExecutorProvider internalExecutorProvider, + ScheduledExecutorProvider scheduledExecutorProvider) + throws PulsarClientException { + this(conf, eventLoopGroup, connectionPool, timer, externalExecutorProvider, internalExecutorProvider, + scheduledExecutorProvider, null); } @Builder(builderClassName = "PulsarClientImplBuilder") private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, ConnectionPool connectionPool, Timer timer, ExecutorProvider externalExecutorProvider, ExecutorProvider internalExecutorProvider, - ScheduledExecutorProvider scheduledExecutorProvider) throws PulsarClientException { + ScheduledExecutorProvider scheduledExecutorProvider, + ExecutorProvider lookupExecutorProvider) throws PulsarClientException { EventLoopGroup eventLoopGroupReference = null; ConnectionPool connectionPoolReference = null; @@ -198,6 +210,7 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG } this.createdExecutorProviders = externalExecutorProvider == null; this.createdScheduledProviders = scheduledExecutorProvider == null; + this.createdLookupProviders = lookupExecutorProvider == null; eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup : getEventLoopGroup(conf); this.eventLoopGroup = eventLoopGroupReference; if (conf == null || isBlank(conf.getServiceUrl())) { @@ -218,11 +231,14 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener"); this.internalExecutorProvider = internalExecutorProvider != null ? internalExecutorProvider : new ExecutorProvider(conf.getNumIoThreads(), "pulsar-client-internal"); + this.lookupExecutorProvider = lookupExecutorProvider != null ? lookupExecutorProvider : + new ExecutorProvider(1, "pulsar-client-lookup"); if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), + this.lookupExecutorProvider.getExecutor()); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -976,6 +992,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (createdLookupProviders && lookupExecutorProvider != null && !lookupExecutorProvider.isShutdown()) { + try { + lookupExecutorProvider.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown lookupExecutorProvider", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index f691215b04e08..11e00eefcfdde 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -25,25 +25,41 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.api.PulsarClientException.LookupException; import org.apache.pulsar.client.impl.BinaryProtoLookupService.LookupDataResult; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.metrics.InstrumentProvider; +import org.apache.pulsar.common.api.proto.BaseCommand; +import org.apache.pulsar.common.api.proto.BaseCommand.Type; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.protocol.Commands; +import org.awaitility.Awaitility; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; public class BinaryProtoLookupServiceTest { private BinaryProtoLookupService lookup; private TopicName topicName; + private ExecutorService internalExecutor; + + @AfterMethod + public void cleanup() throws Exception { + internalExecutor.shutdown(); + lookup.close(); + } @BeforeMethod public void setup() throws Exception { @@ -72,9 +88,13 @@ public void setup() throws Exception { doReturn(1L).when(client).newRequestId(); ClientConfigurationData data = new ClientConfigurationData(); doReturn(data).when(client).getConfiguration(); + internalExecutor = + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); + doReturn(internalExecutor).when(client).getInternalExecutorService(); + + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); topicName = TopicName.get("persistent://tenant1/ns1/t1"); } @@ -118,6 +138,37 @@ public void maxLookupRedirectsTest3() throws Exception { } } + @Test + public void testCommandUnChangedInDifferentThread() throws Exception { + BaseCommand successCommand = Commands.newSuccessCommand(10000); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.getType(), Type.SUCCESS); + } + + @Test + public void testCommandChangedInSameThread() throws Exception { + AtomicReference successCommand = new AtomicReference<>(); + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getBroker(topicName).get(); + assertEquals(successCommand.get().getType(), Type.LOOKUP); + + internalExecutor.execute(() -> successCommand.set(Commands.newSuccessCommand(10000))); + Awaitility.await().untilAsserted(() -> { + BaseCommand baseCommand = successCommand.get(); + assertNotNull(baseCommand); + assertEquals(baseCommand.getType(), Type.SUCCESS); + }); + lookup.getPartitionedTopicMetadata(topicName, true, true).get(); + assertEquals(successCommand.get().getType(), Type.PARTITIONED_METADATA); + } + private static LookupDataResult createLookupDataResult(String brokerUrl, boolean redirect) throws Exception { LookupDataResult lookupResult = new LookupDataResult(-1);