Implement KIP-345 in aiokafka (rebase of #827) #941

Merged
merged 25 commits into aio-libs:master on Dec 7, 2023

Conversation

joshuaherrera
Contributor

Changes

Fixes #680

This is a rebase of #827 that adds tests for the KIP-345 functionality. See #827 (comment).

Checklist

  • I think the code is well written
  • Unit tests for the changes exist
  • Documentation reflects the changes
  • Add a new news fragment into the CHANGES folder
    • name it <issue_id>.<type> (e.g. 588.bugfix)
    • if you don't have an issue_id change it to the pr id after creating the PR
    • ensure type is one of the following:
      • .feature: Signifying a new feature.
      • .bugfix: Signifying a bug fix.
      • .doc: Signifying a documentation improvement.
      • .removal: Signifying a deprecation or removal of public API.
      • .misc: A ticket has been closed, but it is not of interest to users.
    • Make sure to use full sentences with correct case and punctuation, for example: Fix issue with non-ascii contents in doctest text files.

@joshuaherrera
Contributor Author

joshuaherrera commented Nov 15, 2023

@ods Here's the PR for the tests. I'll work on fixing the merge conflicts on my end, since only two files are affected, hopefully by EOD today.

Edit: Merge conflicts resolved.

aiokafka/consumer/group_coordinator.py (outdated, resolved)
aiokafka/consumer/assignors.py (outdated, resolved)
aiokafka/consumer/group_coordinator.py (outdated, resolved)
aiokafka/consumer/group_coordinator.py (outdated, resolved)

codecov bot commented Nov 29, 2023

Codecov Report

Attention: 5 lines in your changes are missing coverage. Please review.

Comparison is base (d7201c1) 94.89% compared to head (5c60f63) 94.91%.
Report is 1 commits behind head on master.

Files                                    Patch %   Lines
tests/test_consumer.py                   97.58%    2 Missing and 1 partial ⚠️
aiokafka/consumer/group_coordinator.py   95.23%    1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #941      +/-   ##
==========================================
+ Coverage   94.89%   94.91%   +0.01%     
==========================================
  Files         106      106              
  Lines       16136    16318     +182     
  Branches     2592     2615      +23     
==========================================
+ Hits        15313    15488     +175     
- Misses        549      554       +5     
- Partials      274      276       +2     
Flag          Coverage Δ
cext          91.59% <95.67%> (+0.01%) ⬆️
integration   94.56% <95.67%> (-0.01%) ⬇️
purepy        94.42% <95.67%> (+<0.01%) ⬆️
unit          44.90% <20.19%> (-0.30%) ⬇️

@tvoinarovskyi
Member

Hi joshuaherrera,

Thanks for the PR, I will be joining the review. Really nice, clean PR, thanks for the contribution. I added a few small points to consider.
Could you rebase onto master? It seems we have a few broken CI runs due to upstream changes.

while try_join:
    try_join = False

    if self._api_version < (0, 10, 1):
Member

nit: Could we abstract this out into a function?

Contributor Author

This depends on the KIP-394 decision, since this is tied to that. If we move that to another PR, then this would revert to how we currently handle this in master.

@@ -1202,46 +1217,67 @@ async def perform_group_join(self):
metadata = metadata.encode()
group_protocol = (assignor.name, metadata)
metadata_list.append(group_protocol)
# for KIP-394 we may have to send a second join request
Member

This sounds like a completely different effort, maybe worth separating out to another PR?

Contributor Author

The bulk of this addition is captured in the new while loop wrapping lines 1222-1279, and the newly added MemberIdRequired error. KIP-394 introduces the MemberIdRequired error, and according to the KIP, if the client gets this error they should retry the join with the previous member id.

I'd prefer to leave it in, since the original author thought it necessary and the KIP itself calls out that this is "an important complement to KIP-345", but if you think this should be moved to its own PR (and would delay this PR) I can remove the code related to KIP-394 from this PR.
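
For readers following this thread, here is a minimal sketch of the retry pattern being discussed. It is not the code from this PR: `FakeBroker`, its `join_group` method, and the exception class below are hypothetical stand-ins, and the sketch assumes (per my reading of KIP-394) that the error response carries the member id the client should retry with.

```python
import asyncio


class MemberIdRequired(Exception):
    """Hypothetical stand-in: the join was rejected, retry with this member id."""

    def __init__(self, member_id):
        super().__init__(member_id)
        self.member_id = member_id


class FakeBroker:
    """Hypothetical broker: rejects an empty member id, accepts a non-empty one."""

    def __init__(self):
        self.calls = []

    async def join_group(self, member_id):
        self.calls.append(member_id)
        if member_id == "":
            # First contact: hand back a generated id and ask the client to retry.
            raise MemberIdRequired("member-1")
        return {"member_id": member_id, "generation": 1}


async def perform_group_join(broker, member_id=""):
    # Same loop shape as the diff context: a flag that is re-armed only
    # when the join has to be sent a second time.
    try_join = True
    while try_join:
        try_join = False
        try:
            return await broker.join_group(member_id)
        except MemberIdRequired as err:
            member_id = err.member_id
            try_join = True


broker = FakeBroker()
result = asyncio.run(perform_group_join(broker))
print(result, broker.calls)
```

The flag-based loop keeps the retry to a single extra request in the common case, which is why the join request construction ends up wrapped in the `while` block.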

Contributor Author

@tvoinarovskyi Any updates on next steps? If you or @ods think it's best to remove this and leave it for another PR, I'm happy to do so, since it seems like this is the last unresolved thread for this PR. I don't want to hold this up unnecessarily.

# partitions after rebalance
all_partitions = frozenset(list(c1_partitions) + list(c2_partitions))
await consumer2.stop()
await asyncio.sleep(25)
Member

Could we document here exactly why this is 25s? As I understand it, the value is derived from default timeouts, so maybe we can make it as low as 10s?
The reason I am asking is that CI has limited time to execute, and pauses like these steadily increase runtime. A better approach would be to poll for results, for example, check every second until listener1.revoke_mock.call_count > 2.

Contributor Author

I'll make the polling change as recommended.
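
The polling approach suggested in this thread could look roughly like the sketch below. The helper name `wait_until`, its parameters, and the simulated rebalance are assumptions made for illustration, not the code that was merged; only `revoke_mock.call_count > 2` comes from the review comment.

```python
import asyncio
from unittest import mock


async def wait_until(condition, timeout=25.0, interval=1.0):
    """Poll `condition` every `interval` seconds; return False on timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while not condition():
        if loop.time() >= deadline:
            return False
        await asyncio.sleep(interval)
    return True


async def demo():
    # Stand-in for listener1.revoke_mock from the test under review.
    revoke_mock = mock.Mock()

    async def simulated_rebalances():
        # Pretend three rebalances revoke partitions shortly after each other.
        for _ in range(3):
            await asyncio.sleep(0.01)
            revoke_mock()

    task = asyncio.create_task(simulated_rebalances())
    ok = await wait_until(
        lambda: revoke_mock.call_count > 2, timeout=2.0, interval=0.01
    )
    await task
    return ok, revoke_mock.call_count


ok, calls = asyncio.run(demo())
print(ok, calls)
```

Compared with a fixed `asyncio.sleep(25)`, the test returns as soon as the condition holds and only pays the full timeout when something is actually wrong.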


# stop consumer2, which will trigger yet another rebalance
await consumer2.stop()
await asyncio.sleep(15)
Member

Same as above

tests/test_consumer.py (outdated, resolved)
@ods ods merged commit bc14e6d into aio-libs:master Dec 7, 2023
29 of 31 checks passed
@tartieret

This is very exciting! Thanks a lot for taking care of this. Do you have any idea when this could be released?

@ods
Collaborator

ods commented Dec 13, 2023

@tartieret Please try it out and provide feedback whether it works as expected.

@tartieret

> @tartieret Please try it out and provide feedback whether it works as expected.

I'll have someone on my team look at this over the next few days.

@igulati

igulati commented Dec 28, 2023

> @tartieret Please try it out and provide feedback whether it works as expected.

Hey @ods, replying on behalf of @tartieret: I just tested these changes and they seem to be working for me.

Development

Successfully merging this pull request may close these issues.

Support for static membership protocol
7 participants