Fix handling unsupported compression codec (#795) #798

Merged: 3 commits, Nov 22, 2021

Changes from all commits
1 change: 1 addition & 0 deletions CHANGES/795.bugfix
@@ -0,0 +1 @@
Fix handling unsupported compression codec (issue #795)
2 changes: 2 additions & 0 deletions aiokafka/errors.py
@@ -69,6 +69,7 @@
KafkaUnavailableError,
KafkaTimeoutError,
KafkaConnectionError,
UnsupportedCodecError,
)

__all__ = [
@@ -144,6 +145,7 @@
"KafkaUnavailableError",
"KafkaTimeoutError",
"KafkaConnectionError",
"UnsupportedCodecError",
]


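With UnsupportedCodecError now re-exported from aiokafka.errors, applications can catch it alongside the other aiokafka errors. A minimal, hedged sketch of consumer-side handling (the topic name and broker address are placeholders; exactly where the error surfaces during consumption depends on the fetch path and is not pinned down by this diff):

```python
import asyncio

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import UnsupportedCodecError


async def consume():
    # Placeholder topic/broker; assumes fetched batches use a codec whose
    # library (e.g. python-snappy) is not installed in this environment.
    consumer = AIOKafkaConsumer("some-topic", bootstrap_servers="localhost:9092")
    await consumer.start()
    try:
        async for msg in consumer:
            print(msg.offset, msg.value)
    except UnsupportedCodecError as exc:
        # With this fix the missing codec is reported explicitly
        # instead of failing obscurely while decoding the batch.
        print("Cannot decode batch:", exc)
    finally:
        await consumer.stop()


asyncio.run(consume())
```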
37 changes: 28 additions & 9 deletions aiokafka/record/_crecords/default_records.pyx
@@ -55,11 +55,12 @@
# * Timestamp Type (3)
# * Compression Type (0-2)

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
)
import kafka.codec as codecs

from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_WRITABLE, \
PyBUF_SIMPLE, PyBUF_READ, Py_buffer, \
@@ -108,12 +109,28 @@ DEF NO_PARTITION_LEADER_EPOCH = -1
cutil.crc32c_global_init()


cdef _assert_has_codec(char compression_type):
if compression_type == _ATTR_CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == _ATTR_CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == _ATTR_CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
else:
raise UnsupportedCodecError(
f"Unknown compression codec {compression_type:#04x}")
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")


@cython.no_gc_clear
@cython.final
@cython.freelist(_DEFAULT_RECORD_BATCH_FREELIST_SIZE)
cdef class DefaultRecordBatch:

CODEC_NONE = _ATTR_CODEC_NONE
CODEC_MASK = _ATTR_CODEC_MASK
CODEC_GZIP = _ATTR_CODEC_GZIP
CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -211,18 +228,19 @@ cdef class DefaultRecordBatch:
if not self._decompressed:
compression_type = <char> self.attributes & _ATTR_CODEC_MASK
if compression_type != _ATTR_CODEC_NONE:
_assert_has_codec(compression_type)
buf = <char *> self._buffer.buf
data = PyMemoryView_FromMemory(
&buf[self._pos],
self._buffer.len - self._pos,
PyBUF_READ)
if compression_type == _ATTR_CODEC_GZIP:
uncompressed = gzip_decode(data)
if compression_type == _ATTR_CODEC_SNAPPY:
elif compression_type == _ATTR_CODEC_SNAPPY:
uncompressed = snappy_decode(data.tobytes())
if compression_type == _ATTR_CODEC_LZ4:
elif compression_type == _ATTR_CODEC_LZ4:
uncompressed = lz4_decode(data.tobytes())

PyBuffer_Release(&self._buffer)
PyObject_GetBuffer(uncompressed, &self._buffer, PyBUF_SIMPLE)
self._pos = 0
@@ -360,7 +378,7 @@ cdef class DefaultRecordBatch:
raise CorruptRecordException(
"{} unconsumed bytes after all records consumed".format(
self._buffer.len - self._pos))
self._next_record_index = 0
raise StopIteration

msg = self._read_msg()
@@ -541,7 +559,7 @@ cdef class DefaultRecordBatchBuilder:
buf = PyByteArray_AS_STRING(self._buffer)
self._encode_msg(pos, buf, offset, ts, msg_size, key, value, headers)
self._pos = pos + size

return DefaultRecordMetadata.new(offset, size, ts)

cdef int _encode_msg(
@@ -571,7 +589,7 @@ cdef class DefaultRecordBatchBuilder:

buf[pos] = 0 # Attributes => Int8
pos += 1

cutil.encode_varint64(buf, &pos, timestamp_delta)
# Base offset is always 0 on Produce
cutil.encode_varint64(buf, &pos, offset)
@@ -668,6 +686,7 @@ cdef class DefaultRecordBatchBuilder:


if self._compression_type != _ATTR_CODEC_NONE:
_assert_has_codec(self._compression_type)
data = bytes(self._buffer[FIRST_RECORD_OFFSET:self._pos])
if self._compression_type == _ATTR_CODEC_GZIP:
compressed = gzip_encode(data)
@@ -747,7 +766,7 @@ cdef inline Py_ssize_t _bytelike_len(object obj) except -2:
else:
PyObject_GetBuffer(obj, &buf, PyBUF_SIMPLE)
obj_len = buf.len
PyBuffer_Release(&buf)
return obj_len


@@ -822,4 +841,4 @@ cdef class DefaultRecordMetadata:
return (
"DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})"
.format(self.offset, self.size, self.timestamp)
)
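For reference, the codecs.has_* checkers used by the new _assert_has_codec helper come from kafka.codec and simply report whether the optional compression library could be imported. A quick standalone check (a sketch; the output depends on which optional packages are installed locally):

```python
import kafka.codec as codecs

# Each checker returns True only if the corresponding optional library
# (python-snappy, lz4, ...) was importable when kafka.codec was loaded.
print("gzip  :", codecs.has_gzip())    # stdlib zlib, effectively always True
print("snappy:", codecs.has_snappy())  # requires python-snappy
print("lz4   :", codecs.has_lz4())     # requires the lz4 package
```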
22 changes: 21 additions & 1 deletion aiokafka/record/_crecords/legacy_records.pyx
@@ -4,7 +4,8 @@ from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
)
from aiokafka.errors import CorruptRecordException
import kafka.codec as codecs
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from zlib import crc32 as py_crc32 # needed for windows macro

from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_WRITABLE, \
@@ -44,13 +45,29 @@ DEF ATTRIBUTES_OFFSET = MAGIC_OFFSET + 1
DEF TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + 1


cdef _assert_has_codec(char compression_type):
if compression_type == _ATTR_CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == _ATTR_CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == _ATTR_CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
else:
raise UnsupportedCodecError(
f"Unknown compression codec {compression_type:#04x}")
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")


@cython.no_gc_clear
@cython.final
@cython.freelist(_LEGACY_RECORD_BATCH_FREELIST_SIZE)
cdef class LegacyRecordBatch:

RECORD_OVERHEAD_V0 = RECORD_OVERHEAD_V0_DEF
RECORD_OVERHEAD_V1 = RECORD_OVERHEAD_V1_DEF
CODEC_MASK = _ATTR_CODEC_MASK
CODEC_GZIP = _ATTR_CODEC_GZIP
CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -117,6 +134,7 @@ cdef class LegacyRecordBatch:
raise CorruptRecordException("Value of compressed message is None")
value = self._main_record.value

_assert_has_codec(compression_type)
if compression_type == _ATTR_CODEC_GZIP:
uncompressed = gzip_decode(value)
elif compression_type == _ATTR_CODEC_SNAPPY:
@@ -336,6 +354,7 @@ cdef class LegacyRecordBatchBuilder:
Py_ssize_t _batch_size
bytearray _buffer

CODEC_MASK = _ATTR_CODEC_MASK
CODEC_GZIP = _ATTR_CODEC_GZIP
CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -411,6 +430,7 @@ cdef class LegacyRecordBatchBuilder:
uint32_t crc

if self._compression_type != 0:
_assert_has_codec(self._compression_type)
if self._compression_type == _ATTR_CODEC_GZIP:
compressed = gzip_encode(self._buffer)
elif self._compression_type == _ATTR_CODEC_SNAPPY:
25 changes: 22 additions & 3 deletions aiokafka/record/default_records.py
@@ -58,16 +58,19 @@
import time
from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
)
import kafka.codec as codecs


class DefaultRecordBase:

__slots__ = ()

HEADER_STRUCT = struct.Struct(
">q" # BaseOffset => Int64
"i" # Length => Int32
@@ -102,6 +105,20 @@ class DefaultRecordBase:

NO_PARTITION_LEADER_EPOCH = -1

def _assert_has_codec(self, compression_type):
if compression_type == self.CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == self.CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
else:
raise UnsupportedCodecError(
f"Unknown compression codec {compression_type:#04x}")
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")


class _DefaultRecordBatchPy(DefaultRecordBase):

@@ -177,12 +194,13 @@ def _maybe_uncompress(self):
if not self._decompressed:
compression_type = self.compression_type
if compression_type != self.CODEC_NONE:
self._assert_has_codec(compression_type)
data = memoryview(self._buffer)[self._pos:]
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
if compression_type == self.CODEC_SNAPPY:
elif compression_type == self.CODEC_SNAPPY:
uncompressed = snappy_decode(data.tobytes())
if compression_type == self.CODEC_LZ4:
elif compression_type == self.CODEC_LZ4:
uncompressed = lz4_decode(data.tobytes())
self._buffer = bytearray(uncompressed)
self._pos = 0
@@ -504,6 +522,7 @@ def write_header(self, use_compression_type=True):

def _maybe_compress(self):
if self._compression_type != self.CODEC_NONE:
self._assert_has_codec(self._compression_type)
header_size = self.HEADER_STRUCT.size
data = bytes(self._buffer[header_size:])
if self._compression_type == self.CODEC_GZIP:
21 changes: 20 additions & 1 deletion aiokafka/record/legacy_records.py
@@ -3,19 +3,22 @@

from binascii import crc32

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
)
import kafka.codec as codecs


NoneType = type(None)


class LegacyRecordBase:

__slots__ = ()

HEADER_STRUCT_V0 = struct.Struct(
">q" # BaseOffset => Int64
"i" # Length => Int32
@@ -73,6 +76,20 @@ class LegacyRecordBase:
LOG_APPEND_TIME = 1
CREATE_TIME = 0

def _assert_has_codec(self, compression_type):
Review thread attached to the line above:

Member: Can we reuse the same function to avoid copy-paste?

@ods (Collaborator, Author), Nov 21, 2021: Sure. What module is the best place to put it in?

@ods, Nov 21, 2021: Oh, sorry. They will diverge after adding zstd support. I think that's the reason why it's duplicated in kafka-python: legacy records can't be used with zstd compression. But I haven't verified this, so the difference may be a mistake too.

@ods, Nov 21, 2021: According to KIP-110: "Zstd will only be allowed with magic = 2 format."

if compression_type == self.CODEC_GZIP:
checker, name = codecs.has_gzip, "gzip"
elif compression_type == self.CODEC_SNAPPY:
checker, name = codecs.has_snappy, "snappy"
elif compression_type == self.CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
else:
raise UnsupportedCodecError(
f"Unknown compression codec {compression_type:#04x}")
if not checker():
raise UnsupportedCodecError(
f"Libraries for {name} compression codec not found")


class _LegacyRecordBatchPy(LegacyRecordBase):

@@ -135,6 +152,7 @@ def _decompress(self, key_offset):
data = self._buffer[pos:pos + value_size]

compression_type = self.compression_type
self._assert_has_codec(compression_type)
if compression_type == self.CODEC_GZIP:
uncompressed = gzip_decode(data)
elif compression_type == self.CODEC_SNAPPY:
@@ -389,6 +407,7 @@ def _encode_msg(self, buf, offset, timestamp, key_size, key,

def _maybe_compress(self):
if self._compression_type:
self._assert_has_codec(self._compression_type)
buf = self._buffer
if self._compression_type == self.CODEC_GZIP:
compressed = gzip_encode(buf)
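The review thread above notes that, per KIP-110, zstd is only allowed with the v2 message format (magic = 2), which is why the codec check is duplicated rather than shared: the default-records copy is expected to grow a zstd branch while the legacy copy keeps rejecting it. A purely hypothetical sketch of that future divergence (zstd support is not part of this PR, and the has_zstd checker is an assumption based on recent kafka-python releases, not verified here):

```python
# Hypothetical extension, NOT part of this PR: what the default-records (v2)
# checker might look like once zstd (magic = 2 only, per KIP-110) is added.
import kafka.codec as codecs
from aiokafka.errors import UnsupportedCodecError

CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4, CODEC_ZSTD = 0x01, 0x02, 0x03, 0x04


def _assert_has_codec_v2(compression_type):
    if compression_type == CODEC_GZIP:
        checker, name = codecs.has_gzip, "gzip"
    elif compression_type == CODEC_SNAPPY:
        checker, name = codecs.has_snappy, "snappy"
    elif compression_type == CODEC_LZ4:
        checker, name = codecs.has_lz4, "lz4"
    elif compression_type == CODEC_ZSTD:
        # Assumed checker name; only valid for v2 batches, so the legacy
        # record checker would keep treating zstd as unknown.
        checker, name = codecs.has_zstd, "zstd"
    else:
        raise UnsupportedCodecError(
            f"Unknown compression codec {compression_type:#04x}")
    if not checker():
        raise UnsupportedCodecError(
            f"Libraries for {name} compression codec not found")
```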
2 changes: 1 addition & 1 deletion requirements-ci.txt
@@ -10,7 +10,7 @@ docker==5.0.3
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.0
docutils==0.18
docutils==0.17.1
Pygments==2.10.0
gssapi==1.7.2
dataclasses==0.8; python_version<"3.7"
2 changes: 1 addition & 1 deletion requirements-docs.txt
@@ -1,5 +1,5 @@
-r requirements-cython.txt
Sphinx==4.2.0
Sphinx==4.3.0
sphinxcontrib-asyncio==0.3.0
sphinxcontrib-spelling==7.2.1
alabaster==0.7.12
45 changes: 45 additions & 0 deletions tests/record/test_default_records.py
@@ -1,3 +1,7 @@
from unittest import mock

import kafka.codec
from kafka.errors import UnsupportedCodecError
import pytest
from aiokafka.record.default_records import (
DefaultRecordBatch, DefaultRecordBatchBuilder
@@ -175,6 +179,47 @@ def test_default_batch_size_limit():
assert len(builder.build()) < 1000


@pytest.mark.parametrize("compression_type,name,checker_name", [
(DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
(DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
(DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4"),
])
def test_unavailable_codec(compression_type, name, checker_name):
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
correct_buffer = builder.build()

with mock.patch.object(kafka.codec, checker_name, return_value=False):
# Check that builder raises error
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
error_msg = "Libraries for {} compression codec not found".format(name)
with pytest.raises(UnsupportedCodecError, match=error_msg):
builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
builder.build()

# Check that reader raises same error
batch = DefaultRecordBatch(bytes(correct_buffer))
with pytest.raises(UnsupportedCodecError, match=error_msg):
list(batch)


def test_unsupported_yet_codec():
compression_type = DefaultRecordBatch.CODEC_MASK # It doesn't exist
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=compression_type, is_transactional=0,
producer_id=-1, producer_epoch=-1, base_sequence=-1,
batch_size=1024)
with pytest.raises(UnsupportedCodecError):
builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
builder.build()


def test_build_without_append():
builder = DefaultRecordBatchBuilder(
magic=2, compression_type=0, is_transactional=1,
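One detail worth noting about test_unavailable_codec above: patching has_gzip/has_snappy/has_lz4 on the kafka.codec module is enough because the record code imports the module (import kafka.codec as codecs) and looks the checker up at call time, so both the builder and the reader see the mock. A tiny standalone illustration of that late binding (assumes python-snappy is installed, so the checker is normally truthy):

```python
from unittest import mock

import kafka.codec

# Normally reports whether python-snappy could be imported.
print(kafka.codec.has_snappy())

# Patching the module attribute makes any code that does
# `import kafka.codec as codecs` and calls `codecs.has_snappy()` see False.
with mock.patch.object(kafka.codec, "has_snappy", return_value=False):
    print(kafka.codec.has_snappy())  # -> False
```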