Skip to content

Conversation

@PeaBrane
Copy link
Contributor

@PeaBrane PeaBrane commented Dec 12, 2025

.. no jetstream needed. A follow-up on #4519

Dual Transport Support

  • Router dynamically chooses between JetStream (durable) and NATS Core (pub/sub) based on whether all workers have enable_local_indexer enabled
  • When local indexer is enabled, workers publish events via NATS Core instead of JetStream, eliminating JetStream durability overhead since recovery is handled by the local indexer

Gap Detection & Recovery

  • Per-worker event ID tracking: Router tracks last_event_ids per worker to detect non-monotonic event sequences
  • Automatic gap recovery: When event_id > last_id + 1 is detected, router queries worker's local indexer for missing events in range [last_id + 1, event_id - 1] before processing current event
  • Graceful failure handling: If recovery fails, current event is still applied (better partial data than dropping events)

Worker Discovery Integration

  • On worker Added: Router dumps worker's full local indexer state into router indexer (catches up on all existing KV state)
  • On worker Removed: Router removes worker from indexer via remove_worker_tx

TODOs (future tests)

  • Gap detection and router warm restart test: Add E2E test to verify gap detection works correctly when event IDs are non-monotonic, and that router can recover complete state from worker local indexers when a new router instance starts up (warm restart scenario)

Summary by CodeRabbit

Release Notes

  • New Features

    • Added support for NATS Core event routing mode as an alternative to JetStream
    • Runtime configuration now dynamically selects the appropriate event subscription mode
  • Improvements

    • Enhanced worker event recovery with gap detection and on-demand recovery
    • Improved error handling for event subscription initialization failures
    • More detailed event query responses with structured result types

✏️ Tip: You can customize this high-level summary in your review settings.

Signed-off-by: PeaBrane <[email protected]>
@PeaBrane PeaBrane requested review from a team as code owners December 12, 2025 08:11
@PeaBrane PeaBrane requested a review from karen-sy December 12, 2025 08:11
@github-actions github-actions bot added the feat label Dec 12, 2025
Signed-off-by: PeaBrane <[email protected]>
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Dec 12, 2025

Walkthrough

This PR introduces dual subscription modes for KV event routing by adding NATS Core support alongside JetStream. The changes refactor the KV event API, rename the subject from "kv_events" to "kv-events", restructure query response types as enums, and update publishing and subscription logic to conditionally route through either mode based on runtime configuration.

Changes

Cohort / File(s) Change Summary
KV Router Core
lib/llm/src/kv_router.rs
Replaced startup path with background task that selects between NATS Core and JetStream based on runtime configs; added start_kv_router_background_nats_core public function; renamed KV_EVENT_SUBJECT constant from "kv_events" to "kv-events".
KV Indexer API & Types
lib/llm/src/kv_router/indexer.rs
Exposed worker_id and event fields as public on RouterEvent; changed WorkerKvQueryRequest fields to start_event_id and end_event_id as Option<u64>; transformed WorkerKvQueryResponse from struct to enum with variants Events, TreeDump, TooNew, InvalidRange; made KvIndexer derive Clone with _ref_count: Arc<()> field; updated get_events_in_id_range to return enum; refactored buffer vs. tree dump routing logic with boundary checks.
KV Event Publisher
lib/llm/src/kv_router/publisher.rs
Reworked publishing to support enable_local_indexer flag: use NATS Core when enabled (component-based publishing) or JetStream when disabled; updated subject to KV_EVENT_SUBJECT; simplified WorkerKvQuery service to call get_events_in_id_range directly; updated test assertions for subject changes.
KV Event Subscriber
lib/llm/src/kv_router/subscriber.rs
Added new public function start_kv_router_background_nats_core for NATS Core subscriptions with per-worker gap detection and on-demand recovery; updated recover_from_worker to handle enum variants (Events, TreeDump, TooNew, InvalidRange) with refined error handling and logging.
Worker Query Utilities
lib/llm/src/kv_router/worker_query.rs
Updated error message and log formatting to use placeholder-style strings instead of interpolated variables for worker_id and subject construction.
E2E Test Updates
tests/router/test_router_e2e_with_mockers.py
Added enable_local_indexer support to mocker command-line arguments; parameterized test_router_decisions with use_nats_core (False/True) to test both JetStream and NATS Core modes; updated endpoint derivation to use runtime namespace resolution; added mode logging.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Areas requiring extra attention:

  • Public API changes: The WorkerKvQueryResponse enum transformation and field exposure on RouterEvent require verification that all call sites correctly match enum variants and handle the new response types.
  • Clone semantics and Arc<()>: The addition of _ref_count field and Clone derive on KvIndexer changes drop/shutdown behavior; verify correct reference counting and cancellation logic across cloned instances.
  • Dual subscription paths: The new NATS Core path (start_kv_router_background_nats_core) introduces complex per-worker gap detection and recovery logic; ensure consistency between JetStream and NATS Core flows.
  • Buffer vs. tree dump routing: The reworked logic in LocalKvIndexer.get_events_in_id_range with boundary checks, clamping, and tree dump fallbacks needs careful validation against edge cases (None start_id, ranges outside buffer, end_id beyond buffer).
  • Test parameterization: Verify that both use_nats_core=False and use_nats_core=True test paths exercise the correct code branches and that endpoint derivation works correctly in both modes.

Poem

🐰 Two paths now merge in the event flow,
Core and Stream together grow,
Enums dance where structs once stood,
Gaps we find and mend with good,
KV router hops to new delight! ✨

Pre-merge checks

✅ Passed checks (3 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main feature—decentralized router support with NATS Core—which is the primary change throughout the changeset.
Description check ✅ Passed The PR description covers all required template sections (Overview, Details, Related Issues) with clear explanations of dual transport support, gap detection, and worker discovery integration, addressing the main architectural changes.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
tests/router/test_router_e2e_with_mockers.py (1)

42-89: Port allocation ignores use_nats_core parametrization ⇒ collisions under parallel runs.
get_unique_ports() removes the parametrization suffix (Line 65) and only offsets on store/request-plane. Now that test_router_decisions is parameterized, both variants can pick the same port(s) in the same session.

A contained fix is to add a mode_offset derived from request.node.callspec.params.get("use_nats_core"):

 def get_unique_ports(
@@
     # Get test name without parametrization suffix
     test_name = request.node.name.split("[")[0]
@@
     plane_offset = 0 if request_plane == "nats" else 25
+
+    # Additional offset for parametrized modes (e.g., JetStream vs NATS Core)
+    callspec = getattr(request.node, "callspec", None)
+    params = getattr(callspec, "params", {}) if callspec is not None else {}
+    mode_offset = 75 if params.get("use_nats_core") else 0
@@
     ports = [
-        BASE_PORT + base_offset + store_offset + plane_offset + i
+        BASE_PORT + base_offset + store_offset + plane_offset + mode_offset + i
         for i in range(num_ports)
     ]

Also applies to: 560-608

🧹 Nitpick comments (4)
lib/llm/src/kv_router/worker_query.rs (2)

58-70: Subject composition looks consistent with worker subscribe behavior; consider structured fields for logs.
This should line up with the worker’s start_worker_kv_query_service subscription naming (assuming Component::subscribe prefixes component.subject()). For readability/observability, you may want to switch the debug log to structured fields (tracing::debug!(worker_id, subject = %subject, ...)) rather than interpolating into the message.


57-62: has_local_indexer() pre-check may be racy during worker add—ensure callers tolerate false negatives.
Right now this hard-bails before sending any request. If runtime-config propagation can lag discovery, consider either retrying at the call site or turning this into a soft check (attempt request, handle “no responders” / explicit “no local indexer” response).

tests/router/test_router_e2e_with_mockers.py (1)

173-175: CLI flag wiring is fine; consider explicit disable flag if default may change.
Right now only the enabling flag is emitted. If --enable-local-indexer ever flips to default-on, you’ll want --no-enable-local-indexer symmetry (similar to other flags).

lib/llm/src/kv_router/indexer.rs (1)

1401-1408: Potential race in clone-aware Drop.

The Arc::strong_count(&self._ref_count) == 1 check has a theoretical race: if two clones are dropped concurrently on different threads, both may observe count > 1 and neither will call shutdown().

In practice, this is unlikely if the indexer is used carefully, but for robustness consider alternatives:

  • Use Arc::try_unwrap() pattern
  • Store the background task's JoinHandle and use explicit shutdown
  • Accept the race if concurrent drops are architecturally prevented
 impl Drop for KvIndexer {
     fn drop(&mut self) {
-        // Only cancel the token if we're the last reference.
-        // This allows clones to be dropped without killing the background task.
-        if Arc::strong_count(&self._ref_count) == 1 {
-            self.shutdown();
-        }
+        // Cancel the token on last reference.
+        // Note: strong_count check has a theoretical race with concurrent drops,
+        // but is acceptable given typical single-threaded usage patterns.
+        if Arc::strong_count(&self._ref_count) == 1 {
+            self.shutdown();
+        }
     }
 }

If concurrent drops are possible, consider documenting this limitation or switching to an explicit shutdown model.

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 32bfc0f and cff41b8.

📒 Files selected for processing (6)
  • lib/llm/src/kv_router.rs (3 hunks)
  • lib/llm/src/kv_router/indexer.rs (14 hunks)
  • lib/llm/src/kv_router/publisher.rs (8 hunks)
  • lib/llm/src/kv_router/subscriber.rs (6 hunks)
  • lib/llm/src/kv_router/worker_query.rs (2 hunks)
  • tests/router/test_router_e2e_with_mockers.py (2 hunks)
🧰 Additional context used
🧠 Learnings (17)
📓 Common learnings
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3184
File: docs/architecture/kv_cache_routing.md:70-73
Timestamp: 2025-09-23T20:08:37.105Z
Learning: PeaBrane prefers to keep documentation diagrams simplified to avoid visual overload, even when this means sacrificing some technical precision for the sake of clarity and comprehension. They prioritize pedagogical effectiveness over exhaustive technical detail in architectural diagrams.
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2756
File: lib/llm/src/kv_router/subscriber.rs:36-44
Timestamp: 2025-08-29T10:03:48.330Z
Learning: PeaBrane prefers to keep PRs contained in scope and is willing to defer technical improvements to future PRs when the current implementation works for the immediate use case. They acknowledge technical debt but prioritize deliverability over completeness in individual PRs.
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/subscriber.rs:200-223
Timestamp: 2025-09-17T20:55:41.416Z
Learning: In the dynamo codebase, PeaBrane prefers to maintain consistency with existing etcd key parsing patterns (like splitting on '/' and parsing the last segment) rather than introducing more robust parsing approaches, even when the current approach might be brittle, to keep the codebase aligned and avoid divergent patterns.
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/indexer.rs:0-0
Timestamp: 2025-09-17T20:55:06.333Z
Learning: When PeaBrane encounters a complex implementation issue that would significantly expand PR scope (like the remove_worker_sender method in lib/llm/src/kv_router/indexer.rs that required thread-safe map updates and proper shard targeting), they prefer to remove the problematic implementation entirely rather than rush a partial fix, deferring the proper solution to a future PR.
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/publisher.rs
  • lib/llm/src/kv_router/worker_query.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-08-29T10:08:18.434Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2756
File: lib/bindings/python/rust/llm/kv.rs:401-436
Timestamp: 2025-08-29T10:08:18.434Z
Learning: In the Python KvIndexer bindings (lib/bindings/python/rust/llm/kv.rs), the hardcoded reset_states=true parameter passed to start_kv_router_background is intentional behavior, not an oversight that needs to be made configurable.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/subscriber.rs
📚 Learning: 2025-05-29T00:02:35.018Z
Learnt from: alec-flowers
Repo: ai-dynamo/dynamo PR: 1181
File: lib/llm/src/kv_router/publisher.rs:379-425
Timestamp: 2025-05-29T00:02:35.018Z
Learning: In lib/llm/src/kv_router/publisher.rs, the functions `create_stored_blocks` and `create_stored_block_from_parts` are correctly implemented and not problematic duplications of existing functionality elsewhere in the codebase.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/publisher.rs
📚 Learning: 2025-06-13T22:07:24.843Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/publisher.rs
  • lib/llm/src/kv_router/worker_query.rs
📚 Learning: 2025-09-17T01:00:50.937Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane suggested using tokio::select! arm ordering with the existing biased directive in the indexer to create a natural barrier for dump requests, ensuring KV events are drained before snapshotting. This approach leverages existing architecture (biased select) to solve race conditions with minimal code changes, which aligns with their preference for contained solutions.

Applied to files:

  • lib/llm/src/kv_router.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-05-30T06:38:09.630Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1285
File: lib/llm/src/kv_router/scoring.rs:58-63
Timestamp: 2025-05-30T06:38:09.630Z
Learning: In lib/llm/src/kv_router/scoring.rs, the user prefers to keep the panic behavior when calculating load_avg and variance with empty endpoints rather than adding guards for division by zero. They want the code to fail fast on this error condition.

Applied to files:

  • lib/llm/src/kv_router.rs
📚 Learning: 2025-09-03T19:31:32.621Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2840
File: lib/llm/src/kv_router/sequence.rs:86-88
Timestamp: 2025-09-03T19:31:32.621Z
Learning: PeaBrane chose to defer fixing the corner case where a single late-arriving request might never expire in the ActiveSequences expiry mechanism (lib/llm/src/kv_router/sequence.rs). They prefer to avoid adding a background loop for periodic cleanup at this time, accepting the technical debt to keep the current PR scope contained.

Applied to files:

  • lib/llm/src/kv_router.rs
📚 Learning: 2025-10-14T00:58:05.744Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3597
File: lib/llm/src/kv_router/indexer.rs:437-441
Timestamp: 2025-10-14T00:58:05.744Z
Learning: In lib/llm/src/kv_router/indexer.rs, when a KvCacheEventData::Cleared event is received, the system intentionally clears all dp_ranks for the given worker_id by calling clear_all_blocks(worker.worker_id). This is the desired behavior and should not be scoped to individual dp_ranks.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/publisher.rs
  • lib/llm/src/kv_router/worker_query.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-06-05T01:02:15.318Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 1392
File: lib/llm/src/kv_router/scoring.rs:35-46
Timestamp: 2025-06-05T01:02:15.318Z
Learning: In lib/llm/src/kv_router/scoring.rs, PeaBrane prefers panic-based early failure over Result-based error handling for the worker_id() method to catch invalid data early during development.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/worker_query.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.

Applied to files:

  • lib/llm/src/kv_router/subscriber.rs
  • lib/llm/src/kv_router/publisher.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.

Applied to files:

  • lib/llm/src/kv_router/worker_query.rs
📚 Learning: 2025-09-17T20:55:06.333Z
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3095
File: lib/llm/src/kv_router/indexer.rs:0-0
Timestamp: 2025-09-17T20:55:06.333Z
Learning: When PeaBrane encounters a complex implementation issue that would significantly expand PR scope (like the remove_worker_sender method in lib/llm/src/kv_router/indexer.rs that required thread-safe map updates and proper shard targeting), they prefer to remove the problematic implementation entirely rather than rush a partial fix, deferring the proper solution to a future PR.

Applied to files:

  • lib/llm/src/kv_router/worker_query.rs
  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::error::RequestErrorKind::NoResponders. Use err.downcast_ref::<async_nats::error::RequestError>() and then check req_err.kind() against RequestErrorKind::NoResponders to handle this error properly.

Applied to files:

  • lib/llm/src/kv_router/worker_query.rs
📚 Learning: 2025-06-13T22:32:05.022Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:32:05.022Z
Learning: In async-nats, the "no responders" error is represented as async_nats::client::RequestErrorKind::NoResponders, not async_nats::Error::NoResponders. Use err.downcast_ref::<async_nats::client::RequestError>() and then check request_err.kind() against RequestErrorKind::NoResponders.

Applied to files:

  • lib/llm/src/kv_router/worker_query.rs
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.

Applied to files:

  • lib/llm/src/kv_router/indexer.rs
📚 Learning: 2025-06-08T03:12:03.985Z
Learnt from: jthomson04
Repo: ai-dynamo/dynamo PR: 1429
File: lib/runtime/src/utils/leader_worker_barrier.rs:69-72
Timestamp: 2025-06-08T03:12:03.985Z
Learning: In the leader-worker barrier implementation in lib/runtime/src/utils/leader_worker_barrier.rs, the `wait_for_key_count` function correctly uses exact equality (`==`) instead of greater-than-or-equal (`>=`) because worker IDs must be unique (enforced by etcd create-only operations), ensuring exactly the expected number of workers can register.

Applied to files:

  • lib/llm/src/kv_router/indexer.rs
🧬 Code graph analysis (2)
lib/llm/src/kv_router.rs (1)
lib/llm/src/kv_router/subscriber.rs (2)
  • start_kv_router_background (406-637)
  • start_kv_router_background_nats_core (649-818)
lib/llm/src/kv_router/worker_query.rs (3)
lib/runtime/src/logging.rs (1)
  • format (848-848)
lib/llm/src/kv_router/publisher.rs (1)
  • subject (1252-1254)
lib/bindings/python/src/dynamo/_core.pyi (1)
  • component (108-112)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (16)
  • GitHub Check: vllm (amd64)
  • GitHub Check: sglang (amd64)
  • GitHub Check: operator (amd64)
  • GitHub Check: trtllm (arm64)
  • GitHub Check: operator (arm64)
  • GitHub Check: trtllm (amd64)
  • GitHub Check: vllm (arm64)
  • GitHub Check: tests (lib/bindings/python)
  • GitHub Check: clippy (lib/bindings/python)
  • GitHub Check: tests (lib/runtime/examples)
  • GitHub Check: clippy (lib/runtime/examples)
  • GitHub Check: tests (launch/dynamo-run)
  • GitHub Check: clippy (.)
  • GitHub Check: tests (.)
  • GitHub Check: clippy (launch/dynamo-run)
  • GitHub Check: Build and Test - dynamo
🔇 Additional comments (15)
lib/llm/src/kv_router/publisher.rs (3)

300-305: Publishing on KV_EVENT_SUBJECT is the right normalization.
This aligns publisher/consumer subject naming across JetStream and Core paths and matches the updated tests.


359-362: Good: query service returns enum response, not a bare Vec, enabling TreeDump/TooNew/InvalidRange.
This makes recovery semantics explicit and matches the router-side variant handling.


186-230: Both publishing paths encode RouterEvent as JSON and are compatible with subscriber deserialization. Component (NATS Core) and NatsQueue (JetStream) both use serde_json::to_vec(), matching the serde_json::from_slice() calls in the subscriber.

lib/llm/src/kv_router/subscriber.rs (2)

127-216: Variant handling in recover_from_worker is solid (TreeDump/TooNew/InvalidRange).
The early returns and explicit InvalidRange bail make the recovery behavior clearer and safer than “best effort” Vec handling.


639-818: Use subscribe_with_type<RouterEvent> to avoid manual deserialization.

The EventSubscriber trait provides a typed subscription method that handles deserialization automatically. Replace the manual serde_json::from_slice with component.subscribe_with_type::<RouterEvent>(KV_EVENT_SUBJECT).await? to eliminate manual serialization logic and reduce the risk of format mismatches. This aligns with the existing API design in dynamo_runtime.

The blocking recovery concern can be addressed in a follow-up; the current approach is acceptable given scope constraints.

⛔ Skipped due to learnings
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 3077
File: lib/llm/src/kv_router/subscriber.rs:334-336
Timestamp: 2025-09-17T01:00:50.937Z
Learning: PeaBrane identified that reordering tokio::select! arms in the indexer (moving dump_rx.recv() to be after event_rx.recv()) creates a natural barrier that ensures RouterEvents are always processed before dump requests, solving the ack-before-commit race condition. This leverages the existing biased directive and requires minimal code changes, aligning with their preference for contained solutions.
Learnt from: PeaBrane
Repo: ai-dynamo/dynamo PR: 2465
File: lib/runtime/src/utils/typed_prefix_watcher.rs:94-101
Timestamp: 2025-08-15T23:51:04.958Z
Learning: In the dynamo codebase's etcd client implementation, `PrefixWatcher` uses `#[derive(Dissolve)]` to generate a `dissolve()` method. The pattern `let (_, _watcher, mut events_rx) = prefix_watcher.dissolve();` is the standard and intended usage throughout the codebase. The `mpsc::Receiver<WatchEvent>` maintains the etcd watch stream independently, so the `Watcher` handle can be safely dropped. This pattern is used consistently in critical infrastructure modules like component/client.rs, utils/leader_worker_barrier.rs, and entrypoint/input/http.rs.
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
lib/llm/src/kv_router.rs (1)

72-72: Migration of KV event subject is complete. All Rust code references the KV_EVENT_SUBJECT constant, and configuration files/examples use the correct "kv-events" value. No hard-coded string literals found that would create compile or runtime inconsistencies.

lib/llm/src/kv_router/indexer.rs (9)

178-184: LGTM!

Making worker_id and event fields public aligns with the need for other modules (subscriber, worker_query) to access these fields directly for the decentralized router implementation.


207-216: LGTM!

The updated documentation clearly describes the semantics of start_event_id and end_event_id fields.


218-233: Well-designed enum-based response API.

The enum variants clearly distinguish the response types:

  • Events for buffer-served data with original event IDs
  • TreeDump for full reconstructions with synthetic IDs
  • TooNew and InvalidRange for explicit error states

This design enables callers to handle each case appropriately, especially for gap detection and recovery.


913-936: LGTM - Clone-aware lifecycle management.

Using Arc<()> for reference counting is an appropriate lightweight pattern to track clones and ensure only the last clone triggers shutdown. This is well-suited for sharing the indexer across async tasks while maintaining proper cleanup.


1461-1555: Logic looks correct with clear response routing.

The method properly handles all cases:

  1. Invalid range validation
  2. Tree dump for missing start_id or when range extends before buffer
  3. TooNew for requests beyond available data
  4. Buffer-served events with proper range clamping

One consideration: the binary search assumes contiguous event IDs in the buffer. If record_event accepts non-consecutive events (logged as error at line 1567-1572), the range query could return unexpected results. This appears to be an accepted degraded state, but worth noting for operational awareness.


1670-1708: LGTM - Tests properly exercise the new enum API.

The test helpers correctly handle both Events and TreeDump variants, and the assertions verify the expected response types including InvalidRange.


1710-1886: Excellent test coverage for the enum-based response API.

This comprehensive test exercises all WorkerKvQueryResponse variants:

  • Events: buffer-served queries within and at boundaries
  • TreeDump: queries extending before buffer
  • TooNew: requests beyond available data
  • InvalidRange: invalid start > end

The small buffer size (5) effectively tests the eviction/tree-dump fallback behavior.


3727-3753: LGTM - Serialization test updated for enum response.

The test correctly constructs and deserializes the WorkerKvQueryResponse::Events variant, validating the JSON round-trip for NATS communication.


3756-3799: LGTM - Gap detection test validates per-worker tracking.

The test confirms that:

  • Event ID gaps are detected per-worker (Worker B has gap 2-4)
  • last_received_event_id tracks the maximum seen per worker, enabling recovery queries

This aligns well with the PR's gap detection and recovery objectives.

Signed-off-by: PeaBrane <[email protected]>
Signed-off-by: PeaBrane <[email protected]>
Copy link
Contributor

@karen-sy karen-sy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes look great (thank you for writing the docs too, I should have done that in #4519), the only thing is kv_router::publisher::test_integration_publisher_with_kvindexer::test_distributed_kvindexer_e2e_startup is now failing at L2522 so we could either fix it or remove it if you think it is outdated with the new changes

Signed-off-by: PeaBrane <[email protected]>
@PeaBrane PeaBrane merged commit ef3027b into main Dec 12, 2025
40 of 41 checks passed
@PeaBrane PeaBrane deleted the rupei/decentralized-router branch December 12, 2025 21:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants