Consumer is unable to read for zstd compressed topic #858

Closed
5 of 7 tasks
Green-Angry-Bird opened this issue May 6, 2020 · 14 comments

Comments

@Green-Angry-Bird

Green-Angry-Bird commented May 6, 2020

Description

The consumer is unable to read from a zstd-compressed topic.

Error is:

Consumer error: KafkaError{code=UNSUPPORTED_COMPRESSION_TYPE,val=76,str="Broker: Unsupported compression type"}

I have the latest librdkafka as specified in the install instructions:

librdkafka-dev is already the newest version (1.4.0~1confluent5.5.0-1).
librdkafka1 is already the newest version (1.4.0~1confluent5.5.0-1).

And the latest confluent-kafka-python

confluent-kafka        1.4.1

OS: Ubuntu 18.04.4 LTS amd64

How to reproduce

from confluent_kafka import Consumer

KAFKA_URLS = 'localhost:9092'
GROUP_ID = 'test'
TOPIC = 'test'
POLL_TIMEOUT = 5

c = Consumer({
    'bootstrap.servers': KAFKA_URLS,
    'group.id': GROUP_ID,
    'auto.offset.reset': 'earliest'
})
c.subscribe([TOPIC])

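try:
    while True:
        msg = c.poll(POLL_TIMEOUT)
        if msg is None:
            # No message arrived within the timeout; poll again.
            continue
        if msg.error():
            # Stop on the first error reported by the client.
            print("Consumer error: {}\n".format(msg.error()))
            break
        # Print the payload rather than the Message object's repr.
        print(msg.value())
finally:
    # Leave the consumer group cleanly.
    c.close()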

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
  • Apache Kafka broker version:
  • Client configuration: {...}
  • Operating system:
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

edenhill commented May 8, 2020

Is there a reason you are not using the binary wheels of confluent-kafka-python, which include librdkafka built with zstd?

@Green-Angry-Bird
Author

> Is there a reason you are not using the binary wheels of confluent-kafka-python, which include librdkafka built with zstd?

You've lost me already. I followed the install instructions on the README. Is there another way to install confluent-kafka-python?

I simply used:

pip3 install confluent-kafka

When I hit the error, I dug deeper and eventually followed the instructions linked in the README: http://docs.confluent.io/current/installation.html#installation-apt

Even after installing librdkafka as specified there, I still get the above-mentioned error.

@edenhill
Contributor

Can you set 'debug': 'generic' in your client configuration and provide us the first handful of debug lines (where the librdkafka version and features are reported)?
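
For readers following along, a minimal sketch of how the requested setting could be added to the client configuration. The helper name `with_debug` is hypothetical; only the `'debug': 'generic'` property comes from the request above.

```python
def with_debug(config, contexts="generic"):
    """Return a copy of a client config with librdkafka debug contexts enabled.

    `with_debug` is a hypothetical helper for illustration; 'debug' is the
    configuration property named in the request above.
    """
    debug_config = dict(config)  # copy so the original config is left untouched
    debug_config["debug"] = contexts
    return debug_config


base_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "test",
    "auto.offset.reset": "earliest",
}

debug_config = with_debug(base_config)
# Passing debug_config to confluent_kafka.Consumer(...) in place of the
# original dict should make the client log its version and feature list.
```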

@Green-Angry-Bird
Author

Green-Angry-Bird commented May 11, 2020

@edenhill Here are the first few lines of the debug log. I removed the "Broadcasting state change" lines and sanitized the hostnames.

%7|1589205210.783|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.4.0 (0x10400ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG INSTALL GNULD LDS LIBDL PLUGINS ZLIB SSL ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x1)
%7|1589205210.783|CONNECT|rdkafka#consumer-1| [thrd:main]: kafka03:9092/bootstrap: Selected for cluster connection: coordinator query (broker has 0 connection attempt(s))
%7|1589205210.790|PROTOERR|rdkafka#consumer-1| [thrd:kafka03:9092/bootstrap]: kafka03:9092/bootstrap: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205210.790|PROTOERR|rdkafka#consumer-1| [thrd:kafka03:9092/bootstrap]: kafka03:9092/bootstrap: ApiArrayCnt -1 out of range
%7|1589205210.795|CLUSTERID|rdkafka#consumer-1| [thrd:main]: kafka03:9092/bootstrap: ClusterId update "" -> "CqXit7a7TYuuYQiPzvQNHQ"
%7|1589205211.793|PROTOERR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1003: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205211.793|PROTOERR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1003: ApiArrayCnt -1 out of range
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka05:9092/bootstrap]: kafka05:9092/1005: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka05:9092/bootstrap]: kafka05:9092/1005: ApiArrayCnt -1 out of range
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka01:9092/bootstrap]: kafka01:9092/1002: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka01:9092/bootstrap]: kafka01:9092/1002: ApiArrayCnt -1 out of range
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka04:9092/bootstrap]: kafka04:9092/1004: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka04:9092/bootstrap]: kafka04:9092/1004: ApiArrayCnt -1 out of range
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka02:9092/bootstrap]: kafka02:9092/1001: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205216.806|PROTOERR|rdkafka#consumer-1| [thrd:kafka02:9092/bootstrap]: kafka02:9092/1001: ApiArrayCnt -1 out of range
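
The INIT line is the one that reports the librdkafka version and its compiled-in features. As a rough sketch (the regular expressions are assumptions based on this one log line, not a documented log format, and the sample line is shortened), the version and feature list could be pulled out like this to confirm that the client build includes zstd:

```python
import re

# Shortened copy of the INIT line from the log above.
INIT_LINE = (
    "%7|1589205210.783|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.4.0 "
    "(0x10400ff) rdkafka#consumer-1 initialized (builtin.features "
    "gzip,snappy,ssl,sasl,regex,lz4,sasl_plain,sasl_scram,plugins,zstd,"
    "sasl_oauthbearer, STATIC_LINKING GCC GXX, debug 0x1)"
)


def parse_init_line(line):
    """Extract (version, features) from a librdkafka INIT debug line.

    Returns (None, set()) for whichever part cannot be found.
    """
    version_match = re.search(r"librdkafka v(\d+(?:\.\d+)*)", line)
    features_match = re.search(r"builtin\.features\s+([A-Za-z0-9_,]+)", line)
    version = version_match.group(1) if version_match else None
    features = set()
    if features_match:
        # Drop the empty entry left by the trailing comma.
        features = {f for f in features_match.group(1).split(",") if f}
    return version, features


version, features = parse_init_line(INIT_LINE)
print(version, "zstd" in features)
```

Here the client itself reports zstd support, which suggests the error does not come from a librdkafka build without zstd.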

@edenhill
Contributor

What is your broker version?

@Green-Angry-Bird
Author

Green-Angry-Bird commented May 11, 2020

I'm not 100% sure how to check the Kafka broker version, but the Kafka jar is:

kafka_2.11-2.2.1-cp1.jar

@edenhill
Contributor

Thank you, that's Kafka 2.2.1 (the 2.11 is the Scala version).
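
The naming convention described above can be sketched as a small parser. The function name and the regular expression are illustrative assumptions that fit the jar name quoted in this thread; other distributions may name their jars differently.

```python
import re

# Pattern assumed from the jar name in this thread:
# kafka_<scala version>-<kafka version>[-<build suffix>].jar
JAR_PATTERN = re.compile(r"^kafka_(\d+\.\d+)-(\d+\.\d+\.\d+)(?:-(.+))?\.jar$")


def parse_kafka_jar(name):
    """Split a Kafka jar file name into Scala version, Kafka version and suffix."""
    match = JAR_PATTERN.match(name)
    if match is None:
        return None
    scala_version, kafka_version, suffix = match.groups()
    return {"scala": scala_version, "kafka": kafka_version, "suffix": suffix}


info = parse_kafka_jar("kafka_2.11-2.2.1-cp1.jar")
print(info)
```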

@edenhill
Contributor

This is from the AK source:

 * - {@link Errors#UNSUPPORTED_COMPRESSION_TYPE} If a fetched topic is using a compression type which is
 *     not supported by the fetch request version

This implies that compression.type=zstd is configured on the topic (broker-side config), but the client is using a FetchRequest version that is too old (<10).
The problem lies in librdkafka: it only implements FetchRequest versions 1, 2, 4 and 11, and broker 2.2.1 supports up to version 10, so librdkafka selects version 4, which does not support zstd.
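
The version mismatch can be illustrated with a simplified sketch. This is not librdkafka's actual negotiation code: it assumes the client simply picks the highest version it implements that the broker also accepts, and it ignores the broker's minimum supported version. The names are hypothetical; the version numbers are the ones given above.

```python
# FetchRequest versions below 10 cannot carry zstd, per the explanation above.
ZSTD_MIN_FETCH_VERSION = 10


def pick_fetch_version(client_versions, broker_max_version):
    """Pick the highest client-implemented version the broker accepts (simplified)."""
    usable = [v for v in client_versions if v <= broker_max_version]
    return max(usable) if usable else None


def can_fetch_zstd(version):
    """True if the chosen FetchRequest version is new enough for zstd."""
    return version is not None and version >= ZSTD_MIN_FETCH_VERSION


client_versions = [1, 2, 4, 11]  # versions implemented by the client, per the comment above
picked = pick_fetch_version(client_versions, broker_max_version=10)
print(picked, can_fetch_zstd(picked))
```

Under these assumptions the client falls back to version 4 against a broker whose maximum is 10, while a broker accepting version 11 would let it use 11 and read zstd data.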

We'll fix this for the upcoming v1.5.0 release (mid-June).
As a workaround I think you can set the broker's topic configuration compression.type=producer.

@Green-Angry-Bird
Author

That's correct, the topic is configured with compression.type=zstd. I tested the workaround compression.type=producer on a test instance and it worked.

Oddly, the debug still prints the proto errors:

%7|1589205211.793|PROTOERR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1003: Protocol parse failure for ApiVersion v3 at 3/6 (rd_kafka_handle_ApiVersion:1911) (incorrect broker.version.fallback?)
%7|1589205211.793|PROTOERR|rdkafka#consumer-1| [thrd:GroupCoordinator]: GroupCoordinator/1003: ApiArrayCnt -1 out of range

However, the messages arrive at the consumer instead of just an error.

@edenhill
Contributor

Those are debug logs and can be ignored.

@pikrzysztof

Ubuntu 20.04 packages this library:

kris@MACHINE $ apt search confluent       
Sorting... Done
Full Text Search... Done
golang-github-confluentinc-confluent-kafka-go-dev/focal,focal 0.11.6-1 all
  Apache Kafka Golang client by Confluent

python3-confluent-kafka/focal 1.1.0-1build1 amd64
  Python client to interact with Kafka - Python 3.x

You can also install the library from pypi through python3 -m pip install confluent-kafka.

The trouble is that the apt installation does not support zstd, whereas the pypi installation does. Of course, you can install both and have even more confusion.

Would it be possible to synchronize these packages?

@edenhill
Contributor

edenhill commented Nov 24, 2020

We don't maintain the Ubuntu packages, and I don't see much point in using them since both Go and Python have packaging and dependency systems built in, which will always be (vastly) more up to date than anything Debian/Ubuntu packages.

@theultimate1

Has this been resolved? I am working on an issue in faust, and the broker-level zstd compression problem stems from this package.

@edenhill
Contributor

edenhill commented Nov 2, 2022

Yes, it was fixed in May 2020.
