Skip to content

Commit

Permalink
Don't pass loop explicitly in docs and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
ods committed Dec 4, 2020
1 parent 07e9bd3 commit 67db442
Show file tree
Hide file tree
Showing 27 changed files with 163 additions and 355 deletions.
15 changes: 5 additions & 10 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ Example of AIOKafkaProducer usage:
from aiokafka import AIOKafkaProducer
import asyncio
loop = asyncio.get_event_loop()
async def send_one():
producer = AIOKafkaProducer(
loop=loop, bootstrap_servers='localhost:9092')
producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
# Get cluster layout and initial topic/partition leadership information
await producer.start()
try:
Expand All @@ -39,14 +36,14 @@ Example of AIOKafkaProducer usage:
# Wait for all pending messages to be delivered or expire.
await producer.stop()
loop.run_until_complete(send_one())
asyncio.run(send_one())
AIOKafkaConsumer
****************

AIOKafkaConsumer is a high-level, asynchronous message consumer.
It interacts with the assigned Kafka Group Coordinator node to allow multiple
It interacts with the assigned Kafka Group Coordinator node to allow multiple
consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).

Example of AIOKafkaConsumer usage:
Expand All @@ -56,12 +53,10 @@ Example of AIOKafkaConsumer usage:
from aiokafka import AIOKafkaConsumer
import asyncio
loop = asyncio.get_event_loop()
async def consume():
consumer = AIOKafkaConsumer(
'my_topic', 'my_other_topic',
loop=loop, bootstrap_servers='localhost:9092',
bootstrap_servers='localhost:9092',
group_id="my-group")
# Get cluster layout and join group `my-group`
await consumer.start()
Expand All @@ -74,7 +69,7 @@ Example of AIOKafkaConsumer usage:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
loop.run_until_complete(consume())
asyncio.run(consume())
Running tests
-------------
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ async def close(self):

async def bootstrap(self):
"""Try to to bootstrap initial cluster metadata"""
assert self._loop is asyncio.get_event_loop(), (
assert self._loop is get_running_loop(), (
"Please create objects with the same loop as running with"
)
# using request v0 for bootstrap if not sure v1 is available
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/consumer/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ async def start(self):
* Wait for possible topic autocreation
* Join group if ``group_id`` provided
"""
assert self._loop is asyncio.get_event_loop(), (
assert self._loop is get_running_loop(), (
"Please create objects with the same loop as running with"
)
assert self._fetcher is None, "Did you call `start` twice?"
Expand Down
2 changes: 1 addition & 1 deletion aiokafka/producer/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ def __del__(self, _warnings=warnings):

async def start(self):
"""Connect to Kafka cluster and check server version"""
assert self._loop is asyncio.get_event_loop(), (
assert self._loop is get_running_loop(), (
"Please create objects with the same loop as running with"
)
log.debug("Starting the Kafka producer") # trace
Expand Down
19 changes: 2 additions & 17 deletions benchmark/simple_consume_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ async def bench_simple(self):
consumer = AIOKafkaConsumer(
topic, group_id="test_group", auto_offset_reset="earliest",
enable_auto_commit=False,
bootstrap_servers=self._bootstrap_servers,
loop=loop)
bootstrap_servers=self._bootstrap_servers)
await consumer.start()

# We start from after producer connect
Expand Down Expand Up @@ -116,21 +115,7 @@ def main():
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

loop = asyncio.get_event_loop()
task = loop.create_task(Benchmark(args).bench_simple())
task.add_done_callback(lambda _, loop=loop: loop.stop())

def signal_hndl(_task=task):
_task.cancel()
loop.add_signal_handler(signal.SIGTERM, signal_hndl)
loop.add_signal_handler(signal.SIGINT, signal_hndl)

try:
loop.run_forever()
finally:
loop.close()
if not task.cancelled():
task.result()
asyncio.run(Benchmark(args).bench_simple())


if __name__ == "__main__":
Expand Down
18 changes: 2 additions & 16 deletions benchmark/simple_produce_bench.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def bench_simple(self):
partition = self._partition
loop = asyncio.get_event_loop()

producer = AIOKafkaProducer(loop=loop, **self._producer_kwargs)
producer = AIOKafkaProducer(**self._producer_kwargs)
await producer.start()

# We start from after producer connect
Expand Down Expand Up @@ -139,21 +139,7 @@ def main():
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

loop = asyncio.get_event_loop()
task = loop.create_task(Benchmark(args).bench_simple())
task.add_done_callback(lambda _, loop=loop: loop.stop())

def signal_hndl(_task=task):
_task.cancel()
loop.add_signal_handler(signal.SIGTERM, signal_hndl)
loop.add_signal_handler(signal.SIGINT, signal_hndl)

try:
loop.run_forever()
finally:
loop.close()
if not task.cancelled():
task.result()
asyncio.run(Benchmark(args).bench_simple())


if __name__ == "__main__":
Expand Down
12 changes: 4 additions & 8 deletions docker/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import argparse


@asyncio.coroutine
def build(versions_file, args, *, loop):
async def build(versions_file, args, *, loop):

with open(versions_file) as f:
config = yaml.load(f.read())
Expand All @@ -21,18 +20,16 @@ def build(versions_file, args, *, loop):
action=action,
image_name=config['image_name'],
**version_map))
proc = yield from asyncio.create_subprocess_exec(*args, loop=loop)
proc = await asyncio.create_subprocess_exec(*args)
procs.append(proc.wait())

res = yield from asyncio.gather(*procs, loop=loop)
res = await asyncio.gather(*procs)
if any(res): # If any of statuses are not 0 return right away
return res
return res


if __name__ == '__main__':
loop = asyncio.get_event_loop()

parser = argparse.ArgumentParser(
description='Build and push images in parallel.')
parser.add_argument(
Expand All @@ -41,6 +38,5 @@ def build(versions_file, args, *, loop):

args = parser.parse_args()

statuses = loop.run_until_complete(build('config.yml', args, loop=loop))
loop.close()
statuses = asyncio.run(build('config.yml', args))
sys.exit(max(statuses))
Loading

0 comments on commit 67db442

Please sign in to comment.