Skip to content

Commit

Permalink
Add timeout for topic_publish() for gRPC side.
Browse files Browse the repository at this point in the history
  • Loading branch information
daspecster committed Mar 10, 2017
1 parent b8ca2c0 commit 6dc7fff
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 3 deletions.
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def topic_delete(self, topic_path):
raise NotFound(topic_path)
raise

def topic_publish(self, topic_path, messages):
def topic_publish(self, topic_path, messages, timeout=30):
"""API call: publish one or more messages to a topic
See:
Expand All @@ -161,12 +161,15 @@ def topic_publish(self, topic_path, messages):
:type messages: list of dict
:param messages: messages to be published.
:type timeout: int
:param timeout: (Optional) Timeout seconds.
:rtype: list of string
:returns: list of opaque IDs for published messages.
:raises: :exc:`google.cloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_bundling=False)
options = CallOptions(is_bundling=False, timeout=timeout)
message_pbs = [_message_pb_from_mapping(message)
for message in messages]
try:
Expand Down
4 changes: 3 additions & 1 deletion pubsub/unit_tests/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,21 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD)
MESSAGE = {'data': B64, 'attributes': {'foo': 'bar'}}
timeout = 120 # 120 seconds or 2 minutes
gax_api = _GAXPublisherAPI()
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(NotFound):
api.topic_publish(self.TOPIC_PATH, [MESSAGE])
api.topic_publish(self.TOPIC_PATH, [MESSAGE], timeout=timeout)

topic_path, message_pbs, options = gax_api._publish_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options.timeout, timeout)

def test_topic_publish_error(self):
import base64
Expand Down

0 comments on commit 6dc7fff

Please sign in to comment.