Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for zstd compression #2021

Merged
merged 9 commits into from
Sep 7, 2020

Conversation

gabriel-tincu
Copy link
Contributor

@gabriel-tincu gabriel-tincu commented Mar 15, 2020

The ZSTD relevant code, in a separate PR, @jeffwidman


This change is Reviewable

test/test_producer.py Outdated Show resolved Hide resolved
test/test_producer.py Outdated Show resolved Hide resolved
test/test_producer.py Outdated Show resolved Hide resolved
@jeffwidman jeffwidman changed the title Gabi kafka client zstd codec Add support for zstd compression Mar 16, 2020
@jeffwidman
Copy link
Collaborator

Also, in https://kafka-python.readthedocs.io/en/master/#compression can you mention support for zstd?

A clean way to do it would be to make it a bullet pointed list...

kafka-python supports the following compression formats:

  • gzip - support built-in
  • lz4 - requires installing python-lz4
  • snappy - requires installing python-snappy (which requires the snappy C library)
  • zstd - requires installing zstandard

@gabriel-tincu gabriel-tincu force-pushed the gabi-kafka-client-zstd-codec branch from aa6b827 to 7ca9851 Compare March 19, 2020 15:41
@jeffwidman
Copy link
Collaborator

Now that #2020 is merged, can you please rebase this on top of that?

@gabriel-tincu gabriel-tincu force-pushed the gabi-kafka-client-zstd-codec branch from 356bd3a to 95b06ce Compare March 25, 2020 15:38
@gabriel-tincu
Copy link
Contributor Author

gabriel-tincu commented Mar 25, 2020

Now that #2020 is merged, can you please rebase this on top of that?

done, though it seems github diff algorithm or needs to refresh master the branch tip. I could reopen it, but that would mean losing the conversation history. Any thoughts ?

https://github.com/dpkp/kafka-python/compare/f9e0264..29d812f
Updated with new diff link

@jeffwidman
Copy link
Collaborator

The diff is appearing fine for me.

Typically what you'd do in this case is git rebase -i origin/master and then squash all these commits into a single one...or just a couple of logically separated ones... ie, the fixup commits get merged together.

@jeffwidman
Copy link
Collaborator

Actually strike that, in this diff I'm suddenly seeing the log_start_offset stuff that has already been merged...

Are you sure you pulled latest master and then rebased on top of that?

@gabriel-tincu
Copy link
Contributor Author

Actually strike that, in this diff I'm suddenly seeing the log_start_offset stuff that has already been merged...

Are you sure you pulled latest master and then rebased on top of that?

i rebased on top of the merged branch, due to both PR's being made from a forked version of the repo. That is to say, that repo's master is not in sync with this repo's master, for various reasons.
That is why i linked the compare link, which compares the tips of both branches (or rather tip-1 for the source branch, as i made a commit just a few minutes ago), to illustrate that using compare, the diff appears as it should

Copy link
Collaborator

@jeffwidman jeffwidman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of questions... I tried to only review the zstd stuff

kafka/codec.py Show resolved Hide resolved
test/test_producer.py Show resolved Hide resolved
test/test_producer.py Outdated Show resolved Hide resolved
@jeffwidman
Copy link
Collaborator

i rebased on top of the merged branch, due to both PR's being made from a forked version of the repo. That is to say, that repo's master is not in sync with this repo's master, for various reasons.
That is why i linked the compare link, which compares the tips of both branches (or rather tip-1 for the source branch, as i made a commit just a few minutes ago), to illustrate that using compare, the diff appears as it should

Sorry, I wasn't clear... you don't want to rebase on top of the merged PR branch, you want to rebase on top of master of this repo, which contains the merged branch...

We'll need to see the diff so we can see exactly what is getting merged.

@gabriel-tincu
Copy link
Contributor Author

gabriel-tincu commented Mar 25, 2020

i rebased on top of the merged branch, due to both PR's being made from a forked version of the repo. That is to say, that repo's master is not in sync with this repo's master, for various reasons.
That is why i linked the compare link, which compares the tips of both branches (or rather tip-1 for the source branch, as i made a commit just a few minutes ago), to illustrate that using compare, the diff appears as it should

Sorry, I wasn't clear... you don't want to rebase on top of the merged PR branch, you want to rebase on top of master of this repo, which contains the merged branch...

We'll need to see the diff so we can see exactly what is getting merged.

For future reference, that would imply updating the origin first, then doing the actual rebase, or just adding a new remote, correct? I'll try to open a new PR with an identical but differently named branch. It seems the diff looks the same ... I can't say i understand this behaviour. The rebase left the merged branch commits intact, then calculated new hashes for this pr's commits. The diff should not contain the already merged changes

@jeffwidman
Copy link
Collaborator

jeffwidman commented Mar 26, 2020

Can you just pull latest master from this repo and then merge that master into this branch? It should fix the diff w/o needing to rebase or anything...

And then I'll cleanup the history when I squash merge. If this truly doesn't work, then it's a weird github bug, but I expect it should work since we often do that workflow at my day job with no issues.

