KAFKA-3177: consumer hangs when assigned to partition that doesn't exist #686

Open
sibiryakov opened this issue May 11, 2016 · 4 comments

Comments

@sibiryakov
Contributor

sibiryakov commented May 11, 2016

Hi guys,
the code below doesn't work. I'm probably doing something wrong, but I couldn't get it to work.
The kafka-python version is 1.1.1 and Kafka is 0.8.2.2.

# -*- coding: utf-8 -*-
from kafka import KafkaConsumer, KafkaProducer, TopicPartition


def test_it():
    consumer = KafkaConsumer(bootstrap_servers="localhost:9092", consumer_timeout_ms=100)
    # frontera-todo must have only one partition (id 0), so partition 1 does not exist
    consumer.assign([TopicPartition("frontera-todo", 1)])

    m = next(consumer)  # hangs here instead of raising

    consumer.close()

test_it()

Kafka broker output
[2016-05-11 14:46:14,669] ERROR Closing socket for /0:0:0:0:0:0:0:1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 16
    at kafka.api.RequestKeys$.deserializerForKey(RequestKeys.scala:64)
    at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
    at kafka.network.Processor.read(SocketServer.scala:450)
    at kafka.network.Processor.run(SocketServer.scala:340)
    at java.lang.Thread.run(Thread.java:745)

Traceback on KeyboardInterrupt

Traceback (most recent call last):
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 25, in <module>
    test_it()
  File "/Users/sibiryakov/src/sh/crawl-frontier/frontera/tests/test_kafka2.py", line 11, in test_it
    m = next(consumer)
  File "/usr/local/lib/python2.7/site-packages/six.py", line 558, in next
    return type(self).__next__(self)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 844, in __next__
    return next(self._iterator)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 779, in _message_generator
    self._update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 762, in _update_fetch_positions
    self._fetcher.update_fetch_positions(partitions)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 162, in update_fetch_positions
    self._reset_offset(tp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 188, in _reset_offset
    offset = self._offset(partition, timestamp)
  File "/usr/local/lib/python2.7/site-packages/kafka/consumer/fetcher.py", line 221, in _offset
    self._client.poll(future=refresh_future, sleep=True)
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 430, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "/usr/local/lib/python2.7/site-packages/kafka/client_async.py", line 445, in _poll
    for key, events in self._selector.select(timeout):
  File "/usr/local/lib/python2.7/site-packages/kafka/selectors34.py", line 598, in select
    kev_list = self._kqueue.control(None, max_ev, timeout)
KeyboardInterrupt

It certainly shouldn't hang; it should raise an exception instead.

@sibiryakov sibiryakov changed the title KafkaConsumer is hang when assigned to partition that doesn't exist KafkaConsumer is hanging when assigned to partition that doesn't exist May 11, 2016
@dpkp
Owner

dpkp commented May 22, 2016

See https://issues.apache.org/jira/browse/KAFKA-3177 -- I will follow whatever fix the apache client lands on. For now, it's a known issue.

Also, the Kafka broker error is unrelated: kafka-python probes various APIs to determine the broker version, and API 16 is a new API that is only available on 0.10.
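
For readers who want to avoid the version probing described above, here is a minimal sketch, assuming the `api_version` option of `KafkaConsumer` is pinned to the broker version. The helper name `consumer_kwargs` is hypothetical. The accepted value format is also an assumption to verify against the installed release: newer kafka-python releases document a tuple such as `(0, 8, 2)`, while the 1.1.x series used in this report documented a string such as `'0.8.2'`. Pinning the version should only silence the unrelated broker error; it is not expected to fix the hang.

```python
def consumer_kwargs(bootstrap_servers, broker_version=None):
    """Build keyword arguments for KafkaConsumer (hypothetical helper).

    When broker_version is given it is passed as api_version, which
    should make the client skip probing the broker for its version.
    """
    kwargs = {"bootstrap_servers": bootstrap_servers}
    if broker_version is not None:
        kwargs["api_version"] = broker_version
    return kwargs


# Usage (requires kafka-python and a running broker, so it is left commented out):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(**consumer_kwargs("localhost:9092", (0, 8, 2)))
```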

@dpkp dpkp changed the title KafkaConsumer is hanging when assigned to partition that doesn't exist KAFKA-3177: consumer hangs when assigned to partition that doesn't exist May 22, 2016
@bwilliams42

@sibiryakov Late, yes, but one way around this is to check whether the partition exists for the topic before trying to poll or do anything else with it. Use the partitions_for_topic method.
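
The workaround above could look roughly like this. It is a sketch, not a tested fix: `missing_partitions` and `assign_checked` are hypothetical helper names, and the code assumes `partitions_for_topic` returns a set of partition ids, or `None` when the topic is unknown.

```python
def missing_partitions(consumer, topic, wanted):
    """Return the requested partition ids that the topic metadata does not list."""
    known = consumer.partitions_for_topic(topic)  # assumed: set of ids, or None
    if known is None:
        known = set()
    return sorted(set(wanted) - set(known))


def assign_checked(consumer, topic, wanted):
    """Assign partitions only after confirming they exist; raise otherwise."""
    missing = missing_partitions(consumer, topic, wanted)
    if missing:
        raise ValueError("topic %r has no partition(s) %s" % (topic, missing))
    # Imported lazily so the check itself does not need kafka-python installed.
    from kafka import TopicPartition
    consumer.assign([TopicPartition(topic, p) for p in wanted])


# Usage against a real broker (left commented out):
# from kafka import KafkaConsumer
# consumer = KafkaConsumer(bootstrap_servers="localhost:9092", consumer_timeout_ms=100)
# assign_checked(consumer, "frontera-todo", [1])  # raises ValueError instead of hanging
```

With the reproduction from the original report, `assign_checked` would fail fast on partition 1 rather than reaching the blocking `next(consumer)` call.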

@jeffwidman
Contributor

Will this apply to SimpleConsumer as well?

I'm not sure at what level the problem manifests, and I know SimpleConsumer and KafkaConsumer share some of the lower-level components, such as BrokerConnection.

@bwilliams42

@jeffwidman, from a quick look at the code, it doesn't seem so. I don't see a way to get the partitions for a topic using SimpleConsumer.

tanaypf9 pushed a commit to tanaypf9/pf9-requirements that referenced this issue May 20, 2024
Patch Set 1:

Some of the issues I'm concerned about:

dpkp/kafka-python#674

dpkp/kafka-python#686

dpkp/kafka-python#579

dpkp/kafka-python#551

Patch-set: 1