-
Notifications
You must be signed in to change notification settings - Fork 187
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable kafka static-partition-assignment (KIP-345) #600
enable kafka static-partition-assignment (KIP-345) #600
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #600 +/- ##
=======================================
Coverage 93.73% 93.73%
=======================================
Files 102 102
Lines 11158 11158
Branches 1536 1536
=======================================
Hits 10459 10459
Misses 612 612
Partials 87 87 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No issue on my end, now that it's supported. I'll defer to the opinion of Vikram since he knows more about static assignment, in case there are any caveats.
@@ -787,7 +787,7 @@ def assert_create_worker_consumer( | |||
api_version=app.conf.consumer_api_version, | |||
client_id=conf.broker_client_id, | |||
group_id=conf.id, | |||
# group_instance_id=conf.consumer_group_instance_id, | |||
group_instance_id=conf.consumer_group_instance_id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this passed in from the application? As long this is passed in we are good here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
faust/faust/types/settings/settings.py
Lines 1106 to 1119 in e91c55f
@sections.Consumer.setting( | |
params.Str, | |
version_introduced="2.1", | |
env_name="CONSUMER_GROUP_INSTANCE_ID", | |
default=None, | |
) | |
def consumer_group_instance_id(self) -> str: | |
"""Consumer group instance id. | |
The group_instance_id for static partition assignment. | |
If not set, default assignment strategy is used. Otherwise, | |
each consumer instance has to have a unique id. | |
""" |
Looks like it's been passed in for some time, just never got used.
With version v0.10.0 aiokafka implemented and tested KIP-345 (static partition assignment, PR ->aio-libs/aiokafka#941 ).
With this change the class
aiokafka.AIOKafkaConsumer()
accepts thegroup_instance_Id
parameter.