Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix warnings #810

Merged
merged 6 commits into from
Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,11 +227,15 @@ async def connect(self):
self._idle_handle = loop.call_soon(
self._idle_check, weakref.ref(self))

if self._version_hint and self._version_hint >= (0, 10):
await self._do_version_lookup()
try:
if self._version_hint and self._version_hint >= (0, 10):
await self._do_version_lookup()

if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]:
await self._do_sasl_handshake()
if self._security_protocol in ["SASL_SSL", "SASL_PLAINTEXT"]:
await self._do_sasl_handshake()
except: # noqa: E722
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please change this to except Exception, I would prefer not to have an implicit catch on BaseException if there is no need for it.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do need to catch BaseException. Actually I stepped into the problem with CancelledError, which nowadays doesn't inherit from Exception.

self.close()
raise

return reader, writer

Expand Down
10 changes: 7 additions & 3 deletions aiokafka/util.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import asyncio
import os
from asyncio import AbstractEventLoop
from distutils.version import StrictVersion
from typing import Awaitable, Dict, Tuple, TypeVar, Union
from typing import Awaitable, Dict, Tuple, TypeVar, Union, cast

import async_timeout
from packaging.version import Version

from .structs import OffsetAndMetadata, TopicPartition

Expand Down Expand Up @@ -40,7 +40,11 @@ async def wait_for(fut: Awaitable[T], timeout: Union[None, int, float] = None) -


def parse_kafka_version(api_version: str) -> Tuple[int, int, int]:
version = StrictVersion(api_version).version
parsed = Version(api_version).release
if not 2 <= len(parsed) <= 3:
raise ValueError(api_version)
version = cast(Tuple[int, int, int], (parsed + (0,))[:3])

if not (0, 9) <= version < (3, 0):
raise ValueError(api_version)
return version
Expand Down
4 changes: 4 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
[pytest]
filterwarnings =
error
# https://github.com/docker/docker-py/issues/1293
ignore:.*docker.sock.*:ResourceWarning
ignore:distutils Version classes are deprecated:DeprecationWarning:docker
# Actually comes from docker importing distutils on Windows
ignore:the imp module is deprecated in favour of importlib:DeprecationWarning:pywintypes
markers =
ssl: Tests that require SSL certificates to run
11 changes: 6 additions & 5 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
import platform
import re
import sys
from distutils.command.bdist_rpm import bdist_rpm as _bdist_rpm
from distutils.command.build_ext import build_ext
from distutils.errors import CCompilerError, DistutilsExecError, DistutilsPlatformError

from setuptools import Extension, setup
from setuptools.command.bdist_rpm import bdist_rpm as _bdist_rpm
from setuptools.command.build_ext import build_ext
from setuptools.errors import CCompilerError, ExecError, PlatformError


# Those are needed to build _hton for windows
Expand Down Expand Up @@ -88,20 +88,21 @@ class ve_build_ext(build_ext):
def run(self):
try:
build_ext.run(self)
except (DistutilsPlatformError, FileNotFoundError):
except (PlatformError, FileNotFoundError):
raise BuildFailed()

def build_extension(self, ext):
try:
build_ext.build_extension(self, ext)
except (CCompilerError, DistutilsExecError, DistutilsPlatformError, ValueError):
except (CCompilerError, ExecError, PlatformError, ValueError):
raise BuildFailed()


install_requires = [
"async-timeout",
"kafka-python>=2.0.0",
"dataclasses>=0.5; python_version<'3.7'",
"packaging",
]

PY_VER = sys.version_info
Expand Down
12 changes: 8 additions & 4 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,14 @@ def random_string(length):


def wait_kafka(kafka_host, kafka_port, timeout=60):
loop = asyncio.get_event_loop()
return loop.run_until_complete(
_wait_kafka(kafka_host, kafka_port, timeout)
)
loop = asyncio.new_event_loop()
try:
res = loop.run_until_complete(
_wait_kafka(kafka_host, kafka_port, timeout)
)
finally:
loop.close()
return res


async def _wait_kafka(kafka_host, kafka_port, timeout):
Expand Down
11 changes: 7 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ def pytest_configure(config):
@pytest.fixture(scope='session')
def docker(request):
image = request.config.getoption('--docker-image')
if not image:
return None
return libdocker.from_env()
if image:
client = libdocker.from_env()
yield client
client.close()
else:
yield None


@pytest.fixture(scope='class')
Expand Down Expand Up @@ -231,7 +234,7 @@ def kafka_server(request, docker, docker_ip_address,
kafka_sasl_ssl_port, container
)
finally:
container.remove(force=True)
container.stop()

else:

Expand Down
22 changes: 18 additions & 4 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ def test_create_consumer_no_running_loop(self):
loop.run_until_complete(consumer.getone())
finally:
loop.run_until_complete(consumer.stop())
loop.close()

@run_until_complete
async def test_consumer_context_manager(self):
Expand Down Expand Up @@ -1290,6 +1291,7 @@ async def test_rebalance_listener_with_coroutines(self):
await self.send_messages(1, list(range(10, 20)))

main_self = self
faults = []

class SimpleRebalanceListener(ConsumerRebalanceListener):
def __init__(self, consumer):
Expand All @@ -1305,16 +1307,24 @@ async def on_partitions_revoked(self, revoked):
# Confirm that coordinator is actually waiting for callback to
# complete
await asyncio.sleep(0.2)
main_self.assertTrue(
self.consumer._coordinator.needs_join_prepare)
try:
main_self.assertTrue(
self.consumer._coordinator._rejoin_needed_fut.done())
except Exception as exc:
# Exceptions here are intercepted by GroupCoordinator
faults.append(exc)

async def on_partitions_assigned(self, assigned):
self.assign_mock(assigned)
# Confirm that coordinator is actually waiting for callback to
# complete
await asyncio.sleep(0.2)
main_self.assertFalse(
self.consumer._coordinator.needs_join_prepare)
try:
main_self.assertFalse(
self.consumer._coordinator._rejoin_needed_fut.done())
except Exception as exc:
# Exceptions here are intercepted by GroupCoordinator
faults.append(exc)

tp0 = TopicPartition(self.topic, 0)
tp1 = TopicPartition(self.topic, 1)
Expand All @@ -1333,6 +1343,8 @@ async def on_partitions_assigned(self, assigned):
self.assertEqual(msg.value, b"10")
listener1.revoke_mock.assert_called_with(set())
listener1.assign_mock.assert_called_with({tp0, tp1})
if faults:
raise faults[0]

# By adding a 2nd consumer we trigger rebalance
consumer2 = AIOKafkaConsumer(
Expand Down Expand Up @@ -1367,6 +1379,8 @@ async def on_partitions_assigned(self, assigned):
self.assertEqual(listener2.revoke_mock.call_count, 1)
listener2.assign_mock.assert_called_with(c2_assignment)
self.assertEqual(listener2.assign_mock.call_count, 1)
if faults:
raise faults[0]

@run_until_complete
async def test_rebalance_listener_no_deadlock_callbacks(self):
Expand Down
1 change: 1 addition & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def test_create_producer_no_running_loop(self):
self.assertEqual(resp.offset, 0)
finally:
loop.run_until_complete(producer.stop())
loop.close()

@run_until_complete
async def test_producer_context_manager(self):
Expand Down