Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wait for or cancel running asyncio tasks in an event emitter #168

Merged
merged 12 commits into from
Nov 16, 2024
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,36 @@
# Changelog

## Next

- New features in `pyee.asyncio.AsyncIOEventEmitter`:
- `wait_for_complete` method to wait for all running handlers to complete
execution
- `cancel` method to cancel execution of all running handlers
- `complete` property that's `True` when no handlers are currently running
- Updated changelog for v12 release to describe where to find alternatives
to deprecated and removed imports
- Add support for Python 3.13
- Upgrade GitHub Actions
- Upgrade `actions/setup-python` to v5
- Upgrade `actions/setup-node` to v4
- Upgrade `actions/upload-artifact` to v4

## 2024/08/30 Version 12.0.0

- Remove deprecated imports:
- `pyee.BaseEventEmitter`
- Use `pyee.base.EventEmitter` or `pyee.EventEmitter` instead
- `pyee.AsyncIOEventEmitter`
- Use `pyee.asyncio.AsyncIOEventEmitter` instead
- `pyee.TwistedEventEmitter`
- Use `pyee.twisted.TwistedEventEmitter` instead
- `pyee.ExecutorEventEmitter`
- Use `pyee.executor.ExecutorEventEmitter` instead
- `pyee.TrioEventEmitter`
- Use `pyee.trio.TrioEventEmitter` instead
- Add `PyeeError` which inherits from `PyeeException`, and use throughout
- Deprecate direct use of `PyeeException`
- Use `PyeeError` instead

## 2024/08/30 Version 11.1.1

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ Listed in no particular order:
- Ivan Gretchka @leirons
- Max Schmitt @mxschmitt
- Masaya Suzuki @massongit
- Xiao Shuai @xiaoshuai
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ See [CHANGELOG.md](./CHANGELOG.md).

## Contributors

See [ONTRIBUTORS.,md](./CONTRIBUTORS.md).
See [CONTRIBUTORS.md](./CONTRIBUTORS.md).

## License

Expand Down
101 changes: 99 additions & 2 deletions pyee/asyncio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-

from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine
from asyncio import AbstractEventLoop, ensure_future, Future, iscoroutine, wait
from typing import Any, Callable, cast, Dict, Optional, Set, Tuple

from pyee.base import EventEmitter
Expand Down Expand Up @@ -41,6 +41,32 @@ def __init__(self, loop: Optional[AbstractEventLoop] = None):
self._loop: Optional[AbstractEventLoop] = loop
self._waiting: Set[Future] = set()

def emit(
self,
event: str,
*args: Any,
**kwargs: Any,
) -> bool:
"""Emit `event`, passing `*args` and `**kwargs` to each attached
function or coroutine. Returns `True` if any functions are attached to
`event`; otherwise returns `False`.

Example:

```py
ee.emit('data', '00101001')
```

Assuming `data` is an attached function, this will call
`data('00101001')'`.

When executing coroutine handlers, their respective futures will be
stored in a "waiting" state. These futures may be waited on or
canceled with `wait_for_complete` and `cancel`, respectively; and
their status may be checked via the `complete` property.
"""
return super().emit(event, *args, **kwargs)

def _emit_run(
self,
f: Callable,
Expand All @@ -67,7 +93,7 @@ def _emit_run(
return

def callback(f):
self._waiting.remove(f)
self._waiting.discard(f)

if f.cancelled():
return
Expand All @@ -78,3 +104,74 @@ def callback(f):

fut.add_done_callback(callback)
self._waiting.add(fut)

async def wait_for_complete(self):
"""Waits for all pending tasks to complete. For example:

```py
@ee.on('event')
async def async_handler(*args, **kwargs):
await returns_a_future()

# Triggers execution of async_handler
ee.emit('data', '00101001')

await ee.wait_for_complete()

# async_handler has completed execution
```

This is useful if you're attempting a graceful shutdown of your
application and want to ensure all coroutines have completed execution
beforehand.
"""
if self._waiting:
await wait(self._waiting)

def cancel(self):
"""Cancel all pending tasks. For example:

```py
@ee.on('event')
async def async_handler(*args, **kwargs):
await returns_a_future()

# Triggers execution of async_handler
ee.emit('data', '00101001')

ee.cancel()

# async_handler execution has been canceled
```

This is useful if you're attempting to shut down your application and
attempts at a graceful shutdown via `wait_for_complete` have failed.
"""
for fut in self._waiting:
if not fut.done() and not fut.cancelled():
fut.cancel()
self._waiting.clear()

@property
def complete(self) -> bool:
"""When true, there are no pending tasks, and execution is complete.
For example:

```py
@ee.on('event')
async def async_handler(*args, **kwargs):
await returns_a_future()

# Triggers execution of async_handler
ee.emit('data', '00101001')

# async_handler is still running, so this prints False
print(ee.complete)

await ee.wait_for_complete()

# async_handler has completed execution, so this prints True
print(ee.complete)
```
"""
return not self._waiting
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Other/Nonlisted Topic",
]
requires-python = ">=3.8"
Expand Down
8 changes: 1 addition & 7 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ comm==0.2.1
# via ipykernel
constantly==23.10.4
# via twisted
cryptography==42.0.4
# via secretstorage
debugpy==1.8.0
# via ipykernel
decorator==5.1.1
Expand Down Expand Up @@ -94,7 +92,7 @@ importlib-metadata==7.0.1
# via
# trio-typing
# twine
incremental==22.10.0
incremental==24.7.2
# via twisted
iniconfig==2.0.0
# via pytest
Expand All @@ -110,10 +108,6 @@ jaraco-classes==3.3.1
# via keyring
jedi==0.19.1
# via ipython
jeepney==0.8.0
# via
# keyring
# secretstorage
jinja2==3.1.4
# via
# mkdocs
Expand Down
Loading