-
Notifications
You must be signed in to change notification settings - Fork 784
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #3540 from wyfo/coroutine
feat: support `async fn` in macros with coroutine implementation
- Loading branch information
Showing
20 changed files
with
474 additions
and
61 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Using `async` and `await` | ||
|
||
*This feature is still in active development. See [the related issue](https://github.com/PyO3/pyo3/issues/1632).* | ||
|
||
`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`. | ||
|
||
```rust | ||
# #![allow(dead_code)] | ||
use std::{thread, time::Duration}; | ||
use futures::channel::oneshot; | ||
use pyo3::prelude::*; | ||
|
||
#[pyfunction] | ||
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> { | ||
let (tx, rx) = oneshot::channel(); | ||
thread::spawn(move || { | ||
thread::sleep(Duration::from_secs_f64(seconds)); | ||
tx.send(()).unwrap(); | ||
}); | ||
rx.await.unwrap(); | ||
result | ||
} | ||
``` | ||
|
||
*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.* | ||
|
||
## `Send + 'static` constraint | ||
|
||
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object. | ||
|
||
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile(arg: &PyAny, py: Python<'_>) -> &PyAny`. | ||
|
||
It also means that methods cannot use `&self`/`&mut self`, *but this restriction should be dropped in the future.* | ||
|
||
|
||
## Implicit GIL holding | ||
|
||
Even if it is not possible to pass a `py: Python<'_>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'_>`/`&PyAny` parameter, yet the GIL is held. | ||
|
||
It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible. | ||
|
||
## Release the GIL across `.await` | ||
|
||
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*. | ||
|
||
Here is the advised workaround for now: | ||
|
||
```rust,ignore | ||
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}}; | ||
use pyo3::prelude::*; | ||
struct AllowThreads<F>(F); | ||
impl<F> Future for AllowThreads<F> | ||
where | ||
F: Future + Unpin + Send, | ||
F::Output: Send, | ||
{ | ||
type Output = F::Output; | ||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let waker = cx.waker(); | ||
Python::with_gil(|gil| { | ||
gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker))) | ||
}) | ||
} | ||
} | ||
``` | ||
|
||
## Cancellation | ||
|
||
*To be implemented* | ||
|
||
## The `Coroutine` type | ||
|
||
To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). Each `coroutine.send` call is translated to `Future::poll` call, while `coroutine.throw` call reraise the exception *(this behavior will be configurable with cancellation support)*. | ||
|
||
*The type does not yet have a public constructor until the design is finalized.* |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Support `async fn` in macros with coroutine implementation |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
//! Python coroutine implementation, used notably when wrapping `async fn` | ||
//! with `#[pyfunction]`/`#[pymethods]`. | ||
use std::{ | ||
any::Any, | ||
future::Future, | ||
panic, | ||
pin::Pin, | ||
sync::Arc, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use futures_util::FutureExt; | ||
use pyo3_macros::{pyclass, pymethods}; | ||
|
||
use crate::{ | ||
coroutine::waker::AsyncioWaker, | ||
exceptions::{PyRuntimeError, PyStopIteration}, | ||
panic::PanicException, | ||
pyclass::IterNextOutput, | ||
types::PyIterator, | ||
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python, | ||
}; | ||
|
||
mod waker; | ||
|
||
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine"; | ||
|
||
type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>; | ||
|
||
/// Python coroutine wrapping a [`Future`]. | ||
#[pyclass(crate = "crate")] | ||
pub struct Coroutine { | ||
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>, | ||
waker: Option<Arc<AsyncioWaker>>, | ||
} | ||
|
||
impl Coroutine { | ||
/// Wrap a future into a Python coroutine. | ||
/// | ||
/// Coroutine `send` polls the wrapped future, ignoring the value passed | ||
/// (should always be `None` anyway). | ||
/// | ||
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed | ||
pub(crate) fn from_future<F, T, E>(future: F) -> Self | ||
where | ||
F: Future<Output = Result<T, E>> + Send + 'static, | ||
T: IntoPy<PyObject>, | ||
PyErr: From<E>, | ||
{ | ||
let wrap = async move { | ||
let obj = future.await?; | ||
// SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`) | ||
Ok(obj.into_py(unsafe { Python::assume_gil_acquired() })) | ||
}; | ||
Self { | ||
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())), | ||
waker: None, | ||
} | ||
} | ||
|
||
fn poll( | ||
&mut self, | ||
py: Python<'_>, | ||
throw: Option<PyObject>, | ||
) -> PyResult<IterNextOutput<PyObject, PyObject>> { | ||
// raise if the coroutine has already been run to completion | ||
let future_rs = match self.future { | ||
Some(ref mut fut) => fut, | ||
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)), | ||
}; | ||
// reraise thrown exception it | ||
if let Some(exc) = throw { | ||
self.close(); | ||
return Err(PyErr::from_value(exc.as_ref(py))); | ||
} | ||
// create a new waker, or try to reset it in place | ||
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) { | ||
waker.reset(); | ||
} else { | ||
self.waker = Some(Arc::new(AsyncioWaker::new())); | ||
} | ||
let waker = futures_util::task::waker(self.waker.clone().unwrap()); | ||
// poll the Rust future and forward its results if ready | ||
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) { | ||
self.close(); | ||
return match res { | ||
Ok(res) => Ok(IterNextOutput::Return(res?)), | ||
Err(err) => Err(PanicException::from_panic_payload(err)), | ||
}; | ||
} | ||
// otherwise, initialize the waker `asyncio.Future` | ||
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? { | ||
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__` | ||
// and will yield itself if its result has not been set in polling above | ||
if let Some(future) = PyIterator::from_object(future).unwrap().next() { | ||
// future has not been leaked into Python for now, and Rust code can only call | ||
// `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap | ||
return Ok(IterNextOutput::Yield(future.unwrap().into())); | ||
} | ||
} | ||
// if waker has been waken during future polling, this is roughly equivalent to | ||
// `await asyncio.sleep(0)`, so just yield `None`. | ||
Ok(IterNextOutput::Yield(py.None().into())) | ||
} | ||
} | ||
|
||
pub(crate) fn iter_result(result: IterNextOutput<PyObject, PyObject>) -> PyResult<PyObject> { | ||
match result { | ||
IterNextOutput::Yield(ob) => Ok(ob), | ||
IterNextOutput::Return(ob) => Err(PyStopIteration::new_err(ob)), | ||
} | ||
} | ||
|
||
#[pymethods(crate = "crate")] | ||
impl Coroutine { | ||
fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult<PyObject> { | ||
iter_result(self.poll(py, None)?) | ||
} | ||
|
||
fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> { | ||
iter_result(self.poll(py, Some(exc))?) | ||
} | ||
|
||
fn close(&mut self) { | ||
// the Rust future is dropped, and the field set to `None` | ||
// to indicate the coroutine has been run to completion | ||
drop(self.future.take()); | ||
} | ||
|
||
fn __await__(self_: Py<Self>) -> Py<Self> { | ||
self_ | ||
} | ||
|
||
fn __next__(&mut self, py: Python<'_>) -> PyResult<IterNextOutput<PyObject, PyObject>> { | ||
self.poll(py, None) | ||
} | ||
} |
Oops, something went wrong.