From deca383ae3a6901064cb16fdba6d31259b36980c Mon Sep 17 00:00:00 2001 From: daspecster Date: Fri, 10 Mar 2017 10:44:03 -0500 Subject: [PATCH] Add timeout for topic_publish() for gRPC side. --- pubsub/google/cloud/pubsub/_gax.py | 7 +++++-- pubsub/unit_tests/test__gax.py | 4 +++- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pubsub/google/cloud/pubsub/_gax.py b/pubsub/google/cloud/pubsub/_gax.py index 17c93786c442..599c99ef1d44 100644 --- a/pubsub/google/cloud/pubsub/_gax.py +++ b/pubsub/google/cloud/pubsub/_gax.py @@ -148,7 +148,7 @@ def topic_delete(self, topic_path): raise NotFound(topic_path) raise - def topic_publish(self, topic_path, messages): + def topic_publish(self, topic_path, messages, timeout=30): """API call: publish one or more messages to a topic See: @@ -161,12 +161,15 @@ def topic_publish(self, topic_path, messages): :type messages: list of dict :param messages: messages to be published. + :type timeout: int + :param timeout: (Optional) Timeout seconds. + :rtype: list of string :returns: list of opaque IDs for published messages. :raises: :exc:`google.cloud.exceptions.NotFound` if the topic does not exist """ - options = CallOptions(is_bundling=False) + options = CallOptions(is_bundling=False, timeout=timeout) message_pbs = [_message_pb_from_mapping(message) for message in messages] try: diff --git a/pubsub/unit_tests/test__gax.py b/pubsub/unit_tests/test__gax.py index 95f908f0c3f4..0f55c8fe298c 100644 --- a/pubsub/unit_tests/test__gax.py +++ b/pubsub/unit_tests/test__gax.py @@ -274,12 +274,13 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): PAYLOAD = b'This is the message text' B64 = base64.b64encode(PAYLOAD) MESSAGE = {'data': B64, 'attributes': {'foo': 'bar'}} + timeout = 120 # 120 seconds or 2 minutes gax_api = _GAXPublisherAPI() client = _Client(self.PROJECT) api = self._make_one(gax_api, client) with self.assertRaises(NotFound): - api.topic_publish(self.TOPIC_PATH, [MESSAGE]) + api.topic_publish(self.TOPIC_PATH, [MESSAGE], timeout=timeout) topic_path, message_pbs, options = gax_api._publish_called_with self.assertEqual(topic_path, self.TOPIC_PATH) @@ -287,6 +288,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self): self.assertEqual(message_pb.data, B64) self.assertEqual(message_pb.attributes, {'foo': 'bar'}) self.assertEqual(options.is_bundling, False) + self.assertEqual(options.timeout, timeout) def test_topic_publish_error(self): import base64