Commit

PR change requests
- Update readme
- zstd decompress now converts the memoryview object to bytes before decompressing, to keep the same behavior as the other decompression strategies
- Decrease the fallback max message size due to OOM concerns
- Rewrite the 1MB message limit
- Test producer constructor makes use of broker version inference again; also add logic for zstd decompression
Gabriel Tincu committed May 5, 2020
1 parent c9b5ba2 commit 5002d67
Showing 5 changed files with 11 additions and 8 deletions.
docs/index.rst (2 additions, 1 deletion)

@@ -122,7 +122,8 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports multiple compression types. To produce or
+kafka-python supports multiple compression types:
+
 - gzip : supported natively
 - lz4 : requires `python-lz4 <https://pypi.org/project/lz4/>`_ installed
 - snappy : requires the `python-snappy <https://pypi.org/project/python-snappy/>`_ package (which requires the snappy C library)
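
The list above maps onto the producer's compression_type parameter. A minimal usage sketch (broker address and topic name are hypothetical; 'zstd' additionally requires a broker >= 2.1.0):

    from kafka import KafkaProducer

    # compression_type accepts 'gzip', 'snappy', 'lz4', or 'zstd'
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             compression_type='gzip')
    producer.send('my-topic', b'compressed on the wire')
    producer.flush()
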
kafka/codec.py (1 addition, 1 deletion)

@@ -10,7 +10,7 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
-ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
+ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024
 
 try:
     import snappy
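
ZSTD_MAX_OUTPUT_SIZE caps the output buffer on the fallback decompression path, and dropping it from 1 GiB (1024 ** 3) to 1 MiB (1024 * 1024) is the OOM fix from the commit message. The decoder that consumes the constant is not shown in this diff; a sketch of how it is used, assuming the python-zstandard package provides the zstd codec:

    import zstandard as zstd

    ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024  # 1 MiB fallback cap

    def zstd_decode(payload):
        # Kafka zstd frames may omit the decompressed content size;
        # ZstdDecompressor.decompress() raises ZstdError for such frames
        # unless an explicit max_output_size is supplied.
        try:
            return zstd.ZstdDecompressor().decompress(payload)
        except zstd.ZstdError:
            return zstd.ZstdDecompressor().decompress(
                payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
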
kafka/protocol/message.py (7 additions, 3 deletions)

@@ -3,8 +3,8 @@
 import io
 import time
 
-from kafka.codec import (has_gzip, has_snappy, has_lz4,
-                         gzip_decode, snappy_decode,
+from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd,
+                         gzip_decode, snappy_decode, zstd_decode,
                          lz4_decode, lz4_decode_old_kafka)
 from kafka.protocol.frame import KafkaBytes
 from kafka.protocol.struct import Struct
@@ -35,6 +35,7 @@ class Message(Struct):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
@@ -119,7 +120,7 @@ def is_compressed(self):
 
     def decompress(self):
         codec = self.attributes & self.CODEC_MASK
-        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4)
+        assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD)
         if codec == self.CODEC_GZIP:
             assert has_gzip(), 'Gzip decompression unsupported'
             raw_bytes = gzip_decode(self.value)
@@ -132,6 +133,9 @@ def decompress(self):
                 raw_bytes = lz4_decode_old_kafka(self.value)
             else:
                 raw_bytes = lz4_decode(self.value)
+        elif codec == self.CODEC_ZSTD:
+            assert has_zstd(), "ZSTD decompression unsupported"
+            raw_bytes = zstd_decode(self.value)
         else:
             raise Exception('This should be impossible')
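
The new branch delegates to the codec helpers added along with zstd support. A quick round-trip sketch, assuming python-zstandard is installed and that zstd_encode is the matching encoder in kafka.codec:

    from kafka.codec import has_zstd, zstd_encode, zstd_decode

    if has_zstd():  # True when python-zstandard is importable
        payload = zstd_encode(b'hello kafka')
        assert zstd_decode(payload) == b'hello kafka'
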
kafka/record/default_records.py (1 addition, 1 deletion)

@@ -189,7 +189,7 @@ def _maybe_uncompress(self):
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
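
In _maybe_uncompress, data is a memoryview slice over the record batch buffer, and the fix converts it to bytes before decoding, matching what the other codec branches already do. A minimal illustration of the distinction:

    buf = bytearray(b'headercompressed-framecrc!')
    view = memoryview(buf)[6:-4]   # zero-copy slice over the buffer

    assert not isinstance(view, bytes)        # memoryview, not bytes
    assert isinstance(view.tobytes(), bytes)  # explicit copy, as in the fix
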
test/test_producer.py (0 additions, 2 deletions)

@@ -38,7 +38,6 @@ def test_end_to_end(kafka_broker, compression):
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
-                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             api_version=env_kafka_version(),
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
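
Removing the api_version pin restores the constructor default of api_version=None, under which the client probes the broker during bootstrap and infers the protocol version — the broker version inference the commit message says these tests should exercise again. A sketch of the difference (broker address hypothetical):

    from kafka import KafkaProducer

    # Pinned: the client trusts this version and skips inference.
    pinned = KafkaProducer(bootstrap_servers='localhost:9092',
                           api_version=(2, 5, 0))

    # Inferred: with the default api_version=None, the client probes
    # the broker on connect to determine the protocol version.
    inferred = KafkaProducer(bootstrap_servers='localhost:9092')
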
