diff --git a/docs/index.rst b/docs/index.rst
index 242f9eb8d..9c46e3313 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -122,7 +122,8 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports multiple compression types. To produce or
+kafka-python supports multiple compression types:
+
 - gzip : supported natively
 - lz4 : requires `python-lz4 `_ installed
 - snappy : requires the `python-snappy `_ package (which requires the snappy C library)
diff --git a/kafka/codec.py b/kafka/codec.py
index 8ca0728f6..917400e74 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,7 +10,7 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
-ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 31527bf63..4c5c031b8 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -3,8 +3,8 @@
 import io
 import time
 
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
-                         gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+                         gzip_decode, snappy_decode, zstd_decode,
                          lz4_decode, lz4_decode_old_kafka)
 from kafka.protocol.frame import KafkaBytes
 from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
 
@@ -119,7 +120,7 @@ def is_compressed(self):
 
     def decompress(self):
         codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
         if codec == self.CODEC_GZIP:
             assert has_gzip(), 'Gzip decompression unsupported'
             raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@
                 raw_bytes = lz4_decode_old_kafka(self.value)
             else:
                 raw_bytes = lz4_decode(self.value)
+        elif codec == self.CODEC_ZSTD:
+            assert has_zstd(), "ZSTD decompression unsupported"
+            raw_bytes = zstd_decode(self.value)
         else:
             raise Exception('This should be impossible')
 
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 917c81cb8..a098c42a9 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -189,7 +189,7 @@ def _maybe_uncompress(self):
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
diff --git a/test/test_producer.py b/test/test_producer.py
index af8fc26f3..7263130d1 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -38,7 +38,6 @@ def test_end_to_end(kafka_broker, compression):
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
-                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
    connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
    producer = KafkaProducer(bootstrap_servers=connect_str,
                             retries=5,
-                            api_version=env_kafka_version(),
                             max_block_ms=30000,
                             compression_type=compression)
    magic = producer._max_usable_produce_magic()
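
Note for reviewers (not part of the patch): the ZSTD_MAX_OUTPUT_SIZE constant
exists because zstd frames written by streaming compressors, as the Java Kafka
client produces, may omit the decompressed size from the frame header, and
python-zstandard's one-shot decompress() rejects such frames unless given an
explicit output cap. The sketch below illustrates that behavior under the
assumption that the `zstandard` package (imported as `zstd` in kafka/codec.py)
is installed; the `payload` and `frame` names are illustrative only.

    import zstandard

    payload = b'some record batch bytes'

    # Compress through the streaming API so the frame header, like one
    # from a streaming producer, carries no decompressed size.
    compressor = zstandard.ZstdCompressor().compressobj()
    frame = compressor.compress(payload) + compressor.flush()

    dctx = zstandard.ZstdDecompressor()
    try:
        # Raises ZstdError: the content size is absent from the header.
        data = dctx.decompress(frame)
    except zstandard.ZstdError:
        # Fallback taken by kafka.codec.zstd_decode: bound the output
        # buffer explicitly. This patch lowers that bound from
        # 1024 ** 3 (1 GiB) to 1024 * 1024 (1 MiB).
        data = dctx.decompress(frame, max_output_size=1024 * 1024)

    assert data == payload

The data.tobytes() change in default_records.py follows the neighboring lz4
branch, which already converts the memoryview slice to bytes before handing it
to the codec.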