Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a242346
checkpt before branch switch
karen-sy Nov 13, 2025
1771f85
checkpt before buffers
karen-sy Nov 20, 2025
a415a1a
checkpoint before testing localkvindexer
karen-sy Nov 20, 2025
6adda0d
undo accidental toml change
karen-sy Nov 21, 2025
ad485f0
fix Arc patterns, patch unit test races
karen-sy Nov 24, 2025
76a2088
Merge remote-tracking branch 'origin/main' into karenc/distributed_bl…
karen-sy Nov 24, 2025
54eca97
format + fix clippy warnings
karen-sy Nov 24, 2025
6e58c2d
checkpoint before branch switch
karen-sy Nov 25, 2025
2608be0
draft of components with hacky raw nats client
karen-sy Nov 26, 2025
43855ae
small comments
karen-sy Nov 26, 2025
b35781a
up
karen-sy Nov 26, 2025
4227941
merge main
karen-sy Dec 1, 2025
eaf72b8
integration tests with mock workers
karen-sy Dec 2, 2025
c077d36
docstrings, formatting
karen-sy Dec 2, 2025
42a5a3e
helpers for indexing into buffers
karen-sy Dec 2, 2025
02ed5b9
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
e439489
merge main for ci fix
karen-sy Dec 2, 2025
44628af
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
ded0b08
move shared DRT creator to distributed.rs
karen-sy Dec 2, 2025
6ee2921
expose publisher feature via frontend/router flag which sets env var
karen-sy Dec 3, 2025
05bd78b
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
e806343
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
777ee1a
revert os flag thing
karen-sy Dec 4, 2025
df28d3f
mdc registration for enable localkvindexer + related workerqueryclien…
karen-sy Dec 5, 2025
54c2efb
nit
karen-sy Dec 5, 2025
b23a84a
fix nits
karen-sy Dec 5, 2025
fe3b56a
use routerevents, move test
karen-sy Dec 5, 2025
0dad7c5
merge main
karen-sy Dec 5, 2025
37fee4b
up
karen-sy Dec 5, 2025
43ebf91
oops didnt include args.py diff
karen-sy Dec 5, 2025
3b60d0f
nit
karen-sy Dec 8, 2025
4b00804
quick checkpoint before switching branches
karen-sy Dec 9, 2025
cb7568c
ranged workerevent query
karen-sy Dec 10, 2025
c76c74a
recovery for startup and gaps
karen-sy Dec 11, 2025
6a5b2dd
Merge remote-tracking branch 'origin/main' into karenc/mdc_from_frontend
karen-sy Dec 11, 2025
69135ad
add back integration guard
karen-sy Dec 11, 2025
f2be88e
clippy
karen-sy Dec 11, 2025
3fda0f3
Merge branch 'main' of github.com:ai-dynamo/dynamo into karenc/distri…
karen-sy Dec 11, 2025
582d836
Merge branch 'karenc/mdc_from_frontend' into karenc/distributed_blocks
karen-sy Dec 11, 2025
6b5eeb1
clone mdc rx, os env args, test header
karen-sy Dec 11, 2025
afb519c
Merge branch 'main' into karenc/distributed_blocks
PeaBrane Dec 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
use routerevents, move test
  • Loading branch information
karen-sy committed Dec 5, 2025
commit fe3b56a110c8e0e8f5711fd0a3a968302599ea49
3 changes: 2 additions & 1 deletion lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub mod sequence;
pub mod subscriber;
pub mod worker_query;

use indexer::WorkerKvQueryResponse;
pub use prefill_router::PrefillRouter;
use worker_query::WorkerQueryClient;

Expand Down Expand Up @@ -518,7 +519,7 @@ impl KvRouter {
pub async fn query_worker_local_kv(
&self,
worker_id: WorkerId,
) -> Result<protocols::WorkerKvQueryResponse> {
) -> Result<WorkerKvQueryResponse> {
let query_client = self
.worker_query_client
.as_ref()
Expand Down
164 changes: 132 additions & 32 deletions lib/llm/src/kv_router/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,28 @@ impl RouterEvent {
}
}

// -------
// Distributed router - Worker KV Query types
// -------

/// Request to query a worker's local KV indexer.
///
/// Sent by the router to ask a single worker to dump the contents of the
/// event buffer held by its local `KvIndexer`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerKvQueryRequest {
    /// The worker ID of the worker to query.
    /// TODO: let request query for `n` events.
    /// For now, assume that every request is
    /// a request to dump the whole event buffer
    /// in the worker local KvIndexer
    pub worker_id: WorkerId,
}

/// Response from a worker's local KV indexer.
///
/// Each entry is a full `RouterEvent` (worker id + KV cache event), so the
/// receiver can replay events without any extra context.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerKvQueryResponse {
    /// The events from the worker local KvIndexer.
    pub events: Vec<RouterEvent>,
}

/// A block in the Radix Tree.
#[derive(Debug)]
struct RadixBlock {
Expand Down Expand Up @@ -1296,8 +1318,7 @@ pub struct LocalKvIndexer {
/// The underlying indexer
indexer: KvIndexer,
/// Circular buffer of recent events
/// Stores (worker_id, event) tuples
event_buffer: Mutex<VecDeque<(WorkerId, KvCacheEvent)>>,
event_buffer: Mutex<VecDeque<RouterEvent>>,
/// Maximum number of events to keep in buffer
max_buffer_size: usize,
}
Expand All @@ -1312,32 +1333,26 @@ impl LocalKvIndexer {
) -> Self {
Self {
indexer: KvIndexer::new(token, kv_block_size, metrics),
event_buffer: std::sync::Mutex::new(std::collections::VecDeque::with_capacity(
max_buffer_size,
)), // circular buffer for O(1) pop
event_buffer: Mutex::new(VecDeque::with_capacity(max_buffer_size)),
max_buffer_size,
}
}

/// get the N most recent events (returned in oldest->newest order)
pub fn get_recent_events(&self, n: usize) -> Vec<(WorkerId, KvCacheEvent)> {
/// Get the `n` most recent events, returned in oldest -> newest order.
///
/// If `n` exceeds the number of buffered events, `take(n)` simply stops
/// early and every buffered event is returned.
pub fn get_recent_events(&self, n: usize) -> Vec<RouterEvent> {
    let buffer = self.event_buffer.lock().unwrap();
    // Walk from the newest end, keep up to `n`, then reverse back so the
    // caller receives oldest-first ordering.
    buffer.iter().rev().take(n).cloned().rev().collect()
}

/// get all buffered events (oldest first)
pub fn get_all_buffered_events(&self) -> Vec<(WorkerId, KvCacheEvent)> {
/// Get all buffered events, oldest first.
///
/// Takes a snapshot of the whole buffer under the lock; the front of the
/// deque is the oldest event, since new events are pushed to the back.
pub fn get_all_buffered_events(&self) -> Vec<RouterEvent> {
    let buffer = self.event_buffer.lock().unwrap();
    buffer.iter().cloned().collect()
}

/// Returns events in [start_id, end_id)
pub fn get_events_in_id_range(
&self,
start_id: u64,
end_id: u64,
) -> Vec<(WorkerId, KvCacheEvent)> {
pub fn get_events_in_id_range(&self, start_id: u64, end_id: u64) -> Vec<RouterEvent> {
let buffer = self.event_buffer.lock().unwrap();
if buffer.is_empty() {
tracing::warn!("No events in buffer yet; returning empty result.");
Expand All @@ -1352,10 +1367,10 @@ impl LocalKvIndexer {
return Vec::new();
}

let first_id = buffer.front().map(|(_, event)| event.event_id).unwrap();
let last_id = buffer.back().map(|(_, event)| event.event_id).unwrap();
let first_id = buffer.front().map(|e| e.event.event_id).unwrap();
let last_id = buffer.back().map(|e| e.event.event_id).unwrap();

let start_idx = match buffer.binary_search_by_key(&start_id, |(_, event)| event.event_id) {
let start_idx = match buffer.binary_search_by_key(&start_id, |e| e.event.event_id) {
Ok(idx) => idx,
Err(_) if start_id < first_id => {
tracing::warn!(
Expand All @@ -1376,7 +1391,7 @@ impl LocalKvIndexer {
Err(insertion_point) => insertion_point,
};

let end_idx = match buffer.binary_search_by_key(&end_id, |(_, event)| event.event_id) {
let end_idx = match buffer.binary_search_by_key(&end_id, |e| e.event.event_id) {
Ok(idx) => idx,
Err(_) if end_id < first_id => {
return Vec::new();
Expand All @@ -1401,24 +1416,24 @@ impl LocalKvIndexer {
}

/// Record an event in the buffer
fn record_event(&self, worker_id: WorkerId, event: KvCacheEvent) {
fn record_event(&self, event: RouterEvent) {
let mut buffer = self.event_buffer.lock().unwrap();

// Check that event id is consecutive to last one
if let Some((_, last_event)) = buffer.back()
&& event.event_id != last_event.event_id + 1
if let Some(last_event) = buffer.back()
&& event.event.event_id != last_event.event.event_id + 1
{
let expected = last_event.event_id + 1;
let expected = last_event.event.event_id + 1;
tracing::error!(
worker_id,
worker_id = event.worker_id,
expected,
got = event.event_id,
got = event.event.event_id,
"Non-consecutive KV event id; buffer may have gaps"
);
}

// Add to back
buffer.push_back((worker_id, event));
buffer.push_back(event);

// Remove from front if over capacity (circular buffer behavior)
while buffer.len() > self.max_buffer_size {
Expand All @@ -1430,11 +1445,8 @@ impl LocalKvIndexer {
///
/// This records the event in the buffer and forwards it to the underlying indexer.
pub async fn apply_event_with_buffer(&self, event: RouterEvent) -> Result<(), KvRouterError> {
let worker_id = event.worker_id;
let kv_event = event.event.clone();

// Record in buffer
self.record_event(worker_id, kv_event);
self.record_event(event.clone());

// Forward to underlying indexer
self.indexer
Expand Down Expand Up @@ -1497,7 +1509,7 @@ mod local_kv_indexer_tests {
{
let mut buffer = indexer.event_buffer.lock().unwrap();
for &id in ids {
buffer.push_back((
buffer.push_back(RouterEvent::new(
0,
KvCacheEvent {
event_id: id,
Expand All @@ -1514,19 +1526,31 @@ mod local_kv_indexer_tests {
fn returns_slice_within_range() {
let indexer = make_indexer_with_events(&[1, 2, 3, 4, 5]);
let mut result = indexer.get_events_in_id_range(2, 4);
let mut ids: Vec<u64> = result.iter().map(|(_, event)| event.event_id).collect();
let mut ids: Vec<u64> = result
.iter()
.map(|router_event| router_event.event.event_id)
.collect();
assert_eq!(ids, vec![2, 3]); // return slice within range

result = indexer.get_events_in_id_range(2, 6);
ids = result.iter().map(|(_, event)| event.event_id).collect();
ids = result
.iter()
.map(|router_event| router_event.event.event_id)
.collect();
assert_eq!(ids, vec![2, 3, 4, 5]); // clamp max (TODO error instead?)

result = indexer.get_events_in_id_range(0, 4);
ids = result.iter().map(|(_, event)| event.event_id).collect();
ids = result
.iter()
.map(|router_event| router_event.event.event_id)
.collect();
assert_eq!(ids, vec![1, 2, 3]); // clamp min (TODO error instead?)

result = indexer.get_events_in_id_range(0, 0);
ids = result.iter().map(|(_, event)| event.event_id).collect();
ids = result
.iter()
.map(|router_event| router_event.event.event_id)
.collect();
assert!(ids.is_empty()); // return empty when start is before buffer
}
}
Expand Down Expand Up @@ -3286,3 +3310,79 @@ mod tests {
assert!(result.contains_key(&WorkerWithDpRank::from_worker_id(worker_2)));
}
}

#[cfg(test)]
mod tests_local_indexer_query {
    use super::*;
    use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
    use tokio_util::sync::CancellationToken;

    /// Exercises the `LocalKvIndexer` event buffer plus the
    /// `WorkerKvQueryResponse` serialization path without going through NATS.
    #[tokio::test]
    async fn test_local_indexer_buffer_and_serialization() {
        let worker_id = 42u64;

        // Create a local indexer (block size 4, buffer capacity 100).
        let token = CancellationToken::new();
        let metrics = Arc::new(KvIndexerMetrics::new_unregistered());
        let local_indexer = Arc::new(LocalKvIndexer::new(token.clone(), 4, metrics, 100));

        // A single "stored" event for the worker under test.
        let test_event_1 = RouterEvent::new(
            worker_id,
            KvCacheEvent {
                event_id: 1,
                data: KvCacheEventData::Stored(KvCacheStoreData {
                    parent_hash: None,
                    blocks: vec![KvCacheStoredBlockData {
                        block_hash: ExternalSequenceBlockHash(100),
                        tokens_hash: LocalBlockHash(200),
                    }],
                }),
                dp_rank: 0,
            },
        );

        // `apply_event_with_buffer` records the event into the mutex-guarded
        // buffer synchronously before forwarding to the underlying indexer,
        // so the buffer is guaranteed populated once this await returns — no
        // sleep is needed before inspecting it.
        local_indexer
            .apply_event_with_buffer(test_event_1)
            .await
            .unwrap();

        // Get buffered events (what the query service would return).
        let buffered_events = local_indexer.get_all_buffered_events();

        // Verify buffer contents.
        assert_eq!(buffered_events.len(), 1, "Buffer should have 1 event");
        assert_eq!(buffered_events[0].worker_id, worker_id);
        assert_eq!(buffered_events[0].event.event_id, 1);

        // Build the response that would be sent over the wire.
        let response = WorkerKvQueryResponse {
            events: buffered_events.clone(),
        };

        // Round-trip through serde_json, simulating the NATS transport.
        let serialized = serde_json::to_vec(&response).unwrap();
        let deserialized: WorkerKvQueryResponse = serde_json::from_slice(&serialized).unwrap();

        // Verify response correctness after the round-trip.
        assert_eq!(deserialized.events.len(), 1);
        assert_eq!(deserialized.events[0].worker_id, worker_id);
        assert_eq!(deserialized.events[0].event.event_id, 1);

        // Verify the event payload survived the round-trip intact.
        match &deserialized.events[0].event.data {
            KvCacheEventData::Stored(store_data) => {
                assert_eq!(store_data.blocks.len(), 1);
                assert_eq!(store_data.blocks[0].block_hash.0, 100);
                assert_eq!(store_data.blocks[0].tokens_hash.0, 200);
            }
            _ => panic!("Expected Stored event"),
        }
    }
}
20 changes: 0 additions & 20 deletions lib/llm/src/kv_router/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,26 +315,6 @@ impl<'de> Deserialize<'de> for ExternalSequenceBlockHash {
}
}

// -------
// Distributed router
// -------
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerKvQueryRequest {
/// The worker ID of the worker to query.
/// TODO: let request query for `n` events.
/// For now, assume that every request is
/// a request to dump the whole event buffer
/// in the worker local KvIndexer
pub worker_id: WorkerId,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct WorkerKvQueryResponse {
pub worker_id: WorkerId,
/// The events from the worker local KvIndexer.
pub events: Vec<KvCacheEvent>,
}

// ------
// Tests
// ------
Expand Down
Loading
Loading