From 5002d670253806e9a8f2b77702e906464e845d11 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Sat, 21 Mar 2020 00:54:40 +0100
Subject: [PATCH] PR change requests

Update the docs compression section

zstd decompress will transform the memoryview object to bytes before
decompressing, to keep the same behavior as the other decompression
strategies

Decrease the fallback max message size due to OOM concerns

Rewrite the 1MB message limit as 1024 * 1024

Test producer constructor makes use of broker version inference again;
also add logic for zstd decompression
---
 docs/index.rst                  |  3 ++-
 kafka/codec.py                  |  2 +-
 kafka/protocol/message.py       | 10 +++++++---
 kafka/record/default_records.py |  2 +-
 test/test_producer.py           |  2 --
 5 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/docs/index.rst b/docs/index.rst
index 242f9eb8d..9c46e3313 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -122,7 +122,8 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports multiple compression types. To produce or
+kafka-python supports multiple compression types:
+
 - gzip : supported natively
 - lz4 : requires `python-lz4 `_ installed
 - snappy : requires the `python-snappy `_ package (which requires the snappy C library)
diff --git a/kafka/codec.py b/kafka/codec.py
index 8ca0728f6..917400e74 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,7 +10,7 @@
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
 
-ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 31527bf63..4c5c031b8 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -3,8 +3,8 @@
 import io
 import time
 
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
-                         gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+                         gzip_decode, snappy_decode, zstd_decode,
                          lz4_decode, lz4_decode_old_kafka)
 from kafka.protocol.frame import KafkaBytes
 from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
     CODEC_MASK = 0x07
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     HEADER_SIZE = 22  # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
@@ -119,7 +120,7 @@ def is_compressed(self):
 
     def decompress(self):
         codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
         if codec == self.CODEC_GZIP:
             assert has_gzip(), 'Gzip decompression unsupported'
             raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@ def decompress(self):
                 raw_bytes = lz4_decode_old_kafka(self.value)
             else:
                 raw_bytes = lz4_decode(self.value)
+        elif codec == self.CODEC_ZSTD:
+            assert has_zstd(), "ZSTD decompression unsupported"
+            raw_bytes = zstd_decode(self.value)
         else:
             raise Exception('This should be impossible')
 
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 917c81cb8..a098c42a9 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -189,7 +189,7 @@ def _maybe_uncompress(self):
             if compression_type == self.CODEC_LZ4:
                 uncompressed = lz4_decode(data.tobytes())
             if compression_type == self.CODEC_ZSTD:
-                uncompressed = zstd_decode(data)
+                uncompressed = zstd_decode(data.tobytes())
             self._buffer = bytearray(uncompressed)
             self._pos = 0
             self._decompressed = True
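
The two codec changes above are related: ZSTD_MAX_OUTPUT_SIZE only comes
into play for zstd frames whose header does not embed the decompressed
size, and the default_records.py hunk calls data.tobytes() because the
record batch hands the codec a memoryview slice of its buffer, just as
the lz4 branch above it already does. As a hedged illustration only,
here is a minimal sketch of a decoder built on the `zstandard` bindings
(import zstandard); it mirrors the shape of zstd_decode in
kafka/codec.py under those assumptions and is not copied from the
module:

    import zstandard as zstd

    ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024  # 1MB fallback cap, as in the hunk above

    def zstd_decode(payload):
        # payload is bytes here; callers convert memoryviews first so
        # every codec sees the same input type.
        try:
            # Frames that embed their content size decompress directly.
            return zstd.ZstdDecompressor().decompress(payload)
        except zstd.ZstdError:
            # Frames without an embedded size need an explicit output
            # bound; a small cap keeps corrupt or hostile input from
            # forcing a huge allocation (the OOM concern above).
            return zstd.ZstdDecompressor().decompress(
                payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
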
diff --git a/test/test_producer.py b/test/test_producer.py
index af8fc26f3..7263130d1 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -38,7 +38,6 @@ def test_end_to_end(kafka_broker, compression):
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
-                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             api_version=env_kafka_version(),
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
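
With api_version=env_kafka_version() removed from both constructor
calls, these tests exercise broker version inference again: when
api_version is left unset, the client probes the broker during
bootstrap instead of trusting the environment variable. A hedged usage
sketch of that path outside the test suite follows; the bootstrap
address and topic name are placeholder assumptions, and zstd needs a
broker running Kafka 2.1.0 or newer:

    from kafka import KafkaProducer

    # Hypothetical example: 'localhost:9092' and 'demo-topic' are
    # placeholders, not values taken from this patch.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        # api_version is left unset so the client infers the broker
        # version at connect time, as the tests above now do.
        compression_type='zstd',
        retries=5,
    )
    producer.send('demo-topic', b'hello zstd')
    producer.flush()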