Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add headers support #471

Merged
merged 3 commits into from
Jan 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiokafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ def _consumer_record(self, tp, record):
return ConsumerRecord(
tp.topic, tp.partition, record.offset, record.timestamp,
record.timestamp_type, key, value, record.checksum,
key_size, value_size)
key_size, value_size, tuple(record.headers))


class Fetcher:
Expand Down
15 changes: 9 additions & 6 deletions aiokafka/producer/message_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def __init__(self, magic, batch_size, compression_type,
self._buffer = None
self._closed = False

def append(self, *, timestamp, key, value):
def append(self, *, timestamp, key, value, headers=[]):
"""Add a message to the batch.
Arguments:
Expand All @@ -49,7 +49,8 @@ def append(self, *, timestamp, key, value):
return None

metadata = self._builder.append(
self._relative_offset, timestamp, key, value, headers=[])
self._relative_offset, timestamp, key, value,
headers=headers)

# Check if we could add the message
if metadata is None:
Expand Down Expand Up @@ -124,7 +125,8 @@ def tp(self):
def record_count(self):
return self._builder.record_count()

def append(self, key, value, timestamp_ms, _create_future=create_future):
def append(self, key, value, timestamp_ms, _create_future=create_future,
headers=[]):
"""Append message (key and value) to batch
Returns:
Expand All @@ -133,7 +135,7 @@ def append(self, key, value, timestamp_ms, _create_future=create_future):
asyncio.Future that will resolved when message is delivered
"""
metadata = self._builder.append(
timestamp=timestamp_ms, key=key, value=value)
timestamp=timestamp_ms, key=key, value=value, headers=headers)
if metadata is None:
return None

Expand Down Expand Up @@ -310,7 +312,8 @@ def close(self):
yield from self.flush()

@asyncio.coroutine
def add_message(self, tp, key, value, timeout, timestamp_ms=None):
def add_message(self, tp, key, value, timeout, timestamp_ms=None,
headers=[]):
""" Add message to batch by topic-partition
If batch is already full this method waits (`timeout` seconds maximum)
until batch is drained by send task
Expand All @@ -329,7 +332,7 @@ def add_message(self, tp, key, value, timeout, timestamp_ms=None):
else:
batch = pending_batches[-1]

future = batch.append(key, value, timestamp_ms)
future = batch.append(key, value, timestamp_ms, headers=headers)
if future is None:
# Batch is full, can't append data atm,
# waiting until batch per topic-partition is drained
Expand Down
12 changes: 10 additions & 2 deletions aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ def _partition(self, topic, partition, key, value,

@asyncio.coroutine
def send(self, topic, value=None, key=None, partition=None,
timestamp_ms=None):
timestamp_ms=None, headers=None):
"""Publish a message to a topic.
Arguments:
Expand Down Expand Up @@ -419,6 +419,14 @@ def send(self, topic, value=None, key=None, partition=None,
raise IllegalOperation(
"Can't send messages while not in transaction")

if headers is not None:
if self.client.api_version < (0, 11):
raise UnsupportedVersionError(
"Headers not supported before Kafka 0.11")
else:
# Record parser/builder support only list type, no explicit None
headers = []

key_bytes, value_bytes = self._serialize(topic, key, value)
partition = self._partition(topic, partition, key, value,
key_bytes, value_bytes)
Expand All @@ -428,7 +436,7 @@ def send(self, topic, value=None, key=None, partition=None,

fut = yield from self._message_accumulator.add_message(
tp, key_bytes, value_bytes, self._request_timeout_ms / 1000,
timestamp_ms=timestamp_ms)
timestamp_ms=timestamp_ms, headers=headers)
return fut

@asyncio.coroutine
Expand Down
3 changes: 2 additions & 1 deletion aiokafka/structs.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
ConsumerRecord = collections.namedtuple(
"ConsumerRecord", ["topic", "partition", "offset", "timestamp",
"timestamp_type", "key", "value", "checksum",
"serialized_key_size", "serialized_value_size"])
"serialized_key_size", "serialized_value_size",
"headers"])

OffsetAndTimestamp = collections.namedtuple(
"OffsetAndTimestamp", ["offset", "timestamp"])
4 changes: 2 additions & 2 deletions tests/_testutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ def wait_topic(self, client, topic):

@asyncio.coroutine
def send_messages(self, partition, messages, *, topic=None,
timestamp_ms=None, return_inst=False):
timestamp_ms=None, return_inst=False, headers=None):
topic = topic or self.topic
ret = []
producer = AIOKafkaProducer(
Expand All @@ -275,7 +275,7 @@ def send_messages(self, partition, messages, *, topic=None,
msg = str(msg).encode()
future = yield from producer.send(
topic, msg, partition=partition,
timestamp_ms=timestamp_ms)
timestamp_ms=timestamp_ms, headers=headers)
resp = yield from future
self.assertEqual(resp.topic, topic)
self.assertEqual(resp.partition, partition)
Expand Down
12 changes: 12 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -1886,3 +1886,15 @@ def test_consumer_propagates_commit_refresh_errors(self):

refresh_event = subscription.assignment.commit_refresh_needed
self.assertTrue(refresh_event.is_set())

@kafka_versions('>=0.11.0')
@run_until_complete
def test_consumer_with_headers(self):
yield from self.send_messages(
0, [0], headers=[("header1", b"17")])
# Start a consumer_factory
consumer = yield from self.consumer_factory()

message = yield from consumer.getone()
self.assertEqual(message.value, b"0")
self.assertEqual(message.headers, (("header1", b"17"), ))
2 changes: 1 addition & 1 deletion tests/test_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def _setup_error_after_data(self):
messages = [ConsumerRecord(
topic="some_topic", partition=1, offset=0, timestamp=0,
timestamp_type=0, key=None, value=b"some", checksum=None,
serialized_key_size=0, serialized_value_size=4)]
serialized_key_size=0, serialized_value_size=4, headers=[])]
partition_records = PartitionRecords(
tp2, mock.Mock(), [], 0,
None, None, False, READ_UNCOMMITTED)
Expand Down
26 changes: 26 additions & 0 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -713,3 +713,29 @@ def test_producer_sender_errors_propagate_to_producer(self):
KafkaError, "Unexpected error during batch delivery"):
yield from producer.send_and_wait(
self.topic, b'hello, Kafka!')

@kafka_versions('>=0.11.0')
@run_until_complete
def test_producer_send_with_headers(self):
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()
self.add_cleanup(producer.stop)

fut = yield from producer.send(
self.topic, b'msg', partition=0, headers=[("type", b"Normal")])
resp = yield from fut
self.assertEqual(resp.partition, 0)

@kafka_versions('<0.11.0')
@run_until_complete
def test_producer_send_with_headers_raise_error(self):
producer = AIOKafkaProducer(
loop=self.loop, bootstrap_servers=self.hosts)
yield from producer.start()
self.add_cleanup(producer.stop)

with self.assertRaises(UnsupportedVersionError):
yield from producer.send(
self.topic, b'msg', partition=0,
headers=[("type", b"Normal")])