Skip to content

Commit

Permalink
Fix memory leak in AsyncClient (#457)
Browse files Browse the repository at this point in the history
* Fix memory leak in AsyncClient

* Graceful shutdown

* fix test

* fix tests

* fix tests
  • Loading branch information
pufit authored Jan 22, 2025
1 parent 6f71dd7 commit eec6d2a
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 8 deletions.
9 changes: 8 additions & 1 deletion clickhouse_connect/driver/asyncclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ def min_version(self, version_str: str) -> bool:
"""
return self.client.min_version(version_str)

def close(self):
async def close(self):
"""
Subclass implementation to close the connection to the server/deallocate the client
"""
self.client.close()
await asyncio.to_thread(self.executor.shutdown, True)

async def query(self,
query: Optional[str] = None,
Expand Down Expand Up @@ -676,3 +677,9 @@ def _raw_insert():
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(self.executor, _raw_insert)
return result

async def __aenter__(self) -> "AsyncClient":
return self

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
await self.close()
1 change: 1 addition & 0 deletions examples/run_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ async def semaphore_wrapper(sm: asyncio.Semaphore, num: int):

semaphore = asyncio.Semaphore(SEMAPHORE)
await asyncio.gather(*[semaphore_wrapper(semaphore, num) for num in range(QUERIES)])
await client.close()


async def main():
Expand Down
10 changes: 6 additions & 4 deletions tests/integration_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import random
import time
from subprocess import Popen, PIPE
from typing import Iterator, NamedTuple, Sequence, Optional, Callable
from typing import Iterator, NamedTuple, Sequence, Optional, Callable, AsyncContextManager

import pytest_asyncio
from pytest import fixture

from clickhouse_connect import common
Expand Down Expand Up @@ -129,9 +130,10 @@ def test_client_fixture(test_config: TestConfig, test_create_client: Callable) -
sys.stderr.write('Successfully stopped docker compose')


@fixture(scope='session', autouse=True, name='test_async_client')
def test_async_client_fixture(test_client: Client) -> Iterator[AsyncClient]:
yield AsyncClient(client=test_client)
@pytest_asyncio.fixture(scope='session', autouse=True, name='test_async_client')
async def test_async_client_fixture(test_client: Client) -> AsyncContextManager[AsyncClient]:
async with AsyncClient(client=test_client) as client:
yield client


@fixture(scope='session', name='table_context')
Expand Down
6 changes: 3 additions & 3 deletions tests/integration_tests/test_session_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async def test_async_client_default_session_id(test_config: TestConfig):
user=test_config.username,
password=test_config.password)
assert async_client.get_client_setting(SESSION_KEY) is None
async_client.close()
await async_client.close()


@pytest.mark.asyncio
Expand All @@ -62,7 +62,7 @@ async def test_async_client_autogenerate_session_id(test_config: TestConfig):
uuid.UUID(session_id)
except ValueError:
pytest.fail(f"Invalid session_id: {session_id}")
async_client.close()
await async_client.close()


@pytest.mark.asyncio
Expand All @@ -75,4 +75,4 @@ async def test_async_client_custom_session_id(test_config: TestConfig):
password=test_config.password,
session_id=session_id)
assert async_client.get_client_setting(SESSION_KEY) == session_id
async_client.close()
await async_client.close()

0 comments on commit eec6d2a

Please sign in to comment.