Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 41 additions & 26 deletions lib/bindings/python/rust/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
// limitations under the License.

use super::context::{callable_accepts_kwarg, Context};
use dynamo_llm::protocols::DataStream;
use dynamo_runtime::engine::AsyncEngineContext;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyModule};
use pyo3::{PyAny, PyErr};
Expand Down Expand Up @@ -73,7 +75,7 @@ pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
/// ```
#[pyclass]
#[derive(Clone)]
pub struct PythonAsyncEngine(PythonServerStreamingEngine);
pub struct PythonAsyncEngine(pub PythonServerStreamingEngine);

#[pymethods]
impl PythonAsyncEngine {
Expand Down Expand Up @@ -135,31 +137,16 @@ impl PythonServerStreamingEngine {
has_context,
}
}
}

#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
#[error("python exception: {0}")]
PythonException(String),

#[error("python generator exit: {0}")]
PyGeneratorExit(String),

#[error("deserialize error: {0}")]
DeserializeError(String),

#[error("gil offload error: {0}")]
OffloadError(String),
}

#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
for PythonServerStreamingEngine
where
Req: Data + Serialize,
Resp: Data + for<'de> Deserialize<'de>,
{
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
/// Generate the response in parts.
pub async fn generate_in_parts<Req, Resp>(
&self,
request: SingleIn<Req>,
) -> Result<(DataStream<Annotated<Resp>>, Arc<dyn AsyncEngineContext>), Error>
where
Req: Data + Serialize,
Resp: Data + for<'de> Deserialize<'de>,
{
// Create a context
let (request, context) = request.transfer(());
let ctx = context.context();
Expand Down Expand Up @@ -290,8 +277,36 @@ where
});

let stream = ReceiverStream::new(rx);
let context = context.context();
Ok((Box::pin(stream), context))
}
}

#[derive(Debug, thiserror::Error)]
enum ResponseProcessingError {
#[error("python exception: {0}")]
PythonException(String),

#[error("python generator exit: {0}")]
PyGeneratorExit(String),

#[error("deserialize error: {0}")]
DeserializeError(String),

Ok(ResponseStream::new(Box::pin(stream), context.context()))
#[error("gil offload error: {0}")]
OffloadError(String),
}

#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error>
for PythonServerStreamingEngine
where
Req: Data + Serialize,
Resp: Data + for<'de> Deserialize<'de>,
{
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
let (stream, context) = self.generate_in_parts(request).await?;
Ok(ResponseStream::new(Box::pin(stream), context))
}
Comment on lines +307 to 310
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix double pin when wrapping DataStream in ResponseStream

generate_in_parts already returns a DataStream (pinned). Wrapping it again with Box::pin yields a Pin<Box<Pin<Box<...>>>> and will not compile.

-        let (stream, context) = self.generate_in_parts(request).await?;
-        Ok(ResponseStream::new(Box::pin(stream), context))
+        let (stream, context) = self.generate_in_parts(request).await?;
+        Ok(ResponseStream::new(stream, context))
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
let (stream, context) = self.generate_in_parts(request).await?;
Ok(ResponseStream::new(Box::pin(stream), context))
}
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
let (stream, context) = self.generate_in_parts(request).await?;
Ok(ResponseStream::new(stream, context))
}
🤖 Prompt for AI Agents
In lib/bindings/python/rust/engine.rs around lines 307 to 310, the code
double-pins the stream returned by generate_in_parts by calling Box::pin(stream)
even though generate_in_parts already returns a pinned DataStream; remove the
extra Box::pin and pass the returned pinned stream directly to
ResponseStream::new (or adjust types so ResponseStream::new accepts the pinned
DataStream) to avoid the Pin<Box<Pin<...>>> compile error.

}

