Skip to content

Commit

Permalink
fix(pubsub): split large (mod)ACK requests into smaller ones
Browse files Browse the repository at this point in the history
There is a server-side limit on the maximum size of ACK and modACK
requests, which can be hit if the leaser tries to manage too many
messages in a single request.

This commit ensures that such large requests are split into multiple
smaller requests.
  • Loading branch information
plamut committed Nov 4, 2019
1 parent 89eaedb commit 0c61849
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 10 deletions.
47 changes: 37 additions & 10 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,12 @@
# limitations under the License.

from __future__ import absolute_import
from __future__ import division

import collections
import itertools
import logging
import math
import threading

from google.cloud.pubsub_v1 import types
Expand All @@ -34,6 +37,18 @@
"""The maximum amount of time in seconds to wait for additional request items
before processing the next batch of requests."""

_ACK_IDS_BATCH_SIZE = 3000
"""The maximum number of ACK IDs to send in a single StreamingPullRequest.
The backend imposes a maximum request size limit of 524288 bytes (512 KiB) per
acknowledge / modifyAckDeadline request. ACK IDs have a maximum size of 164
bytes, thus we cannot send more than o 524288/164 ~= 3197 ACK IDs in a single
StreamingPullRequest message.
Accounting for some overhead, we should thus only send a maximum of 3000 ACK
IDs at a time.
"""


class Dispatcher(object):
def __init__(self, manager, queue):
Expand Down Expand Up @@ -119,9 +134,16 @@ def ack(self, items):
if time_to_ack is not None:
self._manager.ack_histogram.add(time_to_ack)

ack_ids = [item.ack_id for item in items]
request = types.StreamingPullRequest(ack_ids=ack_ids)
self._manager.send(request)
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = types.StreamingPullRequest(
ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE)
)
self._manager.send(request)

# Remove the message from lease management.
self.drop(items)
Expand Down Expand Up @@ -150,13 +172,18 @@ def modify_ack_deadline(self, items):
Args:
items(Sequence[ModAckRequest]): The items to modify.
"""
ack_ids = [item.ack_id for item in items]
seconds = [item.seconds for item in items]

request = types.StreamingPullRequest(
modify_deadline_ack_ids=ack_ids, modify_deadline_seconds=seconds
)
self._manager.send(request)
# We must potentially split the request into multiple smaller requests
# to avoid the server-side max request size limit.
ack_ids = (item.ack_id for item in items)
seconds = (item.seconds for item in items)
total_chunks = int(math.ceil(len(items) / _ACK_IDS_BATCH_SIZE))

for _ in range(total_chunks):
request = types.StreamingPullRequest(
modify_deadline_ack_ids=itertools.islice(ack_ids, _ACK_IDS_BATCH_SIZE),
modify_deadline_seconds=itertools.islice(seconds, _ACK_IDS_BATCH_SIZE),
)
self._manager.send(request)

def nack(self, items):
"""Explicitly deny receipt of messages.
Expand Down
54 changes: 54 additions & 0 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,33 @@ def test_ack_no_time():
manager.ack_histogram.add.assert_not_called()


def test_ack_splitting_large_payload():
    """An ACK request for many messages is split into several smaller ones."""
    manager = mock.create_autospec(
        streaming_pull_manager.StreamingPullManager, instance=True
    )
    dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

    # 6001 items with realistically sized ACK IDs (max 164 bytes) force the
    # dispatcher to split the ACKs into three batches of at most 3000 IDs.
    ack_requests = [
        requests.AckRequest(ack_id="{:0>164}".format(n), byte_size=0, time_to_ack=20)
        for n in range(6001)
    ]
    dispatcher_.ack(ack_requests)

    send_calls = manager.send.call_args_list
    assert len(send_calls) == 3

    expected_ids = {ack_request.ack_id for ack_request in ack_requests}
    dispatched_ids = set()

    for send_call in send_calls:
        sent_message = send_call.args[0]
        # Each request must fit under the server-side limit (2**19 bytes).
        assert sent_message.ByteSize() <= 524288
        dispatched_ids.update(sent_message.ack_ids)

    # Every message must have been ACK-ed through some request.
    assert dispatched_ids == expected_ids


def test_lease():
manager = mock.create_autospec(
streaming_pull_manager.StreamingPullManager, instance=True
Expand Down Expand Up @@ -153,6 +180,33 @@ def test_modify_ack_deadline():
)


def test_modify_ack_deadline_splitting_large_payload():
    """A modACK request for many messages is split into several smaller ones."""
    manager = mock.create_autospec(
        streaming_pull_manager.StreamingPullManager, instance=True
    )
    dispatcher_ = dispatcher.Dispatcher(manager, mock.sentinel.queue)

    # 6001 items with realistically sized ACK IDs (max 164 bytes) force the
    # dispatcher to split the modACKs into three batches of at most 3000 IDs.
    modack_requests = [
        requests.ModAckRequest(ack_id="{:0>164}".format(n), seconds=60)
        for n in range(6001)
    ]
    dispatcher_.modify_ack_deadline(modack_requests)

    send_calls = manager.send.call_args_list
    assert len(send_calls) == 3

    expected_ids = {modack_request.ack_id for modack_request in modack_requests}
    dispatched_ids = set()

    for send_call in send_calls:
        sent_message = send_call.args[0]
        # Each request must fit under the server-side limit (2**19 bytes).
        assert sent_message.ByteSize() <= 524288
        dispatched_ids.update(sent_message.modify_deadline_ack_ids)

    # Every message's deadline must have been modified through some request.
    assert dispatched_ids == expected_ids


@mock.patch("threading.Thread", autospec=True)
def test_start(thread):
manager = mock.create_autospec(
Expand Down

0 comments on commit 0c61849

Please sign in to comment.