Skip to content

Commit

Permalink
Policy.on_exception actually used to make consumer go inactive.
Browse files Browse the repository at this point in the history
  • Loading branch information
dhermes committed Nov 29, 2017
1 parent 5f059f0 commit 815b44e
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 25 deletions.
3 changes: 1 addition & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,9 @@ def _blocking_consume(self):
self.stop_consuming()
except Exception as exc:
try:
self._policy.on_exception(exc)
self.active = self._policy.on_exception(exc)
except:
self.active = False
raise

def start_consuming(self):
"""Start consuming the stream."""
Expand Down
13 changes: 8 additions & 5 deletions pubsub/google/cloud/pubsub_v1/subscriber/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,19 @@ def running(self):
.. note::
A ``False`` value here does not necessarily mean that the
A :data:`False` value here does not necessarily mean that the
subscription is closed; it merely means that _this_ future is
not the future applicable to it.
Since futures have a single result (or exception) and there is
not a concept of resetting them, a closing re-opening of a
not a concept of resetting them, a closing / re-opening of a
subscription will therefore return a new future.
Returns:
bool: ``True`` if this subscription is opened with this future,
``False`` otherwise.
bool: :data:`True` if this subscription is opened with this
future, :data:`False` otherwise.
"""
return self._policy.future is self
if self._policy.future is not self:
return False

return super(Future, self).running()
7 changes: 5 additions & 2 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,11 @@ def on_exception(self, exception):
"""Called when a gRPC exception occurs.
If this method does nothing, then the stream is re-started. If this
raises an exception, it will stop the consumer thread.
This is executed on the response consumer helper thread.
raises an exception, it will stop the consumer thread. This is
executed on the response consumer helper thread.
Implementations should return :data:`True` if they want the consumer
thread to remain active, otherwise they should return :data:`False`.
Args:
exception (Exception): The exception raised by the RPC.
Expand Down
15 changes: 12 additions & 3 deletions pubsub/google/cloud/pubsub_v1/subscriber/policy/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,17 +176,26 @@ def on_callback_request(self, callback_request):
getattr(self, action)(**kwargs)

def on_exception(self, exception):
"""Bubble the exception.
"""Handle the exception.
This will cause the stream to exit loudly.
If the exception is one of the retryable exceptions, this will signal
to the consumer thread that is should remain active.
This will cause the stream to exit.
Returns:
bool: Indicates if the caller should remain active or shut down.
Will be :data:`True` if the ``exception`` is "acceptable", i.e.
in a list of retryable / idempotent exceptions.
"""
# If this is in the list of idempotent exceptions, then we want to
# retry. That entails just returning None.
if isinstance(exception, self._RETRYABLE_STREAM_ERRORS):
return
return True

# Set any other exception on the future.
self._future.set_exception(exception)
return False

def on_response(self, response):
"""Process all received Pub/Sub messages.
Expand Down
59 changes: 49 additions & 10 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import types as base_types

from google.auth import credentials
import mock
import pytest
Expand Down Expand Up @@ -87,18 +89,55 @@ def test_blocking_consume_keyboard_interrupt():
on_res.assert_called_once_with(consumer._policy, mock.sentinel.A)


@mock.patch.object(thread.Policy, 'call_rpc', autospec=True)
@mock.patch.object(thread.Policy, 'on_response', autospec=True)
@mock.patch.object(thread.Policy, 'on_exception', autospec=True)
def test_blocking_consume_exception_reraise(on_exc, on_res, call_rpc):
consumer = create_consumer()
class OnException(object):

def __init__(self, exiting_event, success=True):
self.exiting_event = exiting_event
self.success = success

def __call__(self, exception):
self.exiting_event.set()
if self.success:
return False
else:
raise RuntimeError('Failed to handle exception.')


def test_blocking_consume_exception_normal_handling():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
exc = TypeError('Bad things!')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(consumer._exiting)

# Establish that we get responses until we are sent the exiting event.
call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
on_res.side_effect = TypeError('Bad things!')
on_exc.side_effect = on_res.side_effect
with pytest.raises(TypeError):
consumer._blocking_consume()
consumer._blocking_consume()

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_response.assert_called_once_with(mock.sentinel.A)
policy.on_exception.assert_called_once_with(exc)


def test_blocking_consume_exception_handling_fails():
policy = mock.Mock(spec=('call_rpc', 'on_response', 'on_exception'))
policy.call_rpc.return_value = (mock.sentinel.A, mock.sentinel.B)
exc = NameError('It fails and it propagates.')
policy.on_response.side_effect = exc

consumer = _consumer.Consumer(policy=policy)
policy.on_exception.side_effect = OnException(
consumer._exiting, success=False)

# Establish that we get responses until we are sent the exiting event.
consumer._blocking_consume()

# Check mocks.
policy.call_rpc.assert_called_once()
policy.on_response.assert_called_once_with(mock.sentinel.A)
policy.on_exception.assert_called_once_with(exc)


def test_start_consuming():
Expand Down
6 changes: 3 additions & 3 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_policy_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def test_on_exception_deadline_exceeded():
details = 'Bad thing happened. Time out, go sit in the corner.'
exc = exceptions.DeadlineExceeded(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_unavailable():
Expand All @@ -103,14 +103,14 @@ def test_on_exception_unavailable():
details = 'UNAVAILABLE. Service taking nap.'
exc = exceptions.ServiceUnavailable(details)

assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is True


def test_on_exception_other():
policy = create_policy()
policy._future = Future(policy=policy)
exc = TypeError('wahhhhhh')
assert policy.on_exception(exc) is None
assert policy.on_exception(exc) is False
with pytest.raises(TypeError):
policy.future.result()

Expand Down

0 comments on commit 815b44e

Please sign in to comment.