From fac98d101db176c72ffa9ca325100db1153cf1a5 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 11 Dec 2019 16:46:22 -0800 Subject: [PATCH 1/3] use result() on streaming pull futures instead of infinite while --- pubsub/cloud-client/publisher_test.py | 68 ++++++++++--------- pubsub/cloud-client/subscriber.py | 92 +++++++++++++++++--------- pubsub/cloud-client/subscriber_test.py | 89 +++++++++---------------- 3 files changed, 128 insertions(+), 121 deletions(-) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index 125fae3c06b9..1c6fe0c20393 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -25,7 +25,8 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] -TOPIC = "publisher-test-topic-" + UUID +TOPIC_ONE = "publisher-test-topic-one-" + UUID +TOPIC_TWO = "publisher-test-topic-two-" + UUID @pytest.fixture @@ -34,8 +35,8 @@ def client(): @pytest.fixture -def topic(client): - topic_path = client.topic_path(PROJECT, TOPIC) +def topic_one(client): + topic_path = client.topic_path(PROJECT, TOPIC_ONE) try: response = client.get_topic(topic_path) @@ -45,6 +46,20 @@ def topic(client): yield response.name +@pytest.fixture +def topic_two(client): + topic_path = client.topic_path(PROJECT, TOPIC_TWO) + + try: + response = client.get_topic(topic_path) + except: # noqa + response = client.create_topic(topic_path) + + yield response.name + + client.delete_topic(response.name) + + def _make_sleep_patch(): real_sleep = time.sleep @@ -58,83 +73,74 @@ def new_sleep(period): return mock.patch("time.sleep", new=new_sleep) -def _to_delete(): - publisher_client = pubsub_v1.PublisherClient() - publisher_client.delete_topic( - "projects/{}/topics/{}".format(PROJECT, TOPIC) - ) - - -def test_list(client, topic, capsys): +def test_list(client, topic_one, capsys): @eventually_consistent.call def _(): publisher.list_topics(PROJECT) out, _ = capsys.readouterr() - assert topic in out + assert topic_one in out def test_create(client): - topic_path = client.topic_path(PROJECT, TOPIC) + topic_path = client.topic_path(PROJECT, TOPIC_ONE) try: client.delete_topic(topic_path) except Exception: pass - publisher.create_topic(PROJECT, TOPIC) + publisher.create_topic(PROJECT, TOPIC_ONE) @eventually_consistent.call def _(): assert client.get_topic(topic_path) -def test_delete(client, topic): - publisher.delete_topic(PROJECT, TOPIC) +def test_delete(client, topic_one): + publisher.delete_topic(PROJECT, TOPIC_ONE) @eventually_consistent.call def _(): with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC)) + client.get_topic(client.topic_path(PROJECT, TOPIC_ONE)) -def test_publish(topic, capsys): - publisher.publish_messages(PROJECT, TOPIC) +def test_publish(topic_two, capsys): + publisher.publish_messages(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_custom_attributes(topic, capsys): - publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC) +def test_publish_with_custom_attributes(topic_two, capsys): + publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_batch_settings(topic, capsys): - publisher.publish_messages_with_batch_settings(PROJECT, TOPIC) +def test_publish_with_batch_settings(topic_two, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_retry_settings(topic, capsys): - publisher.publish_messages_with_retry_settings(PROJECT, TOPIC) +def test_publish_with_retry_settings(topic_two, capsys): + publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_error_handler(topic, capsys): - publisher.publish_messages_with_error_handler(PROJECT, TOPIC) +def test_publish_with_error_handler(topic_two, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_futures(topic, capsys): - publisher.publish_messages_with_futures(PROJECT, TOPIC) +def test_publish_with_futures(topic_two, capsys): + publisher.publish_messages_with_futures(PROJECT, TOPIC_TWO) out, _ = capsys.readouterr() assert "Published" in out - - _to_delete() diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index ea1cc9ff9e72..973227a13129 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -161,7 +161,7 @@ def update_subscription(project_id, subscription_name, endpoint): # [END pubsub_update_push_configuration] -def receive_messages(project_id, subscription_name): +def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] @@ -171,6 +171,8 @@ def receive_messages(project_id, subscription_name): # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = "How long the subscriber should listen for messages in + # seconds" subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier @@ -183,18 +185,24 @@ def callback(message): print("Received message: {}".format(message)) message.ack() - subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking. We must keep the main thread from - # exiting to allow it to process messages asynchronously in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() will block indefinitely if `timeout` is not set, unless + # an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull] # [END pubsub_quickstart_subscriber] -def receive_messages_with_custom_attributes(project_id, subscription_name): +def receive_messages_with_custom_attributes( + project_id, subscription_name, timeout=None +): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] # [START pubsub_subscriber_async_pull_custom_attributes] @@ -204,6 +212,8 @@ def receive_messages_with_custom_attributes(project_id, subscription_name): # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = "How long the subscriber should listen for messages in + # seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -219,18 +229,24 @@ def callback(message): print("{}: {}".format(key, value)) message.ack() - subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() will block indefinitely if `timeout` is not set, unless + # an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_async_pull_custom_attributes] # [END pubsub_subscriber_sync_pull_custom_attributes] -def receive_messages_with_flow_control(project_id, subscription_name): +def receive_messages_with_flow_control( + project_id, subscription_name, timeout=None +): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] import time @@ -239,6 +255,8 @@ def receive_messages_with_flow_control(project_id, subscription_name): # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" + # TODO timeout = "How long the subscriber should listen for messages in + # seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -251,15 +269,18 @@ def callback(message): # Limit the subscriber to only have ten outstanding messages at a time. flow_control = pubsub_v1.types.FlowControl(max_messages=10) - subscriber.subscribe( + + streaming_pull_future = subscriber.subscribe( subscription_path, callback=callback, flow_control=flow_control ) + print("Listening for messages on {}..\n".format(subscription_path)) - # The subscriber is non-blocking, so we must keep the main thread from - # exiting to allow it to process messages in the background. - print("Listening for messages on {}".format(subscription_path)) - while True: - time.sleep(60) + # result() will block indefinitely if `timeout` is not set, unless + # an exception is encountered first. + try: + streaming_pull_future.result(timeout=timeout) + except: # noqa + streaming_pull_future.cancel() # [END pubsub_subscriber_flow_settings] @@ -386,13 +407,15 @@ def worker(msg): # [END pubsub_subscriber_sync_pull_with_lease] -def listen_for_errors(project_id, subscription_name): +def listen_for_errors(project_id, subscription_name, timeout=None): """Receives messages and catches errors from a pull subscription.""" # [START pubsub_subscriber_error_listener] from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pubsub subscription name" + # TODO timeout = "How long the subscriber should listen for messages in + # seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -403,16 +426,19 @@ def callback(message): print("Received message: {}".format(message)) message.ack() - future = subscriber.subscribe(subscription_path, callback=callback) + streaming_pull_future = subscriber.subscribe( + subscription_path, callback=callback + ) + print("Listening for messages on {}..\n".format(subscription_path)) - # Blocks the thread while messages are coming in through the stream. Any - # exceptions that crop up on the thread will be set on the future. + # result() will block indefinitely if `timeout` is not set, unless + # an exception is encountered first. try: - # When timeout is unspecified, the result method waits indefinitely. - future.result(timeout=30) + streaming_pull_future.result(timeout=timeout) except Exception as e: + streaming_pull_future.cancel() print( - "Listening for messages on {} threw an Exception: {}.".format( + "Listening for messages on {} threw an exception: {}.".format( subscription_name, e ) ) @@ -518,14 +544,14 @@ def callback(message): args.project_id, args.subscription_name, args.endpoint ) elif args.command == "receive": - receive_messages(args.project_id, args.subscription_name) + receive_messages(args.project_id, args.subscription_name, args.timeout) elif args.command == "receive-custom-attributes": receive_messages_with_custom_attributes( - args.project_id, args.subscription_name + args.project_id, args.subscription_name, args.timeout ) elif args.command == "receive-flow-control": receive_messages_with_flow_control( - args.project_id, args.subscription_name + args.project_id, args.subscription_name, args.timeout ) elif args.command == "receive-synchronously": synchronous_pull(args.project_id, args.subscription_name) @@ -534,4 +560,6 @@ def callback(message): args.project_id, args.subscription_name ) elif args.command == "listen_for_errors": - listen_for_errors(args.project_id, args.subscription_name) + listen_for_errors( + args.project_id, args.subscription_name, args.timeout + ) diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 0645c0738e1c..cc093608c022 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -13,7 +13,6 @@ # limitations under the License. import os -import time import uuid from gcp_devrel.testing import eventually_consistent @@ -49,6 +48,8 @@ def topic(publisher_client): yield response.name + publisher_client.delete_topic(response.name) + @pytest.fixture(scope="module") def subscriber_client(): @@ -86,6 +87,8 @@ def subscription_two(subscriber_client, topic): yield response.name + subscriber_client.delete_subscription(response.name) + @pytest.fixture(scope="module") def subscription_three(subscriber_client, topic): @@ -102,6 +105,8 @@ def subscription_three(subscriber_client, topic): yield response.name + subscriber_client.delete_subscription(response.name) + def test_list_in_topic(subscription_one, capsys): @eventually_consistent.call @@ -172,73 +177,36 @@ def _(): def _publish_messages(publisher_client, topic): for n in range(5): - data = u"Message {}".format(n).encode("utf-8") - future = publisher_client.publish(topic, data=data) - future.result() - - -def _publish_messages_with_custom_attributes(publisher_client, topic): - data = u"Test message".encode("utf-8") - future = publisher_client.publish(topic, data=data, origin="python-sample") - future.result() - - -def _make_sleep_patch(): - real_sleep = time.sleep - - def new_sleep(period): - if period == 60: - real_sleep(5) - raise RuntimeError("sigil") - else: - real_sleep(period) - - return mock.patch("time.sleep", new=new_sleep) - - -def _to_delete(): - publisher_client = pubsub_v1.PublisherClient() - subscriber_client = pubsub_v1.SubscriberClient() - resources = [TOPIC, SUBSCRIPTION_TWO, SUBSCRIPTION_THREE] - - for item in resources: - if "subscription-test-topic" in item: - publisher_client.delete_topic( - "projects/{}/topics/{}".format(PROJECT, item) - ) - if "subscription-test-subscription" in item: - subscriber_client.delete_subscription( - "projects/{}/subscriptions/{}".format(PROJECT, item) - ) + data = u"message {}".format(n).encode("utf-8") + publish_future = publisher_client.publish( + topic, data=data, origin="python-sample" + ) + publish_future.result() def test_receive(publisher_client, topic, subscription_two, capsys): _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO) + subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO, 5) out, _ = capsys.readouterr() assert "Listening" in out assert subscription_two in out - assert "Message" in out + assert "message" in out def test_receive_with_custom_attributes( publisher_client, topic, subscription_two, capsys ): - _publish_messages_with_custom_attributes(publisher_client, topic) + _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_TWO - ) + subscriber.receive_messages_with_custom_attributes( + PROJECT, SUBSCRIPTION_TWO, 5 + ) out, _ = capsys.readouterr() - assert "Test message" in out + assert "message" in out assert "origin" in out assert "python-sample" in out @@ -249,16 +217,12 @@ def test_receive_with_flow_control( _publish_messages(publisher_client, topic) - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match="sigil"): - subscriber.receive_messages_with_flow_control( - PROJECT, SUBSCRIPTION_TWO - ) + subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_TWO, 5) out, _ = capsys.readouterr() assert "Listening" in out assert subscription_two in out - assert "Message" in out + assert "message" in out def test_receive_synchronously( @@ -284,5 +248,14 @@ def test_receive_synchronously_with_lease( out, _ = capsys.readouterr() assert "Done." in out - # Clean up resources after all the tests. - _to_delete() + +def test_listen_for_errors(publisher_client, topic, subscription_two, capsys): + + _publish_messages(publisher_client, topic) + + subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_TWO, 5) + + out, _ = capsys.readouterr() + assert "Listening" in out + assert subscription_two in out + assert "threw an exception" in out From 7432b1c650f3e0977374a1999ecc43e225870310 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Wed, 11 Dec 2019 16:59:49 -0800 Subject: [PATCH 2/3] remove unused imports --- pubsub/cloud-client/subscriber.py | 6 ------ pubsub/cloud-client/subscriber_test.py | 1 - 2 files changed, 7 deletions(-) diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 973227a13129..7733a091e6f2 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -165,8 +165,6 @@ def receive_messages(project_id, subscription_name, timeout=None): """Receives messages from a pull subscription.""" # [START pubsub_subscriber_async_pull] # [START pubsub_quickstart_subscriber] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" @@ -206,8 +204,6 @@ def receive_messages_with_custom_attributes( """Receives messages from a pull subscription.""" # [START pubsub_subscriber_sync_pull_custom_attributes] # [START pubsub_subscriber_async_pull_custom_attributes] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" @@ -249,8 +245,6 @@ def receive_messages_with_flow_control( ): """Receives messages from a pull subscription with flow control.""" # [START pubsub_subscriber_flow_settings] - import time - from google.cloud import pubsub_v1 # TODO project_id = "Your Google Cloud Project ID" diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index cc093608c022..47d0acfb6830 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -17,7 +17,6 @@ from gcp_devrel.testing import eventually_consistent from google.cloud import pubsub_v1 -import mock import pytest import subscriber From e581af9f6239f33c82f999b0db96f51505584a04 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Thu, 12 Dec 2019 11:48:26 -0800 Subject: [PATCH 3/3] David's suggestions --- pubsub/cloud-client/publisher_test.py | 65 +++++++-------- pubsub/cloud-client/subscriber.py | 32 ++++---- pubsub/cloud-client/subscriber_test.py | 108 +++++++++++++------------ 3 files changed, 105 insertions(+), 100 deletions(-) diff --git a/pubsub/cloud-client/publisher_test.py b/pubsub/cloud-client/publisher_test.py index 1c6fe0c20393..fbe30694ae45 100644 --- a/pubsub/cloud-client/publisher_test.py +++ b/pubsub/cloud-client/publisher_test.py @@ -25,8 +25,8 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] -TOPIC_ONE = "publisher-test-topic-one-" + UUID -TOPIC_TWO = "publisher-test-topic-two-" + UUID +TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID +TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID @pytest.fixture @@ -35,29 +35,30 @@ def client(): @pytest.fixture -def topic_one(client): - topic_path = client.topic_path(PROJECT, TOPIC_ONE) +def topic_admin(client): + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: - response = client.get_topic(topic_path) + topic = client.get_topic(topic_path) except: # noqa - response = client.create_topic(topic_path) + topic = client.create_topic(topic_path) - yield response.name + yield topic.name + # Teardown of `topic_admin` is handled in `test_delete()`. @pytest.fixture -def topic_two(client): - topic_path = client.topic_path(PROJECT, TOPIC_TWO) +def topic_publish(client): + topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH) try: - response = client.get_topic(topic_path) + topic = client.get_topic(topic_path) except: # noqa - response = client.create_topic(topic_path) + topic = client.create_topic(topic_path) - yield response.name + yield topic.name - client.delete_topic(response.name) + client.delete_topic(topic.name) def _make_sleep_patch(): @@ -73,74 +74,74 @@ def new_sleep(period): return mock.patch("time.sleep", new=new_sleep) -def test_list(client, topic_one, capsys): +def test_list(client, topic_admin, capsys): @eventually_consistent.call def _(): publisher.list_topics(PROJECT) out, _ = capsys.readouterr() - assert topic_one in out + assert topic_admin in out def test_create(client): - topic_path = client.topic_path(PROJECT, TOPIC_ONE) + topic_path = client.topic_path(PROJECT, TOPIC_ADMIN) try: client.delete_topic(topic_path) except Exception: pass - publisher.create_topic(PROJECT, TOPIC_ONE) + publisher.create_topic(PROJECT, TOPIC_ADMIN) @eventually_consistent.call def _(): assert client.get_topic(topic_path) -def test_delete(client, topic_one): - publisher.delete_topic(PROJECT, TOPIC_ONE) +def test_delete(client, topic_admin): + publisher.delete_topic(PROJECT, TOPIC_ADMIN) @eventually_consistent.call def _(): with pytest.raises(Exception): - client.get_topic(client.topic_path(PROJECT, TOPIC_ONE)) + client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN)) -def test_publish(topic_two, capsys): - publisher.publish_messages(PROJECT, TOPIC_TWO) +def test_publish(topic_publish, capsys): + publisher.publish_messages(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_custom_attributes(topic_two, capsys): - publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_TWO) +def test_publish_with_custom_attributes(topic_publish, capsys): + publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_batch_settings(topic_two, capsys): - publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_TWO) +def test_publish_with_batch_settings(topic_publish, capsys): + publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_retry_settings(topic_two, capsys): - publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_TWO) +def test_publish_with_retry_settings(topic_publish, capsys): + publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_error_handler(topic_two, capsys): - publisher.publish_messages_with_error_handler(PROJECT, TOPIC_TWO) +def test_publish_with_error_handler(topic_publish, capsys): + publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out -def test_publish_with_futures(topic_two, capsys): - publisher.publish_messages_with_futures(PROJECT, TOPIC_TWO) +def test_publish_with_futures(topic_publish, capsys): + publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH) out, _ = capsys.readouterr() assert "Published" in out diff --git a/pubsub/cloud-client/subscriber.py b/pubsub/cloud-client/subscriber.py index 7733a091e6f2..0d328d232d05 100644 --- a/pubsub/cloud-client/subscriber.py +++ b/pubsub/cloud-client/subscriber.py @@ -169,8 +169,8 @@ def receive_messages(project_id, subscription_name, timeout=None): # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = "How long the subscriber should listen for messages in - # seconds" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() # The `subscription_path` method creates a fully qualified identifier @@ -188,8 +188,8 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() will block indefinitely if `timeout` is not set, unless - # an exception is encountered first. + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. try: streaming_pull_future.result(timeout=timeout) except: # noqa @@ -208,8 +208,8 @@ def receive_messages_with_custom_attributes( # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = "How long the subscriber should listen for messages in - # seconds" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -230,8 +230,8 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() will block indefinitely if `timeout` is not set, unless - # an exception is encountered first. + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. try: streaming_pull_future.result(timeout=timeout) except: # noqa @@ -249,8 +249,8 @@ def receive_messages_with_flow_control( # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pub/Sub subscription name" - # TODO timeout = "How long the subscriber should listen for messages in - # seconds" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -269,8 +269,8 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() will block indefinitely if `timeout` is not set, unless - # an exception is encountered first. + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. try: streaming_pull_future.result(timeout=timeout) except: # noqa @@ -408,8 +408,8 @@ def listen_for_errors(project_id, subscription_name, timeout=None): # TODO project_id = "Your Google Cloud Project ID" # TODO subscription_name = "Your Pubsub subscription name" - # TODO timeout = "How long the subscriber should listen for messages in - # seconds" + # TODO timeout = 5.0 # "How long the subscriber should listen for + # messages in seconds" subscriber = pubsub_v1.SubscriberClient() subscription_path = subscriber.subscription_path( @@ -425,8 +425,8 @@ def callback(message): ) print("Listening for messages on {}..\n".format(subscription_path)) - # result() will block indefinitely if `timeout` is not set, unless - # an exception is encountered first. + # result() in a future will block indefinitely if `timeout` is not set, + # unless an exception is encountered first. try: streaming_pull_future.result(timeout=timeout) except Exception as e: diff --git a/pubsub/cloud-client/subscriber_test.py b/pubsub/cloud-client/subscriber_test.py index 47d0acfb6830..50353c1c6e42 100644 --- a/pubsub/cloud-client/subscriber_test.py +++ b/pubsub/cloud-client/subscriber_test.py @@ -24,9 +24,9 @@ UUID = uuid.uuid4().hex PROJECT = os.environ["GCLOUD_PROJECT"] TOPIC = "subscription-test-topic-" + UUID -SUBSCRIPTION_ONE = "subscription-test-subscription-one-" + UUID -SUBSCRIPTION_TWO = "subscription-test-subscription-two-" + UUID -SUBSCRIPTION_THREE = "subscription-test-subscription-three-" + UUID +SUBSCRIPTION_ADMIN = "subscription-test-subscription-admin-" + UUID +SUBSCRIPTION_ASYNC = "subscription-test-subscription-async-" + UUID +SUBSCRIPTION_SYNC = "subscription-test-subscription-sync-" + UUID ENDPOINT = "https://{}.appspot.com/push".format(PROJECT) NEW_ENDPOINT = "https://{}.appspot.com/push2".format(PROJECT) @@ -41,13 +41,13 @@ def topic(publisher_client): topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - response = publisher_client.get_topic(topic_path) + subscription = publisher_client.get_topic(topic_path) except: # noqa - response = publisher_client.create_topic(topic_path) + subscription = publisher_client.create_topic(topic_path) - yield response.name + yield subscription.name - publisher_client.delete_topic(response.name) + publisher_client.delete_topic(subscription.name) @pytest.fixture(scope="module") @@ -56,76 +56,76 @@ def subscriber_client(): @pytest.fixture(scope="module") -def subscription_one(subscriber_client, topic): +def subscription_admin(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name @pytest.fixture(scope="module") -def subscription_two(subscriber_client, topic): +def subscription_sync(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_TWO + PROJECT, SUBSCRIPTION_SYNC ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name - subscriber_client.delete_subscription(response.name) + subscriber_client.delete_subscription(subscription.name) @pytest.fixture(scope="module") -def subscription_three(subscriber_client, topic): +def subscription_async(subscriber_client, topic): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_THREE + PROJECT, SUBSCRIPTION_ASYNC ) try: - response = subscriber_client.get_subscription(subscription_path) + subscription = subscriber_client.get_subscription(subscription_path) except: # noqa - response = subscriber_client.create_subscription( + subscription = subscriber_client.create_subscription( subscription_path, topic=topic ) - yield response.name + yield subscription.name - subscriber_client.delete_subscription(response.name) + subscriber_client.delete_subscription(subscription.name) -def test_list_in_topic(subscription_one, capsys): +def test_list_in_topic(subscription_admin, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_topic(PROJECT, TOPIC) out, _ = capsys.readouterr() - assert subscription_one in out + assert subscription_admin in out -def test_list_in_project(subscription_one, capsys): +def test_list_in_project(subscription_admin, capsys): @eventually_consistent.call def _(): subscriber.list_subscriptions_in_project(PROJECT) out, _ = capsys.readouterr() - assert subscription_one in out + assert subscription_admin in out def test_create(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: @@ -133,7 +133,7 @@ def test_create(subscriber_client): except Exception: pass - subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ONE) + subscriber.create_subscription(PROJECT, TOPIC, SUBSCRIPTION_ADMIN) @eventually_consistent.call def _(): @@ -142,7 +142,7 @@ def _(): def test_create_push(subscriber_client): subscription_path = subscriber_client.subscription_path( - PROJECT, SUBSCRIPTION_ONE + PROJECT, SUBSCRIPTION_ADMIN ) try: subscriber_client.delete_subscription(subscription_path) @@ -150,7 +150,7 @@ def test_create_push(subscriber_client): pass subscriber.create_push_subscription( - PROJECT, TOPIC, SUBSCRIPTION_ONE, ENDPOINT + PROJECT, TOPIC, SUBSCRIPTION_ADMIN, ENDPOINT ) @eventually_consistent.call @@ -158,20 +158,20 @@ def _(): assert subscriber_client.get_subscription(subscription_path) -def test_update(subscriber_client, subscription_one, capsys): - subscriber.update_subscription(PROJECT, SUBSCRIPTION_ONE, NEW_ENDPOINT) +def test_update(subscriber_client, subscription_admin, capsys): + subscriber.update_subscription(PROJECT, SUBSCRIPTION_ADMIN, NEW_ENDPOINT) out, _ = capsys.readouterr() assert "Subscription updated" in out -def test_delete(subscriber_client, subscription_one): - subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ONE) +def test_delete(subscriber_client, subscription_admin): + subscriber.delete_subscription(PROJECT, SUBSCRIPTION_ADMIN) @eventually_consistent.call def _(): with pytest.raises(Exception): - subscriber_client.get_subscription(subscription_one) + subscriber_client.get_subscription(subscription_admin) def _publish_messages(publisher_client, topic): @@ -183,25 +183,25 @@ def _publish_messages(publisher_client, topic): publish_future.result() -def test_receive(publisher_client, topic, subscription_two, capsys): +def test_receive(publisher_client, topic, subscription_async, capsys): _publish_messages(publisher_client, topic) - subscriber.receive_messages(PROJECT, SUBSCRIPTION_TWO, 5) + subscriber.receive_messages(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_two in out + assert subscription_async in out assert "message" in out def test_receive_with_custom_attributes( - publisher_client, topic, subscription_two, capsys + publisher_client, topic, subscription_async, capsys ): _publish_messages(publisher_client, topic) subscriber.receive_messages_with_custom_attributes( - PROJECT, SUBSCRIPTION_TWO, 5 + PROJECT, SUBSCRIPTION_ASYNC, 5 ) out, _ = capsys.readouterr() @@ -211,50 +211,54 @@ def test_receive_with_custom_attributes( def test_receive_with_flow_control( - publisher_client, topic, subscription_two, capsys + publisher_client, topic, subscription_async, capsys ): _publish_messages(publisher_client, topic) - subscriber.receive_messages_with_flow_control(PROJECT, SUBSCRIPTION_TWO, 5) + subscriber.receive_messages_with_flow_control( + PROJECT, SUBSCRIPTION_ASYNC, 5 + ) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_two in out + assert subscription_async in out assert "message" in out def test_receive_synchronously( - publisher_client, topic, subscription_three, capsys + publisher_client, topic, subscription_sync, capsys ): _publish_messages(publisher_client, topic) - subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_THREE) + subscriber.synchronous_pull(PROJECT, SUBSCRIPTION_SYNC) out, _ = capsys.readouterr() assert "Done." in out def test_receive_synchronously_with_lease( - publisher_client, topic, subscription_three, capsys + publisher_client, topic, subscription_sync, capsys ): _publish_messages(publisher_client, topic) subscriber.synchronous_pull_with_lease_management( - PROJECT, SUBSCRIPTION_THREE + PROJECT, SUBSCRIPTION_SYNC ) out, _ = capsys.readouterr() assert "Done." in out -def test_listen_for_errors(publisher_client, topic, subscription_two, capsys): +def test_listen_for_errors( + publisher_client, topic, subscription_async, capsys +): _publish_messages(publisher_client, topic) - subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_TWO, 5) + subscriber.listen_for_errors(PROJECT, SUBSCRIPTION_ASYNC, 5) out, _ = capsys.readouterr() assert "Listening" in out - assert subscription_two in out + assert subscription_async in out assert "threw an exception" in out