Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement KIP-345 in aiokafka (rebase of #827) #941

Merged
merged 25 commits into from
Dec 7, 2023
Merged
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2dcf7b7
Implement KIP-345 in aiokafka
patkivikram Nov 3, 2020
d02ddd9
fixing linting errors
patkivikram Nov 25, 2020
fc83b3b
fixing linting errors
patkivikram Nov 25, 2020
052b66f
fixing linting errors
patkivikram Nov 25, 2020
4aebeb1
Update tests.yml
patkivikram Dec 4, 2020
5c55d46
Fix linting errors
g-clef Dec 4, 2020
39f8fa3
Linting fixed, tests still failing
g-clef Dec 5, 2020
0c4cf27
fixed tests.
g-clef Dec 5, 2020
f1ab871
Undoing a lot of linting
g-clef Dec 7, 2020
99cc6f0
last few lints
g-clef Dec 7, 2020
5a90fc7
Update assignors.py
patkivikram Dec 8, 2020
1695902
fix linting
patkivikram Dec 8, 2020
8017671
fix lgtm exception
g-clef Dec 9, 2020
77747f2
fix trailing space
g-clef Dec 11, 2020
5d807f2
add KIP-345 tests, remove broker version check
joshuaherrera Nov 14, 2023
54d6870
Merge 'upstream/master' into kip-345, resolve conflicts
joshuaherrera Nov 15, 2023
a570839
use aiokafka AbstractPartitionAssignor
joshuaherrera Nov 15, 2023
46f0b40
only test KIP-345 mode with valid Kafka versions
joshuaherrera Nov 15, 2023
c0b934f
Update aiokafka/consumer/group_coordinator.py
joshuaherrera Nov 20, 2023
7ffca59
remove AbstractStaticPartitionAssignor
joshuaherrera Nov 20, 2023
b8b7ffd
refactor _perform_assignment to use a JoinGroupResponse class as it's…
joshuaherrera Nov 28, 2023
140d115
Merge remote-tracking branch 'upstream/master' into kip-345
joshuaherrera Nov 29, 2023
d9bc186
poll periodically in kip-345 tests
joshuaherrera Nov 30, 2023
d430920
update tests to use async_timeout
joshuaherrera Dec 1, 2023
5c60f63
update isinstance function, use JoinGroupResponse_v5 for check
joshuaherrera Dec 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2106,8 +2106,8 @@
c1_assignment = {tp0}
c2_assignment = {tp1}
else:
c1_assignment = {tp1}
c2_assignment = {tp0}

Check warning on line 2110 in tests/test_consumer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_consumer.py#L2109-L2110

Added lines #L2109 - L2110 were not covered by tests

listener1.revoke_mock.assert_called_with({tp0, tp1})
self.assertEqual(listener1.revoke_mock.call_count, 2)
Expand Down Expand Up @@ -2151,14 +2151,18 @@
# partitions after rebalance
all_partitions = frozenset(list(c1_partitions) + list(c2_partitions))
await consumer2.stop()
await asyncio.sleep(25)
# since the timeout has passed, a rebalance should occur
listener1.revoke_mock.assert_called_with(c1_assignment)
listener1.assign_mock.assert_called_with(c1_assignment.union(c2_assignment))
while True:
ods marked this conversation as resolved.
Show resolved Hide resolved
if (listener1.revoke_mock.call_count > 2
and listener1.assign_mock.call_count > 2):
break
await asyncio.sleep(1)
# this is the last rebalance for consumer1, so the count should now be
# 3.
self.assertEqual(listener1.revoke_mock.call_count, 3)
self.assertEqual(listener1.assign_mock.call_count, 3)
# since the timeout has passed, a rebalance should occur
listener1.revoke_mock.assert_called_with(c1_assignment)
listener1.assign_mock.assert_called_with(c1_assignment.union(c2_assignment))
assert all_partitions == consumer1.assignment()

@kafka_versions('>=2.3.0')
Expand Down Expand Up @@ -2226,7 +2230,11 @@
# It should since KIP-345 is inactive.
consumer2.unsubscribe()
# need to wait for rebalance
await asyncio.sleep(5)
while True:
if (listener1.revoke_mock.call_count > 2
and listener1.assign_mock.call_count > 2):
break
await asyncio.sleep(1)
self.assertEqual(listener1.revoke_mock.call_count, 3)
self.assertEqual(listener1.assign_mock.call_count, 3)
# ensure that consumer2's assigned partitions
Expand All @@ -2238,14 +2246,24 @@
await consumer2._subscription.wait_for_assignment()
# since consumer2 rejoins the group, a rebalance should occur
# for both consumers
await asyncio.sleep(5)
while True:
if (listener1.revoke_mock.call_count > 3
and listener1.assign_mock.call_count > 3
and listener2.revoke_mock.call_count > 1
and listener2.assign_mock.call_count > 1):
break
await asyncio.sleep(1)
self.assertEqual(listener2.revoke_mock.call_count, 2)
self.assertEqual(listener2.assign_mock.call_count, 2)
self.assertEqual(listener1.revoke_mock.call_count, 4)
self.assertEqual(listener1.assign_mock.call_count, 4)

# stop consumer2, which will trigger yet another rebalance
await consumer2.stop()
await asyncio.sleep(15)
while True:
if (listener1.revoke_mock.call_count > 4
and listener1.assign_mock.call_count > 4):
break
await asyncio.sleep(1)
self.assertEqual(listener1.revoke_mock.call_count, 5)
self.assertEqual(listener1.assign_mock.call_count, 5)
Loading