Changes from 1 commit
26 commits
86c79ba  some prelim cleanups (PeaBrane, May 30, 2025)
6bee243  router can route to dp ranks (PeaBrane, May 30, 2025)
dab052c  make the bunny hoppy (PeaBrane, May 30, 2025)
be6900e  Merge remote-tracking branch 'origin/main' into rupei/router-general (PeaBrane, May 30, 2025)
25e1291  Merge remote-tracking branch 'origin/main' into rupei/router-general (PeaBrane, May 30, 2025)
34e5c5b  new struct combining worker_id with dp_rank, dirty commit, breaks bin… (PeaBrane, May 30, 2025)
2cef74c  binding works (PeaBrane, May 30, 2025)
10d3326  dummy c binding note (PeaBrane, May 30, 2025)
4483c68  add_class WorkerWithDpRank (PeaBrane, May 30, 2025)
263c12d  renames + comments + fmt (PeaBrane, May 31, 2025)
65ea6b5  allow suffix for dp_rank identification (PeaBrane, Jun 3, 2025)
a2ef896  WIP: fix fn dp_rank, add TODO's (alec-flowers, Jun 3, 2025)
e80d66c  refactor: fix bugs, kv publishing working (alec-flowers, Jun 3, 2025)
7a733bd  fix panicing metric thread issue (alec-flowers, Jun 4, 2025)
1bddc8e  remove verbose log (alec-flowers, Jun 4, 2025)
ee283cc  update v1 worker (alec-flowers, Jun 4, 2025)
183a8fe  put dp_rank in PreprocessedRequest (PeaBrane, Jun 4, 2025)
be7f951  new agg config (PeaBrane, Jun 4, 2025)
e1011d8  updated comments (PeaBrane, Jun 4, 2025)
5bf4fae  update v1 example (alec-flowers, Jun 4, 2025)
d6ded6c  final touches for it working with dp (alec-flowers, Jun 4, 2025)
61b94ac  Merge branch 'main' into rupei/router-general (alec-flowers, Jun 4, 2025)
9335efe  fix cost function trace (PeaBrane, Jun 4, 2025)
931b837  fmt (PeaBrane, Jun 4, 2025)
2a72271  Merge branch 'main' into rupei/router-general (PeaBrane, Jun 4, 2025)
eb7bb10  WIP document current work steps (alec-flowers, Jun 5, 2025)
refactor: fix bugs, kv publishing working
alec-flowers committed Jun 3, 2025
commit e80d66cdb7c2a01389ece616ea0a6ce3d5f26ef3
20 changes: 15 additions & 5 deletions launch/dynamo-run/src/subprocess/vllm_v1_inc.py
@@ -249,20 +249,30 @@ async def init(runtime: DistributedRuntime, config: Config):
base_zmq_endpoint = "tcp://127.0.0.1:5557"
dp_rank_size = vllm_config.parallel_config.data_parallel_size

# TODO This isn't working still
# Store references to prevent garbage collection
kv_publishers = []

for dp_rank in range(dp_rank_size):
print(f"DP_RANK in Dynamo: {dp_rank}")
zmq_endpoint = ZmqEventPublisher.offset_endpoint_port(
base_zmq_endpoint, data_parallel_rank=dp_rank
)
print(f"ZMQ_ENPDPOINT in Dynamo: {zmq_endpoint}")
zmq_config = ZmqKvEventPublisherConfig(
worker_id=endpoint.lease_id(),
kv_block_size=engine_args.block_size,
zmq_endpoint=zmq_endpoint,
)

_ = ZmqKvEventPublisher(component=component, config=zmq_config)
try:
publisher = ZmqKvEventPublisher(component=component, config=zmq_config)
kv_publishers.append(publisher)
except Exception as e:
logger.error(
f"Failed to create ZmqKvEventPublisher for dp_rank {dp_rank}: {e}"
)

logger.debug(
f"Successfully created {len(kv_publishers)} ZmqKvEventPublishers out of {dp_rank_size} expected"
)

handler = RequestHandler(component, engine_client, default_sampling_params)

@@ -324,7 +334,7 @@ def cmd_line_args():
endpoint_str = args.endpoint.replace("dyn://", "", 1)
endpoint_parts = endpoint_str.split(".")
if len(endpoint_parts) != 3:
logging.error(
logger.error(
f"Invalid endpoint format: '{args.endpoint}'. Expected 'dyn://namespace.component.endpoint' or 'namespace.component.endpoint'."
)
sys.exit(1)
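For orientation, here is a rough sketch of what the per-rank endpoint derivation in the loop above is assumed to do: take the base endpoint and offset its port by the data-parallel rank. It is written in Rust to keep all examples here in one language; the helper name and the add-the-rank-to-the-port rule are illustrative assumptions, not the actual `ZmqEventPublisher.offset_endpoint_port` implementation.

```rust
// Hypothetical stand-in for ZmqEventPublisher.offset_endpoint_port:
// assumed to bump the base port by the data-parallel rank.
fn offset_endpoint_port(base: &str, dp_rank: u16) -> Option<String> {
    let (prefix, port) = base.rsplit_once(':')?;
    let port: u16 = port.parse().ok()?;
    Some(format!("{}:{}", prefix, port + dp_rank))
}

fn main() {
    // Under this assumption, "tcp://127.0.0.1:5557" with dp_rank 2 maps to port 5559.
    assert_eq!(
        offset_endpoint_port("tcp://127.0.0.1:5557", 2).as_deref(),
        Some("tcp://127.0.0.1:5559")
    );
}
```

This gives each data-parallel rank its own ZMQ endpoint, so one publisher per rank can be subscribed to independently.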
42 changes: 27 additions & 15 deletions lib/llm/src/kv_router/publisher.rs
@@ -189,15 +189,17 @@ async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>(
cancellation_token: CancellationToken,
mut rx: mpsc::UnboundedReceiver<KvCacheEventWithDp>,
) {
tracing::debug!("KV Event processor starting for worker_id: {}", worker_id);

loop {
tokio::select! {
_ = cancellation_token.cancelled() => {
tracing::info!("KV Event source received cancellation signal");
tracing::debug!("KV Event processor received cancellation signal for worker_id: {}", worker_id);
break;
}
maybe_data = rx.recv() => {
let Some(data) = maybe_data else {
tracing::debug!("Event processor channel closed.");
tracing::debug!("KV Event processor channel closed for worker_id: {}", worker_id);
break;
};

@@ -207,11 +209,12 @@ async fn start_event_processor<P: EventPublisher + Send + Sync + 'static>(

let router_event = RouterEvent::new((worker_id, dp_rank), event);
if let Err(e) = publisher.publish(KV_EVENT_SUBJECT, &router_event).await {
tracing::error!("Failed to publish event: {}", e);
tracing::error!("Failed to publish event for worker_id: {}, dp_rank: {}, error: {}", worker_id, dp_rank, e);
}
}
}
}
tracing::debug!("KV Event processor exiting for worker_id: {}", worker_id);
}

// Error handling configuration for ZMQ operations
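A minimal, self-contained sketch of the channel pattern this hunk relies on: each KV cache event is tagged with its optional dp_rank before being sent to the event processor, which then publishes a RouterEvent keyed by (worker_id, dp_rank). The struct shape and payload type below are simplified stand-ins, not the crate's actual types.

```rust
use tokio::sync::mpsc;

// Simplified stand-in for the crate's KvCacheEventWithDp.
#[derive(Debug)]
struct KvCacheEventWithDp<E> {
    kv_cache_event: E,
    dp_rank: Option<u32>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    // ZMQ listener side: tag the decoded event with the batch's dp_rank.
    tx.send(KvCacheEventWithDp { kv_cache_event: "stored-block", dp_rank: Some(1) })
        .unwrap();
    drop(tx);

    // Event processor side: drain the channel until it closes.
    while let Some(ev) = rx.recv().await {
        // The real processor would build a RouterEvent keyed by (worker_id, dp_rank) here.
        println!("event {:?} from dp_rank {:?}", ev.kv_cache_event, ev.dp_rank);
    }
}
```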
@@ -236,7 +239,7 @@ async fn start_zmq_listener(
kv_block_size: usize,
) {
tracing::debug!(
"KVEventPublisher connecting to ZMQ endpoint {} (topic '{}')",
"ZMQ listener starting - connecting to endpoint: {}, topic: '{}'",
zmq_endpoint,
zmq_topic
);
@@ -247,24 +250,27 @@

// Subscribe to the requested topic (empty string == all topics)
if let Err(e) = socket.subscribe(&zmq_topic).await {
tracing::error!("Failed to subscribe on ZMQ socket: {}", e);
tracing::error!("Failed to subscribe on ZMQ socket for {}: {}", zmq_endpoint, e);
return;
}

if let Err(e) = socket.connect(&zmq_endpoint).await {
tracing::error!("Failed to connect ZMQ SUB socket: {}", e);
tracing::error!("Failed to connect ZMQ SUB socket to {}: {}", zmq_endpoint, e);
return;
}

tracing::debug!("ZMQ listener successfully connected to {}", zmq_endpoint);

let mut consecutive_errors = 0u32;
let mut message_count = 0u64;

loop {
tokio::select! {
biased;

// Check for cancellation
_ = cancellation_token.cancelled() => {
tracing::info!("ZMQ listener received cancellation signal");
tracing::debug!("ZMQ listener received cancellation signal for {}", zmq_endpoint);
break;
}

@@ -278,6 +284,7 @@
tracing::error!(
error=%e,
consecutive_errors=%consecutive_errors,
endpoint=%zmq_endpoint,
"Too many consecutive ZMQ errors, terminating listener"
);
break;
@@ -290,6 +297,7 @@
error=%e,
consecutive_errors=%consecutive_errors,
backoff_ms=%backoff_ms,
endpoint=%zmq_endpoint,
"Error reading from ZMQ socket, applying exponential backoff"
);
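The log fields above (consecutive_errors, backoff_ms) point at a capped exponential backoff between failed ZMQ reads. A minimal sketch of that idea follows; the base delay, cap, and shift limit are illustrative assumptions, not the values used in this file.

```rust
// Illustrative capped exponential backoff; constants are assumptions.
fn backoff_ms(consecutive_errors: u32) -> u64 {
    const BASE_MS: u64 = 10;
    const MAX_MS: u64 = 5_000;
    let shift = consecutive_errors.saturating_sub(1).min(16);
    (BASE_MS << shift).min(MAX_MS)
}

fn main() {
    // 10ms, 20ms, 40ms, ... capped at 5s.
    assert_eq!(backoff_ms(1), 10);
    assert_eq!(backoff_ms(3), 40);
    assert_eq!(backoff_ms(20), 5_000);
}
```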

Expand All @@ -298,12 +306,13 @@ async fn start_zmq_listener(
};
// Reset error count on successful message
consecutive_errors = 0;
message_count += 1;

// We expect multipart frames: [topic, seq, payload]
let mut frames: Vec<Vec<u8>> = msg.into_vec().into_iter().map(|frame| frame.to_vec()).collect();

if frames.len() != 3 {
tracing::warn!(expected=3, actual=%frames.len(), "Received unexpected ZMQ frame count");
tracing::warn!(expected=3, actual=%frames.len(), endpoint=%zmq_endpoint, "Received unexpected ZMQ frame count");
continue;
}

@@ -312,7 +321,7 @@
let seq_bytes = frames.pop().unwrap();

if seq_bytes.len() != 8 {
tracing::warn!(expected=8, actual=%seq_bytes.len(), "Invalid sequence number byte length");
tracing::warn!(expected=8, actual=%seq_bytes.len(), endpoint=%zmq_endpoint, "Invalid sequence number byte length");
continue;
}

@@ -322,23 +331,25 @@
let batch_result = rmps::from_slice::<KvEventBatch>(&payload);
let Ok(batch) = batch_result else {
let e = batch_result.unwrap_err();
tracing::warn!(error=%e, "Failed to decode KVEventBatch msgpack");
tracing::warn!(error=%e, endpoint=%zmq_endpoint, "Failed to decode KVEventBatch msgpack");
continue;
};

tracing::trace!("ZMQ listener decoded batch with {} events, dp_rank: {:?} from {}", batch.events.len(), batch.data_parallel_rank, zmq_endpoint);

// For each of our events, convert them to [`KvCacheEvent`] and send to the event_processor.
let dp_rank = batch.dp_rank;
let dp_rank = batch.data_parallel_rank;
for raw_event in batch.events.into_iter() {
let kv_cache_event = convert_event(raw_event, seq, kv_block_size, &warning_count);
if tx.send(KvCacheEventWithDp { kv_cache_event, dp_rank }).is_err() {
tracing::warn!("Failed to send message to channel - receiver dropped");
tracing::warn!("Failed to send message to channel - receiver dropped for {}", zmq_endpoint);
return;
}
}
}
}
tracing::debug!("ZMQ listener exiting");
}
tracing::debug!("ZMQ listener exiting for {}", zmq_endpoint);
}

/// Convert a raw event coming from the ZMQ channel into the internal
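For context, the wire format the listener expects is a three-frame multipart message: topic, an 8-byte sequence number, and a msgpack payload. The sketch below parses that shape; the frame order and the little-endian sequence encoding are assumptions for illustration and may differ from the actual listener.

```rust
// Parse an assumed [topic, seq, payload] multipart frame layout.
fn parse_frames(mut frames: Vec<Vec<u8>>) -> Option<(Vec<u8>, u64, Vec<u8>)> {
    if frames.len() != 3 {
        return None; // unexpected frame count: skip this message
    }
    let payload = frames.pop()?;
    let seq_bytes = frames.pop()?;
    let topic = frames.pop()?;
    // 8-byte sequence number; endianness assumed little-endian here.
    let seq = u64::from_le_bytes(seq_bytes.try_into().ok()?);
    Some((topic, seq, payload))
}

fn main() {
    let frames = vec![b"kv".to_vec(), 42u64.to_le_bytes().to_vec(), vec![0x90]];
    let (topic, seq, payload) = parse_frames(frames).unwrap();
    assert_eq!((topic.as_slice(), seq, payload.len()), (b"kv".as_slice(), 42, 1));
}
```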
@@ -449,7 +460,8 @@ pub fn create_stored_blocks(
struct KvEventBatch {
ts: f64,
events: Vec<RawKvEvent>,
dp_rank: Option<DpRank>,
#[serde(alias = "dp_rank")]
data_parallel_rank: Option<DpRank>,
}

#[derive(Debug, Deserialize, Serialize)]
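The `#[serde(alias = "dp_rank")]` added above lets a payload spelled with either field name deserialize into `data_parallel_rank`, which keeps publishers that still emit `dp_rank` working. A minimal sketch of that behavior follows (JSON is used here only for readability; the real batches are msgpack, and the struct is trimmed down).

```rust
use serde::Deserialize;

#[derive(Debug, Deserialize, PartialEq)]
struct Batch {
    ts: f64,
    #[serde(alias = "dp_rank")]
    data_parallel_rank: Option<u32>,
}

fn main() {
    let old: Batch = serde_json::from_str(r#"{"ts": 0.0, "dp_rank": 1}"#).unwrap();
    let new: Batch = serde_json::from_str(r#"{"ts": 0.0, "data_parallel_rank": 1}"#).unwrap();
    // Both spellings land in the same field.
    assert_eq!(old, new);
}
```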
@@ -795,7 +807,7 @@ mod tests_startup_helpers {
let batch = KvEventBatch {
ts: 0.0,
events,
dp_rank: None,
data_parallel_rank: None,
};

let payload = Bytes::from(rmps::to_vec(&batch).unwrap());
5 changes: 2 additions & 3 deletions lib/llm/src/kv_router/scoring.rs
@@ -52,10 +52,9 @@ impl Endpoint {
return None;
}
let second_to_last = parts[parts.len() - 2];
let result = second_to_last.parse::<DpRank>().ok();
result
second_to_last.parse::<DpRank>().ok()
}
} // TODO: make dp_rank
}

#[derive(Debug, Default, Serialize, Deserialize, Clone)]
pub struct ProcessedEndpoints {
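The helper being tidied up in this hunk reads the data-parallel rank out of a dot-separated endpoint subject, taking the second-to-last segment and parsing it as a DpRank. A minimal sketch of that parsing rule follows; the example subject layout is an assumption for illustration.

```rust
// Parse the second-to-last dot-separated segment as a data-parallel rank.
fn dp_rank_from_subject(subject: &str) -> Option<u32> {
    let parts: Vec<&str> = subject.split('.').collect();
    if parts.len() < 2 {
        return None;
    }
    parts[parts.len() - 2].parse::<u32>().ok()
}

fn main() {
    // Hypothetical subject where the rank suffix sits before the final segment.
    assert_eq!(dp_rank_from_subject("dynamo.backend.generate.3.worker"), Some(3));
    assert_eq!(dp_rank_from_subject("dynamo.backend.generate"), None);
}
```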