diff --git a/README.rst b/README.rst
index 40491fc95..44d1545ae 100644
--- a/README.rst
+++ b/README.rst
@@ -25,11 +25,8 @@ Example of AIOKafkaProducer usage:
     from aiokafka import AIOKafkaProducer
     import asyncio
 
-    loop = asyncio.get_event_loop()
-
     async def send_one():
-        producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9092')
+        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
         # Get cluster layout and initial topic/partition leadership information
         await producer.start()
         try:
@@ -39,14 +36,14 @@ Example of AIOKafkaProducer usage:
         # Wait for all pending messages to be delivered or expire.
         await producer.stop()
 
-    loop.run_until_complete(send_one())
+    asyncio.run(send_one())
 
 
 AIOKafkaConsumer
 ****************
 
 AIOKafkaConsumer is a high-level, asynchronous message consumer.
-It interacts with the assigned Kafka Group Coordinator node to allow multiple 
+It interacts with the assigned Kafka Group Coordinator node to allow multiple
 consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
 
 Example of AIOKafkaConsumer usage:
@@ -56,12 +53,10 @@ Example of AIOKafkaConsumer usage:
     from aiokafka import AIOKafkaConsumer
     import asyncio
 
-    loop = asyncio.get_event_loop()
-
     async def consume():
         consumer = AIOKafkaConsumer(
             'my_topic', 'my_other_topic',
-            loop=loop, bootstrap_servers='localhost:9092',
+            bootstrap_servers='localhost:9092',
             group_id="my-group")
         # Get cluster layout and join group `my-group`
         await consumer.start()
@@ -74,7 +69,7 @@ Example of AIOKafkaConsumer usage:
         # Will leave consumer group; perform autocommit if enabled.
         await consumer.stop()
 
-    loop.run_until_complete(consume())
+    asyncio.run(consume())
 
 Running tests
 -------------
diff --git a/aiokafka/client.py b/aiokafka/client.py
index 7acd53256..00ae9aedd 100644
--- a/aiokafka/client.py
+++ b/aiokafka/client.py
@@ -183,7 +183,7 @@ async def close(self):
 
     async def bootstrap(self):
         """Try to bootstrap initial cluster metadata"""
-        assert self._loop is asyncio.get_event_loop(), (
+        assert self._loop is get_running_loop(), (
             "Please create objects with the same loop as running with"
         )
         # using request v0 for bootstrap if not sure v1 is available
diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py
index b5cd8e2f4..24d64d770 100644
--- a/aiokafka/consumer/consumer.py
+++ b/aiokafka/consumer/consumer.py
@@ -334,7 +334,7 @@ async def start(self):
         * Wait for possible topic autocreation
         * Join group if ``group_id`` provided
         """
-        assert self._loop is asyncio.get_event_loop(), (
+        assert self._loop is get_running_loop(), (
             "Please create objects with the same loop as running with"
        )
         assert self._fetcher is None, "Did you call `start` twice?"
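The assertion changes in ``client.py`` and ``consumer.py`` (and ``producer.py``
below) swap ``asyncio.get_event_loop()`` for a bare ``get_running_loop()``
lookup, presumably imported from a shared helper elsewhere in these modules.
The difference matters because ``get_event_loop()`` may silently create a new
loop when called while no loop is running, which can mask "created on a
different loop" bugs. A minimal sketch of the behaviour, not part of this
patch::

    import asyncio

    try:
        asyncio.get_running_loop()      # raises RuntimeError outside a coroutine
    except RuntimeError as exc:
        print("outside a coroutine:", exc)

    async def main():
        # Inside a running coroutine both lookups name the same loop, which
        # is exactly what the start()/bootstrap() assertions compare against.
        assert asyncio.get_running_loop() is asyncio.get_event_loop()
        print("inside a coroutine: same loop")

    asyncio.run(main())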
diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py
index 98aa95096..964054859 100644
--- a/aiokafka/producer/producer.py
+++ b/aiokafka/producer/producer.py
@@ -283,7 +283,7 @@ def __del__(self, _warnings=warnings):
 
     async def start(self):
         """Connect to Kafka cluster and check server version"""
-        assert self._loop is asyncio.get_event_loop(), (
+        assert self._loop is get_running_loop(), (
             "Please create objects with the same loop as running with"
         )
         log.debug("Starting the Kafka producer")  # trace
diff --git a/benchmark/simple_consume_bench.py b/benchmark/simple_consume_bench.py
index fbcde90f9..420eb93bc 100644
--- a/benchmark/simple_consume_bench.py
+++ b/benchmark/simple_consume_bench.py
@@ -50,8 +50,7 @@ async def bench_simple(self):
         consumer = AIOKafkaConsumer(
             topic, group_id="test_group", auto_offset_reset="earliest",
             enable_auto_commit=False,
-            bootstrap_servers=self._bootstrap_servers,
-            loop=loop)
+            bootstrap_servers=self._bootstrap_servers)
         await consumer.start()
 
         # We start from after producer connect
@@ -116,21 +115,7 @@ def main():
         import uvloop
         asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 
-    loop = asyncio.get_event_loop()
-    task = loop.create_task(Benchmark(args).bench_simple())
-    task.add_done_callback(lambda _, loop=loop: loop.stop())
-
-    def signal_hndl(_task=task):
-        _task.cancel()
-    loop.add_signal_handler(signal.SIGTERM, signal_hndl)
-    loop.add_signal_handler(signal.SIGINT, signal_hndl)
-
-    try:
-        loop.run_forever()
-    finally:
-        loop.close()
-        if not task.cancelled():
-            task.result()
+    asyncio.run(Benchmark(args).bench_simple())
 
 
 if __name__ == "__main__":
diff --git a/benchmark/simple_produce_bench.py b/benchmark/simple_produce_bench.py
index 2bc364f04..b1a81eadf 100644
--- a/benchmark/simple_produce_bench.py
+++ b/benchmark/simple_produce_bench.py
@@ -65,7 +65,7 @@ async def bench_simple(self):
         partition = self._partition
         loop = asyncio.get_event_loop()
 
-        producer = AIOKafkaProducer(loop=loop, **self._producer_kwargs)
+        producer = AIOKafkaProducer(**self._producer_kwargs)
         await producer.start()
 
         # We start from after producer connect
@@ -139,21 +139,7 @@ def main():
         import uvloop
         asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
 
-    loop = asyncio.get_event_loop()
-    task = loop.create_task(Benchmark(args).bench_simple())
-    task.add_done_callback(lambda _, loop=loop: loop.stop())
-
-    def signal_hndl(_task=task):
-        _task.cancel()
-    loop.add_signal_handler(signal.SIGTERM, signal_hndl)
-    loop.add_signal_handler(signal.SIGINT, signal_hndl)
-
-    try:
-        loop.run_forever()
-    finally:
-        loop.close()
-        if not task.cancelled():
-            task.result()
+    asyncio.run(Benchmark(args).bench_simple())
 
 
 if __name__ == "__main__":
diff --git a/docker/build.py b/docker/build.py
index a70eade6b..c95c6689b 100644
--- a/docker/build.py
+++ b/docker/build.py
@@ -5,8 +5,7 @@
 import argparse
 
 
-@asyncio.coroutine
-def build(versions_file, args, *, loop):
+async def build(versions_file, args):
     with open(versions_file) as f:
         config = yaml.load(f.read())
 
@@ -21,18 +20,16 @@ def build(versions_file, args, *, loop):
                 action=action,
                 image_name=config['image_name'],
                 **version_map))
-        proc = yield from asyncio.create_subprocess_exec(*args, loop=loop)
+        proc = await asyncio.create_subprocess_exec(*args)
         procs.append(proc.wait())
 
-    res = yield from asyncio.gather(*procs, loop=loop)
+    res = await asyncio.gather(*procs)
     if any(res):  # If any of statuses are not 0 return right away
         return res
     return res
 
 
 if __name__ == '__main__':
-    loop = asyncio.get_event_loop()
-
     parser = argparse.ArgumentParser(
         description='Build and push images in parallel.')
     parser.add_argument(
@@ -41,6 +38,5 @@ def build(versions_file, args, *, loop):
 
     args = parser.parse_args()
 
-    statuses = loop.run_until_complete(build('config.yml', args, loop=loop))
-    loop.close()
+    statuses = asyncio.run(build('config.yml', args))
     sys.exit(max(statuses))
diff --git a/docs/consumer.rst b/docs/consumer.rst
index 6cc1bdbd5..6fea40c50 100644
--- a/docs/consumer.rst
+++ b/docs/consumer.rst
@@ -10,7 +10,7 @@ from a Kafka cluster. Most simple usage would be::
 
     consumer = aiokafka.AIOKafkaConsumer(
         "my_topic",
-        loop=loop, bootstrap_servers='localhost:9092'
+        bootstrap_servers='localhost:9092'
     )
     await consumer.start()
     try:
@@ -24,7 +24,7 @@ from a Kafka cluster. Most simple usage would be::
         await consumer.stop()
 
 .. note:: ``msg.value`` and ``msg.key`` are raw bytes, use **key_deserializer**
-   and **value_deserializer** configuration if you need to decode them. 
+   and **value_deserializer** configuration if you need to decode them.
 
 .. note:: **Consumer** maintains TCP connections as well as a few background
    tasks to fetch data and coordinate assignments. Failure to call
@@ -41,7 +41,7 @@ balance consumption using **Consumer Groups**.
 Offsets and Consumer Position
 -----------------------------
 
-Kafka maintains a numerical *offset* for each record in a partition. This 
+Kafka maintains a numerical *offset* for each record in a partition. This
 *offset* acts as a `unique identifier` of a record within that partition and
 also denotes the *position* of the consumer in the partition. For example::
 
@@ -62,8 +62,8 @@ also denotes the *position* of the consumer in the partition. For example::
    to set ``group_id`` to something other than ``None``. See
    `Consumer Groups and Topic Subscriptions`_ below.
 
-Here if the consumer is at *position* **5** it has consumed records with 
-*offsets* **0** through **4** and will next receive the record with 
+Here if the consumer is at *position* **5** it has consumed records with
+*offsets* **0** through **4** and will next receive the record with
 *offset* **5**.
 
 There are actually two *notions of position*:
@@ -89,7 +89,7 @@ For most simple use cases auto committing is probably the best choice::
 
     consumer = AIOKafkaConsumer(
         "my_topic",
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         group_id="my_group",           # Consumer must be in a group to commit
         enable_auto_commit=True,       # Is True by default anyway
         auto_commit_interval_ms=1000,  # Autocommit every second
@@ -108,7 +108,7 @@ batch operations you should use *manual commit*::
 
     consumer = AIOKafkaConsumer(
         "my_topic",
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         group_id="my_group",           # Consumer must be in a group to commit
         enable_auto_commit=False,      # Will disable autocommit
         auto_offset_reset="earliest",  # If committed offset not found, start
@@ -134,7 +134,7 @@ batch operations you should use *manual commit*::
 This example will hold on to messages until we have enough to process in
 bulk. The algorithm can be enhanced by taking advantage of:
 
-    * ``await consumer.getmany()`` to avoid multiple calls to get a batch of 
+    * ``await consumer.getmany()`` to avoid multiple calls to get a batch of
      messages.
    * ``await consumer.highwater(partition)`` to understand if we have more
      unconsumed messages or this one is the last one in the partition.
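To make the bullet about ``getmany()`` concrete, here is a minimal sketch of a
batch-drain loop with one commit per processed batch, written in the loop-less
style this patch migrates the docs to (the topic, group and ``max_records``
values are illustrative only, not taken from this patch)::

    import asyncio
    from aiokafka import AIOKafkaConsumer

    async def consume_in_batches():
        consumer = AIOKafkaConsumer(
            "my_topic",
            bootstrap_servers="localhost:9092",
            group_id="my_group",
            enable_auto_commit=False,      # commit manually, after processing
            auto_offset_reset="earliest",
        )
        await consumer.start()
        try:
            while True:
                # One call fetches up to 200 records across all assigned
                # partitions instead of awaiting getone() per message.
                batch = await consumer.getmany(timeout_ms=1000, max_records=200)
                for tp, messages in batch.items():
                    print(tp, len(messages))   # stand-in for bulk processing
                if batch:
                    await consumer.commit()    # commit once per processed batch
        finally:
            await consumer.stop()

    asyncio.run(consume_in_batches())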
@@ -168,7 +168,7 @@ start from `latest` offset::
 
     consumer = AIOKafkaConsumer(
         "my_topic",
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         auto_offset_reset="latest",
     )
     await consumer.start()
@@ -205,9 +205,9 @@ records, but rather just skip to the most recent records. Or you can use
 
 *Another use case* is for a **system that maintains local state**. In such a
 system the consumer will want to initialize its position on startup to
-whatever is contained in the local store. Likewise, if the local state is 
+whatever is contained in the local store. Likewise, if the local state is
 destroyed (say because the disk is lost) the state may be recreated on a new
-machine by re-consuming all the data and recreating the state (assuming that 
+machine by re-consuming all the data and recreating the state (assuming that
 Kafka is retaining sufficient history).
 
 See also related configuration params and API docs:
@@ -243,7 +243,7 @@ counts in Redis::
 
     tp = TopicPartition("my_topic", 0)
     consumer = AIOKafkaConsumer(
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         enable_auto_commit=False,
     )
     await consumer.start()
@@ -293,14 +293,14 @@ Consumer Groups and Topic Subscriptions
 Kafka uses the concept of **Consumer Groups** to allow a pool of processes to
 divide the work of consuming and processing records. These processes can either
 be running on the same machine or they can be distributed over many machines to
-provide scalability and fault tolerance for processing. 
+provide scalability and fault tolerance for processing.
 
 All **Consumer** instances sharing the same ``group_id`` will be part of the
 same **Consumer Group**::
 
     # Process 1
     consumer = AIOKafkaConsumer(
-        "my_topic", loop=loop, bootstrap_servers='localhost:9092',
+        "my_topic", bootstrap_servers='localhost:9092',
         group_id="MyGreatConsumerGroup"  # This will enable Consumer Groups
     )
     await consumer.start()
@@ -310,7 +310,7 @@ same **Consumer Group**::
 
     # Process 2
     consumer2 = AIOKafkaConsumer(
-        "my_topic", loop=loop, bootstrap_servers='localhost:9092',
+        "my_topic", bootstrap_servers='localhost:9092',
         group_id="MyGreatConsumerGroup"  # This will enable Consumer Groups
     )
     await consumer2.start()
@@ -328,13 +328,13 @@ consumer** in the group. So if there is a topic with *four* partitions and a
 consumer group with *two* processes, each process would consume from *two*
 partitions.
 
-Membership in a consumer group is maintained dynamically: if a process fails, 
-the partitions assigned to it `will be reassigned to other consumers` in the 
-same group. Similarly, if a new consumer joins the group, partitions will be 
-`moved from existing consumers to the new one`. This is known as **rebalancing 
+Membership in a consumer group is maintained dynamically: if a process fails,
+the partitions assigned to it `will be reassigned to other consumers` in the
+same group. Similarly, if a new consumer joins the group, partitions will be
+`moved from existing consumers to the new one`. This is known as **rebalancing
 the group**.
 
-.. note:: Conceptually you can think of a **Consumer Group** as being a `single 
+.. note:: Conceptually you can think of a **Consumer Group** as being a `single
    logical subscriber` that happens to be made up of multiple processes.
 
 In addition, when group reassignment happens automatically, consumers can be
@@ -369,7 +369,7 @@ etc. See :meth:`aiokafka.AIOKafkaConsumer.subscribe` docs for more details.
    You need to put ``consumer.getmany(timeout_ms=1000)`` call outside
    of the lock.
 
-For more information on how **Consumer Groups** are organized see 
+For more information on how **Consumer Groups** are organized see
 `Official Kafka Docs `_.
 
 
@@ -381,7 +381,7 @@ notice when new partitions are added to one of the subscribed topics or when a
 new topic matching a *subscribed regex* is created. For example::
 
     consumer = AIOKafkaConsumer(
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         metadata_max_age_ms=30000,  # This controls the polling interval
     )
     await consumer.start()
@@ -393,7 +393,7 @@ new topic matching a *subscribed regex* is created. For example::
 Here **Consumer** will automatically detect new topics like ``MyGreatTopic-1``
 or ``MyGreatTopic-2`` and start consuming them.
 
-If you use **Consumer Groups** the group's *Leader* will trigger a 
+If you use **Consumer Groups** the group's *Leader* will trigger a
 **group rebalance** when it notices metadata changes. It's because only the
 *Leader* has full knowledge of which topics are assigned to the group.
 
@@ -401,12 +401,12 @@ If you use **Consumer Groups** the group's *Leader* will trigger a
 Manual partition assignment
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
-It is also possible for the consumer to manually assign specific partitions 
+It is also possible for the consumer to manually assign specific partitions
 using ``assign([tp1, tp2])``. In this case, dynamic partition assignment and
 consumer group coordination will be disabled. For example::
 
     consumer = AIOKafkaConsumer(
-        loop=loop, bootstrap_servers='localhost:9092'
+        bootstrap_servers='localhost:9092'
     )
     tp1 = TopicPartition("my_topic", 1)
     tp2 = TopicPartition("my_topic", 2)
@@ -415,10 +415,10 @@ consumer group coordination will be disabled. For example::
     async for msg in consumer:
         print("Consumed msg %s %s %s", msg.topic, msg.partition, msg.value)
 
-``group_id`` can still be used for committing position, but be careful to 
+``group_id`` can still be used for committing position, but be careful to
 avoid **collisions** with multiple instances sharing the same group.
 
-It is not possible to mix manual partition assignment ``consumer.assign()`` 
+It is not possible to mix manual partition assignment ``consumer.assign()``
 and topic subscription ``consumer.subscribe()``. An attempt to do so will
 result in an ``IllegalStateError``.
 
@@ -450,7 +450,7 @@ catch up). For example::
         if position_lag > POSITION_THRESHOLD or time_lag > TIME_THRESHOLD:
             partitions.append(partition)
 
-.. note:: This interface differs from `pause()`/`resume()` interface of 
+.. note:: This interface differs from `pause()`/`resume()` interface of
    `kafka-python` and Java clients.
 
 Here we will consume all partitions if they do not lag behind, but if some
@@ -481,7 +481,7 @@ the consumer's configuration::
 
     consumer = aiokafka.AIOKafkaConsumer(
         "my_topic",
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         isolation_level="read_committed"
     )
     await consumer.start()
@@ -493,7 +493,7 @@ messages which have been successfully committed. It will continue to read
 non-transactional messages as before. There is no client-side buffering in
 `read_committed` mode. Instead, the end offset of a partition for a
 `read_committed` consumer would be the offset of the first message in the
-partition belonging to an open transaction. This offset is known as the 
+partition belonging to an open transaction. This offset is known as the
 **Last Stable Offset** (LSO).
 
 A `read_committed` consumer will only read up to the LSO and filter out any
@@ -520,7 +520,7 @@ which indicate the result of a transaction. These markers are not returned to
 applications, yet have an offset in the log. As a result, applications reading
 from topics with transactional messages will see gaps in the consumed offsets.
 These missing messages would be the transaction markers, and they are filtered
-out for consumers in both isolation levels. Additionally, applications using 
+out for consumers in both isolation levels. Additionally, applications using
 `read_committed` consumers may also see gaps due to aborted transactions,
 since those messages would not be returned by the consumer and yet would have
 valid offsets.
@@ -532,7 +532,7 @@ Detecting Consumer Failures
 People who worked with ``kafka-python`` or Java Client probably know that
 the ``poll()`` API is designed to ensure liveness of a **Consumer Group**. In
 other words, Consumer will only be considered alive if it consumes messages.
-It's not the same for ``aiokafka``, for more details read 
+It's not the same for ``aiokafka``, for more details read
 :ref:`Difference between aiokafka and kafka-python `.
 
 ``aiokafka`` will join the group on ``consumer.start()`` and will send
diff --git a/docs/examples/batch_produce.rst b/docs/examples/batch_produce.rst
index 869dc3e64..17abf8924 100644
--- a/docs/examples/batch_produce.rst
+++ b/docs/examples/batch_produce.rst
@@ -16,9 +16,9 @@ Producer
     import random
     from aiokafka.producer import AIOKafkaProducer
 
-    async def send_many(num, loop):
+    async def send_many(num):
         topic = "my_topic"
-        producer = AIOKafkaProducer(loop=loop)
+        producer = AIOKafkaProducer()
         await producer.start()
 
         batch = producer.create_batch()
@@ -43,9 +43,7 @@ Producer
               % (batch.record_count(), partition))
         await producer.stop()
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(send_many(1000, loop))
-    loop.close()
+    asyncio.run(send_many(1000))
 
 Output (topic `my_topic` has 3 partitions):
 
diff --git a/docs/examples/custom_partitioner.rst b/docs/examples/custom_partitioner.rst
index 215772183..e85c98dd4 100644
--- a/docs/examples/custom_partitioner.rst
+++ b/docs/examples/custom_partitioner.rst
@@ -22,27 +22,23 @@ Producer
             return all_partitions[-1]
         return random.choice(all_partitions)
 
-    @asyncio.coroutine
-    def produce_one(producer, key, value):
-        future = yield from producer.send('foobar', value, key=key)
-        resp = yield from future
+    async def produce_one(producer, key, value):
+        future = await producer.send('foobar', value, key=key)
+        resp = await future
         print("'%s' produced in partition: %i"%(value.decode(), resp.partition))
 
-    @asyncio.coroutine
-    def produce_task(loop):
+    async def produce_task():
         producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9092',
+            bootstrap_servers='localhost:9092',
             partitioner=my_partitioner)
-        yield from producer.start()
-        yield from produce_one(producer, b'last', b'1')
-        yield from produce_one(producer, b'some', b'2')
-        yield from produce_one(producer, b'first', b'3')
-        yield from producer.stop()
+        await producer.start()
+        await produce_one(producer, b'last', b'1')
+        await produce_one(producer, b'some', b'2')
+        await produce_one(producer, b'first', b'3')
+        await producer.stop()
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(produce_task(loop))
-    loop.close()
+    asyncio.run(produce_task())
diff --git a/docs/examples/group_consumer.rst b/docs/examples/group_consumer.rst
index 3065414a3..c14867854 100644
--- a/docs/examples/group_consumer.rst
+++ b/docs/examples/group_consumer.rst
@@ -22,14 +22,12 @@ Producer:
     import asyncio
     from aiokafka import AIOKafkaProducer
 
-    @asyncio.coroutine
-    def produce(loop, value, partition):
-        producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9092')
+    async def produce(value, partition):
+        producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
 
-        yield from producer.start()
-        yield from producer.send('some-topic', value, partition=partition)
-        yield from producer.stop()
+        await producer.start()
+        await producer.send('some-topic', value, partition=partition)
+        await producer.stop()
 
     if len(sys.argv) != 3:
         print("usage: producer.py <partition> <value>")
@@ -37,37 +35,36 @@ Producer:
     value = sys.argv[2].encode()
     partition = int(sys.argv[1])
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(produce(loop, value, partition))
-    loop.close()
+    asyncio.run(produce(value, partition))
 
 Consumer:
 
 .. code:: python
-
+
     import sys
     import asyncio
     from aiokafka import AIOKafkaConsumer
 
+    async def consume(group_id, msg_cnt):
+        consumer = AIOKafkaConsumer(
+            'some-topic',
+            group_id=group_id,
+            bootstrap_servers='localhost:9092',
+            auto_offset_reset='earliest')
+        await consumer.start()
+        for _ in range(msg_cnt):
+            msg = await consumer.getone()
+            print(f"Message from partition [{msg.partition}]: {msg.value}")
+        await consumer.stop()
+
     if len(sys.argv) < 3:
         print("usage: consumer.py <group_id> <msg_cnt>")
         sys.exit(1)
     group_id = sys.argv[1]
     msg_cnt = int(sys.argv[2])
 
-    loop = asyncio.get_event_loop()
-    consumer = AIOKafkaConsumer(
-        'some-topic', loop=loop,
-        group_id=group_id,
-        bootstrap_servers='localhost:9092',
-        auto_offset_reset='earliest')
-    loop.run_until_complete(consumer.start())
-    for _ in range(msg_cnt):
-        msg = loop.run_until_complete(consumer.getone())
-        print("Message from partition [{}]: {}".format(msg.partition, msg.value))
-    loop.run_until_complete(consumer.stop())
-    loop.close()
+    asyncio.run(consume(group_id, msg_cnt))
diff --git a/docs/examples/local_state_consumer.rst b/docs/examples/local_state_consumer.rst
index 68524933e..dd292dccd 100644
--- a/docs/examples/local_state_consumer.rst
+++ b/docs/examples/local_state_consumer.rst
@@ -104,9 +104,9 @@ Local State consumer:
 
             local_state.dump_local_state()
 
-    async def consume(loop):
+    async def consume():
         consumer = AIOKafkaConsumer(
-            loop=loop, bootstrap_servers='localhost:9092',
+            bootstrap_servers='localhost:9092',
             group_id="my_group",          # Consumer must be in a group to commit
             enable_auto_commit=False,     # Will disable autocommit
             auto_offset_reset="none",
@@ -118,7 +118,7 @@ Local State consumer:
         listener = RebalanceListener(consumer, local_state)
         consumer.subscribe(topics=["test"], listener=listener)
 
-        save_task = loop.create_task(save_state_every_second(local_state))
+        save_task = asyncio.create_task(save_state_every_second(local_state))
 
         try:
 
@@ -146,23 +146,8 @@ Local State consumer:
 
         await save_task
 
-    def main(async_main):
-        # Setup to properly handle KeyboardInterrupt exception
-        loop = asyncio.get_event_loop()
-        m_task = loop.create_task(async_main(loop))
-        m_task.add_done_callback(lambda task, loop=loop: loop.stop())
-
-        try:
-            loop.run_forever()
-        except KeyboardInterrupt:
-            m_task.cancel()
-            loop.run_forever()
-        finally:
-            if not m_task.cancelled():
-                m_task.result()
-
     if __name__ == "__main__":
-        main(consume)
+        asyncio.run(consume())
 
 There are several points of interest in this example:
 
@@ -196,7 +181,7 @@ Process TopicPartition(topic='test', partition=2) 2
 Output for 2nd consumer:
 
->>> python examples/local_state_consumer.py 
+>>> python examples/local_state_consumer.py
 Revoked set()
 Assigned {TopicPartition(topic='test', partition=1)}
 Process TopicPartition(topic='test', partition=1) 321
diff --git a/docs/examples/manual_commit.rst b/docs/examples/manual_commit.rst
index 68f89b9ca..a64fa257b 100644
--- a/docs/examples/manual_commit.rst
+++ b/docs/examples/manual_commit.rst
@@ -26,20 +26,20 @@ Consumer:
     from kafka.common import KafkaError
     from aiokafka import AIOKafkaConsumer
 
-    loop = asyncio.get_event_loop()
-    consumer = AIOKafkaConsumer(
-        'foobar', loop=loop,
-        bootstrap_servers='localhost:9092',
-        auto_offset_reset='earliest',
-        group_id="some-consumer-group",
-        enable_auto_commit=False)
-    loop.run_until_complete(consumer.start())
-    # we want to consume 10 messages from "foobar" topic
-    # and commit after that
-    for i in range(10):
-        msg = loop.run_until_complete(consumer.getone())
-    loop.run_until_complete(consumer.commit())
-
-    loop.run_until_complete(consumer.stop())
-    loop.close()
-
+    async def consume():
+        consumer = AIOKafkaConsumer(
+            'foobar',
+            bootstrap_servers='localhost:9092',
+            auto_offset_reset='earliest',
+            group_id="some-consumer-group",
+            enable_auto_commit=False)
+        await consumer.start()
+        # we want to consume 10 messages from "foobar" topic
+        # and commit after that
+        for i in range(10):
+            msg = await consumer.getone()
+        await consumer.commit()
+
+        await consumer.stop()
+
+    asyncio.run(consume())
diff --git a/docs/examples/python35_examples.rst b/docs/examples/python35_examples.rst
deleted file mode 100644
index 1c1b0f670..000000000
--- a/docs/examples/python35_examples.rst
+++ /dev/null
@@ -1,34 +0,0 @@
-
-Python async for usage
-======================
-
-``aiokafka`` supports ``async for`` syntax and adds some sugar using
-this syntax.
-
-
-**You can use AIOKafkaConsumer as a simple async iterator**
-
-.. note::
-    All not-critical errors will just be logged to `aiokafka` logger when using
-    async iterator. See `Errors handling` of :ref:`API Documentation ` section.
-
-
-.. code:: python
-
-    from aiokafka import AIOKafkaConsumer
-    import asyncio
-
-    loop = asyncio.get_event_loop()
-
-    async def consume():
-        consumer = AIOKafkaConsumer(
-            "my_topic", loop=loop, bootstrap_servers='localhost:9092')
-        # Get cluster layout and topic/partition allocation
-        await consumer.start()
-        try:
-            async for msg in consumer:
-                print(msg.value)
-        finally:
-            await consumer.stop()
-
-    loop.run_until_complete(consume())
diff --git a/docs/examples/serialize_and_compress.rst b/docs/examples/serialize_and_compress.rst
index 307ee1668..c6123402b 100644
--- a/docs/examples/serialize_and_compress.rst
+++ b/docs/examples/serialize_and_compress.rst
@@ -26,29 +26,26 @@ Producer
     def serializer(value):
         return json.dumps(value).encode()
 
-    @asyncio.coroutine
-    def produce(loop):
+    async def produce():
         producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9092',
+            bootstrap_servers='localhost:9092',
             value_serializer=serializer,
             compression_type="gzip")
 
-        yield from producer.start()
+        await producer.start()
         data = {"a": 123.4, "b": "some string"}
-        yield from producer.send('foobar', data)
+        await producer.send('foobar', data)
         data = [1,2,3,4]
-        yield from producer.send('foobar', data)
-        yield from producer.stop()
+        await producer.send('foobar', data)
+        await producer.stop()
 
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(produce(loop))
-    loop.close()
+    asyncio.run(produce())
 
 Consumer
 
 .. code:: python
-
+
     import json
     import asyncio
     from kafka.common import KafkaError
@@ -57,22 +54,22 @@ Consumer
     def deserializer(serialized):
         return json.loads(serialized)
 
-    loop = asyncio.get_event_loop()
-    # consumer will decompress messages automatically
-    # in accordance to compression type specified in producer
-    consumer = AIOKafkaConsumer(
-        'foobar', loop=loop,
-        bootstrap_servers='localhost:9092',
-        value_deserializer=deserializer,
-        auto_offset_reset='earliest')
-    loop.run_until_complete(consumer.start())
-    data = loop.run_until_complete(consumer.getmany(timeout_ms=10000))
-    for tp, messages in data.items():
-        for message in messages:
-            print(type(message.value), message.value)
-    loop.run_until_complete(consumer.stop())
-    loop.close()
-
+    async def consume():
+        # consumer will decompress messages automatically
+        # in accordance to compression type specified in producer
+        consumer = AIOKafkaConsumer(
+            'foobar',
+            bootstrap_servers='localhost:9092',
+            value_deserializer=deserializer,
+            auto_offset_reset='earliest')
+        await consumer.start()
+        data = await consumer.getmany(timeout_ms=10000)
+        for tp, messages in data.items():
+            for message in messages:
+                print(type(message.value), message.value)
+        await consumer.stop()
+
+    asyncio.run(consume())
 
 Output:
 
diff --git a/docs/examples/ssl_consume_produce.rst b/docs/examples/ssl_consume_produce.rst
index 0d7fbd46a..67cce9891 100644
--- a/docs/examples/ssl_consume_produce.rst
+++ b/docs/examples/ssl_consume_produce.rst
@@ -21,10 +21,10 @@ information.
         password="123123"
     )
 
-    async def produce_and_consume(loop):
+    async def produce_and_consume():
         # Produce
         producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9093',
+            bootstrap_servers='localhost:9093',
             security_protocol="SSL", ssl_context=context)
 
         await producer.start()
@@ -35,7 +35,7 @@ information.
         await producer.stop()
 
         consumer = AIOKafkaConsumer(
-            "my_topic", loop=loop, bootstrap_servers='localhost:9093',
+            "my_topic", bootstrap_servers='localhost:9093',
             security_protocol="SSL", ssl_context=context)
         await consumer.start()
         try:
@@ -47,17 +47,7 @@ information.
         print("Success", msg, fetch_msg)
 
     if __name__ == "__main__":
-        loop = asyncio.get_event_loop()
-        task = loop.create_task(produce_and_consume(loop))
-        try:
-            loop.run_until_complete(task)
-        finally:
-            loop.run_until_complete(asyncio.sleep(0, loop=loop))
-            task.cancel()
-            try:
-                loop.run_until_complete(task)
-            except asyncio.CancelledError:
-                pass
+        asyncio.run(produce_and_consume())
 
 Output:
 
diff --git a/docs/examples/transaction_example.rst b/docs/examples/transaction_example.rst
index 36b2729a9..15aec28e7 100644
--- a/docs/examples/transaction_example.rst
+++ b/docs/examples/transaction_example.rst
@@ -42,9 +42,9 @@ process data and produce the result to ``OUT_TOPIC`` in a transactional manner.
 
         return res
 
-    async def transactional_process(loop):
+    async def transactional_process():
         consumer = AIOKafkaConsumer(
-            IN_TOPIC, loop=loop,
+            IN_TOPIC,
             bootstrap_servers=BOOTSTRAP_SERVERS,
             enable_auto_commit=False,
             group_id=GROUP_ID,
@@ -53,7 +53,7 @@ process data and produce the result to ``OUT_TOPIC`` in a transactional manner.
         await consumer.start()
 
         producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers=BOOTSTRAP_SERVERS,
+            bootstrap_servers=BOOTSTRAP_SERVERS,
             transactional_id=TRANSACTIONAL_ID
        )
        await producer.start()
@@ -84,21 +84,6 @@ process data and produce the result to ``OUT_TOPIC`` in a transactional manner.
         await consumer.stop()
         await producer.stop()
 
-    def run_async(async_main):
-        # Setup to properly handle KeyboardInterrupt exception
-        loop = asyncio.get_event_loop()
-        m_task = loop.create_task(async_main(loop))
-        m_task.add_done_callback(lambda task, loop=loop: loop.stop())
-
-        try:
-            loop.run_forever()
-        except KeyboardInterrupt:
-            m_task.cancel()
-            loop.run_forever()
-        finally:
-            if not m_task.cancelled():
-                m_task.result()
 
     if __name__ == "__main__":
-        run_async(transactional_process)
-
+        asyncio.run(transactional_process())
diff --git a/docs/index.rst b/docs/index.rst
index 04f799e87..c82a78ce2 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -38,12 +38,10 @@ Here's a consumer example:
     from aiokafka import AIOKafkaConsumer
     import asyncio
 
-    loop = asyncio.get_event_loop()
-
     async def consume():
         consumer = AIOKafkaConsumer(
             'my_topic', 'my_other_topic',
-            loop=loop, bootstrap_servers='localhost:9092',
+            bootstrap_servers='localhost:9092',
             group_id="my-group")
         # Get cluster layout and join group `my-group`
         await consumer.start()
@@ -56,7 +54,7 @@ Here's a consumer example:
         # Will leave consumer group; perform autocommit if enabled.
         await consumer.stop()
 
-    loop.run_until_complete(consume())
+    asyncio.run(consume())
 
 Read more in :ref:`Consumer client ` section.
 
@@ -72,11 +70,9 @@ Here's a producer example:
 
     from aiokafka import AIOKafkaProducer
     import asyncio
 
-    loop = asyncio.get_event_loop()
-
     async def send_one():
         producer = AIOKafkaProducer(
-            loop=loop, bootstrap_servers='localhost:9092')
+            bootstrap_servers='localhost:9092')
         # Get cluster layout and initial topic/partition leadership information
         await producer.start()
         try:
@@ -86,7 +82,7 @@ Here's a producer example:
         # Wait for all pending messages to be delivered or expire.
         await producer.stop()
 
-    loop.run_until_complete(send_one())
+    asyncio.run(send_one())
 
 Read more in :ref:`Producer client ` section.
 
diff --git a/docs/producer.rst b/docs/producer.rst
index 009e10e94..9ba20f321 100644
--- a/docs/producer.rst
+++ b/docs/producer.rst
@@ -8,8 +8,7 @@ Producer client
 :ref:`AIOKafkaProducer ` is a client that publishes records to the
 Kafka cluster. Most simple usage would be::
 
-    producer = aiokafka.AIOKafkaProducer(
-        loop=loop, bootstrap_servers='localhost:9092')
+    producer = aiokafka.AIOKafkaProducer(bootstrap_servers='localhost:9092')
     await producer.start()
     try:
         await producer.send_and_wait("my_topic", b"Super message")
@@ -101,7 +100,7 @@ the Producer from creating duplicates on retries. *aiokafka* supports this mode
 by passing the parameter ``enable_idempotence=True`` to ``AIOKafkaProducer``::
 
     producer = aiokafka.AIOKafkaProducer(
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         enable_idempotence=True)
     await producer.start()
     try:
@@ -132,7 +131,7 @@ after the transaction is committed. To use the transactional producer and the
 attendant APIs, you must set the ``transactional_id`` configuration property::
 
     producer = aiokafka.AIOKafkaProducer(
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         transactional_id="transactional_test")
     await producer.start()
     try:
@@ -148,7 +147,7 @@ are included in transactions should be configured for durability. In
 particular, the ``replication.factor`` should be at least ``3``, and the
 ``min.insync.replicas`` for these topics should be set to ``2``. Finally, in
 order for transactional guarantees to be realized from end-to-end, the
-consumers must be configured to read only committed messages as well. See 
+consumers must be configured to read only committed messages as well. See
 :ref:`Reading Transactional Messages `.
 
 The purpose of the ``transactional_id`` is to enable transaction recovery
@@ -186,7 +185,7 @@ Returned RecordMetadata object
 After a message is sent the user receives a ``RecordMetadata`` object
 containing fields:
 
-    * ``offset`` - unique offset of the message in this partition. See 
+    * ``offset`` - unique offset of the message in this partition. See
       :ref:`Offsets and Consumer Position ` for more details on
       offsets.
     * ``topic`` - *string* topic name
diff --git a/examples/batch_produce.py b/examples/batch_produce.py
index 0d3e43478..e3f5c5484 100644
--- a/examples/batch_produce.py
+++ b/examples/batch_produce.py
@@ -4,9 +4,9 @@
 from aiokafka.producer import AIOKafkaProducer
 
 
-async def send_many(num, loop):
+async def send_many(num):
     topic = "my_topic"
-    producer = AIOKafkaProducer(loop=loop)
+    producer = AIOKafkaProducer()
     await producer.start()
 
     batch = producer.create_batch()
@@ -32,6 +32,4 @@ async def send_many(num, loop):
     await producer.stop()
 
 
-loop = asyncio.get_event_loop()
-loop.run_until_complete(send_many(1000, loop))
-loop.close()
+asyncio.run(send_many(1000))
diff --git a/examples/indempotent_produce.py b/examples/indempotent_produce.py
index d5bd372d5..69ae269fb 100644
--- a/examples/indempotent_produce.py
+++ b/examples/indempotent_produce.py
@@ -17,4 +17,4 @@ async def send_one():
         await producer.stop()
         raise
 
-loop.run_until_complete(send_one())
+asyncio.run(send_one())
diff --git a/examples/local_state_consumer.py b/examples/local_state_consumer.py
index 5a8eeef40..70ea32b46 100644
--- a/examples/local_state_consumer.py
+++ b/examples/local_state_consumer.py
@@ -86,9 +86,9 @@ async def save_state_every_second(local_state):
         local_state.dump_local_state()
 
 
-async def consume(loop):
+async def consume():
     consumer = AIOKafkaConsumer(
-        loop=loop, bootstrap_servers='localhost:9092',
+        bootstrap_servers='localhost:9092',
         group_id="my_group",          # Consumer must be in a group to commit
         enable_auto_commit=False,     # Will disable autocommit
         auto_offset_reset="none",
@@ -100,7 +100,7 @@ async def consume(loop):
     listener = RebalanceListener(consumer, local_state)
     consumer.subscribe(topics=["test"], listener=listener)
 
-    save_task = loop.create_task(save_state_every_second(local_state))
+    save_task = asyncio.create_task(save_state_every_second(local_state))
 
     try:
 
@@ -128,20 +128,5 @@ async def consume(loop):
 
     await save_task
 
-def main(async_main):
-    # Setup to properly handle KeyboardInterrupt exception
-    loop = asyncio.get_event_loop()
-    m_task = loop.create_task(async_main(loop))
-    m_task.add_done_callback(lambda task, loop=loop: loop.stop())
-
-    try:
-        loop.run_forever()
-    except KeyboardInterrupt:
-        m_task.cancel()
-        loop.run_forever()
-    finally:
-        if not m_task.cancelled():
-            m_task.result()
-
 if __name__ == "__main__":
-    main(consume)
+    asyncio.run(consume())
diff --git a/examples/simple_consumer-3.5.py b/examples/simple_consumer-3.5.py
deleted file mode 100644
index 4e318c4cf..000000000
--- a/examples/simple_consumer-3.5.py
+++ /dev/null
@@ -1,28 +0,0 @@
-import asyncio
-
-from aiokafka import AIOKafkaConsumer
-
-import logging
-
-log_level = logging.DEBUG
-log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
-logging.basicConfig(level=logging.INFO, format=log_format)
-# log = logging.getLogger('kafka')
-# log.setLevel(log_level)
-
-loop = asyncio.get_event_loop()
-
-async def consume():
-    consumer = AIOKafkaConsumer(
-        loop=loop, bootstrap_servers='localhost:9092',
-        metadata_max_age_ms=5000, group_id="test2")
-    consumer.subscribe(pattern="test*")
-    # Get cluster layout and topic/partition allocation
-    await consumer.start()
-    try:
-        async for msg in consumer:
-            print(msg.value)
-    finally:
-        await consumer.stop()
-
-loop.run_until_complete(consume())
diff --git a/examples/simple_consumer.py b/examples/simple_consumer.py
index 07f10b618..bde25e2fd 100644
--- a/examples/simple_consumer.py
+++ b/examples/simple_consumer.py
@@ -1,11 +1,9 @@
 from aiokafka import AIOKafkaConsumer
 import asyncio
 
-loop = asyncio.get_event_loop()
-
 async def consume():
     consumer = AIOKafkaConsumer(
-        "my_topic", loop=loop, bootstrap_servers='localhost:9092')
+        "my_topic", bootstrap_servers='localhost:9092')
     # Get cluster layout and topic/partition allocation
     await consumer.start()
     try:
@@ -14,4 +12,4 @@ async def consume():
     finally:
         await consumer.stop()
 
-loop.run_until_complete(consume())
+asyncio.run(consume())
diff --git a/examples/simple_produce.py b/examples/simple_produce.py
index 91958c877..6f5cea743 100644
--- a/examples/simple_produce.py
+++ b/examples/simple_produce.py
@@ -1,11 +1,9 @@
 from aiokafka import AIOKafkaProducer
 import asyncio
 
-loop = asyncio.get_event_loop()
-
 async def send_one():
     producer = AIOKafkaProducer(
-        loop=loop, bootstrap_servers='localhost:9092')
+        bootstrap_servers='localhost:9092')
     # Get cluster layout and topic/partition allocation
     await producer.start()
     while True:
@@ -18,4 +16,4 @@ async def send_one():
         await producer.stop()
         raise
 
-loop.run_until_complete(send_one())
+asyncio.run(send_one())
diff --git a/examples/ssl_consume_produce.py b/examples/ssl_consume_produce.py
index ae71dbd7b..bfc6d5980 100644
--- a/examples/ssl_consume_produce.py
+++ b/examples/ssl_consume_produce.py
@@ -11,10 +11,10 @@
     password="123123"
 )
 
-async def produce_and_consume(loop):
+async def produce_and_consume():
     # Produce
     producer = AIOKafkaProducer(
-        loop=loop, bootstrap_servers='localhost:9093',
+        bootstrap_servers='localhost:9093',
         security_protocol="SSL", ssl_context=context)
 
     await producer.start()
@@ -25,7 +25,7 @@ async def produce_and_consume(loop):
     await producer.stop()
 
     consumer = AIOKafkaConsumer(
-        "my_topic", loop=loop, bootstrap_servers='localhost:9093',
+        "my_topic", bootstrap_servers='localhost:9093',
         security_protocol="SSL", ssl_context=context)
     await consumer.start()
     try:
@@ -37,14 +37,4 @@ async def produce_and_consume(loop):
     print("Success", msg, fetch_msg)
 
 if __name__ == "__main__":
-    loop = asyncio.get_event_loop()
-    task = loop.create_task(produce_and_consume(loop))
-    try:
-        loop.run_until_complete(task)
-    finally:
-        loop.run_until_complete(asyncio.sleep(0, loop=loop))
-        task.cancel()
-        try:
-            loop.run_until_complete(task)
-        except asyncio.CancelledError:
-            pass
+    asyncio.run(produce_and_consume())
diff --git a/examples/transactional_produce.py b/examples/transactional_produce.py
index f0e822997..03d104a59 100644
--- a/examples/transactional_produce.py
+++ b/examples/transactional_produce.py
@@ -1,14 +1,10 @@
 from aiokafka import AIOKafkaProducer
 import asyncio
 
-loop = asyncio.get_event_loop()
-
-
 async def send_one():
     producer = AIOKafkaProducer(
         bootstrap_servers='localhost:9092',
-        transactional_id="transactional_test",
-        loop=loop)
+        transactional_id="transactional_test")
 
     # Get cluster layout and topic/partition allocation
     await producer.start()
@@ -23,4 +19,4 @@ async def send_one():
         await producer.stop()
         print(res)
 
-loop.run_until_complete(send_one())
+asyncio.run(send_one())
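
One behavioural note on the ``main()`` blocks removed above: the old
boilerplate installed both SIGINT and SIGTERM handlers before running the
task, while ``asyncio.run()`` only covers Ctrl-C (``KeyboardInterrupt``
propagates and pending tasks are cancelled during teardown) and installs no
SIGTERM handler of its own. Scripts that relied on SIGTERM-triggered
cancellation would need to add it back; a minimal Unix-only sketch under that
assumption (all names here are illustrative, not part of this patch)::

    import asyncio
    import signal

    async def work():
        await asyncio.sleep(3600)      # stand-in for a benchmark coroutine

    async def runner():
        loop = asyncio.get_running_loop()
        task = asyncio.create_task(work())
        # asyncio.run() handles Ctrl-C (SIGINT); SIGTERM needs an explicit
        # handler, which is not available on Windows event loops.
        loop.add_signal_handler(signal.SIGTERM, task.cancel)
        try:
            await task
        except asyncio.CancelledError:
            print("cancelled, exiting cleanly")

    asyncio.run(runner())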