diff --git a/.travis.yml b/.travis.yml index f4276fbc..901f15d5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,6 +12,7 @@ env: matrix: - KAFKA_VERSION=0.9.0.1 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1 - KAFKA_VERSION=0.10.0.0 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1 + - KAFKA_VERSION=0.10.1.0 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1 before_install: - sudo apt-get install -y libsnappy-dev diff --git a/aiokafka/fetcher.py b/aiokafka/fetcher.py index 4d92a4b0..11d46e2e 100644 --- a/aiokafka/fetcher.py +++ b/aiokafka/fetcher.py @@ -1,6 +1,7 @@ import asyncio import collections import logging +import random from itertools import chain import kafka.common as Errors @@ -314,6 +315,11 @@ def _create_fetch_requests(self): requests = [] for node_id, partition_data in fetchable.items(): + # Shuffle partition data to help get more equal consumption + partition_data = list(partition_data.items()) + random.shuffle(partition_data) # shuffle topics + for _, partition in partition_data: + random.shuffle(partition) # shuffle partitions if node_id in backoff_by_nodes: # At least one partition is still waiting to be consumed continue @@ -321,7 +327,7 @@ def _create_fetch_requests(self): -1, # replica_id self._fetch_max_wait_ms, self._fetch_min_bytes, - partition_data.items()) + partition_data) requests.append((node_id, req)) if backoff_by_nodes: # Return min time til any node will be ready to send event diff --git a/aiokafka/producer.py b/aiokafka/producer.py index d62c73ff..15ed7e2d 100644 --- a/aiokafka/producer.py +++ b/aiokafka/producer.py @@ -335,6 +335,7 @@ def _sender_routine(self): # wait when: # * At least one of produce task is finished # * Data for new partition arrived + # * Metadata update if partition leader unknown done, _ = yield from asyncio.wait( waiters, return_when=asyncio.FIRST_COMPLETED, diff --git a/docker/config.yml b/docker/config.yml index 99e3cbee..4b957cf1 100644 --- a/docker/config.yml +++ b/docker/config.yml @@ -6,3 +6,6 @@ versions: - kafka: "0.10.0.0" 
@run_until_complete
def test_equal_consumption(self):
    """Check that short-lived consumers do not always read one partition.

    Two batches of 200 messages are published through
    ``self.send_messages`` (first argument 0 and 1 -- presumably the
    partition number; confirm against the helper).  Ten consumers are then
    created one after another, each reading ten messages with a small
    ``max_partition_fetch_bytes``, and the partition of every received
    message is tallied.

    The test only fails when *every* message came from a single partition.
    The fetcher shuffles partitions randomly, so an exact 50/50 split
    cannot be asserted without making the test flaky.
    """
    # A strange use case of kafka-python, that can be reproduced in
    # aiokafka https://github.com/dpkp/kafka-python/issues/675
    yield from self.send_messages(0, list(range(200)))
    yield from self.send_messages(1, list(range(200)))

    # Index is the partition number, value is the count of messages read.
    partition_consumption = [0, 0]
    for _attempt in range(10):
        consumer = yield from self.consumer_factory(
            max_partition_fetch_bytes=10000)
        try:
            for _read in range(10):
                msg = yield from consumer.getone()
                partition_consumption[msg.partition] += 1
        finally:
            # Always stop the consumer, even if getone() raised, so a
            # failure here does not leak a running consumer into later
            # tests.
            yield from consumer.stop()

    total = sum(partition_consumption)
    diff = abs(partition_consumption[0] - partition_consumption[1])
    # We are good as long as it's not 100%, as we rely on the randomness
    # of a shuffle in the code. Ideally it should be 50/50 (0 diff), though.
    # diff == total holds exactly when one partition was never read.
    self.assertNotEqual(
        diff, total,
        "all messages were consumed from a single partition: %r"
        % (partition_consumption,))