Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
86c79ba
some prelim cleanups
PeaBrane May 30, 2025
6bee243
router can route to dp ranks
PeaBrane May 30, 2025
dab052c
make the bunny hoppy
PeaBrane May 30, 2025
be6900e
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
25e1291
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
34e5c5b
new struct combining worker_id with dp_rank, dirty commit, breaks bin…
PeaBrane May 30, 2025
2cef74c
binding works
PeaBrane May 30, 2025
10d3326
dummy c binding note
PeaBrane May 30, 2025
4483c68
add_class WorkerWithDpRank
PeaBrane May 30, 2025
263c12d
renames + comments + fmt
PeaBrane May 31, 2025
65ea6b5
allow suffix for dp_rank identification
PeaBrane Jun 3, 2025
a2ef896
WIP: fix fn dp_rank, add TODO's
alec-flowers Jun 3, 2025
e80d66c
refactor: fix bugs, kv publishing working
alec-flowers Jun 3, 2025
7a733bd
fix panicing metric thread issue
alec-flowers Jun 4, 2025
1bddc8e
remove verbose log
alec-flowers Jun 4, 2025
ee283cc
update v1 worker
alec-flowers Jun 4, 2025
183a8fe
put dp_rank in PreprocessedRequest
PeaBrane Jun 4, 2025
be7f951
new agg config
PeaBrane Jun 4, 2025
e1011d8
updated comments
PeaBrane Jun 4, 2025
5bf4fae
update v1 example
alec-flowers Jun 4, 2025
d6ded6c
final touches for it working with dp
alec-flowers Jun 4, 2025
61b94ac
Merge branch 'main' into rupei/router-general
alec-flowers Jun 4, 2025
9335efe
fix cost function trace
PeaBrane Jun 4, 2025
931b837
fmt
PeaBrane Jun 4, 2025
2a72271
Merge branch 'main' into rupei/router-general
PeaBrane Jun 4, 2025
eb7bb10
WIP document current work steps
alec-flowers Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/metrics/src/bin/mock_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// limitations under the License.

