admin client - failure to create topics (error code 41) #995

Closed
kbhatiya999 opened this issue Apr 1, 2024 · 3 comments · Fixed by #996

Comments

@kbhatiya999

Describe the bug
Creating a topic fails with topic error code 41 (NOT_CONTROLLER).

Expected behaviour
A topic should be created

Environment (please complete the following information):

  • aiokafka version (python -c "import aiokafka; print(aiokafka.__version__)"): 0.10.0
  • Kafka Broker version (kafka-topics.sh --version): 3.7.0
  • Other information (Confluent Cloud version, etc.): NA

Reproducible example

version: '3'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka:latest
    hostname: kafka1
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:29092,OUTSIDE://localhost:9092 
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper

  kafka2:
    image: confluentinc/cp-kafka:latest
    hostname: kafka2
    ports:
      - "9093:9093"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka2:29093,OUTSIDE://localhost:9093
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29093,OUTSIDE://0.0.0.0:9093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper

  kafka3:
    image: confluentinc/cp-kafka:latest
    hostname: kafka3
    ports:
      - "9094:9094"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka3:29094,OUTSIDE://localhost:9094
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29094,OUTSIDE://0.0.0.0:9094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE 
    depends_on:
      - zookeeper

import asyncio
import uuid

from aiokafka.admin import AIOKafkaAdminClient, NewPartitions, NewTopic


async def main():
    random_topic_name = "topic-" + str(uuid.uuid4())
    # create a new topic
    admin = AIOKafkaAdminClient(bootstrap_servers="localhost:9092")
    await admin.start()
    iteration = 0
    while True:
        response = await admin.create_topics(
            [NewTopic(name=random_topic_name, num_partitions=1, replication_factor=1)]
        )
        print(f"----------{iteration=}-----------")
        # if error_code is not 0, then topic creation failed
        if any(
            error_code for (topic, error_code, error_message) in response.topic_errors
        ):
            print(
                f"Error creating topic as request sent to non-controller Node :: error_code='{response.topic_errors[0][1]}' error_message='{response.topic_errors[0][2]}'"
            )
            print("Retrying topic creation until successful...")

        # if error_code is 0, then topic creation was successful
        if not any(
            error_code for (topic, error_code, error_message) in response.topic_errors
        ):
            print(
                f"Topic '{random_topic_name}' created successfully. Request sent to controller Node."
            )
            break
        iteration += 1

    await asyncio.sleep(5)
    # delete the topic
    response = await admin.delete_topics([random_topic_name])
    print(f"Response from delete_topics: {response}")
    # close the admin client so its broker connections are released
    await admin.close()


async def check_version():
    admin: AIOKafkaAdminClient = AIOKafkaAdminClient(
        bootstrap_servers="localhost:9092",
    )
    await admin.start()
    version = await admin._get_cluster_metadata()
    print(version)


# asyncio.run(check_version())
asyncio.run(main())
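
The `while True` loop in the reproducer retries forever and without any pause. As a possible stop-gap while the bug is open, here is a minimal sketch of a bounded retry with exponential backoff that only retries on error code 41. It is not part of the original report and not the library's fix: `create_with_retry`, `failed_topics`, `FakeResponse` and `demo` are hypothetical names, and the fake response only mimics the `topic_errors` shape used in the reproducer so the sketch runs without a broker.

```python
import asyncio

NOT_CONTROLLER = 41  # Kafka protocol error code reported in this issue


def failed_topics(topic_errors):
    """Return {topic: error_code} for entries whose error code is non-zero."""
    return {entry[0]: entry[1] for entry in topic_errors if entry[1] != 0}


async def create_with_retry(create, max_attempts=5, base_delay=0.1):
    """Call `create()` until no topic reports an error, or attempts run out.

    `create` is a zero-argument callable returning an awaitable whose result
    has a `topic_errors` list of (topic, error_code, error_message) tuples.
    Returns the number of attempts that were needed.
    """
    for attempt in range(1, max_attempts + 1):
        response = await create()
        failed = failed_topics(response.topic_errors)
        if not failed:
            return attempt
        # Only NOT_CONTROLLER is treated as retriable in this sketch.
        if any(code != NOT_CONTROLLER for code in failed.values()):
            raise RuntimeError(f"non-retriable topic errors: {failed}")
        if attempt < max_attempts:
            # Exponential backoff: base_delay, 2*base_delay, 4*base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
    raise TimeoutError(f"still NOT_CONTROLLER after {max_attempts} attempts")


class FakeResponse:
    """Hypothetical stand-in for a create-topics response (assumption)."""

    def __init__(self, code):
        self.topic_errors = [("demo", code, None)]


async def demo():
    # Simulate two requests landing on a non-controller node, then success.
    codes = iter([NOT_CONTROLLER, NOT_CONTROLLER, 0])

    async def fake_create():
        return FakeResponse(next(codes))

    return await create_with_retry(fake_create, base_delay=0)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints 3
```

With the reproducer's client this could presumably be called as `await create_with_retry(lambda: admin.create_topics([NewTopic(...)]))`. Retrying is only a workaround here, since it succeeds only when a later request happens to reach the controller node.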
@ods
Collaborator

ods commented Apr 3, 2024

Hi @kbhatiya999, thank you very much for reproducing the problem! I'll definitely find time to dive into it. There may be some delay, though, as things are very busy at the moment.

@ods
Collaborator

ods commented Apr 7, 2024

Hi @kbhatiya999, could you please check this pull request? It's not finished yet: we still have to identify the other methods that need the same approach.

@kbhatiya999
Author

Yes, let me take a look.

@ods ods closed this as completed in #996 Apr 14, 2024