Skip to content

Commit

Permalink
Handle async grpc errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby committed Feb 20, 2024
1 parent b59d85c commit b1fee15
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed to handle deadline on topic stream in async driver

## 3.8.0 ##
* Added clients for draft.BaseDynamicConfig service

Expand Down
2 changes: 1 addition & 1 deletion ydb/_grpc/grpcwrapper/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async def receive(self) -> Any:
# todo handle grpc exceptions and convert it to internal exceptions
try:
grpc_message = await self.from_server_grpc.__anext__()
except grpc.RpcError as e:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
raise connection._rpc_error_handler(self._connection_state, e)

issues._process_response(grpc_message)
Expand Down
8 changes: 6 additions & 2 deletions ydb/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import copy
import typing
from concurrent import futures
import uuid
import threading
Expand Down Expand Up @@ -61,15 +62,18 @@ def _log_request(rpc_state, request):
logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request))


def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
def _rpc_error_handler(
rpc_state,
rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call],
on_disconnected: typing.Callable[[], None] = None):
"""
RPC call error handler, that translates gRPC error into YDB issue
:param rpc_state: A state of rpc
:param rpc_error: an underlying rpc error to handle
:param on_disconnected: a handler to call on disconnected connection
"""
logger.info("%s: received error, %s", rpc_state, rpc_error)
if isinstance(rpc_error, grpc.Call):
if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
return issues.Unauthenticated(rpc_error.details())
elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
Expand Down

0 comments on commit b1fee15

Please sign in to comment.