From 421f6792fed6e9cf3b0485f1c546928fb90612ff Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 15:41:31 -0500 Subject: [PATCH 1/5] Fix memory leak in AsyncClient --- clickhouse_connect/driver/asyncclient.py | 1 + examples/run_async.py | 1 + 2 files changed, 2 insertions(+) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index d2346f04..bfbac58d 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -69,6 +69,7 @@ def close(self): Subclass implementation to close the connection to the server/deallocate the client """ self.client.close() + self.executor.shutdown(wait=False) async def query(self, query: Optional[str] = None, diff --git a/examples/run_async.py b/examples/run_async.py index c4cb338e..99ceab96 100644 --- a/examples/run_async.py +++ b/examples/run_async.py @@ -41,6 +41,7 @@ async def semaphore_wrapper(sm: asyncio.Semaphore, num: int): semaphore = asyncio.Semaphore(SEMAPHORE) await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)]) + client.close() async def main(): From fcec2a071c37eb629ec8845de8eb808640d21a4e Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 16:01:04 -0500 Subject: [PATCH 2/5] Graceful shutdown --- clickhouse_connect/driver/asyncclient.py | 10 ++++++++-- examples/run_async.py | 2 +- tests/integration_tests/conftest.py | 3 ++- tests/integration_tests/test_session_id.py | 6 +++--- 4 files changed, 14 insertions(+), 7 deletions(-) diff --git a/clickhouse_connect/driver/asyncclient.py b/clickhouse_connect/driver/asyncclient.py index bfbac58d..e78dfd5d 100644 --- a/clickhouse_connect/driver/asyncclient.py +++ b/clickhouse_connect/driver/asyncclient.py @@ -64,12 +64,12 @@ def min_version(self, version_str: str) -> bool: """ return self.client.min_version(version_str) - def close(self): + async def close(self): """ Subclass implementation to close the connection to the server/deallocate the client """ self.client.close() - self.executor.shutdown(wait=False) + await asyncio.to_thread(self.executor.shutdown, True) async def query(self, query: Optional[str] = None, @@ -677,3 +677,9 @@ def _raw_insert(): loop = asyncio.get_running_loop() result = await loop.run_in_executor(self.executor, _raw_insert) return result + + async def __aenter__(self) -> "AsyncClient": + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: + await self.close() diff --git a/examples/run_async.py b/examples/run_async.py index 99ceab96..069e3f90 100644 --- a/examples/run_async.py +++ b/examples/run_async.py @@ -41,7 +41,7 @@ async def semaphore_wrapper(sm: asyncio.Semaphore, num: int): semaphore = asyncio.Semaphore(SEMAPHORE) await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)]) - client.close() + await client.close() async def main(): diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index f2fb6be1..7a7edbaa 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -131,7 +131,8 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) - @fixture(scope='session', autouse=True, name='test_async_client') def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]: - yield AsyncClient(client=test_client) + async with AsyncClient(client=test_client): + yield @fixture(scope='session', name='table_context') diff --git a/tests/integration_tests/test_session_id.py b/tests/integration_tests/test_session_id.py index 84c683f9..17ac9112 100644 --- a/tests/integration_tests/test_session_id.py +++ b/tests/integration_tests/test_session_id.py @@ -46,7 +46,7 @@ async def test_async_client_default_session_id(test_config: TestConfig): user=test_config.username, password=test_config.password) assert async_client.get_client_setting(SESSION_KEY) is None - async_client.close() + await async_client.close() @pytest.mark.asyncio @@ -62,7 +62,7 @@ async def test_async_client_autogenerate_session_id(test_config: TestConfig): uuid.UUID(session_id) except ValueError: pytest.fail(f"Invalid session_id: {session_id}") - async_client.close() + await async_client.close() @pytest.mark.asyncio @@ -75,4 +75,4 @@ async def test_async_client_custom_session_id(test_config: TestConfig): password=test_config.password, session_id=session_id) assert async_client.get_client_setting(SESSION_KEY) == session_id - async_client.close() + await async_client.close() From 03dcdb3fc9f0da4c128850e00e35a90c8ff457e5 Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 16:10:40 -0500 Subject: [PATCH 3/5] fix test --- tests/integration_tests/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 7a7edbaa..91fa497d 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -130,7 +130,7 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) - @fixture(scope='session', autouse=True, name='test_async_client') -def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]: +async def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]: async with AsyncClient(client=test_client): yield From 78476ba83ad7c965702c1b6706868ceb95336a19 Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 16:16:36 -0500 Subject: [PATCH 4/5] fix tests --- tests/integration_tests/conftest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 91fa497d..3bff5b1a 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -3,7 +3,7 @@ import random import time from subprocess import Popen, PIPE -from typing import Iterator, NamedTuple, Sequence, Optional, Callable +from typing import Iterator, NamedTuple, Sequence, Optional, Callable, AsyncContextManager from pytest import fixture @@ -130,9 +130,9 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) - @fixture(scope='session', autouse=True, name='test_async_client') -async def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]: - async with AsyncClient(client=test_client): - yield +async def test_async_client_fixture(test_client: Client) -> AsyncContextManager[AsyncClient]: + async with AsyncClient(client=test_client) as client: + yield client @fixture(scope='session', name='table_context') From 50099da369e8cf0ee1d9fde87969a00b2a1b461c Mon Sep 17 00:00:00 2001 From: pufit Date: Tue, 21 Jan 2025 16:27:44 -0500 Subject: [PATCH 5/5] fix tests --- tests/integration_tests/conftest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration_tests/conftest.py b/tests/integration_tests/conftest.py index 3bff5b1a..6beeef25 100644 --- a/tests/integration_tests/conftest.py +++ b/tests/integration_tests/conftest.py @@ -5,6 +5,7 @@ from subprocess import Popen, PIPE from typing import Iterator, NamedTuple, Sequence, Optional, Callable, AsyncContextManager +import pytest_asyncio from pytest import fixture from clickhouse_connect import common @@ -129,7 +130,7 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) - sys.stderr.write('Successfully stopped docker compose') -@fixture(scope='session', autouse=True, name='test_async_client') +@pytest_asyncio.fixture(scope='session', autouse=True, name='test_async_client') async def test_async_client_fixture(test_client: Client) -> AsyncContextManager[AsyncClient]: async with AsyncClient(client=test_client) as client: yield client