Skip to content

Commit

Permalink
Pub/Sub: add publish retry sample [(#2273)](GoogleCloudPlatform/pytho…
Browse files Browse the repository at this point in the history
…n-docs-samples#2273)

* Publish retry sample

* double to single quotes

* double to single quotes

* license year
  • Loading branch information
anguillanneuf authored Jul 22, 2019
1 parent e27f549 commit ae4819a
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 27 deletions.
2 changes: 1 addition & 1 deletion samples/snippets/iam.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/iam_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
112 changes: 92 additions & 20 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ def publish_messages_with_custom_attributes(project_id, topic_name):
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
future = publisher.publish(
topic_path, data, origin='python-sample', username='gcp')
topic_path, data, origin='python-sample', username='gcp'
)
print(future.result())

print('Published messages with custom attributes.')
Expand All @@ -147,7 +148,7 @@ def publish_messages_with_futures(project_id, topic_name):
future = publisher.publish(topic_path, data=data)
print(future.result())

print("Published messages with futures.")
print('Published messages with futures.')
# [END pubsub_publisher_concurrency_control]


Expand All @@ -171,17 +172,17 @@ def callback(f):
try:
print(f.result())
futures.pop(data)
except: # noqa
print("Please handle {} for {}.".format(f.exception(), data))
except: # noqa
print('Please handle {} for {}.'.format(f.exception(), data))

return callback

for i in range(10):
data = str(i)
futures.update({data: None})
# When you publish a message, the client returns a future.
future = publisher.publish(
topic_path,
data=data.encode("utf-8"), # data must be a bytestring.
topic_path, data=data.encode('utf-8') # data must be a bytestring.
)
futures[data] = future
# Publish failures shall be handled in the callback function.
Expand All @@ -191,7 +192,7 @@ def callback(f):
while futures:
time.sleep(5)

print("Published message with error handler.")
print('Published message with error handler.')
# [END pubsub_publish_messages_error_handler]


Expand All @@ -207,7 +208,7 @@ def publish_messages_with_batch_settings(project_id, topic_name):
# of data or one second has passed.
batch_settings = pubsub_v1.types.BatchSettings(
max_bytes=1024, # One kilobyte
max_latency=1, # One second
max_latency=1, # One second
)
publisher = pubsub_v1.PublisherClient(batch_settings)
topic_path = publisher.topic_path(project_id, topic_name)
Expand All @@ -223,7 +224,65 @@ def publish_messages_with_batch_settings(project_id, topic_name):
# [END pubsub_publisher_batch_settings]


if __name__ == '__main__':
def publish_messages_with_retry_settings(project_id, topic_name):
"""Publishes messages with custom retry settings."""
# [START pubsub_publisher_retry_settings]
from google.cloud import pubsub_v1

# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"

# Configure the retry settings. Defaults will be overwritten.
retry_settings = {
'interfaces': {
'google.pubsub.v1.Publisher': {
'retry_codes': {
'publish': [
'ABORTED',
'CANCELLED',
'DEADLINE_EXCEEDED',
'INTERNAL',
'RESOURCE_EXHAUSTED',
'UNAVAILABLE',
'UNKNOWN',
]
},
'retry_params': {
'messaging': {
'initial_retry_delay_millis': 150, # default: 100
'retry_delay_multiplier': 1.5, # default: 1.3
'max_retry_delay_millis': 65000, # default: 60000
'initial_rpc_timeout_millis': 25000, # default: 25000
'rpc_timeout_multiplier': 1.0, # default: 1.0
'max_rpc_timeout_millis': 35000, # default: 30000
'total_timeout_millis': 650000, # default: 600000
}
},
'methods': {
'Publish': {
'retry_codes_name': 'publish',
'retry_params_name': 'messaging',
}
},
}
}
}

publisher = pubsub_v1.PublisherClient(client_config=retry_settings)
topic_path = publisher.topic_path(project_id, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
future = publisher.publish(topic_path, data=data)
print(future.result())

print('Published messages with retry settings.')
# [END pubsub_publisher_retry_settings]


if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
Expand All @@ -233,36 +292,47 @@ def publish_messages_with_batch_settings(project_id, topic_name):
subparsers = parser.add_subparsers(dest='command')
subparsers.add_parser('list', help=list_topics.__doc__)

create_parser = subparsers.add_parser('create', help=create_topic.__doc__)
create_parser = subparsers.add_parser('create',
help=create_topic.__doc__)
create_parser.add_argument('topic_name')

delete_parser = subparsers.add_parser('delete', help=delete_topic.__doc__)
delete_parser = subparsers.add_parser('delete',
help=delete_topic.__doc__)
delete_parser.add_argument('topic_name')

publish_parser = subparsers.add_parser(
'publish', help=publish_messages.__doc__)
publish_parser = subparsers.add_parser('publish',
help=publish_messages.__doc__)
publish_parser.add_argument('topic_name')

publish_with_custom_attributes_parser = subparsers.add_parser(
'publish-with-custom-attributes',
help=publish_messages_with_custom_attributes.__doc__)
help=publish_messages_with_custom_attributes.__doc__,
)
publish_with_custom_attributes_parser.add_argument('topic_name')

publish_with_futures_parser = subparsers.add_parser(
'publish-with-futures',
help=publish_messages_with_futures.__doc__)
'publish-with-futures', help=publish_messages_with_futures.__doc__
)
publish_with_futures_parser.add_argument('topic_name')

publish_with_error_handler_parser = subparsers.add_parser(
'publish-with-error-handler',
help=publish_messages_with_error_handler.__doc__)
help=publish_messages_with_error_handler.__doc__
)
publish_with_error_handler_parser.add_argument('topic_name')

publish_with_batch_settings_parser = subparsers.add_parser(
'publish-with-batch-settings',
help=publish_messages_with_batch_settings.__doc__)
help=publish_messages_with_batch_settings.__doc__
)
publish_with_batch_settings_parser.add_argument('topic_name')

publish_with_retry_settings_parser = subparsers.add_parser(
'publish-with-retry-settings',
help=publish_messages_with_retry_settings.__doc__
)
publish_with_retry_settings_parser.add_argument('topic_name')

args = parser.parse_args()

if args.command == 'list':
Expand All @@ -274,11 +344,13 @@ def publish_messages_with_batch_settings(project_id, topic_name):
elif args.command == 'publish':
publish_messages(args.project_id, args.topic_name)
elif args.command == 'publish-with-custom-attributes':
publish_messages_with_custom_attributes(
args.project_id, args.topic_name)
publish_messages_with_custom_attributes(args.project_id,
args.topic_name)
elif args.command == 'publish-with-futures':
publish_messages_with_futures(args.project_id, args.topic_name)
elif args.command == 'publish-with-error-handler':
publish_messages_with_error_handler(args.project_id, args.topic_name)
elif args.command == 'publish-with-batch-settings':
publish_messages_with_batch_settings(args.project_id, args.topic_name)
elif args.command == 'publish-with-retry-settings':
publish_messages_with_retry_settings(args.project_id, args.topic_name)
9 changes: 8 additions & 1 deletion samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -110,6 +110,13 @@ def test_publish_with_batch_settings(topic, capsys):
assert 'Published' in out


def test_publish_with_retry_settings(topic, capsys):
publisher.publish_messages_with_retry_settings(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out


def test_publish_with_error_handler(topic, capsys):
publisher.publish_messages_with_error_handler(PROJECT, TOPIC)

Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/quickstart.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2018 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
2 changes: 1 addition & 1 deletion samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016 Google Inc. All Rights Reserved.
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down

0 comments on commit ae4819a

Please sign in to comment.