Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
enable unboundedsend
  • Loading branch information
ziqifan617 committed Aug 6, 2025
commit b5fab2396846b54da22e5b686a5dcbb08fb8e665
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub struct CreateSlotOutput {

#[derive(Debug)]
pub struct KvConnectorLeaderRecorder {
recorder_tx: mpsc::Sender<Action>,
_recorder: Recorder<Action>, // Keep recorder alive
unbounded_tx: mpsc::UnboundedSender<Action>,
connector_leader: Box<dyn Leader>,
}

Expand Down Expand Up @@ -110,8 +111,7 @@ impl KvConnectorLeaderRecorder {
// Create recorder synchronously using pyo3 async runtime
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let recorder = runtime.block_on(async {
// TODO: using max 10 events for testing - quick shutdown of the recorder to flush to file
Recorder::new(token, &output_path, None, Some(10), None).await
Recorder::new(token, &output_path, None, None, None).await
}).unwrap();

let connector_leader = KvConnectorLeader {
Expand All @@ -122,11 +122,28 @@ impl KvConnectorLeaderRecorder {
iteration_counter: 0,
};

let (unbounded_tx, unbounded_rx) = mpsc::unbounded_channel();
let recorder_tx = recorder.event_sender();

let _ = runtime.spawn(Self::forward_unbounded_to_sender(unbounded_rx, recorder_tx));

Self {
recorder_tx: recorder.event_sender(),
_recorder: recorder,
unbounded_tx,
connector_leader: Box::new(connector_leader),
}
}

async fn forward_unbounded_to_sender<T: Send + 'static>(
mut unbounded_rx: mpsc::UnboundedReceiver<T>,
bounded_tx: mpsc::Sender<T>,
) {
while let Some(msg) = unbounded_rx.recv().await {
if bounded_tx.send(msg).await.is_err() {
tracing::error!("Failed to send message to bounded channel");
}
}
}
}

impl Leader for KvConnectorLeaderRecorder {
Expand All @@ -149,15 +166,10 @@ impl Leader for KvConnectorLeaderRecorder {
};
let output = self.connector_leader.get_num_new_matched_tokens(request_id, request_num_tokens, num_computed_tokens);
let output_copy = output.as_ref().unwrap().clone();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
.send(Action::GetNumNewMatchedTokens(input_copy, GetNumNewMatchedTokensOutput {
let _ = self.unbounded_tx.send(Action::GetNumNewMatchedTokens(input_copy, GetNumNewMatchedTokensOutput {
num_new_matched_tokens: output_copy.0,
has_matched: output_copy.1,
}))
.await
});
}));
output
}

Expand All @@ -178,12 +190,7 @@ impl Leader for KvConnectorLeaderRecorder {
num_external_tokens: num_external_tokens.clone(),
};
let _ = self.connector_leader.update_state_after_alloc(request_id, block_ids, num_external_tokens).unwrap();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
.send(Action::UpdateStateAfterAlloc(input_copy, UpdateStateAfterAllocOutput {}))
.await
});
let _ = self.unbounded_tx.send(Action::UpdateStateAfterAlloc(input_copy, UpdateStateAfterAllocOutput {}));
Ok(())
}

Expand All @@ -196,14 +203,10 @@ impl Leader for KvConnectorLeaderRecorder {
};
let output = self.connector_leader.build_connector_metadata(scheduler_output);
let output_copy = output.as_ref().unwrap().clone();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
let _ = self.unbounded_tx
.send(Action::BuildConnectorMeta(input_copy, BuildConnectorMetaOutput {
metadata: output_copy,
}))
.await
});
}));
output
}

Expand All @@ -214,14 +217,10 @@ impl Leader for KvConnectorLeaderRecorder {
};
let output = self.connector_leader.request_finished(request_id, block_ids);
let output_copy = output.as_ref().unwrap().clone();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
let _ = self.unbounded_tx
.send(Action::RequestFinished(input_copy, RequestFinishedOutput {
is_finished: output_copy,
}))
.await
});
}));
output
}

Expand All @@ -231,14 +230,10 @@ impl Leader for KvConnectorLeaderRecorder {
};
let output = self.connector_leader.has_slot(request_id);
let output_copy = output.clone();
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
let _ = self.unbounded_tx
.send(Action::HasSlot(input_copy, HasSlotOutput {
result: output_copy,
}))
.await
});
}));
output
}

Expand All @@ -250,12 +245,7 @@ impl Leader for KvConnectorLeaderRecorder {
tokens: tokens.clone(),
};
let _ = self.connector_leader.create_slot(request, tokens);
let runtime = pyo3_async_runtimes::tokio::get_runtime();
let _ = runtime.block_on(async move {
self.recorder_tx
.send(Action::CreateSlot(input_copy, CreateSlotOutput {}))
.await
});
let _ = self.unbounded_tx.send(Action::CreateSlot(input_copy, CreateSlotOutput {}));
Ok(())
}
}
8 changes: 8 additions & 0 deletions lib/llm/src/recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ where
}

/// A generic recorder for events that streams directly to a JSONL file
#[derive(Debug)]
pub struct Recorder<T> {
/// A sender for events that can be cloned and shared with producers
event_tx: mpsc::Sender<T>,
Expand Down Expand Up @@ -386,6 +387,13 @@ where
}
}

impl<T> Drop for Recorder<T> {
fn drop(&mut self) {
tracing::info!("Dropping Recorder");
self.cancel.cancel();
}
}

/// Helper function to create a rotated file path with an index suffix
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
let path_str = base_path.to_string_lossy();
Expand Down
Loading