-
Notifications
You must be signed in to change notification settings - Fork 233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker #739
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix Unclosed AIOKafkaConnection when connecting to semi-broken broker |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -247,6 +247,37 @@ async def test_close_disconnects_connection(self): | |
conn.close() | ||
self.assertFalse(conn.connected()) | ||
|
||
@run_until_complete | ||
async def test_semi_broken_connection(self): | ||
"""This testcase tries to replicate what happens on connection to semi-broken | ||
AWS MSK broker. | ||
|
||
Connection could be established, but then some timeout occurs, and connection is | ||
not being destroyed properly, producing 'Unclosed AIOKafkaConnection' and | ||
calling loop exception handler, which could lead to failing the whole app for | ||
example if we're running via aiorun with `stop_on_unhandled_errors=True`. | ||
""" | ||
host, port = self.kafka_host, self.kafka_port | ||
|
||
def mock_connect(slf): | ||
slf._reader = mock.MagicMock() | ||
slf._reader.at_eof = mock.MagicMock(return_value=False) | ||
raise asyncio.TimeoutError | ||
|
||
# We cannot catch ResourceWarning from __del__, so just monitor `close()` call | ||
close_called = False | ||
|
||
def mock_close(slf): | ||
slf._reader = None | ||
nonlocal close_called | ||
close_called = True | ||
|
||
with mock.patch.object(AIOKafkaConnection, 'connect', mock_connect): | ||
with mock.patch.object(AIOKafkaConnection, 'close', mock_close): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it possible to reproduce the problem without mocking? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know how exactly kafka broker can be brought up to exactly this broken state unfortunately. We've got this situation multiple times on Amazon MSK when kafka cluster was under high load, which caused failures on some brokers + "under replicated partitions" issues. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Without it we can be never sure, that we actually fixed the problem. Any script that eventually fails is fine, not necessary suitable for unit tests one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you remove exception handling from
So the unittest is still mimics the issue. Basically any exception from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there other possible race conditions there? Like initializing There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No I think not. The most possible source of exceptions is later call |
||
with pytest.raises(asyncio.TimeoutError): | ||
await create_conn(host, port) | ||
self.assertTrue(close_called) | ||
|
||
def test_connection_version_info(self): | ||
# All version supported | ||
version_info = VersionInfo({ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't be this logic inside
AIOKafkaConnection.connect()
, so that it maintains invariant too?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about that, because
AIOKafkaConnection.connect()
is not being used directly, only viacreate_conn
factory. Though, I can move this wrapping logic if you think that it should be more appropriate to have it there.