Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: add recorder to KVBM Connector
  • Loading branch information
ziqifan617 committed Aug 5, 2025
commit 88690d29919e55aff0fabc6adddd72d0b4c0394e
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use std::{
sync::{Arc, Mutex},
};
use tokio::sync::mpsc;
use tokio;
use pyo3_async_runtimes;

type VllmLocality = Logical<DistributedLeaderWorkerResources>;

Expand All @@ -36,6 +38,17 @@ impl From<SlotError> for PyErr {
to_pyerr(err)
}
}
use dynamo_llm::recorder::Recorder;
use tokio_util::sync::CancellationToken;

/// Event describing the outcome of one matched-tokens lookup for a request.
///
/// An instance is sent through the optional `Recorder<MatchedTokensEvent>` held by
/// `KvConnectorLeader`, both when onboarding of external tokens is scheduled and when
/// it is not. The type derives `Serialize`/`Deserialize` so the recorder can persist it.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MatchedTokensEvent {
/// Identifier of the request this event refers to (the caller's `request_id`).
request_id: String,
/// The caller's `request_num_tokens` at the time of the event.
/// NOTE(review): presumably the total number of tokens in the request; confirm.
request_num_tokens: usize,
/// The caller's `num_computed_tokens` at the time of the event.
/// NOTE(review): presumably tokens already computed before the lookup; confirm.
num_computed_tokens: usize,
/// Number of external tokens scheduled for onboarding; `0` when nothing was scheduled.
num_matched_tokens: usize,
/// `true` when onboarding was scheduled for external tokens, `false` otherwise.
has_matched: bool,
}

#[pyclass]
pub struct KvConnectorLeader {
Expand All @@ -44,6 +57,7 @@ pub struct KvConnectorLeader {
inflight_requests: HashSet<String>,
onboarding_slots: HashSet<String>,
iteration_counter: u64,
recorder: Option<Recorder<MatchedTokensEvent>>,
}

#[pymethods]
Expand All @@ -70,12 +84,34 @@ impl KvConnectorLeader {
// if we need a drt, get it from here
let drt = drt.inner().clone();

let enable_kvbm_record = std::env::var("ENABLE_KVBM_RECORD")
.map(|v| v == "1" || v.eq_ignore_ascii_case("true"))
.unwrap_or(false);

let recorder = if enable_kvbm_record {
let token = CancellationToken::new();
let output_path = "/tmp/records.jsonl";
tracing::info!("recording events to {}", output_path);

// Create recorder synchronously using pyo3 async runtime
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let recorder_result = runtime.block_on(async {
// TODO: using Some(2) for testing - quick shutdown of the recorder
// Should be None or a larger number
Recorder::new(token, &output_path, None, Some(2), None).await
}).unwrap();
Some(recorder_result)
} else {
None
};

Self {
slot_manager: ConnectorSlotManager::new(block_manager.clone(), leader, drt.clone()),
block_size,
inflight_requests: HashSet::new(),
onboarding_slots: HashSet::new(),
iteration_counter: 0,
recorder,
}
}

Expand Down Expand Up @@ -125,8 +161,34 @@ impl KvConnectorLeader {
"scheduling onboarding for {} external tokens",
num_external_tokens
);
if let Some(recorder) = &self.recorder {
let event_tx = recorder.event_sender();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async {
event_tx.send(MatchedTokensEvent {
request_id: request_id,
request_num_tokens: request_num_tokens,
num_computed_tokens: num_computed_tokens,
num_matched_tokens: num_external_tokens,
has_matched: true,
}).await
});
}
Ok((num_external_tokens, true))
} else {
if let Some(recorder) = &self.recorder {
let event_tx = recorder.event_sender();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async {
event_tx.send(MatchedTokensEvent {
request_id: request_id,
request_num_tokens: request_num_tokens,
num_computed_tokens: num_computed_tokens,
num_matched_tokens: 0,
has_matched: false,
}).await
});
}
Ok((0, false))
}
}
Expand Down
Loading