Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
a242346
checkpt before branch switch
karen-sy Nov 13, 2025
1771f85
checkpt before buffers
karen-sy Nov 20, 2025
a415a1a
checkpoint before testing localkvindexer
karen-sy Nov 20, 2025
6adda0d
undo accidental toml change
karen-sy Nov 21, 2025
ad485f0
fix Arc patterns, patch unit test races
karen-sy Nov 24, 2025
76a2088
Merge remote-tracking branch 'origin/main' into karenc/distributed_bl…
karen-sy Nov 24, 2025
54eca97
format + fix clippy warnings
karen-sy Nov 24, 2025
6e58c2d
checkpoint before branch switch
karen-sy Nov 25, 2025
2608be0
draft of components with hacky raw nats client
karen-sy Nov 26, 2025
43855ae
small comments
karen-sy Nov 26, 2025
b35781a
up
karen-sy Nov 26, 2025
4227941
merge main
karen-sy Dec 1, 2025
eaf72b8
integration tests with mock workers
karen-sy Dec 2, 2025
c077d36
docstrings, formatting
karen-sy Dec 2, 2025
42a5a3e
helpers for indexing into buffers
karen-sy Dec 2, 2025
02ed5b9
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
e439489
merge main for ci fix
karen-sy Dec 2, 2025
44628af
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 2, 2025
ded0b08
move shared DRT creator to distributed.rs
karen-sy Dec 2, 2025
6ee2921
expose publisher feature via frontend/router flag which sets env var
karen-sy Dec 3, 2025
05bd78b
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
e806343
Merge remote-tracking branch 'origin' into karenc/distributed_blocks
karen-sy Dec 3, 2025
777ee1a
revert os flag thing
karen-sy Dec 4, 2025
df28d3f
mdc registration for enable localkvindexer + related workerqueryclien…
karen-sy Dec 5, 2025
54c2efb
nit
karen-sy Dec 5, 2025
b23a84a
fix nits
karen-sy Dec 5, 2025
fe3b56a
use routerevents, move test
karen-sy Dec 5, 2025
0dad7c5
merge main
karen-sy Dec 5, 2025
37fee4b
up
karen-sy Dec 5, 2025
43ebf91
oops didnt include args.py diff
karen-sy Dec 5, 2025
3b60d0f
nit
karen-sy Dec 8, 2025
4b00804
quick checkpoint before switching branches
karen-sy Dec 9, 2025
cb7568c
ranged workerevent query
karen-sy Dec 10, 2025
c76c74a
recovery for startup and gaps
karen-sy Dec 11, 2025
6a5b2dd
Merge remote-tracking branch 'origin/main' into karenc/mdc_from_frontend
karen-sy Dec 11, 2025
69135ad
add back integration guard
karen-sy Dec 11, 2025
f2be88e
clippy
karen-sy Dec 11, 2025
3fda0f3
Merge branch 'main' of github.com:ai-dynamo/dynamo into karenc/distri…
karen-sy Dec 11, 2025
582d836
Merge branch 'karenc/mdc_from_frontend' into karenc/distributed_blocks
karen-sy Dec 11, 2025
6b5eeb1
clone mdc rx, os env args, test header
karen-sy Dec 11, 2025
afb519c
Merge branch 'main' into karenc/distributed_blocks
PeaBrane Dec 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions components/src/dynamo/mocker/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def create_temp_engine_args_file(args) -> Path:
else None,
"is_prefill": getattr(args, "is_prefill_worker", None),
"is_decode": getattr(args, "is_decode_worker", None),
"enable_local_indexer": getattr(args, "enable_local_indexer", None),
}

