Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
558482d
feat: initial benchmarking wrapper in-cluster work
hhzhang16 Sep 8, 2025
7cc6edb
Merge branch 'main' of github.com:ai-dynamo/dynamo into hannahz/dyn-9…
hhzhang16 Sep 18, 2025
5a09233
feat: update benchmark job for in-cluster benchmarking following late…
hhzhang16 Sep 18, 2025
8f19a4d
feat: update in-cluster benchmark job and yaml
hhzhang16 Sep 19, 2025
3ff6675
feat: enhance GPT OSS frontend with improved harmony tool calling par…
zhongdaor-nv Sep 18, 2025
9482320
feat(operator): mechanism for disabling imagePullSecrets discovery (#…
tmonty12 Sep 18, 2025
f7cc9e9
refactor: simplify Dockerfile.vllm, enable local-dev for all framewor…
keivenchang Sep 19, 2025
d5f0495
feat: Request Cancellation unary request support (#3004)
kthui Sep 19, 2025
1648836
build: update trtllm to v1.1.0rc5 to enable trtllm + KVBM integration…
richardhuo-nv Sep 19, 2025
91181f6
build: OPS-597, OPS-861 restructure TRT-LLM to follow container strat…
nv-tusharma Sep 19, 2025
89e074c
feat: Sglang canary health check (#3103)
tzulingk Sep 19, 2025
271ef47
feat: Convert message[content] from list to string. (#3067)
KrishnanPrash Sep 19, 2025
f79e57b
feat: KVBM connector : enabling vectorized copy from pinned memory to…
oandreeva-nv Sep 19, 2025
8ee077f
feat: update READMe commands
hhzhang16 Sep 19, 2025
4ac8147
feat: update READMe commands
hhzhang16 Sep 19, 2025
e7ed272
Merge branch 'main' of github.com:ai-dynamo/dynamo into hannahz/dyn-9…
hhzhang16 Sep 19, 2025
534ba19
docs: move in-cluster benchmarking doc to the overall benchmarking do…
hhzhang16 Sep 19, 2025
0235ece
feat: minor adjustments based on self look-through and coderabbit com…
hhzhang16 Sep 19, 2025
b392205
Merge branch 'main' of github.com:ai-dynamo/dynamo into hannahz/dyn-9…
hhzhang16 Sep 22, 2025
ef92388
docs: add benchmarking cross-namespace
hhzhang16 Sep 22, 2025
69bcfa8
docs: have user modify benchmark job instead of using envsubst
hhzhang16 Sep 22, 2025
e83590b
docs: add tldr
hhzhang16 Sep 22, 2025
efd16d6
docs: minor doc updates
hhzhang16 Sep 22, 2025
ae9e70e
Merge branch 'main' of github.com:ai-dynamo/dynamo into hannahz/dyn-9…
hhzhang16 Sep 22, 2025
5131348
docs: update k8s-related stuff in benchmarking.md
hhzhang16 Sep 23, 2025
38955ef
Merge branch 'main' into hannahz/dyn-973-allow-in-cluster-perf-benchm…
hhzhang16 Sep 23, 2025
a5e5b18
docs: updating client-side prereqs
hhzhang16 Sep 23, 2025
de853cf
Merge branch 'main' of github.com:ai-dynamo/dynamo into hannahz/dyn-9…
hhzhang16 Sep 23, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
feat: Request Cancellation unary request support (#3004)
Signed-off-by: Jacky <[email protected]>
  • Loading branch information
kthui authored and hhzhang16 committed Sep 19, 2025
commit d5f04957df308fffb661d6a0bb28ceefa0c537e2
16 changes: 6 additions & 10 deletions examples/custom_backend/cancellation/middle_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,12 @@ async def generate(self, request, context):
stream = await self.backend_client.generate(request, context=context)

# Stream responses back to client
try:
async for response in stream:
data = response.data()
print(f"Middle server: Forwarding response {data}")
yield data

except ValueError as e:
if str(e) != "Stream ended before generation completed":
raise
print("Middle server: Backend stream ended early due to cancellation")
async for response in stream:
data = response.data()
print(f"Middle server: Forwarding response {data}")
yield data

print("Middle server: Backend stream ended")


async def main():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ async def test_client_context_cancel(server, client):
if iteration_count >= 2:
print("Cancelling after 2 responses...")
context.stop_generating()
break

iteration_count += 1

# Verify we received exactly 3 responses (0, 1, 2)
assert iteration_count == 3

# Give server a moment to process the cancellation
await asyncio.sleep(0.2)

# Verify server detected the cancellation
assert handler.context_is_stopped
assert handler.context_is_killed
assert not handler.context_is_killed

# TODO: Test with _generate_until_asyncio_cancelled server handler
6 changes: 3 additions & 3 deletions lib/bindings/python/tests/test_cancellation/test_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ async def test_middle_server_cancellation(
assert (
"Client: Cancelling after 3 responses..." in client_output
), f"Client output: {client_output}"
assert (
"Middle server: Forwarding response 2" in middle_output
), f"Middle server output: {middle_output}"
assert (
"Server: Cancelled at iteration" in server_output
), f"Server output: {server_output}"
assert (
"Middle server: Backend stream ended early due to cancellation" in middle_output
), f"Middle server output: {middle_output}"
106 changes: 44 additions & 62 deletions lib/llm/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use crate::{

use dynamo_runtime::{
pipeline::{
AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream,
ServerStreamingEngine, SingleIn, async_trait,
AsyncEngineContext, AsyncEngineContextProvider, Context, ManyOut, Operator, ResponseStream,
ServerStreamingEngine, SingleIn, async_trait, network::STREAM_ERR_MSG,
},
protocols::{annotated::Annotated, maybe_error::MaybeError},
};
Expand Down Expand Up @@ -55,30 +55,23 @@ impl
next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>> {
let (preprocessed_request, context) = request.transfer(());
let context_id = context.id().to_string();
let engine_ctx = context.context();
let engine_ctx_ = engine_ctx.clone();
let retry_manager =
RetryManager::build(context_id, preprocessed_request, next, self.migration_limit)
RetryManager::build(engine_ctx, preprocessed_request, next, self.migration_limit)
.await?;
let response_stream = stream::unfold(retry_manager, move |mut retry_manager| {
let engine_ctx = engine_ctx_.clone();
async move {
if engine_ctx.is_stopped() || engine_ctx.is_killed() {
return None; // Stop if the context is cancelled or stopped
}
retry_manager
.next()
.await
.map(|response| (response, retry_manager))
}
let response_stream = stream::unfold(retry_manager, move |mut retry_manager| async move {
retry_manager
.next()
.await
.map(|response| (response, retry_manager))
});
Ok(ResponseStream::new(Box::pin(response_stream), engine_ctx))
Ok(ResponseStream::new(Box::pin(response_stream), engine_ctx_))
}
}

struct RetryManager {
context_id: String,
context: Arc<dyn AsyncEngineContext>,
request: PreprocessedRequest,
next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
next_stream: Option<ManyOut<Annotated<LLMEngineOutput>>>,
Expand All @@ -87,13 +80,13 @@ struct RetryManager {

impl RetryManager {
pub async fn build(
context_id: String,
context: Arc<dyn AsyncEngineContext>,
preprocessed_request: PreprocessedRequest,
next: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>>,
retries_left: u32,
) -> Result<Self> {
let mut slf = Self {
context_id,
context,
request: preprocessed_request,
next_generate: next,
next_stream: None,
Expand All @@ -115,18 +108,16 @@ impl RetryManager {
}
};
if let Some(response) = response_stream.next().await {
if let Some(err) = response.err() {
const STREAM_ERR_MSG: &str = "Stream ended before generation completed";
if err
if let Some(err) = response.err()
&& err
.chain()
.any(|e| e.to_string().starts_with(STREAM_ERR_MSG))
{
tracing::warn!("Stream disconnected... recreating stream...");
if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:#}", err);
} else {
continue;
}
{
tracing::warn!("Stream disconnected... recreating stream...");
if let Err(err) = self.new_stream().await {
tracing::warn!("Cannot recreate stream: {:#}", err);
} else {
continue;
}
}
self.track_response(&response);
Expand All @@ -140,7 +131,8 @@ impl RetryManager {
let mut response_stream: Option<Result<ManyOut<Annotated<LLMEngineOutput>>>> = None;
while self.retries_left > 0 {
self.retries_left -= 1;
let request = Context::with_id(self.request.clone(), self.context_id.clone());
let request = Context::with_id(self.request.clone(), self.context.id().to_string());
self.context.link_child(request.context());
response_stream = Some(self.next_generate.generate(request).await);
if let Some(err) = response_stream.as_ref().unwrap().as_ref().err()
&& let Some(req_err) = err.downcast_ref::<NatsRequestError>()
Expand Down Expand Up @@ -339,10 +331,8 @@ mod tests {
}
}
// Send the specific error that triggers retry logic
let error_response = Annotated::from_err(
anyhow::Error::msg("Stream ended before generation completed")
.into(),
);
let error_response =
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into());
let _ = tx.send(error_response).await;
});
} else {
Expand Down Expand Up @@ -381,10 +371,8 @@ mod tests {
}
}
// Send the specific error that triggers retry logic
let error_response = Annotated::from_err(
anyhow::Error::msg("Stream ended before generation completed")
.into(),
);
let error_response =
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into());
let _ = tx.send(error_response).await;
});

Expand Down Expand Up @@ -417,10 +405,8 @@ mod tests {
}
}
// Send the specific error that triggers retry logic
let error_response = Annotated::from_err(
anyhow::Error::msg("Stream ended before generation completed")
.into(),
);
let error_response =
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into());
let _ = tx.send(error_response).await;
});

Expand All @@ -434,10 +420,8 @@ mod tests {
// Subsequent calls - immediately send stream error (no successful responses)
tokio::spawn(async move {
// Send the stream error immediately
let error_response = Annotated::from_err(
anyhow::Error::msg("Stream ended before generation completed")
.into(),
);
let error_response =
Annotated::from_err(anyhow::Error::msg(STREAM_ERR_MSG).into());
let _ = tx.send(error_response).await;
});

Expand Down Expand Up @@ -503,7 +487,8 @@ mod tests {
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
mock_engine;

let mut retry_manager = RetryManager::build(context_id, request, next_generate, 0)
let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 0)
.await
.expect("Failed to build RetryManager");

Expand Down Expand Up @@ -541,7 +526,8 @@ mod tests {
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
mock_engine;

let mut retry_manager = RetryManager::build(context_id, request, next_generate, 3)
let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3)
.await
.expect("Failed to build RetryManager");

Expand Down Expand Up @@ -580,7 +566,8 @@ mod tests {
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
mock_engine;

let mut retry_manager = RetryManager::build(context_id, request, next_generate, 3)
let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3)
.await
.expect("Failed to build RetryManager");

Expand Down Expand Up @@ -620,7 +607,8 @@ mod tests {
mock_engine;

// Should fail to build due to initial stream creation failure after exhausting all 3 retries
let retry_manager_result = RetryManager::build(context_id, request, next_generate, 3).await;
let ctx = Arc::new(Controller::new(context_id.clone()));
let retry_manager_result = RetryManager::build(ctx, request, next_generate, 3).await;

assert!(retry_manager_result.is_err());
if let Err(error) = retry_manager_result {
Expand All @@ -646,7 +634,8 @@ mod tests {
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
mock_engine;

let mut retry_manager = RetryManager::build(context_id, request, next_generate, 3) // 3 retries
let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) // 3 retries
.await
.expect("Failed to build RetryManager");

Expand All @@ -672,11 +661,7 @@ mod tests {
let error_response = &responses[3];
assert!(error_response.err().is_some());
if let Some(error) = error_response.err() {
assert!(
error
.to_string()
.contains("Stream ended before generation completed")
);
assert!(error.to_string().contains(STREAM_ERR_MSG));
}
}

Expand All @@ -698,7 +683,8 @@ mod tests {
let next_generate: ServerStreamingEngine<PreprocessedRequest, Annotated<LLMEngineOutput>> =
mock_engine;

let mut retry_manager = RetryManager::build(context_id, request, next_generate, 3) // 3 retries
let ctx = Arc::new(Controller::new(context_id.clone()));
let mut retry_manager = RetryManager::build(ctx, request, next_generate, 3) // 3 retries
.await
.expect("Failed to build RetryManager");

Expand All @@ -724,11 +710,7 @@ mod tests {
let error_response = &responses[3];
assert!(error_response.err().is_some());
if let Some(error) = error_response.err() {
assert!(
error
.to_string()
.contains("Stream ended before generation completed")
);
assert!(error.to_string().contains(STREAM_ERR_MSG));
}
}
}
14 changes: 8 additions & 6 deletions lib/runtime/src/pipeline/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,18 +358,20 @@ impl AsyncEngineContext for Controller {

async fn stopped(&self) {
let mut rx = self.rx.clone();
if *rx.borrow_and_update() != State::Live {
return;
loop {
if *rx.borrow_and_update() != State::Live || rx.changed().await.is_err() {
return;
}
}
let _ = rx.changed().await;
}

async fn killed(&self) {
let mut rx = self.rx.clone();
if *rx.borrow_and_update() == State::Killed {
return;
loop {
if *rx.borrow_and_update() == State::Killed || rx.changed().await.is_err() {
return;
}
}
let _ = rx.changed().await;
}

fn stop_generating(&self) {
Expand Down
3 changes: 3 additions & 0 deletions lib/runtime/src/pipeline/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use super::{
};
use ingress::push_handler::WorkHandlerMetrics;

// Define stream error message constant
pub const STREAM_ERR_MSG: &str = "Stream ended before generation completed";

// Add Prometheus metrics types
use crate::metrics::MetricsRegistry;
use prometheus::{CounterVec, Histogram, IntCounter, IntCounterVec, IntGauge};
Expand Down
14 changes: 11 additions & 3 deletions lib/runtime/src/pipeline/network/egress/addressed_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ where
let (addressed_request, context) = request.transfer(());
let (request, address) = addressed_request.into_parts();
let engine_ctx = context.context();
let engine_ctx_ = engine_ctx.clone();

// registration options for the data plane in a singe in / many out configuration
let options = StreamOptions::builder()
Expand Down Expand Up @@ -209,11 +210,18 @@ where
}
}
} else if is_complete_final {
// end of stream
None
} else if engine_ctx_.is_stopped() {
// Gracefully end the stream if 'stop_generating()' was called. Do NOT check for
// 'is_killed()' here because it implies the stream ended abnormally which should be
// handled by the error branch below.
log::debug!("Request cancelled and then trying to read a response");
None
} else {
Some(U::from_err(
Error::msg("Stream ended before generation completed").into(),
))
// stream ended unexpectedly
log::debug!("{STREAM_ERR_MSG}");
Some(U::from_err(Error::msg(STREAM_ERR_MSG).into()))
}
});

Expand Down
Loading