diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md index 9e738a79946..00976562c60 100644 --- a/guide/src/SUMMARY.md +++ b/guide/src/SUMMARY.md @@ -20,6 +20,7 @@ - [Python exceptions](exception.md) - [Calling Python from Rust](python_from_rust.md) - [Using `async` and `await`](async-await.md) + - [Awaiting Python awaitables](async-await/pyfuture.md) - [GIL, mutability and object types](types.md) - [Parallelism](parallelism.md) - [Debugging](debugging.md) diff --git a/guide/src/async-await/pyfuture.md b/guide/src/async-await/pyfuture.md new file mode 100644 index 00000000000..8a82aaee0ac --- /dev/null +++ b/guide/src/async-await/pyfuture.md @@ -0,0 +1,51 @@ +# Awaiting Python awaitables + +Python awaitable can be awaited on Rust side using [`PyFuture`]({{#PYO3_DOCS_URL}}/pyo3/types/struct.PyFuture.html). + +```rust +# #![allow(dead_code)] +use pyo3::{prelude::*, types::PyFuture}; + +#[pyfunction] +async fn wrap_awaitable(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + future.await +} +``` + +`PyFuture::from_object` construct a `PyFuture` from a Python awaitable object, by calling its `__await__` method (or `__iter__` for generator-based coroutine). + +## Restrictions + +`PyFuture` can only be awaited in the context of a PyO3 coroutine. Otherwise, it panics. + +```rust +# #![allow(dead_code)] +use pyo3::{prelude::*, types::PyFuture}; + +#[pyfunction] +fn block_on(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + futures::executor::block_on(future) // ERROR: PyFuture must be awaited in coroutine context +} +``` + +`PyFuture` must be the only Rust future awaited; it means that it's forbidden to `select!` a `Pyfuture`. Otherwise, it panics. + +```rust +# #![allow(dead_code)] +use std::future; +use futures::FutureExt; +use pyo3::{prelude::*, types::PyFuture}; + +#[pyfunction] +async fn select(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + futures::select_biased! { + _ = future::pending::<()>().fuse() => unreachable!(), + res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future + } +} +``` + +These restrictions exist because awaiting a `PyFuture` strongly binds it to the enclosing coroutine. The coroutine will then delegate its `send`/`throw`/`close` methods to the awaited `PyFuture`. If it was awaited in a `select!`, `Coroutine::send` would no able to know if the value passed would have to be delegated to the `Pyfuture` or not. diff --git a/newsfragments/3610.added.md b/newsfragments/3610.added.md new file mode 100644 index 00000000000..3b1493c29c0 --- /dev/null +++ b/newsfragments/3610.added.md @@ -0,0 +1 @@ +Add `#[pyo3(allow_threads)]` to release the GIL in (async) functions \ No newline at end of file diff --git a/newsfragments/3611.added.md b/newsfragments/3611.added.md new file mode 100644 index 00000000000..a81cef2972d --- /dev/null +++ b/newsfragments/3611.added.md @@ -0,0 +1 @@ +Add `PyFuture` to await Python awaitables \ No newline at end of file diff --git a/pyo3-ffi/src/abstract_.rs b/pyo3-ffi/src/abstract_.rs index 0b3b7dbb3c2..6e4f146aed4 100644 --- a/pyo3-ffi/src/abstract_.rs +++ b/pyo3-ffi/src/abstract_.rs @@ -128,7 +128,11 @@ extern "C" { pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject; #[cfg(all(not(PyPy), Py_3_10))] #[cfg_attr(PyPy, link_name = "PyPyIter_Send")] - pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject); + pub fn PyIter_Send( + iter: *mut PyObject, + arg: *mut PyObject, + presult: *mut *mut PyObject, + ) -> c_int; #[cfg_attr(PyPy, link_name = "PyPyNumber_Check")] pub fn PyNumber_Check(o: *mut PyObject) -> c_int; diff --git a/pyo3-macros-backend/src/attributes.rs b/pyo3-macros-backend/src/attributes.rs index e91b3b8d9a2..000ec4b5979 100644 --- a/pyo3-macros-backend/src/attributes.rs +++ b/pyo3-macros-backend/src/attributes.rs @@ -9,6 +9,7 @@ use syn::{ }; pub mod kw { + syn::custom_keyword!(allow_threads); syn::custom_keyword!(annotation); syn::custom_keyword!(attribute); syn::custom_keyword!(cancel_handle); diff --git a/pyo3-macros-backend/src/method.rs b/pyo3-macros-backend/src/method.rs index cb025149c0d..50ed79ce487 100644 --- a/pyo3-macros-backend/src/method.rs +++ b/pyo3-macros-backend/src/method.rs @@ -1,10 +1,11 @@ use std::fmt::Display; use proc_macro2::{Span, TokenStream}; -use quote::{quote, quote_spanned, ToTokens}; +use quote::{format_ident, quote, quote_spanned, ToTokens}; use syn::{ext::IdentExt, spanned::Spanned, Ident, Result}; use crate::{ + attributes, attributes::{TextSignatureAttribute, TextSignatureAttributeValue}, deprecations::{Deprecation, Deprecations}, params::impl_arg_params, @@ -241,6 +242,7 @@ pub struct FnSpec<'a> { pub asyncness: Option, pub unsafety: Option, pub deprecations: Deprecations, + pub allow_threads: Option, } pub fn get_return_info(output: &syn::ReturnType) -> syn::Type { @@ -284,6 +286,7 @@ impl<'a> FnSpec<'a> { text_signature, name, signature, + allow_threads, .. } = options; @@ -331,6 +334,7 @@ impl<'a> FnSpec<'a> { asyncness: sig.asyncness, unsafety: sig.unsafety, deprecations, + allow_threads, }) } @@ -474,6 +478,7 @@ impl<'a> FnSpec<'a> { } let rust_call = |args: Vec| { + let allow_threads = self.allow_threads.is_some(); let call = if self.asyncness.is_some() { let throw_callback = if cancel_handle.is_some() { quote! { Some(__throw_callback) } @@ -502,6 +507,7 @@ impl<'a> FnSpec<'a> { _pyo3::intern!(py, stringify!(#python_name)), #qualname_prefix, #throw_callback, + #allow_threads, async move { _pyo3::impl_::wrap::OkWrap::wrap(future.await) }, ) }}; @@ -513,6 +519,25 @@ impl<'a> FnSpec<'a> { }}; } call + } else if allow_threads { + let (self_arg_name, self_arg_decl) = if self_arg.is_empty() { + (quote!(), quote!()) + } else { + (quote!(__self), quote! { let __self = #self_arg; }) + }; + let arg_names: Vec = (0..args.len()) + .map(|i| format_ident!("__arg{}", i)) + .collect(); + let arg_decls: Vec = args + .into_iter() + .zip(&arg_names) + .map(|(arg, name)| quote! { let #name = #arg; }) + .collect(); + quote! {{ + #self_arg_decl + #(#arg_decls)* + py.allow_threads(|| function(#self_arg_name #(#arg_names),*)) + }} } else { quote! { function(#self_arg #(#args),*) } }; diff --git a/pyo3-macros-backend/src/pyfunction.rs b/pyo3-macros-backend/src/pyfunction.rs index b265a34d39f..8e2a2f621da 100644 --- a/pyo3-macros-backend/src/pyfunction.rs +++ b/pyo3-macros-backend/src/pyfunction.rs @@ -91,6 +91,7 @@ pub struct PyFunctionOptions { pub signature: Option, pub text_signature: Option, pub krate: Option, + pub allow_threads: Option, } impl Parse for PyFunctionOptions { @@ -99,7 +100,8 @@ impl Parse for PyFunctionOptions { while !input.is_empty() { let lookahead = input.lookahead1(); - if lookahead.peek(attributes::kw::name) + if lookahead.peek(attributes::kw::allow_threads) + || lookahead.peek(attributes::kw::name) || lookahead.peek(attributes::kw::pass_module) || lookahead.peek(attributes::kw::signature) || lookahead.peek(attributes::kw::text_signature) @@ -121,6 +123,7 @@ impl Parse for PyFunctionOptions { } pub enum PyFunctionOption { + AllowThreads(attributes::kw::allow_threads), Name(NameAttribute), PassModule(attributes::kw::pass_module), Signature(SignatureAttribute), @@ -131,7 +134,9 @@ pub enum PyFunctionOption { impl Parse for PyFunctionOption { fn parse(input: ParseStream<'_>) -> Result { let lookahead = input.lookahead1(); - if lookahead.peek(attributes::kw::name) { + if lookahead.peek(attributes::kw::allow_threads) { + input.parse().map(PyFunctionOption::AllowThreads) + } else if lookahead.peek(attributes::kw::name) { input.parse().map(PyFunctionOption::Name) } else if lookahead.peek(attributes::kw::pass_module) { input.parse().map(PyFunctionOption::PassModule) @@ -171,6 +176,7 @@ impl PyFunctionOptions { } for attr in attrs { match attr { + PyFunctionOption::AllowThreads(allow_threads) => set_option!(allow_threads), PyFunctionOption::Name(name) => set_option!(name), PyFunctionOption::PassModule(pass_module) => set_option!(pass_module), PyFunctionOption::Signature(signature) => set_option!(signature), @@ -198,6 +204,7 @@ pub fn impl_wrap_pyfunction( ) -> syn::Result { check_generic(&func.sig)?; let PyFunctionOptions { + allow_threads, pass_module, name, signature, @@ -247,6 +254,7 @@ pub fn impl_wrap_pyfunction( signature, output: ty, text_signature, + allow_threads, asyncness: func.sig.asyncness, unsafety: func.sig.unsafety, deprecations: Deprecations::new(), diff --git a/pyo3-macros/src/lib.rs b/pyo3-macros/src/lib.rs index d00ede89143..102693ec21d 100644 --- a/pyo3-macros/src/lib.rs +++ b/pyo3-macros/src/lib.rs @@ -125,6 +125,7 @@ pub fn pymethods(attr: TokenStream, input: TokenStream) -> TokenStream { /// | `#[pyo3(name = "...")]` | Defines the name of the function in Python. | /// | `#[pyo3(text_signature = "...")]` | Defines the `__text_signature__` attribute of the function in Python. | /// | `#[pyo3(pass_module)]` | Passes the module containing the function as a `&PyModule` first argument to the function. | +/// | `#[pyo3(allow_threads)]` | Release the GIL in the function body, or each time the returned future is polled for `async fn` | /// /// For more on exposing functions see the [function section of the guide][1]. /// diff --git a/src/coroutine.rs b/src/coroutine.rs index 6380b4e0a1f..f840828b4d4 100644 --- a/src/coroutine.rs +++ b/src/coroutine.rs @@ -11,29 +11,61 @@ use std::{ use pyo3_macros::{pyclass, pymethods}; use crate::{ - coroutine::{cancel::ThrowCallback, waker::AsyncioWaker}, + coroutine::{cancel::ThrowCallback, waker::CoroutineWaker}, exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration}, panic::PanicException, pyclass::IterNextOutput, - types::{PyIterator, PyString}, - IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python, + types::PyString, + IntoPy, Py, PyErr, PyObject, PyResult, Python, }; +mod asyncio; pub(crate) mod cancel; -mod waker; +pub(crate) mod waker; pub use cancel::CancelHandle; const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine"; +trait CoroutineFuture { + fn poll( + self: Pin<&mut Self>, + py: Python<'_>, + waker: &Waker, + allow_threads: bool, + ) -> Poll>; +} + +impl CoroutineFuture for F +where + F: Future> + Send, + T: IntoPy + Send, + E: Into + Send, +{ + fn poll( + self: Pin<&mut Self>, + py: Python<'_>, + waker: &Waker, + allow_threads: bool, + ) -> Poll> { + let result = if allow_threads { + py.allow_threads(|| self.poll(&mut Context::from_waker(waker))) + } else { + self.poll(&mut Context::from_waker(waker)) + }; + result.map_ok(|obj| obj.into_py(py)).map_err(Into::into) + } +} + /// Python coroutine wrapping a [`Future`]. #[pyclass(crate = "crate")] pub struct Coroutine { name: Option>, qualname_prefix: Option<&'static str>, throw_callback: Option, - future: Option> + Send>>>, - waker: Option>, + allow_threads: bool, + future: Option>>, + waker: Option>, } impl Coroutine { @@ -47,80 +79,79 @@ impl Coroutine { name: Option>, qualname_prefix: Option<&'static str>, throw_callback: Option, + allow_threads: bool, future: F, ) -> Self where F: Future> + Send + 'static, - T: IntoPy, - E: Into, + T: IntoPy + Send, + E: Into + Send, { - let wrap = async move { - let obj = future.await.map_err(Into::into)?; - // SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`) - Ok(obj.into_py(unsafe { Python::assume_gil_acquired() })) - }; Self { name, qualname_prefix, throw_callback, - future: Some(Box::pin(wrap)), + allow_threads, + future: Some(Box::pin(future)), waker: None, } } - fn poll( + fn poll_inner( &mut self, py: Python<'_>, - throw: Option, + mut sent_result: Option>, ) -> PyResult> { // raise if the coroutine has already been run to completion let future_rs = match self.future { Some(ref mut fut) => fut, None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)), }; - // reraise thrown exception it - match (throw, &self.throw_callback) { - (Some(exc), Some(cb)) => cb.throw(exc.as_ref(py)), - (Some(exc), None) => { - self.close(); - return Err(PyErr::from_value(exc.as_ref(py))); + // if the future is not pending on a Python awaitable, + // execute throw callback or complete on close + if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) { + match (sent_result, &self.throw_callback) { + (res @ Some(Ok(_)), _) => sent_result = res, + (Some(Err(err)), Some(cb)) => { + cb.throw(err.as_ref(py)); + sent_result = Some(Ok(py.None().into())); + } + (Some(Err(err)), None) => return Err(PyErr::from_value(err.as_ref(py))), + (None, _) => return Ok(IterNextOutput::Return(py.None().into())), } - (None, _) => {} } // create a new waker, or try to reset it in place if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) { - waker.reset(); + waker.reset(sent_result); } else { - self.waker = Some(Arc::new(AsyncioWaker::new())); + self.waker = Some(Arc::new(CoroutineWaker::new(sent_result))); } - let waker = Waker::from(self.waker.clone().unwrap()); - // poll the Rust future and forward its results if ready + // poll the future and forward its results if ready; otherwise, yield from waker // polling is UnwindSafe because the future is dropped in case of panic - let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker)); + let waker = Waker::from(self.waker.clone().unwrap()); + let poll = || future_rs.as_mut().poll(py, &waker, self.allow_threads); match panic::catch_unwind(panic::AssertUnwindSafe(poll)) { - Ok(Poll::Ready(res)) => { - self.close(); - return Ok(IterNextOutput::Return(res?)); - } - Err(err) => { - self.close(); - return Err(PanicException::from_panic_payload(err)); - } - _ => {} + Err(err) => Err(PanicException::from_panic_payload(err)), + Ok(Poll::Ready(res)) => Ok(IterNextOutput::Return(res?)), + Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) { + Ok(to_yield) => Ok(IterNextOutput::Yield(to_yield)), + Err(err) => Err(err), + }, } - // otherwise, initialize the waker `asyncio.Future` - if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? { - // `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__` - // and will yield itself if its result has not been set in polling above - if let Some(future) = PyIterator::from_object(future).unwrap().next() { - // future has not been leaked into Python for now, and Rust code can only call - // `set_result(None)` in `Wake` implementation, so it's safe to unwrap - return Ok(IterNextOutput::Yield(future.unwrap().into())); - } + } + + fn poll( + &mut self, + py: Python<'_>, + sent_result: Option>, + ) -> PyResult> { + let result = self.poll_inner(py, sent_result); + if matches!(result, Ok(IterNextOutput::Return(_)) | Err(_)) { + // the Rust future is dropped, and the field set to `None` + // to indicate the coroutine has been run to completion + drop(self.future.take()); } - // if waker has been waken during future polling, this is roughly equivalent to - // `await asyncio.sleep(0)`, so just yield `None`. - Ok(IterNextOutput::Yield(py.None().into())) + result } } @@ -152,18 +183,17 @@ impl Coroutine { } } - fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult { - iter_result(self.poll(py, None)?) + fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult { + iter_result(self.poll(py, Some(Ok(value)))?) } fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult { - iter_result(self.poll(py, Some(exc))?) + iter_result(self.poll(py, Some(Err(exc)))?) } - fn close(&mut self) { - // the Rust future is dropped, and the field set to `None` - // to indicate the coroutine has been run to completion - drop(self.future.take()); + fn close(&mut self, py: Python<'_>) -> PyResult<()> { + self.poll(py, None)?; + Ok(()) } fn __await__(self_: Py) -> Py { @@ -171,6 +201,6 @@ impl Coroutine { } fn __next__(&mut self, py: Python<'_>) -> PyResult> { - self.poll(py, None) + self.poll(py, Some(Ok(py.None().into()))) } } diff --git a/src/coroutine/asyncio.rs b/src/coroutine/asyncio.rs new file mode 100644 index 00000000000..0f19ac0cd4d --- /dev/null +++ b/src/coroutine/asyncio.rs @@ -0,0 +1,95 @@ +//! Coroutine implementation compatible with asyncio. +use pyo3_macros::pyfunction; + +use crate::{ + intern, + sync::GILOnceCell, + types::{PyCFunction, PyIterator}, + wrap_pyfunction, IntoPy, Py, PyAny, PyObject, PyResult, Python, +}; + +/// `asyncio.get_running_loop` +fn get_running_loop(py: Python<'_>) -> PyResult<&PyAny> { + static GET_RUNNING_LOOP: GILOnceCell = GILOnceCell::new(); + let import = || -> PyResult<_> { + let module = py.import("asyncio")?; + Ok(module.getattr("get_running_loop")?.into()) + }; + GET_RUNNING_LOOP + .get_or_try_init(py, import)? + .as_ref(py) + .call0() +} + +/// Asyncio-compatible coroutine waker. +/// +/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called +/// when `Waker::wake` is called. +pub(super) struct AsyncioWaker { + event_loop: PyObject, + future: PyObject, +} + +impl AsyncioWaker { + pub(super) fn new(py: Python<'_>) -> PyResult { + let event_loop = get_running_loop(py)?.into_py(py); + let future = event_loop.call_method0(py, "create_future")?; + Ok(Self { event_loop, future }) + } + + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + let __await__; + // `asyncio.Future` must be awaited; in normal case, it implements `__iter__ = __await__`, + // but `create_future` may have been overriden + let mut iter = match PyIterator::from_object(self.future.as_ref(py)) { + Ok(iter) => iter, + Err(_) => { + __await__ = self.future.call_method0(py, intern!(py, "__await__"))?; + PyIterator::from_object(__await__.as_ref(py))? + } + }; + // future has not been waken (because `yield_waken` would have been called + // otherwise), so it is expected to yield itself + Ok(iter.next().expect("future didn't yield")?.into_py(py)) + } + + #[allow(clippy::unnecessary_wraps)] + pub(super) fn yield_waken(py: Python<'_>) -> PyResult { + Ok(py.None().into()) + } + + pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { + static RELEASE_WAITER: GILOnceCell> = GILOnceCell::new(); + let release_waiter = RELEASE_WAITER + .get_or_try_init(py, || wrap_pyfunction!(release_waiter, py).map(Into::into))?; + // `Future.set_result` must be called in event loop thread, + // so it requires `call_soon_threadsafe` + let call_soon_threadsafe = self.event_loop.call_method1( + py, + intern!(py, "call_soon_threadsafe"), + (release_waiter, self.future.as_ref(py)), + ); + if let Err(err) = call_soon_threadsafe { + // `call_soon_threadsafe` will raise if the event loop is closed; + // instead of catching an unspecific `RuntimeError`, check directly if it's closed. + let is_closed = self.event_loop.call_method0(py, "is_closed")?; + if !is_closed.extract(py)? { + return Err(err); + } + } + Ok(()) + } +} + +/// Call `future.set_result` if the future is not done. +/// +/// Future can be cancelled by the event loop before being waken. +/// See +#[pyfunction(crate = "crate")] +fn release_waiter(future: &PyAny) -> PyResult<()> { + let done = future.call_method0(intern!(future.py(), "done"))?; + if !done.extract::()? { + future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?; + } + Ok(()) +} diff --git a/src/coroutine/waker.rs b/src/coroutine/waker.rs index 8a1166ce3fb..64f63e756b1 100644 --- a/src/coroutine/waker.rs +++ b/src/coroutine/waker.rs @@ -1,101 +1,95 @@ -use crate::sync::GILOnceCell; -use crate::types::PyCFunction; -use crate::{intern, wrap_pyfunction, Py, PyAny, PyObject, PyResult, Python}; -use pyo3_macros::pyfunction; -use std::sync::Arc; -use std::task::Wake; +use std::{ + cell::Cell, + sync::Arc, + task::{Poll, Wake}, +}; -/// Lazy `asyncio.Future` wrapper, implementing [`Wake`] by calling `Future.set_result`. -/// -/// asyncio future is let uninitialized until [`initialize_future`][1] is called. -/// If [`wake`][2] is called before future initialization (during Rust future polling), -/// [`initialize_future`][1] will return `None` (it is roughly equivalent to `asyncio.sleep(0)`) -/// -/// [1]: AsyncioWaker::initialize_future -/// [2]: AsyncioWaker::wake -pub struct AsyncioWaker(GILOnceCell>); +use crate::{ + coroutine::asyncio::AsyncioWaker, exceptions::PyStopIteration, intern, pyclass::IterNextOutput, + sync::GILOnceCell, types::PyFuture, Py, PyNativeType, PyObject, PyResult, Python, +}; -impl AsyncioWaker { - pub(super) fn new() -> Self { - Self(GILOnceCell::new()) - } +const MIXED_AWAITABLE_AND_FUTURE_ERROR: &str = "Python awaitable mixed with Rust future"; - pub(super) fn reset(&mut self) { - self.0.take(); - } +pub(crate) enum FutureOrPoll { + Future(Py), + Poll(Poll>), +} - pub(super) fn initialize_future<'a>(&'a self, py: Python<'a>) -> PyResult> { - let init = || LoopAndFuture::new(py).map(Some); - let loop_and_future = self.0.get_or_try_init(py, init)?.as_ref(); - Ok(loop_and_future.map(|LoopAndFuture { future, .. }| future.as_ref(py))) - } +thread_local! { + pub(crate) static FUTURE_OR_POLL: Cell> = Cell::new(None); } -impl Wake for AsyncioWaker { - fn wake(self: Arc) { - self.wake_by_ref() +enum State { + Pending(AsyncioWaker), + Waken, + Delegated(PyObject), +} + +pub(super) struct CoroutineWaker { + state: GILOnceCell, + sent_result: Option>, +} + +impl CoroutineWaker { + pub(super) fn new(sent_result: Option>) -> Self { + Self { + state: GILOnceCell::new(), + sent_result, + } } - fn wake_by_ref(self: &Arc) { - Python::with_gil(|gil| { - if let Some(loop_and_future) = self.0.get_or_init(gil, || None) { - loop_and_future - .set_result(gil) - .expect("unexpected error in coroutine waker"); - } - }); + pub(super) fn reset(&mut self, sent_result: Option>) { + self.state.take(); + self.sent_result = sent_result; } -} -struct LoopAndFuture { - event_loop: PyObject, - future: PyObject, -} + pub(super) fn is_delegated(&self, py: Python<'_>) -> bool { + matches!(self.state.get(py), Some(State::Delegated(_))) + } -impl LoopAndFuture { - fn new(py: Python<'_>) -> PyResult { - static GET_RUNNING_LOOP: GILOnceCell = GILOnceCell::new(); - let import = || -> PyResult<_> { - let module = py.import("asyncio")?; - Ok(module.getattr("get_running_loop")?.into()) - }; - let event_loop = GET_RUNNING_LOOP.get_or_try_init(py, import)?.call0(py)?; - let future = event_loop.call_method0(py, "create_future")?; - Ok(Self { event_loop, future }) + pub(super) fn yield_(&self, py: Python<'_>) -> PyResult { + let init = || PyResult::Ok(State::Pending(AsyncioWaker::new(py)?)); + let state = self.state.get_or_try_init(py, init)?; + match state { + State::Waken => AsyncioWaker::yield_waken(py), + State::Delegated(obj) => Ok(obj.clone_ref(py)), + State::Pending(waker) => waker.yield_(py), + } } - fn set_result(&self, py: Python<'_>) -> PyResult<()> { - static RELEASE_WAITER: GILOnceCell> = GILOnceCell::new(); - let release_waiter = RELEASE_WAITER - .get_or_try_init(py, || wrap_pyfunction!(release_waiter, py).map(Into::into))?; - // `Future.set_result` must be called in event loop thread, - // so it requires `call_soon_threadsafe` - let call_soon_threadsafe = self.event_loop.call_method1( - py, - intern!(py, "call_soon_threadsafe"), - (release_waiter, self.future.as_ref(py)), - ); - if let Err(err) = call_soon_threadsafe { - // `call_soon_threadsafe` will raise if the event loop is closed; - // instead of catching an unspecific `RuntimeError`, check directly if it's closed. - let is_closed = self.event_loop.call_method0(py, "is_closed")?; - if !is_closed.extract(py)? { - return Err(err); + fn delegate(&self, future: &PyFuture) -> Poll> { + let py = future.py(); + match future.next(&self.sent_result) { + Ok(IterNextOutput::Return(ret)) => Poll::Ready(Ok(ret)), + Ok(IterNextOutput::Yield(yielded)) => { + let delegated = self.state.set(py, State::Delegated(yielded)); + assert!(delegated.is_ok(), "{}", MIXED_AWAITABLE_AND_FUTURE_ERROR); + Poll::Pending + } + Err(err) if err.is_instance_of::(py) => { + Poll::Ready(err.value(py).getattr(intern!(py, "value")).map(Into::into)) } + Err(err) => Poll::Ready(Err(err)), } - Ok(()) } } -/// Call `future.set_result` if the future is not done. -/// -/// Future can be cancelled by the event loop before being waken. -/// See -#[pyfunction(crate = "crate")] -fn release_waiter(future: &PyAny) -> PyResult<()> { - let done = future.call_method0(intern!(future.py(), "done"))?; - if !done.extract::()? { - future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?; +impl Wake for CoroutineWaker { + fn wake(self: Arc) { + self.wake_by_ref() + } + + fn wake_by_ref(self: &Arc) { + Python::with_gil(|gil| match FUTURE_OR_POLL.with(|cell| cell.take()) { + Some(FutureOrPoll::Future(fut)) => FUTURE_OR_POLL + .with(|cell| cell.set(Some(FutureOrPoll::Poll(self.delegate(fut.as_ref(gil)))))), + Some(FutureOrPoll::Poll(_)) => unreachable!(), + None => match self.state.get_or_init(gil, || State::Waken) { + State::Waken => {} + State::Delegated(_) => panic!("{}", MIXED_AWAITABLE_AND_FUTURE_ERROR), + State::Pending(waker) => waker.wake(gil).expect("wake error"), + }, + }) } - Ok(()) } diff --git a/src/gil.rs b/src/gil.rs index d346ad95ea9..b92ad25a67d 100644 --- a/src/gil.rs +++ b/src/gil.rs @@ -1,14 +1,18 @@ //! Interaction with Python's global interpreter lock -use crate::impl_::not_send::{NotSend, NOT_SEND}; -use crate::{ffi, Python}; -use parking_lot::{const_mutex, Mutex, Once}; -use std::cell::Cell; #[cfg(debug_assertions)] use std::cell::RefCell; #[cfg(not(debug_assertions))] use std::cell::UnsafeCell; -use std::{mem, ptr::NonNull}; +use std::{cell::Cell, mem, ptr::NonNull}; + +use parking_lot::{const_mutex, Mutex, Once}; + +use crate::{ + ffi, + impl_::not_send::{NotSend, NOT_SEND}, + Python, +}; static START: Once = Once::new(); @@ -506,11 +510,13 @@ fn decrement_gil_count() { #[cfg(test)] mod tests { - use super::{gil_is_acquired, GILPool, GIL_COUNT, OWNED_OBJECTS, POOL}; - use crate::{ffi, gil, PyObject, Python, ToPyObject}; + use std::ptr::NonNull; + #[cfg(not(target_arch = "wasm32"))] use parking_lot::{const_mutex, Condvar, Mutex}; - use std::ptr::NonNull; + + use super::{gil_is_acquired, GILPool, GIL_COUNT, OWNED_OBJECTS, POOL}; + use crate::{ffi, gil, PyObject, Python, ToPyObject}; fn get_object(py: Python<'_>) -> PyObject { // Convenience function for getting a single unique object, using `new_pool` so as to leave @@ -786,9 +792,10 @@ mod tests { #[test] #[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled fn test_clone_without_gil() { - use crate::{Py, PyAny}; use std::{sync::Arc, thread}; + use crate::{Py, PyAny}; + // Some events for synchronizing static GIL_ACQUIRED: Event = Event::new(); static OBJECT_CLONED: Event = Event::new(); @@ -851,9 +858,10 @@ mod tests { #[test] #[cfg(not(target_arch = "wasm32"))] // We are building wasm Python with pthreads disabled fn test_clone_in_other_thread() { - use crate::Py; use std::{sync::Arc, thread}; + use crate::Py; + // Some events for synchronizing static OBJECT_CLONED: Event = Event::new(); @@ -925,4 +933,46 @@ mod tests { POOL.update_counts(py); }) } + + #[cfg(feature = "macros")] + #[test] + fn allow_threads_fn() { + #[crate::pyfunction(allow_threads, crate = "crate")] + fn without_gil() { + GIL_COUNT.with(|c| assert_eq!(c.get(), 0)); + } + Python::with_gil(|gil| { + let without_gil = crate::wrap_pyfunction!(without_gil, gil).unwrap(); + crate::py_run!(gil, without_gil, "without_gil()"); + }) + } + + #[cfg(feature = "macros")] + #[test] + fn allow_threads_async_fn() { + #[crate::pyfunction(allow_threads, crate = "crate")] + async fn without_gil() { + use std::task::Poll; + GIL_COUNT.with(|c| assert_eq!(c.get(), 0)); + let mut ready = false; + futures::future::poll_fn(|cx| { + if ready { + return Poll::Ready(()); + } + ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + }) + .await; + GIL_COUNT.with(|c| assert_eq!(c.get(), 0)); + } + Python::with_gil(|gil| { + let without_gil = crate::wrap_pyfunction!(without_gil, gil).unwrap(); + crate::py_run!( + gil, + without_gil, + "import asyncio; asyncio.run(without_gil())" + ); + }) + } } diff --git a/src/impl_/coroutine.rs b/src/impl_/coroutine.rs index 32f3e94ad8a..d54d499c11a 100644 --- a/src/impl_/coroutine.rs +++ b/src/impl_/coroutine.rs @@ -15,14 +15,21 @@ pub fn new_coroutine( name: &PyString, qualname_prefix: Option<&'static str>, throw_callback: Option, + allow_threads: bool, future: F, ) -> Coroutine where F: Future> + Send + 'static, - T: IntoPy, - E: Into, + T: IntoPy + Send, + E: Into + Send, { - Coroutine::new(Some(name.into()), qualname_prefix, throw_callback, future) + Coroutine::new( + Some(name.into()), + qualname_prefix, + throw_callback, + allow_threads, + future, + ) } fn get_ptr(obj: &Py) -> *mut T { diff --git a/src/lib.rs b/src/lib.rs index fcf75610656..1386459f9af 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -466,6 +466,7 @@ pub mod doc_test { "README.md" => readme_md, "guide/src/advanced.md" => guide_advanced_md, "guide/src/async-await.md" => guide_async_await_md, + "guide/src/async-await/pyfuture.md" => guide_async_await_pyfuture_md, "guide/src/building_and_distribution.md" => guide_building_and_distribution_md, "guide/src/building_and_distribution/multiple_python_versions.md" => guide_bnd_multiple_python_versions_md, "guide/src/class.md" => guide_class_md, diff --git a/src/tests/common.rs b/src/tests/common.rs index bc5a33d5cdd..6042b029bff 100644 --- a/src/tests/common.rs +++ b/src/tests/common.rs @@ -6,13 +6,14 @@ #[macro_use] mod inner { + use pyo3::{ + prelude::*, + types::{IntoPyDict, PyList}, + }; + #[allow(unused_imports)] // pulls in `use crate as pyo3` in `test_utils.rs` use super::*; - use pyo3::prelude::*; - - use pyo3::types::{IntoPyDict, PyList}; - #[macro_export] macro_rules! py_assert { ($py:expr, $($val:ident)+, $assertion:literal) => { @@ -147,6 +148,17 @@ mod inner { .unwrap(); }}; } + + // see https://stackoverflow.com/questions/60359157/valueerror-set-wakeup-fd-only-works-in-main-thread-on-windows-on-python-3-8-wit + #[cfg(feature = "macros")] + pub fn asyncio_windows(test: &str) -> String { + let set_event_loop_policy = r#" + import asyncio, sys + if sys.platform == "win32": + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + "#; + pyo3::unindent::unindent(set_event_loop_policy) + &pyo3::unindent::unindent(test) + } } #[allow(unused_imports)] // some tests use just the macros and none of the other functionality diff --git a/src/types/future.rs b/src/types/future.rs new file mode 100644 index 00000000000..da349adb383 --- /dev/null +++ b/src/types/future.rs @@ -0,0 +1,178 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +#[cfg(feature = "macros")] +use crate::{ + coroutine::waker::{FutureOrPoll, FUTURE_OR_POLL}, + pyclass::IterNextOutput, + PyErr, +}; +use crate::{ + exceptions::{PyAttributeError, PyTypeError}, + ffi, + instance::Py2, + sync::GILOnceCell, + types::any::PyAnyMethods, + IntoPy, Py, PyAny, PyNativeType, PyObject, PyResult, Python, +}; + +const NOT_IN_COROUTINE_CONTEXT: &str = "PyFuture must be awaited in coroutine context"; + +/// A Python object returned by `__await__`. +/// +/// # Examples +/// +/// ```rust +/// use pyo3::prelude::*; +/// +/// # fn main() -> PyResult<()> { +/// Python::with_gil(|py| -> PyResult<()> { +/// let list = py.eval("iter([1, 2, 3, 4])", None, None)?; +/// let numbers: PyResult> = list +/// .iter()? +/// .map(|i| i.and_then(PyAny::extract::)) +/// .collect(); +/// let sum: usize = numbers?.iter().sum(); +/// assert_eq!(sum, 10); +/// Ok(()) +/// }) +/// # } +/// ``` +#[repr(transparent)] +pub struct PyFuture(PyAny); +pyobject_native_type_named!(PyFuture); + +fn is_awaitable(obj: &Py2<'_, PyAny>) -> PyResult { + static IS_AWAITABLE: GILOnceCell = GILOnceCell::new(); + let import = || PyResult::Ok(obj.py().import("inspect")?.getattr("isawaitable")?.into()); + IS_AWAITABLE + .get_or_try_init(obj.py(), import)? + .call1(obj.py(), (obj,))? + .extract(obj.py()) +} + +impl PyFuture { + /// Constructs a `PyFuture` from a Python awaitable object. + /// + /// Equivalent to calling `__await__` method (or `__iter__` for generator-based coroutines). + pub fn from_object(obj: &PyAny) -> PyResult<&PyFuture> { + Self::from_object2(Py2::borrowed_from_gil_ref(&obj)).map(|py2| { + // Can't use into_gil_ref here because T: PyTypeInfo bound is not satisfied + // Safety: into_ptr produces a valid pointer to PyFuture object + unsafe { obj.py().from_owned_ptr(py2.into_ptr()) } + }) + } + + pub(crate) fn from_object2<'py>(obj: &Py2<'py, PyAny>) -> PyResult> { + let __await__ = intern!(obj.py(), "__await__"); + match obj.call_method0(__await__) { + Ok(obj) => Ok(unsafe { obj.downcast_into_unchecked() }), + Err(err) if err.is_instance_of::(obj.py()) => { + if obj.hasattr(__await__)? { + Err(err) + } else if is_awaitable(obj)? { + unsafe { + Py2::from_owned_ptr_or_err(obj.py(), ffi::PyObject_GetIter(obj.as_ptr())) + .map(|any| any.downcast_into_unchecked()) + } + } else { + Err(PyTypeError::new_err(format!( + "object {tp} can't be used in 'await' expression", + tp = obj.get_type().name()? + ))) + } + } + Err(err) => Err(err), + } + } + + #[cfg(feature = "macros")] + pub(crate) fn next( + &self, + prev_result: &Option>, + ) -> PyResult> { + let py = self.0.py(); + match prev_result { + Some(Ok(val)) => { + cfg_if::cfg_if! { + if #[cfg(all(Py_3_10, not(PyPy), not(Py_LIMITED_API)))] { + let mut result = std::ptr::null_mut(); + match unsafe { ffi::PyIter_Send(self.0.as_ptr(), val.as_ptr(), &mut result) } + { + -1 => Err(PyErr::take(py).unwrap()), + 0 => Ok(IterNextOutput::Return(unsafe { + PyObject::from_owned_ptr(py, result) + })), + 1 => Ok(IterNextOutput::Yield(unsafe { + PyObject::from_owned_ptr(py, result) + })), + _ => unreachable!(), + } + } else { + let send = intern!(py, "send"); + if val.is_none(py) || !self.0.hasattr(send).unwrap_or(false) { + self.0.call_method0(intern!(py, "__next__")) + } else { + self.0.call_method1(send, (val,)) + } + .map(Into::into) + .map(IterNextOutput::Yield) + } + } + } + Some(Err(err)) => { + let throw = intern!(py, "throw"); + if self.0.hasattr(throw).unwrap_or(false) { + self.0 + .call_method1(throw, (err,)) + .map(Into::into) + .map(IterNextOutput::Yield) + } else { + Err(PyErr::from_value(err.as_ref(py))) + } + } + None => { + let close = intern!(py, "close"); + if self.0.hasattr(close).unwrap_or(false) { + self.0 + .call_method0(close) + .map(Into::into) + .map(IterNextOutput::Return) + } else { + Ok(IterNextOutput::Return(py.None().into())) + } + } + } + } +} + +impl Future for Py { + type Output = PyResult; + + #[cfg(feature = "macros")] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + FUTURE_OR_POLL.with(|cell| cell.set(Some(FutureOrPoll::Future(self.clone())))); + cx.waker().wake_by_ref(); + match FUTURE_OR_POLL.with(|cell| cell.take()) { + Some(FutureOrPoll::Poll(poll)) => poll, + Some(FutureOrPoll::Future(_)) => panic!("{}", NOT_IN_COROUTINE_CONTEXT), + None => unreachable!(), + } + } + #[cfg(not(feature = "macros"))] + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll { + panic!("{}", NOT_IN_COROUTINE_CONTEXT) + } +} + +impl Py { + /// Constructs a `PyFuture` from a Python awaitable object. + /// + /// Equivalent to calling `__await__` method (or `__iter__` for generator-based coroutines). + pub fn from_object(py: Python<'_>, awaitable: PyObject) -> PyResult { + Ok(PyFuture::from_object(awaitable.as_ref(py))?.into_py(py)) + } +} diff --git a/src/types/mod.rs b/src/types/mod.rs index b20fdf37305..9013eac7a30 100644 --- a/src/types/mod.rs +++ b/src/types/mod.rs @@ -24,6 +24,7 @@ pub use self::frozenset::{PyFrozenSet, PyFrozenSetBuilder}; pub use self::function::PyCFunction; #[cfg(all(not(Py_LIMITED_API), not(PyPy)))] pub use self::function::PyFunction; +pub use self::future::PyFuture; pub use self::iterator::PyIterator; pub use self::list::PyList; pub use self::mapping::PyMapping; @@ -286,6 +287,7 @@ mod floatob; mod frame; mod frozenset; mod function; +mod future; mod iterator; pub(crate) mod list; mod mapping; diff --git a/tests/test_awaitable.rs b/tests/test_awaitable.rs new file mode 100644 index 00000000000..82f27612299 --- /dev/null +++ b/tests/test_awaitable.rs @@ -0,0 +1,178 @@ +#![cfg(feature = "macros")] + +use std::task::Poll; + +use futures::{future::poll_fn, FutureExt}; +use pyo3::{ + coroutine::CancelHandle, + exceptions::{PyAttributeError, PyTypeError}, + prelude::*, + py_run, + types::PyFuture, +}; + +#[path = "../src/tests/common.rs"] +mod common; + +#[pyfunction] +async fn wrap_awaitable(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + future.await +} + +#[test] +fn awaitable() { + Python::with_gil(|gil| { + let wrap_awaitable = wrap_pyfunction!(wrap_awaitable, gil).unwrap(); + let test = r#" + import types + import asyncio; + + class BadAwaitable: + def __await__(self): + raise AttributeError("__await__") + + @types.coroutine + def gen_coro(): + yield None + + async def main(): + await wrap_awaitable(...) + asyncio.run(main()) + "#; + let globals = gil.import("__main__").unwrap().dict(); + globals.set_item("wrap_awaitable", wrap_awaitable).unwrap(); + let run = |awaitable| { + gil.run( + &common::asyncio_windows(test).replace("...", awaitable), + Some(globals), + None, + ) + }; + run("asyncio.sleep(0.001)").unwrap(); + run("gen_coro()").unwrap(); + assert!(run("None").unwrap_err().is_instance_of::(gil)); + assert!(run("BadAwaitable()") + .unwrap_err() + .is_instance_of::(gil)); + }) +} + +#[test] +fn cancel_delegation() { + #[pyfunction] + async fn wrap_cancellable(awaitable: PyObject, #[pyo3(cancel_handle)] cancel: CancelHandle) { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable)).unwrap(); + let result = future.await; + Python::with_gil(|gil| { + assert_eq!( + result.unwrap_err().get_type(gil).name().unwrap(), + "CancelledError" + ) + }); + assert!(!cancel.is_cancelled()); + } + Python::with_gil(|gil| { + let wrap_cancellable = wrap_pyfunction!(wrap_cancellable, gil).unwrap(); + let test = r#" + import asyncio; + + async def main(): + task = asyncio.create_task(wrap_cancellable(asyncio.sleep(0.001))) + await asyncio.sleep(0) + task.cancel() + await task + asyncio.run(main()) + "#; + let globals = gil.import("__main__").unwrap().dict(); + globals + .set_item("wrap_cancellable", wrap_cancellable) + .unwrap(); + gil.run(&common::asyncio_windows(test), Some(globals), None) + .unwrap(); + }) +} + +#[test] +#[should_panic(expected = "PyFuture must be awaited in coroutine context")] +fn pyfuture_without_coroutine() { + #[pyfunction] + fn block_on(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + futures::executor::block_on(future) + } + Python::with_gil(|gil| { + let block_on = wrap_pyfunction!(block_on, gil).unwrap(); + let test = r#" + async def coro(): + ... + block_on(coro()) + "#; + py_run!(gil, block_on, &common::asyncio_windows(test)); + }) +} + +async fn checkpoint() { + let mut ready = false; + poll_fn(|cx| { + if ready { + return Poll::Ready(()); + } + ready = true; + cx.waker().wake_by_ref(); + Poll::Pending + }) + .await +} + +#[test] +#[should_panic(expected = "Python awaitable mixed with Rust future")] +fn pyfuture_in_select() { + #[pyfunction] + async fn select(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + futures::select_biased! { + _ = checkpoint().fuse() => unreachable!(), + res = future.fuse() => res, + } + } + Python::with_gil(|gil| { + let select = wrap_pyfunction!(select, gil).unwrap(); + let test = r#" + import asyncio; + async def main(): + return await select(asyncio.sleep(1)) + asyncio.run(main()) + "#; + let globals = gil.import("__main__").unwrap().dict(); + globals.set_item("select", select).unwrap(); + gil.run(&common::asyncio_windows(test), Some(globals), None) + .unwrap(); + }) +} + +#[test] +#[should_panic(expected = "Python awaitable mixed with Rust future")] +fn pyfuture_in_select2() { + #[pyfunction] + async fn select2(awaitable: PyObject) -> PyResult { + let future: Py = Python::with_gil(|gil| Py::from_object(gil, awaitable))?; + futures::select_biased! { + res = future.fuse() => res, + _ = checkpoint().fuse() => unreachable!(), + } + } + Python::with_gil(|gil| { + let select2 = wrap_pyfunction!(select2, gil).unwrap(); + let test = r#" + import asyncio; + async def main(): + return await select2(asyncio.sleep(1)) + asyncio.run(main()) + "#; + let globals = gil.import("__main__").unwrap().dict(); + globals.set_item("select2", select2).unwrap(); + gil.run(&common::asyncio_windows(test), Some(globals), None) + .unwrap(); + }) +} diff --git a/tests/test_coroutine.rs b/tests/test_coroutine.rs index 8acea3ea3e8..86168851db8 100644 --- a/tests/test_coroutine.rs +++ b/tests/test_coroutine.rs @@ -13,15 +13,6 @@ use pyo3::{ #[path = "../src/tests/common.rs"] mod common; -fn handle_windows(test: &str) -> String { - let set_event_loop_policy = r#" - import asyncio, sys - if sys.platform == "win32": - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - "#; - pyo3::unindent::unindent(set_event_loop_policy) + &pyo3::unindent::unindent(test) -} - #[test] fn noop_coroutine() { #[pyfunction] @@ -31,7 +22,7 @@ fn noop_coroutine() { Python::with_gil(|gil| { let noop = wrap_pyfunction!(noop, gil).unwrap(); let test = "import asyncio; assert asyncio.run(noop()) == 42"; - py_run!(gil, noop, &handle_windows(test)); + py_run!(gil, noop, &common::asyncio_windows(test)); }) } @@ -69,7 +60,7 @@ fn test_coroutine_qualname() { ("MyClass", gil.get_type::()), ] .into_py_dict(gil); - py_run!(gil, *locals, &handle_windows(test)); + py_run!(gil, *locals, &common::asyncio_windows(test)); }) } @@ -91,7 +82,7 @@ fn sleep_0_like_coroutine() { Python::with_gil(|gil| { let sleep_0 = wrap_pyfunction!(sleep_0, gil).unwrap(); let test = "import asyncio; assert asyncio.run(sleep_0()) == 42"; - py_run!(gil, sleep_0, &handle_windows(test)); + py_run!(gil, sleep_0, &common::asyncio_windows(test)); }) } @@ -110,7 +101,7 @@ fn sleep_coroutine() { Python::with_gil(|gil| { let sleep = wrap_pyfunction!(sleep, gil).unwrap(); let test = r#"import asyncio; assert asyncio.run(sleep(0.1)) == 42"#; - py_run!(gil, sleep, &handle_windows(test)); + py_run!(gil, sleep, &common::asyncio_windows(test)); }) } @@ -130,11 +121,7 @@ fn cancelled_coroutine() { let globals = gil.import("__main__").unwrap().dict(); globals.set_item("sleep", sleep).unwrap(); let err = gil - .run( - &pyo3::unindent::unindent(&handle_windows(test)), - Some(globals), - None, - ) + .run(&common::asyncio_windows(test), Some(globals), None) .unwrap_err(); assert_eq!(err.value(gil).get_type().name().unwrap(), "CancelledError"); }) @@ -167,12 +154,8 @@ fn coroutine_cancel_handle() { globals .set_item("cancellable_sleep", cancellable_sleep) .unwrap(); - gil.run( - &pyo3::unindent::unindent(&handle_windows(test)), - Some(globals), - None, - ) - .unwrap(); + gil.run(&common::asyncio_windows(test), Some(globals), None) + .unwrap(); }) } @@ -197,12 +180,8 @@ fn coroutine_is_cancelled() { "#; let globals = gil.import("__main__").unwrap().dict(); globals.set_item("sleep_loop", sleep_loop).unwrap(); - gil.run( - &pyo3::unindent::unindent(&handle_windows(test)), - Some(globals), - None, - ) - .unwrap(); + gil.run(&common::asyncio_windows(test), Some(globals), None) + .unwrap(); }) } @@ -231,7 +210,7 @@ fn coroutine_panic() { else: assert False; "#; - py_run!(gil, panic, &handle_windows(test)); + py_run!(gil, panic, &common::asyncio_windows(test)); }) } @@ -284,6 +263,6 @@ fn test_async_method_receiver() { assert asyncio.run(coro3) == 1 "#; let locals = [("Counter", gil.get_type::())].into_py_dict(gil); - py_run!(gil, *locals, test); + py_run!(gil, *locals, &common::asyncio_windows(test)); }) } diff --git a/tests/ui/invalid_pyfunction_signatures.stderr b/tests/ui/invalid_pyfunction_signatures.stderr index dbca169d8ea..d48e21fa1ea 100644 --- a/tests/ui/invalid_pyfunction_signatures.stderr +++ b/tests/ui/invalid_pyfunction_signatures.stderr @@ -16,7 +16,7 @@ error: expected argument from function definition `y` but got argument `x` 13 | #[pyo3(signature = (x))] | ^ -error: expected one of: `name`, `pass_module`, `signature`, `text_signature`, `crate` +error: expected one of: `allow_threads`, `name`, `pass_module`, `signature`, `text_signature`, `crate` --> tests/ui/invalid_pyfunction_signatures.rs:18:14 | 18 | #[pyfunction(x)]