Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
74bc13f
use BTreeSet, and allow for push_front (preemption)
PeaBrane May 27, 2025
f2343d5
preemption is push_front
PeaBrane May 27, 2025
6fe3154
use Hongkuan's quadratic formulas for decode and prefill
PeaBrane May 27, 2025
cccebad
cleaner scheduling + generation separation, and waterline bug fix
PeaBrane May 28, 2025
793d1d1
Merge branch 'main' into rupei/mocker-v0
PeaBrane May 28, 2025
394c2bf
restore printing out fwd pass metrics in test
PeaBrane May 30, 2025
f5ab2e1
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jun 11, 2025
dad183f
multi-dp mocker engine
PeaBrane Jun 11, 2025
009ec78
fixed prefill cost, and more conservative watermarking
PeaBrane Jun 12, 2025
ee11427
fwd pass metrics
PeaBrane Jun 12, 2025
8e8d0b4
can emit kv event, not tested
PeaBrane Jun 13, 2025
e96f810
move block resp test in kv manager
PeaBrane Jun 13, 2025
c09f007
basic test passes for both load metrics and kv events
PeaBrane Jun 14, 2025
4502e5e
better tracing
PeaBrane Jun 14, 2025
fe20aa3
async engine core
PeaBrane Jun 16, 2025
2fbf998
hook up with dynamo run
PeaBrane Jun 17, 2025
b548050
docs
PeaBrane Jun 17, 2025
c7c4be5
fmt
PeaBrane Jun 17, 2025
1845a8d
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 17, 2025
3ad7780
refactor
PeaBrane Jun 17, 2025
c78bef2
works with kv router
PeaBrane Jun 17, 2025
a206569
actually load extra mocker args in guide
PeaBrane Jun 17, 2025
d3730ff
free blocks if failed to send (receiver dropped)
PeaBrane Jun 23, 2025
68d822a
do not regenereate tokens after pre-emption
PeaBrane Jun 23, 2025
d69edcf
evictor cleanup
PeaBrane Jun 30, 2025
c08f9ea
only need runtime in dynamic arms
PeaBrane Jun 30, 2025
dee1413
no separate extra-mocker-args
PeaBrane Jun 30, 2025
082bcec
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 30, 2025
99fd3f2
update to match batched tokens
PeaBrane Jun 30, 2025
85c7ccf
max-num-seqs
PeaBrane Jun 30, 2025
ec1f360
enable_prefix_caching arg
PeaBrane Jun 30, 2025
94abc0d
only publish kv events if enable_prefix_caching set true
PeaBrane Jun 30, 2025
35da284
small note on chunked prefill being false for now
PeaBrane Jun 30, 2025
c7c072d
revert flags
PeaBrane Jul 1, 2025
de54247
revert dynamo-run changes
PeaBrane Jul 1, 2025
81c12aa
tiny reversion
PeaBrane Jul 1, 2025
b959df4
another reversion
PeaBrane Jul 1, 2025
f07e28d
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jul 1, 2025
b15070a
usize reversion
PeaBrane Jul 1, 2025
3a20b9d
clippy
PeaBrane Jul 1, 2025
c747606
more clippy
PeaBrane Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
multi-dp mocker engine
  • Loading branch information
PeaBrane committed Jun 11, 2025
commit dad183f81503c973b31c96dee3c75435b2e82747
1 change: 1 addition & 0 deletions lib/llm/src/mocker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

pub mod engine;
pub mod evictor;
pub mod kv_manager;
pub mod protocols;
Expand Down
251 changes: 251 additions & 0 deletions lib/llm/src/mocker/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! MockSchedulerEngine - AsyncEngine wrapper around the Scheduler
//!
//! This module provides an AsyncEngine implementation that wraps the Scheduler
//! to provide streaming token generation with realistic timing simulation.

use crate::mocker::protocols::{DirectRequest, MockEngineArgs, OutputSignal};
use crate::mocker::scheduler::Scheduler;

