From 73f3323d5f1a80783f32445e64ff1c6516d630a1 Mon Sep 17 00:00:00 2001 From: Taras Date: Sun, 20 Jan 2019 06:45:25 +0100 Subject: [PATCH 1/3] Added header support to producer --- aiokafka/producer/message_accumulator.py | 15 ++++++++------ aiokafka/producer/producer.py | 9 ++++++-- tests/test_producer.py | 26 ++++++++++++++++++++++++ 3 files changed, 42 insertions(+), 8 deletions(-) diff --git a/aiokafka/producer/message_accumulator.py b/aiokafka/producer/message_accumulator.py index 59a47fef..50c721a1 100644 --- a/aiokafka/producer/message_accumulator.py +++ b/aiokafka/producer/message_accumulator.py @@ -28,7 +28,7 @@ def __init__(self, magic, batch_size, compression_type, self._buffer = None self._closed = False - def append(self, *, timestamp, key, value): + def append(self, *, timestamp, key, value, headers=[]): """Add a message to the batch. Arguments: @@ -49,7 +49,8 @@ def append(self, *, timestamp, key, value): return None metadata = self._builder.append( - self._relative_offset, timestamp, key, value, headers=[]) + self._relative_offset, timestamp, key, value, + headers=headers) # Check if we could add the message if metadata is None: @@ -124,7 +125,8 @@ def tp(self): def record_count(self): return self._builder.record_count() - def append(self, key, value, timestamp_ms, _create_future=create_future): + def append(self, key, value, timestamp_ms, _create_future=create_future, + headers=[]): """Append message (key and value) to batch Returns: @@ -133,7 +135,7 @@ def append(self, key, value, timestamp_ms, _create_future=create_future): asyncio.Future that will resolved when message is delivered """ metadata = self._builder.append( - timestamp=timestamp_ms, key=key, value=value) + timestamp=timestamp_ms, key=key, value=value, headers=headers) if metadata is None: return None @@ -310,7 +312,8 @@ def close(self): yield from self.flush() @asyncio.coroutine - def add_message(self, tp, key, value, timeout, timestamp_ms=None): + def add_message(self, tp, key, value, 
timeout, timestamp_ms=None, + headers=[]): """ Add message to batch by topic-partition If batch is already full this method waits (`timeout` seconds maximum) until batch is drained by send task @@ -329,7 +332,7 @@ def add_message(self, tp, key, value, timeout, timestamp_ms=None): else: batch = pending_batches[-1] - future = batch.append(key, value, timestamp_ms) + future = batch.append(key, value, timestamp_ms, headers=headers) if future is None: # Batch is full, can't append data atm, # waiting until batch per topic-partition is drained diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 1051f16c..5d2f2d84 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -364,7 +364,7 @@ def _partition(self, topic, partition, key, value, @asyncio.coroutine def send(self, topic, value=None, key=None, partition=None, - timestamp_ms=None): + timestamp_ms=None, headers=None): """Publish a message to a topic. Arguments: @@ -419,6 +419,11 @@ def send(self, topic, value=None, key=None, partition=None, raise IllegalOperation( "Can't send messages while not in transaction") + if headers is not None: + if self.client.api_version < (0, 11): + raise UnsupportedVersionError( + "Headers not supported before Kafka 0.11") + key_bytes, value_bytes = self._serialize(topic, key, value) partition = self._partition(topic, partition, key, value, key_bytes, value_bytes) @@ -428,7 +433,7 @@ def send(self, topic, value=None, key=None, partition=None, fut = yield from self._message_accumulator.add_message( tp, key_bytes, value_bytes, self._request_timeout_ms / 1000, - timestamp_ms=timestamp_ms) + timestamp_ms=timestamp_ms, headers=headers) return fut @asyncio.coroutine diff --git a/tests/test_producer.py b/tests/test_producer.py index 4b2f790b..f24e3699 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -713,3 +713,29 @@ def test_producer_sender_errors_propagate_to_producer(self): KafkaError, "Unexpected error during batch 
delivery"): yield from producer.send_and_wait( self.topic, b'hello, Kafka!') + + @kafka_versions('>=0.11.0') + @run_until_complete + def test_producer_send_with_headers(self): + producer = AIOKafkaProducer( + loop=self.loop, bootstrap_servers=self.hosts) + yield from producer.start() + self.add_cleanup(producer.stop) + + fut = yield from producer.send( + self.topic, b'msg', partition=0, headers=[("type", b"Normal")]) + resp = yield from fut + self.assertEqual(resp.partition, 0) + + @kafka_versions('<0.11.0') + @run_until_complete + def test_producer_send_with_headers_raise_error(self): + producer = AIOKafkaProducer( + loop=self.loop, bootstrap_servers=self.hosts) + yield from producer.start() + self.add_cleanup(producer.stop) + + with self.assertRaises(UnsupportedVersionError): + yield from producer.send( + self.topic, b'msg', partition=0, + headers=[("type", b"Normal")]) From 2777c9fc901a0a9fc7f8e5f50155741b3e4a548b Mon Sep 17 00:00:00 2001 From: Taras Date: Sun, 20 Jan 2019 06:59:02 +0100 Subject: [PATCH 2/3] Add support for headers in Consumer --- aiokafka/consumer/fetcher.py | 2 +- aiokafka/producer/producer.py | 3 +++ aiokafka/structs.py | 3 ++- tests/_testutil.py | 4 ++-- tests/test_consumer.py | 12 ++++++++++++ 5 files changed, 20 insertions(+), 4 deletions(-) diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index 6a3a3770..e4a4b4f4 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -292,7 +292,7 @@ def _consumer_record(self, tp, record): return ConsumerRecord( tp.topic, tp.partition, record.offset, record.timestamp, record.timestamp_type, key, value, record.checksum, - key_size, value_size) + key_size, value_size, record.headers) class Fetcher: diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py index 5d2f2d84..528f1a15 100644 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -423,6 +423,9 @@ def send(self, topic, value=None, key=None, partition=None, if 
self.client.api_version < (0, 11): raise UnsupportedVersionError( "Headers not supported before Kafka 0.11") + else: + # Record parser/builder supports only list type, no explicit None + headers = [] key_bytes, value_bytes = self._serialize(topic, key, value) partition = self._partition(topic, partition, key, value, diff --git a/aiokafka/structs.py b/aiokafka/structs.py index 328230cb..640860b9 100644 --- a/aiokafka/structs.py +++ b/aiokafka/structs.py @@ -15,7 +15,8 @@ ConsumerRecord = collections.namedtuple( "ConsumerRecord", ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value", "checksum", - "serialized_key_size", "serialized_value_size"]) + "serialized_key_size", "serialized_value_size", + "headers"]) OffsetAndTimestamp = collections.namedtuple( "OffsetAndTimestamp", ["offset", "timestamp"]) diff --git a/tests/_testutil.py b/tests/_testutil.py index d1a953c4..19825abf 100644 --- a/tests/_testutil.py +++ b/tests/_testutil.py @@ -259,7 +259,7 @@ def wait_topic(self, client, topic): @asyncio.coroutine def send_messages(self, partition, messages, *, topic=None, - timestamp_ms=None, return_inst=False): + timestamp_ms=None, return_inst=False, headers=None): topic = topic or self.topic ret = [] producer = AIOKafkaProducer( @@ -275,7 +275,7 @@ def send_messages(self, partition, messages, *, topic=None, msg = str(msg).encode() future = yield from producer.send( topic, msg, partition=partition, - timestamp_ms=timestamp_ms) + timestamp_ms=timestamp_ms, headers=headers) resp = yield from future self.assertEqual(resp.topic, topic) self.assertEqual(resp.partition, partition) diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 8893dd52..b8ea1622 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1886,3 +1886,15 @@ def test_consumer_propagates_commit_refresh_errors(self): refresh_event = subscription.assignment.commit_refresh_needed self.assertTrue(refresh_event.is_set()) + + @kafka_versions('>=0.11.0') +
@run_until_complete + def test_consumer_with_headers(self): + yield from self.send_messages( + 0, [0], headers=[("header1", b"17")]) + # Start a consumer_factory + consumer = yield from self.consumer_factory() + + message = yield from consumer.getone() + self.assertEqual(message.value, b"0") + self.assertEqual(message.headers, [("header1", b"17")]) From 70e4af66a1cde926e12dd1d6ec5abb54e823c3b1 Mon Sep 17 00:00:00 2001 From: Taras Date: Sun, 20 Jan 2019 07:44:32 +0100 Subject: [PATCH 3/3] Make headers a tuple to be hashable --- aiokafka/consumer/fetcher.py | 2 +- tests/test_consumer.py | 2 +- tests/test_fetcher.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/aiokafka/consumer/fetcher.py b/aiokafka/consumer/fetcher.py index e4a4b4f4..ca2e1121 100644 --- a/aiokafka/consumer/fetcher.py +++ b/aiokafka/consumer/fetcher.py @@ -292,7 +292,7 @@ def _consumer_record(self, tp, record): return ConsumerRecord( tp.topic, tp.partition, record.offset, record.timestamp, record.timestamp_type, key, value, record.checksum, - key_size, value_size, record.headers) + key_size, value_size, tuple(record.headers)) class Fetcher: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index b8ea1622..f1e56e85 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1897,4 +1897,4 @@ def test_consumer_with_headers(self): message = yield from consumer.getone() self.assertEqual(message.value, b"0") - self.assertEqual(message.headers, [("header1", b"17")]) + self.assertEqual(message.headers, (("header1", b"17"), )) diff --git a/tests/test_fetcher.py b/tests/test_fetcher.py index 02d68fea..deabe173 100644 --- a/tests/test_fetcher.py +++ b/tests/test_fetcher.py @@ -353,7 +353,7 @@ def _setup_error_after_data(self): messages = [ConsumerRecord( topic="some_topic", partition=1, offset=0, timestamp=0, timestamp_type=0, key=None, value=b"some", checksum=None, - serialized_key_size=0, serialized_value_size=4)] + serialized_key_size=0, serialized_value_size=4, 
headers=[])] partition_records = PartitionRecords( tp2, mock.Mock(), [], 0, None, None, False, READ_UNCOMMITTED)