@gabriel-tincu
Copy link
Contributor Author

Can you just pull latest master from this repo and then merge that master into this branch? It should fix the diff w/o needing to rebase or anything...

And then I'll cleanup the history when I squash merge. If this truly doesn't work, then it's a weird github bug, but I expect it should work since we often do that workflow at my day job with no issues.

That seems to have worked

@jeffwidman
Copy link
Collaborator

IMO this is mostly ready to go, except for #2021 (comment)... fixing the root issue of that is outside the scope of this particular PR, but in the meantime I'd rather not add that workaround hack.

It looks like #2024 takes a stab at fixing the root cause, let's see if we can land that and then land this...

@Green-Angry-Bird
Copy link

Our team is looking for exactly this. When can we merge to master. :D

@tvoinarovskyi
Copy link
Collaborator

@gabriel-tincu Sorry, but could you rebase this over master now? Thanks =)

Gabriel Tincu added 2 commits May 5, 2020 21:42
Update readme
zstd decompress will transform memoryview object to bytes before decompressing, to keep the same behavior as other decompression strategies
Decrease fallback max message size due to OOM concerns
Rewrite the 1MB message limit
Test producer constructor makes use of broker version inference again, also add logic for zstd decompression
@gabriel-tincu gabriel-tincu force-pushed the gabi-kafka-client-zstd-codec branch from 3372b10 to 5002d67 Compare May 5, 2020 20:10
@gabriel-tincu
Copy link
Contributor Author

@gabriel-tincu Sorry, but could you rebase this over master now? Thanks =)

@tvoinarovskyi Done. I noticed that tox fails specifically for python3.7, but with an error related to tox itself rather than anything related to this PR

@gabriel-tincu gabriel-tincu requested a review from jeffwidman May 5, 2020 20:12
@Green-Angry-Bird
Copy link

👍 I can't wait for my consumer to not crash on zstd encoded messages.

@Green-Angry-Bird
Copy link

Is there any update on this? Our team is looking forward to using this library with our zstd encoded topics.

Copy link
Collaborator

@tvoinarovskyi tvoinarovskyi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeffwidman I reviewed this once more time, seems good to go.

@Green-Angry-Bird
Copy link

Is this ready for merge?

@brokenjacobs
Copy link

ping?

@Green-Angry-Bird
Copy link

I'm a little worried kafka-python is no longer maintained.

For anyone else experiencing this issue; I've created a workaround. I created this workaround in the hopes that kafka-python would be supporting zstd in the near future. While not ideal, this solution pipes the data from the zstd topic into a new topic which has been configured with gzip compression. It requires command-line access on the broker and can be monitored in the Kafka Manager via group ID.

Step 1: Create a new topic and be sure to set compression.type=gzip
kafka-topics --create --bootstrap-server $BROKERS --replication-factor $REPLICAS --partitions $PARTITIONS --config compression.type=gzip --config retention.ms=$RETENTION_MS --config retention.bytes=$RETENTION_BYTES --topic $TOPIC-gzip

Step 2: Create / Reset a consumer group (to track the mirror)
kafka-consumer-groups --bootstrap-server $BROKERS --group $GROUP --topic $TOPIC --reset-offsets --to-earliest --execute

Step 3: Begin the mirror threads
kafka-console-consumer --bootstrap-server $BROKERS --group $GROUP --topic $TOPIC | kafka-console-producer --broker-list $BROKERS --compression-codec gzip --topic $TOPIC-gzip > $TOPIC.log 2> $TOPIC.err

I hope this helps someone else.

@gabriel-tincu
Copy link
Contributor Author

I'm a little worried kafka-python is no longer maintained.

For anyone else experiencing this issue; I've created a workaround. I created this workaround in the hopes that kafka-python would be supporting zstd in the near future. While not ideal, this solution pipes the data from the zstd topic into a new topic which has been configured with gzip compression. It requires command-line access on the broker and can be monitored in the Kafka Manager via group ID.

Step 1: Create a new topic and be sure to set compression.type=gzip
kafka-topics --create --bootstrap-server $BROKERS --replication-factor $REPLICAS --partitions $PARTITIONS --config compression.type=gzip --config retention.ms=$RETENTION_MS --config retention.bytes=$RETENTION_BYTES --topic $TOPIC-gzip

Step 2: Create / Reset a consumer group (to track the mirror)
kafka-consumer-groups --bootstrap-server $BROKERS --group $GROUP --topic $TOPIC --reset-offsets --to-earliest --execute

Step 3: Begin the mirror threads
kafka-console-consumer --bootstrap-server $BROKERS --group $GROUP --topic $TOPIC | kafka-console-producer --broker-list $BROKERS --compression-codec gzip --topic $TOPIC-gzip > $TOPIC.log 2> $TOPIC.err

I hope this helps someone else.