use dynamo_runtime::{
engine::AsyncEngineContextProvider,
pipeline::{async_trait, AsyncEngine, Error, ManyOut, ResponseStream, SingleIn},
protocols::annotated::Annotated,
};

use rand::Rng;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::wrappers::ReceiverStream;
use uuid::Uuid;

/// Generate a random printable character
fn generate_random_char() -> String {
let mut rng = rand::rng();
let selection = match rng.random_range(0..4) {
0 => ('a'..='z').nth(rng.random_range(0..26)).unwrap(), // lowercase
1 => ('A'..='Z').nth(rng.random_range(0..26)).unwrap(), // uppercase
2 => ('0'..='9').nth(rng.random_range(0..10)).unwrap(), // digits
_ => [' ', '.', ',', '!', '?'][rng.random_range(0..5)], // punctuation/space
};
selection.to_string()
}

/// AsyncEngine wrapper around the Scheduler that generates random character tokens
pub struct MockVllmEngine {
schedulers: Vec<Scheduler>,
active_requests: Arc<Mutex<HashMap<Uuid, mpsc::Sender<OutputSignal>>>>,
dp_size: u32,
}

impl MockVllmEngine {
/// Create a new MockVllmEngine with the given parameters
pub fn new(args: MockEngineArgs) -> Self {
let mut schedulers = Vec::new();
let active_requests = Arc::new(Mutex::new(
HashMap::<Uuid, mpsc::Sender<OutputSignal>>::new(),
));

// Create multiple schedulers and their background tasks
for _ in 0..args.dp_size {
// Create a shared output channel that this scheduler will use
let (output_tx, output_rx) = mpsc::channel::<OutputSignal>(1024);

let scheduler = Scheduler::new(
args.clone(),
Some(output_tx),
None, // No global cancellation token
);

schedulers.push(scheduler);

// Spawn a background task for this scheduler to distribute token notifications to active requests
let output_rx = Arc::new(Mutex::new(output_rx));
let active_requests_clone = active_requests.clone();

tokio::spawn(async move {
loop {
let signal = {
let mut rx = output_rx.lock().await;
match rx.recv().await {
Some(signal) => signal,
None => break, // Channel closed
}
};

// Notify the specific request that a token was generated
let active = active_requests_clone.lock().await;
if let Some(request_tx) = active.get(&signal.uuid) {
let _ = request_tx.send(signal).await;
}
}
});
}

Self {
schedulers,
active_requests,
dp_size: args.dp_size,
}
}
}

#[async_trait]
impl AsyncEngine<SingleIn<DirectRequest>, ManyOut<Annotated<String>>, Error> for MockVllmEngine {
async fn generate(
&self,
input: SingleIn<DirectRequest>,
) -> Result<ManyOut<Annotated<String>>, Error> {
let (mut request, ctx) = input.into_parts();

let dp_rank = request.dp_rank.unwrap_or(0);

// Validate dp_rank
if dp_rank >= self.dp_size {
return Err(Error::msg(format!(
"dp_rank {} is out of bounds for dp_size {}",
dp_rank, self.dp_size
)));
}

let request_uuid = ctx.id().parse().unwrap_or(Uuid::new_v4());
request.uuid = Some(request_uuid);

let (request_tx, mut request_rx) = mpsc::channel::<OutputSignal>(64);
{
let mut active = self.active_requests.lock().await;
active.insert(request_uuid, request_tx);
}

// Send the request to the appropriate scheduler based on dp_rank
self.schedulers[dp_rank as usize]
.receive(request.clone())
.await;

// Create a simple channel for the stream
let (stream_tx, stream_rx) = mpsc::channel::<Annotated<String>>(64);

let active_requests = self.active_requests.clone();
let async_context = ctx.context();

// Spawn a task to handle the complex async logic
tokio::spawn(async move {
loop {
tokio::select! {
Some(signal) = request_rx.recv() => {
if signal.completed {
break;
}
let output = generate_random_char();
if stream_tx.send(Annotated::from_data(output)).await.is_err() {
break;
}
}

_ = async_context.stopped() => {
break;
}
}
}

// Clean up: remove this request from active requests
let mut active = active_requests.lock().await;
active.remove(&request_uuid);
});

// Create a simple ReceiverStream which is naturally Send + Sync
let stream = ReceiverStream::new(stream_rx);
Ok(ResponseStream::new(Box::pin(stream), ctx.context()))
}
}

