Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub: remove infinite while loops in subscriber examples #2604

Merged
merged 3 commits into from
Dec 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 41 additions & 34 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

UUID = uuid.uuid4().hex
PROJECT = os.environ["GCLOUD_PROJECT"]
TOPIC = "publisher-test-topic-" + UUID
TOPIC_ADMIN = "publisher-test-topic-admin-" + UUID
TOPIC_PUBLISH = "publisher-test-topic-publish-" + UUID


@pytest.fixture
Expand All @@ -34,15 +35,30 @@ def client():


@pytest.fixture
def topic(client):
topic_path = client.topic_path(PROJECT, TOPIC)
def topic_admin(client):
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)

try:
response = client.get_topic(topic_path)
topic = client.get_topic(topic_path)
except: # noqa
response = client.create_topic(topic_path)
topic = client.create_topic(topic_path)

yield response.name
yield topic.name
# Teardown of `topic_admin` is handled in `test_delete()`.


@pytest.fixture
def topic_publish(client):
topic_path = client.topic_path(PROJECT, TOPIC_PUBLISH)

try:
topic = client.get_topic(topic_path)
except: # noqa
topic = client.create_topic(topic_path)

yield topic.name

client.delete_topic(topic.name)


def _make_sleep_patch():
Expand All @@ -58,83 +74,74 @@ def new_sleep(period):
return mock.patch("time.sleep", new=new_sleep)


def _to_delete():
publisher_client = pubsub_v1.PublisherClient()
publisher_client.delete_topic(
"projects/{}/topics/{}".format(PROJECT, TOPIC)
)


def test_list(client, topic, capsys):
def test_list(client, topic_admin, capsys):
@eventually_consistent.call
def _():
publisher.list_topics(PROJECT)
out, _ = capsys.readouterr()
assert topic in out
assert topic_admin in out


def test_create(client):
topic_path = client.topic_path(PROJECT, TOPIC)
topic_path = client.topic_path(PROJECT, TOPIC_ADMIN)
try:
client.delete_topic(topic_path)
except Exception:
pass

publisher.create_topic(PROJECT, TOPIC)
publisher.create_topic(PROJECT, TOPIC_ADMIN)

@eventually_consistent.call
def _():
assert client.get_topic(topic_path)


def test_delete(client, topic):
publisher.delete_topic(PROJECT, TOPIC)
def test_delete(client, topic_admin):
publisher.delete_topic(PROJECT, TOPIC_ADMIN)

@eventually_consistent.call
def _():
with pytest.raises(Exception):
client.get_topic(client.topic_path(PROJECT, TOPIC))
client.get_topic(client.topic_path(PROJECT, TOPIC_ADMIN))


