Skip to content

Commit

Permalink
Re-factoring _stop_request_generator() a slight bit.
Browse files Browse the repository at this point in the history
- Dropping the "extra" check
- Making it return `True/False` rather than throwing an exception,
  this way the recovery can just be negated instead of a panic
- Added logging statements to indicate if it was stopped and when
  the helper is actually waiting on `Queue.empty()`
  • Loading branch information
dhermes committed Dec 1, 2017
1 parent da55f04 commit bc44c1e
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 34 deletions.
31 changes: 20 additions & 11 deletions pubsub/google/cloud/pubsub_v1/subscriber/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,29 +238,38 @@ def _stop_request_generator(self, request_generator):
request_generator (Generator): A streaming pull request generator
returned from :meth:`_request_generator_thread`.
Raises:
ValueError: If ``.close()`` fails for any reason other than
"generator already executing".
ValueError: If the queue is not empty and the generator is
running.
Returns:
bool: Indicates if the generator was successfully stopped. Will
be :data:`True` unless the queue is not empty and the generator
is running.
"""
with self._put_lock:
try:
request_generator.close()
except ValueError as exc:
if exc.args != ('generator already executing',):
raise
except ValueError:
# Should be ``ValueError('generator already executing')``
if not self._request_queue.empty():
raise ValueError('Queue expected to be empty.')
_LOGGER.debug(
'Request generator could not be closed but '
'request queue is not empty.')
return False
# If we **cannot** close the request generator,
# then there is no blocking get on the queue. Since
# we have locked ``.put()`` this means that the
# queue **was** and remains empty.
self._request_queue.put(_helper_threads.STOP)
# Wait for the request generator to ``.get()`` the ``STOP``.
_LOGGER.debug(
'Waiting for active request generator to receive STOP')
while not self._request_queue.empty():
pass
request_generator.close()
except Exception as exc:
_LOGGER.error('Failed to close request generator: %r', exc)
return False

_LOGGER.debug('Successfully closed request generator.')
return True

def _blocking_consume(self):
"""Consume the stream indefinitely."""
Expand Down Expand Up @@ -289,8 +298,8 @@ def _blocking_consume(self):
except Exception as exc:
recover = self._policy.on_exception(exc)
if recover:
self._stop_request_generator(request_generator)
else:
recover = self._stop_request_generator(request_generator)
if not recover:
self.stop_consuming()

def start_consuming(self):
Expand Down
88 changes: 65 additions & 23 deletions pubsub/tests/unit/pubsub_v1/subscriber/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
import threading
import types as base_types

Expand Down Expand Up @@ -145,13 +146,57 @@ def test_start_consuming():
)


def basic_queue_generator(queue, received):
value = queue.get()
received.append(value)
yield value
@contextlib.contextmanager
def no_op_ctx_manager():
yield


def basic_queue_generator(queue, received, append_lock=None):
if append_lock is None:
append_lock = no_op_ctx_manager()

while True:
value = queue.get()
with append_lock:
received.append(value)
yield value


def test_stop_request_generator_not_running():
consumer = create_consumer()
queue_ = consumer._request_queue
received = []
append_lock = threading.Lock()
request_generator = basic_queue_generator(queue_, received, append_lock)

queue_.put('unblock-please')
queue_.put('still-here')
assert not queue_.empty()
assert received == []
thread = threading.Thread(target=next, args=(request_generator,))
thread.start()

# Make sure the generator is not stuck at the blocked ``.get()``
# in the thread.
while request_generator.gi_running:
pass
with append_lock:
assert received == ['unblock-please']
# Make sure it **isn't** done.
assert request_generator.gi_frame is not None

stopped = consumer._stop_request_generator(request_generator)
assert stopped is True

# Make sure it **is** done.
assert not request_generator.gi_running
assert request_generator.gi_frame is None
assert not queue_.empty()
assert queue_.get() == 'still-here'
assert queue_.empty()


def test_stop_request_WAT():
consumer = create_consumer()
queue_ = consumer._request_queue
received = []
Expand All @@ -172,7 +217,9 @@ def test_stop_request_generator_not_running():
# Make sure it **isn't** done.
assert request_generator.gi_frame is not None

consumer._stop_request_generator(request_generator)
stopped = consumer._stop_request_generator(request_generator)
assert stopped is True

# Make sure it **is** done.
assert not request_generator.gi_running
assert request_generator.gi_frame is None
Expand All @@ -184,21 +231,15 @@ def test_stop_request_generator_not_running():
def test_stop_request_generator_close_failure():
consumer = create_consumer()

# First, a ValueError with the wrong error message.
# Second, an uncaught exception.
exceptions = (
ValueError('Not a generator'),
TypeError('Really, not a generator'),
)
for exc in exceptions:
request_generator = mock.Mock(spec=('close',))
request_generator.close.side_effect = exc
with pytest.raises(type(exc)) as exc_info:
consumer._stop_request_generator(request_generator)
exc = TypeError('Really, not a generator')
request_generator = mock.Mock(spec=('close',))
request_generator.close.side_effect = exc

assert exc_info.value is exc
# Make sure close() was only called once.
request_generator.close.assert_called_once_with()
stopped = consumer._stop_request_generator(request_generator)
assert stopped is False

# Make sure close() was only called once.
request_generator.close.assert_called_once_with()


def test_stop_request_generator_queue_non_empty():
Expand All @@ -208,10 +249,9 @@ def test_stop_request_generator_queue_non_empty():
request_generator.close.side_effect = ValueError(
'generator already executing')

with pytest.raises(ValueError) as exc_info:
consumer._stop_request_generator(request_generator)
stopped = consumer._stop_request_generator(request_generator)
assert stopped is False

assert exc_info.value.args == ('Queue expected to be empty.',)
# Make sure close() was only called once.
request_generator.close.assert_called_once_with()

Expand All @@ -233,7 +273,9 @@ def test_stop_request_generator_running():
# Make sure it **isn't** done.
assert request_generator.gi_frame is not None

consumer._stop_request_generator(request_generator)
stopped = consumer._stop_request_generator(request_generator)
assert stopped is True

# Make sure it **is** done.
assert not request_generator.gi_running
assert request_generator.gi_frame is None
Expand Down

0 comments on commit bc44c1e

Please sign in to comment.