Expand Down
25 changes: 23 additions & 2 deletions lib/bindings/python/rust/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,29 @@ where
Resp: Data + for<'de> Deserialize<'de>,
{
async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
match self.0.generate(request).await {
Ok(res) => Ok(res),
match self.0 .0.generate_in_parts(request).await {
Ok((mut stream, context)) => {
let request_id = context.id().to_string();
let first_item = match futures::StreamExt::next(&mut stream).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should extract the error message check / check if HTTPError (identical to if let Some(py_err) = e.downcast_ref:)

// TODO - item may still contain an Annotated error. How do we want to handle that?
// TODO - should we be returning an HttpError here?
Some(item) => item,
None => {
let error_msg = "python async generator stream ended before processing started";
tracing::warn!(request_id, error_msg);
return Err(Error::new(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error_msg,
)));
}
Comment on lines +188 to +194
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix tracing::warn! invocation (won’t compile as written)

tracing requires a format string and/or key-value fields. Current call will not compile.

Apply this diff:

-                        tracing::warn!(request_id, error_msg);
+                        tracing::warn!(%request_id, "{}", error_msg);
🤖 Prompt for AI Agents
In lib/bindings/python/rust/http.rs around lines 188 to 194, the tracing::warn!
call is using bare values and won't compile; replace it with a tracing
invocation that provides a format string and key-value fields (e.g. include
request_id as a field and log the message with a "{}" format or as a keyed
value) so the call becomes a valid tracing macro with either
tracing::warn!(request_id = ?request_id, "{}", error_msg) or
tracing::warn!(request_id = ?request_id, error = %error_msg), then keep the
subsequent Err creation unchanged.

};

// Create a new stream that yields the first item followed by the rest of the original stream
let once_stream = futures::stream::once(async { first_item });
let stream = futures::StreamExt::chain(once_stream, stream);

Ok(ResponseStream::new(Box::pin(stream), context))
}
Comment on lines +180 to +202
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Actually enforce “inspect first item for error” and fail early

The PR goal says we await the first item to check for errors. Right now we only guard against EOF; annotated errors pass through. Fail fast and surface an HTTP error.

Apply this diff (adjust message as desired):

-                let first_item = match futures::StreamExt::next(&mut stream).await {
+                let first_item = match futures::StreamExt::next(&mut stream).await {
                     // TODO - item may still contain an Annotated error. How do we want to handle that?
                     // TODO - should we be returning an HttpError here?
                     Some(item) => item,
                     None => {
                         let error_msg = "python async generator stream ended before processing started";
-                        tracing::warn!(request_id, error_msg);
+                        tracing::warn!(%request_id, "{}", error_msg);
                         return Err(Error::new(std::io::Error::new(
                             std::io::ErrorKind::UnexpectedEof,
                             error_msg,
                         )));
                     }
                 };
+
+                // Fail fast if Python produced an annotated error as the first item.
+                if first_item.is_error() {
+                    let msg = format!("first stream item was an error; request_id={}", request_id);
+                    // Prefer explicit HTTP error so callers get an HTTP status instead of a 500 IO error.
+                    return Err(http_error::HttpError { code: 500, message: msg })?;
+                }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
match self.0 .0.generate_in_parts(request).await {
Ok((mut stream, context)) => {
let request_id = context.id().to_string();
let first_item = match futures::StreamExt::next(&mut stream).await {
// TODO - item may still contain an Annotated error. How do we want to handle that?
// TODO - should we be returning an HttpError here?
Some(item) => item,
None => {
let error_msg = "python async generator stream ended before processing started";
tracing::warn!(request_id, error_msg);
return Err(Error::new(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error_msg,
)));
}
};
// Create a new stream that yields the first item followed by the rest of the original stream
let once_stream = futures::stream::once(async { first_item });
let stream = futures::StreamExt::chain(once_stream, stream);
Ok(ResponseStream::new(Box::pin(stream), context))
}
match self.0 .0.generate_in_parts(request).await {
Ok((mut stream, context)) => {
let request_id = context.id().to_string();
let first_item = match futures::StreamExt::next(&mut stream).await {
// TODO - item may still contain an Annotated error. How do we want to handle that?
// TODO - should we be returning an HttpError here?
Some(item) => item,
None => {
let error_msg = "python async generator stream ended before processing started";
tracing::warn!(%request_id, "{}", error_msg);
return Err(Error::new(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
error_msg,
)));
}
};
// Fail fast if Python produced an annotated error as the first item.
if first_item.is_error() {
let msg = format!("first stream item was an error; request_id={}", request_id);
// Prefer explicit HTTP error so callers get an HTTP status instead of a 500 IO error.
return Err(http_error::HttpError { code: 500, message: msg })?;
}
// Create a new stream that yields the first item followed by the rest of the original stream
let once_stream = futures::stream::once(async { first_item });
let stream = futures::StreamExt::chain(once_stream, stream);
Ok(ResponseStream::new(Box::pin(stream), context))
}
🤖 Prompt for AI Agents
lib/bindings/python/rust/http.rs around lines 180-202: the code awaits the first
stream item only for EOF but does not fail early when that first item contains
an Annotated/error; update the logic after receiving first_item to inspect
whether it represents an error and, if so, convert it into an immediate
Err(HttpError) (or the project’s HTTP error type) with a clear message and
request_id, logging appropriately; if first_item is Ok/valid, wrap it and chain
the rest of the stream as before.


// Inspect the error - if it was an HttpError from Python, extract the code and message
// and return the rust version of HttpError
Expand Down
6 changes: 6 additions & 0 deletions lib/bindings/python/rust/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,12 @@ impl DistributedRuntime {
self.inner.runtime().shutdown();
}

fn child_token(&self) -> CancellationToken {
CancellationToken {
inner: self.inner.runtime().child_token(),
}
}

fn event_loop(&self) -> PyObject {
self.event_loop.clone()
}
Expand Down
24 changes: 24 additions & 0 deletions lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,30 @@ class DistributedRuntime:
Shutdown the runtime by triggering the cancellation token
"""
...

def child_token(self) -> CancellationToken:
"""
Get a child cancellation token from the runtime
"""
...

class CancellationToken:
"""
A cancellation token for coordinating shutdown across components
"""

def cancel(self) -> None:
"""
Cancel the token
"""
...

async def cancelled(self) -> None:
"""
Wait for the token to be cancelled
"""
...

class EtcdClient:
"""
Etcd is used for discovery in the DistributedRuntime
Expand Down
Loading