diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py index 2b2574829306..372d8e84e563 100644 --- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py +++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py @@ -13,9 +13,12 @@ # limitations under the License. from __future__ import absolute_import +from __future__ import division import collections +import itertools import logging +import math import threading from google.cloud.pubsub_v1 import types @@ -34,6 +37,18 @@ """The maximum amount of time in seconds to wait for additional request items before processing the next batch of requests.""" +_ACK_IDS_BATCH_SIZE = 3000 +"""The maximum number of ACK IDs to send in a single StreamingPullRequest. + +The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per +acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164 +bytes, thus we cannot send more than 524288/164 ~= 3197 ACK IDs in a single +StreamingPullRequest message. + +Accounting for some overhead, we should thus only send a maximum of 3000 ACK +IDs at a time. +""" + class Dispatcher(object): def __init__(self, manager, queue): @@ -119,9 +134,16 @@ def ack(self, items): if time_to_ack is not None: self._manager.ack_histogram.add(time_to_ack) - ack_ids = [item.ack_id for item in items] - request = types.StreamingPullRequest(ack_ids=ack_ids) - self._manager.send(request) + # We must potentially split the request into multiple smaller requests + # to avoid the server-side max request size limit. + ack_ids = (item.ack_id for item in items) + total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + + for _ in range(total_chunks): + request = types.StreamingPullRequest( + ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE) + ) + self._manager.send(request) # Remove the message from lease management. 
self.drop(items) @@ -150,13 +172,18 @@ def modify_ack_deadline(self, items): Args: items(Sequence[ModAckRequest]): The items to modify. """ - ack_ids = [item.ack_id for item in items] - seconds = [item.seconds for item in items] - - request = types.StreamingPullRequest( - modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds - ) - self._manager.send(request) + # We must potentially split the request into multiple smaller requests + # to avoid the server-side max request size limit. + ack_ids = (item.ack_id for item in items) + seconds = (item.seconds for item in items) + total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE)) + + for _ in range(total_chunks): + request = types.StreamingPullRequest( + modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE), + modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE), + ) + self._manager.send(request) def nack(self, items): """Explicitly deny receipt of messages. diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py index 0e1e9744f6d9..33f1de16499a 100644 --- a/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py +++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py @@ -95,6 +95,33 @@ def test_ack_no_time(): manager.ack_histogram.add.assert_not_called() +def test_ack_splitting_large_payload(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 164 bytes) + requests.AckRequest(ack_id=str(i).zfill(164), byte_size=0, time_to_ack=20) + for i in range(6001) + ] + dispatcher_.ack(items) + + calls = manager.send.call_args_list + assert len(calls) == 3 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = set() + + for call in calls: + message = call.args[0] + assert message.ByteSize() <= 524288 # 
server-side limit (2**19) + sent_ack_ids.update(message.ack_ids) + + assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + + def test_lease(): manager = mock.create_autospec( streaming_pull_manager.StreamingPullManager, instance=True @@ -153,6 +180,33 @@ def test_modify_ack_deadline(): ) +def test_modify_ack_deadline_splitting_large_payload(): + manager = mock.create_autospec( + streaming_pull_manager.StreamingPullManager, instance=True + ) + dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue) + + items = [ + # use realistic lengths for ACK IDs (max 164 bytes) + requests.ModAckRequest(ack_id=str(i).zfill(164), seconds=60) + for i in range(6001) + ] + dispatcher_.modify_ack_deadline(items) + + calls = manager.send.call_args_list + assert len(calls) == 3 + + all_ack_ids = {item.ack_id for item in items} + sent_ack_ids = set() + + for call in calls: + message = call.args[0] + assert message.ByteSize() <= 524288 # server-side limit (2**19) + sent_ack_ids.update(message.modify_deadline_ack_ids) + + assert sent_ack_ids == all_ack_ids # all messages should have been ACK-ed + + @mock.patch("threading.Thread", autospec=True) def test_start(thread): manager = mock.create_autospec(