def test_publish(topic, capsys):
publisher.publish_messages(PROJECT, TOPIC)
def test_publish(topic_publish, capsys):
publisher.publish_messages(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_custom_attributes(topic, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)
def test_publish_with_custom_attributes(topic_publish, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_batch_settings(topic, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)
def test_publish_with_batch_settings(topic_publish, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_retry_settings(topic, capsys):
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC)
def test_publish_with_retry_settings(topic_publish, capsys):
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_error_handler(topic, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)
def test_publish_with_error_handler(topic_publish, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out


def test_publish_with_futures(topic, capsys):
publisher.publish_messages_with_futures(PROJECT, TOPIC)
def test_publish_with_futures(topic_publish, capsys):
publisher.publish_messages_with_futures(PROJECT, TOPIC_PUBLISH)

out, _ = capsys.readouterr()
assert "Published" in out

_to_delete()
98 changes: 60 additions & 38 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,16 @@ def update_subscription(project_id, subscription_name, endpoint):
# [END pubsub_update_push_configuration]


def receive_messages(project_id, subscription_name):
def receive_messages(project_id, subscription_name, timeout=None):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_async_pull]
# [START pubsub_quickstart_subscriber]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
Expand All @@ -183,27 +183,33 @@ def callback(message):
print("Received message: {}".format(message))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking. We must keep the main thread from
# exiting to allow it to process messages asynchronously in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull]
# [END pubsub_quickstart_subscriber]


def receive_messages_with_custom_attributes(project_id, subscription_name):
def receive_messages_with_custom_attributes(
project_id, subscription_name, timeout=None
):
"""Receives messages from a pull subscription."""
# [START pubsub_subscriber_sync_pull_custom_attributes]
# [START pubsub_subscriber_async_pull_custom_attributes]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -219,26 +225,32 @@ def callback(message):
print("{}: {}".format(key, value))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_async_pull_custom_attributes]
# [END pubsub_subscriber_sync_pull_custom_attributes]


def receive_messages_with_flow_control(project_id, subscription_name):
def receive_messages_with_flow_control(
project_id, subscription_name, timeout=None
):
"""Receives messages from a pull subscription with flow control."""
# [START pubsub_subscriber_flow_settings]
import time

from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pub/Sub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -251,15 +263,18 @@ def callback(message):

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)
subscriber.subscribe(

streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback, flow_control=flow_control
)
print("Listening for messages on {}..\n".format(subscription_path))

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print("Listening for messages on {}".format(subscription_path))
while True:
time.sleep(60)
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
streaming_pull_future.result(timeout=timeout)
except: # noqa
streaming_pull_future.cancel()
# [END pubsub_subscriber_flow_settings]


Expand Down Expand Up @@ -386,13 +401,15 @@ def worker(msg):
# [END pubsub_subscriber_sync_pull_with_lease]


def listen_for_errors(project_id, subscription_name):
def listen_for_errors(project_id, subscription_name, timeout=None):
"""Receives messages and catches errors from a pull subscription."""
# [START pubsub_subscriber_error_listener]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO subscription_name = "Your Pubsub subscription name"
# TODO timeout = 5.0 # "How long the subscriber should listen for
# messages in seconds"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
Expand All @@ -403,16 +420,19 @@ def callback(message):
print("Received message: {}".format(message))
message.ack()

future = subscriber.subscribe(subscription_path, callback=callback)
streaming_pull_future = subscriber.subscribe(
subscription_path, callback=callback
)
print("Listening for messages on {}..\n".format(subscription_path))

# Blocks the thread while messages are coming in through the stream. Any
# exceptions that crop up on the thread will be set on the future.
# result() in a future will block indefinitely if `timeout` is not set,
# unless an exception is encountered first.
try:
# When timeout is unspecified, the result method waits indefinitely.
future.result(timeout=30)
streaming_pull_future.result(timeout=timeout)
except Exception as e:
streaming_pull_future.cancel()
print(
"Listening for messages on {} threw an Exception: {}.".format(
"Listening for messages on {} threw an exception: {}.".format(
subscription_name, e
)
)
Expand Down Expand Up @@ -518,14 +538,14 @@ def callback(message):
args.project_id, args.subscription_name, args.endpoint
)
elif args.command == "receive":
receive_messages(args.project_id, args.subscription_name)
receive_messages(args.project_id, args.subscription_name, args.timeout)
elif args.command == "receive-custom-attributes":
receive_messages_with_custom_attributes(
args.project_id, args.subscription_name
args.project_id, args.subscription_name, args.timeout
)
elif args.command == "receive-flow-control":
receive_messages_with_flow_control(
args.project_id, args.subscription_name
args.project_id, args.subscription_name, args.timeout
)
elif args.command == "receive-synchronously":
synchronous_pull(args.project_id, args.subscription_name)
Expand All @@ -534,4 +554,6 @@ def callback(message):
args.project_id, args.subscription_name
)
elif args.command == "listen_for_errors":
listen_for_errors(args.project_id, args.subscription_name)
listen_for_errors(
args.project_id, args.subscription_name, args.timeout
)
Loading