Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure exception is available when BackgroundConsumer open stream fails #357

Merged
merged 12 commits into from
Oct 17, 2023
Merged
17 changes: 12 additions & 5 deletions google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,7 @@ def _is_active(self):
# Note: there is a possibility that this starts *before* the call
# property is set. So we have to check if self.call is set before
# seeing if it's active.
if self.call is not None and not self.call.is_active():
return False
else:
return True
return self.call is not None and self.call.is_active()

def __iter__(self):
if self._initial_request is not None:
Expand Down Expand Up @@ -265,6 +262,10 @@ def add_done_callback(self, callback):
self._callbacks.append(callback)

def _on_call_done(self, future):
# This occurs when the RPC errors or is successfully terminated.
# Note that grpc's "future" here can also be a grpc.RpcError.
# See note in https://github.com/grpc/grpc/issues/10885#issuecomment-302651331
# that `grpc.RpcError` is also `grpc.call`.
for callback in self._callbacks:
callback(future)

Expand All @@ -276,7 +277,13 @@ def open(self):
request_generator = _RequestQueueGenerator(
self._request_queue, initial_request=self._initial_request
)
call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In examining this file, it seems to me that _RequestQueueGenerator._is_active (lines 91 et seq) should be defd as return self.call is not None and self.call.is_active(). The way it's currently written it will return True when self.call == None

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e120a0c

call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
except exceptions.GoogleAPICallError as exc:
# The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
# available from the ``response`` property on the mapped exception.
self._on_call_done(exc.response)
raise
parthea marked this conversation as resolved.
Show resolved Hide resolved

request_generator.call = call

Expand Down
15 changes: 15 additions & 0 deletions tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,21 @@ def test_wake_on_error(self):
while consumer.is_active:
pass

def test_rpc_callback_fires_when_consumer_start_fails(self):
expected_exception = exceptions.InvalidArgument(
"test", response=grpc.StatusCode.INVALID_ARGUMENT
)
callback = mock.Mock(spec=["__call__"])

rpc, _ = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.add_done_callback(callback)
bidi_rpc._start_rpc.side_effect = expected_exception

consumer = bidi.BackgroundConsumer(bidi_rpc, on_response=None)
consumer.start()
assert callback.call_args.args[0] == grpc.StatusCode.INVALID_ARGUMENT

def test_consumer_expected_error(self, caplog):
caplog.set_level(logging.DEBUG)

Expand Down