You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Consumer will stuck in start() coroutine with JoinGroupCoordinator if time between Consumer.__init__() and consumer.start() is greater than several minutes
#764
Closed
mrmerkone opened this issue
Jul 7, 2021
· 1 comment
· Fixed by #766
Describe the bug
Consumer will not perform join group request on consumer.start() and become stuck if consumer object created more than max_poll_interval_ms ago
Expected behaviour
Consumer performs join group request on consumer.start()
Environment (please complete the following information):
Python 3.8.7
aiokafka version 0.7.1 | 0.6.0
kafka-python version 2.0.2
Kafka Broker version: Apache Kafka 2.2.0-cp2 (i do not have direct access to the broker)
Other information: Confluent Platform 5.2.1
Reproducible example
importasynciofromaiokafkaimportAIOKafkaConsumerasyncdeftest_with_implicit_subscribe():
consumer=AIOKafkaConsumer("myTopic", group_id="my_group")
awaitasyncio.sleep(900) # some very long initialization here longer than max_poll_interval_msawaitconsumer.start() # will stuck hereprint("i will never be called!")
asyncdeftest_with_explicit_subscribe():
consumer=AIOKafkaConsumer(group_id="my_group")
awaitasyncio.sleep(900) # some very long initialization here longer than max_poll_interval_msawaitconsumer.subscribe(["myTopic"])
awaitconsumer.start() # will stuck hereprint("i will never be called!")
if__name__=='__main__':
loop=asyncio.get_event_loop()
loop.run_until_complete(test_with_implicit_subscribe())
loop.run_until_complete(test_with_explicit_subscribe())
Logs:
[DEBUG][2021-07-07 12:34:42,739]asyncio: Using selector: KqueueSelector
[INFO][2021-07-07 12:34:42,739]aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'myTopic'})
[DEBUG][2021-07-07 12:49:42,751]aiokafka: Attempting to bootstrap via node at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:42,780]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: MetadataRequest_v0()
[DEBUG][2021-07-07 12:49:42,992]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: MetadataResponse_v0()
[DEBUG][2021-07-07 12:49:42,996]aiokafka.cluster: Updated cluster metadata to ClusterMetadata(brokers: 2, topics: 1073, groups: 0)
[DEBUG][2021-07-07 12:49:42,996]aiokafka.conn: Closing connection at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:42,996]aiokafka: Received cluster metadata: ClusterMetadata(brokers: 2, topics: 1073, groups: 0)
[DEBUG][2021-07-07 12:49:42,997]aiokafka: Initiating connection to node 2 at some_prod_host_2
[DEBUG][2021-07-07 12:49:43,113]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:43,205]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Response 1: ApiVersionResponse_v0()
[DEBUG][2021-07-07 12:49:43,205]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Request 2: MetadataRequest_v0()
[DEBUG][2021-07-07 12:49:43,618]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_2> Response 2: MetadataResponse_v0()
[DEBUG][2021-07-07 12:49:43,618]aiokafka.conn: Closing connection at some_prod_host_2
[DEBUG][2021-07-07 12:49:43,619]aiokafka: Sending FindCoordinator request for key my_group to broker 1
[DEBUG][2021-07-07 12:49:43,619]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,620]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,705]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: ApiVersionResponse_v0()
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 2: FindCoordinatorRequest_v1()
[DEBUG][2021-07-07 12:49:43,805]aiokafka: Sending metadata request MetadataRequest_v1() to node 1
[DEBUG][2021-07-07 12:49:43,805]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 3: MetadataRequest_v1()
[DEBUG][2021-07-07 12:49:43,880]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 2: FindCoordinatorResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 3: MetadataResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka: Received group coordinator response FindCoordinatorResponse_v1()
[DEBUG][2021-07-07 12:49:43,881]aiokafka: Initiating connection to node 1 at some_prod_host_1:9092
[DEBUG][2021-07-07 12:49:43,882]aiokafka.cluster: Updated cluster metadata to ClusterMetadata(brokers: 2, topics: 1, groups: 0)
[DEBUG][2021-07-07 12:49:44,023]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Request 1: ApiVersionRequest_v0()
[DEBUG][2021-07-07 12:49:44,131]aiokafka.conn: <AIOKafkaConnection host=some_prod_host_1 port=9092> Response 1: ApiVersionResponse_v0()
[INFO][2021-07-07 12:49:44,131]aiokafka.consumer.group_coordinator: Discovered coordinator 1 for group my_group
[INFO][2021-07-07 12:49:44,131]aiokafka.consumer.group_coordinator: Revoking previously assigned partitions set() for group my_group
The text was updated successfully, but these errors were encountered:
Describe the bug
Consumer will not perform join group request on consumer.start() and become stuck if consumer object created more than
max_poll_interval_ms
agoExpected behaviour
Consumer performs join group request on consumer.start()
Environment (please complete the following information):
Reproducible example
Logs:
The text was updated successfully, but these errors were encountered: