How to use allow_threads with an existing stream to avoid a deadlock? #4

@npuichigo

I recently ran into the same deadlock issue that is described in PyO3/pyo3#3540 (comment).

I use the par_stream crate to get a parallel async stream and wrap the result with pyo3-async. par_map returns a flume RecvStream, which internally holds a lock for its queue.

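// Imports assumed from the crates named above (futures, par_stream, pyo3).
use futures::Stream;
use par_stream::prelude::*;
use pyo3::prelude::*;

#[pymodule]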
fn snake(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(parallel_stream, m)?)?;
    Ok(())
}

fn tokio() -> &'static tokio::runtime::Runtime {
    use std::sync::OnceLock;
    static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
    RT.get_or_init(|| tokio::runtime::Runtime::new().unwrap())
}

fn fib(n: u64) -> u64 {
    match n {
        0 => 0,
        1 => 1,
        _ => fib(n - 1) + fib(n - 2),
    }
}

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let _guard = tokio().enter();
    futures::stream::iter(0..100).par_map(None, |i| move || Ok(fib(i)))
}

#[pyfunction]
fn parallel_stream() -> pyo3_async::asyncio::AsyncGenerator {
    pyo3_async::asyncio::AsyncGenerator::from_stream(map())
}

Calling it from Python deadlocks and the program hangs:

import snake
import asyncio

async def fn():
    async for i in snake.parallel_stream():
        print(i)

asyncio.run(fn())

(Two screenshots attached: 微信图片_20231209225251, 微信图片_20231209225245)

I don't know where to add allow_threads here; adding it only in parallel_stream does not seem to help.
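For reference, here is a minimal sketch of the general pattern that allow_threads is meant to address, using only the standard library. It is not a diagnosis of the hang above: `FakeGil`, `spawn_producer`, `recv_holding_lock`, and `recv_releasing_lock` are hypothetical names, a plain `Mutex` stands in for the GIL, and whether par_map or flume actually wait on the GIL in my case is not established.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the GIL: a plain mutex shared by all threads.
type FakeGil = Arc<Mutex<()>>;

/// Spawns a producer that must take the lock before it can send a value.
fn spawn_producer(gil: FakeGil, tx: mpsc::Sender<u64>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let _held = gil.lock().unwrap();
        let _ = tx.send(42);
    })
}

/// The consumer keeps the lock while it waits, so the producer cannot run.
/// A timeout is used so that the sketch terminates instead of hanging.
fn recv_holding_lock(timeout: Duration) -> Option<u64> {
    let gil: FakeGil = Arc::new(Mutex::new(()));
    let (tx, rx) = mpsc::channel();
    let held = gil.lock().unwrap();
    let producer = spawn_producer(Arc::clone(&gil), tx);
    let got = rx.recv_timeout(timeout).ok();
    drop(held); // release only after the wait, so the thread can be joined
    producer.join().unwrap();
    got
}

/// The consumer releases the lock before it waits, which is the role
/// allow_threads plays for the real GIL.
fn recv_releasing_lock(timeout: Duration) -> Option<u64> {
    let gil: FakeGil = Arc::new(Mutex::new(()));
    let (tx, rx) = mpsc::channel();
    let held = gil.lock().unwrap();
    let producer = spawn_producer(Arc::clone(&gil), tx);
    drop(held); // release before blocking on the channel
    let got = rx.recv_timeout(timeout).ok();
    producer.join().unwrap();
    got
}
```

With a short timeout, `recv_holding_lock` returns `None` because the producer is still blocked on the lock, while `recv_releasing_lock` with a generous timeout returns `Some(42)`. What I can't work out is which blocking point in my stream plays the role of the `recv_timeout` call here.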

By the way, if I instead forward the items through a tokio mpsc channel like this:

fn map() -> impl Stream<Item = PyResult<u64>> + Send {
    let (tx, mut rx) = tokio::sync::mpsc::channel(100);
    tokio().spawn(async move {
        futures::stream::iter(0..1000)
            .par_map(
                ParParams {
                    num_workers: 32,
                    buf_size: Some(10),
                },
                |_| move || Ok(fib(45)),
            )
            .for_each(|item| {
                let tx = tx.clone();
                async move {
                    let _ = tx.send(item).await;
                }
            })
            .await;
    });

    stream! {
        while let Some(item) = rx.recv().await {
            yield item;
        }
    }
}

it works and no deadlock occurs. I'm not sure what the essential difference between the two versions is.
