Skip to content
Merged
Prev Previous commit
Next Next commit
fix
Signed-off-by: richardhuo-nv <[email protected]>
  • Loading branch information
richardhuo-nv committed Aug 29, 2025
commit 04c3cdb96bee65624d0ee3c8447b1d810c2c13cb
2 changes: 1 addition & 1 deletion docs/guides/run_kvbm_in_trtllm.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ This guide explains how to leverage KVBM (KV Block Manager) to manage KV cache an
To learn what KVBM is, please check [here](https://docs.nvidia.com/dynamo/latest/architecture/kvbm_intro.html)

> [!Note]
> - Ensure that `etcd` is running before starting.
> - Ensure that `etcd` and `nats` are running before starting.
> - KVBM does not currently support CUDA graphs in TensorRT-LLM.
> - KVBM only supports TensorRT-LLM’s PyTorch backend.
> - KVBM requires TensorRT-LLM at commit ce580ce4f52af3ad0043a800b3f9469e1f1109f6 or newer.
Expand Down
51 changes: 48 additions & 3 deletions lib/llm/src/block_manager/distributed/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use zmq::*;

use dynamo_runtime::utils::leader_worker_barrier::LeaderBarrier;

use anyhow::Context;
use anyhow::{Context, anyhow};
use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
Expand Down Expand Up @@ -85,6 +85,7 @@ pub struct KvbmLeaderState {
pub num_host_blocks: Arc<AtomicUsize>,
pub num_disk_blocks: Arc<AtomicUsize>,
pub workers_allocation_ready: Arc<AtomicBool>,
pub workers_ready_notify: Arc<Notify>,
}

/// The leader of the KVBM.
Expand Down Expand Up @@ -203,6 +204,7 @@ impl KvbmLeader {
.min()
.unwrap();

// TODO: this works for TP, need to redefine bytes_per_block when we enable the DP/PP
let bytes_per_block: usize = worker_data.values().map(|d| d.bytes_per_block).sum();

assert!(
Expand Down Expand Up @@ -282,6 +284,7 @@ impl KvbmLeader {
state
.workers_allocation_ready
.store(true, Ordering::Release);
state.workers_ready_notify.notify_waiters();
}
Err(e) => {
tracing::error!("ZMQ init failed: {e:?}");
Expand All @@ -292,9 +295,31 @@ impl KvbmLeader {

// This is supposed to be used in non-blocking leader initialization
pub fn spawn_leader_readiness_barrier(&self, drt: DistributedRuntime) {
let timeout_secs = self.config.leader_init_timeout_secs;
let state = self.state.clone();
let leader_config = self.config.clone();
let handle = drt.runtime().primary();
handle.spawn(async move {
if !state.workers_allocation_ready.load(Ordering::Acquire) {
// Wait until ZMQ marks ready or we time out.
let waited = tokio::time::timeout(
Duration::from_secs(timeout_secs),
state.workers_ready_notify.notified(),
)
.await;
if waited.is_err() {
tracing::error!(
"leader readiness barrier wait timed out after {timeout_secs} seconds"
);
return;
}
// Double-check the flag (Acquire) after wakeup.
if !state.workers_allocation_ready.load(Ordering::Acquire) {
tracing::error!("leader readiness notify fired but flag not set; aborting");
return;
}
}

match KvbmLeader::run_leader_readiness(drt, leader_config).await {
Ok(()) => {
tracing::info!("leader readiness barrier synced!");
Expand All @@ -311,12 +336,32 @@ impl KvbmLeader {
&self,
drt: DistributedRuntime,
) -> anyhow::Result<()> {
let state = self.state.clone();
let timeout_secs = self.config.leader_init_timeout_secs;
let leader_config = self.config.clone();
let fut = KvbmLeader::run_leader_readiness(drt, leader_config);

tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(fut)
.block_on(async move {
// Create the future *before* checking the flag to avoid a lost-notify race.
let notified = state.workers_ready_notify.notified();

if !state.workers_allocation_ready.load(Ordering::Acquire) {
// Wait (with timeout) until ZMQ task marks ready.
tokio::time::timeout(Duration::from_secs(timeout_secs), notified)
.await
.map_err(|_| anyhow!("timed out waiting for workers_allocation_ready after {timeout_secs} seconds"))?;

// Double-check after wake to ensure the flag is actually set.
if !state.workers_allocation_ready.load(Ordering::Acquire) {
return Err(anyhow!(
"notified but workers_allocation_ready is still false"
));
}
}

KvbmLeader::run_leader_readiness(drt, leader_config).await
})
.context("leader readiness barrier failed")
})
}
Expand Down
Loading