Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure exception is available when BackgroundConsumer open stream fails #357

Merged
merged 12 commits into from
Oct 17, 2023

Conversation

parthea
Copy link
Collaborator

@parthea parthea commented Mar 14, 2022

Fixes #268🦕

This PR updates the bidi.py code to reflect what is stated in the comments.

The comment for add_done_callback states This occurs when the RPC errors or is successfully terminated. however the done callback was not being called when an error occurs.

def add_done_callback(self, callback):
"""Adds a callback that will be called when the RPC terminates.
This occurs when the RPC errors or is successfully terminated.
Args:
callback (Callable[[grpc.Future], None]): The callback to execute.
It will be provided with the same gRPC future as the underlying
stream which will also be a :class:`grpc.Call`.
"""

def _on_call_done(self, future):
for callback in self._callbacks:
callback(future)

@parthea parthea requested review from a team as code owners March 14, 2022 20:37
@parthea parthea force-pushed the fix-background-consumer-start-eats-exception branch from 7f05e51 to 767b215 Compare March 14, 2022 20:38
@parthea parthea force-pushed the fix-background-consumer-start-eats-exception branch from dddb4d3 to 97c2363 Compare March 15, 2022 13:18
tswast
tswast previously requested changes Mar 15, 2022
Copy link
Contributor

@tswast tswast left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks great! Let's block this until we figure out google-cloud-bigquery-storage though. googleapis/python-bigquery-storage#414

try:
call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
except exceptions.GoogleAPICallError as exc:
self._on_call_done(exc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

on_call_done expects a future (based on the name of its argument). How does it work with an exception here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exception will be raised in the call back add_done_callback and must be handled by the user. This is based on the comment here:

Note that error handling *must* be done by using the provided
``bidi_rpc``'s ``add_done_callback``. This helper will automatically exit
whenever the RPC itself exits and will not provide any error details.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_done_callback specifies callback (Callable[[grpc.Future], None]), but it's not clear to me that GoogleAPICallError is a grpc.Future (see this)

Copy link
Collaborator Author

@parthea parthea Oct 13, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change that I proposed is exactly the same behaviour as the _on_call_done method of ResumableBidiRpc

# Unlike the base class, we only execute the callbacks on a terminal
# error, not for errors that we can recover from. Note that grpc's
# "future" here is also a grpc.RpcError.

From grpc/grpc#10885 (comment), grpc.RpcError is also grpc.Call.

I added this note in c7f63bd

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reverted c7f63bd because GoogleAPICallError is not grpc.Call or grpc.RpcError. The original grpc.RpcError is available on the response property of GoogleAPICallError

def wrap_errors(callable_):
"""Wrap a gRPC callable and map :class:`grpc.RpcErrors` to friendly error
classes.
Errors raised by the gRPC callable are mapped to the appropriate
:class:`google.api_core.exceptions.GoogleAPICallError` subclasses.
The original `grpc.RpcError` (which is usually also a `grpc.Call`) is
available from the ``response`` property on the mapped exception. This
is useful for extracting metadata from the original error.

The behaviour proposed in this PR is the same as ResumableBidiRpc and we typically raise GoogleAPICallError instead of grpc.RpcError

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the code in 753a591 to raise grpc.RpcError instead of GoogleAPICallError since grpc.RpcError is also grpc.Call and it is a better fit

@product-auto-label product-auto-label bot added the size: s Pull request size is small. label Jun 14, 2022
@parthea parthea requested review from tswast and atulep June 24, 2022 15:31
@parthea parthea dismissed tswast’s stale review June 30, 2022 02:22

Dismiss stale review

@parthea parthea requested a review from loferris June 30, 2022 02:22
@parthea
Copy link
Collaborator Author

parthea commented Jun 30, 2022

@loferris, please could you review/approve?

@tswast tswast requested a review from rosiezou January 24, 2023 22:02
@tswast
Copy link
Contributor

tswast commented Jan 24, 2023

@rosiezou You might be interested in this PR. I forget the full context, but I know it's something I wanted for better debugging of the Python BQ Storage Write API.

@parthea parthea assigned vchudnov-g and unassigned steffnay and loferris Sep 1, 2023
@brianhlee
Copy link

@parthea any idea when this will be merged?

@parthea
Copy link
Collaborator Author

parthea commented Sep 19, 2023

@vchudnov-g Please could you take a look?

@@ -276,7 +276,11 @@ def open(self):
request_generator = _RequestQueueGenerator(
self._request_queue, initial_request=self._initial_request
)
call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
try:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In examining this file, it seems to me that _RequestQueueGenerator._is_active (lines 91 et seq) should be defd as return self.call is not None and self.call.is_active(). The way it's currently written it will return True when self.call == None

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e120a0c

try:
call = self._start_rpc(iter(request_generator), metadata=self._rpc_metadata)
except exceptions.GoogleAPICallError as exc:
self._on_call_done(exc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add_done_callback specifies callback (Callable[[grpc.Future], None]), but it's not clear to me that GoogleAPICallError is a grpc.Future (see this)

@parthea parthea assigned parthea and unassigned vchudnov-g Sep 29, 2023
@parthea parthea assigned vchudnov-g and unassigned parthea Oct 13, 2023
@parthea
Copy link
Collaborator Author

parthea commented Oct 13, 2023

@vchudnov-g Please could you take another look?

@parthea parthea merged commit 405272c into main Oct 17, 2023
24 checks passed
@parthea parthea deleted the fix-background-consumer-start-eats-exception branch October 17, 2023 19:54
parthea added a commit that referenced this pull request Nov 30, 2023
parthea added a commit that referenced this pull request Dec 1, 2023
…eam caught unexpected exception and will exit` (#562)

* chore: partial revert of PR #357

This reverts commit e120a0c.

* add comment
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
size: s Pull request size is small.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

bidi.BackgroundConsumer.start() eats exception when _bidi_rpc.open() fails
8 participants