Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable kafka static-partition-assignment (KIP-345) #600

Conversation

Mopsgeschwindigkeit
Copy link
Contributor

With version v0.10.0 aiokafka implemented and tested KIP-345 (static partition assignment, PR ->aio-libs/aiokafka#941 ).

With this change the class aiokafka.AIOKafkaConsumer() accepts the group_instance_Id parameter.

Copy link

codecov bot commented Jan 10, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Comparison is base (e91c55f) 93.73% compared to head (32f9e64) 93.73%.

Additional details and impacted files
@@           Coverage Diff           @@
##           master     #600   +/-   ##
=======================================
  Coverage   93.73%   93.73%           
=======================================
  Files         102      102           
  Lines       11158    11158           
  Branches     1536     1536           
=======================================
  Hits        10459    10459           
  Misses        612      612           
  Partials       87       87           

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@wbarnha wbarnha requested a review from patkivikram January 10, 2024 20:17
Copy link
Member

@wbarnha wbarnha left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No issue on my end, now that it's supported. I'll defer to the opinion of Vikram since he knows more about static assignment, in case there are any caveats.

@@ -787,7 +787,7 @@ def assert_create_worker_consumer(
api_version=app.conf.consumer_api_version,
client_id=conf.broker_client_id,
group_id=conf.id,
# group_instance_id=conf.consumer_group_instance_id,
group_instance_id=conf.consumer_group_instance_id,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this passed in from the application? As long this is passed in we are good here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sections.Consumer.setting(
params.Str,
version_introduced="2.1",
env_name="CONSUMER_GROUP_INSTANCE_ID",
default=None,
)
def consumer_group_instance_id(self) -> str:
"""Consumer group instance id.
The group_instance_id for static partition assignment.
If not set, default assignment strategy is used. Otherwise,
each consumer instance has to have a unique id.
"""

Looks like it's been passed in for some time, just never got used.

@wbarnha wbarnha merged commit 4a09533 into faust-streaming:master Jan 11, 2024
21 of 23 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants