diff --git a/CHANGELOG.md b/CHANGELOG.md index 6137c077..769e0ea3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Fixed to handle deadline on topic stream in async driver + ## 3.8.0 ## * Added clients for draft.BaseDynamicConfig service diff --git a/ydb/_grpc/grpcwrapper/common_utils.py b/ydb/_grpc/grpcwrapper/common_utils.py index bc294025..7fb1576f 100644 --- a/ydb/_grpc/grpcwrapper/common_utils.py +++ b/ydb/_grpc/grpcwrapper/common_utils.py @@ -203,7 +203,7 @@ async def receive(self) -> Any: # todo handle grpc exceptions and convert it to internal exceptions try: grpc_message = await self.from_server_grpc.__anext__() - except grpc.RpcError as e: + except (grpc.RpcError, grpc.aio.AioRpcError) as e: raise connection._rpc_error_handler(self._connection_state, e) issues._process_response(grpc_message) diff --git a/ydb/connection.py b/ydb/connection.py index 1c4bd9c7..e1ed2d5c 100644 --- a/ydb/connection.py +++ b/ydb/connection.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- import logging import copy +import typing from concurrent import futures import uuid import threading @@ -61,7 +62,10 @@ def _log_request(rpc_state, request): logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request)) -def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None): +def _rpc_error_handler( + rpc_state, + rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call], + on_disconnected: typing.Callable[[], None] = None): """ RPC call error handler, that translates gRPC error into YDB issue :param rpc_state: A state of rpc @@ -69,7 +73,7 @@ def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None): :param on_disconnected: a handler to call on disconnected connection """ logger.info("%s: received error, %s", rpc_state, rpc_error) - if isinstance(rpc_error, grpc.Call): + if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)): if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED: return issues.Unauthenticated(rpc_error.details()) elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED: