diff --git a/README.md b/README.md index a3df31f5b..8f60b60bb 100644 --- a/README.md +++ b/README.md @@ -240,6 +240,8 @@ Samples are in the [`samples/`](https://github.com/googleapis/java-pubsub/tree/m | Sample | Source Code | Try it | | --------------------------- | --------------------------------- | ------ | +| Native Image Pub Sub Sample | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java) | +| Publish Operations | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/native-image-sample/src/main/java/utilities/PublishOperations.java) | | Create Avro Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateAvroSchemaExample.java) | | Create Proto Schema Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreateProtoSchemaExample.java) | | Create Pull Subscription Example | [source code](https://github.com/googleapis/java-pubsub/blob/main/samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/java-pubsub&page=editor&open_in_editor=samples/snippets/src/main/java/pubsub/CreatePullSubscriptionExample.java) | diff --git a/samples/native-image-sample/README.md b/samples/native-image-sample/README.md new file mode 100644 index 000000000..ace93bd8e --- /dev/null +++ b/samples/native-image-sample/README.md @@ -0,0 +1,77 @@ +# Pub/Sub Sample Application with Native Image + +The Pub/Sub sample application demonstrates some common operations with Pub/Sub and is compatible with Native Image compilation. + +## Setup Instructions + +You will need to follow these prerequisite steps in order to run the samples: + +1. If you have not already, [create a Google Cloud Platform Project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#creating_a_project). + +2. Install the [Google Cloud SDK](https://cloud.google.com/sdk/) which will allow you to run the sample with your project's credentials. + + Once installed, log in with Application Default Credentials using the following command: + + ``` + gcloud auth application-default login + ``` + + **Note:** Authenticating with Application Default Credentials is convenient to use during development, but we recommend [alternate methods of authentication](https://cloud.google.com/docs/authentication/production) during production use. + +3. Install the GraalVM compiler. + + You can follow the [official installation instructions](https://www.graalvm.org/docs/getting-started/#install-graalvm) from the GraalVM website. + After following the instructions, ensure that you install the native image extension installed by running: + + ``` + gu install native-image + ``` + + Once you finish following the instructions, verify that the default version of Java is set to the GraalVM version by running `java -version` in a terminal. + + You will see something similar to the below output: + + ``` + $ java -version + + openjdk version "11.0.7" 2020-04-14 + OpenJDK Runtime Environment GraalVM CE 20.1.0 (build 11.0.7+10-jvmci-20.1-b02) + OpenJDK 64-Bit Server VM GraalVM CE 20.1.0 (build 11.0.7+10-jvmci-20.1-b02, mixed mode, sharing) + ``` + +4. [Enable the Pub/Sub APIs](https://console.cloud.google.com/apis/api/pubsub.googleapis.com). + +### Sample + +Navigate to this directory in a new terminal. + +1. Compile the application using the Native Image Compiler. This step may take a few minutes. + + ``` + mvn package -P native -DskipTests + ``` + +2. Run the application: + + ``` + ./target/native-image-sample + ``` + +3. The application will create a new Pub/Sub topic, send and receive a message from it, and then delete the topic. + + ``` + Created topic: projects/YOUR_PROJECT_ID/topics/graal-pubsub-test-00e72640-4e36-4aff-84d2-13b7569b2289 under project: YOUR_PROJECT_ID + Created pull subscription: projects/YOUR_PROJECT_ID/subscriptions/graal-pubsub-test-sub2fb5e3f3-cb26-439b-b88c-9cb0cfca9e45 + Published message with ID: 457327433078420 + Received Payload: Pub/Sub Native Image Test published message at timestamp: 2020-09-23T19:45:42.746514Z + Deleted topic projects/YOUR_PROJECT_ID/topics/graal-pubsub-test-00e72640-4e36-4aff-84d2-13b7569b2289 + Deleted subscription projects/YOUR_PROJECT_ID/subscriptions/graal-pubsub-test-sub2fb5e3f3-cb26-439b-b88c-9cb0cfca9e45 + ``` + +## Sample Integration Test with native Image Support + +In order to run the sample integration test as a native image, call the following command: + + ``` + mvn test -Pnative + ``` \ No newline at end of file diff --git a/samples/native-image-sample/pom.xml b/samples/native-image-sample/pom.xml new file mode 100644 index 000000000..afdfa8e56 --- /dev/null +++ b/samples/native-image-sample/pom.xml @@ -0,0 +1,175 @@ + + + 4.0.0 + com.google.cloud + native-image-sample + Native Image Sample + https://github.com/googleapis/java-pubsub + + + + com.google.cloud.samples + shared-configuration + 1.2.0 + + + + 11 + 11 + UTF-8 + + + + + + com.google.cloud + libraries-bom + 24.3.0 + pom + import + + + + + + + com.google.cloud + google-cloud-core + + + com.google.cloud + google-cloud-pubsub + + + + junit + junit + 4.13.2 + test + + + com.google.truth + truth + 1.1.3 + test + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + dependency-jars/ + pubsub.NativeImagePubSubSample + + + + + + org.apache.maven.plugins + maven-dependency-plugin + 3.2.0 + + + copy-dependencies + package + + copy-dependencies + + + + ${project.build.directory}/dependency-jars/ + + + + + + + + + + + + native + + + + com.google.cloud + native-image-support + 0.12.4 + + + org.junit.vintage + junit-vintage-engine + 5.8.2 + test + + + org.graalvm.buildtools + junit-platform-native + 0.9.9 + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + 2.22.2 + + + **/*IT + + + + + org.graalvm.buildtools + native-maven-plugin + 0.9.9 + true + + pubsub.NativeImagePubSubSample + + + --no-fallback + --no-server + --features=com.google.cloud.nativeimage.features.ProtobufMessageFeature + + + + + build-native + + build + test + + package + + + test-native + + test + + test + + + + + + + + \ No newline at end of file diff --git a/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java b/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java new file mode 100644 index 000000000..2e96091a6 --- /dev/null +++ b/samples/native-image-sample/src/main/java/pubsub/NativeImagePubSubSample.java @@ -0,0 +1,385 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import com.google.api.gax.rpc.NotFoundException; +import com.google.cloud.ServiceOptions; +import com.google.cloud.pubsub.v1.AckReplyConsumer; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.TopicAdminClient; +import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStub; +import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings; +import com.google.iam.v1.GetIamPolicyRequest; +import com.google.iam.v1.Policy; +import com.google.iam.v1.TestIamPermissionsRequest; +import com.google.iam.v1.TestIamPermissionsResponse; +import com.google.protobuf.FieldMask; +import com.google.pubsub.v1.AcknowledgeRequest; +import com.google.pubsub.v1.DeadLetterPolicy; +import com.google.pubsub.v1.DetachSubscriptionRequest; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.ProjectTopicName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PullRequest; +import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.ReceivedMessage; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.Topic; +import com.google.pubsub.v1.TopicName; +import com.google.pubsub.v1.UpdateSubscriptionRequest; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import utilities.PublishOperations; + +/** Pub/Sub sample application compiled with Native Image. */ +public class NativeImagePubSubSample { + + /** Driver for the Pub/Sub Sample application which publishes a message to a specified topic. */ + public static void main(String[] args) throws Exception { + Instant startTime = Instant.now(); + String projectId = ServiceOptions.getDefaultProjectId(); + + String topicId = "native-pubsub-test-" + UUID.randomUUID().toString(); + String pullSubId = "native-pubsub-test-sub" + UUID.randomUUID().toString(); + String pushSubId = "native-pubsub-test-sub" + UUID.randomUUID().toString(); + + try { + // Topic management operations + createTopic(projectId, topicId); + createPullSubscription(projectId, pullSubId, topicId); + createPushSubscription(projectId, pushSubId, topicId); + detachSubscription(projectId, pushSubId); + getTopicPolicy(projectId, topicId); + getSubscriptionPolicy(projectId, pullSubId); + listSubscriptionInProject(projectId); + listSubscriptionInTopic(projectId, topicId); + listTopics(projectId); + updateSubscriptionDeadLetterTopic(projectId, pushSubId, topicId, topicId); + testTopicPermissions(projectId, topicId); + testSubscriptionPermissions(projectId, pushSubId); + + // Publish Operations + PublishOperations.publishMessage(projectId, topicId); + PublishOperations.publishWithBatchSettings(projectId, topicId); + PublishOperations.publishWithCustomAttributes(projectId, topicId); + PublishOperations.publishWithErrorHandler(projectId, topicId); + + // Receive messages + subscribeSync(projectId, pullSubId); + receiveMessagesWithDeliveryAttempts(projectId, pullSubId); + } finally { + deleteTopic(projectId, topicId); + deleteSubscription(projectId, pullSubId); + deleteSubscription(projectId, pushSubId); + } + Instant endTime = Instant.now(); + Duration duration = Duration.between(startTime, endTime); + System.out.println("Duration: " + duration.toString()); + } + + static void createTopic(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + Topic topic = topicAdminClient.createTopic(topicName); + System.out.println("Created topic: " + topic.getName() + " under project: " + projectId); + } + } + + static void createPullSubscription(String projectId, String subscriptionId, String topicId) + throws IOException { + + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + Subscription subscription = + subscriptionAdminClient.createSubscription( + subscriptionName, topicName, PushConfig.getDefaultInstance(), 10); + System.out.println("Created pull subscription: " + subscription.getName()); + } + } + + static void createPushSubscription(String projectId, String subscriptionId, String topicId) + throws IOException { + + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Intentionally set pushEndpoint empty just to exercise API call + PushConfig pushConfig = PushConfig.newBuilder().setPushEndpoint("").build(); + + Subscription subscription = + subscriptionAdminClient.createSubscription(subscriptionName, topicName, pushConfig, 10); + System.out.println("Created push subscription: " + subscription.getName()); + } + } + + static void detachSubscription(String projectId, String subscriptionId) throws IOException { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + topicAdminClient.detachSubscription( + DetachSubscriptionRequest.newBuilder() + .setSubscription(subscriptionName.toString()) + .build()); + } + + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + Subscription subscription = subscriptionAdminClient.getSubscription(subscriptionName); + if (subscription.getDetached()) { + System.out.println("Subscription is detached."); + } else { + throw new RuntimeException("Subscription detachment was not successful."); + } + } + } + + static void getSubscriptionPolicy(String projectId, String subscriptionId) throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + GetIamPolicyRequest getIamPolicyRequest = + GetIamPolicyRequest.newBuilder().setResource(subscriptionName.toString()).build(); + Policy policy = subscriptionAdminClient.getIamPolicy(getIamPolicyRequest); + System.out.println("Subscription policy: " + policy.toString().trim()); + } + } + + static void getTopicPolicy(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + GetIamPolicyRequest getIamPolicyRequest = + GetIamPolicyRequest.newBuilder().setResource(topicName.toString()).build(); + Policy policy = topicAdminClient.getIamPolicy(getIamPolicyRequest); + System.out.println("Topic policy: " + policy.toString().trim()); + } + } + + static void listSubscriptionInProject(String projectId) throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectName projectName = ProjectName.of(projectId); + int count = 0; + for (Subscription subscription : + subscriptionAdminClient.listSubscriptions(projectName).iterateAll()) { + count += 1; + } + System.out.println("Subscriptions in project count: " + count); + } + } + + static void listSubscriptionInTopic(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + int count = 0; + for (String subscription : topicAdminClient.listTopicSubscriptions(topicName).iterateAll()) { + count += 1; + } + System.out.println("Subscriptions under topic: " + count); + } + } + + static void listTopics(String projectId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + ProjectName projectName = ProjectName.of(projectId); + int count = 0; + for (Topic topic : topicAdminClient.listTopics(projectName).iterateAll()) { + count += 1; + } + System.out.println("Topic count under project: " + count); + } + } + + static void receiveMessagesWithDeliveryAttempts(String projectId, String subscriptionId) { + + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + // Instantiate an asynchronous message receiver. + MessageReceiver receiver = + new MessageReceiver() { + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) { + consumer.ack(); + } + }; + + Subscriber subscriber = null; + try { + subscriber = Subscriber.newBuilder(subscriptionName, receiver).build(); + // Start the subscriber. + subscriber.startAsync().awaitRunning(); + System.out.println("Successfully started an async message receiver."); + } finally { + // Shut down the subscriber after 10s. Stop receiving messages. + subscriber.stopAsync(); + } + } + + static void subscribeSync(String projectId, String subscriptionId) throws IOException { + SubscriberStubSettings subscriberStubSettings = + SubscriberStubSettings.newBuilder() + .setTransportChannelProvider( + SubscriberStubSettings.defaultGrpcTransportProviderBuilder() + .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size). + .build()) + .build(); + + try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) { + String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId); + PullRequest pullRequest = + PullRequest.newBuilder().setMaxMessages(1).setSubscription(subscriptionName).build(); + + PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); + List ackIds = new ArrayList<>(); + for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { + String payload = message.getMessage().getData().toStringUtf8(); + ackIds.add(message.getAckId()); + System.out.println("Received Payload: " + payload); + } + + AcknowledgeRequest acknowledgeRequest = + AcknowledgeRequest.newBuilder() + .setSubscription(subscriptionName) + .addAllAckIds(ackIds) + .build(); + + subscriber.acknowledgeCallable().call(acknowledgeRequest); + } + } + + static void updateSubscriptionDeadLetterTopic( + String projectId, String subscriptionId, String topicId, String deadLetterTopicId) + throws IOException { + + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + TopicName topicName = TopicName.of(projectId, topicId); + TopicName deadLetterTopicName = TopicName.of(projectId, deadLetterTopicId); + + DeadLetterPolicy deadLetterPolicy = + DeadLetterPolicy.newBuilder() + .setDeadLetterTopic(deadLetterTopicName.toString()) + .setMaxDeliveryAttempts(20) + .build(); + + Subscription subscription = + Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setDeadLetterPolicy(deadLetterPolicy) + .build(); + + FieldMask updateMask = FieldMask.newBuilder().addPaths("dead_letter_policy").build(); + + UpdateSubscriptionRequest request = + UpdateSubscriptionRequest.newBuilder() + .setSubscription(subscription) + .setUpdateMask(updateMask) + .build(); + + Subscription response = subscriptionAdminClient.updateSubscription(request); + System.out.println("Updated subscription " + response.getName()); + } + } + + static void testSubscriptionPermissions(String projectId, String subscriptionId) + throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + + List permissions = new ArrayList<>(); + permissions.add("pubsub.subscriptions.consume"); + permissions.add("pubsub.subscriptions.update"); + + TestIamPermissionsRequest testIamPermissionsRequest = + TestIamPermissionsRequest.newBuilder() + .setResource(subscriptionName.toString()) + .addAllPermissions(permissions) + .build(); + + TestIamPermissionsResponse testedPermissionsResponse = + subscriptionAdminClient.testIamPermissions(testIamPermissionsRequest); + + System.out.println( + "Tested PubSub subscription permissions\n" + testedPermissionsResponse.toString().trim()); + } + } + + static void testTopicPermissions(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId); + + List permissions = new ArrayList<>(); + permissions.add("pubsub.topics.attachSubscription"); + permissions.add("pubsub.topics.publish"); + permissions.add("pubsub.topics.update"); + + TestIamPermissionsRequest testIamPermissionsRequest = + TestIamPermissionsRequest.newBuilder() + .setResource(topicName.toString()) + .addAllPermissions(permissions) + .build(); + + TestIamPermissionsResponse testedPermissionsResponse = + topicAdminClient.testIamPermissions(testIamPermissionsRequest); + + System.out.println( + "Tested topic permissions\n" + testedPermissionsResponse.toString().trim()); + } + } + + static void deleteTopic(String projectId, String topicId) throws IOException { + try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) { + TopicName topicName = TopicName.of(projectId, topicId); + try { + topicAdminClient.deleteTopic(topicName); + System.out.println("Deleted topic " + topicName); + } catch (NotFoundException e) { + System.out.println(e.getMessage()); + } + } + } + + static void deleteSubscription(String projectId, String subscriptionId) throws IOException { + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create()) { + ProjectSubscriptionName subscriptionName = + ProjectSubscriptionName.of(projectId, subscriptionId); + try { + subscriptionAdminClient.deleteSubscription(subscriptionName); + System.out.println("Deleted subscription " + subscriptionName); + } catch (NotFoundException e) { + System.out.println(e.getMessage()); + } + } + } +} diff --git a/samples/native-image-sample/src/main/java/utilities/PublishOperations.java b/samples/native-image-sample/src/main/java/utilities/PublishOperations.java new file mode 100644 index 000000000..068312025 --- /dev/null +++ b/samples/native-image-sample/src/main/java/utilities/PublishOperations.java @@ -0,0 +1,159 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utilities; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; +import java.io.IOException; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +/** Sample methods for Publishing messages to a topic in Pub/Sub. */ +public class PublishOperations { + + public static void publishMessage(String projectId, String topicId) throws Exception { + + Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId)).build(); + + try { + String message = "Pub/Sub Native Image Test published message at timestamp: " + Instant.now(); + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + publisher.publish(pubsubMessage); + + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + String messageId = messageIdFuture.get(); + + System.out.println("Published message with ID: " + messageId); + } finally { + publisher.shutdown(); + } + } + + public static void publishWithCustomAttributes(String projectId, String topicId) + throws Exception { + + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = Publisher.newBuilder(topicName).build(); + + try { + String message = "first message"; + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = + PubsubMessage.newBuilder() + .setData(data) + .putAllAttributes(Collections.singletonMap("year", "2020")) + .build(); + + // Once published, returns a server-assigned message id (unique within the topic) + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + String messageId = messageIdFuture.get(); + System.out.println("Published a message with custom attributes: " + messageId); + } finally { + publisher.shutdown(); + } + } + + public static void publishWithBatchSettings(String projectId, String topicId) + throws IOException, ExecutionException, InterruptedException { + + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = Publisher.newBuilder(topicName).build(); + List> messageIdFutures = new ArrayList<>(); + + try { + // schedule publishing one message at a time : messages get automatically batched + for (int i = 0; i < 100; i++) { + String message = "message " + i; + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + // Once published, returns a server-assigned message id (unique within the topic) + ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + messageIdFutures.add(messageIdFuture); + } + } finally { + // Wait on any pending publish requests. + List messageIds = ApiFutures.allAsList(messageIdFutures).get(); + System.out.println("Published " + messageIds.size() + " messages with batch settings."); + + publisher.shutdown(); + } + } + + public static void publishWithErrorHandler(String projectId, String topicId) throws IOException { + + TopicName topicName = TopicName.of(projectId, topicId); + Publisher publisher = null; + + try { + // Create a publisher instance with default settings bound to the topic + publisher = Publisher.newBuilder(topicName).build(); + + List messages = Arrays.asList("first message", "second message"); + + for (final String message : messages) { + ByteString data = ByteString.copyFromUtf8(message); + PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build(); + + // Once published, returns a server-assigned message id (unique within the topic) + ApiFuture future = publisher.publish(pubsubMessage); + + // Add an asynchronous callback to handle success / failure + ApiFutures.addCallback( + future, + new ApiFutureCallback() { + + @Override + public void onFailure(Throwable throwable) { + if (throwable instanceof ApiException) { + ApiException apiException = ((ApiException) throwable); + // details on the API exception + System.out.println(apiException.getStatusCode().getCode()); + System.out.println(apiException.isRetryable()); + } + System.out.println("Error publishing message : " + message); + } + + @Override + public void onSuccess(String messageId) { + // Once published, returns server-assigned message ids (unique within the topic) + System.out.println("Success Callback: Published message " + messageId); + } + }, + MoreExecutors.directExecutor()); + } + } finally { + if (publisher != null) { + // When finished with the publisher, shutdown to free up resources. + publisher.shutdown(); + } + } + } +} diff --git a/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java b/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java new file mode 100644 index 000000000..c221d735a --- /dev/null +++ b/samples/native-image-sample/src/test/java/pubsub/NativeImagePubSubSampleIT.java @@ -0,0 +1,139 @@ +/* + * Copyright 2022 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package pubsub; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.ServiceOptions; +import com.google.pubsub.v1.SubscriptionName; +import com.google.pubsub.v1.TopicName; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.util.UUID; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import utilities.PublishOperations; + +public class NativeImagePubSubSampleIT { + + private static String TOPIC_ID = "native-pubsub-test-" + UUID.randomUUID(); + private static String PULL_SUB_ID = "native-pubsub-test-sub" + UUID.randomUUID(); + private static String PUSH_SUB_ID = "native-pubsub-test-sub" + UUID.randomUUID(); + private static String PROJECT_ID = ServiceOptions.getDefaultProjectId(); + private static final TopicName TOPIC_NAME = TopicName.of(PROJECT_ID, TOPIC_ID); + private static final SubscriptionName PULL_SUBSCRIPTION_NAME = + SubscriptionName.of(PROJECT_ID, PULL_SUB_ID); + private static final SubscriptionName PUSH_SUBSCRIPTION_NAME = + SubscriptionName.of(PROJECT_ID, PUSH_SUB_ID); + + private ByteArrayOutputStream bout; + private PrintStream out; + + @Before + public void setUp() { + bout = new ByteArrayOutputStream(); + out = new PrintStream(bout); + System.setOut(out); + } + + @After + public void cleanUp() throws IOException { + NativeImagePubSubSample.deleteTopic(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.deleteSubscription(PROJECT_ID, PULL_SUB_ID); + NativeImagePubSubSample.deleteSubscription(PROJECT_ID, PUSH_SUB_ID); + } + + @Test + public void testRunTopicManagementOperations() throws IOException { + // Topic management operations + NativeImagePubSubSample.createTopic(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.createPullSubscription(PROJECT_ID, PULL_SUB_ID, TOPIC_ID); + NativeImagePubSubSample.createPushSubscription(PROJECT_ID, PUSH_SUB_ID, TOPIC_ID); + NativeImagePubSubSample.detachSubscription(PROJECT_ID, PUSH_SUB_ID); + NativeImagePubSubSample.getTopicPolicy(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.getSubscriptionPolicy(PROJECT_ID, PULL_SUB_ID); + NativeImagePubSubSample.listSubscriptionInProject(PROJECT_ID); + NativeImagePubSubSample.listSubscriptionInTopic(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.listTopics(PROJECT_ID); + NativeImagePubSubSample.updateSubscriptionDeadLetterTopic( + PROJECT_ID, PUSH_SUB_ID, TOPIC_ID, TOPIC_ID); + NativeImagePubSubSample.testTopicPermissions(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.testSubscriptionPermissions(PROJECT_ID, PUSH_SUB_ID); + + // Verify create topic and subscriptions + assertThat(bout.toString()) + .contains("Created topic: " + TOPIC_NAME.toString() + " under project: " + PROJECT_ID); + assertThat(bout.toString()) + .contains("Created pull subscription: " + PULL_SUBSCRIPTION_NAME.toString()); + assertThat(bout.toString()) + .contains("Created push subscription: " + PUSH_SUBSCRIPTION_NAME.toString()); + + // Verify detach subscription + assertThat(bout.toString()).contains("Subscription is detached"); + + // Verify topic and subscription IAM policy + assertThat(bout.toString()).contains("Topic policy: etag: \"\\000 \\001"); + assertThat(bout.toString()).contains("Subscription policy: etag: \"\\000 \\001\""); + + // Verify listing of subscriptions and topics + assertThat(bout.toString()).contains("Subscriptions in project count:"); + assertThat(bout.toString()).contains("Subscriptions under topic:"); + assertThat(bout.toString()).contains("Topic count under project:"); + + // Verify update of subscription + assertThat(bout.toString()).contains("Updated subscription " + PUSH_SUBSCRIPTION_NAME); + + // Verify topic permissions + assertThat(bout.toString()).contains("Tested topic permissions"); + assertThat(bout.toString()).contains("permissions: \"pubsub.topics.attachSubscription\""); + assertThat(bout.toString()).contains("permissions: \"pubsub.topics.publish\""); + assertThat(bout.toString()).contains("permissions: \"pubsub.topics.update\""); + + // Verify subscription permissions + assertThat(bout.toString()).contains("Tested PubSub subscription permissions"); + assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.consume\""); + assertThat(bout.toString()).contains("permissions: \"pubsub.subscriptions.update\""); + } + + @Test + public void testPublishAndSubscribe() throws Exception { + NativeImagePubSubSample.createTopic(PROJECT_ID, TOPIC_ID); + NativeImagePubSubSample.createPullSubscription(PROJECT_ID, PULL_SUB_ID, TOPIC_ID); + + bout.reset(); + + // Publish + PublishOperations.publishMessage(PROJECT_ID, TOPIC_ID); + PublishOperations.publishWithBatchSettings(PROJECT_ID, TOPIC_ID); + PublishOperations.publishWithCustomAttributes(PROJECT_ID, TOPIC_ID); + PublishOperations.publishWithErrorHandler(PROJECT_ID, TOPIC_ID); + + // Subscribe + NativeImagePubSubSample.subscribeSync(PROJECT_ID, PULL_SUB_ID); + NativeImagePubSubSample.receiveMessagesWithDeliveryAttempts(PROJECT_ID, PULL_SUB_ID); + + assertThat(bout.toString()).contains("Published message with ID"); + assertThat(bout.toString()).contains("Published 100 messages with batch settings."); + assertThat(bout.toString()).contains("Published a message with custom attributes"); + assertThat(bout.toString()).contains("Success Callback: Published message"); + assertThat(bout.toString()).contains("Success Callback: Published message"); + assertThat(bout.toString()).contains("Received Payload"); + assertThat(bout.toString()).contains("Successfully started an async message receiver"); + } +} diff --git a/samples/pom.xml b/samples/pom.xml index c3399fdec..e138ae3bc 100644 --- a/samples/pom.xml +++ b/samples/pom.xml @@ -46,6 +46,7 @@ install-without-bom snapshot snippets + native-image-sample