Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
revert: output capture change
  • Loading branch information
dolan-openai committed Sep 5, 2025
commit 24887c0402505d9692b560cc15cd1324d6bb94f4
10 changes: 1 addition & 9 deletions codex-rs/core/src/mcp_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,7 @@ impl McpConnectionManager {
.await
{
Ok(_response) => (server_name, Ok((client, timeout))),
Err(e) => {
let (stdout, stderr) = client.output_snippet().await;
let err = anyhow!(
"initialize failed: {e}\nstdout:\n{}\nstderr:\n{}",
stdout.join("\n"),
stderr.join("\n")
);
(server_name, Err(err))
}
Err(e) => (server_name, Err(e)),
}
}
Err(e) => (server_name, Err(e.into())),
Expand Down
66 changes: 3 additions & 63 deletions codex-rs/mcp-client/src/mcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
//! issue requests and receive strongly-typed results.

use std::collections::HashMap;
use std::collections::VecDeque;
use std::ffi::OsString;
use std::sync::Arc;
use std::sync::atomic::AtomicI64;
Expand Down Expand Up @@ -57,9 +56,6 @@ use tracing::warn;
/// client API and the IO tasks.
const CHANNEL_CAPACITY: usize = 128;

/// How many lines of STDOUT/STDERR output to retain for debugging purposes.
const CAPTURED_OUTPUT_LINES: usize = 20;

/// Internal representation of a pending request sender.
type PendingSender = oneshot::Sender<JSONRPCMessage>;

Expand All @@ -80,12 +76,6 @@ pub struct McpClient {

/// Monotonically increasing counter used to generate request IDs.
id_counter: AtomicI64,

/// Ring buffer of the most recent lines read from the server's STDOUT.
stdout_log: Arc<Mutex<VecDeque<String>>>,

/// Ring buffer of the most recent lines read from the server's STDERR.
stderr_log: Arc<Mutex<VecDeque<String>>>,
}

impl McpClient {
Expand All @@ -103,7 +93,7 @@ impl McpClient {
.envs(create_env_for_mcp_server(env))
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
// As noted in the `kill_on_drop` documentation, the Tokio runtime makes
// a "best effort" to reap-after-exit to avoid zombie processes, but it
// is not a guarantee.
Expand All @@ -118,17 +108,9 @@ impl McpClient {
.stdout
.take()
.ok_or_else(|| std::io::Error::other("failed to capture child stdout"))?;
let stderr = child
.stderr
.take()
.ok_or_else(|| std::io::Error::other("failed to capture child stderr"))?;

let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<JSONRPCMessage>(CHANNEL_CAPACITY);
let pending: Arc<Mutex<HashMap<i64, PendingSender>>> = Arc::new(Mutex::new(HashMap::new()));
let stdout_log: Arc<Mutex<VecDeque<String>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(CAPTURED_OUTPUT_LINES)));
let stderr_log: Arc<Mutex<VecDeque<String>>> =
Arc::new(Mutex::new(VecDeque::with_capacity(CAPTURED_OUTPUT_LINES)));

// Spawn writer task. It listens on the `outgoing_rx` channel and
// writes messages to the child's STDIN.
Expand Down Expand Up @@ -159,18 +141,10 @@ impl McpClient {
// STDOUT and dispatches responses to the pending map.
let reader_handle = {
let pending = pending.clone();
let stdout_log = stdout_log.clone();
let mut lines = BufReader::new(stdout).lines();

tokio::spawn(async move {
while let Ok(Some(line)) = lines.next_line().await {
{
let mut log = stdout_log.lock().await;
if log.len() == CAPTURED_OUTPUT_LINES {
log.pop_front();
}
log.push_back(line.clone());
}
debug!("MCP message from server: {line}");
match serde_json::from_str::<JSONRPCMessage>(&line) {
Ok(JSONRPCMessage::Response(resp)) => {
Expand All @@ -196,37 +170,17 @@ impl McpClient {
})
};

let stderr_handle = {
let stderr_log = stderr_log.clone();
let mut lines = BufReader::new(stderr).lines();

tokio::spawn(async move {
while let Ok(Some(line)) = lines.next_line().await {
{
let mut log = stderr_log.lock().await;
if log.len() == CAPTURED_OUTPUT_LINES {
log.pop_front();
}
log.push_back(line.clone());
}
debug!("MCP stderr from server: {line}");
}
})
};

// We intentionally *detach* the tasks. They will keep running in the
// background as long as their respective resources (channels/stdin/
// stdout/stderr) are alive. Dropping `McpClient` cancels the tasks due to
// stdout) are alive. Dropping `McpClient` cancels the tasks due to
// dropped resources.
let _ = (writer_handle, reader_handle, stderr_handle);
let _ = (writer_handle, reader_handle);

Ok(Self {
child,
outgoing_tx,
pending,
id_counter: AtomicI64::new(1),
stdout_log,
stderr_log,
})
}

Expand Down Expand Up @@ -393,20 +347,6 @@ impl McpClient {
self.send_request::<CallToolRequest>(params, timeout).await
}

/// Return the most recent lines of STDOUT and STDERR produced by the
/// server. The buffers are limited to `CAPTURED_OUTPUT_LINES` lines each.
pub async fn output_snippet(&self) -> (Vec<String>, Vec<String>) {
let stdout = {
let guard = self.stdout_log.lock().await;
guard.iter().cloned().collect()
};
let stderr = {
let guard = self.stderr_log.lock().await;
guard.iter().cloned().collect()
};
(stdout, stderr)
}

/// Internal helper: route a JSON-RPC *response* object to the pending map.
async fn dispatch_response(
resp: JSONRPCResponse,
Expand Down