Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

samples: publish with ordering keys #156

Merged
merged 6 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions samples/snippets/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ To run this sample:

.. code-block:: bash

$ python publisher.py
$ python publisher.py --help

usage: publisher.py [-h]
project_id
{list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings}
{list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys}
...

This application demonstrates how to perform basic operations on topics
Expand All @@ -125,7 +125,7 @@ To run this sample:

positional arguments:
project_id Your Google Cloud project ID
{list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings}
{list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings,publish-with-ordering-keys,resume-publish-with-ordering-keys}
list Lists all Pub/Sub topics in the given project.
create Create a new Pub/Sub topic.
delete Deletes an existing Pub/Sub topic.
Expand All @@ -141,6 +141,11 @@ To run this sample:
batch settings.
publish-with-retry-settings
Publishes messages with custom retry settings.
publish-with-ordering-keys
Publishes messages with ordering keys.
resume-publish-with-ordering-keys
Resume publishing messages with ordering keys when
unrecoverable errors occur.

optional arguments:
-h, --help show this help message and exit
Expand All @@ -160,11 +165,11 @@ To run this sample:

.. code-block:: bash

$ python subscriber.py
$ python subscriber.py --help

usage: subscriber.py [-h]
project_id
{list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts}
{list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts}
...

This application demonstrates how to perform basic operations on
Expand All @@ -175,13 +180,15 @@ To run this sample:

positional arguments:
project_id Your Google Cloud project ID
{list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts}
{list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,create-with-ordering,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts}
list-in-topic Lists all subscriptions for a given topic.
list-in-project Lists all subscriptions in the current project.
create Create a new pull subscription on the given topic.
create-with-dead-letter-policy
Create a subscription with dead letter policy.
create-push Create a new push subscription on the given topic.
create-with-ordering
Create a subscription with dead letter policy.
delete Deletes an existing Pub/Sub topic.
update-push Updates an existing Pub/Sub subscription's push
endpoint URL. Note that certain properties of a
Expand All @@ -208,7 +215,6 @@ To run this sample:
-h, --help show this help message and exit



Identity and Access Management
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Expand All @@ -221,20 +227,15 @@ Identity and Access Management
To run this sample:

.. code-block:: bash

$ python iam.py

usage: iam.py [-h]
project
{get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions}
...

This application demonstrates how to perform basic operations on IAM
policies with the Cloud Pub/Sub API.

For more information, see the README.md under /pubsub and the documentation
at https://cloud.google.com/pubsub/docs.

positional arguments:
project Your Google Cloud project ID
{get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions}
Expand All @@ -250,14 +251,10 @@ To run this sample:
check-subscription-permissions
Checks to which permissions are available on the given
subscription.

optional arguments:
-h, --help show this help message and exit





The client library
-------------------------------------------------------------------------------

Expand All @@ -273,4 +270,4 @@ to `browse the source`_ and `report issues`_.
https://github.com/GoogleCloudPlatform/google-cloud-python/issues


.. _Google Cloud SDK: https://cloud.google.com/sdk/
.. _Google Cloud SDK: https://cloud.google.com/sdk/
110 changes: 108 additions & 2 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,94 @@ def publish_messages_with_retry_settings(project_id, topic_id):
# [END pubsub_publisher_retry_settings]


def publish_with_ordering_keys(project_id, topic_id):
"""Publishes messages with ordering keys."""
# [START pubsub_publish_with_ordering_keys]
from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options,
client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path, data=data, ordering_key=ordering_key
)
print(future.result())

print("Published messages with ordering keys.")
# [END pubsub_publish_with_ordering_keys]


def resume_publish_with_ordering_keys(project_id, topic_id):
"""Resume publishing messages with ordering keys when unrecoverable errors occur."""
# [START pubsub_resume_publish_with_ordering_keys]
from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"

publisher_options = pubsub_v1.types.PublisherOptions(
enable_message_ordering=True
anguillanneuf marked this conversation as resolved.
Show resolved Hide resolved
)
# Sending messages to the same region ensures they are received in order
# even when multiple publishers are used.
client_options = {"api_endpoint": " us-east1-pubsub.googleapis.com:443"}
publisher = pubsub_v1.PublisherClient(
publisher_options=publisher_options,
client_options=client_options
)
# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_id}`
topic_path = publisher.topic_path(project_id, topic_id)

for message in [
("message1", "key1"),
("message2", "key2"),
("message3", "key1"),
("message4", "key2"),
]:
# Data must be a bytestring
data = message[0].encode("utf-8")
ordering_key = message[1]
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path, data=data, ordering_key=ordering_key
)
try:
print(future.result())
except RuntimeError:
# Resume publish on an ordering key that has had unrecoverable errors.
publisher.resume_publish(topic_path, ordering_key)

print("Published messages with ordering keys.")
# [END pubsub_resume_publish_with_ordering_keys]


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter,
Expand All @@ -288,7 +376,9 @@ def publish_messages_with_retry_settings(project_id, topic_id):
delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__)
delete_parser.add_argument("topic_id")

publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__)
publish_parser = subparsers.add_parser(
"publish", help=publish_messages.__doc__
)
publish_parser.add_argument("topic_id")

publish_with_custom_attributes_parser = subparsers.add_parser(
Expand All @@ -298,7 +388,8 @@ def publish_messages_with_retry_settings(project_id, topic_id):
publish_with_custom_attributes_parser.add_argument("topic_id")

publish_with_error_handler_parser = subparsers.add_parser(
"publish-with-error-handler", help=publish_messages_with_error_handler.__doc__,
"publish-with-error-handler",
help=publish_messages_with_error_handler.__doc__,
)
publish_with_error_handler_parser.add_argument("topic_id")

Expand All @@ -314,6 +405,17 @@ def publish_messages_with_retry_settings(project_id, topic_id):
)
publish_with_retry_settings_parser.add_argument("topic_id")

publish_with_ordering_keys_parser = subparsers.add_parser(
"publish-with-ordering-keys", help=publish_with_ordering_keys.__doc__,
)
publish_with_ordering_keys_parser.add_argument("topic_id")

resume_publish_with_ordering_keys_parser = subparsers.add_parser(
"resume-publish-with-ordering-keys",
help=resume_publish_with_ordering_keys.__doc__,
)
resume_publish_with_ordering_keys_parser.add_argument("topic_id")

args = parser.parse_args()

if args.command == "list":
Expand All @@ -332,3 +434,7 @@ def publish_messages_with_retry_settings(project_id, topic_id):
publish_messages_with_batch_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-retry-settings":
publish_messages_with_retry_settings(args.project_id, args.topic_id)
elif args.command == "publish-with-ordering-keys":
publish_with_ordering_keys(args.project_id, args.topic_id)
elif args.command == "resume-publish-with-ordering-keys":
resume_publish_with_ordering_keys(args.project_id, args.topic_id)
14 changes: 14 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,17 @@ def test_publish_with_error_handler(topic_publish, capsys):

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_ordering_keys(topic_publish, capsys):
publisher.publish_messages_with_ordering_keys(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published messages with ordering keys." in out


def test_resume_publish_with_error_handler(topic_publish, capsys):
publisher.resume_publish_with_ordering_keys(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published messages with ordering keys." in out
32 changes: 32 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,28 @@ def create_push_subscription(project_id, topic_id, subscription_id, endpoint):
# [END pubsub_create_push_subscription]


def create_subscription_with_ordering(project_id, topic_id, subscription_id):
"""Create a subscription with dead letter policy."""
# [START pubsub_enable_subscription_ordering]
from google.cloud import pubsub_v1

# TODO(developer): Choose an existing topic.
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
topic_path = subscriber.topic_path(project_id, topic_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topic_path() does not exist anymore on subscriber client, had to fix quite a few of these in #158. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is a massive PR! Thank you for the heads-up! I will aim to merge this one after that one and the changes are in.

subscription_path = subscriber.subscription_path(project_id, subscription_id)

with subscriber:
subscription = subscriber.create_subscription(
subscription_path, topic_path, enable_message_ordering=True
)
print("Created subscription with ordering: {}".format(subscription))
# [END pubsub_enable_subscription_ordering]


def delete_subscription(project_id, subscription_id):
"""Deletes an existing Pub/Sub topic."""
# [START pubsub_delete_subscription]
Expand Down Expand Up @@ -654,6 +676,12 @@ def callback(message):
create_push_parser.add_argument("subscription_id")
create_push_parser.add_argument("endpoint")

create_subscription_with_ordering_parser = subparsers.add_parser(
"create-with-ordering", help=create_subscription_with_ordering.__doc__
)
create_subscription_with_ordering_parser.add_argument("topic_id")
create_subscription_with_ordering_parser.add_argument("subscription_id")

delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__)
delete_parser.add_argument("subscription_id")

Expand Down Expand Up @@ -746,6 +774,10 @@ def callback(message):
create_push_subscription(
args.project_id, args.topic_id, args.subscription_id, args.endpoint,
)
elif args.command == "create-with-ordering":
create_subscription_with_ordering(
args.project_id, args.topic_id, args.subscription_id
)
elif args.command == "delete":
delete_subscription(args.project_id, args.subscription_id)
elif args.command == "update-push":
Expand Down
11 changes: 11 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID
SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID
SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID
SUBSCRIPTION_ORDERING = "subscription-test-subscription-ordering-" + UUID
ENDPOINT = "https://{}.appspot.com/push".format(PROJECT)
NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT)

Expand Down Expand Up @@ -209,6 +210,16 @@ def eventually_consistent_test():
eventually_consistent_test()


def test_create_subscription_with_ordering(subscriber_client, capsys):
subscriber.create_subscription_with_ordering(PROJECT, TOPIC, SUBSCRIPTION_ORDERING)
out, _ = capsys.readouterr()
assert "Created subscription with ordering" in out
assert "enable_message_ordering: true" in out

subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ORDERING)
subscriber_client.delete_subscription(subscription_path)


def test_update(subscriber_client, subscription_admin, capsys):
subscriber.update_push_subscription(
PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT
Expand Down