From 60a4d87f4e8396d5c51d1f8aa2ce7a5b9cef8c3e Mon Sep 17 00:00:00 2001 From: Bogdan Markov Date: Fri, 20 Dec 2024 10:45:04 +0100 Subject: [PATCH] Cancel queue get in case of cancel of parent. --- tests/aio/test_session_pool.py | 5 +++++ ydb/aio/table.py | 9 ++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/aio/test_session_pool.py b/tests/aio/test_session_pool.py index c2875ba3..af1ef351 100644 --- a/tests/aio/test_session_pool.py +++ b/tests/aio/test_session_pool.py @@ -34,10 +34,15 @@ async def test_waiter_is_notified(driver): @pytest.mark.asyncio async def test_no_race_after_future_cancel(driver): pool = ydb.aio.SessionPool(driver, 1) + s = await pool.acquire() waiter = asyncio.ensure_future(pool.acquire()) + await asyncio.sleep(0) waiter.cancel() await pool.release(s) + await asyncio.wait([waiter]) + + assert pool._active_queue.qsize() == 1 s = await pool.acquire() assert s.initialized() await pool.stop() diff --git a/ydb/aio/table.py b/ydb/aio/table.py index aec32e1a..538f498b 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -563,7 +563,14 @@ async def _prepare_session(self, timeout, retry_num) -> ydb.ISession: async def _get_session_from_queue(self, timeout: float): task_wait = asyncio.ensure_future(asyncio.wait_for(self._active_queue.get(), timeout=timeout)) task_should_stop = asyncio.ensure_future(self._should_stop.wait()) - done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED) + try: + done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED) + except asyncio.CancelledError: + cancelled = task_wait.cancel() + if not cancelled: + priority, session = task_wait.result() + self._active_queue.put_nowait((priority, session)) + raise if task_should_stop in done: task_wait.cancel() return self._create()