Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure exception is available when BackgroundConsumer open stream fails #357

Merged
merged 12 commits into from
Oct 17, 2023
Merged
6 changes: 5 additions & 1 deletion google/api_core/bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,11 @@ def open(self):
request_generator = _RequestQueueGenerator(
self._request_queue, initial_request=self._initial_request
)
call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In examining this file, it seems to me that _RequestQueueGenerator._is_active (lines 91 et seq) should be defd as return self.call is not None and self.call.is_active(). The way it's currently written it will return True when self.call == None

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e120a0c

call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
except exceptions.GoogleAPICallError as exc:
self._on_call_done(exc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_call_done expects a future (based on the name of its argument). How does it work with an exception here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception will be raised in the call back add_done_callback and must be handled by the user. This is based on the comment here:

Note that error handling *must* be done by using the provided
``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
whenever the RPC itself exits and will not provide any error details.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_done_callback specifies callback (Callable[[grpc.Future], None]), but it's not clear to me that GoogleAPICallError is a grpc.Future (see this)

Copy link
Collaborator Author

@parthea parthea Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change that I proposed is exactly the same behaviour as the _on_call_done method of ResumableBidiRpc

# Unlike the base class, we only execute the callbacks on a terminal
# error, not for errors that we can recover from. Note that grpc's
# "future" here is also a grpc.RpcError.

From grpc/grpc#10885 (comment), grpc.RpcError is also grpc.Call.

I added this note in c7f63bd

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted c7f63bd because GoogleAPICallError is not grpc.Call or grpc.RpcError. The original grpc.RpcError is available on the response property of GoogleAPICallError

def wrap_errors(callable_):
"""Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error
classes.
Errors raised by the gRPC callable are mapped to the appropriate
:class:`google.api_core.exceptions.GoogleAPICallError` subclasses.
The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
available from the ``response`` property on the mapped exception. This
is useful for extracting metadata from the original error.

The behaviour proposed in this PR is the same as ResumableBidiRpc and we typically raise GoogleAPICallError instead of grpc.RpcError

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code in 753a591 to raise grpc.RpcError instead of GoogleAPICallError since grpc.RpcError is also grpc.Call and it is a better fit

raise
parthea marked this conversation as resolved.
Show resolved Hide resolved

request_generator.call = call

Expand Down
13 changes: 13 additions & 0 deletions tests/unit/test_bidi.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,19 @@ def test_wake_on_error(self):
while consumer.is_active:
pass

def test_rpc_callback_fires_when_consumer_start_fails(self):
expected_exception = exceptions.InvalidArgument("test")
callback = mock.Mock(spec=["__call__"])

rpc, _ = make_rpc()
bidi_rpc = bidi.BidiRpc(rpc)
bidi_rpc.add_done_callback(callback)
bidi_rpc._start_rpc.side_effect = expected_exception

consumer = bidi.BackgroundConsumer(bidi_rpc, on_response=None)
consumer.start()
assert callback.call_args.args[0] == expected_exception

def test_consumer_expected_error(self, caplog):
caplog.set_level(logging.DEBUG)

Expand Down