Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
74bc13f
use BTreeSet, and allow for push_front (preemption)
PeaBrane May 27, 2025
f2343d5
preemption is push_front
PeaBrane May 27, 2025
6fe3154
use Hongkuan's quadratic formulas for decode and prefill
PeaBrane May 27, 2025
cccebad
cleaner scheduling + generation separation, and waterline bug fix
PeaBrane May 28, 2025
793d1d1
Merge branch 'main' into rupei/mocker-v0
PeaBrane May 28, 2025
394c2bf
restore printing out fwd pass metrics in test
PeaBrane May 30, 2025
f5ab2e1
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jun 11, 2025
dad183f
multi-dp mocker engine
PeaBrane Jun 11, 2025
009ec78
fixed prefill cost, and more conservative watermarking
PeaBrane Jun 12, 2025
ee11427
fwd pass metrics
PeaBrane Jun 12, 2025
8e8d0b4
can emit kv event, not tested
PeaBrane Jun 13, 2025
e96f810
move block resp test in kv manager
PeaBrane Jun 13, 2025
c09f007
basic test passes for both load metrics and kv events
PeaBrane Jun 14, 2025
4502e5e
better tracing
PeaBrane Jun 14, 2025
fe20aa3
async engine core
PeaBrane Jun 16, 2025
2fbf998
hook up with dynamo run
PeaBrane Jun 17, 2025
b548050
docs
PeaBrane Jun 17, 2025
c7c4be5
fmt
PeaBrane Jun 17, 2025
1845a8d
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 17, 2025
3ad7780
refactor
PeaBrane Jun 17, 2025
c78bef2
works with kv router
PeaBrane Jun 17, 2025
a206569
actually load extra mocker args in guide
PeaBrane Jun 17, 2025
d3730ff
free blocks if failed to send (receiver dropped)
PeaBrane Jun 23, 2025
68d822a
do not regenereate tokens after pre-emption
PeaBrane Jun 23, 2025
d69edcf
evictor cleanup
PeaBrane Jun 30, 2025
c08f9ea
only need runtime in dynamic arms
PeaBrane Jun 30, 2025
dee1413
no separate extra-mocker-args
PeaBrane Jun 30, 2025
082bcec
Merge branch 'main' into rupei/mocker-v0
PeaBrane Jun 30, 2025
99fd3f2
update to match batched tokens
PeaBrane Jun 30, 2025
85c7ccf
max-num-seqs
PeaBrane Jun 30, 2025
ec1f360
enable_prefix_caching arg
PeaBrane Jun 30, 2025
94abc0d
only publish kv events if enable_prefix_caching set true
PeaBrane Jun 30, 2025
35da284
small note on chunked prefill being false for now
PeaBrane Jun 30, 2025
c7c072d
revert flags
PeaBrane Jul 1, 2025
de54247
revert dynamo-run changes
PeaBrane Jul 1, 2025
81c12aa
tiny reversion
PeaBrane Jul 1, 2025
b959df4
another reversion
PeaBrane Jul 1, 2025
f07e28d
Merge remote-tracking branch 'origin/main' into rupei/mocker-v0
PeaBrane Jul 1, 2025
b15070a
usize reversion
PeaBrane Jul 1, 2025
3a20b9d
clippy
PeaBrane Jul 1, 2025
c747606
more clippy
PeaBrane Jul 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 103 additions & 101 deletions lib/llm/src/mocker/evictor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,100 +13,133 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Eq;
use std::collections::{HashMap, VecDeque};
use std::cmp::{Eq, Ordering};
use std::collections::{BTreeSet, HashMap};
use std::hash::Hash;
use std::time::Instant;

/// A wrapper for (T, counter) that implements Ord based only on counter
#[derive(Debug, Clone, Eq, PartialEq)]
struct PriorityItem<T> {
item: T,
counter: i64,
}

impl<T: Eq> Ord for PriorityItem<T> {
fn cmp(&self, other: &Self) -> Ordering {
self.counter.cmp(&other.counter)
}
}

impl<T: Eq> PartialOrd for PriorityItem<T> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

/// An LRU evictor that maintains objects and evicts them based on their
/// last accessed time. Implements a "lazy" eviction mechanism where:
/// 1. The priority queue does not immediately reflect updates or removes
/// 2. Objects are pushed to the queue in order of increasing priority (older objects first)
/// 3. The user must ensure objects are added in correct priority (temporal order)
/// 4. Remove and update operations are lazy - entries remain in the queue until
/// they are either evicted or cleaned up during maintenance
/// priority counter. Lower counter values are evicted first.
#[derive(Debug)]
pub struct LRUEvictor<T: Clone + Eq + Hash> {
free_table: HashMap<T, f64>,
priority_queue: VecDeque<(T, f64)>,
cleanup_threshold: usize,
start_time: Instant,
free_table: HashMap<T, i64>,
priority_queue: BTreeSet<PriorityItem<T>>,
positive_counter: i64,
negative_counter: i64,
}

