diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index d7e51c2d8..51edd7bd8 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -95,7 +95,7 @@ def publish_messages(project_id, topic_name): data = data.encode('utf-8') # When you publish a message, the client returns a future. future = publisher.publish(topic_path, data=data) - print('Published {} of message ID {}.'.format(data, future.result())) + print(future.result()) print('Published messages.') # [END pubsub_quickstart_publisher] @@ -119,8 +119,9 @@ def publish_messages_with_custom_attributes(project_id, topic_name): # Data must be a bytestring data = data.encode('utf-8') # Add two attributes, origin and username, to the message - publisher.publish( + future = publisher.publish( topic_path, data, origin='python-sample', username='gcp') + print(future.result()) print('Published messages with custom attributes.') # [END pubsub_publish_custom_attributes] @@ -138,21 +139,15 @@ def publish_messages_with_futures(project_id, topic_name): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) - # When you publish a message, the client returns a Future. This Future - # can be used to track when the message is published. - futures = [] - for n in range(1, 10): data = u'Message number {}'.format(n) # Data must be a bytestring data = data.encode('utf-8') - message_future = publisher.publish(topic_path, data=data) - futures.append(message_future) - - print('Published message IDs:') - for future in futures: - # result() blocks until the message is published. + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data=data) print(future.result()) + + print("Published messages with futures.") # [END pubsub_publisher_concurrency_control] @@ -169,28 +164,34 @@ def publish_messages_with_error_handler(project_id, topic_name): publisher = pubsub_v1.PublisherClient() topic_path = publisher.topic_path(project_id, topic_name) - def callback(message_future): - if message_future.exception(): - print('{} needs handling.'.format(message_future.exception())) - else: - print(message_future.result()) + futures = dict() - for n in range(1, 10): - data = u'Message number {}'.format(n) - # Data must be a bytestring - data = data.encode('utf-8') - # When you publish a message, the client returns a Future. - message_future = publisher.publish(topic_path, data=data) - # If you wish to handle publish failures, do it in the callback. - # Otherwise, it's okay to call `message_future.result()` directly. - message_future.add_done_callback(callback) - - print('Published message IDs:') - - # We keep the main thread from exiting so message futures can be - # resolved in the background. - while True: - time.sleep(60) + def get_callback(f, data): + def callback(f): + try: + print(f.result()) + futures.pop(data) + except: # noqa + print("Please handle {} for {}.".format(f.exception(), data)) + return callback + + for i in range(10): + data = str(i) + futures.update({data: None}) + # When you publish a message, the client returns a future. + future = publisher.publish( + topic_path, + data=data.encode("utf-8"), # data must be a bytestring. + ) + futures[data] = future + # Publish failures shall be handled in the callback function. + future.add_done_callback(get_callback(future, data)) + + # Wait for all the publish futures to resolve before exiting. + while futures: + time.sleep(5) + + print("Published message with error handler.") # [END pubsub_publish_messages_error_handler] @@ -215,9 +216,10 @@ def publish_messages_with_batch_settings(project_id, topic_name): data = u'Message number {}'.format(n) # Data must be a bytestring data = data.encode('utf-8') - publisher.publish(topic_path, data=data) + future = publisher.publish(topic_path, data=data) + print(future.result()) - print('Published messages.') + print('Published messages with batch settings.') # [END pubsub_publisher_batch_settings] diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index cdb4d0e0e..c2908d746 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -111,11 +111,7 @@ def test_publish_with_batch_settings(topic, capsys): def test_publish_with_error_handler(topic, capsys): - - with _make_sleep_patch(): - with pytest.raises(RuntimeError, match='sigil'): - publisher.publish_messages_with_error_handler( - PROJECT, TOPIC) + publisher.publish_messages_with_error_handler(PROJECT, TOPIC) out, _ = capsys.readouterr() assert 'Published' in out diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index df5b1092b..f91007a6d 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -169,13 +169,15 @@ def test_update(subscriber_client, subscription, capsys): def _publish_messages(publisher_client, topic): for n in range(5): data = u'Message {}'.format(n).encode('utf-8') - publisher_client.publish( + future = publisher_client.publish( topic, data=data) + future.result() def _publish_messages_with_custom_attributes(publisher_client, topic): data = u'Test message'.encode('utf-8') - publisher_client.publish(topic, data=data, origin='python-sample') + future = publisher_client.publish(topic, data=data, origin='python-sample') + future.result() def _make_sleep_patch():