
Consumer hangs with broker-side zstd compression #2316

Open
he-la opened this issue Jun 2, 2022 · 6 comments

@he-la

he-la commented Jun 2, 2022

When setting compression.type=zstd on the broker, the consumer fails to read any messages, reporting WARNING:kafka.consumer.fetcher:Unknown error fetching data for topic-partition TopicPartition(topic='test', partition=0). Switching to compression.type=gzip or to producer-side compression (even with zstd!) fixes the issue, including for historical messages.

@RuiLoureiro

I had issues consuming messages with zstd compression, and fixed it by installing zstandard.
There isn't any reference to zstd in the documentation, but support for it was added in #2021.
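For anyone hitting the missing-codec variant of this, the fix described above is just installing the backend package kafka-python relies on for zstd:

```shell
# zstandard is the Python zstd binding kafka-python imports for decompression
python3 -m pip install zstandard
```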

@Green-Angry-Bird

Green-Angry-Bird commented Jul 5, 2022

@RuiLoureiro This is a separate issue related to reading the headers appropriately from the broker. @he-la even mentions that zstandard is installed and working when the broker uses compression.type=producer and the producer compresses with zstd.

The failure/error here is when a broker has configuration compression.type=zstd.
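For reference, broker-side recompression can be set globally in server.properties or per topic; a sketch of the per-topic form using the standard Kafka CLI (the topic name test and broker address are taken from the report above and are placeholders otherwise):

```shell
# Per-topic override; assumes the standard Kafka CLI tools are on PATH
# and the broker listens on localhost:9092
kafka-configs.sh --bootstrap-server localhost:9092 \
  --alter --entity-type topics --entity-name test \
  --add-config compression.type=zstd
```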

I logged this as #858, which was closed without a fix.

@Green-Angry-Bird

A proposed workaround in #858 is to install confluent-kafka via python3 -m pip install confluent-kafka but I have not verified if this works.

@he-la

he-la commented Jul 5, 2022

@Green-Angry-Bird confluent-kafka is a separate library that wraps librdkafka, a Kafka client implemented in native code. I have verified that librdkafka works with zstd compression, though not through its Python wrapper.

For people looking to just consume Kafka with zstd compression, I would suggest switching to confluent-kafka until the issue is resolved.
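For anyone taking that route, a minimal hedged sketch of the confluent-kafka equivalent of the kafka-python loop in this thread (config keys follow librdkafka naming; the broker address, group id, and topic are placeholders, and the import is guarded so the sketch degrades gracefully where the library is absent):

```python
# Hedged sketch: confluent-kafka consumer. librdkafka handles zstd natively,
# so no extra codec package is needed on the Python side. The import is
# guarded in case confluent-kafka is not installed.
try:
    from confluent_kafka import Consumer

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "zstd-consumer-group",      # placeholder group id
        "auto.offset.reset": "earliest",
    })
    consumer.subscribe(["test"])                # topic from the report above
    msg = consumer.poll(timeout=1.0)            # single poll for brevity
    if msg is not None and msg.error() is None:
        print(msg.key(), msg.value())
    consumer.close()
    confluent_available = True
except ImportError:
    confluent_available = False

print("confluent-kafka importable:", confluent_available)
```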

@Green-Angry-Bird

Sorry all, I logged this issue with confluentinc/confluent-kafka-python#858

@Rohit-Singh3

Getting the below error when trying to consume a zstd-compressed record from Kafka.

raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

error log:

Traceback (most recent call last):
File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in <module>
consume_from_topic(topic_to_consume)
File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
for message in consumer:
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in __next__
return self.next_v2()
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
self._next_partition_records = self._parse_fetched_data(completion)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
unpacked = list(self._unpack_message_set(tp, records))
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
for record in batch:
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in __iter__
self._maybe_uncompress()
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
self._assert_has_codec(compression_type)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My code for consuming a Kafka topic:

from kafka import KafkaConsumer

def consume_from_topic(topic):
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers='localhost:9092',
        group_id='zstd-11-consumer-group',
        auto_offset_reset='earliest',
        enable_auto_commit=True
    )
    try:
        for message in consumer:
            v = message.value
            k = message.key.decode("utf-8")
            log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
            print(log)
    except KeyboardInterrupt:
        consumer.close()

if __name__ == "__main__":
    topic_to_consume = "Integrate-Package-Zstd-ESP.info"
    consume_from_topic(topic_to_consume)
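The UnsupportedCodecError in the traceback means kafka-python could not import a zstd backend; a quick standalone check (the zstandard package is what the earlier fix in this thread installs, and it can be probed without touching a broker):

```python
# Diagnostic: kafka-python raises UnsupportedCodecError for zstd batches
# when the `zstandard` package cannot be imported. This mirrors that
# availability check in isolation.
try:
    import zstandard  # noqa: F401
    zstd_backend = True
except ImportError:
    zstd_backend = False

print("zstd backend importable:", zstd_backend)
```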
