Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
74bc13f
use BTreeSet, and allow for push_front (preemption)
PeaBrane May 27, 2025
f2343d5
preemption is push_front
PeaBrane May 27, 2025
6fe3154
use Hongkuan's quadratic formulas for decode and prefill
PeaBrane May 27, 2025
cccebad
cleaner scheduling + generation separation, and waterline bug fix
PeaBrane May 28, 2025
793d1d1
Merge branch 'main' into rupei/mocker-v0
PeaBrane May 28, 2025
394c2bf
restore printing out fwd pass metrics in test
PeaBrane May 30, 2025
f5ab2e1
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jun 11, 2025
dad183f
multi-dp mocker engine
PeaBrane Jun 11, 2025
009ec78
fixed prefill cost, and more conservative watermarking
PeaBrane Jun 12, 2025
ee11427
fwd pass metrics
PeaBrane Jun 12, 2025
8e8d0b4
can emit kv event, not tested
PeaBrane Jun 13, 2025
e96f810
move block resp test in kv manager
PeaBrane Jun 13, 2025
c09f007
basic test passes for both load metrics and kv events
PeaBrane Jun 14, 2025
4502e5e
better tracing
PeaBrane Jun 14, 2025
fe20aa3
async engine core
PeaBrane Jun 16, 2025
2fbf998
hook up with dynamo run
PeaBrane Jun 17, 2025
b548050
docs
PeaBrane Jun 17, 2025
c7c4be5
fmt
PeaBrane Jun 17, 2025
1845a8d
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 17, 2025
3ad7780
refactor
PeaBrane Jun 17, 2025
c78bef2
works with kv router
PeaBrane Jun 17, 2025
a206569
actually load extra mocker args in guide
PeaBrane Jun 17, 2025
d3730ff
free blocks if failed to send (receiver dropped)
PeaBrane Jun 23, 2025
68d822a
do not regenerate tokens after pre-emption
PeaBrane Jun 23, 2025
d69edcf
evictor cleanup
PeaBrane Jun 30, 2025
c08f9ea
only need runtime in dynamic arms
PeaBrane Jun 30, 2025
dee1413
no separate extra-mocker-args
PeaBrane Jun 30, 2025
082bcec
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 30, 2025
99fd3f2
update to match batched tokens
PeaBrane Jun 30, 2025
85c7ccf
max-num-seqs
PeaBrane Jun 30, 2025
ec1f360
enable_prefix_caching arg
PeaBrane Jun 30, 2025
94abc0d
only publish kv events if enable_prefix_caching set true
PeaBrane Jun 30, 2025
35da284
small note on chunked prefill being false for now
PeaBrane Jun 30, 2025
c7c072d
revert flags
PeaBrane Jul 1, 2025
de54247
revert dynamo-run changes
PeaBrane Jul 1, 2025
81c12aa
tiny reversion
PeaBrane Jul 1, 2025
b959df4
another reversion
PeaBrane Jul 1, 2025
f07e28d
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jul 1, 2025
b15070a
usize reversion
PeaBrane Jul 1, 2025
3a20b9d
clippy
PeaBrane Jul 1, 2025
c747606
more clippy
PeaBrane Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
use Hongkuan's quadratic formulas for decode and prefill
  • Loading branch information
PeaBrane committed May 27, 2025
commit 6fe3154115fcda4f49c0ab00a78b042540b1bb22
7 changes: 6 additions & 1 deletion lib/llm/src/mocker/kv_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,11 @@ impl KvManager {
self.active_blocks.len()
}

/// Get the percentage of active blocks relative to maximum capacity
pub fn get_active_perc(&self) -> f64 {
self.active_blocks.len() as f64 / self.max_capacity as f64
}

