diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 000000000..cf01548a9 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,11 @@ +# Code owners file. +# This file controls who is tagged for review for any given pull request. +# +# For syntax help see: +# https://help.github.com/en/github/creating-cloning-and-archiving-repositories/about-code-owners#codeowners-syntax + + +# The python-samples-owners team is the default owner for anything not +# explicitly taken by someone else. + + /samples/ @anguillanneuf @hongalex @googleapis/python-samples-owners diff --git a/samples/AUTHORING_GUIDE.md b/samples/AUTHORING_GUIDE.md new file mode 100644 index 000000000..55c97b32f --- /dev/null +++ b/samples/AUTHORING_GUIDE.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/AUTHORING_GUIDE.md \ No newline at end of file diff --git a/samples/CONTRIBUTING.md b/samples/CONTRIBUTING.md new file mode 100644 index 000000000..34c882b6f --- /dev/null +++ b/samples/CONTRIBUTING.md @@ -0,0 +1 @@ +See https://github.com/GoogleCloudPlatform/python-docs-samples/blob/master/CONTRIBUTING.md \ No newline at end of file diff --git a/samples/snippets/README.rst b/samples/snippets/README.rst new file mode 100644 index 000000000..2676680af --- /dev/null +++ b/samples/snippets/README.rst @@ -0,0 +1,282 @@ + +.. This file is automatically generated. Do not edit this file directly. + +Google Cloud Pub/Sub Python Samples +=============================================================================== + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/README.rst + + +This directory contains samples for Google Cloud Pub/Sub. `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that allows you to send and receive messages between independent applications. + + + + +.. _Google Cloud Pub/Sub: https://cloud.google.com/pubsub/docs + + +Setup +------------------------------------------------------------------------------- + + + +Authentication +++++++++++++++ + +This sample requires you to have authentication setup. Refer to the +`Authentication Getting Started Guide`_ for instructions on setting up +credentials for applications. + +.. _Authentication Getting Started Guide: + https://cloud.google.com/docs/authentication/getting-started + + + + +Install Dependencies +++++++++++++++++++++ + +#. Clone python-docs-samples and change directory to the sample directory you want to use. + + .. code-block:: bash + + $ git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git + +#. Install `pip`_ and `virtualenv`_ if you do not already have them. You may want to refer to the `Python Development Environment Setup Guide`_ for Google Cloud Platform for instructions. + + .. _Python Development Environment Setup Guide: + https://cloud.google.com/python/setup + +#. Create a virtualenv. Samples are compatible with Python 3.6+. + + .. code-block:: bash + + $ virtualenv env + $ source env/bin/activate + +#. Install the dependencies needed to run the samples. + + .. code-block:: bash + + $ pip install -r requirements.txt + +.. _pip: https://pip.pypa.io/ +.. _virtualenv: https://virtualenv.pypa.io/ + + + + + + +Samples +------------------------------------------------------------------------------- + + +Quickstart ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/quickstart.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python quickstart.py + + + + +Publisher ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/publisher.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python publisher.py + + + usage: publisher.py [-h] + project_id + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + ... + + This application demonstrates how to perform basic operations on topics + with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project_id Your Google Cloud project ID + {list,create,delete,publish,publish-with-custom-attributes,publish-with-error-handler,publish-with-batch-settings,publish-with-retry-settings} + list Lists all Pub/Sub topics in the given project. + create Create a new Pub/Sub topic. + delete Deletes an existing Pub/Sub topic. + publish Publishes multiple messages to a Pub/Sub topic. + publish-with-custom-attributes + Publishes multiple messages with custom attributes to + a Pub/Sub topic. + publish-with-error-handler + Publishes multiple messages to a Pub/Sub topic with an + error handler. + publish-with-batch-settings + Publishes multiple messages to a Pub/Sub topic with + batch settings. + publish-with-retry-settings + Publishes messages with custom retry settings. + + optional arguments: + -h, --help show this help message and exit + + + + + +Subscribers ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/subscriber.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python subscriber.py + + + usage: subscriber.py [-h] + project_id + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + ... + + This application demonstrates how to perform basic operations on + subscriptions with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project_id Your Google Cloud project ID + {list-in-topic,list-in-project,create,create-with-dead-letter-policy,create-push,delete,update-push,update-dead-letter-policy,remove-dead-letter-policy,receive,receive-custom-attributes,receive-flow-control,receive-synchronously,receive-synchronously-with-lease,listen-for-errors,receive-messages-with-delivery-attempts} + list-in-topic Lists all subscriptions for a given topic. + list-in-project Lists all subscriptions in the current project. + create Create a new pull subscription on the given topic. + create-with-dead-letter-policy + Create a subscription with dead letter policy. + create-push Create a new push subscription on the given topic. + delete Deletes an existing Pub/Sub topic. + update-push Updates an existing Pub/Sub subscription's push + endpoint URL. Note that certain properties of a + subscription, such as its topic, are not modifiable. + update-dead-letter-policy + Update a subscription's dead letter policy. + remove-dead-letter-policy + Remove dead letter policy from a subscription. + receive Receives messages from a pull subscription. + receive-custom-attributes + Receives messages from a pull subscription. + receive-flow-control + Receives messages from a pull subscription with flow + control. + receive-synchronously + Pulling messages synchronously. + receive-synchronously-with-lease + Pulling messages synchronously with lease management + listen-for-errors Receives messages and catches errors from a pull + subscription. + receive-messages-with-delivery-attempts + + optional arguments: + -h, --help show this help message and exit + + + + + +Identity and Access Management ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + +.. image:: https://gstatic.com/cloudssh/images/open-btn.png + :target: https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/GoogleCloudPlatform/python-docs-samples&page=editor&open_in_editor=pubsub/cloud-client/iam.py,pubsub/cloud-client/README.rst + + + + +To run this sample: + +.. code-block:: bash + + $ python iam.py + + + usage: iam.py [-h] + project + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + ... + + This application demonstrates how to perform basic operations on IAM + policies with the Cloud Pub/Sub API. + + For more information, see the README.md under /pubsub and the documentation + at https://cloud.google.com/pubsub/docs. + + positional arguments: + project Your Google Cloud project ID + {get-topic-policy,get-subscription-policy,set-topic-policy,set-subscription-policy,check-topic-permissions,check-subscription-permissions} + get-topic-policy Prints the IAM policy for the given topic. + get-subscription-policy + Prints the IAM policy for the given subscription. + set-topic-policy Sets the IAM policy for a topic. + set-subscription-policy + Sets the IAM policy for a topic. + check-topic-permissions + Checks to which permissions are available on the given + topic. + check-subscription-permissions + Checks to which permissions are available on the given + subscription. + + optional arguments: + -h, --help show this help message and exit + + + + + + + + + +The client library +------------------------------------------------------------------------------- + +This sample uses the `Google Cloud Client Library for Python`_. +You can read the documentation for more details on API usage and use GitHub +to `browse the source`_ and `report issues`_. + +.. _Google Cloud Client Library for Python: + https://googlecloudplatform.github.io/google-cloud-python/ +.. _browse the source: + https://github.com/GoogleCloudPlatform/google-cloud-python +.. _report issues: + https://github.com/GoogleCloudPlatform/google-cloud-python/issues + + + +.. _Google Cloud SDK: https://cloud.google.com/sdk/ diff --git a/samples/snippets/README.rst.in b/samples/snippets/README.rst.in new file mode 100644 index 000000000..ddbc64712 --- /dev/null +++ b/samples/snippets/README.rst.in @@ -0,0 +1,30 @@ +# This file is used to generate README.rst + +product: + name: Google Cloud Pub/Sub + short_name: Cloud Pub/Sub + url: https://cloud.google.com/pubsub/docs + description: > + `Google Cloud Pub/Sub`_ is a fully-managed real-time messaging service that + allows you to send and receive messages between independent applications. + +setup: +- auth +- install_deps + +samples: +- name: Quickstart + file: quickstart.py +- name: Publisher + file: publisher.py + show_help: true +- name: Subscribers + file: subscriber.py + show_help: true +- name: Identity and Access Management + file: iam.py + show_help: true + +cloud_client_library: true + +folder: pubsub/cloud-client \ No newline at end of file diff --git a/samples/snippets/iam.py b/samples/snippets/iam.py new file mode 100644 index 000000000..71c55d764 --- /dev/null +++ b/samples/snippets/iam.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python + +# Copyright 2019 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on IAM +policies with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def get_topic_policy(project, topic_id): + """Prints the IAM policy for the given topic.""" + # [START pubsub_get_topic_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + policy = client.get_iam_policy(topic_path) + + print("Policy for topic {}:".format(topic_path)) + for binding in policy.bindings: + print("Role: {}, Members: {}".format(binding.role, binding.members)) + # [END pubsub_get_topic_policy] + + +def get_subscription_policy(project, subscription_id): + """Prints the IAM policy for the given subscription.""" + # [START pubsub_get_subscription_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + policy = client.get_iam_policy(subscription_path) + + print("Policy for subscription {}:".format(subscription_path)) + for binding in policy.bindings: + print("Role: {}, Members: {}".format(binding.role, binding.members)) + + client.close() + # [END pubsub_get_subscription_policy] + + +def set_topic_policy(project, topic_id): + """Sets the IAM policy for a topic.""" + # [START pubsub_set_topic_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + policy = client.get_iam_policy(topic_path) + + # Add all users as viewers. + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) + + # Add a group as a publisher. + policy.bindings.add( + role="roles/pubsub.publisher", members=["group:cloud-logs@google.com"] + ) + + # Set the policy + policy = client.set_iam_policy(topic_path, policy) + + print("IAM policy for topic {} set: {}".format(topic_id, policy)) + # [END pubsub_set_topic_policy] + + +def set_subscription_policy(project, subscription_id): + """Sets the IAM policy for a topic.""" + # [START pubsub_set_subscription_policy] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + policy = client.get_iam_policy(subscription_path) + + # Add all users as viewers. + policy.bindings.add(role="roles/pubsub.viewer", members=["allUsers"]) + + # Add a group as an editor. + policy.bindings.add(role="roles/editor", members=["group:cloud-logs@google.com"]) + + # Set the policy + policy = client.set_iam_policy(subscription_path, policy) + + print("IAM policy for subscription {} set: {}".format(subscription_id, policy)) + + client.close() + # [END pubsub_set_subscription_policy] + + +def check_topic_permissions(project, topic_id): + """Checks to which permissions are available on the given topic.""" + # [START pubsub_test_topic_permissions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + client = pubsub_v1.PublisherClient() + topic_path = client.topic_path(project, topic_id) + + permissions_to_check = ["pubsub.topics.publish", "pubsub.topics.update"] + + allowed_permissions = client.test_iam_permissions(topic_path, permissions_to_check) + + print( + "Allowed permissions for topic {}: {}".format(topic_path, allowed_permissions) + ) + # [END pubsub_test_topic_permissions] + + +def check_subscription_permissions(project, subscription_id): + """Checks to which permissions are available on the given subscription.""" + # [START pubsub_test_subscription_permissions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + client = pubsub_v1.SubscriberClient() + subscription_path = client.subscription_path(project, subscription_id) + + permissions_to_check = [ + "pubsub.subscriptions.consume", + "pubsub.subscriptions.update", + ] + + allowed_permissions = client.test_iam_permissions( + subscription_path, permissions_to_check + ) + + print( + "Allowed permissions for subscription {}: {}".format( + subscription_path, allowed_permissions + ) + ) + + client.close() + # [END pubsub_test_subscription_permissions] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + + get_topic_policy_parser = subparsers.add_parser( + "get-topic-policy", help=get_topic_policy.__doc__ + ) + get_topic_policy_parser.add_argument("topic_id") + + get_subscription_policy_parser = subparsers.add_parser( + "get-subscription-policy", help=get_subscription_policy.__doc__ + ) + get_subscription_policy_parser.add_argument("subscription_id") + + set_topic_policy_parser = subparsers.add_parser( + "set-topic-policy", help=set_topic_policy.__doc__ + ) + set_topic_policy_parser.add_argument("topic_id") + + set_subscription_policy_parser = subparsers.add_parser( + "set-subscription-policy", help=set_subscription_policy.__doc__ + ) + set_subscription_policy_parser.add_argument("subscription_id") + + check_topic_permissions_parser = subparsers.add_parser( + "check-topic-permissions", help=check_topic_permissions.__doc__ + ) + check_topic_permissions_parser.add_argument("topic_id") + + check_subscription_permissions_parser = subparsers.add_parser( + "check-subscription-permissions", help=check_subscription_permissions.__doc__, + ) + check_subscription_permissions_parser.add_argument("subscription_id") + + args = parser.parse_args() + + if args.command == "get-topic-policy": + get_topic_policy(args.project, args.topic_id) + elif args.command == "get-subscription-policy": + get_subscription_policy(args.project, args.subscription_id) + elif args.command == "set-topic-policy": + set_topic_policy(args.project, args.topic_id) + elif args.command == "set-subscription-policy": + set_subscription_policy(args.project, args.subscription_id) + elif args.command == "check-topic-permissions": + check_topic_permissions(args.project, args.topic_id) + elif args.command == "check-subscription-permissions": + check_subscription_permissions(args.project, args.subscription_id) diff --git a/samples/snippets/iam_test.py b/samples/snippets/iam_test.py new file mode 100644 index 000000000..d196953f6 --- /dev/null +++ b/samples/snippets/iam_test.py @@ -0,0 +1,118 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.cloud import pubsub_v1 +import pytest + +import iam + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "iam-test-topic-" + UUID +SUBSCRIPTION = "iam-test-subscription-" + UUID + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.delete_topic(topic_path) + except Exception: + pass + + publisher_client.create_topic(topic_path) + + yield topic_path + + publisher_client.delete_topic(topic_path) + + +@pytest.fixture(scope="module") +def subscriber_client(): + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() + + +@pytest.fixture +def subscription(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber_client.create_subscription(subscription_path, topic=topic) + + yield subscription_path + + subscriber_client.delete_subscription(subscription_path) + + +def test_get_topic_policy(topic, capsys): + iam.get_topic_policy(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + assert topic in out + + +def test_get_subscription_policy(subscription, capsys): + iam.get_subscription_policy(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert subscription in out + + +def test_set_topic_policy(publisher_client, topic): + iam.set_topic_policy(PROJECT, TOPIC) + + policy = publisher_client.get_iam_policy(topic) + assert "roles/pubsub.publisher" in str(policy) + assert "allUsers" in str(policy) + + +def test_set_subscription_policy(subscriber_client, subscription): + iam.set_subscription_policy(PROJECT, SUBSCRIPTION) + + policy = subscriber_client.get_iam_policy(subscription) + assert "roles/pubsub.viewer" in str(policy) + assert "allUsers" in str(policy) + + +def test_check_topic_permissions(topic, capsys): + iam.check_topic_permissions(PROJECT, TOPIC) + + out, _ = capsys.readouterr() + + assert topic in out + assert "pubsub.topics.publish" in out + + +def test_check_subscription_permissions(subscription, capsys): + iam.check_subscription_permissions(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + + assert subscription in out + assert "pubsub.subscriptions.consume" in out diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py new file mode 100644 index 000000000..ba55d7ce5 --- /dev/null +++ b/samples/snippets/noxfile.py @@ -0,0 +1,224 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +from pathlib import Path +import sys + +import nox + + +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING +# DO NOT EDIT THIS FILE EVER! +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING + +# Copy `noxfile_config.py` to your directory and modify it instead. + + +# `TEST_CONFIG` dict is a configuration hook that allows users to +# modify the test configurations. The values here should be in sync +# with `noxfile_config.py`. Users will copy `noxfile_config.py` into +# their directory and modify it. + +TEST_CONFIG = { + # You can opt out from the test for specific Python versions. + 'ignored_versions': ["2.7"], + + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + 'envs': {}, +} + + +try: + # Ensure we can import noxfile_config in the project's directory. + sys.path.append('.') + from noxfile_config import TEST_CONFIG_OVERRIDE +except ImportError as e: + print("No user noxfile_config found: detail: {}".format(e)) + TEST_CONFIG_OVERRIDE = {} + +# Update the TEST_CONFIG with the user supplied values. +TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) + + +def get_pytest_env_vars(): + """Returns a dict for pytest invocation.""" + ret = {} + + # Override the GCLOUD_PROJECT and the alias. + env_key = TEST_CONFIG['gcloud_project_env'] + # This should error out if not set. + ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] + + # Apply user supplied envs. + ret.update(TEST_CONFIG['envs']) + return ret + + +# DO NOT EDIT - automatically generated. +# All versions used to tested samples. +ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] + +# Any default versions that should be ignored. +IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] + +TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) + +INSTALL_LIBRARY_FROM_SOURCE = bool(os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False)) +# +# Style Checks +# + + +def _determine_local_import_names(start_dir): + """Determines all import names that should be considered "local". + + This is used when running the linter to insure that import order is + properly checked. + """ + file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] + return [ + basename + for basename, extension in file_ext_pairs + if extension == ".py" + or os.path.isdir(os.path.join(start_dir, basename)) + and basename not in ("__pycache__") + ] + + +# Linting with flake8. +# +# We ignore the following rules: +# E203: whitespace before ‘:’ +# E266: too many leading ‘#’ for block comment +# E501: line too long +# I202: Additional newline in a section of imports +# +# We also need to specify the rules which are ignored by default: +# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] +FLAKE8_COMMON_ARGS = [ + "--show-source", + "--builtin=gettext", + "--max-complexity=20", + "--import-order-style=google", + "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--max-line-length=88", +] + + +@nox.session +def lint(session): + session.install("flake8", "flake8-import-order") + + local_names = _determine_local_import_names(".") + args = FLAKE8_COMMON_ARGS + [ + "--application-import-names", + ",".join(local_names), + "." + ] + session.run("flake8", *args) + + +# +# Sample Tests +# + + +PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] + + +def _session_tests(session, post_install=None): + """Runs py.test for a particular project.""" + if os.path.exists("requirements.txt"): + session.install("-r", "requirements.txt") + + if os.path.exists("requirements-test.txt"): + session.install("-r", "requirements-test.txt") + + if INSTALL_LIBRARY_FROM_SOURCE: + session.install("-e", _get_repo_root()) + + if post_install: + post_install(session) + + session.run( + "pytest", + *(PYTEST_COMMON_ARGS + session.posargs), + # Pytest will return 5 when no tests are collected. This can happen + # on travis where slow and flaky tests are excluded. + # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html + success_codes=[0, 5], + env=get_pytest_env_vars() + ) + + +@nox.session(python=ALL_VERSIONS) +def py(session): + """Runs py.test for a sample using the specified version of Python.""" + if session.python in TESTED_VERSIONS: + _session_tests(session) + else: + session.skip("SKIPPED: {} tests are disabled for this sample.".format( + session.python + )) + + +# +# Readmegen +# + + +def _get_repo_root(): + """ Returns the root folder of the project. """ + # Get root of this repository. Assume we don't have directories nested deeper than 10 items. + p = Path(os.getcwd()) + for i in range(10): + if p is None: + break + if Path(p / ".git").exists(): + return str(p) + p = p.parent + raise Exception("Unable to detect repository root.") + + +GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) + + +@nox.session +@nox.parametrize("path", GENERATED_READMES) +def readmegen(session, path): + """(Re-)generates the readme for a sample.""" + session.install("jinja2", "pyyaml") + dir_ = os.path.dirname(path) + + if os.path.exists(os.path.join(dir_, "requirements.txt")): + session.install("-r", os.path.join(dir_, "requirements.txt")) + + in_file = os.path.join(dir_, "README.rst.in") + session.run( + "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file + ) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py new file mode 100644 index 000000000..477b31b9c --- /dev/null +++ b/samples/snippets/publisher.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python + +# Copyright 2016 Google LLC. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on topics +with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def list_topics(project_id): + """Lists all Pub/Sub topics in the given project.""" + # [START pubsub_list_topics] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + + publisher = pubsub_v1.PublisherClient() + project_path = publisher.project_path(project_id) + + for topic in publisher.list_topics(project_path): + print(topic) + # [END pubsub_list_topics] + + +def create_topic(project_id, topic_id): + """Create a new Pub/Sub topic.""" + # [START pubsub_quickstart_create_topic] + # [START pubsub_create_topic] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + topic = publisher.create_topic(topic_path) + + print("Topic created: {}".format(topic)) + # [END pubsub_quickstart_create_topic] + # [END pubsub_create_topic] + + +def delete_topic(project_id, topic_id): + """Deletes an existing Pub/Sub topic.""" + # [START pubsub_delete_topic] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + publisher.delete_topic(topic_path) + + print("Topic deleted: {}".format(topic_path)) + # [END pubsub_delete_topic] + + +def publish_messages(project_id, topic_id): + """Publishes multiple messages to a Pub/Sub topic.""" + # [START pubsub_quickstart_publisher] + # [START pubsub_publish] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_id}` + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data=data) + print(future.result()) + + print("Published messages.") + # [END pubsub_quickstart_publisher] + # [END pubsub_publish] + + +def publish_messages_with_custom_attributes(project_id, topic_id): + """Publishes multiple messages with custom attributes + to a Pub/Sub topic.""" + # [START pubsub_publish_custom_attributes] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + # Add two attributes, origin and username, to the message + future = publisher.publish( + topic_path, data, origin="python-sample", username="gcp" + ) + print(future.result()) + + print("Published messages with custom attributes.") + # [END pubsub_publish_custom_attributes] + + +def publish_messages_with_error_handler(project_id, topic_id): + # [START pubsub_publish_messages_error_handler] + """Publishes multiple messages to a Pub/Sub topic with an error handler.""" + import time + + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + futures = dict() + + def get_callback(f, data): + def callback(f): + try: + print(f.result()) + futures.pop(data) + except: # noqa + print("Please handle {} for {}.".format(f.exception(), data)) + + return callback + + for i in range(10): + data = str(i) + futures.update({data: None}) + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, data=data.encode("utf-8") # data must be a bytestring. + ) + futures[data] = future + # Publish failures shall be handled in the callback function. + future.add_done_callback(get_callback(future, data)) + + # Wait for all the publish futures to resolve before exiting. + while futures: + time.sleep(5) + + print("Published message with error handler.") + # [END pubsub_publish_messages_error_handler] + + +def publish_messages_with_batch_settings(project_id, topic_id): + """Publishes multiple messages to a Pub/Sub topic with batch settings.""" + # [START pubsub_publisher_batch_settings] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure the batch to publish as soon as there is ten messages, + # one kilobyte of data, or one second has passed. + batch_settings = pubsub_v1.types.BatchSettings( + max_messages=10, # default 100 + max_bytes=1024, # default 1 MB + max_latency=1, # default 10 ms + ) + publisher = pubsub_v1.PublisherClient(batch_settings) + topic_path = publisher.topic_path(project_id, topic_id) + + # Resolve the publish future in a separate thread. + def callback(future): + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + future = publisher.publish(topic_path, data=data) + # Non-blocking. Allow the publisher client to batch multiple messages. + future.add_done_callback(callback) + + print("Published messages with batch settings.") + # [END pubsub_publisher_batch_settings] + + +def publish_messages_with_retry_settings(project_id, topic_id): + """Publishes messages with custom retry settings.""" + # [START pubsub_publisher_retry_settings] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure the retry settings. Defaults will be overwritten. + retry_settings = { + "interfaces": { + "google.pubsub.v1.Publisher": { + "retry_codes": { + "publish": [ + "ABORTED", + "CANCELLED", + "DEADLINE_EXCEEDED", + "INTERNAL", + "RESOURCE_EXHAUSTED", + "UNAVAILABLE", + "UNKNOWN", + ] + }, + "retry_params": { + "messaging": { + "initial_retry_delay_millis": 100, # default: 100 + "retry_delay_multiplier": 1.3, # default: 1.3 + "max_retry_delay_millis": 60000, # default: 60000 + "initial_rpc_timeout_millis": 5000, # default: 25000 + "rpc_timeout_multiplier": 1.0, # default: 1.0 + "max_rpc_timeout_millis": 600000, # default: 30000 + "total_timeout_millis": 600000, # default: 600000 + } + }, + "methods": { + "Publish": { + "retry_codes_name": "publish", + "retry_params_name": "messaging", + } + }, + } + } + } + + publisher = pubsub_v1.PublisherClient(client_config=retry_settings) + topic_path = publisher.topic_path(project_id, topic_id) + + for n in range(1, 10): + data = u"Message number {}".format(n) + # Data must be a bytestring + data = data.encode("utf-8") + future = publisher.publish(topic_path, data=data) + print(future.result()) + + print("Published messages with retry settings.") + # [END pubsub_publisher_retry_settings] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + subparsers.add_parser("list", help=list_topics.__doc__) + + create_parser = subparsers.add_parser("create", help=create_topic.__doc__) + create_parser.add_argument("topic_id") + + delete_parser = subparsers.add_parser("delete", help=delete_topic.__doc__) + delete_parser.add_argument("topic_id") + + publish_parser = subparsers.add_parser("publish", help=publish_messages.__doc__) + publish_parser.add_argument("topic_id") + + publish_with_custom_attributes_parser = subparsers.add_parser( + "publish-with-custom-attributes", + help=publish_messages_with_custom_attributes.__doc__, + ) + publish_with_custom_attributes_parser.add_argument("topic_id") + + publish_with_error_handler_parser = subparsers.add_parser( + "publish-with-error-handler", help=publish_messages_with_error_handler.__doc__, + ) + publish_with_error_handler_parser.add_argument("topic_id") + + publish_with_batch_settings_parser = subparsers.add_parser( + "publish-with-batch-settings", + help=publish_messages_with_batch_settings.__doc__, + ) + publish_with_batch_settings_parser.add_argument("topic_id") + + publish_with_retry_settings_parser = subparsers.add_parser( + "publish-with-retry-settings", + help=publish_messages_with_retry_settings.__doc__, + ) + publish_with_retry_settings_parser.add_argument("topic_id") + + args = parser.parse_args() + + if args.command == "list": + list_topics(args.project_id) + elif args.command == "create": + create_topic(args.project_id, args.topic_id) + elif args.command == "delete": + delete_topic(args.project_id, args.topic_id) + elif args.command == "publish": + publish_messages(args.project_id, args.topic_id) + elif args.command == "publish-with-custom-attributes": + publish_messages_with_custom_attributes(args.project_id, args.topic_id) + elif args.command == "publish-with-error-handler": + publish_messages_with_error_handler(args.project_id, args.topic_id) + elif args.command == "publish-with-batch-settings": + publish_messages_with_batch_settings(args.project_id, args.topic_id) + elif args.command == "publish-with-retry-settings": + publish_messages_with_retry_settings(args.project_id, args.topic_id) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py new file mode 100644 index 000000000..b5c2ea1ea --- /dev/null +++ b/samples/snippets/publisher_test.py @@ -0,0 +1,146 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import uuid + +import backoff +from google.cloud import pubsub_v1 +import mock +import pytest + +import publisher + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID +TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID + + +@pytest.fixture +def client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture +def topic_admin(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) + + try: + topic = client.get_topic(topic_path) + except: # noqa + topic = client.create_topic(topic_path) + + yield topic.name + # Teardown of `topic_admin` is handled in `test_delete()`. + + +@pytest.fixture +def topic_publish(client): + topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) + + try: + topic = client.get_topic(topic_path) + except: # noqa + topic = client.create_topic(topic_path) + + yield topic.name + + client.delete_topic(topic.name) + + +def _make_sleep_patch(): + real_sleep = time.sleep + + def new_sleep(period): + if period == 60: + real_sleep(5) + raise RuntimeError("sigil") + else: + real_sleep(period) + + return mock.patch("time.sleep", new=new_sleep) + + +def test_list(client, topic_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + publisher.list_topics(PROJECT) + out, _ = capsys.readouterr() + assert topic_admin in out + + eventually_consistent_test() + + +def test_create(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) + try: + client.delete_topic(topic_path) + except Exception: + pass + + publisher.create_topic(PROJECT, TOPIC_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert client.get_topic(topic_path) + + eventually_consistent_test() + + +def test_delete(client, topic_admin): + publisher.delete_topic(PROJECT, TOPIC_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + with pytest.raises(Exception): + client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) + + eventually_consistent_test() + + +def test_publish(topic_publish, capsys): + publisher.publish_messages(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_custom_attributes(topic_publish, capsys): + publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_batch_settings(topic_publish, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_retry_settings(topic_publish, capsys): + publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out + + +def test_publish_with_error_handler(topic_publish, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) + + out, _ = capsys.readouterr() + assert "Published" in out diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py new file mode 100644 index 000000000..16432c0c3 --- /dev/null +++ b/samples/snippets/quickstart/pub.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_pub_all] +import argparse +import time + +# [START pubsub_quickstart_pub_deps] +from google.cloud import pubsub_v1 + +# [END pubsub_quickstart_pub_deps] + + +def get_callback(api_future, data, ref): + """Wrap message data in the context of the callback function.""" + + def callback(api_future): + try: + print( + "Published message {} now has message ID {}".format( + data, api_future.result() + ) + ) + ref["num_messages"] += 1 + except Exception: + print( + "A problem occurred when publishing {}: {}\n".format( + data, api_future.exception() + ) + ) + raise + + return callback + + +def pub(project_id, topic_id): + """Publishes a message to a Pub/Sub topic.""" + # [START pubsub_quickstart_pub_client] + # Initialize a Publisher client. + client = pubsub_v1.PublisherClient() + # [END pubsub_quickstart_pub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/topics/{topic_id}` + topic_path = client.topic_path(project_id, topic_id) + + # Data sent to Cloud Pub/Sub must be a bytestring. + data = b"Hello, World!" + + # Keep track of the number of published messages. + ref = dict({"num_messages": 0}) + + # When you publish a message, the client returns a future. + api_future = client.publish(topic_path, data=data) + api_future.add_done_callback(get_callback(api_future, data, ref)) + + # Keep the main thread from exiting while the message future + # gets resolved in the background. + while api_future.running(): + time.sleep(0.5) + print("Published {} message(s).".format(ref["num_messages"])) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("topic_id", help="Pub/Sub topic ID") + + args = parser.parse_args() + + pub(args.project_id, args.topic_id) +# [END pubsub_quickstart_pub_all] diff --git a/samples/snippets/quickstart/pub_test.py b/samples/snippets/quickstart/pub_test.py new file mode 100644 index 000000000..6f5cc06c4 --- /dev/null +++ b/samples/snippets/quickstart/pub_test.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 +import pytest + +import pub # noqa + + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "quickstart-pub-test-topic-" + UUID + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + publisher_client.create_topic(topic_path) + except AlreadyExists: + pass + + yield TOPIC + + publisher_client.delete_topic(topic_path) + + +def test_pub(publisher_client, topic, capsys): + pub.pub(PROJECT, topic) + + out, _ = capsys.readouterr() + + assert "Hello, World!" in out diff --git a/samples/snippets/quickstart/sub.py b/samples/snippets/quickstart/sub.py new file mode 100644 index 000000000..efe008915 --- /dev/null +++ b/samples/snippets/quickstart/sub.py @@ -0,0 +1,69 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START pubsub_quickstart_sub_all] +import argparse + +# [START pubsub_quickstart_sub_deps] +from google.cloud import pubsub_v1 + +# [END pubsub_quickstart_sub_deps] + + +def sub(project_id, subscription_id): + """Receives messages from a Pub/Sub subscription.""" + # [START pubsub_quickstart_sub_client] + # Initialize a Subscriber client + subscriber_client = pubsub_v1.SubscriberClient() + # [END pubsub_quickstart_sub_client] + # Create a fully qualified identifier in the form of + # `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber_client.subscription_path(project_id, subscription_id) + + def callback(message): + print( + "Received message {} of message ID {}\n".format(message, message.message_id) + ) + # Acknowledge the message. Unack'ed messages will be redelivered. + message.ack() + print("Acknowledged message {}\n".format(message.message_id)) + + streaming_pull_future = subscriber_client.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) + + try: + # Calling result() on StreamingPullFuture keeps the main thread from + # exiting while messages get processed in the callbacks. + streaming_pull_future.result() + except: # noqa + streaming_pull_future.cancel() + + subscriber_client.close() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Google Cloud project ID") + parser.add_argument("subscription_id", help="Pub/Sub subscription ID") + + args = parser.parse_args() + + sub(args.project_id, args.subscription_id) +# [END pubsub_quickstart_sub_all] diff --git a/samples/snippets/quickstart/sub_test.py b/samples/snippets/quickstart/sub_test.py new file mode 100644 index 000000000..38047422a --- /dev/null +++ b/samples/snippets/quickstart/sub_test.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python + +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import uuid + +from google.api_core.exceptions import AlreadyExists +from google.cloud import pubsub_v1 +import mock +import pytest + +import sub # noqa + + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "quickstart-sub-test-topic-" + UUID +SUBSCRIPTION = "quickstart-sub-test-topic-sub-" + UUID + +publisher_client = pubsub_v1.PublisherClient() +subscriber_client = pubsub_v1.SubscriberClient() + + +@pytest.fixture(scope="module") +def topic_path(): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + topic = publisher_client.create_topic(topic_path) + yield topic.name + except AlreadyExists: + yield topic_path + + publisher_client.delete_topic(topic_path) + + +@pytest.fixture(scope="module") +def subscription_path(topic_path): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION) + + try: + subscription = subscriber_client.create_subscription( + subscription_path, topic_path + ) + yield subscription.name + except AlreadyExists: + yield subscription_path + + subscriber_client.delete_subscription(subscription_path) + subscriber_client.close() + + +def _publish_messages(topic_path): + publish_future = publisher_client.publish(topic_path, data=b"Hello World!") + publish_future.result() + + +def test_sub(monkeypatch, topic_path, subscription_path, capsys): + + real_client = pubsub_v1.SubscriberClient() + mock_client = mock.Mock(spec=pubsub_v1.SubscriberClient, wraps=real_client) + + # Attributes on mock_client_constructor uses the corresponding + # attributes on pubsub_v1.SubscriberClient. + mock_client_constructor = mock.create_autospec(pubsub_v1.SubscriberClient) + mock_client_constructor.return_value = mock_client + + monkeypatch.setattr(pubsub_v1, "SubscriberClient", mock_client_constructor) + + def mock_subscribe(subscription_path, callback=None): + real_future = real_client.subscribe(subscription_path, callback=callback) + mock_future = mock.Mock(spec=real_future, wraps=real_future) + + def mock_result(): + return real_future.result(timeout=10) + + mock_future.result.side_effect = mock_result + return mock_future + + mock_client.subscribe.side_effect = mock_subscribe + + _publish_messages(topic_path) + + sub.sub(PROJECT, SUBSCRIPTION) + + out, _ = capsys.readouterr() + assert "Received message" in out + assert "Acknowledged message" in out + + real_client.close() diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt new file mode 100644 index 000000000..adf26b9f9 --- /dev/null +++ b/samples/snippets/requirements-test.txt @@ -0,0 +1,3 @@ +backoff==1.10.0 +pytest==5.3.2 +mock==3.0.5 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt new file mode 100644 index 000000000..9b496510a --- /dev/null +++ b/samples/snippets/requirements.txt @@ -0,0 +1 @@ +google-cloud-pubsub==1.6.0 diff --git a/samples/snippets/subscriber.py b/samples/snippets/subscriber.py new file mode 100644 index 000000000..f079e7d42 --- /dev/null +++ b/samples/snippets/subscriber.py @@ -0,0 +1,783 @@ +#!/usr/bin/env python + +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""This application demonstrates how to perform basic operations on +subscriptions with the Cloud Pub/Sub API. + +For more information, see the README.md under /pubsub and the documentation +at https://cloud.google.com/pubsub/docs. +""" + +import argparse + + +def list_subscriptions_in_topic(project_id, topic_id): + """Lists all subscriptions for a given topic.""" + # [START pubsub_list_topic_subscriptions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + publisher = pubsub_v1.PublisherClient() + topic_path = publisher.topic_path(project_id, topic_id) + + for subscription in publisher.list_topic_subscriptions(topic_path): + print(subscription) + # [END pubsub_list_topic_subscriptions] + + +def list_subscriptions_in_project(project_id): + """Lists all subscriptions in the current project.""" + # [START pubsub_list_subscriptions] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + + subscriber = pubsub_v1.SubscriberClient() + project_path = subscriber.project_path(project_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + for subscription in subscriber.list_subscriptions(project_path): + print(subscription.name) + # [END pubsub_list_subscriptions] + + +def create_subscription(project_id, topic_id, subscription_id): + """Create a new pull subscription on the given topic.""" + # [START pubsub_create_pull_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription(subscription_path, topic_path) + + print("Subscription created: {}".format(subscription)) + # [END pubsub_create_pull_subscription] + + +def create_subscription_with_dead_letter_topic( + project_id, topic_id, subscription_id, dead_letter_topic_id +): + """Create a subscription with dead letter policy.""" + # [START pubsub_dead_letter_create_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy + + # TODO(developer) + # project_id = "your-project-id" + # endpoint = "https://my-test-project.appspot.com/push" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_id = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=10 + ) + + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, dead_letter_policy=dead_letter_policy + ) + + print("Subscription created: {}".format(subscription.name)) + print( + "It will forward dead letter messages to: {}".format( + subscription.dead_letter_policy.dead_letter_topic + ) + ) + print( + "After {} delivery attempts.".format( + subscription.dead_letter_policy.max_delivery_attempts + ) + ) + # [END pubsub_dead_letter_create_subscription] + + +def create_push_subscription(project_id, topic_id, subscription_id, endpoint): + """Create a new push subscription on the given topic.""" + # [START pubsub_create_push_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscription = subscriber.create_subscription( + subscription_path, topic_path, push_config + ) + + print("Push subscription created: {}".format(subscription)) + print("Endpoint for subscription is: {}".format(endpoint)) + # [END pubsub_create_push_subscription] + + +def delete_subscription(project_id, subscription_id): + """Deletes an existing Pub/Sub topic.""" + # [START pubsub_delete_subscription] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + subscriber.delete_subscription(subscription_path) + + print("Subscription deleted: {}".format(subscription_path)) + # [END pubsub_delete_subscription] + + +def update_push_subscription(project_id, topic_id, subscription_id, endpoint): + """ + Updates an existing Pub/Sub subscription's push endpoint URL. + Note that certain properties of a subscription, such as + its topic, are not modifiable. + """ + # [START pubsub_update_push_configuration] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + # subscription_id = "your-subscription-id" + # endpoint = "https://my-test-project.appspot.com/push" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + push_config = pubsub_v1.types.PushConfig(push_endpoint=endpoint) + + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_id, push_config=push_config + ) + + update_mask = {"paths": {"push_config"}} + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + result = subscriber.update_subscription(subscription, update_mask) + + print("Subscription updated: {}".format(subscription_path)) + print("New endpoint for subscription is: {}".format(result.push_config)) + # [END pubsub_update_push_configuration] + + +def update_subscription_with_dead_letter_policy( + project_id, topic_id, subscription_id, dead_letter_topic_id +): + """Update a subscription's dead letter policy.""" + # [START pubsub_dead_letter_update_subscription] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import DeadLetterPolicy, FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + # TODO(developer): This is an existing dead letter topic that the subscription + # with dead letter policy will forward dead letter messages to. + # dead_letter_topic_id = "your-dead-letter-topic-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + dead_letter_topic_path = subscriber.topic_path(project_id, dead_letter_topic_id) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before the update: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask(paths=["dead_letter_policy.max_delivery_attempts"]) + + # Construct a dead letter policy you expect to have after the update. + dead_letter_policy = DeadLetterPolicy( + dead_letter_topic=dead_letter_topic_path, max_delivery_attempts=20 + ) + + # Construct the subscription with the dead letter policy you expect to have + # after the update. Here, values in the required fields (name, topic) help + # identify the subscription. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path, dead_letter_policy=dead_letter_policy, + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After the update: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_update_subscription] + return subscription_after_update + + +def remove_dead_letter_policy(project_id, topic_id, subscription_id): + """Remove dead letter policy from a subscription.""" + # [START pubsub_dead_letter_remove] + from google.cloud import pubsub_v1 + from google.cloud.pubsub_v1.types import FieldMask + + # TODO(developer) + # project_id = "your-project-id" + # TODO(developer): This is an existing topic that the subscription + # with dead letter policy is attached to. + # topic_id = "your-topic-id" + # TODO(developer): This is an existing subscription with a dead letter policy. + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + topic_path = subscriber.topic_path(project_id, topic_id) + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + subscription_before_update = subscriber.get_subscription(subscription_path) + print("Before removing the policy: {}".format(subscription_before_update)) + + # Indicates which fields in the provided subscription to update. + update_mask = FieldMask( + paths=[ + "dead_letter_policy.dead_letter_topic", + "dead_letter_policy.max_delivery_attempts", + ] + ) + + # Construct the subscription (without any dead letter policy) that you + # expect to have after the update. + subscription = pubsub_v1.types.Subscription( + name=subscription_path, topic=topic_path + ) + + with subscriber: + subscription_after_update = subscriber.update_subscription( + subscription, update_mask + ) + + print("After removing the policy: {}".format(subscription_after_update)) + # [END pubsub_dead_letter_remove] + return subscription_after_update + + +def receive_messages(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription.""" + # [START pubsub_subscriber_async_pull] + # [START pubsub_quickstart_subscriber] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_id}` + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_async_pull] + # [END pubsub_quickstart_subscriber] + + +def receive_messages_with_custom_attributes(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription.""" + # [START pubsub_subscriber_async_pull_custom_attributes] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message.data)) + if message.attributes: + print("Attributes:") + for key in message.attributes: + value = message.attributes.get(key) + print("{}: {}".format(key, value)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_async_pull_custom_attributes] + + +def receive_messages_with_flow_control(project_id, subscription_id, timeout=None): + """Receives messages from a pull subscription with flow control.""" + # [START pubsub_subscriber_flow_settings] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message.data)) + message.ack() + + # Limit the subscriber to only have ten outstanding messages at a time. + flow_control = pubsub_v1.types.FlowControl(max_messages=10) + + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback, flow_control=flow_control + ) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + try: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_subscriber_flow_settings] + + +def synchronous_pull(project_id, subscription_id): + """Pulling messages synchronously.""" + # [START pubsub_subscriber_sync_pull] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + NUM_MESSAGES = 3 + + # Wrap the subscriber in a 'with' block to automatically call close() to + # close the underlying gRPC channel when done. + with subscriber: + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + + ack_ids = [] + for received_message in response.received_messages: + print("Received: {}".format(received_message.message.data)) + ack_ids.append(received_message.ack_id) + + # Acknowledges the received messages so they will not be sent again. + subscriber.acknowledge(subscription_path, ack_ids) + + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) + # [END pubsub_subscriber_sync_pull] + + +def synchronous_pull_with_lease_management(project_id, subscription_id): + """Pulling messages synchronously with lease management""" + # [START pubsub_subscriber_sync_pull_with_lease] + import logging + import multiprocessing + import random + import time + + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + NUM_MESSAGES = 2 + ACK_DEADLINE = 30 + SLEEP_TIME = 10 + + # The subscriber pulls a specific number of messages. + response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) + + multiprocessing.log_to_stderr() + logger = multiprocessing.get_logger() + logger.setLevel(logging.INFO) + + def worker(msg): + """Simulates a long-running process.""" + RUN_TIME = random.randint(1, 60) + logger.info( + "{}: Running {} for {}s".format( + time.strftime("%X", time.gmtime()), msg.message.data, RUN_TIME + ) + ) + time.sleep(RUN_TIME) + + # `processes` stores process as key and ack id and message as values. + processes = dict() + for message in response.received_messages: + process = multiprocessing.Process(target=worker, args=(message,)) + processes[process] = (message.ack_id, message.message.data) + process.start() + + while processes: + for process in list(processes): + ack_id, msg_data = processes[process] + # If the process is still running, reset the ack deadline as + # specified by ACK_DEADLINE once every while as specified + # by SLEEP_TIME. + if process.is_alive(): + # `ack_deadline_seconds` must be between 10 to 600. + subscriber.modify_ack_deadline( + subscription_path, [ack_id], ack_deadline_seconds=ACK_DEADLINE, + ) + logger.info( + "{}: Reset ack deadline for {} for {}s".format( + time.strftime("%X", time.gmtime()), msg_data, ACK_DEADLINE, + ) + ) + + # If the processs is finished, acknowledges using `ack_id`. + else: + subscriber.acknowledge(subscription_path, [ack_id]) + logger.info( + "{}: Acknowledged {}".format( + time.strftime("%X", time.gmtime()), msg_data + ) + ) + processes.pop(process) + + # If there are still processes running, sleeps the thread. + if processes: + time.sleep(SLEEP_TIME) + + print( + "Received and acknowledged {} messages. Done.".format( + len(response.received_messages) + ) + ) + + # Close the underlying gPRC channel. Alternatively, wrap subscriber in + # a 'with' block to automatically call close() when done. + subscriber.close() + # [END pubsub_subscriber_sync_pull_with_lease] + + +def listen_for_errors(project_id, subscription_id, timeout=None): + """Receives messages and catches errors from a pull subscription.""" + # [START pubsub_subscriber_error_listener] + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + # Number of seconds the subscriber should listen for messages + # timeout = 5.0 + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except Exception as e: + streaming_pull_future.cancel() + print( + "Listening for messages on {} threw an exception: {}.".format( + subscription_id, e + ) + ) + # [END pubsub_subscriber_error_listener] + + +def receive_messages_with_delivery_attempts(project_id, subscription_id, timeout=None): + # [START pubsub_dead_letter_delivery_attempt] + from concurrent.futures import TimeoutError + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # subscription_id = "your-subscription-id" + + subscriber = pubsub_v1.SubscriberClient() + subscription_path = subscriber.subscription_path(project_id, subscription_id) + + def callback(message): + print("Received message: {}".format(message)) + print("With delivery attempts: {}".format(message.delivery_attempt)) + message.ack() + + streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) + print("Listening for messages on {}..\n".format(subscription_path)) + + # Wrap subscriber in a 'with' block to automatically call close() when done. + with subscriber: + # When `timeout` is not set, result() will block indefinitely, + # unless an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except TimeoutError: + streaming_pull_future.cancel() + # [END pubsub_dead_letter_delivery_attempt] + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("project_id", help="Your Google Cloud project ID") + + subparsers = parser.add_subparsers(dest="command") + list_in_topic_parser = subparsers.add_parser( + "list-in-topic", help=list_subscriptions_in_topic.__doc__ + ) + list_in_topic_parser.add_argument("topic_id") + + list_in_project_parser = subparsers.add_parser( + "list-in-project", help=list_subscriptions_in_project.__doc__ + ) + + create_parser = subparsers.add_parser("create", help=create_subscription.__doc__) + create_parser.add_argument("topic_id") + create_parser.add_argument("subscription_id") + + create_with_dead_letter_policy_parser = subparsers.add_parser( + "create-with-dead-letter-policy", + help=create_subscription_with_dead_letter_topic.__doc__, + ) + create_with_dead_letter_policy_parser.add_argument("topic_id") + create_with_dead_letter_policy_parser.add_argument("subscription_id") + create_with_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + + create_push_parser = subparsers.add_parser( + "create-push", help=create_push_subscription.__doc__ + ) + create_push_parser.add_argument("topic_id") + create_push_parser.add_argument("subscription_id") + create_push_parser.add_argument("endpoint") + + delete_parser = subparsers.add_parser("delete", help=delete_subscription.__doc__) + delete_parser.add_argument("subscription_id") + + update_push_parser = subparsers.add_parser( + "update-push", help=update_push_subscription.__doc__ + ) + update_push_parser.add_argument("topic_id") + update_push_parser.add_argument("subscription_id") + update_push_parser.add_argument("endpoint") + + update_dead_letter_policy_parser = subparsers.add_parser( + "update-dead-letter-policy", + help=update_subscription_with_dead_letter_policy.__doc__, + ) + update_dead_letter_policy_parser.add_argument("topic_id") + update_dead_letter_policy_parser.add_argument("subscription_id") + update_dead_letter_policy_parser.add_argument("dead_letter_topic_id") + + remove_dead_letter_policy_parser = subparsers.add_parser( + "remove-dead-letter-policy", help=remove_dead_letter_policy.__doc__ + ) + remove_dead_letter_policy_parser.add_argument("topic_id") + remove_dead_letter_policy_parser.add_argument("subscription_id") + + receive_parser = subparsers.add_parser("receive", help=receive_messages.__doc__) + receive_parser.add_argument("subscription_id") + receive_parser.add_argument("timeout", default=None, type=float, nargs="?") + + receive_with_custom_attributes_parser = subparsers.add_parser( + "receive-custom-attributes", + help=receive_messages_with_custom_attributes.__doc__, + ) + receive_with_custom_attributes_parser.add_argument("subscription_id") + receive_with_custom_attributes_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + receive_with_flow_control_parser = subparsers.add_parser( + "receive-flow-control", help=receive_messages_with_flow_control.__doc__ + ) + receive_with_flow_control_parser.add_argument("subscription_id") + receive_with_flow_control_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + synchronous_pull_parser = subparsers.add_parser( + "receive-synchronously", help=synchronous_pull.__doc__ + ) + synchronous_pull_parser.add_argument("subscription_id") + + synchronous_pull_with_lease_management_parser = subparsers.add_parser( + "receive-synchronously-with-lease", + help=synchronous_pull_with_lease_management.__doc__, + ) + synchronous_pull_with_lease_management_parser.add_argument("subscription_id") + + listen_for_errors_parser = subparsers.add_parser( + "listen-for-errors", help=listen_for_errors.__doc__ + ) + listen_for_errors_parser.add_argument("subscription_id") + listen_for_errors_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + receive_messages_with_delivery_attempts_parser = subparsers.add_parser( + "receive-messages-with-delivery-attempts", + help=receive_messages_with_delivery_attempts.__doc__, + ) + receive_messages_with_delivery_attempts_parser.add_argument("subscription_id") + receive_messages_with_delivery_attempts_parser.add_argument( + "timeout", default=None, type=float, nargs="?" + ) + + args = parser.parse_args() + + if args.command == "list-in-topic": + list_subscriptions_in_topic(args.project_id, args.topic_id) + elif args.command == "list-in-project": + list_subscriptions_in_project(args.project_id) + elif args.command == "create": + create_subscription(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "create-with-dead-letter-policy": + create_subscription_with_dead_letter_topic( + args.project_id, + args.topic_id, + args.subscription_id, + args.dead_letter_topic_id, + ) + elif args.command == "create-push": + create_push_subscription( + args.project_id, args.topic_id, args.subscription_id, args.endpoint, + ) + elif args.command == "delete": + delete_subscription(args.project_id, args.subscription_id) + elif args.command == "update-push": + update_push_subscription( + args.project_id, args.topic_id, args.subscription_id, args.endpoint, + ) + elif args.command == "update-dead-letter-policy": + update_subscription_with_dead_letter_policy( + args.project_id, + args.topic_id, + args.subscription_id, + args.dead_letter_topic_id, + ) + elif args.command == "remove-dead-letter-policy": + remove_dead_letter_policy(args.project_id, args.topic_id, args.subscription_id) + elif args.command == "receive": + receive_messages(args.project_id, args.subscription_id, args.timeout) + elif args.command == "receive-custom-attributes": + receive_messages_with_custom_attributes( + args.project_id, args.subscription_id, args.timeout + ) + elif args.command == "receive-flow-control": + receive_messages_with_flow_control( + args.project_id, args.subscription_id, args.timeout + ) + elif args.command == "receive-synchronously": + synchronous_pull(args.project_id, args.subscription_id) + elif args.command == "receive-synchronously-with-lease": + synchronous_pull_with_lease_management(args.project_id, args.subscription_id) + elif args.command == "listen-for-errors": + listen_for_errors(args.project_id, args.subscription_id, args.timeout) + elif args.command == "receive-messages-with-delivery-attempts": + receive_messages_with_delivery_attempts( + args.project_id, args.subscription_id, args.timeout + ) diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py new file mode 100644 index 000000000..a7f7c139c --- /dev/null +++ b/samples/snippets/subscriber_test.py @@ -0,0 +1,341 @@ +# Copyright 2016 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +import backoff +from google.cloud import pubsub_v1 +import pytest + +import subscriber + +UUID = uuid.uuid4().hex +PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"] +TOPIC = "subscription-test-topic-" + UUID +DEAD_LETTER_TOPIC = "subscription-test-dead-letter-topic-" + UUID +SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID +SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID +SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID +SUBSCRIPTION_DLQ = "subscription-test-subscription-dlq-" + UUID +ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) +NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) + + +@pytest.fixture(scope="module") +def publisher_client(): + yield pubsub_v1.PublisherClient() + + +@pytest.fixture(scope="module") +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) + + try: + topic = publisher_client.get_topic(topic_path) + except: # noqa + topic = publisher_client.create_topic(topic_path) + + yield topic.name + + publisher_client.delete_topic(topic.name) + + +@pytest.fixture(scope="module") +def dead_letter_topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + dead_letter_topic = publisher_client.get_topic(topic_path) + except: # noqa + dead_letter_topic = publisher_client.create_topic(topic_path) + + yield dead_letter_topic.name + + publisher_client.delete_topic(dead_letter_topic.name) + + +@pytest.fixture(scope="module") +def subscriber_client(): + subscriber_client = pubsub_v1.SubscriberClient() + yield subscriber_client + subscriber_client.close() + + +@pytest.fixture(scope="module") +def subscription_admin(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + +@pytest.fixture(scope="module") +def subscription_sync(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_SYNC) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +@pytest.fixture(scope="module") +def subscription_async(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ASYNC) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +@pytest.fixture(scope="module") +def subscription_dlq(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + + try: + subscription = subscriber_client.get_subscription(subscription_path) + except: # noqa + subscription = subscriber_client.create_subscription( + subscription_path, topic=topic + ) + + yield subscription.name + + subscriber_client.delete_subscription(subscription.name) + + +def test_list_in_topic(subscription_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) + out, _ = capsys.readouterr() + assert subscription_admin in out + + eventually_consistent_test() + + +def test_list_in_project(subscription_admin, capsys): + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + subscriber.list_subscriptions_in_project(PROJECT) + out, _ = capsys.readouterr() + assert subscription_admin in out + + eventually_consistent_test() + + +def test_create(subscriber_client): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert subscriber_client.get_subscription(subscription_path) + + eventually_consistent_test() + + +def test_create_subscription_with_dead_letter_policy( + subscriber_client, publisher_client, topic, dead_letter_topic, capsys +): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_DLQ) + dead_letter_topic_path = publisher_client.topic_path(PROJECT, DEAD_LETTER_TOPIC) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_subscription_with_dead_letter_topic( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "Subscription created: " + subscription_path in out + assert "It will forward dead letter messages to: " + dead_letter_topic_path in out + assert "After 10 delivery attempts." in out + + +def test_create_push(subscriber_client): + subscription_path = subscriber_client.subscription_path(PROJECT, SUBSCRIPTION_ADMIN) + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + subscriber.create_push_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + assert subscriber_client.get_subscription(subscription_path) + + eventually_consistent_test() + + +def test_update(subscriber_client, subscription_admin, capsys): + subscriber.update_push_subscription( + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, NEW_ENDPOINT + ) + + out, _ = capsys.readouterr() + assert "Subscription updated" in out + + +def test_update_dead_letter_policy( + subscriber_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _ = subscriber.update_subscription_with_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ, DEAD_LETTER_TOPIC + ) + + out, _ = capsys.readouterr() + assert "max_delivery_attempts: 20" in out + + +def test_delete(subscriber_client, subscription_admin): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) + + @backoff.on_exception(backoff.expo, AssertionError, max_time=60) + def eventually_consistent_test(): + with pytest.raises(Exception): + subscriber_client.get_subscription(subscription_admin) + + eventually_consistent_test() + + +def _publish_messages(publisher_client, topic): + for n in range(5): + data = u"message {}".format(n).encode("utf-8") + publish_future = publisher_client.publish( + topic, data=data, origin="python-sample" + ) + publish_future.result() + + +def test_receive(publisher_client, topic, subscription_async, capsys): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + +def test_receive_with_custom_attributes( + publisher_client, topic, subscription_async, capsys +): + + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_custom_attributes(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "message" in out + assert "origin" in out + assert "python-sample" in out + + +def test_receive_with_flow_control(publisher_client, topic, subscription_async, capsys): + + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "message" in out + + +def test_receive_synchronously(publisher_client, topic, subscription_sync, capsys): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) + + out, _ = capsys.readouterr() + assert "Done." in out + + +def test_receive_synchronously_with_lease( + publisher_client, topic, subscription_sync, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.synchronous_pull_with_lease_management(PROJECT, SUBSCRIPTION_SYNC) + + out, _ = capsys.readouterr() + assert "Done." in out + + +def test_listen_for_errors(publisher_client, topic, subscription_async, capsys): + + _publish_messages(publisher_client, topic) + + subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_async in out + assert "threw an exception" in out + + +def test_receive_with_delivery_attempts( + publisher_client, topic, subscription_dlq, dead_letter_topic, capsys +): + _publish_messages(publisher_client, topic) + + subscriber.receive_messages_with_delivery_attempts(PROJECT, SUBSCRIPTION_DLQ, 10) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_dlq in out + assert "Received message: " in out + assert "message 4" in out + assert "With delivery attempts: " in out + + +def test_remove_dead_letter_policy(subscriber_client, subscription_dlq): + subscription_after_update = subscriber.remove_dead_letter_policy( + PROJECT, TOPIC, SUBSCRIPTION_DLQ + ) + + assert subscription_after_update.dead_letter_policy.dead_letter_topic == "" diff --git a/synth.py b/synth.py index b44cc0acf..0e2c96e42 100644 --- a/synth.py +++ b/synth.py @@ -18,6 +18,7 @@ import synthtool as s from synthtool import gcp +from synthtool.languages import python gapic = gcp.GAPICBazel() common = gcp.CommonTemplates() @@ -266,8 +267,16 @@ def _merge_dict(d1, d2): # Add templated files # ---------------------------------------------------------------------------- templated_files = gcp.CommonTemplates().py_library( - unit_cov_level=97, cov_level=99, system_test_external_dependencies=["psutil"], + unit_cov_level=97, + cov_level=99, + system_test_external_dependencies=["psutil"], + samples=True, ) s.move(templated_files) +# ---------------------------------------------------------------------------- +# Samples templates +# ---------------------------------------------------------------------------- +python.py_samples() + s.shell.run(["nox", "-s", "blacken"], hide_output=False)