use dynamo_llm::kv_router::{
protocols::ForwardPassMetrics, scheduler::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
protocols::ForwardPassMetrics, protocols::KVHitRateEvent, KV_HIT_RATE_SUBJECT,
};
use dynamo_runtime::{
component::{service::EndpointStats, Namespace},
Expand Down
15 changes: 7 additions & 8 deletions components/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,7 @@ use std::net::SocketAddr;
use std::time::Duration as StdDuration;

use dynamo_llm::kv_router::protocols::ForwardPassMetrics;
use dynamo_llm::kv_router::scheduler::Endpoint;
use dynamo_llm::kv_router::scoring::ProcessedEndpoints;
use dynamo_llm::kv_router::scoring::{Endpoint, ProcessedEndpoints};

use dynamo_runtime::{
distributed::Component, error, service::EndpointInfo, utils::Duration, Result,
Expand Down Expand Up @@ -455,31 +454,31 @@ impl PrometheusMetrics {
&self.kv_blocks_active,
config,
&worker_id,
metrics.kv_active_blocks as f64,
metrics[0].kv_active_blocks as f64,
);
self.set_worker_gauge(
&self.kv_blocks_total,
config,
&worker_id,
metrics.kv_total_blocks as f64,
metrics[0].kv_total_blocks as f64,
);
self.set_worker_gauge(
&self.requests_active,
config,
&worker_id,
metrics.request_active_slots as f64,
metrics[0].request_active_slots as f64,
);
self.set_worker_gauge(
&self.requests_total,
config,
&worker_id,
metrics.request_total_slots as f64,
metrics[0].request_total_slots as f64,
);
self.set_worker_gauge(
&self.kv_hit_rate_percent,
config,
&worker_id,
metrics.gpu_prefix_cache_hit_rate as f64,
metrics[0].gpu_prefix_cache_hit_rate as f64,
);
}

Expand Down Expand Up @@ -602,7 +601,7 @@ pub fn postprocess_metrics(
e.id().ok().map(|id| Endpoint {
name: format!("worker-{id}"),
subject: e.subject.clone(),
data: m.clone(),
data: vec![m.clone()],
})
})
.collect();
Expand Down
12 changes: 7 additions & 5 deletions components/metrics/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
//! - ISL Blocks: Cumulative count of total blocks in all KV hit rate events
//! - Overlap Blocks: Cumulative count of blocks that were already in the KV cache
use clap::Parser;
use dynamo_llm::kv_router::scheduler::KVHitRateEvent;
use dynamo_llm::kv_router::protocols::{KVHitRateEvent, WorkerId, DpRank};
use dynamo_llm::kv_router::KV_HIT_RATE_SUBJECT;
use dynamo_runtime::{
error, logging,
Expand Down Expand Up @@ -180,14 +180,15 @@ async fn app(runtime: Runtime) -> Result<()> {
tracing::debug!("Successfully subscribed to KV hit rate events");

while let Some(msg) = subscriber.next().await {
match serde_json::from_slice::<KVHitRateEvent>(&msg.payload) {
match serde_json::from_slice::<KVHitRateEvent<(WorkerId, DpRank)>>(&msg.payload) {
Ok(event) => {
// TODO: Lower to debug
let cache_hit_pct =
(event.overlap_blocks as f64 / event.isl_blocks as f64) * 100.0;
tracing::debug!(
"Received KV hit rate event: worker_id={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
event.worker_id,
"Received KV hit rate event: worker_id={}, dp_rank={}, isl_blocks={}, overlap_blocks={}, cache_hit_pct={:.2}%",
event.worker_id.0,
event.worker_id.1,
event.isl_blocks,
event.overlap_blocks,
cache_hit_pct
Expand All @@ -197,7 +198,8 @@ async fn app(runtime: Runtime) -> Result<()> {
let mut metrics = metrics_collector_clone.lock().await;
metrics.update_kv_hit_rate(
&config_clone,
event.worker_id,
// TODO: this will not take care of dp ranks
event.worker_id.0,
event.isl_blocks,
event.overlap_blocks,
);
Expand Down
4 changes: 2 additions & 2 deletions components/router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;
use clap::Parser;

use dynamo_llm::kv_router::{
protocols::WorkerSelectionResult,
protocols::{WorkerSelectionResult, WorkerId, DpRank},
scheduler::{DefaultWorkerSelector, KvSchedulerError, SchedulingRequest},
scoring::ProcessedEndpoints,
KvRouter, WorkerSelector,
Expand Down Expand Up @@ -89,7 +89,7 @@ impl WorkerSelector for CustomWorkerSelector {
workers: &ProcessedEndpoints,
request: &SchedulingRequest,
block_size: usize,
) -> Result<WorkerSelectionResult, KvSchedulerError> {
) -> Result<WorkerSelectionResult<(WorkerId, DpRank)>, KvSchedulerError> {
// customize logic here
// F12 into [DefaultWorkerSelector] to see the original logic
self.0.select_worker(workers, request, block_size)
Expand Down
51 changes: 33 additions & 18 deletions lib/llm/src/kv_router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ pub mod scoring;

use crate::{
kv_router::{
indexer::{KvIndexer, KvIndexerInterface, RouterEvent},
indexer::{KvIndexer, KvIndexerInterface},
metrics_aggregator::KvMetricsAggregator,
protocols::{LocalBlockHash, RouterRequest, RouterResponse, WorkerSelectionResult},
protocols::{
DpRank, LocalBlockHash, RouterEvent, RouterRequest, RouterResponse, WorkerId,
WorkerSelectionResult,
},
scheduler::{KvScheduler, KvSchedulerError, SchedulingRequest},
scoring::ProcessedEndpoints,
},
Expand All @@ -51,13 +54,13 @@ pub trait WorkerSelector {
workers: &ProcessedEndpoints,
request: &SchedulingRequest,
block_size: usize,
) -> Result<WorkerSelectionResult, KvSchedulerError>;
) -> Result<WorkerSelectionResult<(WorkerId, DpRank)>, KvSchedulerError>;
}

/// A KvRouter only decides which worker you should use. It doesn't send you there.
/// TODO: Rename this to indicate it only selects a worker, it does not route.
pub struct KvRouter {
indexer: KvIndexer,
indexer: KvIndexer<(WorkerId, DpRank)>,
scheduler: KvScheduler,
block_size: usize,
}
Expand Down Expand Up @@ -92,15 +95,16 @@ impl KvRouter {

tokio::spawn(async move {
while let Some(event) = kv_events_rx.next().await {
let event: RouterEvent = match serde_json::from_slice(&event.payload) {
Ok(event) => event,
Err(e) => {
tracing::warn!("Failed to deserialize RouterEvent: {:?}", e);
// Choosing warn and continue to process other events from other workers
// A bad event likely signals a problem with a worker, but potentially other workers are still healthy
continue;
}
};
let event: RouterEvent<(WorkerId, DpRank)> =
match serde_json::from_slice(&event.payload) {
Ok(event) => event,
Err(e) => {
tracing::warn!("Failed to deserialize RouterEvent: {:?}", e);
// Choosing warn and continue to process other events from other workers
// A bad event likely signals a problem with a worker, but potentially other workers are still healthy
continue;
}
};
if let Err(e) = kv_events_tx.send(event).await {
tracing::debug!("failed to send kv event to indexer; shutting down: {:?}", e);
}
Expand All @@ -115,7 +119,11 @@ impl KvRouter {
}

// [TODO] indexer needs to take 'lora_id' as parameter
pub async fn schedule(&self, token_ids: &Vec<u32>, _lora_id: u64) -> Result<i64> {
pub async fn schedule(
&self,
token_ids: &Vec<u32>,
_lora_id: u64,
) -> Result<(WorkerId, DpRank)> {
// Extracting part of the code in KvRouter::generate() for only
// the decision making part, routing is done by the caller
let isl_tokens = token_ids.len();
Expand All @@ -130,7 +138,7 @@ impl KvRouter {

/// Give these tokens, find the worker with the best match in it's KV cache.
/// Returned overlap amount is in number of blocks.
async fn find_best_match(&self, tokens: &[u32]) -> anyhow::Result<(i64, u32)> {
async fn find_best_match(&self, tokens: &[u32]) -> anyhow::Result<((WorkerId, DpRank), u32)> {
let isl_tokens = tokens.len();
let block_size = self.block_size;

Expand All @@ -157,11 +165,17 @@ impl KvRouter {
}

#[async_trait]
impl AsyncEngine<SingleIn<RouterRequest>, ManyOut<Annotated<RouterResponse>>, Error> for KvRouter {
impl
AsyncEngine<
SingleIn<RouterRequest>,
ManyOut<Annotated<RouterResponse<(WorkerId, DpRank)>>>,
Error,
> for KvRouter
{
async fn generate(
&self,
request: SingleIn<RouterRequest>,
) -> Result<ManyOut<Annotated<RouterResponse>>> {
) -> Result<ManyOut<Annotated<RouterResponse<(WorkerId, DpRank)>>>> {
let (request, ctx) = request.into_parts();
let (worker_id, _) = self.find_best_match(&request.tokens).await?;

Expand Down Expand Up @@ -203,7 +217,8 @@ impl AsyncEngine<SingleIn<BackendInput>, ManyOut<Annotated<LLMEngineOutput>>, Er
let (mut backend_input, context) = request.into_parts();
backend_input.estimated_prefix_hit_num_blocks = Some(overlap_amount);
let updated_request = context.map(|_| backend_input);
self.inner.direct(updated_request, instance_id).await
// TODO: this does not do dp routing
self.inner.direct(updated_request, instance_id.0).await
}
}
}
Expand Down
Loading
Loading