Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout for topic_publish() for gRPC side. #3130

Merged
merged 1 commit into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def topic_delete(self, topic_path):
raise NotFound(topic_path)
raise

def topic_publish(self, topic_path, messages):
def topic_publish(self, topic_path, messages, timeout=30):

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"""API call: publish one or more messages to a topic

See:
Expand All @@ -161,12 +161,15 @@ def topic_publish(self, topic_path, messages):
:type messages: list of dict
:param messages: messages to be published.

:type timeout: int
:param timeout: (Optional) Timeout seconds.

:rtype: list of string
:returns: list of opaque IDs for published messages.
:raises: :exc:`google.cloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_bundling=False)
options = CallOptions(is_bundling=False, timeout=timeout)
message_pbs = [_message_pb_from_mapping(message)
for message in messages]
try:
Expand Down
4 changes: 3 additions & 1 deletion pubsub/unit_tests/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,21 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD)
MESSAGE = {'data': B64, 'attributes': {'foo': 'bar'}}
timeout = 120 # 120 seconds or 2 minutes
gax_api = _GAXPublisherAPI()
client = _Client(self.PROJECT)
api = self._make_one(gax_api, client)

with self.assertRaises(NotFound):
api.topic_publish(self.TOPIC_PATH, [MESSAGE])
api.topic_publish(self.TOPIC_PATH, [MESSAGE], timeout=timeout)

topic_path, message_pbs, options = gax_api._publish_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options.timeout, timeout)

def test_topic_publish_error(self):
import base64
Expand Down