diff --git a/aiokafka/admin.py b/aiokafka/admin.py index c3121960..fab161ec 100644 --- a/aiokafka/admin.py +++ b/aiokafka/admin.py @@ -193,8 +193,10 @@ async def create_topics( "Support for CreateTopics v{} has not yet been added " "to AIOKafkaAdminClient." .format(version)) + + metadata = await self._get_cluster_metadata() response = await self._client.send( - self._client.get_random_node(), + metadata.controller_id, request) return response @@ -212,7 +214,8 @@ async def delete_topics( version = self._matching_api_version(DeleteTopicsRequest) req_cls = DeleteTopicsRequest[version] request = req_cls(topics, timeout_ms or self._request_timeout_ms) - response = await self._send_request(request) + metadata = await self._get_cluster_metadata() + response = await self._send_request(request, node_id=metadata.controller_id) return response async def _get_cluster_metadata(