diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java index 9625171b1a0793..06826ae8afd795 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java @@ -21,6 +21,7 @@ import static java.lang.String.format; import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.DefaultThreadFactory; import io.opentelemetry.api.common.Attributes; import java.net.InetSocketAddress; import java.net.URI; @@ -29,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -58,7 +60,7 @@ public class BinaryProtoLookupService implements LookupService { private final PulsarClientImpl client; private final ServiceNameResolver serviceNameResolver; private final boolean useTls; - private final ExecutorService executor; + private final ExecutorService scheduleExecutor; private final String listenerName; private final int maxLookupRedirects; private final ExecutorService internalPinnedExecutor; @@ -74,23 +76,44 @@ public class BinaryProtoLookupService implements LookupService { private final LatencyHistogram histoGetSchema; private final LatencyHistogram histoListTopics; + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated + public BinaryProtoLookupService(PulsarClientImpl client, + String serviceUrl, + boolean useTls, + ExecutorService scheduleExecutor) + throws PulsarClientException { + this(client, serviceUrl, null, useTls, scheduleExecutor); + } + + /** + * @deprecated use {@link + * #BinaryProtoLookupService(PulsarClientImpl, String, String, boolean, ExecutorService, ExecutorService)} instead. + */ + @Deprecated public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, + String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor) throws PulsarClientException { - this(client, serviceUrl, null, useTls, executor); + this(client, serviceUrl, listenerName, useTls, scheduleExecutor, + Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup"))); } public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, String listenerName, boolean useTls, - ExecutorService executor) + ExecutorService scheduleExecutor, + ExecutorService internalPinnedExecutor) throws PulsarClientException { this.client = client; this.useTls = useTls; - this.executor = executor; + this.scheduleExecutor = scheduleExecutor; this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects(); this.serviceNameResolver = new PulsarServiceNameResolver(); this.listenerName = listenerName; @@ -105,7 +128,7 @@ public BinaryProtoLookupService(PulsarClientImpl client, histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build()); histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build()); - internalPinnedExecutor = client.getInternalExecutorService(); + this.internalPinnedExecutor = internalPinnedExecutor; } @Override @@ -417,7 +440,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, return null; } - ((ScheduledExecutorService) executor).schedule(() -> { + ((ScheduledExecutorService) scheduleExecutor).schedule(() -> { log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in" + " {} ms", namespace, nextDelay); remainingTime.addAndGet(-nextDelay); @@ -431,7 +454,9 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress, @Override public void close() throws Exception { - // no-op + if (internalPinnedExecutor != null && !internalPinnedExecutor.isShutdown()) { + internalPinnedExecutor.shutdown(); + } } public static class LookupDataResult { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java index e0d4bf35f8a223..c9edd646da1353 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java @@ -27,6 +27,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.util.HashedWheelTimer; import io.netty.util.Timer; +import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.time.Clock; @@ -121,6 +122,7 @@ public class PulsarClientImpl implements PulsarClient { private boolean needStopTimer; private final ExecutorProvider externalExecutorProvider; private final ExecutorProvider internalExecutorProvider; + private ExecutorService binaryProtoLookupExecutor = null; private final ScheduledExecutorProvider scheduledExecutorProvider; private final boolean createdEventLoopGroup; @@ -221,8 +223,10 @@ private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopG if (conf.getServiceUrl().startsWith("http")) { lookup = new HttpLookupService(instrumentProvider, conf, this.eventLoopGroup); } else { + binaryProtoLookupExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("pulsar-client-binary-proto-lookup")); lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.getListenerName(), - conf.isUseTls(), this.scheduledExecutorProvider.getExecutor()); + conf.isUseTls(), this.scheduledExecutorProvider.getExecutor(), binaryProtoLookupExecutor); } if (timer == null) { this.timer = new HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS); @@ -976,6 +980,16 @@ private void shutdownExecutors() throws PulsarClientException { pulsarClientException = PulsarClientException.unwrap(t); } } + + if (binaryProtoLookupExecutor!=null && !binaryProtoLookupExecutor.isShutdown()) { + try { + binaryProtoLookupExecutor.shutdownNow(); + } catch (Throwable t) { + log.warn("Failed to shutdown binaryProtoLookupExecutor", t); + pulsarClientException = PulsarClientException.unwrap(t); + } + } + if (pulsarClientException != null) { throw pulsarClientException; } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java index 464873942ed799..83019633faf451 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/BinaryProtoLookupServiceTest.java @@ -58,6 +58,7 @@ public class BinaryProtoLookupServiceTest { @AfterMethod public void cleanup() throws Exception { internalExecutor.shutdownNow(); + lookup.close(); } @BeforeMethod @@ -91,8 +92,9 @@ public void setup() throws Exception { Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor")); doReturn(internalExecutor).when(client).getInternalExecutorService(); - lookup = spy( - new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class))); + lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false, + mock(ExecutorService.class), internalExecutor)); + topicName = TopicName.get("persistent://tenant1/ns1/t1"); }