# Remove None values to only include explicitly set arguments
Expand Down Expand Up @@ -284,6 +285,12 @@ def parse_args():
default=False,
help="Mark this as a decode worker which does not publish KV events and skips prefill cost estimation (default: False)",
)
parser.add_argument(
"--enable-local-indexer",
action="store_true",
default=False,
help="Enable worker-local KV indexer for tracking this worker's own KV cache state (default: False)",
)
parser.add_argument(
"--store-kv",
type=str,
Expand Down
7 changes: 7 additions & 0 deletions components/src/dynamo/vllm/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class Config:
custom_jinja_template: Optional[str] = None
store_kv: str
request_plane: str
enable_local_indexer: bool = False

# mirror vLLM
model: str
Expand Down Expand Up @@ -204,6 +205,11 @@ def parse_args() -> Config:
default=os.environ.get("DYN_REQUEST_PLANE", "nats"),
help="Determines how requests are distributed from routers to workers. 'tcp' is fastest [nats|http|tcp]",
)
parser.add_argument(
"--enable-local-indexer",
action="store_true",
help="Enable worker-local KV indexer for tracking this worker's own KV cache state.",
)
parser.add_argument(
"--use-vllm-tokenizer",
action="store_true",
Expand Down Expand Up @@ -312,6 +318,7 @@ def parse_args() -> Config:
config.mm_prompt_template = args.mm_prompt_template
config.store_kv = args.store_kv
config.request_plane = args.request_plane
config.enable_local_indexer = args.enable_local_indexer
config.use_vllm_tokenizer = args.use_vllm_tokenizer

# Validate custom Jinja template file exists if provided
Expand Down
2 changes: 2 additions & 0 deletions components/src/dynamo/vllm/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def setup_kv_event_publisher(
worker_id=generate_endpoint.connection_id(),
kv_block_size=vllm_config.cache_config.block_size,
zmq_endpoint=zmq_endpoint,
enable_local_indexer=config.enable_local_indexer,
)
kv_publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
kv_publishers.append(kv_publisher)
Expand Down Expand Up @@ -336,6 +337,7 @@ async def register_vllm_model(
runtime_config.total_kv_blocks = runtime_values["num_gpu_blocks"]
runtime_config.max_num_seqs = runtime_values["max_num_seqs"]
runtime_config.max_num_batched_tokens = runtime_values["max_num_batched_tokens"]
runtime_config.enable_local_indexer = config.enable_local_indexer

# Add tool/reasoning parsers for decode models
if model_type != ModelType.Prefill:
Expand Down
15 changes: 11 additions & 4 deletions lib/bindings/python/rust/llm/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use rs::traits::events::EventSubscriber;
use tracing;

use llm_rs::kv_router::protocols::*;
use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks};
use llm_rs::kv_router::publisher::{KvEventSourceConfig, create_stored_blocks, start_zmq_listener};
use llm_rs::protocols::common::{OutputOptions, SamplingOptions, StopConditions};

#[pyfunction]
Expand Down Expand Up @@ -106,6 +106,9 @@ pub struct ZmqKvEventPublisherConfig {
pub zmq_endpoint: String,
#[pyo3(get, set)]
pub zmq_topic: String,
#[pyo3(get, set)]
pub enable_local_indexer: bool, // whether the underlying KvEventPublisher publishes to
// both global and worker-local KvIndexers
}

#[pymethods]
Expand All @@ -115,19 +118,22 @@ impl ZmqKvEventPublisherConfig {
worker_id,
kv_block_size,
zmq_endpoint = "tcp://127.0.0.1:5557".to_string(),
zmq_topic = "".to_string()
zmq_topic = "".to_string(),
enable_local_indexer = false
))]
/// Build a publisher configuration from the supplied values.
///
/// Each argument is stored unchanged in the field of the same name.
///
/// * `worker_id` - identifier of the worker this configuration belongs to
/// * `kv_block_size` - KV block size
/// * `zmq_endpoint` - ZMQ endpoint string
/// * `zmq_topic` - ZMQ topic string
/// * `enable_local_indexer` - whether the underlying `KvEventPublisher` publishes to
///   both global and worker-local KvIndexers
pub fn new(
worker_id: WorkerId,
kv_block_size: usize,
zmq_endpoint: String,
zmq_topic: String,
enable_local_indexer: bool,
) -> Self {
Self {
worker_id,
kv_block_size,
zmq_endpoint,
zmq_topic,
enable_local_indexer,
}
}
}
Expand All @@ -141,13 +147,14 @@ pub(crate) struct ZmqKvEventPublisher {
impl ZmqKvEventPublisher {
#[new]
fn new(component: Component, config: ZmqKvEventPublisherConfig) -> PyResult<Self> {
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new(
let inner = llm_rs::kv_router::publisher::KvEventPublisher::new_with_local_indexer(
component.inner,
config.kv_block_size as u32,
Some(KvEventSourceConfig::Zmq {
endpoint: config.zmq_endpoint,
topic: config.zmq_topic,
}),
config.enable_local_indexer,
)
.map_err(to_pyerr)?;
Ok(Self { inner })
Expand Down Expand Up @@ -179,7 +186,7 @@ impl ZmqKvEventListener {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<KvCacheEvent>();
let shutdown_token = tokio_util::sync::CancellationToken::new();

tokio::spawn(llm_rs::kv_router::publisher::start_zmq_listener(
tokio::spawn(start_zmq_listener(
zmq_endpoint,
zmq_topic,
tx,
Expand Down
10 changes: 10 additions & 0 deletions lib/bindings/python/rust/llm/local_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ impl ModelRuntimeConfig {
self.inner.data_parallel_size = data_parallel_size;
}

/// Property setter: overwrite `enable_local_indexer` on the wrapped runtime config.
#[setter]
fn set_enable_local_indexer(&mut self, enable_local_indexer: bool) {
self.inner.enable_local_indexer = enable_local_indexer;
}

fn set_engine_specific(&mut self, key: &str, value: String) -> PyResult<()> {
let value: serde_json::Value = serde_json::from_str(&value).map_err(to_pyerr)?;
self.inner
Expand Down Expand Up @@ -103,6 +108,11 @@ impl ModelRuntimeConfig {
self.inner.reasoning_parser.clone()
}

/// Property getter: return the current `enable_local_indexer` value of the wrapped
/// runtime config.
#[getter]
fn enable_local_indexer(&self) -> bool {
self.inner.enable_local_indexer
}

#[getter]
fn runtime_data(&self, py: Python<'_>) -> PyResult<PyObject> {
let dict = PyDict::new(py);
Expand Down
5 changes: 4 additions & 1 deletion lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ class ModelRuntimeConfig:
max_num_batched_tokens: int | None
tool_call_parser: str | None
reasoning_parser: str | None
enable_local_indexer: bool
runtime_data: dict[str, Any]
tensor_model_config: Any | None

Expand Down Expand Up @@ -843,7 +844,8 @@ class ZmqKvEventPublisherConfig:
worker_id: int,
kv_block_size: int,
zmq_endpoint: str = "tcp://127.0.0.1:5557",
zmq_topic: str = ""
zmq_topic: str = "",
enable_local_indexer: bool = False
) -> None:
"""
Configuration for the ZmqKvEventPublisher.
Expand All @@ -852,6 +854,7 @@ class ZmqKvEventPublisherConfig:
:param kv_block_size: The block size for the key-value store.
:param zmq_endpoint: The ZeroMQ endpoint. Defaults to "tcp://127.0.0.1:5557".
:param zmq_topic: The ZeroMQ topic to subscribe to. Defaults to an empty string.
:param enable_local_indexer: Whether to enable the worker-local KV indexer. Defaults to False.
"""
...

Expand Down
115 changes: 106 additions & 9 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::{
},
scheduler::{KvScheduler, KvSchedulerError, PotentialLoad, SchedulingRequest},
sequence::SequenceError,
subscriber::start_kv_router_background,
subscriber::{recover_from_all_workers, start_kv_router_background},
},
local_model::runtime_config::ModelRuntimeConfig,
model_card::ModelDeploymentCard,
Expand Down Expand Up @@ -83,6 +83,7 @@ pub const RADIX_STATE_FILE: &str = "radix-state";

// for worker-local kvindexer query
/// Subject string used when querying a worker-local KV indexer.
pub const WORKER_KV_INDEXER_QUERY_SUBJECT: &str = "worker_kv_indexer_query";
/// Capacity of the worker-side event buffer, counted in events.
pub const WORKER_KV_INDEXER_BUFFER_SIZE: usize = 1024; // store 1024 most recent events in worker buffer

// for router discovery registration
pub const KV_ROUTER_COMPONENT: &str = "kv-router";
Expand Down Expand Up @@ -305,13 +306,23 @@ impl KvRouter {
endpoint: endpoint_id.name.clone(),
};
let discovery_stream = discovery
.list_and_watch(discovery_key, Some(cancellation_token.clone()))
.list_and_watch(discovery_key.clone(), Some(cancellation_token.clone()))
.await?;
let runtime_configs_rx =
watch_and_extract_field(discovery_stream, |card: ModelDeploymentCard| {
card.runtime_config
});

// Watch for local indexer states via discovery interface (separate stream needed
// because streams are consumed by watch_and_extract_field)
let discovery_stream_local_indexer = discovery
.list_and_watch(discovery_key, Some(cancellation_token.clone()))
.await?;
let local_indexer_rx = watch_and_extract_field(
discovery_stream_local_indexer,
|card: ModelDeploymentCard| card.runtime_config.enable_local_indexer,
);

let indexer = if kv_router_config.overlap_score_weight == 0.0 {
// When overlap_score_weight is zero, we don't need to track prefixes
Indexer::None
Expand Down Expand Up @@ -349,6 +360,12 @@ impl KvRouter {
)
.await?;

// Initialize worker query client using namespace abstraction
// (created before background task so we can use it for startup recovery)
let worker_query_client =
worker_query::WorkerQueryClient::new(component.clone(), local_indexer_rx);
tracing::info!("Worker query client initialized");

// Start KV event subscriber background process (only when use_kv_events is enabled)
if kv_router_config.use_kv_events
&& let Indexer::KvIndexer(ref kv_indexer) = indexer
Expand All @@ -369,11 +386,48 @@ impl KvRouter {
kv_router_config.router_reset_states,
)
.await?;
}

// Initialize worker query client using the namespace abstraction
// NATS client is managed by DRT and accessed through namespace.drt()
let worker_query_client = Some(WorkerQueryClient::new(component.namespace().clone()));
// Perform startup recovery from workers with local indexers
// This catches up on any events missed while the router was offline
let last_event_ids = kv_indexer
.get_last_received_event_ids()
.await
.unwrap_or_default();
let instances = client.instance_source.as_ref().borrow().clone();
let worker_ids: Vec<WorkerId> = instances.iter().map(|i| i.instance_id).collect();

if !worker_ids.is_empty() {
tracing::info!(
worker_count = worker_ids.len(),
"Starting recovery from workers with local indexers"
);

// NOTE: recover_from_all_workers() is a no-op if
// Worker with worker_id is not associated with a
// local indexer instance.
let recovered = recover_from_all_workers(
&worker_query_client,
&last_event_ids,
&worker_ids,
&kv_indexer.event_sender(),
)
.await;

if recovered > 0 {
tracing::info!(
recovered_events = recovered,
"KV Router startup: Recovered {} KV events from workers {:?}",
recovered,
worker_ids
);
} else {
tracing::info!(
"KV Router startup: No KV events recovered from workers {:?}",
worker_ids
);
}
}
}

tracing::info!("KV Routing initialized");
Ok(Self {
Expand All @@ -383,7 +437,7 @@ impl KvRouter {
kv_router_config,
cancellation_token,
client,
worker_query_client,
worker_query_client: Some(worker_query_client),
})
}

Expand Down Expand Up @@ -517,17 +571,60 @@ impl KvRouter {
self.indexer.dump_events().await
}

/// Query a specific worker's local KV indexer for its events.
///
/// (See docstring for `WorkerQueryClient.query_worker()`)
///
/// # Arguments
///
/// * `worker_id` - The worker whose local indexer is queried
/// * `start_event_id` - Optional lower bound of the event ID range, forwarded unchanged
/// * `end_event_id` - Optional upper bound of the event ID range, forwarded unchanged
///
/// # Errors
///
/// Returns an error if no worker query client is configured, or if the
/// underlying `query_worker` call fails.
pub async fn query_worker_local_kv(
    &self,
    worker_id: WorkerId,
    start_event_id: Option<u64>,
    end_event_id: Option<u64>,
) -> Result<WorkerKvQueryResponse> {
    let query_client = self
        .worker_query_client
        .as_ref()
        .ok_or_else(|| anyhow::anyhow!("Worker query client not available (NATS required)"))?;

    // Only the ranged form of the call is kept: the signature takes an event ID
    // range, so both bounds are passed through to the client.
    query_client
        .query_worker(worker_id, start_event_id, end_event_id)
        .await
}

/// Recover missed KV events from a specific worker.
///
/// Queries the worker's local KV indexer for events starting from
/// `start_event_id` and applies them to the router's indexer.
///
/// # Arguments
///
/// * `worker_id` - The worker to recover from
/// * `start_event_id` - First event ID to fetch (inclusive), or None to start from beginning
/// * `end_event_id` - Last event ID to fetch (inclusive), or None for all
pub async fn recover_from_worker(
&self,
worker_id: WorkerId,
start_event_id: Option<u64>,
end_event_id: Option<u64>,
) -> Result<usize> {
let query_client = self
.worker_query_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("Worker query client not available"))?;

let event_tx = match &self.indexer {
Indexer::KvIndexer(kv_indexer) => kv_indexer.event_sender(),
Indexer::None => {
anyhow::bail!("Cannot recover: indexer is disabled (--overlap_score_weight is 0)")
}
};

subscriber::recover_from_worker(
query_client,
worker_id,
start_event_id,
end_event_id,
&event_tx,
)
.await
}
}

Expand Down
Loading
Loading