Skip to content

Commit

Permalink
Add Codec for ZStandard Compression (#801)
Browse files Browse the repository at this point in the history
* Add log_start_offset to message protocol parsing

Based on: https://github.com/dpkp/kafka-python/pull/2020/files

* Add zstd compression support

* Add changelog entry
  • Loading branch information
ods authored Dec 6, 2021
1 parent 67f55c0 commit 5370a4c
Show file tree
Hide file tree
Showing 25 changed files with 213 additions and 60 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ jobs:
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libsnappy-dev libkrb5-dev
sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev
- name: Install python dependencies
run: |
pip install --upgrade pip setuptools wheel
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libsnappy-dev libkrb5-dev
sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev
- name: Get pip cache dir
id: pip-cache
Expand Down Expand Up @@ -270,7 +270,7 @@ jobs:
- name: Install system dependencies
run: |
sudo apt-get update
sudo apt-get install -y libsnappy-dev libkrb5-dev krb5-user
sudo apt-get install -y libsnappy-dev libzstd-dev libkrb5-dev krb5-user
- name: Get pip cache dir
id: pip-cache
Expand Down
1 change: 1 addition & 0 deletions .travis.yml_bak
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ addons:
apt:
packages:
- libsnappy-dev
- libzstd-dev
- krb5-user

install:
Expand Down
1 change: 1 addition & 0 deletions CHANGES/801.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add Codec for ZStandard Compression (KIP-110)
2 changes: 1 addition & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ generate ssh keys for some tests.

Setting up tests requirements (assuming you're within virtualenv on ubuntu 14.04+)::

sudo apt-get install -y libsnappy-dev
sudo apt-get install -y libsnappy-dev libzstd-dev
make setup

Running tests with coverage::
Expand Down
8 changes: 5 additions & 3 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ def append(self, key, value, timestamp_ms, _create_future=create_future,
self._msg_futures.append((future, metadata))
return future

def done(self, base_offset, timestamp=None,
def done(self, base_offset, timestamp=None, log_start_offset=None,
_record_metadata_class=RecordMetadata):
"""Resolve all pending futures"""
tp = self._tp
Expand All @@ -157,7 +157,8 @@ def done(self, base_offset, timestamp=None,
# Set main batch future
if not self.future.done():
self.future.set_result(_record_metadata_class(
topic, partition, tp, base_offset, timestamp, timestamp_type))
topic, partition, tp, base_offset, timestamp, timestamp_type,
log_start_offset))

# Set message futures
for future, metadata in self._msg_futures:
Expand All @@ -169,7 +170,8 @@ def done(self, base_offset, timestamp=None,
timestamp = metadata.timestamp
offset = base_offset + metadata.offset
future.set_result(_record_metadata_class(
topic, partition, tp, offset, timestamp, timestamp_type))
topic, partition, tp, offset, timestamp, timestamp_type,
log_start_offset))

def done_noack(self):
""" Resolve all pending futures to None """
Expand Down
23 changes: 14 additions & 9 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import warnings

from kafka.partitioner.default import DefaultPartitioner
from kafka.codec import has_gzip, has_snappy, has_lz4
from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd

from aiokafka.client import AIOKafkaClient
from aiokafka.errors import (
MessageSizeTooLargeError, UnsupportedVersionError, IllegalOperation)
from aiokafka.record.default_records import DefaultRecordBatch
from aiokafka.record.legacy_records import LegacyRecordBatchBuilder
from aiokafka.structs import TopicPartition
from aiokafka.util import (
Expand Down Expand Up @@ -86,10 +87,10 @@ class AIOKafkaProducer:
If unset, defaults to *acks=1*. If ``enable_idempotence`` is
``True`` defaults to *acks=all*
compression_type (str): The compression type for all data generated by
the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
Compression is of full batches of data, so the efficacy of batching
will also impact the compression ratio (more batching means better
compression). Default: None.
the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or
None. Compression is of full batches of data, so the efficacy of
batching will also impact the compression ratio (more batching
means better compression). Default: None.
max_batch_size (int): Maximum size of buffered data per partition.
After this amount `send` coroutine will block until batch is
drained.
Expand Down Expand Up @@ -167,9 +168,10 @@ class AIOKafkaProducer:
_PRODUCER_CLIENT_ID_SEQUENCE = 0

_COMPRESSORS = {
'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
'gzip': (has_gzip, DefaultRecordBatch.CODEC_GZIP),
'snappy': (has_snappy, DefaultRecordBatch.CODEC_SNAPPY),
'lz4': (has_lz4, DefaultRecordBatch.CODEC_LZ4),
'zstd': (has_zstd, DefaultRecordBatch.CODEC_ZSTD),
}

_closed = None # Serves as an uninitialized flag for __del__
Expand Down Expand Up @@ -203,7 +205,7 @@ def __init__(self, *, loop=None, bootstrap_servers='localhost',

if acks not in (0, 1, -1, 'all', _missing):
raise ValueError("Invalid ACKS parameter")
if compression_type not in ('gzip', 'snappy', 'lz4', None):
if compression_type not in ('gzip', 'snappy', 'lz4', 'zstd', None):
raise ValueError("Invalid compression type!")
if compression_type:
checker, compression_attrs = self._COMPRESSORS[compression_type]
Expand Down Expand Up @@ -298,6 +300,9 @@ async def start(self):
if self._compression_type == 'lz4':
assert self.client.api_version >= (0, 8, 2), \
'LZ4 Requires >= Kafka 0.8.2 Brokers'
elif self._compression_type == 'zstd':
assert self.client.api_version >= (2, 1, 0), \
'Zstd Requires >= Kafka 2.1.0 Brokers'

if self._txn_manager is not None and self.client.api_version < (0, 11):
raise UnsupportedVersionError(
Expand Down
32 changes: 27 additions & 5 deletions aiokafka/producer/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,15 @@ def create_request(self):
(tp.partition, batch.get_data_buffer())
)

if self._client.api_version >= (0, 11):
if self._client.api_version >= (2, 1):
version = 7
elif self._client.api_version >= (2, 0):
version = 6
elif self._client.api_version >= (1, 1):
version = 5
elif self._client.api_version >= (1, 0):
version = 4
elif self._client.api_version >= (0, 11):
version = 3
elif self._client.api_version >= (0, 10):
version = 2
Expand Down Expand Up @@ -737,20 +745,34 @@ async def do(self, node_id):
def handle_response(self, response):
for topic, partitions in response.topics:
for partition_info in partitions:
global_error = None
log_start_offset = None
if response.API_VERSION < 2:
partition, error_code, offset = partition_info
# Mimic CREATE_TIME to take user provided timestamp
timestamp = -1
else:
elif 2 <= response.API_VERSION <= 4:
partition, error_code, offset, timestamp = partition_info
elif 5 <= response.API_VERSION <= 7:
(
partition, error_code, offset, timestamp,
log_start_offset
) = partition_info
else:
# the ignored parameter is record_error of type
# list[(batch_index: int, error_message: str)]
(
partition, error_code, offset, timestamp,
log_start_offset, _, global_error
) = partition_info
tp = TopicPartition(topic, partition)
error = Errors.for_code(error_code)
batch = self._batches.get(tp)
if batch is None:
continue

if error is Errors.NoError:
batch.done(offset, timestamp)
batch.done(offset, timestamp, log_start_offset)
elif error is DuplicateSequenceNumber:
# If we have received a duplicate sequence error,
# it means that the sequence number has advanced
Expand All @@ -761,7 +783,7 @@ def handle_response(self, response):
# The only thing we can do is to return success to
# the user and not return a valid offset and
# timestamp.
batch.done(offset, timestamp)
batch.done(offset, timestamp, log_start_offset)
elif not self._can_retry(error(), batch):
if error is InvalidProducerEpoch:
exc = ProducerFenced()
Expand All @@ -773,7 +795,7 @@ def handle_response(self, response):
else:
log.warning(
"Got error produce response on topic-partition"
" %s, retrying. Error: %s", tp, error)
" %s, retrying. Error: %s", tp, global_error or error)
# Ok, we can retry this batch
if getattr(error, "invalid_metadata", False):
self._client.force_metadata_update()
Expand Down
80 changes: 78 additions & 2 deletions aiokafka/protocol/produce.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,51 @@ class ProduceResponse_v5(Response):
)


class ProduceResponse_v6(Response):
"""
The version number is bumped to indicate that on quota violation brokers
send out responses before throttling.
"""
API_KEY = 0
API_VERSION = 6
SCHEMA = ProduceResponse_v5.SCHEMA


class ProduceResponse_v7(Response):
"""
V7 bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_KEY = 0
API_VERSION = 7
SCHEMA = ProduceResponse_v6.SCHEMA


class ProduceResponse_v8(Response):
"""
V8 bumped up to add two new fields record_errors offset list and error_message
(See KIP-467)
"""
API_KEY = 0
API_VERSION = 8
SCHEMA = Schema(
('topics', Array(
('topic', String('utf-8')),
('partitions', Array(
('partition', Int32),
('error_code', Int16),
('offset', Int64),
('timestamp', Int64),
('log_start_offset', Int64)),
('record_errors', (Array(
('batch_index', Int32),
('batch_index_error_message', String('utf-8'))
))),
('error_message', String('utf-8'))
))),
('throttle_time_ms', Int32)
)


class ProduceRequest(Request):
API_KEY = 0

Expand Down Expand Up @@ -152,11 +197,42 @@ class ProduceRequest_v5(ProduceRequest):
SCHEMA = ProduceRequest_v4.SCHEMA


class ProduceRequest_v6(ProduceRequest):
"""
The version number is bumped to indicate that on quota violation brokers
send out responses before throttling.
"""
API_VERSION = 6
RESPONSE_TYPE = ProduceResponse_v6
SCHEMA = ProduceRequest_v5.SCHEMA


class ProduceRequest_v7(ProduceRequest):
"""
V7 bumped up to indicate ZStandard capability. (see KIP-110)
"""
API_VERSION = 7
RESPONSE_TYPE = ProduceResponse_v7
SCHEMA = ProduceRequest_v6.SCHEMA


class ProduceRequest_v8(ProduceRequest):
"""
V8 bumped up to add two new fields record_errors offset list and error_message
to PartitionResponse (See KIP-467)
"""
API_VERSION = 8
RESPONSE_TYPE = ProduceResponse_v8
SCHEMA = ProduceRequest_v7.SCHEMA


ProduceRequest = [
ProduceRequest_v0, ProduceRequest_v1, ProduceRequest_v2,
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5
ProduceRequest_v3, ProduceRequest_v4, ProduceRequest_v5,
ProduceRequest_v6, ProduceRequest_v7, ProduceRequest_v8,
]
ProduceResponse = [
ProduceResponse_v0, ProduceResponse_v1, ProduceResponse_v2,
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5
ProduceResponse_v3, ProduceResponse_v4, ProduceResponse_v5,
ProduceResponse_v6, ProduceResponse_v7, ProduceResponse_v8,
]
3 changes: 2 additions & 1 deletion aiokafka/record/_crecords/consts.pxi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ DEF _ATTR_CODEC_NONE = 0x00
DEF _ATTR_CODEC_GZIP = 0x01
DEF _ATTR_CODEC_SNAPPY = 0x02
DEF _ATTR_CODEC_LZ4 = 0x03
DEF _ATTR_CODEC_ZSTD = 0x04

DEF _TIMESTAMP_TYPE_MASK = 0x08
DEF _TRANSACTIONAL_MASK = 0x10
Expand All @@ -21,4 +22,4 @@ DEF _LEGACY_RECORD_FREELIST_SIZE = 100

DEF _DEFAULT_RECORD_METADATA_FREELIST_SIZE = 20
DEF _DEFAULT_RECORD_BATCH_FREELIST_SIZE = 100
DEF _DEFAULT_RECORD_FREELIST_SIZE = 100
DEF _DEFAULT_RECORD_FREELIST_SIZE = 100
11 changes: 9 additions & 2 deletions aiokafka/record/_crecords/default_records.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@

from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
gzip_encode, snappy_encode, lz4_encode,
gzip_decode, snappy_decode, lz4_decode
gzip_encode, snappy_encode, lz4_encode, zstd_encode,
gzip_decode, snappy_decode, lz4_decode, zstd_decode
)
import kafka.codec as codecs

Expand Down Expand Up @@ -116,6 +116,8 @@ cdef _assert_has_codec(char compression_type):
checker, name = codecs.has_snappy, "snappy"
elif compression_type == _ATTR_CODEC_LZ4:
checker, name = codecs.has_lz4, "lz4"
elif compression_type == _ATTR_CODEC_ZSTD:
checker, name = codecs.has_zstd, "zstd"
else:
raise UnsupportedCodecError(
f"Unknown compression codec {compression_type:#04x}")
Expand All @@ -134,6 +136,7 @@ cdef class DefaultRecordBatch:
CODEC_GZIP = _ATTR_CODEC_GZIP
CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
CODEC_LZ4 = _ATTR_CODEC_LZ4
CODEC_ZSTD = _ATTR_CODEC_ZSTD

def __init__(self, object buffer):
PyObject_GetBuffer(buffer, &self._buffer, PyBUF_SIMPLE)
Expand Down Expand Up @@ -240,6 +243,8 @@ cdef class DefaultRecordBatch:
uncompressed = snappy_decode(data.tobytes())
elif compression_type == _ATTR_CODEC_LZ4:
uncompressed = lz4_decode(data.tobytes())
elif compression_type == _ATTR_CODEC_ZSTD:
uncompressed = zstd_decode(data.tobytes())

PyBuffer_Release(&self._buffer)
PyObject_GetBuffer(uncompressed, &self._buffer, PyBUF_SIMPLE)
Expand Down Expand Up @@ -694,6 +699,8 @@ cdef class DefaultRecordBatchBuilder:
compressed = snappy_encode(data)
elif self._compression_type == _ATTR_CODEC_LZ4:
compressed = lz4_encode(data)
elif self._compression_type == _ATTR_CODEC_ZSTD:
compressed = zstd_encode(data)
size = (<Py_ssize_t> len(compressed)) + FIRST_RECORD_OFFSET
# We will just write the result into the same memory space.
PyByteArray_Resize(self._buffer, size)
Expand Down
Loading

0 comments on commit 5370a4c

Please sign in to comment.