@Green-Angry-Bird you could just use the source branch HEAD of this PR in your requirements file (assuming that's how you handle dependencies) until this gets merged. Taking the current global situation into account i think some delay in response on an open source project is perfectly understandable

@Green-Angry-Bird
Copy link

Green-Angry-Bird commented Jul 13, 2020

@Green-Angry-Bird you could just use the source branch HEAD of this PR in your requirements file (assuming that's how you handle dependencies) until this gets merged. Taking the current global situation into account i think some delay in response on an open source project is perfectly understandable

Forgive my ignorance @gabriel-tincu but how would I do that?

I've tried these but they all give different errors:

-e git://github.com/aiven/kafka-python/tree/gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/dpkp/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python

@gabriel-tincu
Copy link
Contributor Author

@Green-Angry-Bird you could just use the source branch HEAD of this PR in your requirements file (assuming that's how you handle dependencies) until this gets merged. Taking the current global situation into account i think some delay in response on an open source project is perfectly understandable

Forgive my ignorance @gabriel-tincu but how would I do that?

I've tried these but they all give different errors:

-e git://github.com/aiven/kafka-python/tree/gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/dpkp/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python

What error does the third option throw? Also, have you tried using it without the #egg=kafka-python at the end?
pip install git+https://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec seems to work in a brand new virtualenv without any errors for me

@Green-Angry-Bird
Copy link

@Green-Angry-Bird you could just use the source branch HEAD of this PR in your requirements file (assuming that's how you handle dependencies) until this gets merged. Taking the current global situation into account i think some delay in response on an open source project is perfectly understandable

Forgive my ignorance @gabriel-tincu but how would I do that?
I've tried these but they all give different errors:

-e git://github.com/aiven/kafka-python/tree/gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/dpkp/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python
-e git://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python

What error does the third option throw? Also, have you tried using it without the #egg=kafka-python at the end?
pip install git+https://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec seems to work in a brand new virtualenv without any errors for me

The error was

$ pip3 install -r requirements.txt 
Obtaining kafka-python from git+git://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python (from -r requirements.txt (line 2))
  WARNING: git clone in ./src/kafka-python exists with URL git://github.com/dpkp/kafka-python
  WARNING: The plan is to install the git repository git://github.com/aiven/kafka-python
What to do?  (s)witch, (i)gnore, (w)ipe, (b)ackup s
  Switching clone ./src/kafka-python to git://github.com/aiven/kafka-python (to revision gabi-kafka-client-zstd-codec)
  Running command git config remote.origin.url git://github.com/aiven/kafka-python
  Running command git checkout -q gabi-kafka-client-zstd-codec
  error: pathspec 'gabi-kafka-client-zstd-codec' did not match any file(s) known to git
ERROR: Command errored out with exit status 1: git checkout -q gabi-kafka-client-zstd-codec Check the logs for full command output.

However, it seems the issue is I needed to prepend it with git+

-e git+git://github.com/aiven/kafka-python@gabi-kafka-client-zstd-codec#egg=kafka-python

^ works :)

@dpkp dpkp merged commit a27ab88 into dpkp:master Sep 7, 2020
gabriel-tincu pushed a commit to aiven/kafka-python that referenced this pull request Sep 22, 2020
@hackaugusto hackaugusto deleted the gabi-kafka-client-zstd-codec branch January 11, 2021 10:26
@Rohit-Singh3
Copy link

getting below error when trying to consume from a ZSTD compressed record from kafka.

raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

error log:

Traceback (most recent call last):
File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 25, in
consume_from_topic(topic_to_consume)
File "/home/PycharmProjects/myPythonLearning/kafka/consumer/consumer.py", line 14, in consume_from_topic
for message in consumer:
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1193, in next
return self.next_v2()
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1201, in next_v2
return next(self._iterator)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 1116, in _message_generator_v2
record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 655, in poll
records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/group.py", line 708, in _poll_once
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 344, in fetched_records
self._next_partition_records = self._parse_fetched_data(completion)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 818, in _parse_fetched_data
unpacked = list(self._unpack_message_set(tp, records))
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/consumer/fetcher.py", line 467, in _unpack_message_set
for record in batch:
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 276, in iter
self._maybe_uncompress()
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 183, in _maybe_uncompress
self._assert_has_codec(compression_type)
File "/home/PycharmProjects/myPythonLearning/venv/lib/python3.8/site-packages/kafka/record/default_records.py", line 118, in _assert_has_codec
raise UnsupportedCodecError(
kafka.errors.UnsupportedCodecError: UnsupportedCodecError: Libraries for zstd compression codec not found

Process finished with exit code 1

My Code for consuming a kafka topic:

from kafka import KafkaConsumer
def consume_from_topic(topic):
consumer = KafkaConsumer(
topic,
bootstrap_servers='localhost:9092',
group_id='zstd-11-consumer-group',
auto_offset_reset='earliest',
enable_auto_commit=True
)
try:
for message in consumer:
v = message.value
k = message.key.decode("utf-8")
log = "key={}, offset={}, partition={}, value={}".format(k, message.offset, message.partition, v)
print(log)

except KeyboardInterrupt:
    consumer.close()

if name == "main":
topic_to_consume = "Integrate-Package-Zstd-ESP.info"
consume_from_topic(topic_to_consume)

@brokenjacobs
Copy link

Did you install the zstandard package from pypi?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants