Merged
Changes from 1 commit
Commits
41 commits
a242346
checkpt before branch switch
karen-sy Nov 13, 2025
1771f85
checkpt before buffers
karen-sy Nov 20, 2025
a415a1a
checkpoint before testing localkvindexer
karen-sy Nov 20, 2025
6adda0d
undo accidental toml change
karen-sy Nov 21, 2025
ad485f0
fix Arc patterns, patch unit test races
karen-sy Nov 24, 2025
76a2088
Merge remote-tracking branch 'origin/main' into karenc/distributed_bl…
karen-sy Nov 24, 2025
54eca97
format + fix clippy warnings
karen-sy Nov 24, 2025
6e58c2d
checkpoint before branch switch
karen-sy Nov 25, 2025
2608be0
draft of components with hacky raw nats client
karen-sy Nov 26, 2025
43855ae
small comments
karen-sy Nov 26, 2025
b35781a
up
karen-sy Nov 26, 2025
4227941
merge main
karen-sy Dec 1, 2025
eaf72b8
integration tests with mock workers
karen-sy Dec 2, 2025
c077d36
docstrings, formatting
karen-sy Dec 2, 2025
42a5a3e
helpers for indexing into buffers
karen-sy Dec 2, 2025
02ed5b9
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
e439489
merge main for ci fix
karen-sy Dec 2, 2025
44628af
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
ded0b08
move shared DRT creator to distributed.rs
karen-sy Dec 2, 2025
6ee2921
expose publisher feature via frontend/router flag which sets env var
karen-sy Dec 3, 2025
05bd78b
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
e806343
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
777ee1a
revert os flag thing
karen-sy Dec 4, 2025
df28d3f
mdc registration for enable localkvindexer + related workerqueryclien…
karen-sy Dec 5, 2025
54c2efb
nit
karen-sy Dec 5, 2025
b23a84a
fix nits
karen-sy Dec 5, 2025
fe3b56a
use routerevents, move test
karen-sy Dec 5, 2025
0dad7c5
merge main
karen-sy Dec 5, 2025
37fee4b
up
karen-sy Dec 5, 2025
43ebf91
oops didnt include args.py diff
karen-sy Dec 5, 2025
3b60d0f
nit
karen-sy Dec 8, 2025
4b00804
quick checkpoint before switching branches
karen-sy Dec 9, 2025
cb7568c
ranged workerevent query
karen-sy Dec 10, 2025
c76c74a
recovery for startup and gaps
karen-sy Dec 11, 2025
6a5b2dd
Merge remote-tracking branch 'origin/main' into karenc/mdc_from_frontend
karen-sy Dec 11, 2025
69135ad
add back integration guard
karen-sy Dec 11, 2025
f2be88e
clippy
karen-sy Dec 11, 2025
3fda0f3
Merge branch 'main' of github.com:ai-dynamo/dynamo into karenc/distri…
karen-sy Dec 11, 2025
582d836
Merge branch 'karenc/mdc_from_frontend' into karenc/distributed_blocks
karen-sy Dec 11, 2025
6b5eeb1
clone mdc rx, os env args, test header
karen-sy Dec 11, 2025
afb519c
Merge branch 'main' into karenc/distributed_blocks
PeaBrane Dec 11, 2025
draft of components with hacky raw nats client
karen-sy committed Nov 26, 2025
commit 2608be07b5eda035f3c3410ef25c99633f86cc58
33 changes: 32 additions & 1 deletion lib/llm/src/kv_router.rs
@@ -43,7 +43,8 @@ use crate::{
compute_block_hash_for_seq, compute_seq_hash_for_block,
},
protocols::{
LocalBlockHash, RouterRequest, RouterResponse, WorkerSelectionResult, WorkerWithDpRank,
LocalBlockHash, RouterRequest, RouterResponse, WorkerId, WorkerSelectionResult,
WorkerWithDpRank,
},
scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest},
sequence::SequenceError,
@@ -249,6 +250,8 @@ pub struct KvRouter {
cancellation_token: tokio_util::sync::CancellationToken,

client: Client,

worker_query_client: Option<worker_query::WorkerQueryClient>,
}

impl KvRouter {
@@ -341,6 +344,20 @@ impl KvRouter {
.await?;
}

// Initialize worker query client by creating NATS client from env (same pattern as subscriber)
let worker_query_client = if let Ok(nats_server) =
std::env::var(dynamo_runtime::config::environment_names::nats::NATS_SERVER)
{
let client_options = dynamo_runtime::transports::nats::Client::builder()
.server(&nats_server)
.build()?;
let nats_client = client_options.connect().await?;
let namespace_name = component.namespace().name();
Some(worker_query::WorkerQueryClient::new(nats_client, namespace_name))
} else {
None
};

tracing::info!("KV Routing initialized");
Ok(Self {
indexer,
@@ -349,6 +366,7 @@
kv_router_config,
cancellation_token,
client,
worker_query_client,
})
}

@@ -481,6 +499,19 @@ impl KvRouter {
pub async fn dump_events(&self) -> Result<Vec<RouterEvent>, KvRouterError> {
self.indexer.dump_events().await
}

/// Query a specific worker's local KV indexer for its buffered events
pub async fn query_worker_local_kv(
&self,
worker_id: WorkerId,
) -> Result<protocols::WorkerKvQueryResponse> {
let query_client = self
.worker_query_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Worker query client not available (NATS required)"))?;

query_client.query_worker(worker_id).await
}
}

// NOTE: KVRouter works like a PushRouter,
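As a point of reference (not part of this diff): a minimal sketch of how a caller on the router side might use the new query path to backfill events it missed, assuming WorkerKvQueryResponse exposes the buffered (worker_id, event) pairs as an events field (field name hypothetical) and that the caller holds a sender into the router's indexer, much like the recovery test further down does by hand.

async fn recover_worker_state(
    router: &KvRouter,
    indexer_tx: &tokio::sync::mpsc::Sender<RouterEvent>,
    worker_id: WorkerId,
) -> anyhow::Result<()> {
    // Ask the worker's LocalKvIndexer for everything it has buffered.
    let response = router.query_worker_local_kv(worker_id).await?;

    // Replay each buffered event into the router's own indexer; `events`
    // is an assumed field name on WorkerKvQueryResponse.
    for (worker_id, event) in response.events {
        indexer_tx
            .send(RouterEvent::new(worker_id, event))
            .await
            .map_err(|_| anyhow::anyhow!("router indexer channel closed"))?;
    }
    Ok(())
}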
211 changes: 200 additions & 11 deletions lib/llm/src/kv_router/publisher.rs
@@ -154,15 +154,14 @@ impl KvEventPublisher {
let local_indexer_query_handle = local_indexer.as_ref().map(|local_indexer_ref| {
let component = component.clone();
let local_indexer = local_indexer_ref.clone();
component.drt().runtime().secondary().spawn({
async move {
start_worker_kv_query_service(
component,
worker_id,
local_indexer,
).await
}
})

component.drt().runtime().secondary().spawn(
start_worker_kv_query_service(
component,
worker_id,
local_indexer,
)
)
});

let stream_name = Slug::slugify(&format!("{}.{}", component.subject(), KV_EVENT_SUBJECT))
@@ -294,14 +293,15 @@ async fn start_worker_kv_query_service(

// Create NATS subscriber on a subject specific to worker's id
let subject = format!("{}.{}", WORKER_KV_INDEXER_QUERY_SUBJECT, worker_id);
let full_subject = format!("namespace.{}.{}", component.namespace().name(), subject);
let mut subscriber = match component.namespace().subscribe(&subject).await {
Ok(sub) => sub,
Err(e) => {
tracing::error!("Failed to subscribe to {}: {}", subject, e);
return; // No ? because function doesn't return Result
}
};
tracing::info!("Query service on worker {} listening on {}", worker_id, subject);
tracing::info!("Query service on worker {} listening on NATS subject: {}", worker_id, full_subject);

// Receive query request from router, retrieve event(s) from LocalKvIndexer, return response
// TODO: currently just dumps all events from LocalKvIndexer; need to implement
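The remainder of the handler is collapsed in this view. Purely as an illustration of the request/reply shape described in the comment above (not the actual implementation), a standalone loop written directly against async-nats might look roughly like the following, with the response payload assumed to be the rmp-serde-encoded buffered (worker_id, event) pairs:

// Illustrative sketch only; the real service subscribes through the dynamo
// runtime's namespace API, and its response format may differ.
async fn serve_worker_kv_queries(
    client: async_nats::Client,
    full_subject: String,
    local_indexer: std::sync::Arc<LocalKvIndexer>,
) -> anyhow::Result<()> {
    use futures::StreamExt;

    let mut subscriber = client.subscribe(full_subject).await?;
    while let Some(msg) = subscriber.next().await {
        // Dump everything the local indexer has buffered for this worker.
        let events = local_indexer.get_all_buffered_events();
        let payload = rmp_serde::to_vec(&events)?;
        // Reply on the request's reply subject, if one was provided.
        if let Some(reply) = msg.reply {
            client.publish(reply, payload.into()).await?;
        }
    }
    Ok(())
}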
@@ -1147,6 +1147,7 @@ mod tests_startup_helpers {
use super::*;
use crate::kv_router::indexer::KvIndexerInterface;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};
use crate::kv_router::KvIndexer;
use async_trait;
use bytes::Bytes;
use std::sync::{Arc, Mutex};
@@ -1617,12 +1618,200 @@ mod tests_startup_helpers {
token.cancel();
let _ = listener_handle.await;
}

//--------------------------------------------------------------------
// Test distributed recovery: Router queries worker's LocalKvIndexer after outage
//--------------------------------------------------------------------
#[tokio::test]
async fn test_distributed_kvindexer_recovery_from_outage() {
let worker_1_id = 1u64;
let block_size = 4u32;
let token = CancellationToken::new();

// === SETUP: Worker Components ===
let (worker_component, worker_published) = MockComponent::new();
let local_indexer_1 = Arc::new(LocalKvIndexer::new(
token.clone(),
block_size,
Arc::new(KvIndexerMetrics::new_unregistered()),
100, // buffer size
));

let (worker_tx, worker_rx) = mpsc::unbounded_channel::<KvCacheEvent>();

// Start worker's event processor
tokio::spawn(start_event_processor(
worker_component,
worker_1_id,
token.clone(),
worker_rx,
Some(local_indexer_1.clone()),
));

// === SETUP: Router Components ===
let router_indexer = Arc::new(KvIndexer::new(
token.clone(),
block_size,
Arc::new(KvIndexerMetrics::new_unregistered()),
));

// === STEP 1: Normal Operation ===
let event_1 = KvCacheEvent {
event_id: 1,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None,
blocks: vec![
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100),
tokens_hash: LocalBlockHash(200),
},
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(101),
tokens_hash: LocalBlockHash(201),
},
],
}),
dp_rank: 0,
};

worker_tx.send(event_1.clone()).unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Simulate JetStream: forward worker's published event to router
{
let published = worker_published.lock().unwrap();
assert_eq!(published.len(), 1, "Worker should have published 1 event");
let (subject, bytes) = &published[0];
assert_eq!(subject, QUEUE_NAME);

let router_event: RouterEvent = rmp_serde::from_slice(bytes).unwrap();
router_indexer
.event_sender()
.send(router_event)
.await
.unwrap();
}

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// assert: Router's indexer has event
let get_workers_tx = router_indexer.get_workers_sender();
let mut router_has_worker = false;
for _ in 0..20 {
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
get_workers_tx
.send(crate::kv_router::indexer::GetWorkersRequest { resp: resp_tx })
.await
.unwrap();
let workers: Vec<u64> = resp_rx.await.unwrap();
if workers.contains(&worker_1_id) {
router_has_worker = true;
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
}
assert!(
router_has_worker,
"Router should see worker 1 after normal operation"
);

// assert: Worker's local indexer buffered event
let buffered = local_indexer_1.get_all_buffered_events();
assert_eq!(buffered.len(), 1, "Local indexer should buffer 1 event");

// === STEP 2 & 3: Simulate Outage - Stop forwarding to router ===
let event_2 = KvCacheEvent {
event_id: 2,
data: KvCacheEventData::Stored(KvCacheStoreData {
parent_hash: None,
blocks: vec![
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(100), // Shared prefix
tokens_hash: LocalBlockHash(200),
},
KvCacheStoredBlockData {
block_hash: ExternalSequenceBlockHash(102), // New block
tokens_hash: LocalBlockHash(202),
},
],
}),
dp_rank: 0,
};

worker_tx.send(event_2.clone()).unwrap(); // send to worker but not to router
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// assert: Worker published event_2 to "NATS" (MockComponent)
{
let published = worker_published.lock().unwrap();
assert_eq!(
published.len(),
2,
"Worker should have published 2 events total"
);
}

// assert: Worker's local indexer has both events
let buffered = local_indexer_1.get_all_buffered_events();
assert_eq!(
buffered.len(),
2,
"Local indexer should have both events during outage"
);

// assert: Router DOESN'T have event_2
let block_hashes_2 = vec![LocalBlockHash(200), LocalBlockHash(202)];
let overlap = router_indexer
.find_matches(block_hashes_2.clone())
.await
.unwrap();
let router_overlap = overlap
.scores
.get(&crate::kv_router::protocols::WorkerWithDpRank::from_worker_id(
worker_1_id,
))
.copied()
.unwrap_or(0);
assert_eq!(
router_overlap, 1,
"Router should only see 1 shared block (not the new block from event_2)"
);

// === STEP 4 & 5: Recovery - Apply buffered events to router ===
// This simulates: router.query_worker_local_kv(worker_1_id)
// followed by applying the returned events
for (worker_id, event) in buffered {
let router_event = RouterEvent::new(worker_id, event);
router_indexer
.event_sender()
.send(router_event)
.await
.unwrap();
}

tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// assert: Router now has complete state
let overlap = router_indexer.find_matches(block_hashes_2).await.unwrap();
let router_overlap_after = overlap
.scores
.get(&crate::kv_router::protocols::WorkerWithDpRank::from_worker_id(
worker_1_id,
))
.copied()
.unwrap_or(0);
assert_eq!(
router_overlap_after, 2,
"Router should now see both blocks after recovery"
);

token.cancel();
}
}
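For reference, the new recovery test can be run on its own with something like: cargo test test_distributed_kvindexer_recovery_from_outage (from within lib/llm; exact package name and test invocation assumed).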

#[cfg(test)]
mod tests_local_indexer_query {
use super::*;
use crate::kv_router::indexer::KvIndexerInterface;
use crate::kv_router::protocols::{ExternalSequenceBlockHash, LocalBlockHash};

#[tokio::test]