impl<T: Clone + Eq + Hash> Default for LRUEvictor<T> {
fn default() -> Self {
Self {
free_table: HashMap::new(),
priority_queue: VecDeque::new(),
cleanup_threshold: 50,
start_time: Instant::now(),
priority_queue: BTreeSet::new(),
positive_counter: 0,
negative_counter: 0,
}
}
}

impl<T: Clone + Eq + Hash> LRUEvictor<T> {
/// Create a new LRUEvictor with the default cleanup threshold
pub fn new(cleanup_threshold: usize) -> Self {
Self {
cleanup_threshold,
..Default::default()
}
}

/// Get the current timestamp as seconds since initialization
pub fn current_timestamp(&self) -> f64 {
self.start_time.elapsed().as_secs_f64()
/// Create a new LRUEvictor
pub fn new(_cleanup_threshold: usize) -> Self {
// Keep the parameter for API compatibility, but ignore it
Self::default()
}

/// Get an iterator over the keys in the evictor
pub fn keys(&self) -> std::collections::hash_map::Keys<'_, T, f64> {
pub fn keys(&self) -> std::collections::hash_map::Keys<'_, T, i64> {
self.free_table.keys()
}

/// Insert or update an object in the evictor with current timestamp
/// Private helper method to update the data structures with object and counter
fn _update(&mut self, object: T, counter: i64) {
self.free_table.insert(object.clone(), counter);
self.priority_queue.insert(PriorityItem {
item: object,
counter,
});
}

/// Insert or update an object in the evictor with positive counter
pub fn insert(&mut self, object: T) {
let timestamp = self.current_timestamp();
self._insert(object, timestamp);
// Remove old entry if it exists
if let Some(&old_counter) = self.free_table.get(&object) {
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter: old_counter,
});
}

// Increment positive counter and insert
self.positive_counter += 1;
let counter = self.positive_counter;

self._update(object, counter);
}

/// Push an object to the front with negative counter (highest priority for eviction)
pub fn push_front(&mut self, object: T) {
// Remove old entry if it exists
if let Some(&old_counter) = self.free_table.get(&object) {
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter: old_counter,
});
}

// Decrement negative counter and insert
self.negative_counter -= 1;
let counter = self.negative_counter;

self._update(object, counter);
}

/// Check if the evictor contains the given object
pub fn contains(&self, object: &T) -> bool {
self.free_table.contains_key(object)
}

/// Evict an object based on LRU policy
/// Evict an object based on LRU policy (lowest counter value)
/// Returns the evicted object or None if no objects are available
pub fn evict(&mut self) -> Option<T> {
if self.free_table.is_empty() {
return None;
}

while let Some((object, last_accessed)) = self.priority_queue.pop_front() {
let Some(&current_last_accessed) = self.free_table.get(&object) else {
continue; // entry is already removed
};

if current_last_accessed == last_accessed {
self.free_table.remove(&object);
return Some(object);
} // otherwise entry is stale
if let Some(item) = self.priority_queue.pop_first() {
self.free_table.remove(&item.item);
Some(item.item)
} else {
None
}

None
}

/// Insert or update an object in the evictor
fn _insert(&mut self, object: T, last_accessed: f64) {
self.free_table.insert(object.clone(), last_accessed);
self.priority_queue.push_back((object, last_accessed));
self.cleanup_if_necessary();
}

/// Remove an object from the evictor
/// We don't remove from the priority queue immediately, as that would be inefficient
/// Outdated entries will be filtered out during eviction or cleanup
pub fn remove(&mut self, object: &T) -> bool {
self.free_table.remove(object).is_some()
if let Some(&counter) = self.free_table.get(object) {
self.free_table.remove(object);
self.priority_queue.remove(&PriorityItem {
item: object.clone(),
counter,
});
true
} else {
false
}
}

/// Get the number of objects in the evictor
Expand All @@ -118,74 +151,43 @@ impl<T: Clone + Eq + Hash> LRUEvictor<T> {
pub fn is_empty(&self) -> bool {
self.free_table.is_empty()
}

/// Check if cleanup is necessary and perform it if needed
fn cleanup_if_necessary(&mut self) {
if self.priority_queue.len() > self.cleanup_threshold * self.free_table.len() {
self.cleanup();
}
}

/// Clean up the priority queue by removing outdated entries
fn cleanup(&mut self) {
let mut new_priority_queue = VecDeque::new();
for (object, timestamp) in self.priority_queue.drain(..) {
let Some(&current_timestamp) = self.free_table.get(&object) else {
continue;
};

if current_timestamp == timestamp {
new_priority_queue.push_back((object, timestamp));
}
}
self.priority_queue = new_priority_queue;
}
}

#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;

#[rstest]
#[case(1)]
#[case(2)]
#[case(3)]
fn test_lru_evictor_eviction_order(#[case] threshold: usize) {
// Create a new LRUEvictor with the given cleanup threshold
let mut evictor = LRUEvictor::<i32>::new(threshold);
#[test]
fn test_lru_evictor_eviction_order() {
// Create a new LRUEvictor
let mut evictor = LRUEvictor::<i32>::new(1); // threshold value doesn't matter anymore

// Add items in the specified order with small delays between each
// Add items in the specified order
evictor.insert(4);
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(3);
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(2);
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(1);
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(5);
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(1); // Updates timestamp for 1
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(4); // Updates timestamp for 4
std::thread::sleep(std::time::Duration::from_millis(1));
evictor.insert(2); // Updates timestamp for 2
evictor.insert(1); // Updates counter for 1
evictor.insert(4); // Updates counter for 4
evictor.insert(2); // Updates counter for 2
evictor.push_front(4);

// Verify the eviction order
println!("Testing with threshold {}", threshold);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 4);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 3);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 5);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 1);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 4);
let evicted = evictor.evict().unwrap();
assert_eq!(evicted, 2);
let evicted = evictor.evict();
assert_eq!(evicted, None);
assert_eq!(evictor.len(), 0);
}

