-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Make sure inactive request generator is stopped before spawning new one in Pub / Sub consumer. #4503
Conversation
da0510c
to
f33ac4b
Compare
Then running `message-after-recover` against PR googleapis/google-cloud-python#4503
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One item that I believe to be wrong, and one question.
@@ -244,7 +252,19 @@ def _blocking_consume(self): | |||
break | |||
except Exception as exc: | |||
recover = self._policy.on_exception(exc) | |||
if not recover: | |||
if recover: | |||
with threading.Lock(): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -60,12 +60,13 @@ def __init__(self): | |||
def __contains__(self, needle): | |||
return needle in self._helper_threads | |||
|
|||
def start(self, name, queue, target): | |||
def start(self, name, queue_put, target): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
with threading.Lock(): | ||
# Must lock so that ``self.send_request()`` cannot add | ||
# more items to the queue while it is being replaced. | ||
previous_queue = self._request_queue |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
try: | ||
request_generator.close() | ||
except ValueError as exc: | ||
if exc.args != ('generator already executing',): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
""" | ||
with self._put_lock: | ||
try: | ||
request_generator.close() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -60,12 +60,13 @@ def __init__(self): | |||
def __contains__(self, needle): | |||
return needle in self._helper_threads | |||
|
|||
def start(self, name, queue, target): | |||
def start(self, name, queue_put, target): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
# queue **was** and remains empty. | ||
self._request_queue.put(_helper_threads.STOP) | ||
# Wait for the request generator to ``.get()`` the ``STOP``. | ||
while not self._request_queue.empty(): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
except ValueError as exc: | ||
if exc.args != ('generator already executing',): | ||
raise | ||
if not self._request_queue.empty(): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
640524c
to
bc44c1e
Compare
'Waiting for active request generator to receive STOP') | ||
while not self._request_queue.empty(): | ||
pass | ||
request_generator.close() |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
- Made the "already closed" test use an actual generator instead of a mock. - Removed data race from `test_stop_request_generator_running()` - Added tracking of the values received from the queue in test generator
- Dropping the "extra" check - Making it return `True/False` rather than throwing an exception, this way the recovery can just be negated instead of a panic - Added logging statements to indicate if it was stopped and when the helper is actually waiting on `Queue.empty()`
bc44c1e
to
e78f9d2
Compare
@lukesneeringer @jonparrott PTAL. Phew I think it's finally merge-able. |
'request queue is not empty.') | ||
return False | ||
# If we **cannot** close the request generator, | ||
# then there is no blocking get on the queue. Since |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
yield value | ||
|
||
|
||
def test_stop_request_generator_not_running(): |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Also updating `test_stop_request_generator_queue_non_empty()` to use a real generator rather than a mock.
I added explanations for each unit test @jonparrott. Also updated one of them to use a real generator instead of a This code is no fun. |
This makes therequest_queue
used byConsumer._request_generator_thread()
local to the caller. This way on recovery a new queue can be used for the new thread.