Skip to content
35 changes: 29 additions & 6 deletions components/backends/vllm/src/dynamo/vllm/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, component, engine, default_sampling_params):
self.kv_publisher = None

@abstractmethod
async def generate(self, request) -> AsyncGenerator[dict, None]:
async def generate(self, request, context) -> AsyncGenerator[dict, None]:
raise NotImplementedError

async def clear_kv_blocks(self, request=None):
Expand Down Expand Up @@ -110,7 +110,7 @@ def cleanup(self):
self._prefill_check_task.cancel()
super().cleanup()

async def generate(self, request):
async def generate(self, request, context):
request_id = str(uuid.uuid4().hex)
logger.debug(f"New Request ID: {request_id}")

Expand Down Expand Up @@ -147,9 +147,20 @@ async def generate(self, request):

# TODO Change to prefill queue
if self.prefill_worker_client is not None:
prefill_response = await anext(
await self.prefill_worker_client.round_robin(prefill_request)
)
try:
prefill_response = await anext(
await self.prefill_worker_client.round_robin(
prefill_request, context=context
)
)
except Exception as e:
# TODO: Cancellation does not propagate until the first token is received
if context.is_stopped() or context.is_killed():
logger.debug(f"Aborted Remote Prefill Request ID: {request_id}")
# TODO: Raise asyncio.CancelledError into bindings
return
raise e

prefill_response = MyRequestOutput.model_validate_json(
prefill_response.data()
)
Expand All @@ -162,14 +173,20 @@ async def generate(self, request):
] = prefill_response.kv_transfer_params

async for tok in self.generate_tokens(prompt, sampling_params, request_id):
if context.is_stopped() or context.is_killed():
await self.engine_client.abort(request_id)
logger.debug(f"Aborted Request ID: {request_id}")
# TODO: Raise asyncio.CancelledError into bindings
break

yield tok


class PrefillWorkerHandler(BaseWorkerHandler):
def __init__(self, component, engine, default_sampling_params):
super().__init__(component, engine, default_sampling_params)

async def generate(self, request):
async def generate(self, request, context):
request_id = request["request_id"]
logger.debug(f"New Prefill Request ID: {request_id}")

Expand All @@ -181,6 +198,12 @@ async def generate(self, request):
# Generate only 1 token in prefill
try:
async for res in gen:
if context.is_stopped() or context.is_killed():
await self.engine_client.abort(request_id)
logger.debug(f"Aborted Prefill Request ID: {request_id}")
# TODO: Raise asyncio.CancelledError into bindings
break

