27 changes: 5 additions & 22 deletions docs/guides/dynamo_run.md
@@ -318,36 +318,19 @@ dynamo-run in=dyn://dynamo.mocker.generate out=mocker --model-path TinyLlama/Tin
dynamo-run in=http out=auto --router-mode kv
```

### echo_full
### echo

The `echo_full` engine accepts un-processed requests and echoes the prompt back as the response.
The `echo` engine echoes the prompt back as the response.

```
dynamo-run in=http out=echo_full --model-name my_model
dynamo-run in=http out=echo --model-name my_model
```

### echo_core

The `echo_core` engine accepts pre-processed requests and echoes the tokens back as the response. This is useful for testing pre-processing functionality as the response includes the full prompt template.

```
dynamo-run in=http out=echo_core --model-path <hf-repo-checkout>
```

Note that to use it with `in=http` you need to tell the post processor to ignore stop tokens from the template by adding `nvext.ignore_eos` like this:
```
curl -N -d '{"nvext": {"ignore_eos": true}, "stream": true, "model": "Qwen2.5-3B-Instruct", "max_completion_tokens": 4096, "messages":[{"role":"user", "content": "Tell me a story" }]}' ...
```

The default `in=text` sets that for you.

### Echo Configuration

Both echo engines use a configurable delay between tokens to simulate generation speed. You can adjust this using the `DYN_TOKEN_ECHO_DELAY_MS` environment variable:
The echo engine uses a configurable delay between tokens to simulate generation speed. You can adjust this using the `DYN_TOKEN_ECHO_DELAY_MS` environment variable:

```
# Set token echo delay to 1ms (1000 tokens per second)
DYN_TOKEN_ECHO_DELAY_MS=1 dynamo-run in=http out=echo_full
DYN_TOKEN_ECHO_DELAY_MS=1 dynamo-run in=http out=echo
```

The default delay is 10ms, which produces approximately 100 tokens per second.
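The delay is read once at startup into the `TOKEN_ECHO_DELAY` static shown further down in `lib/llm/src/engines.rs`. As a reference point, here is a minimal self-contained sketch of that pattern; the env-var parsing and fallback details are assumptions rather than a copy of the crate's code:

```rust
use std::sync::LazyLock;
use std::time::Duration;

// Read DYN_TOKEN_ECHO_DELAY_MS once, falling back to the documented 10 ms
// default (~100 tokens per second). The parsing details are an assumption;
// the crate's own definition is TOKEN_ECHO_DELAY in lib/llm/src/engines.rs.
static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
    let delay_ms = std::env::var("DYN_TOKEN_ECHO_DELAY_MS")
        .ok()
        .and_then(|v| v.parse::<u64>().ok())
        .unwrap_or(10);
    Duration::from_millis(delay_ms)
});

fn main() {
    println!("token echo delay: {:?}", *TOKEN_ECHO_DELAY);
}
```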
9 changes: 1 addition & 8 deletions launch/dynamo-run/src/flags.rs
@@ -204,14 +204,7 @@ impl Flags {
);
}
}
Output::EchoFull => {}
Output::EchoCore => {
if !local_model.card().has_tokenizer() {
anyhow::bail!(
"out=echo_core need to find the tokenizer. Pass flag --model-path <path>"
);
};
}
Output::Echo => {}
#[cfg(feature = "mistralrs")]
Output::MistralRs => {}
#[cfg(feature = "llamacpp")]
13 changes: 4 additions & 9 deletions launch/dynamo-run/src/lib.rs
@@ -109,14 +109,9 @@ async fn engine_for(
// A single static backend, no etcd
Ok(EngineConfig::StaticRemote(Box::new(local_model)))
}
Output::EchoFull => Ok(EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
is_static: flags.static_worker,
}),
Output::EchoCore => Ok(EngineConfig::StaticCore {
engine: dynamo_llm::engines::make_engine_core(),
Output::Echo => Ok(EngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_echo_engine(),
is_static: flags.static_worker,
}),
#[cfg(feature = "mistralrs")]
@@ -213,7 +208,7 @@ fn gguf_default() -> Output {

#[cfg(not(any(feature = "mistralrs", feature = "llamacpp")))]
{
Output::EchoFull
Output::Echo
}
}

@@ -225,6 +220,6 @@ fn safetensors_default() -> Output {

#[cfg(not(feature = "mistralrs"))]
{
Output::EchoFull
Output::Echo
}
}
19 changes: 5 additions & 14 deletions launch/dynamo-run/src/opt.rs
@@ -5,11 +5,8 @@ use dynamo_runtime::protocols::ENDPOINT_SCHEME;
use std::fmt;

pub enum Output {
/// Accept un-preprocessed requests, echo the prompt back as the response
EchoFull,

/// Accept preprocessed requests, echo the tokens back as the response
EchoCore,
/// Echos the prompt back as the response
Echo,

/// Listen for models on nats/etcd, add/remove dynamically
Auto,
@@ -44,8 +41,7 @@ impl TryFrom<&str> for Output {
"llamacpp" | "llama_cpp" => Ok(Output::LlamaCpp),

"mocker" => Ok(Output::Mocker),
"echo_full" => Ok(Output::EchoFull),
"echo_core" => Ok(Output::EchoCore),
"echo" | "echo_full" => Ok(Output::Echo),

"dyn" | "auto" => Ok(Output::Auto),

@@ -69,8 +65,7 @@ impl fmt::Display for Output {
Output::LlamaCpp => "llamacpp",

Output::Mocker => "mocker",
Output::EchoFull => "echo_full",
Output::EchoCore => "echo_core",
Output::Echo => "echo",

Output::Auto => "auto",
Output::Static(endpoint) => &format!("{ENDPOINT_SCHEME}{endpoint}"),
@@ -82,11 +77,7 @@ impl Output {
impl Output {
#[allow(unused_mut)]
pub fn available_engines() -> Vec<String> {
let mut out = vec![
"echo_core".to_string(),
"echo_full".to_string(),
Output::Mocker.to_string(),
];
let mut out = vec!["echo".to_string(), Output::Mocker.to_string()];
#[cfg(feature = "mistralrs")]
{
out.push(Output::MistralRs.to_string());
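The `"echo" | "echo_full"` arm above keeps the old spelling working while `Display` canonicalises to the new name. A self-contained sketch of that alias behaviour, trimmed to the echo variant only (the real `Output` enum has many more variants and a different error type), might read:

```rust
use std::fmt;

// Trimmed stand-in for launch/dynamo-run/src/opt.rs: only the echo arm is kept.
enum Output {
    Echo,
}

impl TryFrom<&str> for Output {
    type Error = String;

    fn try_from(s: &str) -> Result<Self, Self::Error> {
        match s {
            // The legacy "echo_full" spelling stays accepted as an alias.
            "echo" | "echo_full" => Ok(Output::Echo),
            other => Err(format!("unknown output engine: {other}")),
        }
    }
}

impl fmt::Display for Output {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Output::Echo => write!(f, "echo"),
        }
    }
}

fn main() {
    for name in ["echo", "echo_full"] {
        let out = Output::try_from(name).expect("accepted spelling");
        // Both spellings parse, and Display renders the canonical name.
        println!("{name} -> {out}");
    }
}
```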
17 changes: 17 additions & 0 deletions lib/bindings/python/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion lib/bindings/python/rust/llm/entrypoint.rs
@@ -219,7 +219,7 @@ async fn select_engine(
// There is no validation for the echo engine
RsEngineConfig::StaticFull {
model: Box::new(local_model),
engine: dynamo_llm::engines::make_engine_full(),
engine: dynamo_llm::engines::make_echo_engine(),
is_static: false,
}
}
63 changes: 9 additions & 54 deletions lib/llm/src/engines.rs
@@ -13,9 +13,6 @@ use dynamo_runtime::engine::{AsyncEngine, AsyncEngineContextProvider, ResponseSt
use dynamo_runtime::pipeline::{Error, ManyOut, SingleIn};
use dynamo_runtime::protocols::annotated::Annotated;

use crate::backend::ExecutionContext;
use crate::preprocessor::PreprocessedRequest;
use crate::protocols::common::llm_backend::LLMEngineOutput;
use crate::protocols::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse, prompt_to_string},
@@ -65,53 +62,9 @@ pub static TOKEN_ECHO_DELAY: LazyLock<Duration> = LazyLock::new(|| {
Duration::from_millis(delay_ms)
});

/// Engine that accepts pre-processed requests and echos the tokens back as the response
/// The response will include the full prompt template.
/// Useful for testing pre-processing.
struct EchoEngineCore {}
pub fn make_engine_core() -> ExecutionContext {
Arc::new(EchoEngineCore {})
}

#[async_trait]
impl AsyncEngine<SingleIn<PreprocessedRequest>, ManyOut<Annotated<LLMEngineOutput>>, Error>
for EchoEngineCore
{
async fn generate(
&self,
incoming_request: SingleIn<PreprocessedRequest>,
) -> Result<ManyOut<Annotated<LLMEngineOutput>>, Error> {
let (request, context) = incoming_request.into_parts();
let ctx = context.context();

let output = stream! {
for tok in request.token_ids {
tokio::time::sleep(*TOKEN_ECHO_DELAY).await;
yield delta_core(tok);
}
yield Annotated::from_data(LLMEngineOutput::stop());
};
Ok(ResponseStream::new(Box::pin(output), ctx))
}
}

fn delta_core(tok: u32) -> Annotated<LLMEngineOutput> {
let delta = LLMEngineOutput {
token_ids: vec![tok],
tokens: None,
text: None,
cum_log_probs: None,
log_probs: None,
top_logprobs: None,
finish_reason: None,
index: None,
};
Annotated::from_data(delta)
}

/// Engine that accepts un-preprocessed requests and echos the prompt back as the response
/// Useful for testing ingress such as service-http.
struct EchoEngineFull {}
struct EchoEngine {}

/// Validate Engine that verifies request data
pub struct ValidateEngine<E> {
@@ -164,8 +117,8 @@ pub trait EmbeddingEngine: Send + Sync {
) -> Result<ManyOut<Annotated<NvCreateEmbeddingResponse>>, Error>;
}

pub fn make_engine_full() -> Arc<dyn StreamingEngine> {
let engine = EchoEngineFull {};
pub fn make_echo_engine() -> Arc<dyn StreamingEngine> {
let engine = EchoEngine {};
let data = EngineDispatcher::new(engine);
Arc::new(data)
}
@@ -176,7 +129,7 @@ impl
SingleIn<NvCreateChatCompletionRequest>,
ManyOut<Annotated<NvCreateChatCompletionStreamResponse>>,
Error,
> for EchoEngineFull
> for EchoEngine
{
async fn generate(
&self,
@@ -185,7 +138,9 @@ impl
let (request, context) = incoming_request.transfer(());
let ctx = context.context();
let mut deltas = request.response_generator(ctx.id().to_string());
let req = request.inner.messages.into_iter().next_back().unwrap();
let Some(req) = request.inner.messages.into_iter().next_back() else {
anyhow::bail!("Empty chat messages in request");
};

let prompt = match req {
dynamo_async_openai::types::ChatCompletionRequestMessage::User(user_msg) => {
@@ -223,7 +178,7 @@ impl
SingleIn<NvCreateCompletionRequest>,
ManyOut<Annotated<NvCreateCompletionResponse>>,
Error,
> for EchoEngineFull
> for EchoEngine
{
async fn generate(
&self,
@@ -256,7 +211,7 @@ impl
SingleIn<NvCreateEmbeddingRequest>,
ManyOut<Annotated<NvCreateEmbeddingResponse>>,
Error,
> for EchoEngineFull
> for EchoEngine
{
async fn generate(
&self,
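To make the streaming behaviour concrete, here is a minimal sketch of the delayed-echo loop that `EchoEngine` implements, assuming the `async-stream`, `futures`, and `tokio` crates. The real engine yields `Annotated` chat/completion deltas and terminates with a stop marker, and its exact chunking is not shown in this diff; this version simply splits the prompt on whitespace and yields strings:

```rust
use std::time::Duration;

use async_stream::stream;
use futures::{Stream, StreamExt};

// Yield the prompt back piece by piece, sleeping between items to simulate
// generation speed (the engine uses TOKEN_ECHO_DELAY for this).
fn echo_stream(prompt: String, delay: Duration) -> impl Stream<Item = String> {
    stream! {
        for word in prompt.split_whitespace() {
            tokio::time::sleep(delay).await;
            yield word.to_string();
        }
    }
}

#[tokio::main]
async fn main() {
    let mut out = Box::pin(echo_stream("Tell me a story".to_string(), Duration::from_millis(10)));
    while let Some(piece) = out.next().await {
        print!("{piece} ");
    }
    println!();
}
```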
53 changes: 0 additions & 53 deletions lib/llm/src/entrypoint/input/common.rs
@@ -310,56 +310,3 @@ where
.link(frontend)?;
Ok(engine)
}

#[cfg(test)]
mod tests {
use super::*;
use crate::types::openai::{
chat_completions::{NvCreateChatCompletionRequest, NvCreateChatCompletionStreamResponse},
completions::{NvCreateCompletionRequest, NvCreateCompletionResponse},
};

const HF_PATH: &str = concat!(
env!("CARGO_MANIFEST_DIR"),
"/tests/data/sample-models/mock-llama-3.1-8b-instruct"
);

#[tokio::test]
async fn test_build_chat_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::load(HF_PATH, None)?;
let engine = crate::engines::make_engine_core();

// Build pipeline for chat completions
let pipeline = build_pipeline::<
NvCreateChatCompletionRequest,
NvCreateChatCompletionStreamResponse,
>(&card, engine, card.tokenizer_hf()?)
.await?;

// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);

Ok(())
}

#[tokio::test]
async fn test_build_completions_pipeline_core_engine_succeeds() -> anyhow::Result<()> {
// Create test model card
let card = ModelDeploymentCard::load(HF_PATH, None)?;
let engine = crate::engines::make_engine_core();

// Build pipeline for completions
let pipeline = build_pipeline::<NvCreateCompletionRequest, NvCreateCompletionResponse>(
&card,
engine,
card.tokenizer_hf()?,
)
.await?;

// Verify pipeline was created
assert!(Arc::strong_count(&pipeline) >= 1);

Ok(())
}
}
4 changes: 2 additions & 2 deletions lib/llm/src/entrypoint/input/endpoint.rs
@@ -156,8 +156,8 @@ mod integration_tests {
.await
.map_err(|e| anyhow::anyhow!("Failed to create distributed runtime: {}", e))?;

let engine_config = EngineConfig::StaticCore {
engine: crate::engines::make_engine_core(),
let engine_config = EngineConfig::StaticFull {
engine: crate::engines::make_echo_engine(),
model: Box::new(
crate::local_model::LocalModelBuilder::default()
.model_name(Some("test-model".to_string()))