-
Notifications
You must be signed in to change notification settings - Fork 230
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Consumer fails when consuming messages compressed with ZSTD at the broker level #843
Comments
Have you tried with code from master branch? It should have been fixed in #801 |
Yes, its happening with the latest version from the master branch. Just reconfirmed it. Here's the full log output.
|
Do we have an update on this? |
Hey guys! I'm just joining the club with the same issue. Any updates? |
Is anyone else working on this because I am trying to integrate this into faust but if it's not added here. I will work on first adding support here and then integrating it into faust later |
Hey, so the zstd error comes because of a fetch request versioning error. Kafka python and by extension this package sends fetch request version 4 and ZSTD support requires version 10 at least. this requires the need for fetch sessions,removed_topics and log_start_offset,current_leader_epoch to be added to the fetch request. I already tested to see if this issue becomes resolved when upgrading the version with default values for those parameters. It does work. I plan to add these features to kafka-python to put into a pull request asap. However, in aiokafka's case I just wanted to check in to see if fetch sessions will interfere the asynchronous nature of this module. this KIP has information on fetch sessions. I plan to see for myself if it will interfere anyway but it would be much more efficient if someone can do the check for me. |
getting below error when trying to consume from a ZSTD compressed record from kafka. raise UnsupportedCodecError( error log: Traceback (most recent call last): Process finished with exit code 1 My Code for consuming a kafka topic: from kafka import KafkaConsumer except KeyboardInterrupt: |
Did you specify extra when installing aiokafka? Either |
@theultimate1 I hit the same problem. Your approach to update Fetch to v10 is spot on. Wondering if you could find time to make a PR soon? Many thanks! |
I will try to get some actual work done for this. been busy lol. This is definitely on my todo. |
@theultimate1 - if I can lend a hand let me know (but you'll need to brief me on what needs doing/what you've discovered). This is currently a slight blocker for ideal operation on my side. |
yup I am cleaning up right now. will put in a PR by this week |
@theultimate1 Hi! Is your PR was merged? |
aiokafka/aiokafka/consumer/fetcher.py Lines 430 to 438 in 01c60cd
need to support at least fetch request v10 to consume zstd message |
Describe the bug
When consuming messages from a broker where compression has been set at the broker level with ZSTD compression, the Consumer fails with an UnknownError. This does not happen when ZSTD is set at a producer level instead. This behaviour is observed both in the latest pypi release, as well as in the latest version from the master branch in this repository
Expected behaviour
Consumer should be able to consume messages with ZSTD compression, when its set at a broker level, just the same as when its set at a producer level.
Environment (please complete the following information):
Reproducible example
When setting up the Broker, add to the server.properties file
On the contrary, when if compression.type is omitted in the server.properties file and instead you create a producer with zstd compression, it works fine.
The text was updated successfully, but these errors were encountered: