Merged
resolve comments
Signed-off-by: richardhuo-nv <[email protected]>
richardhuo-nv committed Aug 29, 2025
commit 63aa8f3ebb6c0e6d6860f93ff051af4f600cbe4d
@@ -99,7 +99,7 @@ impl KvConnectorLeader {

let block_manager = match BlockManagerBuilder::new()
.worker_id(worker_id)
-            .leader(leader_py) // your distributed::KvbmLeader
+            .leader(leader_py)
.page_size(page_size)
.disable_device_pool(false)
.build()
@@ -177,11 +177,6 @@ impl Leader for KvConnectorLeader {
.lock()
.map_err(|e| anyhow::anyhow!("Failed to lock slot: {}", e))?;

-        if slot.state() == SlotState::Prefilling {
-            tracing::warn!("slot is in the Prefilled state; this seems like we need to reset the slot and start over");
-            slot.reset();
-        }
-
// early exit if we cannot match full block
if (slot.sequence().total_tokens() - num_computed_tokens) < self.block_size {
let total_tokens = slot.sequence().total_tokens();
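
For context on the early-exit check above: the slot is skipped when the number of not-yet-computed tokens is smaller than one block. A small worked example with hypothetical numbers; the helper below is illustrative only, not part of the connector:

// Worked example of the early-exit condition (hypothetical numbers).
fn can_match_full_block(total_tokens: usize, num_computed_tokens: usize, block_size: usize) -> bool {
    (total_tokens - num_computed_tokens) >= block_size
}

fn main() {
    assert!(!can_match_full_block(70, 64, 32));  // only 6 uncomputed tokens: early exit
    assert!(can_match_full_block(128, 64, 32));  // 64 uncomputed tokens: proceed with block matching
}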
@@ -412,7 +407,7 @@
.remove(&request_id);

// if the slot has finished, we can return false to trtllm, indicating all gpu blocks are free to be reused
-        // otherwise, we return false, which means there are still outstanding operations on gpu blocks which
+        // otherwise, we return true, which means there are still outstanding operations on gpu blocks which
// must be awaited before the gpu blocks can be reused. if we return true, then it is the worker side
// of the connector api which will be used to inform trtllm that the request is finished.
if let SlotState::Finished = slot.state() {
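
The corrected comment above fixes the description of the return contract: false tells TRT-LLM that the request's GPU blocks are immediately reusable, while true means outstanding block operations remain and the worker side of the connector API will signal completion later. A minimal sketch of that contract, using simplified placeholder types rather than the real connector structs:

// Hypothetical sketch of the request_finished return contract; the types
// here are placeholders, not the actual connector structs.
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(PartialEq)]
enum SlotState {
    Prefilling,
    Decoding,
    Finished,
}

struct Slot {
    state: SlotState,
}

struct ConnectorLeader {
    slots: HashMap<String, Slot>,
}

impl ConnectorLeader {
    fn request_finished(&mut self, request_id: &str) -> bool {
        match self.slots.remove(request_id) {
            // Slot already finished: all GPU blocks are free to be reused.
            Some(slot) if slot.state == SlotState::Finished => false,
            // Outstanding operations remain: the worker side of the connector
            // informs TRT-LLM once they complete.
            Some(_) => true,
            // Unknown request: nothing to wait for.
            None => false,
        }
    }
}

fn main() {
    let mut leader = ConnectorLeader { slots: HashMap::new() };
    leader.slots.insert("req-0".to_string(), Slot { state: SlotState::Finished });
    leader.slots.insert("req-1".to_string(), Slot { state: SlotState::Decoding });
    assert!(!leader.request_finished("req-0"));
    assert!(leader.request_finished("req-1"));
}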
lib/llm/src/block_manager/distributed/leader.rs (17 additions, 19 deletions)
@@ -123,11 +123,17 @@ impl KvbmLeader {
};

let cancel_token = tokio_util::sync::CancellationToken::new();
-        leader.spawn_barrier_task(
-            drt,
+
+        // The leader_sockets struct cannot be cloned,
+        // so we use a tuple to "struct" the two urls
+        let leader_urls = (
             leader_sockets.pub_url.clone(),
             leader_sockets.ack_url.clone(),
         );
+        leader.spawn_barrier_task(
+            drt,
+            leader_urls
+        );
leader.spawn_zmq_task(leader_sockets, cancel_token);

Ok(leader)
@@ -136,8 +142,7 @@ impl KvbmLeader {
fn spawn_barrier_task(
&self,
drt: DistributedRuntime,
-        leader_sockets_pub_url: String,
-        leader_sockets_ack_url: String,
+        leader_urls: (String, String),
) {
let state = self.state.clone();
let leader_config = self.config.clone();
@@ -148,8 +153,7 @@ impl KvbmLeader {
tokio::spawn(async move {
match KvbmLeader::run_barrier_sync(
drt,
-                leader_sockets_pub_url,
-                leader_sockets_ack_url,
+                leader_urls,
leader_config,
)
.await
@@ -180,8 +184,7 @@ impl KvbmLeader {

async fn run_barrier_sync(
drt: DistributedRuntime,
-        leader_sockets_pub_url: String,
-        leader_sockets_ack_url: String,
+        leader_urls: (String, String),
leader_config: KvbmLeaderConfig,
) -> anyhow::Result<(usize, usize, usize)> {
let barrier_id_worker_to_leader =
@@ -191,24 +194,18 @@ impl KvbmLeader {
leader_config.world_size,
barrier_id_worker_to_leader
);
-        let zmq_data_worker_to_leader: Arc<KvbmLeaderData> = Arc::new(KvbmLeaderData {
-            pub_url: leader_sockets_pub_url.clone(),
-            ack_url: leader_sockets_ack_url.clone(),
-            num_host_blocks: 0, // doesn't matter for worker to leader sync
-            num_disk_blocks: 0, // doesn't matter for worker to leader sync
-        });

// Build our leader barrier and publish the data.
// TODO: Use a separate timeout parameter from the ZMQ connection timeout
-        let worker_to_leader_barrier: LeaderBarrier<KvbmLeaderData, worker::KvbmWorkerData> =
+        let worker_to_leader_barrier: LeaderBarrier<(), worker::KvbmWorkerData> =
LeaderBarrier::new(
barrier_id_worker_to_leader.clone(),
leader_config.world_size,
Some(Duration::from_secs(leader_config.leader_init_timeout_secs)),
);

let worker_data = worker_to_leader_barrier
-            .sync(&drt, zmq_data_worker_to_leader.as_ref())
+            .sync(&drt, &())
.await
.map_err(|e| anyhow::anyhow!("Failed to sync worker to leader barrier: {:?}", e))?;

@@ -245,14 +242,15 @@ impl KvbmLeader {
barrier_id_leader_to_worker
);

+        let (leader_pub_url, leader_ack_url) = leader_urls;
let zmq_data_leader_to_worker = Arc::new(KvbmLeaderData {
-            pub_url: leader_sockets_pub_url.clone(),
-            ack_url: leader_sockets_ack_url.clone(),
+            pub_url: leader_pub_url,
+            ack_url: leader_ack_url,
num_host_blocks,
num_disk_blocks,
});

-        let leader_to_worker_barrier: LeaderBarrier<KvbmLeaderData, worker::KvbmWorkerData> =
+        let leader_to_worker_barrier: LeaderBarrier<KvbmLeaderData, ()> =
LeaderBarrier::new(
barrier_id_leader_to_worker.clone(),
leader_config.world_size,
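
Taken together, the leader.rs changes make each barrier phase carry a payload only in the direction that needs one: the worker-to-leader phase publishes nothing (the unit type), and only the later leader-to-worker phase publishes the ZMQ URLs and block counts, with the two URLs passed as a (String, String) tuple because the leader_sockets struct cannot be cloned. A rough, self-contained sketch of that pattern, using std channels in place of the real barrier and transport; all names below are illustrative:

// Hypothetical, std-only stand-in for the barrier change in leader.rs: a
// barrier phase is generic over the payload the leader publishes, so the
// worker-to-leader phase (which publishes nothing) can use the unit type ().
use std::sync::mpsc;
use std::thread;

// Leader side of one barrier phase: broadcast `payload` to every worker and
// wait for exactly one reply from each of them.
fn leader_phase<P: Clone, W>(
    payload: P,
    to_workers: &[mpsc::Sender<P>],
    from_workers: &mpsc::Receiver<W>,
) -> Vec<W> {
    for tx in to_workers {
        tx.send(payload.clone()).unwrap();
    }
    (0..to_workers.len())
        .map(|_| from_workers.recv().unwrap())
        .collect()
}

fn main() {
    let world_size = 2;
    let (reply_tx, reply_rx) = mpsc::channel::<u64>();
    let mut to_workers = Vec::new();

    for _ in 0..world_size {
        let (tx, rx) = mpsc::channel::<()>();
        let reply_tx = reply_tx.clone();
        // Each worker waits for the (empty) phase-1 publication, then reports
        // its bytes-per-block, standing in for KvbmWorkerData.
        thread::spawn(move || {
            rx.recv().unwrap();
            reply_tx.send(4096).unwrap();
        });
        to_workers.push(tx);
    }
    drop(reply_tx);

    // Phase 1 (worker -> leader): publish (), collect the per-worker data,
    // mirroring LeaderBarrier<(), KvbmWorkerData>.
    let bytes_per_block = leader_phase((), &to_workers, &reply_rx);
    println!("phase 1 worker data: {:?}", bytes_per_block);

    // Phase 2 (leader -> worker) is the mirror image: publish the ZMQ urls
    // and block counts, collect a () acknowledgement per worker, mirroring
    // LeaderBarrier<KvbmLeaderData, ()>.
}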
lib/llm/src/block_manager/distributed/worker.rs (6 additions, 8 deletions)
@@ -334,7 +334,7 @@ impl KvbmWorker {
barrier_id_worker_to_leader
);

-        let worker_to_leader_barrier = WorkerBarrier::<KvbmLeaderData, KvbmWorkerData>::new(
+        let worker_to_leader_barrier = WorkerBarrier::<(), KvbmWorkerData>::new(
barrier_id_worker_to_leader,
worker_id.to_string(),
);
@@ -344,8 +344,7 @@ impl KvbmWorker {
bytes_per_block,
};

-        // leader_data is not important in the worker to leader phase
-        let _leader_data = tokio::select! {
+        tokio::select! {
_ = cancel_token.cancelled() => {
return Err(anyhow::anyhow!("Cancelled"))
}
@@ -356,9 +355,8 @@ impl KvbmWorker {
.map_err(|e| anyhow::anyhow!("Failed to sync worker to leader barrier: {:?}", e))?;

tracing::debug!(
"Worker {} received leader data: {:?} in worker to leader phase",
worker_id,
_leader_data
"Worker {} sent the worker data in worker to leader phase",
worker_id
);

let barrier_id_leader_to_worker =
@@ -369,7 +367,7 @@ impl KvbmWorker {
barrier_id_leader_to_worker
);

-        let leader_to_worker_barrier = WorkerBarrier::<KvbmLeaderData, KvbmWorkerData>::new(
+        let leader_to_worker_barrier = WorkerBarrier::<KvbmLeaderData, ()>::new(
barrier_id_leader_to_worker,
worker_id.to_string(),
);
@@ -378,7 +376,7 @@ impl KvbmWorker {
_ = cancel_token.cancelled() => {
return Err(anyhow::anyhow!("Cancelled"))
}
-            leader_data = leader_to_worker_barrier.sync(&drt, &worker_data) => {
+            leader_data = leader_to_worker_barrier.sync(&drt, &()) => {
leader_data
}
}
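
On the worker side, each barrier sync is raced against the cancellation token inside tokio::select!, as in the hunks above. A self-contained sketch of that cancellation pattern, assuming the tokio, tokio-util, and anyhow crates and substituting a sleep for the real barrier sync:

// Sketch of the select!-based cancellation pattern used around each barrier
// sync in worker.rs. The barrier is replaced by a plain sleep here.
use std::time::Duration;
use tokio_util::sync::CancellationToken;

async fn fake_barrier_sync() -> anyhow::Result<&'static str> {
    tokio::time::sleep(Duration::from_millis(50)).await;
    Ok("leader data")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cancel_token = CancellationToken::new();

    // Either the token fires and we bail out, or the sync completes and we
    // keep its result, mirroring the structure in the diff.
    let leader_data = tokio::select! {
        _ = cancel_token.cancelled() => {
            return Err(anyhow::anyhow!("Cancelled"))
        }
        leader_data = fake_barrier_sync() => {
            leader_data
        }
    }?;

    println!("{leader_data}");
    Ok(())
}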