diff --git a/aiokafka/conn.py b/aiokafka/conn.py index b8c5a788..bdc84e53 100644 --- a/aiokafka/conn.py +++ b/aiokafka/conn.py @@ -227,11 +227,15 @@ async def connect(self): self._idle_handle = loop.call_soon( self._idle_check, weakref.ref(self)) - if self._version_hint and self._version_hint >= (0, 10): - await self._do_version_lookup() + try: + if self._version_hint and self._version_hint >= (0, 10): + await self._do_version_lookup() - if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]: - await self._do_sasl_handshake() + if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]: + await self._do_sasl_handshake() + except: # noqa: E722 + self.close() + raise return reader, writer diff --git a/aiokafka/util.py b/aiokafka/util.py index 21de4b2b..5dab710c 100644 --- a/aiokafka/util.py +++ b/aiokafka/util.py @@ -1,10 +1,10 @@ import asyncio import os from asyncio import AbstractEventLoop -from distutils.version import StrictVersion -from typing import Awaitable, Dict, Tuple, TypeVar, Union +from typing import Awaitable, Dict, Tuple, TypeVar, Union, cast import async_timeout +from packaging.version import Version from .structs import OffsetAndMetadata, TopicPartition @@ -40,7 +40,11 @@ async def wait_for(fut: Awaitable[T], timeout: Union[None, int, float] = None) - def parse_kafka_version(api_version: str) -> Tuple[int, int, int]: - version = StrictVersion(api_version).version + parsed = Version(api_version).release + if not 2 <= len(parsed) <= 3: + raise ValueError(api_version) + version = cast(Tuple[int, int, int], (parsed + (0,))[:3]) + if not (0, 9) <= version < (3, 0): raise ValueError(api_version) return version diff --git a/pytest.ini b/pytest.ini index 6171b335..c3d76d9c 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,6 +1,10 @@ [pytest] filterwarnings = error + # https://github.com/docker/docker-py/issues/1293 ignore:.*docker.sock.*:ResourceWarning + ignore:distutils Version classes are deprecated:DeprecationWarning:docker + # Actually comes from docker importing distutils on Windows + ignore:the imp module is deprecated in favour of importlib:DeprecationWarning:pywintypes markers = ssl: Tests that require SSL certificates to run diff --git a/setup.py b/setup.py index 95c80c20..ab2e09d9 100644 --- a/setup.py +++ b/setup.py @@ -2,11 +2,11 @@ import platform import re import sys -from distutils.command.bdist_rpm import bdist_rpm as _bdist_rpm -from distutils.command.build_ext import build_ext -from distutils.errors import CCompilerError, DistutilsExecError, DistutilsPlatformError from setuptools import Extension, setup +from setuptools.command.bdist_rpm import bdist_rpm as _bdist_rpm +from setuptools.command.build_ext import build_ext +from setuptools.errors import CCompilerError, ExecError, PlatformError # Those are needed to build _hton for windows @@ -88,13 +88,13 @@ class ve_build_ext(build_ext): def run(self): try: build_ext.run(self) - except (DistutilsPlatformError, FileNotFoundError): + except (PlatformError, FileNotFoundError): raise BuildFailed() def build_extension(self, ext): try: build_ext.build_extension(self, ext) - except (CCompilerError, DistutilsExecError, DistutilsPlatformError, ValueError): + except (CCompilerError, ExecError, PlatformError, ValueError): raise BuildFailed() @@ -102,6 +102,7 @@ def build_extension(self, ext): "async-timeout", "kafka-python>=2.0.0", "dataclasses>=0.5; python_version<'3.7'", + "packaging", ] PY_VER = sys.version_info diff --git a/tests/_testutil.py b/tests/_testutil.py index 7aac6ec6..5a4b5f02 100644 --- a/tests/_testutil.py +++ b/tests/_testutil.py @@ -422,10 +422,14 @@ def random_string(length): def wait_kafka(kafka_host, kafka_port, timeout=60): - loop = asyncio.get_event_loop() - return loop.run_until_complete( - _wait_kafka(kafka_host, kafka_port, timeout) - ) + loop = asyncio.new_event_loop() + try: + res = loop.run_until_complete( + _wait_kafka(kafka_host, kafka_port, timeout) + ) + finally: + loop.close() + return res async def _wait_kafka(kafka_host, kafka_port, timeout): diff --git a/tests/conftest.py b/tests/conftest.py index 0d6083e2..1a250007 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -50,9 +50,12 @@ def pytest_configure(config): @pytest.fixture(scope='session') def docker(request): image = request.config.getoption('--docker-image') - if not image: - return None - return libdocker.from_env() + if image: + client = libdocker.from_env() + yield client + client.close() + else: + yield None @pytest.fixture(scope='class') @@ -231,7 +234,7 @@ def kafka_server(request, docker, docker_ip_address, kafka_sasl_ssl_port, container ) finally: - container.remove(force=True) + container.stop() else: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index f37f0578..5a68c299 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -130,6 +130,7 @@ def test_create_consumer_no_running_loop(self): loop.run_until_complete(consumer.getone()) finally: loop.run_until_complete(consumer.stop()) + loop.close() @run_until_complete async def test_consumer_context_manager(self): @@ -1290,6 +1291,7 @@ async def test_rebalance_listener_with_coroutines(self): await self.send_messages(1, list(range(10, 20))) main_self = self + faults = [] class SimpleRebalanceListener(ConsumerRebalanceListener): def __init__(self, consumer): @@ -1305,16 +1307,24 @@ async def on_partitions_revoked(self, revoked): # Confirm that coordinator is actually waiting for callback to # complete await asyncio.sleep(0.2) - main_self.assertTrue( - self.consumer._coordinator.needs_join_prepare) + try: + main_self.assertTrue( + self.consumer._coordinator._rejoin_needed_fut.done()) + except Exception as exc: + # Exceptions here are intercepted by GroupCoordinator + faults.append(exc) async def on_partitions_assigned(self, assigned): self.assign_mock(assigned) # Confirm that coordinator is actually waiting for callback to # complete await asyncio.sleep(0.2) - main_self.assertFalse( - self.consumer._coordinator.needs_join_prepare) + try: + main_self.assertFalse( + self.consumer._coordinator._rejoin_needed_fut.done()) + except Exception as exc: + # Exceptions here are intercepted by GroupCoordinator + faults.append(exc) tp0 = TopicPartition(self.topic, 0) tp1 = TopicPartition(self.topic, 1) @@ -1333,6 +1343,8 @@ async def on_partitions_assigned(self, assigned): self.assertEqual(msg.value, b"10") listener1.revoke_mock.assert_called_with(set()) listener1.assign_mock.assert_called_with({tp0, tp1}) + if faults: + raise faults[0] # By adding a 2nd consumer we trigger rebalance consumer2 = AIOKafkaConsumer( @@ -1367,6 +1379,8 @@ async def on_partitions_assigned(self, assigned): self.assertEqual(listener2.revoke_mock.call_count, 1) listener2.assign_mock.assert_called_with(c2_assignment) self.assertEqual(listener2.assign_mock.call_count, 1) + if faults: + raise faults[0] @run_until_complete async def test_rebalance_listener_no_deadlock_callbacks(self): diff --git a/tests/test_producer.py b/tests/test_producer.py index 5a890259..5da7684c 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -148,6 +148,7 @@ def test_create_producer_no_running_loop(self): self.assertEqual(resp.offset, 0) finally: loop.run_until_complete(producer.stop()) + loop.close() @run_until_complete async def test_producer_context_manager(self):