Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
86c79ba
some prelim cleanups
PeaBrane May 30, 2025
6bee243
router can route to dp ranks
PeaBrane May 30, 2025
dab052c
make the bunny hoppy
PeaBrane May 30, 2025
be6900e
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
25e1291
Merge remote-tracking branch 'origin/main' into rupei/router-general
PeaBrane May 30, 2025
34e5c5b
new struct combining worker_id with dp_rank, dirty commit, breaks bin…
PeaBrane May 30, 2025
2cef74c
binding works
PeaBrane May 30, 2025
10d3326
dummy c binding note
PeaBrane May 30, 2025
4483c68
add_class WorkerWithDpRank
PeaBrane May 30, 2025
263c12d
renames + comments + fmt
PeaBrane May 31, 2025
65ea6b5
allow suffix for dp_rank identification
PeaBrane Jun 3, 2025
a2ef896
WIP: fix fn dp_rank, add TODO's
alec-flowers Jun 3, 2025
e80d66c
refactor: fix bugs, kv publishing working
alec-flowers Jun 3, 2025
7a733bd
fix panicing metric thread issue
alec-flowers Jun 4, 2025
1bddc8e
remove verbose log
alec-flowers Jun 4, 2025
ee283cc
update v1 worker
alec-flowers Jun 4, 2025
183a8fe
put dp_rank in PreprocessedRequest
PeaBrane Jun 4, 2025
be7f951
new agg config
PeaBrane Jun 4, 2025
e1011d8
updated comments
PeaBrane Jun 4, 2025
5bf4fae
update v1 example
alec-flowers Jun 4, 2025
d6ded6c
final touches for it working with dp
alec-flowers Jun 4, 2025
61b94ac
Merge branch 'main' into rupei/router-general
alec-flowers Jun 4, 2025
9335efe
fix cost function trace
PeaBrane Jun 4, 2025
931b837
fmt
PeaBrane Jun 4, 2025
2a72271
Merge branch 'main' into rupei/router-general
PeaBrane Jun 4, 2025
eb7bb10
WIP document current work steps
alec-flowers Jun 5, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP: fix fn dp_rank, add TODO's
  • Loading branch information
alec-flowers committed Jun 3, 2025
commit a2ef896d7ded39a30a5f5fd1f62be95dbbe518ad
25 changes: 18 additions & 7 deletions launch/dynamo-run/src/subprocess/vllm_v1_inc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import uvloop
from vllm.config import VllmConfig
from vllm.distributed.kv_events import KVEventsConfig
from vllm.distributed.kv_events import KVEventsConfig, ZmqEventPublisher
from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.inputs import TokensPrompt
from vllm.sampling_params import SamplingParams
Expand Down Expand Up @@ -68,7 +68,7 @@ class DynamoStatLoggerPublisher(StatLoggerBase):

def __init__(self, component: Component, dp_rank: int) -> None:
self.inner = WorkerMetricsPublisher()
self.inner.create_endpoint(component)
self.inner.create_endpoint(component, dp_rank=dp_rank)
self.dp_rank = dp_rank

def record(
Expand Down Expand Up @@ -246,12 +246,23 @@ async def init(runtime: DistributedRuntime, config: Config):
)

logger.info("VllmWorker has been initialized")
base_zmq_endpoint = "tcp://127.0.0.1:5557"
dp_rank_size = vllm_config.parallel_config.data_parallel_size

# TODO This isn't working still
for dp_rank in range(dp_rank_size):
print(f"DP_RANK in Dynamo: {dp_rank}")
zmq_endpoint = ZmqEventPublisher.offset_endpoint_port(
base_zmq_endpoint, data_parallel_rank=dp_rank
)
print(f"ZMQ_ENPDPOINT in Dynamo: {zmq_endpoint}")
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(),
kv_block_size=engine_args.block_size,
zmq_endpoint=zmq_endpoint,
)

zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(), kv_block_size=engine_args.block_size
)

_ = ZmqKvEventPublisher(component=component, config=zmq_config)
_ = ZmqKvEventPublisher(component=component, config=zmq_config)

handler = RequestHandler(component, engine_client, default_sampling_params)

Expand Down
2 changes: 1 addition & 1 deletion lib/bindings/python/src/dynamo/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ class WorkerMetricsPublisher:
Create a `WorkerMetricsPublisher` object
"""

def create_service(self, component: Component) -> None:
def create_endpoint(self, component: Component, dp_rank: int) -> None:
"""
Similar to Component.create_service, but only service created through
this method will interact with KV router of the same component.
Expand Down
2 changes: 1 addition & 1 deletion lib/llm/src/kv_router/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,7 +497,7 @@ impl WorkerMetricsPublisher {
let handler = Ingress::for_engine(handler)?;

let endpoint_name = match suffix {
Some(s) => format!("{}_{}", KV_METRICS_ENDPOINT, s),
Some(s) => format!("{}-{}", KV_METRICS_ENDPOINT, s),
None => KV_METRICS_ENDPOINT.to_string(),
};

Expand Down
7 changes: 5 additions & 2 deletions lib/llm/src/kv_router/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ impl Endpoint {
}

pub fn dp_rank(&self) -> Option<DpRank> {
tracing::info!("Parsing dp_rank from subject: {}", self.subject);
let parts: Vec<&str> = self.subject.split("-").collect();
if parts.len() < 2 {
if parts.len() < 3 {
return None;
}
let second_to_last = parts[parts.len() - 2];
second_to_last.parse::<DpRank>().ok()
let result = second_to_last.parse::<DpRank>().ok();
result
}
} // TODO: make dp_rank

Expand All @@ -70,6 +72,7 @@ impl ProcessedEndpoints {
.map(|endpoint| endpoint.data.kv_active_blocks as f64)
.collect();
if load_values.is_empty() {
// TODO we hit this panic while vLLM is starting the ranks up. Need to avoid this
panic!("No endpoints to process!")
};
let load_avg = load_values.iter().copied().sum::<f64>() / load_values.len() as f64;
Expand Down