From 83d7b8dc891ec182dcbbf1a9585c98f3b7267736 Mon Sep 17 00:00:00 2001 From: michaelfeil Date: Thu, 21 Aug 2025 04:51:37 +0000 Subject: [PATCH 1/5] sync-enable-endpoint --- lib/bindings/python/rust/http.rs | 15 ++++++++++++++- lib/llm/src/http/service/service_v2.rs | 4 ++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/bindings/python/rust/http.rs b/lib/bindings/python/rust/http.rs index 3a22092334..ffd51f5840 100644 --- a/lib/bindings/python/rust/http.rs +++ b/lib/bindings/python/rust/http.rs @@ -19,8 +19,8 @@ use pyo3::{exceptions::PyException, prelude::*}; use crate::{engine::*, to_pyerr, CancellationToken}; +pub use dynamo_llm::endpoint_type::EndpointType; pub use dynamo_llm::http::service::{error as http_error, service_v2}; - pub use dynamo_runtime::{ error, pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn}, @@ -92,6 +92,19 @@ impl HttpService { Ok(()) }) } + + fn enable_endpoint(&self, endpoint_type: String, enabled: bool) -> PyResult<()> { + let endpoint_type: EndpointType = match endpoint_type.as_str() { + "chat" => EndpointType::Chat, + "completion" => EndpointType::Completion, + "embedding" => EndpointType::Embedding, + _ => return Err(to_pyerr("Invalid endpoint type")), + }; + + self.inner + .sync_enable_model_endpoint(endpoint_type, enabled); + return Ok(()); + } } /// Python Exception for HTTP errors diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index e7d710332e..fff6118de0 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -263,6 +263,10 @@ impl HttpService { } pub async fn enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) { + self.sync_enable_model_endpoint(endpoint_type, enable); + } + + pub fn sync_enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) { self.state.flags.set(&endpoint_type, enable); tracing::info!( "{} endpoints {}", From 3dba9057e6b33a7533dd200e0d8b17e432f4b354 Mon Sep 17 00:00:00 2001 From: michaelfeil Date: Thu, 21 Aug 2025 04:52:59 +0000 Subject: [PATCH 2/5] delay streaming --- lib/bindings/python/rust/engine.rs | 36 +++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/lib/bindings/python/rust/engine.rs b/lib/bindings/python/rust/engine.rs index c26c0f3bb5..e5b898c9b9 100644 --- a/lib/bindings/python/rust/engine.rs +++ b/lib/bindings/python/rust/engine.rs @@ -14,6 +14,7 @@ // limitations under the License. use super::context::{callable_accepts_kwarg, PyContext}; +use futures::stream::{self, StreamExt as FuturesStreamExt}; use pyo3::prelude::*; use pyo3::types::{PyDict, PyModule}; use pyo3::{PyAny, PyErr}; @@ -21,7 +22,7 @@ use pyo3_async_runtimes::TaskLocals; use pythonize::{depythonize, pythonize}; use std::sync::Arc; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt}; +use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt}; pub use dynamo_runtime::{ pipeline::{ @@ -208,7 +209,7 @@ where }) .await??; - let stream = Box::pin(stream); + let mut stream = Box::pin(stream); // process the stream // any error thrown in the stream will be caught and complete the processing task @@ -216,6 +217,34 @@ where // the error will be emitted as an annotated error let request_id = id.clone(); + let first_item = match FuturesStreamExt::next(&mut stream).await { + Some(Ok(item)) => item, + Some(Err(e)) => { + // Any Python exception (including HttpError) is already wrapped in PyErr + // The HttpAsyncEngine will inspect this PyErr later to see if it's an HttpError + tracing::debug!( + request_id, + "Python exception occurred before finish of first iteration: {}", + e + ); + return Err(Error::new(e)); + } + None => { + tracing::warn!( + request_id, + "python async generator stream ended before processing started" + ); + return Err(Error::new(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "python async generator stream ended before processing started", + ))); + } + }; + // Create a new stream that yields the first item followed by the rest of the original stream + let stream = + futures::StreamExt::chain(stream::once(futures::future::ok(first_item)), stream); + let stream = FuturesStreamExt::boxed(stream); + tokio::spawn(async move { tracing::debug!( request_id, @@ -225,7 +254,8 @@ where let mut stream = stream; let mut count = 0; - while let Some(item) = stream.next().await { + // Fix the third error by explicitly using FuturesStreamExt::next + while let Some(item) = FuturesStreamExt::next(&mut stream).await { count += 1; tracing::trace!( request_id, From 24537b14dab6529c71b51eef3c0b990453c830ff Mon Sep 17 00:00:00 2001 From: michaelfeil Date: Thu, 21 Aug 2025 17:22:27 +0000 Subject: [PATCH 3/5] reset http and service --- lib/bindings/python/rust/http.rs | 15 +-------------- lib/llm/src/http/service/service_v2.rs | 4 ---- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/lib/bindings/python/rust/http.rs b/lib/bindings/python/rust/http.rs index ffd51f5840..3a22092334 100644 --- a/lib/bindings/python/rust/http.rs +++ b/lib/bindings/python/rust/http.rs @@ -19,8 +19,8 @@ use pyo3::{exceptions::PyException, prelude::*}; use crate::{engine::*, to_pyerr, CancellationToken}; -pub use dynamo_llm::endpoint_type::EndpointType; pub use dynamo_llm::http::service::{error as http_error, service_v2}; + pub use dynamo_runtime::{ error, pipeline::{async_trait, AsyncEngine, Data, ManyOut, SingleIn}, @@ -92,19 +92,6 @@ impl HttpService { Ok(()) }) } - - fn enable_endpoint(&self, endpoint_type: String, enabled: bool) -> PyResult<()> { - let endpoint_type: EndpointType = match endpoint_type.as_str() { - "chat" => EndpointType::Chat, - "completion" => EndpointType::Completion, - "embedding" => EndpointType::Embedding, - _ => return Err(to_pyerr("Invalid endpoint type")), - }; - - self.inner - .sync_enable_model_endpoint(endpoint_type, enabled); - return Ok(()); - } } /// Python Exception for HTTP errors diff --git a/lib/llm/src/http/service/service_v2.rs b/lib/llm/src/http/service/service_v2.rs index fff6118de0..e7d710332e 100644 --- a/lib/llm/src/http/service/service_v2.rs +++ b/lib/llm/src/http/service/service_v2.rs @@ -263,10 +263,6 @@ impl HttpService { } pub async fn enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) { - self.sync_enable_model_endpoint(endpoint_type, enable); - } - - pub fn sync_enable_model_endpoint(&self, endpoint_type: EndpointType, enable: bool) { self.state.flags.set(&endpoint_type, enable); tracing::info!( "{} endpoints {}", From 18d5d15bd6457d26ed09c8ba24d0fa2581fd67f9 Mon Sep 17 00:00:00 2001 From: michaelfeil Date: Thu, 21 Aug 2025 17:27:28 +0000 Subject: [PATCH 4/5] make tracing.warn --- lib/bindings/python/rust/engine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/bindings/python/rust/engine.rs b/lib/bindings/python/rust/engine.rs index e5b898c9b9..fde5be6d0c 100644 --- a/lib/bindings/python/rust/engine.rs +++ b/lib/bindings/python/rust/engine.rs @@ -222,7 +222,7 @@ where Some(Err(e)) => { // Any Python exception (including HttpError) is already wrapped in PyErr // The HttpAsyncEngine will inspect this PyErr later to see if it's an HttpError - tracing::debug!( + tracing::warn!( request_id, "Python exception occurred before finish of first iteration: {}", e From ea151f751249f50636242c9f62e227eb1004c82c Mon Sep 17 00:00:00 2001 From: michaelfeil Date: Thu, 21 Aug 2025 18:53:21 +0000 Subject: [PATCH 5/5] clippy fix --- lib/bindings/python/rust/engine.rs | 72 ++++++++++++++++++------------ lib/bindings/python/rust/http.rs | 4 ++ 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/lib/bindings/python/rust/engine.rs b/lib/bindings/python/rust/engine.rs index fde5be6d0c..dd3bfed42a 100644 --- a/lib/bindings/python/rust/engine.rs +++ b/lib/bindings/python/rust/engine.rs @@ -22,7 +22,7 @@ use pyo3_async_runtimes::TaskLocals; use pythonize::{depythonize, pythonize}; use std::sync::Arc; use tokio::sync::mpsc; -use tokio_stream::{wrappers::ReceiverStream, StreamExt as TokioStreamExt}; +use tokio_stream::wrappers::ReceiverStream; pub use dynamo_runtime::{ pipeline::{ @@ -97,6 +97,10 @@ impl PythonAsyncEngine { Arc::new(event_loop), ))) } + + pub fn block_until_stream_item(&mut self, enabled: bool) { + self.0.block_until_stream_item(enabled); + } } #[async_trait] @@ -116,6 +120,7 @@ pub struct PythonServerStreamingEngine { generator: Arc, event_loop: Arc, has_pycontext: bool, + block_until_stream_item: bool, } impl PythonServerStreamingEngine { @@ -134,8 +139,13 @@ impl PythonServerStreamingEngine { generator, event_loop, has_pycontext, + block_until_stream_item: false, } } + + pub fn block_until_stream_item(&mut self, enabled: bool) { + self.block_until_stream_item = enabled; + } } #[derive(Debug, thiserror::Error)] @@ -209,41 +219,45 @@ where }) .await??; - let mut stream = Box::pin(stream); - // process the stream // any error thrown in the stream will be caught and complete the processing task // errors are captured by a task that is watching the processing task // the error will be emitted as an annotated error let request_id = id.clone(); - let first_item = match FuturesStreamExt::next(&mut stream).await { - Some(Ok(item)) => item, - Some(Err(e)) => { - // Any Python exception (including HttpError) is already wrapped in PyErr - // The HttpAsyncEngine will inspect this PyErr later to see if it's an HttpError - tracing::warn!( - request_id, - "Python exception occurred before finish of first iteration: {}", - e - ); - return Err(Error::new(e)); - } - None => { - tracing::warn!( - request_id, - "python async generator stream ended before processing started" - ); - return Err(Error::new(std::io::Error::new( - std::io::ErrorKind::UnexpectedEof, - "python async generator stream ended before processing started", - ))); - } + let mut stream = Box::pin(stream); + + let stream = if self.block_until_stream_item { + let first_item = match FuturesStreamExt::next(&mut stream).await { + Some(Ok(item)) => item, + Some(Err(e)) => { + // Any Python exception (including HttpError) is already wrapped in PyErr + // The HttpAsyncEngine will inspect this PyErr later to see if it's an HttpError + tracing::warn!( + request_id, + "Python exception occurred before finish of first iteration: {}", + e + ); + return Err(Error::new(e)); + } + None => { + tracing::warn!( + request_id, + "python async generator stream ended before processing started" + ); + return Err(Error::new(std::io::Error::new( + std::io::ErrorKind::UnexpectedEof, + "python async generator stream ended before processing started", + ))); + } + }; + // Create a new stream that yields the first item followed by the rest of the original stream + let stream = + futures::StreamExt::chain(stream::once(futures::future::ok(first_item)), stream); + FuturesStreamExt::boxed(stream) + } else { + stream }; - // Create a new stream that yields the first item followed by the rest of the original stream - let stream = - futures::StreamExt::chain(stream::once(futures::future::ok(first_item)), stream); - let stream = FuturesStreamExt::boxed(stream); tokio::spawn(async move { tracing::debug!( diff --git a/lib/bindings/python/rust/http.rs b/lib/bindings/python/rust/http.rs index 3a22092334..e7862d92ba 100644 --- a/lib/bindings/python/rust/http.rs +++ b/lib/bindings/python/rust/http.rs @@ -147,6 +147,10 @@ impl HttpAsyncEngine { pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult { Ok(PythonAsyncEngine::new(generator, event_loop)?.into()) } + + pub fn block_until_stream_item(&mut self, enabled: bool) { + self.0.block_until_stream_item(enabled); + } } #[async_trait]