#[cfg(test)]
mod tests {
use super::*;
use dynamo_runtime::pipeline::Context;
use futures::StreamExt;

#[tokio::test]
async fn test_multiple_workers_with_token_limit() {
const DP_SIZE: u32 = 2;
const TOKENS_PER_REQUEST: usize = 20;

// Create the MockVllmEngine using builder pattern
let args = MockEngineArgs::builder()
.speedup_ratio(10.0)
.dp_size(DP_SIZE)
.build()
.unwrap();

let engine = MockVllmEngine::new(args);

// Create 4 DirectRequests: 2 for worker 0, 2 for worker 1
let requests = vec![
DirectRequest {
tokens: vec![1, 2, 3, 4],
max_output_tokens: TOKENS_PER_REQUEST,
uuid: None,
dp_rank: Some(0),
},
DirectRequest {
tokens: vec![5, 6, 7, 8],
max_output_tokens: TOKENS_PER_REQUEST,
uuid: None,
dp_rank: Some(0),
},
DirectRequest {
tokens: vec![9, 10, 11, 12],
max_output_tokens: TOKENS_PER_REQUEST,
uuid: None,
dp_rank: Some(1),
},
DirectRequest {
tokens: vec![13, 14, 15, 16],
max_output_tokens: TOKENS_PER_REQUEST,
uuid: None,
dp_rank: Some(1),
},
];

// Generate streams and collect all tokens from each
for request in requests {
let ctx = Context::new(request);
let stream = engine.generate(ctx).await.unwrap();

let tokens: Vec<_> = stream.collect().await;

// Verify each stream produces exactly the expected number of tokens
assert_eq!(tokens.len(), TOKENS_PER_REQUEST);

// Verify all tokens contain valid data
for token in tokens {
assert!(token.data.is_some());
}
}

// Give a small delay to ensure cleanup tasks complete
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

// Verify that active_requests is empty (all requests cleaned up)
let active_requests = engine.active_requests.lock().await;
assert!(
active_requests.is_empty(),
"Active requests should be empty after streams complete"
);
}
}
38 changes: 38 additions & 0 deletions lib/llm/src/mocker/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use derive_builder::Builder;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

Expand Down Expand Up @@ -52,6 +53,7 @@ pub struct DirectRequest {
pub tokens: Vec<Token>,
pub max_output_tokens: usize,
pub uuid: Option<Uuid>,
pub dp_rank: Option<u32>,
}

/// Represents the cost of prefilling content in the cache
Expand All @@ -62,6 +64,42 @@ pub struct PrefillCost {
pub prefill_compute: f64,
}

/// Signal for output token generation with completion status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OutputSignal {
pub uuid: Uuid,
pub completed: bool,
}

/// Configuration arguments for MockVllmEngine
#[derive(Debug, Clone, Serialize, Deserialize, Builder)]
#[builder(pattern = "owned", build_fn(public))]
pub struct MockEngineArgs {
#[builder(default = "16384")]
pub num_gpu_blocks: usize,

#[builder(default = "64")]
pub block_size: usize,

#[builder(default)]
pub max_num_batched_tokens: Option<usize>,

#[builder(default = "0.01")]
pub watermark: f64,

#[builder(default = "1.0")]
pub speedup_ratio: f64,

#[builder(default = "1")]
pub dp_size: u32,
}

impl MockEngineArgs {
pub fn builder() -> MockEngineArgsBuilder {
MockEngineArgsBuilder::default()
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading
Loading