Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix batch serializers #887

Merged
merged 5 commits into from
Jan 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
Changelog
=========

Unreleased
==========

Bugfixes:

* Fix serialization for batch (issue #886, pr #887 by @ydjin0602)


0.10.0 (2023-12-15)
===================

Expand Down
34 changes: 28 additions & 6 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@


class BatchBuilder:
def __init__(self, magic, batch_size, compression_type,
*, is_transactional):
def __init__(
self, magic, batch_size, compression_type,
*, is_transactional, key_serializer=None, value_serializer=None
):
if magic < 2:
assert not is_transactional
self._builder = LegacyRecordBatchBuilder(
Expand All @@ -28,6 +30,20 @@
self._relative_offset = 0
self._buffer = None
self._closed = False
self._key_serializer = key_serializer
self._value_serializer = value_serializer

Check warning on line 34 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L33-L34

Added lines #L33 - L34 were not covered by tests

def _serialize(self, key, value):
if self._key_serializer is None:
serialized_key = key

Check warning on line 38 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L38

Added line #L38 was not covered by tests
else:
serialized_key = self._key_serializer(key)

Check warning on line 40 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L40

Added line #L40 was not covered by tests
if self._value_serializer is None:
serialized_value = value

Check warning on line 42 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L42

Added line #L42 was not covered by tests
else:
serialized_value = self._value_serializer(value)

Check warning on line 44 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L44

Added line #L44 was not covered by tests

return serialized_key, serialized_value

Check warning on line 46 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L46

Added line #L46 was not covered by tests

def append(self, *, timestamp, key, value, headers=[]):
"""Add a message to the batch.
Expand All @@ -49,8 +65,9 @@
if self._closed:
return None

key_bytes, value_bytes = self._serialize(key, value)

Check warning on line 68 in aiokafka/producer/message_accumulator.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/message_accumulator.py#L68

Added line #L68 was not covered by tests
metadata = self._builder.append(
self._relative_offset, timestamp, key, value,
self._relative_offset, timestamp, key=key_bytes, value=value_bytes,
headers=headers)

# Check if we could add the message
Expand Down Expand Up @@ -422,7 +439,7 @@

return nodes, unknown_leaders_exist

def create_builder(self):
def create_builder(self, key_serializer=None, value_serializer=None):
if self._api_version >= (0, 11):
magic = 2
elif self._api_version >= (0, 10):
Expand All @@ -435,8 +452,13 @@
self._txn_manager.transactional_id is not None:
is_transactional = True
return BatchBuilder(
magic, self._batch_size, self._compression_type,
is_transactional=is_transactional)
magic,
self._batch_size,
self._compression_type,
is_transactional=is_transactional,
key_serializer=key_serializer,
value_serializer=value_serializer
)

def _append_batch(self, builder, tp):
# We must do this before actual add takes place to check for errors.
Expand Down
14 changes: 8 additions & 6 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,14 @@
return (await self.client._wait_on_metadata(topic))

def _serialize(self, topic, key, value):
if self._key_serializer:
serialized_key = self._key_serializer(key)
else:
if self._key_serializer is None:
serialized_key = key
if self._value_serializer:
serialized_value = self._value_serializer(value)
else:
serialized_key = self._key_serializer(key)

Check warning on line 354 in aiokafka/producer/producer.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/producer.py#L354

Added line #L354 was not covered by tests
if self._value_serializer is None:
serialized_value = value
else:
serialized_value = self._value_serializer(value)

Check warning on line 358 in aiokafka/producer/producer.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/producer.py#L358

Added line #L358 was not covered by tests

message_size = LegacyRecordBatchBuilder.record_overhead(
self._producer_magic)
Expand Down Expand Up @@ -484,7 +484,9 @@
Returns:
BatchBuilder: empty batch to be filled and submitted by the caller.
"""
return self._message_accumulator.create_builder()
return self._message_accumulator.create_builder(

Check warning on line 487 in aiokafka/producer/producer.py

View check run for this annotation

Codecov / codecov/patch

aiokafka/producer/producer.py#L487

Added line #L487 was not covered by tests
key_serializer=self._key_serializer, value_serializer=self._value_serializer
)

async def send_batch(self, batch, topic, *, partition):
"""Submit a BatchBuilder for publication.
Expand Down
47 changes: 47 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,53 @@
await producer.send_batch(
batch, self.topic, partition=partition)

@run_until_complete
async def test_producer_send_batch_with_serializer(self):
def key_serializer(val):
return val.upper().encode()

Check warning on line 386 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L385-L386

Added lines #L385 - L386 were not covered by tests

def value_serializer(val):
return json.dumps(val, separators=(',', ':')).encode()

Check warning on line 389 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L388-L389

Added lines #L388 - L389 were not covered by tests

producer = AIOKafkaProducer(

Check warning on line 391 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L391

Added line #L391 was not covered by tests
bootstrap_servers=self.hosts,
key_serializer=key_serializer,
value_serializer=value_serializer,
)
await producer.start()

Check warning on line 396 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L396

Added line #L396 was not covered by tests

partitions = await producer.partitions_for(self.topic)
partition = partitions.pop()

Check warning on line 399 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L398-L399

Added lines #L398 - L399 were not covered by tests

batch = producer.create_batch()
batch.append(key="key1", value={"value": 111}, timestamp=None)
batch.append(key="key2", value={"value": 222}, timestamp=None)
self.assertEqual(batch.record_count(), 2)

Check warning on line 404 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L401-L404

Added lines #L401 - L404 were not covered by tests

# batch gets properly sent
future = await producer.send_batch(

Check warning on line 407 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L407

Added line #L407 was not covered by tests
batch, self.topic, partition=partition)
resp = await future
await producer.stop()
self.assertEqual(resp.partition, partition)

Check warning on line 411 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L409-L411

Added lines #L409 - L411 were not covered by tests

consumer = AIOKafkaConsumer(

Check warning on line 413 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L413

Added line #L413 was not covered by tests
self.topic,
bootstrap_servers=self.hosts,
enable_auto_commit=True,
auto_offset_reset="earliest")
await consumer.start()

Check warning on line 418 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L418

Added line #L418 was not covered by tests

msg = await consumer.getone()
self.assertEqual(msg.key, b"KEY1")
self.assertEqual(msg.value, b"{\"value\":111}")

Check warning on line 422 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L420-L422

Added lines #L420 - L422 were not covered by tests

msg = await consumer.getone()
self.assertEqual(msg.key, b"KEY2")
self.assertEqual(msg.value, b"{\"value\":222}")

Check warning on line 426 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L424-L426

Added lines #L424 - L426 were not covered by tests

await consumer.stop()

Check warning on line 428 in tests/test_producer.py

View check run for this annotation

Codecov / codecov/patch

tests/test_producer.py#L428

Added line #L428 was not covered by tests

@pytest.mark.ssl
@run_until_complete
async def test_producer_ssl(self):
Expand Down
Loading