Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checking _Rendezvous.done() when stopping Pub / Sub request generator. #4554

Merged
merged 2 commits into from
Dec 7, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ def _request_generator_thread(self, policy):
_LOGGER.debug('Sending request:\n%r', request)
yield request

def _stop_request_generator(self, request_generator):
def _stop_request_generator(self, request_generator, response_generator):
"""Ensure a request generator is closed.

This **must** be done when recovering from a retry-able exception.
Expand All @@ -237,12 +237,23 @@ def _stop_request_generator(self, request_generator):
Args:
request_generator (Generator): A streaming pull request generator
returned from :meth:`_request_generator_thread`.
response_generator (grpc._channel._Rendezvous): The gRPC

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

bidirectional stream object that **was** consuming the
``request_generator``. (It will actually spawn a thread
to consume the requests, but that thread will stop once the
rendezvous has a status code set.)

Returns:
bool: Indicates if the generator was successfully stopped. Will
be :data:`True` unless the queue is not empty and the generator
is running.
"""
if not response_generator.done():
_LOGGER.debug(
'Response generator must be done before stopping '
'request generator.')
return False

with self._put_lock:
try:
request_generator.close()
Expand Down Expand Up @@ -322,7 +333,8 @@ def _blocking_consume(self, policy):
except Exception as exc:
recover = policy.on_exception(exc)
if recover:
recover = self._stop_request_generator(request_generator)
recover = self._stop_request_generator(
request_generator, response_generator)
if not recover:
self._stop_no_join()
return
Expand Down
65 changes: 52 additions & 13 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,27 +115,32 @@ def test_blocking_consume_on_exception():


def test_blocking_consume_two_exceptions():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.side_effect = (
(mock.sentinel.A,),
(mock.sentinel.B,),
)
policy = mock.Mock(spec=('call_rpc', 'on_exception'))

exc1 = NameError('Oh noes.')
exc2 = ValueError('Something grumble.')
policy.on_response.side_effect = (exc1, exc2)
policy.on_exception.side_effect = OnException(acceptable=exc1)

response_generator1 = mock.MagicMock(spec=('__iter__', 'done'))
response_generator1.__iter__.side_effect = exc1
response_generator1.done.return_value = True
response_generator2 = mock.MagicMock(spec=('__iter__', 'done'))
response_generator2.__iter__.side_effect = exc2
policy.call_rpc.side_effect = (response_generator1, response_generator2)

consumer = _consumer.Consumer()
consumer._consumer_thread = mock.Mock(spec=threading.Thread)
policy.on_exception.side_effect = OnException(acceptable=exc1)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume(policy)
assert consumer._consumer_thread is None

# Check mocks.
assert policy.call_rpc.call_count == 2
policy.on_response.assert_has_calls(
[mock.call(mock.sentinel.A), mock.call(mock.sentinel.B)])
response_generator1.__iter__.assert_called_once_with()
response_generator1.done.assert_called_once_with()
response_generator2.__iter__.assert_called_once_with()
response_generator2.done.assert_not_called()
policy.on_exception.assert_has_calls(
[mock.call(exc1), mock.call(exc2)])

Expand Down Expand Up @@ -179,6 +184,18 @@ def basic_queue_generator(queue, received):
yield value


def test_stop_request_generator_response_not_done():
consumer = _consumer.Consumer()

response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = False
stopped = consumer._stop_request_generator(None, response_generator)
assert stopped is False

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_not_running():
# Model scenario tested:
# - The request generator **is not** running
Expand Down Expand Up @@ -207,7 +224,10 @@ def test_stop_request_generator_not_running():
# Make sure it **isn't** done.
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is True

# Make sure it **is** done.
Expand All @@ -217,6 +237,9 @@ def test_stop_request_generator_not_running():
assert queue_.get() == item2
assert queue_.empty()

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_close_failure():
# Model scenario tested:
Expand All @@ -229,11 +252,15 @@ def test_stop_request_generator_close_failure():
request_generator = mock.Mock(spec=('close',))
request_generator.close.side_effect = TypeError('Really, not a generator')

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is False

# Make sure close() was only called once.
request_generator.close.assert_called_once_with()
response_generator.done.assert_called_once_with()


def test_stop_request_generator_queue_non_empty():
Expand Down Expand Up @@ -264,7 +291,10 @@ def test_stop_request_generator_queue_non_empty():
assert received.empty()
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is False

# Make sure the generator is **still** not finished.
Expand All @@ -279,6 +309,9 @@ def test_stop_request_generator_queue_non_empty():
pass
assert received.get() == item2

# Check mocks.
response_generator.done.assert_called_once_with()


def test_stop_request_generator_running():
# Model scenario tested:
Expand All @@ -304,7 +337,10 @@ def test_stop_request_generator_running():
assert received.empty()
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
response_generator = mock.Mock(spec=('done',))
response_generator.done.return_value = True
stopped = consumer._stop_request_generator(
request_generator, response_generator)
assert stopped is True

# Make sure it **is** done, though we may have to wait until
Expand All @@ -316,3 +352,6 @@ def test_stop_request_generator_running():
assert request_generator.gi_frame is None
assert received.get() == _helper_threads.STOP
assert queue_.empty()

# Check mocks.
response_generator.done.assert_called_once_with()