logger.debug(f"kv transfer params: {res.kv_transfer_params}")
yield MyRequestOutput(
request_id=res.request_id,
Expand Down
86 changes: 86 additions & 0 deletions docs/architecture/request_cancellation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Request Cancellation Architecture

This document describes how Dynamo implements request cancellation to cancel in-flight requests between Dynamo workers. Request cancellation allows in-flight requests to terminate early, saving computational resources that would otherwise be spent on responses that are no longer needed.

## AsyncEngineContext Trait

At the core of Dynamo's request cancellation system is the `AsyncEngineContext` trait. This trait is associated with every request stream and provides lifecycle management for async operations, including stream identification, graceful shutdown capabilities, and immediate termination capabilities.

### Key Methods

#### Identification
- **`id()`**: Returns the unique identifier for the stream. This ID is set by the user for request identification, and the same ID can be used for sub-requests to associate them with the original user request.

#### Status Checking
- **`is_stopped()`**: Returns `true` if graceful cancellation has been requested via `stop_generating()`. This represents a signal to the worker that the request has been cancelled and it should return early.
- **`is_killed()`**: Returns `true` if a hard stop has been issued via `kill()`. This typically indicates that the network connection between client and server has been cut or an immediate termination is required.

#### Async Status Monitoring
- **`stopped()`**: An async method that completes when the context becomes stopped. If already stopped, returns immediately.
- **`killed()`**: An async method that completes when the context becomes killed. If already killed, returns immediately.

#### Cancellation Control
- **`stop_generating()`**: The recommended method for cancelling a request. This informs the engine to stop producing results for the stream gracefully. This method is idempotent and does not invalidate results currently in the stream.
- **`stop()`**: Alias for `stop_generating()`.
- **`kill()`**: Extends `stop_generating()` but also indicates a preference to terminate without draining remaining items in the stream. This is implementation-specific and may not be supported by all engines.

#### Child Request Management
- **`link_child(child: Arc<dyn AsyncEngineContext>)`**: Links a child `AsyncEngineContext` to this context. When `stop_generating()`, `stop()`, or `kill()` is called on the parent context, the same method is automatically called on all linked child contexts in the order they were linked. This is especially useful in disaggregated serving scenarios where a frontend receives cancellation notification and needs to cancel requests to workers, and the worker can then cancel its sub-requests (e.g., remote prefill operations).

### Thread Safety

The `AsyncEngineContext` trait ensures thread-safety with `Send + Sync` bounds, allowing safe concurrent access across multiple threads and async tasks.

## Python Bindings

The `AsyncEngineContext` functionality is exposed to Python through the `Context` class, which provides a largely one-to-one mapping from Rust methods to Python methods.

### Python Context Class

The Python `Context` class wraps the Rust `AsyncEngineContext` and exposes the following methods:

- **`id()`**: Returns the unique identifier for the context
- **`is_stopped()`**: Synchronous method equivalent to the Rust `is_stopped()`
- **`is_killed()`**: Synchronous method equivalent to the Rust `is_killed()`
- **`stop_generating()`**: Issues a stop generating signal, equivalent to the Rust method
- **`async_killed_or_stopped()`**: An async method that completes when the context becomes either killed or stopped, whichever happens first. This combines the functionality of the Rust `killed()` and `stopped()` async methods using `tokio::select!`.

### Context Usage in Python

The context is available optionally in both incoming and outgoing request scenarios:

#### Incoming Requests
For incoming requests, the generate method may optionally accept a `context` argument after the `request` argument. If the `context` parameter is specified in the method signature, it will receive the context object of the incoming request. Request handlers can:

- Check for cancellation synchronously using `context.is_stopped()` before beginning expensive operations
- Listen for cancellation asynchronously using `await context.async_killed_or_stopped()`

Example:
```python
async def generate(self, request, context):
for i in range(1000):
# Check for cancellation before expensive work
if context.is_stopped():
raise asyncio.CancelledError

# Perform work...
await expensive_computation()
yield result
```

#### Outgoing Requests
For outgoing requests, Python scripts may optionally provide a context object to outgoing runtime endpoint client router operations (such as `generate`, `round_robin`, `random`, `direct` methods) as a keyword argument. The script can cancel the outgoing request via the provided context object.

This is especially useful when child outgoing requests need to be cancelled when the parent incoming request is cancelled. In such cases, the script can simply pass the incoming context object to the outgoing request, automatically linking the cancellation behavior.

Example:
```python
async def generate(self, request, context):
# Forward the incoming context to outgoing request
# If the incoming request is cancelled, the outgoing request will be too
stream = await self.client.generate(request, context=context)
async for response in stream:
yield response
```

This design enables seamless cancellation propagation through multi-tier request chains, ensuring that when a client cancels a request, all associated sub-requests are automatically cancelled, saving computational resources across the entire request pipeline.
22 changes: 22 additions & 0 deletions docs/guides/backend.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,25 @@ class RequestHandler:
When `GeneratorExit` is raised, the frontend receives the incomplete response and can seamlessly continue generation on another available worker instance, preserving the user experience even during worker shutdowns.

For more information about how request migration works, see the [Request Migration Architecture](../architecture/request_migration.md) documentation.

## Request Cancellation

Your Python worker's request handler can optionally support request cancellation by accepting a `context` argument after the `request` argument. This context object allows you to check for cancellation signals and respond appropriately:

```python
class RequestHandler:

async def generate(self, request, context):
"""Generate response with cancellation support"""
for result in self.engine.generate_streaming(request):
# Check if the request has been cancelled
if context.is_stopped():
# Stop processing and clean up
break

yield result
```

The context parameter is optional - if your generate method doesn't include it in its signature, Dynamo will call your method without the context argument.

For detailed information about request cancellation, including async cancellation monitoring and context propagation patterns, see the [Request Cancellation Architecture](../architecture/request_cancellation.md) documentation.
6 changes: 6 additions & 0 deletions docs/guides/dynamo_run.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,12 @@ dynamo-run in=dyn://... out=<engine> ... --migration-limit=3

This allows a request to be migrated up to 3 times before failing. See the [Request Migration Architecture](../architecture/request_migration.md) documentation for details on how this works.

### Request Cancellation

When using the HTTP interface (`in=http`), if the HTTP request connection is dropped by the client, Dynamo automatically cancels the downstream request to the worker. This ensures that computational resources are not wasted on generating responses that are no longer needed.

For detailed information about how request cancellation works across the system, see the [Request Cancellation Architecture](../architecture/request_cancellation.md) documentation.

## Development

`dynamo-run` is also an example of what can be built in Rust with the `dynamo-llm` and `dynamo-runtime` crates. The following guide shows how to build from source with all the features.
Expand Down
30 changes: 24 additions & 6 deletions lib/bindings/python/rust/context.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,46 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

// PyContext is a wrapper around the AsyncEngineContext to allow for Python bindings.
// Context is a wrapper around the AsyncEngineContext to allow for Python bindings.

use dynamo_runtime::pipeline::context::Controller;
pub use dynamo_runtime::pipeline::AsyncEngineContext;
use pyo3::prelude::*;
use std::sync::Arc;

// PyContext is a wrapper around the AsyncEngineContext to allow for Python bindings.
// Context is a wrapper around the AsyncEngineContext to allow for Python bindings.
// Not all methods of the AsyncEngineContext are exposed, jsut the primary ones for tracing + cancellation.
// Kept as class, to allow for future expansion if needed.
#[derive(Clone)]
#[pyclass]
pub struct PyContext {
pub inner: Arc<dyn AsyncEngineContext>,
pub struct Context {
inner: Arc<dyn AsyncEngineContext>,
}

impl PyContext {
impl Context {
pub fn new(inner: Arc<dyn AsyncEngineContext>) -> Self {
Self { inner }
}

pub fn inner(&self) -> Arc<dyn AsyncEngineContext> {
self.inner.clone()
}
}

#[pymethods]
impl PyContext {
impl Context {
#[new]
#[pyo3(signature = (id=None))]
fn py_new(id: Option<String>) -> Self {
let controller = match id {
Some(id) => Controller::new(id),
None => Controller::default(),
};
Self {
inner: Arc::new(controller),
}
}

// sync method of `await async_is_stopped()`
fn is_stopped(&self) -> bool {
self.inner.is_stopped()
Expand Down
14 changes: 7 additions & 7 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use super::context::{callable_accepts_kwarg, PyContext};
use super::context::{callable_accepts_kwarg, Context};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};
use pyo3::{PyAny, PyErr};
Expand Down Expand Up @@ -114,7 +114,7 @@ pub struct PythonServerStreamingEngine {
_cancel_token: CancellationToken,
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
has_pycontext: bool,
has_context: bool,
}

impl PythonServerStreamingEngine {
Expand All @@ -123,7 +123,7 @@ impl PythonServerStreamingEngine {
generator: Arc<PyObject>,
event_loop: Arc<PyObject>,
) -> Self {
let has_pycontext = Python::with_gil(|py| {
let has_context = Python::with_gil(|py| {
let callable = generator.bind(py);
callable_accepts_kwarg(py, callable, "context").unwrap_or(false)
});
Expand All @@ -132,7 +132,7 @@ impl PythonServerStreamingEngine {
_cancel_token: cancel_token,
generator,
event_loop,
has_pycontext,
has_context,
}
}
}
Expand Down Expand Up @@ -175,7 +175,7 @@ where
let generator = self.generator.clone();
let event_loop = self.event_loop.clone();
let ctx_python = ctx.clone();
let has_pycontext = self.has_pycontext;
let has_context = self.has_context;

// Acquiring the GIL is similar to acquiring a standard lock/mutex
// Performing this in an tokio async task could block the thread for an undefined amount of time
Expand All @@ -190,9 +190,9 @@ where
let stream = tokio::task::spawn_blocking(move || {
Python::with_gil(|py| {
let py_request = pythonize(py, &request)?;
let py_ctx = Py::new(py, PyContext::new(ctx_python.clone()))?;
let py_ctx = Py::new(py, Context::new(ctx_python.clone()))?;

let gen = if has_pycontext {
let gen = if has_context {
// Pass context as a kwarg
let kwarg = PyDict::new(py);
kwarg.set_item("context", &py_ctx)?;
Expand Down
Loading
Loading