/// Get the number of inactive blocks
pub fn num_inactive_blocks(&self) -> usize {
self.inactive_blocks.len()
Expand Down Expand Up @@ -261,7 +266,7 @@ impl KvManager {

// Calculate prefill compute
let prefill_compute =
new_tokens as f64 * (new_tokens + overlap_blocks * self.block_size) as f64;
1.25e-6 * (new_tokens as f64).powi(2) + 7.41e-2 * (new_tokens as f64) + 2.62e1;

Some(PrefillCost {
new_tokens,
Expand Down
50 changes: 26 additions & 24 deletions lib/llm/src/mocker/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl Scheduler {
kv_capacity: usize,
watermark: f64,
block_size: usize,
chunk_size: Option<usize>,
speedup_ratio: Option<f64>,
output_tx: Option<mpsc::Sender<Uuid>>,
cancellation_token: Option<CancellationToken>,
) -> Self {
Expand All @@ -205,7 +205,16 @@ impl Scheduler {
let state = Arc::new(Mutex::new(SchedulerState::default()));

let kv_manager = Arc::new(Mutex::new(kv_manager));
let chunk_size = chunk_size.unwrap_or(256);

// Assert speedup_ratio is greater than 0 if provided
if let Some(ratio) = speedup_ratio {
assert!(
ratio > 0.0,
"speedup_ratio must be greater than 0, got: {}",
ratio
);
}
let speedup_ratio = speedup_ratio.unwrap_or(1.0);

// Create channel for request handling
let (request_tx, mut request_rx) = mpsc::channel::<DirectRequest>(1024);
Expand Down Expand Up @@ -242,7 +251,7 @@ impl Scheduler {
// Process DirectRequests, converting them to ActiveSequence and scheduling them until we can't
// schedule anymore.
while let Some((uuid, request)) = state_guard.next() {
let active_sequence = get_active_sequence(request, block_size, chunk_size);
let active_sequence = get_active_sequence(request, block_size);

// Calculate token budget using new_tokens from PrefillCost
let total_prefill_tokens = state_guard.num_batched_tokens();
Expand Down Expand Up @@ -271,10 +280,10 @@ impl Scheduler {
let mut state_guard = state_clone.lock().await;
let mut kv_manager_guard = kv_manager_clone.lock().await;

// Base time needed for decoding (assumed memory bound on KV cache)
let active_tokens = kv_manager_guard.num_active_blocks() * block_size;
// TODO: 2 is a dummy / magic scaling factor
let mut generation_time = Duration::from_micros((active_tokens / 2) as u64);
// Base time needed for decoding using active percentage and quadratic formula
let active_perc = kv_manager_guard.get_active_perc();
let decoding_time = -5.47 * active_perc.powi(2) + 43.88 * active_perc + 19.44;
let mut total_time = Duration::from_secs_f64(decoding_time / 1000.0);

// Process each running request
let uuids: Vec<Uuid> = state_guard.running.keys().cloned().collect();
Expand All @@ -285,7 +294,7 @@ impl Scheduler {
}

// Get prefill compute value first
let prefill_compute = state_guard.get_prefill_compute(&uuid);
let prefill_compute = state_guard.get_prefill_compute(&uuid).unwrap_or(0.);

// Get the active sequence for this UUID
let sequence = state_guard.requests.get_mut(&uuid)
Expand All @@ -295,14 +304,6 @@ impl Scheduler {
// Generate token and get signals
let signals = sequence.generate();

// Accumulate sleep duration based on prefill_compute if available
// prefill compute = (cached_tokens + new_tokens) * new_tokens
let sleep_ms = if let Some(compute) = prefill_compute {
// TODO: 1024 is a dummy / magic scaling factor
(compute / 1024.0) as u64
} else { 0 };
generation_time += Duration::from_micros(sleep_ms);

// Process all signals with the KvManager
// Handling of preemption on failure
if !process_signals(&mut kv_manager_guard, &signals) {
Expand All @@ -319,8 +320,10 @@ impl Scheduler {
continue;
}

// Accumulate sleep duration based on prefill_compute if available
total_time += Duration::from_secs_f64(prefill_compute / 1000.0);

// Send UUID notification for each generated token
// TODO: hook this up to an AsyncEngine
if let Some(tx) = &output_tx_clone {
let _ = tx.try_send(uuid);
}
Expand All @@ -337,9 +340,10 @@ impl Scheduler {
}
}

// Sleep once for the accumulated duration
if generation_time.as_millis() > 0 {
tokio::time::sleep(generation_time).await;
// Sleep once for the adjusted duration
let adjusted_time = Duration::from_secs_f64(total_time.as_secs_f64() / speedup_ratio);
if adjusted_time.as_millis() > 0 {
tokio::time::sleep(adjusted_time).await;
}
}
}
Expand Down Expand Up @@ -405,7 +409,7 @@ impl Scheduler {
}

/// Convert a Request to an ActiveSequence
fn get_active_sequence(request: Request, block_size: usize, chunk_size: usize) -> ActiveSequence {
fn get_active_sequence(request: Request, block_size: usize) -> ActiveSequence {
if let Request::Active(active_seq) = request {
return active_seq;
}
Expand All @@ -418,7 +422,6 @@ fn get_active_sequence(request: Request, block_size: usize, chunk_size: usize) -
direct_request.tokens,
direct_request.max_output_tokens,
Some(block_size),
Some(chunk_size),
)
}

Expand Down Expand Up @@ -475,7 +478,6 @@ mod tests {
let kv_capacity: usize = 500;
let watermark: f64 = 0.01; // 1% watermark
let block_size: usize = 64;
let chunk_size: usize = 256;
let num_requests: usize = 100;
let input_len: usize = 1000;
let max_output_tokens: usize = 100;
Expand All @@ -488,7 +490,7 @@ mod tests {
kv_capacity,
watermark,
block_size,
Some(chunk_size),
Some(10.0), // speedup_ratio
Some(output_tx),
None,
);
Expand Down
24 changes: 5 additions & 19 deletions lib/llm/src/mocker/sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ pub struct ActiveSequence {
#[getter(copy)]
block_size: usize,

#[getter(copy)]
chunk_size: usize, // TODO: not actually used

#[getter(copy)]
max_output_tokens: usize,

Expand All @@ -69,15 +66,9 @@ pub struct ActiveSequence {

impl ActiveSequence {
/// Create a new ActiveSequence instance with the provided tokens
pub fn new(
tokens: Vec<u32>,
max_output_tokens: usize,
block_size: Option<usize>,
chunk_size: Option<usize>,
) -> Self {
pub fn new(tokens: Vec<u32>, max_output_tokens: usize, block_size: Option<usize>) -> Self {
let block_size = block_size.unwrap_or(64);
assert!(block_size > 1, "block_size must be greater than 1");
let chunk_size = chunk_size.unwrap_or(256);
let num_input_tokens = tokens.len();

let tokens = Tokens::from(tokens).into_sequence(block_size, None);
Expand All @@ -88,7 +79,6 @@ impl ActiveSequence {
unique_blocks,
tokens,
block_size,
chunk_size,
max_output_tokens,
generated_tokens: 0,
num_input_tokens,
Expand All @@ -113,9 +103,8 @@ impl ActiveSequence {
tokens: Vec<u32>,
max_output_tokens: usize,
block_size: Option<usize>,
chunk_size: Option<usize>,
) -> (Self, Option<MoveBlock>) {
let mut sequence = Self::new(tokens, max_output_tokens, block_size, chunk_size);
let mut sequence = Self::new(tokens, max_output_tokens, block_size);
let signal = sequence.creation_signal.take();
(sequence, signal)
}
Expand Down Expand Up @@ -237,8 +226,7 @@ mod tests {
fn test_active_sequence_push() {
// Create a sequence with block size 16 initialized with tokens [0..15]
let initial_tokens: Vec<u32> = (0..15).collect();
let (mut seq1, signal1) =
ActiveSequence::new_with_signal(initial_tokens, 100, Some(16), Some(256));
let (mut seq1, signal1) = ActiveSequence::new_with_signal(initial_tokens, 100, Some(16));
assert_eq!(seq1.num_input_tokens(), 15);
assert_eq!(seq1.len(), 15);

Expand Down Expand Up @@ -289,8 +277,7 @@ mod tests {

// Create another sequence with block size 16 initialized with tokens [0..17]
let extended_tokens: Vec<u32> = (0..16).collect();
let (mut seq2, _) =
ActiveSequence::new_with_signal(extended_tokens, 100, Some(16), Some(256));
let (mut seq2, _) = ActiveSequence::new_with_signal(extended_tokens, 100, Some(16));
seq2.push(16);
seq2.pop();
seq2.push(16);
Expand Down Expand Up @@ -363,8 +350,7 @@ mod tests {
fn test_active_sequence_generate_signals() {
// Create a sequence with block size 16, max_output_tokens 4, initialized with tokens [0..14)
let initial_tokens: Vec<u32> = (0..14).collect();
let (mut seq, signal) =
ActiveSequence::new_with_signal(initial_tokens, 5, Some(16), Some(256));
let (mut seq, signal) = ActiveSequence::new_with_signal(initial_tokens, 5, Some(16));

// Initial signal - should have received a Use signal for the partial block
assert!(signal.is_some());
Expand Down