Skip to content

Commit

Permalink
Add support for headers in Consumer
Browse files — browse the repository at this point in the history
  • Loading branch information
tvoinarovskyi committed Jan 20, 2019
1 parent 73f3323 commit 2777c9f
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 4 deletions.
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _consumer_record(self, tp, record):
return ConsumerRecord(
tp.topic, tp.partition, record.offset, record.timestamp,
record.timestamp_type, key, value, record.checksum,
key_size, value_size)
key_size, value_size, record.headers)


class Fetcher:
Expand Down
3 changes: 3 additions & 0 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,9 @@ def send(self, topic, value=None, key=None, partition=None,
if self.client.api_version < (0, 11):
raise UnsupportedVersionError(
"Headers not supported before Kafka 0.11")
else:
# Record parser/builder support only list type, no explicit None
headers = []

key_bytes, value_bytes = self._serialize(topic, key, value)
partition = self._partition(topic, partition, key, value,
Expand Down
3 changes: 2 additions & 1 deletion aiokafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
ConsumerRecord = collections.namedtuple(
"ConsumerRecord", ["topic", "partition", "offset", "timestamp",
"timestamp_type", "key", "value", "checksum",
"serialized_key_size", "serialized_value_size"])
"serialized_key_size", "serialized_value_size",
"headers"])

# Immutable (offset, timestamp) pair; presumably the result of a
# timestamp-based offset lookup -- NOTE(review): confirm against the
# consumer API that returns it, which is outside this diff.
OffsetAndTimestamp = collections.namedtuple(
    "OffsetAndTimestamp", ["offset", "timestamp"])
4 changes: 2 additions & 2 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def wait_topic(self, client, topic):

@asyncio.coroutine
def send_messages(self, partition, messages, *, topic=None,
timestamp_ms=None, return_inst=False):
timestamp_ms=None, return_inst=False, headers=None):
topic = topic or self.topic
ret = []
producer = AIOKafkaProducer(
Expand All @@ -275,7 +275,7 @@ def send_messages(self, partition, messages, *, topic=None,
msg = str(msg).encode()
future = yield from producer.send(
topic, msg, partition=partition,
timestamp_ms=timestamp_ms)
timestamp_ms=timestamp_ms, headers=headers)
resp = yield from future
self.assertEqual(resp.topic, topic)
self.assertEqual(resp.partition, partition)
Expand Down
12 changes: 12 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1886,3 +1886,15 @@ def test_consumer_propagates_commit_refresh_errors(self):

refresh_event = subscription.assignment.commit_refresh_needed
self.assertTrue(refresh_event.is_set())

@kafka_versions('>=0.11.0')
@run_until_complete
def test_consumer_with_headers(self):
    """End-to-end check that record headers survive a produce/consume
    round trip: a single message is produced to partition 0 with one
    header, then read back and its value and headers are asserted.

    Gated to Kafka >= 0.11 by the decorator above, since headers were
    introduced in that broker version (see the UnsupportedVersionError
    guard added to the producer in this same commit).
    """
    # Produce one message (payload 0) with a single (str, bytes) header.
    yield from self.send_messages(
        0, [0], headers=[("header1", b"17")])
    # Start a consumer_factory
    consumer = yield from self.consumer_factory()

    message = yield from consumer.getone()
    # send_messages str()-encodes payloads, so the value arrives as b"0";
    # headers are expected back unchanged as a list of (str, bytes) pairs.
    self.assertEqual(message.value, b"0")
    self.assertEqual(message.headers, [("header1", b"17")])

0 comments on commit 2777c9f

Please sign in to comment.