Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
chunked prefill seems to revisit the get_num_new_matched_tokens more …
…than once; short-circuiting on multiple invocations; will need to reset this state on eviction
  • Loading branch information
ryanolson committed Aug 5, 2025
commit a7792f1db6b431635a7285e65acea4f536d12f75
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,19 @@ impl KvConnectorLeader {
"request_num_tokens: {request_num_tokens}; num_computed_tokens: {num_computed_tokens}"
);

// the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0);

let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;

if slot.has_matched_external_tokens() {
tracing::debug!(
"detected multiple calls to get_num_new_matched_tokens; skipping lookup"
);
return Ok((0, false));
}

// the number of device matched tokens should be less than or equal to the number of tokens in the request
debug_assert!(num_computed_tokens % self.block_size == 0);

// vllm is telling us that the tokens have been computed, since we do not have insight into the device pool
// we accept this and advance the computed position
slot.advance_computed_position(num_computed_tokens)?;
Expand Down Expand Up @@ -153,7 +160,10 @@ impl KvConnectorLeader {
let shared_slot = self.slot_manager.get_slot(&request_id).map_err(to_pyerr)?;
let mut slot = shared_slot.lock().map_err(to_pyerr)?;

slot.append_mutable_device_blocks(&block_ids)?;
if slot.has_matched_external_tokens() {
tracing::debug!("detected multiple calls to update_state_after_alloc; skipping lookup");
return Ok(());
}

// the second call will show num_external_tokens == 0
// this call is just letting us know the other blocks that are being used for the remainder of the prefill
Expand All @@ -170,7 +180,7 @@ impl KvConnectorLeader {
Ok(())
}

#[tracing::instrument(level = "debug", skip_all)]
#[tracing::instrument(level = "debug", skip_all, fields(iteration = self.iteration_counter + 1))]
pub fn build_connector_metadata(
&mut self,
scheduler_output: SchedulerOutput,
Expand All @@ -183,8 +193,8 @@ impl KvConnectorLeader {
self.iteration_counter += 1;
let iteration = self.iteration_counter;

tracing::debug!("Building connector metadata; iteration {iteration}");
tracing::debug!("scheduler_output: {scheduler_output:#?}");
tracing::debug!("Building connector metadata");
tracing::debug!("SchedulerOutput: {scheduler_output:#?}");

let mut inflight_requests = self.inflight_requests.clone();
let mut md = ConnectorMetadata::new(iteration);
Expand Down Expand Up @@ -285,8 +295,9 @@ impl KvConnectorLeader {
}
}

tracing::debug!("scheduler_output: {scheduler_output:#?}");
serde_json::to_vec(&md).map_err(to_pyerr)
let metadata = serde_json::to_vec(&md).map_err(to_pyerr)?;
tracing::debug!("metadata: {metadata:#?}");
Ok(metadata)
}

fn request_finished(&mut self, request_id: String, block_ids: Vec<BlockId>) -> PyResult<bool> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ pub enum SlotState {
pub trait Slot: std::fmt::Debug {
fn request_id(&self) -> &str;

/// It appears vLLM can call `get_num_new_matched_tokens` multiple times for the same request, even without an eviction
/// event. Generally, we only need to rematch to external tokens one time at the beginning of the request, or after an
/// eviction event.
///
/// Call this method to determine if the slot holds a match state or if [`Slot::acquire_all_local_matches`] should be called.
fn has_matched_external_tokens(&self) -> bool;

fn state(&self) -> SlotState;

fn sequence(&self) -> &TokenBlockSequence;
Expand Down Expand Up @@ -277,6 +284,9 @@ pub struct VllmConnectorSlot {
/// The number of blocks that have been evaluated by the policy.
/// Each policy evaluation will skip the already evaluated blocks.
evaluated_blocks: usize,

/// Whether the slot has already matched to external tokens.
has_matched_external_tokens: bool,
}

impl VllmConnectorSlot {
Expand Down Expand Up @@ -310,6 +320,7 @@ impl VllmConnectorSlot {
tokens_cached_from_device: 0,
tokens_cached_from_host: 0,
tokens_cached_from_disk: 0,
has_matched_external_tokens: false,
}
}
}
Expand Down Expand Up @@ -348,17 +359,19 @@ impl Slot for VllmConnectorSlot {
tracing::debug!("recording {} cached disk tokens", num_tokens);
}

#[tracing::instrument(level = "debug", skip_all, fields(request_id = self.request_id.as_str()))]
fn apply_scheduler_output(
&mut self,
tokens: &[u32],
block_ids: &[BlockId],
num_computed_tokens: usize,
num_scheduled_tokens: usize,
) -> Result<(), SlotError> {
// debug_assert!(num_computed_tokens == self.computed_tokens());

if !tokens.is_empty() {
tracing::debug!("appending {} newly decodedtokens to sequence", tokens.len());
tracing::debug!(
"appending {} newly decoded tokens to sequence",
tokens.len()
);
self.state = SlotState::Decoding;
self.sequence.extend(tokens.into()).unwrap();
} else {
Expand Down Expand Up @@ -442,6 +455,8 @@ impl Slot for VllmConnectorSlot {

self.offload_blocks(&offload_block_ids, &offload_token_blocks)
.expect("failed to offload blocks");

self.evaluated_blocks += num_candidate_blocks;
}

// done applying policy
Expand Down Expand Up @@ -492,8 +507,15 @@ impl Slot for VllmConnectorSlot {
self.pending_operations.take()
}

fn has_matched_external_tokens(&self) -> bool {
self.has_matched_external_tokens
}

#[tracing::instrument(level = "debug", skip_all)]
fn acquire_all_local_matches(&mut self) -> Result<(), SlotError> {
assert!(!self.has_matched_external_tokens);
self.has_matched_external_tokens = true;

if !matches!(self.state(), SlotState::Initialized) {
return Err(SlotError::InvalidOperation(format!(
"slot must be in the NotScheduled state to acquire local matches; got {:?}",
Expand Down
Loading