Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions examples/runtime/hello_world/hello_world.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,21 @@

import uvloop

from dynamo.runtime import DistributedRuntime, dynamo_endpoint, dynamo_worker
from dynamo.runtime import DistributedRuntime, PyContext, dynamo_endpoint, dynamo_worker
from dynamo.runtime.logging import configure_dynamo_logging

logger = logging.getLogger(__name__)
configure_dynamo_logging(service_name="backend")


@dynamo_endpoint(str, str)
async def content_generator(request: str):
logger.info(f"Received request: {request}")
async def content_generator(request: str, context: PyContext):
logger.info(f"Received request: {request} with `id={context.id()}`")
for word in request.split(","):
await asyncio.sleep(1)
if context.is_stopped() or context.is_killed():
print("request got cancelled.")
return
yield f"Hello {word}!"


Expand Down
76 changes: 76 additions & 0 deletions lib/bindings/python/rust/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// PyContext is a wrapper around the AsyncEngineContext to allow for Python bindings.

pub use dynamo_runtime::pipeline::AsyncEngineContext;
use pyo3::prelude::*;
use std::sync::Arc;

// PyContext is a wrapper around the AsyncEngineContext to allow for Python bindings.
// Not all methods of the AsyncEngineContext are exposed, jsut the primary ones for tracing + cancellation.
// Kept as class, to allow for future expansion if needed.
#[pyclass]
pub struct PyContext {
pub inner: Arc<dyn AsyncEngineContext>,
}

impl PyContext {
pub fn new(inner: Arc<dyn AsyncEngineContext>) -> Self {
Self { inner }
}
}

#[pymethods]
impl PyContext {
// sync method of `await async_is_stopped()`
fn is_stopped(&self) -> bool {
self.inner.is_stopped()
}

// sync method of `await async_is_killed()`
fn is_killed(&self) -> bool {
self.inner.is_killed()
}
// issues a stop generating
fn stop_generating(&self) {
self.inner.stop_generating();
}

fn id(&self) -> &str {
self.inner.id()
}

// allows building a async callback.
fn async_killed_or_stopped<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();

pyo3_async_runtimes::tokio::future_into_py(py, async move {
tokio::select! {
_ = inner.killed() => {
Ok(true)
}
_ = inner.stopped() => {
Ok(true)
}
}
})
}
}

// PyO3 equivalent for verify if signature contains target_name
// def callable_accepts_kwarg(target_name: str):
// import inspect
// return target_name in inspect.signature(func).parameters
pub fn callable_accepts_kwarg(
py: Python,
callable: &Bound<'_, PyAny>,
target_name: &str,
) -> PyResult<bool> {
let inspect: Bound<'_, PyModule> = py.import("inspect")?;
let signature = inspect.call_method1("signature", (callable,))?;
let params_any: Bound<'_, PyAny> = signature.getattr("parameters")?;
params_any
.call_method1("__contains__", (target_name,))?
.extract::<bool>()
}
29 changes: 25 additions & 4 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use super::context::{callable_accepts_kwarg, PyContext};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};
use pyo3::{PyAny, PyErr};
use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};

Expand All @@ -36,7 +38,6 @@ pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PythonAsyncEngine>()?;
Ok(())
}

// todos:
// - [ ] enable context cancellation
// - this will likely require a change to the function signature python calling arguments
Expand Down Expand Up @@ -113,6 +114,7 @@ pub struct PythonServerStreamingEngine {
_cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
has_pycontext: bool,
}

impl PythonServerStreamingEngine {
Expand All @@ -121,10 +123,16 @@ impl PythonServerStreamingEngine {
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
) -> Self {
let has_pycontext = Python::with_gil(|py| {
let callable = generator.bind(py);
callable_accepts_kwarg(py, callable, "context").unwrap_or(false)
});

PythonServerStreamingEngine {
_cancel_token: cancel_token,
generator,
event_loop,
has_pycontext,
}
}
}
Expand Down Expand Up @@ -166,6 +174,8 @@ where

let generator = self.generator.clone();
let event_loop = self.event_loop.clone();
let ctx_python = ctx.clone();
let has_pycontext = self.has_pycontext;

// Acquiring the GIL is similar to acquiring a standard lock/mutex
// Performing this in an tokio async task could block the thread for an undefined amount of time
Expand All @@ -180,7 +190,18 @@ where
let stream = tokio::task::spawn_blocking(move || {
Python::with_gil(|py| {
let py_request = pythonize(py, &request)?;
let gen = generator.call1(py, (py_request,))?;
let py_ctx = Py::new(py, PyContext::new(ctx_python.clone()))?;

let gen = if has_pycontext {
// Pass context as a kwarg
let kwarg = PyDict::new(py);
kwarg.set_item("context", &py_ctx)?;
generator.call(py, (py_request,), Some(&kwarg))
} else {
// Legacy: No `context` arg
generator.call1(py, (py_request,))
}?;

let locals = TaskLocals::new(event_loop.bind(py).clone());
pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py))
})
Expand Down
2 changes: 2 additions & 0 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ impl From<RouterMode> for RsRouterMode {
}
}

mod context;
mod engine;
mod http;
mod llm;
Expand Down Expand Up @@ -103,6 +104,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?;
m.add_class::<context::PyContext>()?;
m.add_class::<EtcdKvCache>()?;
m.add_class::<ModelType>()?;
m.add_class::<llm::kv::ForwardPassMetrics>()?;
Expand Down
1 change: 1 addition & 0 deletions lib/bindings/python/src/dynamo/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from dynamo._core import EtcdKvCache as EtcdKvCache
from dynamo._core import ModelDeploymentCard as ModelDeploymentCard
from dynamo._core import OAIChatPreprocessor as OAIChatPreprocessor
from dynamo._core import PyContext as PyContext


def dynamo_worker(static=False):
Expand Down
Loading