Skip to content

Commit

Permalink
Fix spec for GroupCoordinatorResponse_v1 (#961)
Browse files Browse the repository at this point in the history
  • Loading branch information
ods authored Jan 17, 2024
1 parent 8f7bff0 commit 72c1969
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
7 changes: 2 additions & 5 deletions aiokafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class AIOKafkaAdminClient:
each request to servers and can be used to identify specific
server-side log entries that correspond to this client. Also
submitted to GroupCoordinator for logging with respect to
consumer group administration. Default: 'kafka-python-{version}'
consumer group administration. Default: 'aiokafka-{version}'
request_timeout_ms (int): Client request timeout in milliseconds.
Default: 40000.
connections_max_idle_ms: Close idle connections after the number of
Expand Down Expand Up @@ -508,10 +508,7 @@ async def find_coordinator(self, group_id: str, coordinator_type: int = 0) -> in
:return int: the acting coordinator broker id
"""
# FIXME GroupCoordinatorRequest_v1 in kafka-python 2.0.2 doesn't match
# spec causing "ValueError: Buffer underrun decoding string"
# version = self._matching_api_version(GroupCoordinatorRequest)
version = self._matching_api_version(GroupCoordinatorRequest[:1])
version = self._matching_api_version(GroupCoordinatorRequest)
if version == 0 and coordinator_type:
raise IncompatibleBrokerVersion(
"Cannot query for transaction id on current broker version"
Expand Down
10 changes: 8 additions & 2 deletions aiokafka/protocol/commit.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ class GroupCoordinatorResponse_v1(Response):
API_KEY = 10
API_VERSION = 1
SCHEMA = Schema(
("throttle_time_ms", Int32),
("error_code", Int16),
("error_message", String("utf-8")),
("coordinator_id", Int32),
Expand All @@ -292,14 +293,19 @@ class GroupCoordinatorRequest_v0(Request):
API_KEY = 10
API_VERSION = 0
RESPONSE_TYPE = GroupCoordinatorResponse_v0
SCHEMA = Schema(("consumer_group", String("utf-8")))
SCHEMA = Schema(
("consumer_group", String("utf-8")),
)


class GroupCoordinatorRequest_v1(Request):
API_KEY = 10
API_VERSION = 1
RESPONSE_TYPE = GroupCoordinatorResponse_v1
SCHEMA = Schema(("coordinator_key", String("utf-8")), ("coordinator_type", Int8))
SCHEMA = Schema(
("coordinator_key", String("utf-8")),
("coordinator_type", Int8),
)


GroupCoordinatorRequest = [GroupCoordinatorRequest_v0, GroupCoordinatorRequest_v1]
Expand Down

0 comments on commit 72c1969

Please sign in to comment.