Skip to content

Commit bf8db83

Browse files
Merge branch 'main' of https://github.com/ai-dynamo/dynamo into mabdulwahhab/defaults
2 parents 1a05dab + f6fef48 commit bf8db83

File tree

10 files changed

+133
-88
lines changed

10 files changed

+133
-88
lines changed

components/backends/trtllm/engine_configs/agg.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ backend: pytorch
2222
enable_chunked_prefill: true
2323

2424
kv_cache_config:
25-
free_gpu_memory_fraction: 0.95
25+
free_gpu_memory_fraction: 0.85
2626

2727
# NOTE: pytorch_backend_config section flattened since: https://github.com/NVIDIA/TensorRT-LLM/pull/4603
2828
# NOTE: overlap_scheduler enabled by default since this commit and changed

components/backends/trtllm/engine_configs/decode.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ cuda_graph_config:
2525
max_batch_size: 16
2626

2727
kv_cache_config:
28-
free_gpu_memory_fraction: 0.95
28+
free_gpu_memory_fraction: 0.85
2929

3030
cache_transceiver_config:
3131
backend: default

components/backends/trtllm/engine_configs/prefill.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ disable_overlap_scheduler: true
2424
cuda_graph_config:
2525
max_batch_size: 16
2626
kv_cache_config:
27-
free_gpu_memory_fraction: 0.95
27+
free_gpu_memory_fraction: 0.85
2828

2929
cache_transceiver_config:
3030
backend: default

components/backends/trtllm/multinode/multinode-examples.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ deployment across 8 nodes:
186186
./srun_disaggregated.sh
187187
```
188188
189+
> [!Tip]
190+
> To launch multiple replicas of the configured prefill/decode workers, you can set
191+
> NUM_PREFILL_WORKERS and NUM_DECODE_WORKERS respectively (default: 1).
192+
189193
## Understanding the Output
190194
191195
1. The `srun_aggregated.sh` launches two `srun` jobs. The first launches

components/backends/trtllm/multinode/srun_disaggregated.sh

Lines changed: 40 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,11 @@ MOUNTS="${MOUNTS:-${DEFAULT_MOUNT}}"
1616
NUM_GPUS_PER_NODE=${NUM_GPUS_PER_NODE:-4}
1717

1818
NUM_PREFILL_NODES=${NUM_PREFILL_NODES:-4}
19+
NUM_PREFILL_WORKERS=${NUM_PREFILL_WORKERS:-1}
1920
PREFILL_ENGINE_CONFIG="${PREFILL_ENGINE_CONFIG:-/mnt/engine_configs/deepseek_r1/wide_ep/wide_ep_prefill.yaml}"
2021

2122
NUM_DECODE_NODES=${NUM_DECODE_NODES:-4}
23+
NUM_DECODE_WORKERS=${NUM_DECODE_WORKERS:-1}
2224
DECODE_ENGINE_CONFIG="${DECODE_ENGINE_CONFIG:-/mnt/engine_configs/deepseek_r1/wide_ep/wide_ep_decode.yaml}"
2325

2426
DISAGGREGATION_STRATEGY=${DISAGGREGATION_STRATEGY:-"decode_first"}
@@ -59,38 +61,42 @@ srun \
5961
# NOTE: Output streamed to stdout for ease of understanding the example, but
6062
# in practice you would probably set `srun --output ... --error ...` to pipe
6163
# the stdout/stderr to files.
62-
echo "Launching multi-node prefill worker in background."
63-
DISAGGREGATION_MODE=prefill \
64-
ENGINE_CONFIG=${PREFILL_ENGINE_CONFIG} \
65-
srun \
66-
--mpi pmix \
67-
--oversubscribe \
68-
--container-image "${IMAGE}" \
69-
--container-mounts "${MOUNTS}" \
70-
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \
71-
--verbose \
72-
--label \
73-
-A "${ACCOUNT}" \
74-
-J "${ACCOUNT}-dynamo.trtllm" \
75-
--nodes "${NUM_PREFILL_NODES}" \
76-
--ntasks-per-node "${NUM_GPUS_PER_NODE}" \
77-
--jobid "${SLURM_JOB_ID}" \
78-
/mnt/multinode/start_trtllm_worker.sh &
64+
for ((i=1; i<=${NUM_PREFILL_WORKERS}; i++)); do
65+
echo "Launching multi-node prefill worker in background."
66+
DISAGGREGATION_MODE=prefill \
67+
ENGINE_CONFIG=${PREFILL_ENGINE_CONFIG} \
68+
srun \
69+
--mpi pmix \
70+
--oversubscribe \
71+
--container-image "${IMAGE}" \
72+
--container-mounts "${MOUNTS}" \
73+
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \
74+
--verbose \
75+
--label \
76+
-A "${ACCOUNT}" \
77+
-J "${ACCOUNT}-dynamo.trtllm" \
78+
--nodes "${NUM_PREFILL_NODES}" \
79+
--ntasks-per-node "${NUM_GPUS_PER_NODE}" \
80+
--jobid "${SLURM_JOB_ID}" \
81+
/mnt/multinode/start_trtllm_worker.sh &
82+
done
7983

80-
echo "Launching multi-node decode worker in background."
81-
DISAGGREGATION_MODE=decode \
82-
ENGINE_CONFIG=${DECODE_ENGINE_CONFIG} \
83-
srun \
84-
--mpi pmix \
85-
--oversubscribe \
86-
--container-image "${IMAGE}" \
87-
--container-mounts "${MOUNTS}" \
88-
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \
89-
--verbose \
90-
--label \
91-
-A "${ACCOUNT}" \
92-
-J "${ACCOUNT}-dynamo.trtllm" \
93-
--nodes "${NUM_DECODE_NODES}" \
94-
--ntasks-per-node "${NUM_GPUS_PER_NODE}" \
95-
--jobid "${SLURM_JOB_ID}" \
96-
/mnt/multinode/start_trtllm_worker.sh &
84+
for ((i=1; i<=${NUM_DECODE_WORKERS}; i++)); do
85+
echo "Launching multi-node decode worker in background."
86+
DISAGGREGATION_MODE=decode \
87+
ENGINE_CONFIG=${DECODE_ENGINE_CONFIG} \
88+
srun \
89+
--mpi pmix \
90+
--oversubscribe \
91+
--container-image "${IMAGE}" \
92+
--container-mounts "${MOUNTS}" \
93+
--container-env ETCD_ENDPOINTS,NATS_SERVER,HEAD_NODE_IP,HEAD_NODE,DISAGGREGATION_MODE,DISAGGREGATION_STRATEGY,ENGINE_CONFIG \
94+
--verbose \
95+
--label \
96+
-A "${ACCOUNT}" \
97+
-J "${ACCOUNT}-dynamo.trtllm" \
98+
--nodes "${NUM_DECODE_NODES}" \
99+
--ntasks-per-node "${NUM_GPUS_PER_NODE}" \
100+
--jobid "${SLURM_JOB_ID}" \
101+
/mnt/multinode/start_trtllm_worker.sh &
102+
done

lib/runtime/src/distributed.rs

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,38 +44,17 @@ impl MetricsRegistry for DistributedRuntime {
4444

4545
impl DistributedRuntime {
4646
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
47-
let secondary = runtime.secondary();
4847
let (etcd_config, nats_config, is_static) = config.dissolve();
4948

5049
let runtime_clone = runtime.clone();
5150

5251
let etcd_client = if is_static {
5352
None
5453
} else {
55-
Some(
56-
secondary
57-
.spawn(async move {
58-
let client = etcd::Client::new(etcd_config.clone(), runtime_clone)
59-
.await
60-
.context(format!(
61-
"Failed to connect to etcd server with config {:?}",
62-
etcd_config
63-
))?;
64-
OK(client)
65-
})
66-
.await??,
67-
)
54+
Some(etcd::Client::new(etcd_config.clone(), runtime_clone).await?)
6855
};
6956

70-
let nats_client = secondary
71-
.spawn(async move {
72-
let client = nats_config.clone().connect().await.context(format!(
73-
"Failed to connect to NATS server with config {:?}",
74-
nats_config
75-
))?;
76-
anyhow::Ok(client)
77-
})
78-
.await??;
57+
let nats_client = nats_config.clone().connect().await?;
7958

8059
// Start system status server for health and metrics if enabled in configuration
8160
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();

lib/runtime/src/transports.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,5 @@
2121
pub mod etcd;
2222
pub mod nats;
2323
pub mod tcp;
24+
mod utils;
2425
pub mod zmq;

lib/runtime/src/transports/etcd.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ mod path;
3737
use lease::*;
3838
pub use path::*;
3939

40+
use super::utils::build_in_runtime;
41+
4042
//pub use etcd::ConnectOptions as EtcdConnectOptions;
4143

4244
/// ETCD Client
@@ -45,6 +47,7 @@ pub struct Client {
4547
client: etcd_client::Client,
4648
primary_lease: i64,
4749
runtime: Runtime,
50+
rt: Arc<tokio::runtime::Runtime>,
4851
}
4952

5053
#[derive(Debug, Clone)]
@@ -101,33 +104,36 @@ impl Client {
101104
/// If the lease expires, the [`Runtime`] will be shutdown.
102105
/// If the [`Runtime`] is shutdown, the lease will be revoked.
103106
pub async fn new(config: ClientOptions, runtime: Runtime) -> Result<Self> {
104-
runtime
105-
.secondary()
106-
.spawn(Self::create(config, runtime.clone()))
107-
.await?
108-
}
109-
110-
/// Create a new etcd client and tie the primary [`CancellationToken`] to the primary etcd lease.
111-
async fn create(config: ClientOptions, runtime: Runtime) -> Result<Self> {
112107
let token = runtime.primary_token();
113-
let client =
114-
etcd_client::Client::connect(config.etcd_url, config.etcd_connect_options).await?;
115108

116-
let lease_id = if config.attach_lease {
117-
let lease_client = client.lease_client();
109+
let ((client, lease_id), rt) = build_in_runtime(
110+
async move {
111+
let client =
112+
etcd_client::Client::connect(config.etcd_url, config.etcd_connect_options)
113+
.await?;
118114

119-
let lease = create_lease(lease_client, 10, token)
120-
.await
121-
.context("creating primary lease")?;
115+
let lease_id = if config.attach_lease {
116+
let lease_client = client.lease_client();
122117

123-
lease.id
124-
} else {
125-
0
126-
};
118+
let lease = create_lease(lease_client, 10, token)
119+
.await
120+
.context("creating primary lease")?;
121+
122+
lease.id
123+
} else {
124+
0
125+
};
126+
127+
Ok((client, lease_id))
128+
},
129+
1,
130+
)
131+
.await?;
127132

128133
Ok(Client {
129134
client,
130135
primary_lease: lease_id,
136+
rt,
131137
runtime,
132138
})
133139
}
@@ -155,19 +161,15 @@ impl Client {
155161
pub async fn create_lease(&self, ttl: i64) -> Result<Lease> {
156162
let token = self.runtime.child_token();
157163
let lease_client = self.client.lease_client();
158-
self.runtime
159-
.secondary()
164+
self.rt
160165
.spawn(create_lease(lease_client, ttl, token))
161166
.await?
162167
}
163168

164169
// Revoke an etcd lease given its lease id. A wrapper over etcd_client::LeaseClient::revoke
165170
pub async fn revoke_lease(&self, lease_id: i64) -> Result<()> {
166171
let lease_client = self.client.lease_client();
167-
self.runtime
168-
.secondary()
169-
.spawn(revoke_lease(lease_client, lease_id))
170-
.await?
172+
self.rt.spawn(revoke_lease(lease_client, lease_id)).await?
171173
}
172174

173175
pub async fn kv_create(&self, key: &str, value: Vec<u8>, lease_id: Option<i64>) -> Result<()> {
@@ -340,7 +342,7 @@ impl Client {
340342

341343
let (tx, rx) = mpsc::channel(32);
342344

343-
self.runtime.secondary().spawn(async move {
345+
self.rt.spawn(async move {
344346
for kv in kvs {
345347
if tx.send(WatchEvent::Put(kv)).await.is_err() {
346348
// receiver is already closed

lib/runtime/src/transports/nats.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use bytes::Bytes;
3535
use derive_builder::Builder;
3636
use futures::{StreamExt, TryStreamExt};
3737
use std::path::{Path, PathBuf};
38+
use std::sync::Arc;
3839
use tokio::fs::File as TokioFile;
3940
use tokio::io::AsyncRead;
4041
use tokio::time;
@@ -44,6 +45,8 @@ use validator::{Validate, ValidationError};
4445
pub use crate::slug::Slug;
4546
use tracing as log;
4647

48+
use super::utils::build_in_runtime;
49+
4750
pub const URL_PREFIX: &str = "nats://";
4851

4952
#[derive(Clone)]
@@ -236,7 +239,9 @@ fn validate_nats_server(server: &str) -> Result<(), ValidationError> {
236239
}
237240
}
238241

239-
#[allow(dead_code)]
242+
// TODO(jthomson04): We really shouldn't be hardcoding this.
243+
const NATS_WORKER_THREADS: usize = 4;
244+
240245
impl ClientOptions {
241246
/// Create a new [`ClientOptionsBuilder`]
242247
pub fn builder() -> ClientOptionsBuilder {
@@ -258,7 +263,17 @@ impl ClientOptions {
258263
}
259264
};
260265

261-
let client = client.connect(self.server).await?;
266+
let (client, _) = build_in_runtime(
267+
async move {
268+
client
269+
.connect(self.server)
270+
.await
271+
.map_err(|e| anyhow::anyhow!("Failed to connect to NATS: {e}"))
272+
},
273+
NATS_WORKER_THREADS,
274+
)
275+
.await?;
276+
262277
let js_ctx = jetstream::new(client.clone());
263278

264279
Ok(Client { client, js_ctx })
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::{future::Future, sync::Arc};
5+
6+
use anyhow::Result;
7+
8+
pub async fn build_in_runtime<
9+
T: Send + Sync + 'static,
10+
F: Future<Output = Result<T>> + Send + 'static,
11+
>(
12+
f: F,
13+
num_threads: usize,
14+
) -> Result<(T, Arc<tokio::runtime::Runtime>)> {
15+
let (tx, rx) = tokio::sync::oneshot::channel();
16+
17+
let runtime = Arc::new(
18+
tokio::runtime::Builder::new_multi_thread()
19+
.worker_threads(num_threads)
20+
.enable_all()
21+
.build()?,
22+
);
23+
24+
let runtime_clone = runtime.clone();
25+
std::thread::spawn(move || {
26+
runtime_clone.block_on(async move {
27+
let result = f.await;
28+
tx.send(result)
29+
.unwrap_or_else(|_| panic!("This should never happen!"));
30+
31+
std::future::pending::<()>().await;
32+
})
33+
});
34+
35+
let result = rx.await??;
36+
37+
Ok((result, runtime))
38+
}

0 commit comments

Comments
 (0)