From dea5c9cdfc44e7de1bdcf38821b3668de25175bb Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Thu, 31 Aug 2023 17:48:02 -0400 Subject: [PATCH 01/10] Thread example --- .../java/pubsub/SubscribeAsyncExample.java | 155 ++++++++++++++++-- 1 file changed, 141 insertions(+), 14 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java index 4d2f40815..20caab538 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java @@ -26,19 +26,133 @@ import com.google.pubsub.v1.PubsubMessage; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.Set; +import java.util.List; +import java.util.ArrayList; +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.core.ExecutorProvider; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.lang.Thread; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import org.threeten.bp.Duration; +import com.google.api.gax.rpc.TransportChannel; +import com.google.auth.Credentials; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.Map; public class SubscribeAsyncExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String projectId = "your-project-id"; - String subscriptionId = "your-subscription-id"; + String projectId = "ordering-keys-testing"; + String subscriptionId = "threads-test"; subscribeAsyncExample(projectId, subscriptionId); } + private static class SinglePubSubChannelProvider implements TransportChannelProvider { + TransportChannel channel; + TransportChannelProvider channelProvider; + + SinglePubSubChannelProvider(ExecutorProvider executorProvider) { + int MAX_INBOUND_MESSAGE_SIZE = + 20 * 1024 * 1024; // 20MB API maximum message size. + int MAX_INBOUND_METADATA_SIZE = + 4 * 1024 * 1024; // 4MB API maximum metadata size + channelProvider = + SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) + .setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) + .setKeepAliveTime(Duration.ofMinutes(5)) + .setExecutor(executorProvider.getExecutor()).build(); + } + + @Override + public TransportChannel getTransportChannel() { + if (channel == null) { + try { + channel = channelProvider.getTransportChannel(); + } catch (Exception e) { + } + } + return channel; + } + + @Override + public boolean acceptsPoolSize() { + return channelProvider.acceptsPoolSize(); + } + + @Override + public String getTransportName() { + return channelProvider.getTransportName(); + } + + @Override + public boolean needsCredentials() { + return channelProvider.needsCredentials(); + } + + @Override + public boolean needsEndpoint() { + return channelProvider.needsEndpoint(); + } + + @Override + public boolean needsExecutor() { + return channelProvider.needsExecutor(); + } + + @Override + public boolean needsHeaders() { + return channelProvider.needsHeaders(); + } + + @Override + public boolean shouldAutoClose() { + return channelProvider.shouldAutoClose(); + } + + @Override + public TransportChannelProvider withCredentials(Credentials credentials) { + channelProvider = channelProvider.withCredentials(credentials); + return this; + } + + @Override + public TransportChannelProvider withEndpoint(String endpoint) { + channelProvider = channelProvider.withEndpoint(endpoint); + return this; + } + + @Override + public TransportChannelProvider withExecutor(Executor executor) { + channelProvider = channelProvider.withExecutor(executor); + return this; + } + + @Override + public TransportChannelProvider withExecutor(ScheduledExecutorService executor) { + channelProvider = channelProvider.withExecutor(executor); + return this; + } + + @Override + public TransportChannelProvider withHeaders(Map headers) { + channelProvider = channelProvider.withHeaders(headers); + return this; + } + + @Override + public TransportChannelProvider withPoolSize(int size) { + channelProvider = channelProvider.withPoolSize(size); + return this; + } + } + public static void subscribeAsyncExample(String projectId, String subscriptionId) { - ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, subscriptionId); + int subCount = 100; // Instantiate an asynchronous message receiver. MessageReceiver receiver = @@ -49,17 +163,30 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId consumer.ack(); }; - Subscriber subscriber = null; - try { - subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); - // Start the subscriber. - subscriber.startAsync().awaitRunning(); - System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); - // Allow the subscriber to run for 30s unless an unrecoverable error occurs. - subscriber.awaitTerminated(30, TimeUnit.SECONDS); - } catch (TimeoutException timeoutException) { + FixedExecutorProvider executorProvider = + FixedExecutorProvider.create(new ScheduledThreadPoolExecutor(1)); + SinglePubSubChannelProvider channelProvider = new SinglePubSubChannelProvider(executorProvider); + + List subscribers = new ArrayList<>(); + for (int i = 0; i < subCount; ++i) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId + i); + Subscriber subscriber = null; + subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).setExecutorProvider(executorProvider).setSystemExecutorProvider(executorProvider).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + subscribers.add(subscriber); + } + System.out.printf("Thread count: %d\n", Thread.activeCount()); + System.out.println("Listening for messages."); + for (Subscriber subscriber : subscribers) { + try { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { // Shut down the subscriber after 30s. Stop receiving messages. - subscriber.stopAsync(); + subscriber.stopAsync(); + } } } } From c58f5a0b03aed519e931cd8905864ea93fd2a37a Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 07:09:49 -0400 Subject: [PATCH 02/10] Add examples for limited and unlimited exeuctors --- ...bscribeAsyncLimitedConcurrencyExample.java | 170 ++++++++++++++++++ ...cribeAsyncUnlimitedConcurrencyExample.java | 93 ++++++++++ 2 files changed, 263 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java create mode 100644 samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java new file mode 100644 index 000000000..935557d0e --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java @@ -0,0 +1,170 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscribeAsyncLimitedConcurrencyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "my-project"; + String subscriptionId = "my-subscription"; + + subscribeAsyncLimitedConcurrencyExample(projectId, subscriptionId); + } + + static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size. + static final int MAX_INBOUND_METADATA_SIZE = 4 * 1024 * 1024; // 4MB API maximum metadata size + + private static ManagedChannel createSingleChannel( + String serviceAddress, int port, Executor executor, Executor offloadExecutor) + throws IOException { + ManagedChannelBuilder builder; + builder = ManagedChannelBuilder.forAddress(serviceAddress, port); + builder = + builder + .executor(executor) + .offloadExecutor(offloadExecutor) + .maxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) + .keepAliveTime(30, TimeUnit.SECONDS); + + ManagedChannel managedChannel = builder.build(); + return managedChannel; + } + + public static void subscribeAsyncLimitedConcurrencyExample( + String projectId, String subscriptionId) { + int subCount = 100; + int transportChannelCount = 20; + final AtomicLong receivedCount = new AtomicLong(); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + consumer.ack(); + long currentCount = receivedCount.incrementAndGet(); + if (currentCount % 100 == 0) { + System.out.println("Received " + currentCount); + } + }; + + ThreadFactory callbackThreadFactory = + new ThreadFactoryBuilder().setNameFormat("callback-pool-%d").build(); + ScheduledThreadPoolExecutor callbackExecutor = + new ScheduledThreadPoolExecutor(10, callbackThreadFactory); + callbackExecutor.setMaximumPoolSize(10); + FixedExecutorProvider callbackExecutorProvider = FixedExecutorProvider.create(callbackExecutor); + ThreadFactory leaseThreadFactory = + new ThreadFactoryBuilder().setNameFormat("lease-pool-%d").build(); + ScheduledThreadPoolExecutor leaseExecutor = + new ScheduledThreadPoolExecutor(10, leaseThreadFactory); + leaseExecutor.setMaximumPoolSize(10); + FixedExecutorProvider leaseExecutorProvider = FixedExecutorProvider.create(leaseExecutor); + ThreadFactory channelThreadFactory = + new ThreadFactoryBuilder().setNameFormat("channel-pool-%d").build(); + ScheduledThreadPoolExecutor channelExecutor = + new ScheduledThreadPoolExecutor(10, channelThreadFactory); + ThreadFactory channelOffloadThreadFactory = + new ThreadFactoryBuilder().setNameFormat("channel-offload-pool-%d").build(); + ScheduledThreadPoolExecutor channelOffloadExecutor = + new ScheduledThreadPoolExecutor(10, channelOffloadThreadFactory); + + ArrayList transportChannelProviders = + new ArrayList<>(transportChannelCount); + + for (int i = 0; i < transportChannelCount; ++i) { + TransportChannelProvider channelProvider = null; + try { + channelProvider = + FixedTransportChannelProvider.create( + GrpcTransportChannel.create( + createSingleChannel( + "pubsub.googleapis.com", 443, channelExecutor, channelOffloadExecutor))); + transportChannelProviders.add(channelProvider); + } catch (Exception e) { + System.out.println("Could not create channel provider: " + e); + return; + } + } + + List subscribers = new ArrayList<>(); + for (int i = 0; i < subCount; ++i) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId + i); + Subscriber subscriber = null; + subscriber = + Subscriber.newBuilder(subscriptionName, receiver) + .setChannelProvider(transportChannelProviders.get(i % transportChannelCount)) + .setExecutorProvider(callbackExecutorProvider) + .setSystemExecutorProvider(leaseExecutorProvider) + .build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + subscribers.add(subscriber); + } + printThreads(); + System.out.println("Listening for messages for 30s before checking threads again."); + try { + Thread.sleep(30000); + } catch (Exception e) { + + } + printThreads(); + + for (Subscriber subscriber : subscribers) { + try { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(120, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + } + + private static void printThreads() { + System.out.println("Thread names:"); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread t : threadSet) { + System.out.println("\t" + t.getName()); + } + System.out.printf("Thread count: %d\n", Thread.activeCount()); + } +} diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java new file mode 100644 index 000000000..253ab99b4 --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscribeAsyncUnlimitedConcurrencyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "my-topic"; + String subscriptionId = "my-subscription" + + subscribeAsyncUnlimitedConcurrencyExample(projectId, subscriptionId); + } + + public static void subscribeAsyncUnlimitedConcurrencyExample( + String projectId, String subscriptionId) { + int subCount = 100; + final AtomicLong receivedCount = new AtomicLong(); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + consumer.ack(); + long currentCount = receivedCount.incrementAndGet(); + if (currentCount % 100 == 0) { + System.out.println("Received " + currentCount); + } + }; + + List subscribers = new ArrayList<>(); + for (int i = 0; i < subCount; ++i) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId + i); + Subscriber subscriber = null; + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + subscribers.add(subscriber); + } + printThreads(); + System.out.println("Listening for messages for 30s before checking threads again."); + try { + Thread.sleep(30000); + } catch (Exception e) { + + } + printThreads(); + for (Subscriber subscriber : subscribers) { + try { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + } + + private static void printThreads() { + System.out.println("Thread names:"); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread t : threadSet) { + System.out.println("\t" + t.getName()); + } + System.out.printf("Thread count: %d\n", Thread.activeCount()); + } +} From 6d8186fccfaa040c5a134fb087ce4775040c4fea Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 07:10:31 -0400 Subject: [PATCH 03/10] Add back missing semicolon --- .../java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java index 253ab99b4..21027663b 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -32,7 +32,7 @@ public class SubscribeAsyncUnlimitedConcurrencyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. String projectId = "my-topic"; - String subscriptionId = "my-subscription" + String subscriptionId = "my-subscription"; subscribeAsyncUnlimitedConcurrencyExample(projectId, subscriptionId); } From 307019a23cad3208307a1a3afb5d507d6becb2f4 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 07:15:55 -0400 Subject: [PATCH 04/10] Revert changes to original async example --- .../java/pubsub/SubscribeAsyncExample.java | 157 ++---------------- 1 file changed, 15 insertions(+), 142 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java index 20caab538..abca3368f 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java @@ -26,133 +26,19 @@ import com.google.pubsub.v1.PubsubMessage; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.Set; -import java.util.List; -import java.util.ArrayList; -import com.google.api.gax.core.FixedExecutorProvider; -import com.google.api.gax.core.ExecutorProvider; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.lang.Thread; -import com.google.api.gax.rpc.TransportChannelProvider; -import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; -import org.threeten.bp.Duration; -import com.google.api.gax.rpc.TransportChannel; -import com.google.auth.Credentials; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.Map; public class SubscribeAsyncExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String projectId = "ordering-keys-testing"; - String subscriptionId = "threads-test"; + String projectId = "your-project-id"; + String subscriptionId = "your-subscription-id"; subscribeAsyncExample(projectId, subscriptionId); } - private static class SinglePubSubChannelProvider implements TransportChannelProvider { - TransportChannel channel; - TransportChannelProvider channelProvider; - - SinglePubSubChannelProvider(ExecutorProvider executorProvider) { - int MAX_INBOUND_MESSAGE_SIZE = - 20 * 1024 * 1024; // 20MB API maximum message size. - int MAX_INBOUND_METADATA_SIZE = - 4 * 1024 * 1024; // 4MB API maximum metadata size - channelProvider = - SubscriptionAdminSettings.defaultGrpcTransportProviderBuilder() - .setMaxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) - .setMaxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) - .setKeepAliveTime(Duration.ofMinutes(5)) - .setExecutor(executorProvider.getExecutor()).build(); - } - - @Override - public TransportChannel getTransportChannel() { - if (channel == null) { - try { - channel = channelProvider.getTransportChannel(); - } catch (Exception e) { - } - } - return channel; - } - - @Override - public boolean acceptsPoolSize() { - return channelProvider.acceptsPoolSize(); - } - - @Override - public String getTransportName() { - return channelProvider.getTransportName(); - } - - @Override - public boolean needsCredentials() { - return channelProvider.needsCredentials(); - } - - @Override - public boolean needsEndpoint() { - return channelProvider.needsEndpoint(); - } - - @Override - public boolean needsExecutor() { - return channelProvider.needsExecutor(); - } - - @Override - public boolean needsHeaders() { - return channelProvider.needsHeaders(); - } - - @Override - public boolean shouldAutoClose() { - return channelProvider.shouldAutoClose(); - } - - @Override - public TransportChannelProvider withCredentials(Credentials credentials) { - channelProvider = channelProvider.withCredentials(credentials); - return this; - } - - @Override - public TransportChannelProvider withEndpoint(String endpoint) { - channelProvider = channelProvider.withEndpoint(endpoint); - return this; - } - - @Override - public TransportChannelProvider withExecutor(Executor executor) { - channelProvider = channelProvider.withExecutor(executor); - return this; - } - - @Override - public TransportChannelProvider withExecutor(ScheduledExecutorService executor) { - channelProvider = channelProvider.withExecutor(executor); - return this; - } - - @Override - public TransportChannelProvider withHeaders(Map headers) { - channelProvider = channelProvider.withHeaders(headers); - return this; - } - - @Override - public TransportChannelProvider withPoolSize(int size) { - channelProvider = channelProvider.withPoolSize(size); - return this; - } - } - public static void subscribeAsyncExample(String projectId, String subscriptionId) { - int subCount = 100; + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); // Instantiate an asynchronous message receiver. MessageReceiver receiver = @@ -163,32 +49,19 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId consumer.ack(); }; - FixedExecutorProvider executorProvider = - FixedExecutorProvider.create(new ScheduledThreadPoolExecutor(1)); - SinglePubSubChannelProvider channelProvider = new SinglePubSubChannelProvider(executorProvider); - - List subscribers = new ArrayList<>(); - for (int i = 0; i < subCount; ++i) { - ProjectSubscriptionName subscriptionName = - ProjectSubscriptionName.of(projectId, subscriptionId + i); - Subscriber subscriber = null; - subscriber = Subscriber.newBuilder(subscriptionName, receiver).setChannelProvider(channelProvider).setExecutorProvider(executorProvider).setSystemExecutorProvider(executorProvider).build(); - // Start the subscriber. - subscriber.startAsync().awaitRunning(); - subscribers.add(subscriber); - } - System.out.printf("Thread count: %d\n", Thread.activeCount()); - System.out.println("Listening for messages."); - for (Subscriber subscriber : subscribers) { - try { - // Allow the subscriber to run for 30s unless an unrecoverable error occurs. - subscriber.awaitTerminated(300, TimeUnit.SECONDS); - } catch (TimeoutException timeoutException) { + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.printf("Listening for messages on %s:\n", subscriptionName.toString()); + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(30, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { // Shut down the subscriber after 30s. Stop receiving messages. - subscriber.stopAsync(); - } + subscriber.stopAsync(); } } } // [END pubsub_subscriber_async_pull] -// [END pubsub_quickstart_subscriber] +// [END pubsub_quickstart_subscriber] \ No newline at end of file From aed305a156d71a6ab214c10aa7db5b7fa154caa1 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 07:16:47 -0400 Subject: [PATCH 05/10] Revert changes to original async example --- .../snippets/src/main/java/pubsub/SubscribeAsyncExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java index abca3368f..4d2f40815 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncExample.java @@ -64,4 +64,4 @@ public static void subscribeAsyncExample(String projectId, String subscriptionId } } // [END pubsub_subscriber_async_pull] -// [END pubsub_quickstart_subscriber] \ No newline at end of file +// [END pubsub_quickstart_subscriber] From 11cddae49c042ecc8a52983fd68adc73a06dc873 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Thu, 31 Aug 2023 17:48:02 -0400 Subject: [PATCH 06/10] Add examples of different threading models --- ...bscribeAsyncLimitedConcurrencyExample.java | 170 ++++++++++++++++++ ...cribeAsyncUnlimitedConcurrencyExample.java | 93 ++++++++++ 2 files changed, 263 insertions(+) create mode 100644 samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java create mode 100644 samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java new file mode 100644 index 000000000..935557d0e --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java @@ -0,0 +1,170 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import com.google.api.gax.core.FixedExecutorProvider; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscribeAsyncLimitedConcurrencyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "my-project"; + String subscriptionId = "my-subscription"; + + subscribeAsyncLimitedConcurrencyExample(projectId, subscriptionId); + } + + static final int MAX_INBOUND_MESSAGE_SIZE = 20 * 1024 * 1024; // 20MB API maximum message size. + static final int MAX_INBOUND_METADATA_SIZE = 4 * 1024 * 1024; // 4MB API maximum metadata size + + private static ManagedChannel createSingleChannel( + String serviceAddress, int port, Executor executor, Executor offloadExecutor) + throws IOException { + ManagedChannelBuilder builder; + builder = ManagedChannelBuilder.forAddress(serviceAddress, port); + builder = + builder + .executor(executor) + .offloadExecutor(offloadExecutor) + .maxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) + .keepAliveTime(30, TimeUnit.SECONDS); + + ManagedChannel managedChannel = builder.build(); + return managedChannel; + } + + public static void subscribeAsyncLimitedConcurrencyExample( + String projectId, String subscriptionId) { + int subCount = 100; + int transportChannelCount = 20; + final AtomicLong receivedCount = new AtomicLong(); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + consumer.ack(); + long currentCount = receivedCount.incrementAndGet(); + if (currentCount % 100 == 0) { + System.out.println("Received " + currentCount); + } + }; + + ThreadFactory callbackThreadFactory = + new ThreadFactoryBuilder().setNameFormat("callback-pool-%d").build(); + ScheduledThreadPoolExecutor callbackExecutor = + new ScheduledThreadPoolExecutor(10, callbackThreadFactory); + callbackExecutor.setMaximumPoolSize(10); + FixedExecutorProvider callbackExecutorProvider = FixedExecutorProvider.create(callbackExecutor); + ThreadFactory leaseThreadFactory = + new ThreadFactoryBuilder().setNameFormat("lease-pool-%d").build(); + ScheduledThreadPoolExecutor leaseExecutor = + new ScheduledThreadPoolExecutor(10, leaseThreadFactory); + leaseExecutor.setMaximumPoolSize(10); + FixedExecutorProvider leaseExecutorProvider = FixedExecutorProvider.create(leaseExecutor); + ThreadFactory channelThreadFactory = + new ThreadFactoryBuilder().setNameFormat("channel-pool-%d").build(); + ScheduledThreadPoolExecutor channelExecutor = + new ScheduledThreadPoolExecutor(10, channelThreadFactory); + ThreadFactory channelOffloadThreadFactory = + new ThreadFactoryBuilder().setNameFormat("channel-offload-pool-%d").build(); + ScheduledThreadPoolExecutor channelOffloadExecutor = + new ScheduledThreadPoolExecutor(10, channelOffloadThreadFactory); + + ArrayList transportChannelProviders = + new ArrayList<>(transportChannelCount); + + for (int i = 0; i < transportChannelCount; ++i) { + TransportChannelProvider channelProvider = null; + try { + channelProvider = + FixedTransportChannelProvider.create( + GrpcTransportChannel.create( + createSingleChannel( + "pubsub.googleapis.com", 443, channelExecutor, channelOffloadExecutor))); + transportChannelProviders.add(channelProvider); + } catch (Exception e) { + System.out.println("Could not create channel provider: " + e); + return; + } + } + + List subscribers = new ArrayList<>(); + for (int i = 0; i < subCount; ++i) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId + i); + Subscriber subscriber = null; + subscriber = + Subscriber.newBuilder(subscriptionName, receiver) + .setChannelProvider(transportChannelProviders.get(i % transportChannelCount)) + .setExecutorProvider(callbackExecutorProvider) + .setSystemExecutorProvider(leaseExecutorProvider) + .build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + subscribers.add(subscriber); + } + printThreads(); + System.out.println("Listening for messages for 30s before checking threads again."); + try { + Thread.sleep(30000); + } catch (Exception e) { + + } + printThreads(); + + for (Subscriber subscriber : subscribers) { + try { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(120, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + } + + private static void printThreads() { + System.out.println("Thread names:"); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread t : threadSet) { + System.out.println("\t" + t.getName()); + } + System.out.printf("Thread count: %d\n", Thread.activeCount()); + } +} diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java new file mode 100644 index 000000000..21027663b --- /dev/null +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -0,0 +1,93 @@ +/* + * Copyright 2016 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; + +public class SubscribeAsyncUnlimitedConcurrencyExample { + public static void main(String... args) throws Exception { + // TODO(developer): Replace these variables before running the sample. + String projectId = "my-topic"; + String subscriptionId = "my-subscription"; + + subscribeAsyncUnlimitedConcurrencyExample(projectId, subscriptionId); + } + + public static void subscribeAsyncUnlimitedConcurrencyExample( + String projectId, String subscriptionId) { + int subCount = 100; + final AtomicLong receivedCount = new AtomicLong(); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + (PubsubMessage message, AckReplyConsumer consumer) -> { + // Handle incoming message, then ack the received message. + consumer.ack(); + long currentCount = receivedCount.incrementAndGet(); + if (currentCount % 100 == 0) { + System.out.println("Received " + currentCount); + } + }; + + List subscribers = new ArrayList<>(); + for (int i = 0; i < subCount; ++i) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId + i); + Subscriber subscriber = null; + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + subscribers.add(subscriber); + } + printThreads(); + System.out.println("Listening for messages for 30s before checking threads again."); + try { + Thread.sleep(30000); + } catch (Exception e) { + + } + printThreads(); + for (Subscriber subscriber : subscribers) { + try { + // Allow the subscriber to run for 30s unless an unrecoverable error occurs. + subscriber.awaitTerminated(300, TimeUnit.SECONDS); + } catch (TimeoutException timeoutException) { + // Shut down the subscriber after 30s. Stop receiving messages. + subscriber.stopAsync(); + } + } + } + + private static void printThreads() { + System.out.println("Thread names:"); + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread t : threadSet) { + System.out.println("\t" + t.getName()); + } + System.out.printf("Thread count: %d\n", Thread.activeCount()); + } +} From 079acc47b7a021fffa70ab17e721f0d060284b1e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 07:31:42 -0400 Subject: [PATCH 07/10] Make variables final to conform to style. --- .../java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java | 5 +++-- .../pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java index 935557d0e..491071f1f 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java @@ -60,6 +60,7 @@ private static ManagedChannel createSingleChannel( builder .executor(executor) .offloadExecutor(offloadExecutor) + .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE) .maxInboundMetadataSize(MAX_INBOUND_METADATA_SIZE) .keepAliveTime(30, TimeUnit.SECONDS); @@ -69,8 +70,8 @@ private static ManagedChannel createSingleChannel( public static void subscribeAsyncLimitedConcurrencyExample( String projectId, String subscriptionId) { - int subCount = 100; - int transportChannelCount = 20; + final int subCount = 100; + final int transportChannelCount = 20; final AtomicLong receivedCount = new AtomicLong(); // Instantiate an asynchronous message receiver. diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java index 21027663b..b1accadd0 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -39,7 +39,7 @@ public static void main(String... args) throws Exception { public static void subscribeAsyncUnlimitedConcurrencyExample( String projectId, String subscriptionId) { - int subCount = 100; + final int subCount = 100; final AtomicLong receivedCount = new AtomicLong(); // Instantiate an asynchronous message receiver. From af96f231fb85760f91942d3916a8420f7e66131e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 08:38:22 -0400 Subject: [PATCH 08/10] Fix catches --- .../pubsub/SubscribeAsyncLimitedConcurrencyExample.java | 7 ++++--- .../pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java | 3 ++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java index 491071f1f..c011bd944 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java @@ -42,8 +42,8 @@ public class SubscribeAsyncLimitedConcurrencyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String projectId = "my-project"; - String subscriptionId = "my-subscription"; + String projectId = "ordering-keys-testing"; + String subscriptionId = "threads-test"; subscribeAsyncLimitedConcurrencyExample(projectId, subscriptionId); } @@ -145,7 +145,8 @@ public static void subscribeAsyncLimitedConcurrencyExample( try { Thread.sleep(30000); } catch (Exception e) { - + System.out.println("Could not sleep: " + e); + return; } printThreads(); diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java index b1accadd0..fbbb5225e 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -68,7 +68,8 @@ public static void subscribeAsyncUnlimitedConcurrencyExample( try { Thread.sleep(30000); } catch (Exception e) { - + System.out.println("Could not sleep: " + e); + return; } printThreads(); for (Subscriber subscriber : subscribers) { From 11fda48daf25aba8f069bb98356dcdca6020468e Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 08:40:46 -0400 Subject: [PATCH 09/10] Fix ids --- .../java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java index c011bd944..1fd6d756a 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncLimitedConcurrencyExample.java @@ -42,8 +42,8 @@ public class SubscribeAsyncLimitedConcurrencyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String projectId = "ordering-keys-testing"; - String subscriptionId = "threads-test"; + String projectId = "my-project"; + String subscriptionId = "my-subscription"; subscribeAsyncLimitedConcurrencyExample(projectId, subscriptionId); } From 2e58cd57f80fbb0ecf7d936bcacab419df1a91f1 Mon Sep 17 00:00:00 2001 From: Kamal Aboul-Hosn Date: Tue, 12 Sep 2023 08:52:21 -0400 Subject: [PATCH 10/10] Fix naming --- .../java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java index fbbb5225e..7419e0ba9 100644 --- a/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java +++ b/samples/snippets/src/main/java/pubsub/SubscribeAsyncUnlimitedConcurrencyExample.java @@ -31,7 +31,7 @@ public class SubscribeAsyncUnlimitedConcurrencyExample { public static void main(String... args) throws Exception { // TODO(developer): Replace these variables before running the sample. - String projectId = "my-topic"; + String projectId = "my-project"; String subscriptionId = "my-subscription"; subscribeAsyncUnlimitedConcurrencyExample(projectId, subscriptionId);