Skip to content

Commit

Permalink
Add support for zstd compression (dpkp#2021)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tincu Gabriel committed Sep 22, 2020
1 parent f73cabc commit 5d30e41
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 17 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ addons:
apt:
packages:
- libsnappy-dev
- libzstd-dev
- openjdk-8-jdk

cache:
Expand Down
9 changes: 5 additions & 4 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@ multiprocessing is recommended.
Compression
***********

kafka-python supports gzip compression/decompression natively. To produce or
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy, install python-snappy (also requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.
kafka-python supports multiple compression types:

- gzip : supported natively
- lz4 : requires the `python-lz4 <https://pypi.org/project/lz4/>`_ package
- snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which in turn requires the snappy C library)
- zstd : requires the `python-zstandard <https://github.com/indygreg/python-zstandard>`_ package

Protocol
********
Expand Down
2 changes: 1 addition & 1 deletion kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

_XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
_XERIAL_V1_FORMAT = 'bccccccBii'
ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024

try:
import snappy
Expand Down
10 changes: 7 additions & 3 deletions kafka/protocol/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
import io
import time

from kafka.codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode,
from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
gzip_decode, snappy_decode, zstd_decode,
lz4_decode, lz4_decode_old_kafka)
from kafka.protocol.frame import KafkaBytes
from kafka.protocol.struct import Struct
Expand Down Expand Up @@ -35,6 +35,7 @@ class Message(Struct):
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
CODEC_ZSTD = 0x04
TIMESTAMP_TYPE_MASK = 0x08
HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)

Expand Down Expand Up @@ -119,7 +120,7 @@ def is_compressed(self):

def decompress(self):
codec = self.attributes & self.CODEC_MASK
assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
if codec == self.CODEC_GZIP:
assert has_gzip(), 'Gzip decompression unsupported'
raw_bytes = gzip_decode(self.value)
Expand All @@ -132,6 +133,9 @@ def decompress(self):
raw_bytes = lz4_decode_old_kafka(self.value)
else:
raw_bytes = lz4_decode(self.value)
elif codec == self.CODEC_ZSTD:
assert has_zstd(), "ZSTD decompression unsupported"
raw_bytes = zstd_decode(self.value)
else:
raise Exception('This should be impossible')

Expand Down
12 changes: 6 additions & 6 deletions test/test_codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ def test_gzip():
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = gzip_decode(gzip_encode(b1))
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2


@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
def test_snappy():
for i in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = snappy_decode(snappy_encode(b1))
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2


@pytest.mark.skipif(not has_snappy(), reason="Snappy not available")
Expand Down Expand Up @@ -91,7 +91,7 @@ def test_lz4():
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2


@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
Expand All @@ -101,7 +101,7 @@ def test_lz4_old():
b1 = random_string(100).encode('utf-8')
b2 = lz4_decode_old_kafka(lz4_encode_old_kafka(b1))
assert len(b1) == len(b2)
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2


@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
Expand All @@ -113,12 +113,12 @@ def test_lz4_incremental():
b1 = random_string(100).encode('utf-8') * 50000
b2 = lz4_decode(lz4_encode(b1))
assert len(b1) == len(b2)
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2


@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
def test_zstd():
for _ in range(1000):
b1 = random_string(100).encode('utf-8')
b2 = zstd_decode(zstd_encode(b1))
assert b1 == b2, "decompressed value differs from input value: %r vs %r" % (b2, b1)
assert b1 == b2
4 changes: 1 addition & 3 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ def test_end_to_end(kafka_broker, compression):
pytest.skip('python-lz4 crashes on older versions of pypy')

if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
pytest.skip("zstd requires kafka 2.1.0 or more")
pytest.skip('zstd requires kafka 2.1.0 or newer')

connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
max_block_ms=30000,
api_version=env_kafka_version(),
compression_type=compression,
value_serializer=str.encode)
consumer = KafkaConsumer(bootstrap_servers=connect_str,
Expand Down Expand Up @@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
producer = KafkaProducer(bootstrap_servers=connect_str,
retries=5,
api_version=env_kafka_version(),
max_block_ms=30000,
compression_type=compression)
magic = producer._max_usable_produce_magic()
Expand Down

0 comments on commit 5d30e41

Please sign in to comment.