Skip to content

Commit

Permalink
Merge pull request #1950 from daspecster/enable-pubsub-bundling
Browse files Browse the repository at this point in the history
Re-enable bundling (for consideration)
  • Loading branch information
daspecster authored Jul 1, 2016
2 parents 7d29482 + da259a8 commit 628ee9a
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
14 changes: 14 additions & 0 deletions gcloud/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,17 @@ def __init__(self, items, page_token):
def next(self):
items, self._items = self._items, None
return items


class _GAXBundlingEvent(object):

result = None

def __init__(self, result):
self._result = result

def is_set(self):
return self.result is not None

def wait(self, *_):
self.result = self._result
8 changes: 4 additions & 4 deletions gcloud/pubsub/_gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,17 @@ def topic_publish(self, topic_path, messages):
:raises: :exc:`gcloud.exceptions.NotFound` if the topic does not
exist
"""
options = CallOptions(is_bundling=False)
message_pbs = [_message_pb_from_dict(message)
for message in messages]
try:
result = self._gax_api.publish(topic_path, message_pbs,
options=options)
event = self._gax_api.publish(topic_path, message_pbs)
if not event.is_set():
event.wait()
except GaxError as exc:
if exc_to_code(exc.cause) == StatusCode.NOT_FOUND:
raise NotFound(topic_path)
raise
return result.message_ids
return event.result.message_ids

def topic_list_subscriptions(self, topic_path, page_size=0,
page_token=None):
Expand Down
33 changes: 29 additions & 4 deletions gcloud/pubsub/test__gax.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,15 @@ def test_topic_delete_error(self):

def test_topic_publish_hit(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
gax_api = _GAXPublisherAPI(_publish_response=response)
event = _GAXBundlingEvent(response)
event.wait() # already received result
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])
Expand All @@ -220,7 +223,29 @@ def test_topic_publish_hit(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_publish_hit_with_wait(self):
import base64
from gcloud._testing import _GAXBundlingEvent
PAYLOAD = b'This is the message text'
B64 = base64.b64encode(PAYLOAD).decode('ascii')
MSGID = 'DEADBEEF'
MESSAGE = {'data': B64, 'attributes': {}}
response = _PublishResponsePB([MSGID])
event = _GAXBundlingEvent(response)
gax_api = _GAXPublisherAPI(_publish_response=event)
api = self._makeOne(gax_api)

resource = api.topic_publish(self.TOPIC_PATH, [MESSAGE])

self.assertEqual(resource, [MSGID])
topic_path, message_pbs, options = gax_api._publish_called_with
self.assertEqual(topic_path, self.TOPIC_PATH)
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options, None)

def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
import base64
Expand All @@ -239,7 +264,7 @@ def test_topic_publish_miss_w_attrs_w_bytes_payload(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {'foo': 'bar'})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_publish_error(self):
import base64
Expand All @@ -258,7 +283,7 @@ def test_topic_publish_error(self):
message_pb, = message_pbs
self.assertEqual(message_pb.data, B64)
self.assertEqual(message_pb.attributes, {})
self.assertEqual(options.is_bundling, False)
self.assertEqual(options, None)

def test_topic_list_subscriptions_no_paging(self):
from google.gax import INITIAL_PAGE
Expand Down

0 comments on commit 628ee9a

Please sign in to comment.