Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

156 changes: 156 additions & 0 deletions examples/cli/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0

# Example cli using the Python bindings, similar to `dynamo-run`.
# Usage: `python cli.py in=text out=mistralrs <your-model>`.
# Must be in a virtualenv with the Dynamo bindings (or wheel) installed.

import argparse
import asyncio
import sys
from pathlib import Path

import uvloop

from dynamo.llm import EngineType, EntrypointArgs, make_engine, run_input
from dynamo.runtime import DistributedRuntime


def parse_args():
    """Parse the command line into a dict of CLI settings.

    The non-standard ``in=`` / ``out=`` selectors are stripped out by hand
    first; everything else (flags plus the positional model path) is handed
    to argparse. Returns a dict with keys: ``in_mode``, ``out_mode``,
    ``batch_file``, ``model_path`` and ``flags`` (the argparse namespace).
    """
    in_mode = "text"
    out_mode = "echo"
    batch_file = None  # Only meaningful when in_mode == "batch"

    # Everything argparse should see: flags and the model path.
    remaining = []

    # --- Step 1: hand-parse the 'in=' and 'out=' selectors ---
    # Anything that is not one of the two selectors is deferred to argparse.
    for raw in sys.argv[1:]:
        if raw.startswith("in="):
            value = raw.split("=", 1)[1]
            if value.startswith("batch:"):
                in_mode = "batch"
                batch_file = value.split(":", 1)[1]
            else:
                in_mode = value
        elif raw.startswith("out="):
            out_mode = raw.split("=", 1)[1]
        else:
            remaining.append(raw)

    # --- Step 2: argparse handles the flags and the model path ---
    parser = argparse.ArgumentParser(
        description="Dynamo CLI: Connect inputs to an engine",
        formatter_class=argparse.RawTextHelpFormatter,  # keep multi-line help as written
    )

    # model_name: Option<String>
    parser.add_argument("--model-name", type=str, help="Name of the model to load.")
    # model_config: Option<PathBuf>
    parser.add_argument(
        "--model-config", type=Path, help="Path to the model configuration file."
    )
    # context_length: Option<u32>
    parser.add_argument(
        "--context-length", type=int, help="Maximum context length for the model (u32)."
    )
    # template_file: Option<PathBuf>
    parser.add_argument(
        "--template-file",
        type=Path,
        help="Path to the template file for text generation.",
    )
    # kv_cache_block_size: Option<u32>
    parser.add_argument(
        "--kv-cache-block-size", type=int, help="KV cache block size (u32)."
    )
    # http_port: Option<u16>
    parser.add_argument("--http-port", type=int, help="HTTP port for the engine (u16).")

    # TODO: Not yet used here
    parser.add_argument(
        "--tensor-parallel-size",
        type=int,
        help="Tensor parallel size for the model (e.g., 4).",
    )

    # The model path is positional and optional at the argparse level
    # (nargs='?') because whether it is required depends on out_mode,
    # which is validated below.
    parser.add_argument(
        "model",
        nargs="?",
        help="Path to the model (e.g., Qwen/Qwen3-0.6B).\nRequired unless out=dyn.",
    )

    flags = parser.parse_args(remaining)

    # --- Step 3: cross-field validation and final assembly ---

    # in=batch must carry a file path after the colon.
    if in_mode == "batch" and not batch_file:
        parser.error("Batch mode requires a file path: in=batch:FILE")

    # Only the dynamic backend can run without a model path.
    if out_mode != "dyn" and flags.model is None:
        parser.error("Model path is required unless out=dyn.")

    return {
        "in_mode": in_mode,
        "out_mode": out_mode,
        "batch_file": batch_file,  # None unless in_mode == "batch"
        "model_path": flags.model,
        "flags": flags,
    }


async def run():
    """Build the engine selected by ``out=`` and connect it to the ``in=`` input.

    Parses the CLI, maps the out-mode string onto the bindings' engine enum,
    forwards any user-provided flags to EntrypointArgs, then hands control to
    run_input until the input source is exhausted. Exits with status 1 on an
    unsupported out-mode.
    """
    loop = asyncio.get_running_loop()
    runtime = DistributedRuntime(loop, False)

    args = parse_args()

    # Map the textual out= selector onto the bindings' engine enum.
    engine_type_map = {
        "echo": EngineType.Echo,
        "mistralrs": EngineType.MistralRs,
        "llamacpp": EngineType.LlamaCpp,
        "dyn": EngineType.Dynamic,
    }
    out_mode = args["out_mode"]
    engine_type = engine_type_map.get(out_mode)
    if engine_type is None:
        # Diagnostics go to stderr so stdout stays clean for model output.
        print(f"Unsupported output type: {out_mode}", file=sys.stderr)
        sys.exit(1)

    # TODO: The "vllm", "sglang" and "trtllm" cases should call Python directly

    entrypoint_kwargs = {"model_path": args["model_path"]}

    # Forward only the optional flags the user actually set; unset flags are
    # None and must be omitted so the engine falls back to its own defaults.
    flags = args["flags"]
    for flag_name in (
        "model_name",
        "model_config",
        "context_length",
        "template_file",
        "kv_cache_block_size",
        "http_port",
    ):
        value = getattr(flags, flag_name)
        if value is not None:
            entrypoint_kwargs[flag_name] = value

    e = EntrypointArgs(engine_type, **entrypoint_kwargs)
    engine = await make_engine(runtime, e)
    await run_input(runtime, args["in_mode"], engine)


if __name__ == "__main__":
    # Entry point: drive the async main() under uvloop's event loop.
    uvloop.run(run())
1 change: 1 addition & 0 deletions launch/dynamo-run/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ anyhow = { workspace = true }
async-openai = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
either = { workspace = true }
futures = { workspace = true }
libc = { workspace = true }
serde = { workspace = true }
Expand Down
13 changes: 9 additions & 4 deletions launch/dynamo-run/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use dynamo_llm::local_model::{LocalModel, LocalModelBuilder};
use dynamo_runtime::CancellationToken;

mod flags;
use either::Either;
pub use flags::Flags;
mod opt;
pub use dynamo_llm::request_template::RequestTemplate;
Expand Down Expand Up @@ -41,14 +42,19 @@ pub async fn run(
.kv_cache_block_size(flags.kv_cache_block_size)
// Only set if user provides. Usually loaded from tokenizer_config.json
.context_length(flags.context_length)
.http_port(flags.http_port)
.http_port(Some(flags.http_port))
.router_config(flags.router_config())
.request_template(flags.request_template.clone());

// If `in=dyn` we want the trtllm/sglang/vllm subprocess to listen on that endpoint.
// If not, then the endpoint isn't exposed so we let LocalModel invent one.
let mut rt = Either::Left(runtime.clone());
if let Input::Endpoint(path) = &in_opt {
builder.endpoint_id(path.parse().with_context(|| path.clone())?);
builder.endpoint_id(Some(path.parse().with_context(|| path.clone())?));

let distributed_runtime =
dynamo_runtime::DistributedRuntime::from_settings(runtime.clone()).await?;
rt = Either::Right(distributed_runtime);
};

let local_model = builder.build().await?;
Expand All @@ -70,8 +76,7 @@ pub async fn run(
//
// Run in from an input
//

dynamo_llm::entrypoint::input::run_input(in_opt, runtime, engine_config).await?;
dynamo_llm::entrypoint::input::run_input(rt, in_opt, engine_config).await?;

// Allow engines to ask main thread to wait on an extra future.
// We use this to stop the vllm and sglang sub-process
Expand Down
Loading
Loading