Merge pull request #471 from aio-libs/add_headers
Add headers support
tvoinarovskyi authored Jan 20, 2019
2 parents b9c1bca + 70e4af6 commit 88912fb
Showing 8 changed files with 63 additions and 13 deletions.
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
@@ -292,7 +292,7 @@ def _consumer_record(self, tp, record):
         return ConsumerRecord(
             tp.topic, tp.partition, record.offset, record.timestamp,
             record.timestamp_type, key, value, record.checksum,
-            key_size, value_size)
+            key_size, value_size, tuple(record.headers))
 
 
 class Fetcher:
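With this change every fetched message carries its headers, normalized to a tuple of (str, bytes) pairs. A minimal consumer-side sketch (topic name and broker address are placeholders; user code can use async/await even though the library itself still targets yield from):

import asyncio

from aiokafka import AIOKafkaConsumer


async def consume(loop):
    # "my_topic" and the broker address below are hypothetical
    consumer = AIOKafkaConsumer(
        "my_topic", loop=loop, bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest")
    await consumer.start()
    try:
        msg = await consumer.getone()
        # msg.headers is a tuple of (str, bytes) pairs,
        # e.g. (("header1", b"17"),)
        for hkey, hvalue in msg.headers:
            print(hkey, hvalue)
    finally:
        await consumer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(consume(loop))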
15 changes: 9 additions & 6 deletions aiokafka/producer/message_accumulator.py
@@ -28,7 +28,7 @@ def __init__(self, magic, batch_size, compression_type,
         self._buffer = None
         self._closed = False
 
-    def append(self, *, timestamp, key, value):
+    def append(self, *, timestamp, key, value, headers=[]):
         """Add a message to the batch.
 
         Arguments:
@@ -49,7 +49,8 @@ def append(self, *, timestamp, key, value):
             return None
 
         metadata = self._builder.append(
-            self._relative_offset, timestamp, key, value, headers=[])
+            self._relative_offset, timestamp, key, value,
+            headers=headers)
 
         # Check if we could add the message
         if metadata is None:
@@ -124,7 +125,8 @@ def tp(self):
     def record_count(self):
         return self._builder.record_count()
 
-    def append(self, key, value, timestamp_ms, _create_future=create_future):
+    def append(self, key, value, timestamp_ms, _create_future=create_future,
+               headers=[]):
         """Append message (key and value) to batch
 
         Returns:
@@ -133,7 +135,7 @@ def append(self, key, value, timestamp_ms, _create_future=create_future):
             asyncio.Future that will resolved when message is delivered
         """
         metadata = self._builder.append(
-            timestamp=timestamp_ms, key=key, value=value)
+            timestamp=timestamp_ms, key=key, value=value, headers=headers)
         if metadata is None:
             return None
 
@@ -310,7 +312,8 @@ def close(self):
         yield from self.flush()
 
     @asyncio.coroutine
-    def add_message(self, tp, key, value, timeout, timestamp_ms=None):
+    def add_message(self, tp, key, value, timeout, timestamp_ms=None,
+                    headers=[]):
         """ Add message to batch by topic-partition
         If batch is already full this method waits (`timeout` seconds maximum)
         until batch is drained by send task
@@ -329,7 +332,7 @@ def add_message(self, tp, key, value, timeout, timestamp_ms=None,
         else:
             batch = pending_batches[-1]
 
-        future = batch.append(key, value, timestamp_ms)
+        future = batch.append(key, value, timestamp_ms, headers=headers)
         if future is None:
             # Batch is full, can't append data atm,
             # waiting until batch per topic-partition is drained
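A side note on the new signatures: they use a mutable default argument (headers=[]). Python evaluates that list once, at function definition time, so it is shared between calls; that is only safe here because the accumulator reads the list and forwards it without mutating it. A tiny, self-contained illustration of the pitfall (toy functions, not aiokafka code):

def append_ok(key, value, headers=[]):
    # Read-only use of the shared default is harmless.
    return (key, value, list(headers))


def append_buggy(key, value, headers=[]):
    # Mutating the default leaks state into every later call.
    headers.append(("seen", b"1"))
    return (key, value, list(headers))


assert append_ok(b"k", b"v")[2] == []
assert append_ok(b"k", b"v")[2] == []  # still empty
assert append_buggy(b"k", b"v")[2] == [("seen", b"1")]
assert append_buggy(b"k", b"v")[2] == [("seen", b"1"), ("seen", b"1")]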
12 changes: 10 additions & 2 deletions aiokafka/producer/producer.py
@@ -364,7 +364,7 @@ def _partition(self, topic, partition, key, value,
 
     @asyncio.coroutine
     def send(self, topic, value=None, key=None, partition=None,
-             timestamp_ms=None):
+             timestamp_ms=None, headers=None):
         """Publish a message to a topic.
 
         Arguments:
@@ -419,6 +419,14 @@ def send(self, topic, value=None, key=None, partition=None,
             raise IllegalOperation(
                 "Can't send messages while not in transaction")
 
+        if headers is not None:
+            if self.client.api_version < (0, 11):
+                raise UnsupportedVersionError(
+                    "Headers not supported before Kafka 0.11")
+        else:
+            # Record parser/builder support only list type, no explicit None
+            headers = []
+
         key_bytes, value_bytes = self._serialize(topic, key, value)
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)
@@ -428,7 +436,7 @@ def send(self, topic, value=None, key=None, partition=None,
 
         fut = yield from self._message_accumulator.add_message(
             tp, key_bytes, value_bytes, self._request_timeout_ms / 1000,
-            timestamp_ms=timestamp_ms)
+            timestamp_ms=timestamp_ms, headers=headers)
         return fut
 
     @asyncio.coroutine
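On the producer side, send() now takes an optional headers argument: a list of (str, bytes) pairs, rejected with UnsupportedVersionError on brokers older than 0.11. A minimal usage sketch mirroring the tests below (topic name and broker address are placeholders):

import asyncio

from aiokafka import AIOKafkaProducer


async def produce(loop):
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        # send() returns a future that resolves to the record metadata
        fut = await producer.send(
            "my_topic", b"payload",
            headers=[("content-type", b"application/json")])
        resp = await fut
        print(resp.partition, resp.offset)
    finally:
        await producer.stop()


loop = asyncio.get_event_loop()
loop.run_until_complete(produce(loop))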
3 changes: 2 additions & 1 deletion aiokafka/structs.py
@@ -15,7 +15,8 @@
 ConsumerRecord = collections.namedtuple(
     "ConsumerRecord", ["topic", "partition", "offset", "timestamp",
                        "timestamp_type", "key", "value", "checksum",
-                       "serialized_key_size", "serialized_value_size"])
+                       "serialized_key_size", "serialized_value_size",
+                       "headers"])
 
 OffsetAndTimestamp = collections.namedtuple(
     "OffsetAndTimestamp", ["offset", "timestamp"])
4 changes: 2 additions & 2 deletions tests/_testutil.py
@@ -259,7 +259,7 @@ def wait_topic(self, client, topic):
 
     @asyncio.coroutine
     def send_messages(self, partition, messages, *, topic=None,
-                      timestamp_ms=None, return_inst=False):
+                      timestamp_ms=None, return_inst=False, headers=None):
         topic = topic or self.topic
         ret = []
         producer = AIOKafkaProducer(
@@ -275,7 +275,7 @@ def send_messages(self, partition, messages, *, topic=None,
             msg = str(msg).encode()
             future = yield from producer.send(
                 topic, msg, partition=partition,
-                timestamp_ms=timestamp_ms)
+                timestamp_ms=timestamp_ms, headers=headers)
             resp = yield from future
             self.assertEqual(resp.topic, topic)
             self.assertEqual(resp.partition, partition)
12 changes: 12 additions & 0 deletions tests/test_consumer.py
@@ -1886,3 +1886,15 @@ def test_consumer_propagates_commit_refresh_errors(self):
 
         refresh_event = subscription.assignment.commit_refresh_needed
         self.assertTrue(refresh_event.is_set())
+
+    @kafka_versions('>=0.11.0')
+    @run_until_complete
+    def test_consumer_with_headers(self):
+        yield from self.send_messages(
+            0, [0], headers=[("header1", b"17")])
+        # Start a consumer_factory
+        consumer = yield from self.consumer_factory()
+
+        message = yield from consumer.getone()
+        self.assertEqual(message.value, b"0")
+        self.assertEqual(message.headers, (("header1", b"17"), ))
2 changes: 1 addition & 1 deletion tests/test_fetcher.py
@@ -353,7 +353,7 @@ def _setup_error_after_data(self):
         messages = [ConsumerRecord(
             topic="some_topic", partition=1, offset=0, timestamp=0,
             timestamp_type=0, key=None, value=b"some", checksum=None,
-            serialized_key_size=0, serialized_value_size=4)]
+            serialized_key_size=0, serialized_value_size=4, headers=[])]
         partition_records = PartitionRecords(
             tp2, mock.Mock(), [], 0,
             None, None, False, READ_UNCOMMITTED)
26 changes: 26 additions & 0 deletions tests/test_producer.py
@@ -713,3 +713,29 @@ def test_producer_sender_errors_propagate_to_producer(self):
                 KafkaError, "Unexpected error during batch delivery"):
             yield from producer.send_and_wait(
                 self.topic, b'hello, Kafka!')
+
+    @kafka_versions('>=0.11.0')
+    @run_until_complete
+    def test_producer_send_with_headers(self):
+        producer = AIOKafkaProducer(
+            loop=self.loop, bootstrap_servers=self.hosts)
+        yield from producer.start()
+        self.add_cleanup(producer.stop)
+
+        fut = yield from producer.send(
+            self.topic, b'msg', partition=0, headers=[("type", b"Normal")])
+        resp = yield from fut
+        self.assertEqual(resp.partition, 0)
+
+    @kafka_versions('<0.11.0')
+    @run_until_complete
+    def test_producer_send_with_headers_raise_error(self):
+        producer = AIOKafkaProducer(
+            loop=self.loop, bootstrap_servers=self.hosts)
+        yield from producer.start()
+        self.add_cleanup(producer.stop)
+
+        with self.assertRaises(UnsupportedVersionError):
+            yield from producer.send(
+                self.topic, b'msg', partition=0,
+                headers=[("type", b"Normal")])
