Skip to content

Commit

Permalink
Add pulsar-client-binary-proto-lookup executor
Browse files Browse the repository at this point in the history
Signed-off-by: Zixuan Liu <nodeces@gmail.com>
  • Loading branch information
nodece committed Oct 9, 2024
1 parent e11061a commit f2fdbbf
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static java.lang.String.format;
import static org.apache.pulsar.client.api.PulsarClientException.FailedFeatureCheck.SupportsGetPartitionedMetadataWithoutAutoCreation;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.opentelemetry.api.common.Attributes;
import java.net.InetSocketAddress;
import java.net.URI;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -77,20 +79,31 @@ public class BinaryProtoLookupService implements LookupService {
public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, executor);
this(client, serviceUrl, null, useTls, scheduleExecutor);
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService executor)
ExecutorService scheduleExecutor)
throws PulsarClientException {
this(client, serviceUrl, null, useTls, scheduleExecutor,
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-binary-proto-lookup")));
}

public BinaryProtoLookupService(PulsarClientImpl client,
String serviceUrl,
String listenerName,
boolean useTls,
ExecutorService scheduleExecutor,
ExecutorService internalPinnedExecutor)
throws PulsarClientException {
this.client = client;
this.useTls = useTls;
this.executor = executor;
this.executor = scheduleExecutor;
this.maxLookupRedirects = client.getConfiguration().getMaxLookupRedirects();
this.serviceNameResolver = new PulsarServiceNameResolver();
this.listenerName = listenerName;
Expand All @@ -105,7 +118,7 @@ public BinaryProtoLookupService(PulsarClientImpl client,
histoGetSchema = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "schema").build());
histoListTopics = histo.withAttributes(Attributes.builder().put("pulsar.lookup.type", "list-topics").build());

internalPinnedExecutor = client.getInternalExecutorService();
this.internalPinnedExecutor = internalPinnedExecutor;
}

@Override
Expand Down Expand Up @@ -431,7 +444,7 @@ private void getTopicsUnderNamespace(InetSocketAddress socketAddress,

@Override
public void close() throws Exception {
// no-op
internalPinnedExecutor.shutdown();
}

public static class LookupDataResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ public void setup() throws Exception {
Executors.newSingleThreadExecutor(new DefaultThreadFactory("pulsar-client-test-internal-executor"));
doReturn(internalExecutor).when(client).getInternalExecutorService();

lookup = spy(
new BinaryProtoLookupService(client, "pulsar://localhost:6650", false, mock(ExecutorService.class)));
lookup = spy(new BinaryProtoLookupService(client, "pulsar://localhost:6650", null, false,
mock(ExecutorService.class), internalExecutor));

topicName = TopicName.get("persistent://tenant1/ns1/t1");
}

Expand Down

0 comments on commit f2fdbbf

Please sign in to comment.