Skip to content

Commit

Permalink
Merge pull request #66 from aio-libs/pre0.2.0
Browse files Browse the repository at this point in the history
Randomize topic order in fetch requests
  • Loading branch information
tvoinarovskyi authored Nov 10, 2016
2 parents 9da98b7 + 317d339 commit c49752d
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 1 deletion.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ env:
matrix:
- KAFKA_VERSION=0.9.0.1 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1
- KAFKA_VERSION=0.10.0.0 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1
- KAFKA_VERSION=0.10.1.0 SCALA_VERSION=2.11 PYTHONASYNCIODEBUG=1

before_install:
- sudo apt-get install -y libsnappy-dev
Expand Down
8 changes: 7 additions & 1 deletion aiokafka/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
import collections
import logging
import random
from itertools import chain

import kafka.common as Errors
Expand Down Expand Up @@ -314,14 +315,19 @@ def _create_fetch_requests(self):

requests = []
for node_id, partition_data in fetchable.items():
# Shuffle partition data to help get more equal consumption
partition_data = list(partition_data.items())
random.shuffle(partition_data) # shuffle topics
for _, partition in partition_data:
random.shuffle(partition) # shuffle partitions
if node_id in backoff_by_nodes:
# At least one partition is still waiting to be consumed
continue
req = self._fetch_request_class(
-1, # replica_id
self._fetch_max_wait_ms,
self._fetch_min_bytes,
partition_data.items())
partition_data)
requests.append((node_id, req))
if backoff_by_nodes:
# Return min time til any node will be ready to send event
Expand Down
1 change: 1 addition & 0 deletions aiokafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ def _sender_routine(self):
# wait when:
# * At least one of produce task is finished
# * Data for new partition arrived
# * Metadata update if partition leader unknown
done, _ = yield from asyncio.wait(
waiters,
return_when=asyncio.FIRST_COMPLETED,
Expand Down
3 changes: 3 additions & 0 deletions docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ versions:
-
kafka: "0.10.0.0"
scala: "2.11"
-
kafka: "0.10.1.0"
scala: "2.11"
21 changes: 21 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,3 +445,24 @@ def test_check_extended_message_record(self):
self.assertEqual(rmsg1.timestamp, None)
self.assertEqual(rmsg1.timestamp_type, None)
yield from consumer.stop()

@run_until_complete
def test_equal_consumption(self):
    """Verify fetches are not permanently starved toward one partition.

    Regression test for the kafka-python issue where a fixed topic order
    in fetch requests caused one partition to be consumed exclusively:
    https://github.com/dpkp/kafka-python/issues/675
    """
    # Seed both partitions with the same amount of data.
    yield from self.send_messages(0, list(range(200)))
    yield from self.send_messages(1, list(range(200)))

    partition_consumption = [0, 0]
    # Fresh consumer each round so every run starts with a cold fetcher;
    # loop variables are unused, so use `_` (the original shadowed `x`
    # in the nested loops).
    for _ in range(10):
        consumer = yield from self.consumer_factory(
            max_partition_fetch_bytes=10000)
        for _ in range(10):
            msg = yield from consumer.getone()
            partition_consumption[msg.partition] += 1
        yield from consumer.stop()

    diff = abs(partition_consumption[0] - partition_consumption[1])
    # We are good as long as it's not 100%, as we do rely on randomness of
    # a shuffle in code. Ideally it should be 50/50 (0 diff) though
    self.assertNotEqual(diff / sum(partition_consumption), 1.0)

0 comments on commit c49752d

Please sign in to comment.