From d100a8b9009a3e58a1cf64be60021c2264cdedd2 Mon Sep 17 00:00:00 2001 From: Oleg Ovcharuk Date: Tue, 21 Jan 2025 14:04:41 +0300 Subject: [PATCH] Transactional retryer --- tests/aio/query/test_query_session_pool.py | 35 +++++++++++++++++++++ tests/query/test_query_session_pool.py | 34 ++++++++++++++++++++ ydb/aio/query/pool.py | 35 +++++++++++++++++++++ ydb/query/pool.py | 36 ++++++++++++++++++++++ 4 files changed, 140 insertions(+) diff --git a/tests/aio/query/test_query_session_pool.py b/tests/aio/query/test_query_session_pool.py index 26b12082..ddeaf7f6 100644 --- a/tests/aio/query/test_query_session_pool.py +++ b/tests/aio/query/test_query_session_pool.py @@ -1,8 +1,12 @@ import asyncio import pytest import ydb + +from typing import Optional + from ydb.aio.query.pool import QuerySessionPool from ydb.aio.query.session import QuerySession, QuerySessionStateEnum +from ydb.aio.query.transaction import QueryTxContext class TestQuerySessionPool: @@ -55,6 +59,37 @@ async def callee(session: QuerySession): with pytest.raises(CustomException): await pool.retry_operation_async(callee) + @pytest.mark.parametrize( + "tx_mode", + [ + (None), + (ydb.QuerySerializableReadWrite()), + (ydb.QuerySnapshotReadOnly()), + (ydb.QueryOnlineReadOnly()), + (ydb.QueryStaleReadOnly()), + ], + ) + @pytest.mark.asyncio + async def test_retry_tx_normal(self, pool: QuerySessionPool, tx_mode: Optional[ydb.BaseQueryTxMode]): + async def callee(tx: QueryTxContext): + result_stream = await tx.execute("SELECT 1") + return [result_set async for result_set in result_stream] + + result = await pool.retry_tx_async(callee=callee, tx_mode=tx_mode) + assert len(result) == 1 + + @pytest.mark.asyncio + async def test_retry_tx_raises(self, pool: QuerySessionPool): + class CustomException(Exception): + pass + + async def callee(tx: QueryTxContext): + raise CustomException() + + with pytest.raises(CustomException): + await pool.retry_tx_async(callee) + + @pytest.mark.asyncio async def test_pool_size_limit_logic(self, pool: QuerySessionPool): target_size = 5 diff --git a/tests/query/test_query_session_pool.py b/tests/query/test_query_session_pool.py index cb476fa8..c0ff269e 100644 --- a/tests/query/test_query_session_pool.py +++ b/tests/query/test_query_session_pool.py @@ -1,7 +1,12 @@ import pytest import ydb + +from typing import Optional + from ydb.query.pool import QuerySessionPool from ydb.query.session import QuerySession, QuerySessionStateEnum +from ydb.query.transaction import QueryTxContext + class TestQuerySessionPool: @@ -46,6 +51,35 @@ def callee(session: QuerySession): with pytest.raises(CustomException): pool.retry_operation_sync(callee) + @pytest.mark.parametrize( + "tx_mode", + [ + (None), + (ydb.QuerySerializableReadWrite()), + (ydb.QuerySnapshotReadOnly()), + (ydb.QueryOnlineReadOnly()), + (ydb.QueryStaleReadOnly()), + ], + ) + def test_retry_tx_normal(self, pool: QuerySessionPool, tx_mode: Optional[ydb.BaseQueryTxMode]): + def callee(tx: QueryTxContext): + result_stream = tx.execute("SELECT 1") + return [result_set for result_set in result_stream] + + result = pool.retry_tx_sync(callee=callee, tx_mode=tx_mode) + assert len(result) == 1 + + def test_retry_tx_raises(self, pool: QuerySessionPool): + class CustomException(Exception): + pass + + def callee(tx: QueryTxContext): + raise CustomException() + + with pytest.raises(CustomException): + pool.retry_tx_sync(callee) + + def test_pool_size_limit_logic(self, pool: QuerySessionPool): target_size = 5 pool._size = target_size diff --git a/ydb/aio/query/pool.py b/ydb/aio/query/pool.py index 456896db..f6a84eb0 100644 --- a/ydb/aio/query/pool.py +++ b/ydb/aio/query/pool.py @@ -13,9 +13,11 @@ RetrySettings, retry_operation_async, ) +from ...query.base import BaseQueryTxMode from ...query.base import QueryClientSettings from ... import convert from ..._grpc.grpcwrapper import common_utils +from ..._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public logger = logging.getLogger(__name__) @@ -122,6 +124,39 @@ async def wrapped_callee(): return await retry_operation_async(wrapped_callee, retry_settings) + async def retry_tx_async( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + async def wrapped_callee(): + async with self.checkout() as session: + async with session.transaction(tx_mode=tx_mode) as tx: + result = await callee(tx, *args, **kwargs) + await tx.commit() + return result + + return await retry_operation_async(wrapped_callee, retry_settings) + async def execute_with_retries( self, query: str, diff --git a/ydb/query/pool.py b/ydb/query/pool.py index f1fcd173..e2c36623 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -8,6 +8,7 @@ import threading import queue +from .base import BaseQueryTxMode from .base import QueryClientSettings from .session import ( QuerySession, @@ -20,6 +21,8 @@ from .. import convert from ..settings import BaseRequestSettings from .._grpc.grpcwrapper import common_utils +from .._grpc.grpcwrapper import ydb_query_public_types as _ydb_query_public + logger = logging.getLogger(__name__) @@ -138,6 +141,39 @@ def wrapped_callee(): return retry_operation_sync(wrapped_callee, retry_settings) + def retry_tx_sync( + self, + callee: Callable, + tx_mode: Optional[BaseQueryTxMode] = None, + retry_settings: Optional[RetrySettings] = None, + *args, + **kwargs, + ): + """Special interface to execute a bunch of commands with transaction in a safe, retriable way. + + :param callee: A function, that works with session. + :param tx_mode: Transaction mode, which is a one from the following choises: + 1) QuerySerializableReadWrite() which is default mode; + 2) QueryOnlineReadOnly(allow_inconsistent_reads=False); + 3) QuerySnapshotReadOnly(); + 4) QueryStaleReadOnly(). + :param retry_settings: RetrySettings object. + + :return: Result sets or exception in case of execution errors. + """ + + tx_mode = tx_mode if tx_mode else _ydb_query_public.QuerySerializableReadWrite() + retry_settings = RetrySettings() if retry_settings is None else retry_settings + + def wrapped_callee(): + with self.checkout(timeout=retry_settings.max_session_acquire_timeout) as session: + with session.transaction(tx_mode=tx_mode) as tx: + result = callee(tx, *args, **kwargs) + tx.commit() + return result + + return retry_operation_sync(wrapped_callee, retry_settings) + def execute_with_retries( self, query: str,