Description
I'm trying to resolve an issue I've run into with consumer finalization:
A consumer had subscribed to a topic and was processing data from it. Then the topic was deleted.
After some time the consumer received an error about the non-existent topic, and my application tried to call consumer.close().
Unfortunately, close() hung.
I investigated a bit and saw that it was stuck waiting on a lock: "futex(0x7fc6f2ffd9d0, FUTEX_WAIT, 32, NULL"
I am not very familiar with the librdkafka or confluent-kafka-python projects, but I see one inconsistency:
here rd_kafka_queue_destroy is called after the consumer close
rd_kafka_queue_destroy(self->u.Consumer.rkqu);
but the documentation says that it should be called before:
https://github.com/confluentinc/librdkafka/blob/4b63c6c8881968f7185da33f09cbd69561bb612c/src/rdkafka.h#L3432
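
As a stopgap while the hang itself is investigated, the application can at least avoid blocking forever on close(). Below is a minimal sketch using only the Python standard library. The helper name close_with_timeout and its timeout_s parameter are hypothetical and not part of confluent-kafka-python. It assumes that close() releases the GIL while it is blocked, so the calling thread can keep running. It does not fix the underlying problem: if the timeout expires, the daemon thread stays stuck until the process exits.

```python
import threading


def close_with_timeout(close_fn, timeout_s=10.0):
    """Run close_fn in a daemon thread.

    Returns True if close_fn finished within timeout_s seconds,
    False if it is still blocked. Re-raises an exception from close_fn
    if one occurred before the timeout.
    (Hypothetical helper, not part of confluent-kafka-python.)
    """
    done = threading.Event()
    errors = []

    def _run():
        try:
            close_fn()
        except Exception as exc:  # hand the error back to the caller
            errors.append(exc)
        finally:
            done.set()

    # daemon=True so a stuck close() cannot keep the process alive at exit
    worker = threading.Thread(target=_run, name="consumer-close", daemon=True)
    worker.start()

    finished = done.wait(timeout_s)
    if errors:
        raise errors[0]
    return finished
```

With a real consumer this would be called as close_with_timeout(consumer.close, 10.0). A False return value means close() is still blocked, and the application can log that and continue its own shutdown.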