Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker #739

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/715.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker
7 changes: 6 additions & 1 deletion aiokafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ async def create_conn(
sasl_kerberos_domain_name=sasl_kerberos_domain_name,
sasl_oauth_token_provider=sasl_oauth_token_provider,
version_hint=version_hint)
await conn.connect()
try:
await conn.connect()
except Exception:
# Cleanup to prevent `Unclosed AIOKafkaConnection` if we failed to connect here
conn.close()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't be this logic inside AIOKafkaConnection.connect(), so that it maintains invariant too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about that, because AIOKafkaConnection.connect() is not being used directly, only via create_conn factory. Though, I can move this wrapping logic if you think that it should be more appropriate to have it there.

raise
return conn


Expand Down
31 changes: 31 additions & 0 deletions tests/test_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,37 @@ async def test_close_disconnects_connection(self):
conn.close()
self.assertFalse(conn.connected())

@run_until_complete
async def test_semi_broken_connection(self):
"""This testcase tries to replicate what happens on connection to semi-broken
AWS MSK broker.

Connection could be established, but then some timeout occurs, and connection is
not being destroyed properly, producing 'Unclosed AIOKafkaConnection' and
calling loop exception handler, which could lead to failing the whole app for
example if we're running via aiorun with `stop_on_unhandled_errors=True`.
"""
host, port = self.kafka_host, self.kafka_port

def mock_connect(slf):
slf._reader = mock.MagicMock()
slf._reader.at_eof = mock.MagicMock(return_value=False)
raise asyncio.TimeoutError

# We cannot catch ResourceWarning from __del__, so just monitor `close()` call
close_called = False

def mock_close(slf):
slf._reader = None
nonlocal close_called
close_called = True

with mock.patch.object(AIOKafkaConnection, 'connect', mock_connect):
with mock.patch.object(AIOKafkaConnection, 'close', mock_close):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to reproduce the problem without mocking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how exactly kafka broker can be brought up to exactly this broken state unfortunately. We've got this situation multiple times on Amazon MSK when kafka cluster was under high load, which caused failures on some brokers + "under replicated partitions" issues.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without it we can be never sure, that we actually fixed the problem. Any script that eventually fails is fine, not necessary suitable for unit tests one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you remove exception handling from create_conn and run python -m pytest -v --log-cli-level=INFO -s --docker-image aiolibs/kafka:2.12_2.2.2 tests/test_conn.py::ConnIntegrationTest::test_semi_broken_connection there will be original error logged in live log as:

Exception ignored in: <function AIOKafkaConnection.__del__ at 0x7f01a90fc3b0>
Traceback (most recent call last):
  File "/home/vvk/devel/aiokafka/aiokafka/conn.py", line 194, in __del__
    source=self)
ResourceWarning: Unclosed AIOKafkaConnection <AIOKafkaConnection host=127.0.0.1 port=43803>

So the unittest is still mimics the issue. Basically any exception from connect() after self._read_task is set is a problem.

Copy link
Collaborator

@ods ods Apr 26, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there other possible race conditions there? Like initializing self._reader, but not self._read_task?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No I think not. The most possible source of exceptions is later call await self._do_version_lookup()

with pytest.raises(asyncio.TimeoutError):
await create_conn(host, port)
self.assertTrue(close_called)

def test_connection_version_info(self):
# All version supported
version_info = VersionInfo({
Expand Down