diff --git a/samples/snippets/quickstart/pub.py b/samples/snippets/quickstart/pub.py index 9617b34ea..e340eb4f3 100644 --- a/samples/snippets/quickstart/pub.py +++ b/samples/snippets/quickstart/pub.py @@ -22,13 +22,13 @@ # [END pubsub_quickstart_pub_deps] -def get_callback(api_future, data): +def get_callback(api_future, data, ref): """Wrap message data in the context of the callback function.""" - def callback(api_future): try: print("Published message {} now has message ID {}".format( data, api_future.result())) + ref["num_messages"] += 1 except Exception: print("A problem occurred when publishing {}: {}\n".format( data, api_future.exception())) @@ -39,24 +39,28 @@ def callback(api_future): def pub(project_id, topic_name): """Publishes a message to a Pub/Sub topic.""" # [START pubsub_quickstart_pub_client] - # Initialize a Publisher client + # Initialize a Publisher client. client = pubsub_v1.PublisherClient() # [END pubsub_quickstart_pub_client] # Create a fully qualified identifier in the form of # `projects/{project_id}/topics/{topic_name}` topic_path = client.topic_path(project_id, topic_name) - # Data sent to Cloud Pub/Sub must be a bytestring + # Data sent to Cloud Pub/Sub must be a bytestring. data = b"Hello, World!" + # Keep track of the number of published messages. + ref = dict({"num_messages": 0}) + # When you publish a message, the client returns a future. api_future = client.publish(topic_path, data=data) - api_future.add_done_callback(get_callback(api_future, data)) + api_future.add_done_callback(get_callback(api_future, data, ref)) - # Keep the main thread from exiting until background message - # is processed. + # Keep the main thread from exiting while the message future + # gets resolved in the background. while api_future.running(): - time.sleep(0.1) + time.sleep(0.5) + print("Published {} message(s).".format(ref["num_messages"])) if __name__ == '__main__':