// ... existing test_push_front test ...
}
59 changes: 14 additions & 45 deletions lib/llm/src/mocker/kv_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl KvManager {
pub fn probe_new_blocks(&self, blocks: &[UniqueBlock]) -> usize {
blocks
.iter()
.filter(|&block| !self.all_blocks.contains(block))
.filter(|&block| !self.active_blocks.contains_key(block))
.count()
}

Expand All @@ -200,6 +200,11 @@ impl KvManager {
self.active_blocks.len()
}

/// Get the percentage of active blocks relative to maximum capacity
pub fn get_active_perc(&self) -> f64 {
self.active_blocks.len() as f64 / self.max_capacity as f64
}

/// Get the number of inactive blocks
pub fn num_inactive_blocks(&self) -> usize {
self.inactive_blocks.len()
Expand All @@ -216,57 +221,21 @@ impl KvManager {
}

/// Check if a sequence can be scheduled and calculate cost if possible
pub fn try_schedule(
&self,
sequence: &ActiveSequence,
watermark: f64,
tokens_budget: usize,
) -> Option<PrefillCost> {
// Return None immediately if tokens_budget is 0
if tokens_budget == 0 {
return None;
}

// Get unique blocks from the sequence
let unique_blocks = sequence.unique_blocks();

// Get the count of new blocks
let new_blocks = self.probe_new_blocks(unique_blocks);

// Calculate current usage and available capacity
let active_count = self.active_blocks.len();

// Check if we can schedule based on the watermark
if (active_count + new_blocks) as f64 > (1.0 - watermark) * self.max_capacity as f64 {
return None;
}

// Calculate overlap blocks
let overlap_blocks = unique_blocks.len() - new_blocks;

// Calculate new tokens
pub fn get_prefill_cost(&self, sequence: &ActiveSequence) -> PrefillCost {
let seq_blocks = sequence.unique_blocks();
let new_blocks = self.probe_new_blocks(seq_blocks);
let overlap_blocks = seq_blocks.len() - new_blocks;
let new_tokens = sequence.num_input_tokens() - overlap_blocks * self.block_size;

// // Print the full equation with actual values substituted
// println!("{} = {} - ({} * {}) (new_tokens = num_input_tokens - overlap_blocks * block_size)",
// new_tokens,
// sequence.num_input_tokens(),
// overlap_blocks,
// self.block_size);

// Return None if new_tokens exceeds tokens_budget
if new_tokens > tokens_budget {
return None;
}

// Calculate prefill compute
let prefill_compute =
new_tokens as f64 * (new_tokens + overlap_blocks * self.block_size) as f64;
1.25e-6 * (new_tokens as f64).powi(2) + 7.41e-2 * (new_tokens as f64) + 2.62e1;

Some(PrefillCost {
PrefillCost {
new_blocks,
new_tokens,
prefill_compute,
})
}
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/llm/src/mocker/protocols.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub struct DirectRequest {
/// Represents the cost of prefilling content in the cache
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PrefillCost {
pub new_blocks: usize,
pub new_tokens: usize,
pub prefill_compute: f64,
}
Expand Down
Loading