Skip to content

Commit

Permalink
Admin client
Browse files Browse the repository at this point in the history
*  Use controller node for create and delete topic instead of random

* Fixes error code 41 on create and delete topic
  • Loading branch information
kbhatiya999 committed Nov 13, 2022
1 parent 70dea78 commit 3d5b58e
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions aiokafka/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,10 @@ async def create_topics(
"Support for CreateTopics v{} has not yet been added "
"to AIOKafkaAdminClient."
.format(version))

metadata = await self._get_cluster_metadata()
response = await self._client.send(
self._client.get_random_node(),
metadata.controller_id,
request)
return response

Expand All @@ -212,7 +214,8 @@ async def delete_topics(
version = self._matching_api_version(DeleteTopicsRequest)
req_cls = DeleteTopicsRequest[version]
request = req_cls(topics, timeout_ms or self._request_timeout_ms)
response = await self._send_request(request)
metadata = await self._get_cluster_metadata()
response = await self._send_request(request, node_id=metadata.controller_id)
return response

async def _get_cluster_metadata(
Expand Down

0 comments on commit 3d5b58e

Please sign in to comment.