Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions codex-rs/core/src/tools/handlers/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ impl ToolExecutor<ToolInvocation> for McpHandler {
}

fn supports_parallel_tool_calls(&self) -> bool {
// Correctly implemented MCP servers should tolerate parallel calls to
// tools that advertise themselves as read-only.
self.tool_info.supports_parallel_tool_calls
|| self
.tool_info
.tool
.annotations
.as_ref()
.and_then(|annotations| annotations.read_only_hint)
.unwrap_or(false)
}

async fn handle(
Expand Down Expand Up @@ -443,6 +452,44 @@ mod tests {
assert_eq!(mcp_hook_tool_input(" "), json!({}));
}

#[test]
fn mcp_read_only_hint_supports_parallel_calls_without_server_opt_in() {
let mut read_only_info = tool_info("foo", "mcp__foo__", "read");
read_only_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(true));

assert!(
McpHandler::new(read_only_info)
.expect("MCP tool spec should build")
.supports_parallel_tool_calls()
);
}

#[test]
fn mcp_parallel_calls_require_read_only_hint_or_server_opt_in() {
let missing_hint_info = tool_info("foo", "mcp__foo__", "unannotated");
assert!(
!McpHandler::new(missing_hint_info)
.expect("MCP tool spec should build")
.supports_parallel_tool_calls()
);

let mut writable_info = tool_info("foo", "mcp__foo__", "write");
writable_info.tool.annotations = Some(rmcp::model::ToolAnnotations::new().read_only(false));
assert!(
!McpHandler::new(writable_info)
.expect("MCP tool spec should build")
.supports_parallel_tool_calls()
);

let mut server_opt_in_info = tool_info("foo", "mcp__foo__", "server_opt_in");
server_opt_in_info.supports_parallel_tool_calls = true;
assert!(
McpHandler::new(server_opt_in_info)
.expect("MCP tool spec should build")
.supports_parallel_tool_calls()
);
}

fn tool_info(server_name: &str, callable_namespace: &str, tool_name: &str) -> ToolInfo {
ToolInfo {
server_name: server_name.to_string(),
Expand Down
126 changes: 123 additions & 3 deletions codex-rs/core/tests/suite/rmcp_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,28 @@ fn read_only_user_turn_with_model(
fixture: &TestCodex,
text: impl Into<String>,
model: String,
) -> Op {
user_turn_with_permission_profile(fixture, text, model, PermissionProfile::read_only())
}

fn auto_approved_user_turn(fixture: &TestCodex, text: impl Into<String>) -> Op {
user_turn_with_permission_profile(
fixture,
text,
fixture.session_configured.model.clone(),
PermissionProfile::Disabled,
)
}

fn user_turn_with_permission_profile(
fixture: &TestCodex,
text: impl Into<String>,
model: String,
permission_profile: PermissionProfile,
) -> Op {
let cwd = fixture.cwd.path().to_path_buf();
let (sandbox_policy, permission_profile) =
turn_permission_fields(PermissionProfile::read_only(), cwd.as_path());
turn_permission_fields(permission_profile, cwd.as_path());
Op::UserInput {
items: vec![UserInput::Text {
text: text.into(),
Expand Down Expand Up @@ -840,7 +858,10 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
.await?;
fixture
.codex
.submit(read_only_user_turn(
// Keep this baseline on the mutable sync tool so read-only hints do not
// make the call parallel-safe. Bypass read-only turn permissions so
// approval behavior does not block the scheduling assertion.
.submit(auto_approved_user_turn(
&fixture,
"call the rmcp sync tool twice",
))
Expand Down Expand Up @@ -899,6 +920,102 @@ async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stdio_mcp_read_only_tool_calls_run_concurrently_without_server_opt_in()
Comment thread
pakrym-oai marked this conversation as resolved.
-> anyhow::Result<()> {
skip_if_no_network!(Ok(()));

let server = responses::start_mock_server().await;

let first_call_id = "sync-read-only-1";
let second_call_id = "sync-read-only-2";
let server_name = "rmcp";
let namespace = format!("mcp__{server_name}__");
// The stdio MCP test server holds each sync call at this barrier until both
// calls arrive. A serial scheduler times out inside the server instead of
// returning the structured `{ "result": "ok" }` result asserted below.
let args = json!({
"sleep_after_ms": 100,
"barrier": {
"id": "stdio-mcp-read-only-tool-calls",
"participants": 2,
"timeout_ms": 1_000
}
})
.to_string();

mount_sse_once(
&server,
responses::sse(vec![
responses::ev_response_created("resp-1"),
responses::ev_function_call_with_namespace(
first_call_id,
&namespace,
"sync_readonly",
&args,
),
responses::ev_function_call_with_namespace(
second_call_id,
&namespace,
"sync_readonly",
&args,
),
responses::ev_completed("resp-1"),
]),
)
.await;
let final_mock = mount_sse_once(
&server,
responses::sse(vec![
responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."),
responses::ev_completed("resp-2"),
]),
)
.await;

let rmcp_test_server_bin = remote_aware_stdio_server_bin()?;

let fixture = test_codex()
.with_config(move |config| {
insert_mcp_server(
config,
server_name,
stdio_transport(rmcp_test_server_bin, /*env*/ None, Vec::new()),
TestMcpServerOptions {
environment_id: remote_aware_environment_id(),
tool_timeout_sec: Some(Duration::from_secs(2)),
..Default::default()
},
);
})
.build_with_remote_env(&server)
.await?;
fixture
.codex
.submit(read_only_user_turn(
&fixture,
"call the rmcp sync_readonly tool twice",
))
.await?;

wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await;

let request = final_mock.single_request();
for call_id in [first_call_id, second_call_id] {
let output_text = request
.function_call_output_text(call_id)
.expect("function_call_output present for rmcp sync call");
let wrapped_payload = split_wall_time_wrapped_output(&output_text);
let output_json: Value = serde_json::from_str(wrapped_payload)
.expect("wrapped MCP output should preserve structured JSON");
assert_eq!(output_json, json!({ "result": "ok" }));
}

server.verify().await;

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> {
skip_if_no_network!(Ok(()));
Expand Down Expand Up @@ -957,7 +1074,10 @@ async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Res
.await?;
fixture
.codex
.submit(read_only_user_turn(
// Exercise the server opt-in with the mutable sync tool rather than the
// read-only sync_readonly tool. Bypass read-only turn permissions so
// approval behavior does not block the scheduling assertion.
.submit(auto_approved_user_turn(
Comment thread
anp-oai marked this conversation as resolved.
&fixture,
"call the rmcp sync tool twice",
))
Expand Down
11 changes: 11 additions & 0 deletions codex-rs/rmcp-client/src/bin/test_stdio_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ impl TestToolServer {
Self::echo_dash_tool(),
Self::cwd_tool(),
Self::sync_tool(),
Self::sync_readonly_tool(),
Self::image_tool(),
Self::image_scenario_tool(),
sandbox_meta_tool,
Expand Down Expand Up @@ -205,6 +206,12 @@ impl TestToolServer {
}))
.expect("sync tool output schema should deserialize");
tool.output_schema = Some(Arc::new(output_schema));
tool
}

fn sync_readonly_tool() -> Tool {
let mut tool = Self::sync_tool();
tool.name = Cow::Borrowed("sync_readonly");
tool.annotations = Some(ToolAnnotations::new().read_only(true));
tool
}
Expand Down Expand Up @@ -551,6 +558,10 @@ impl ServerHandler for TestToolServer {
let args = Self::parse_call_args::<SyncArgs>(&request, "sync")?;
Self::sync_result(args).await
}
"sync_readonly" => {
let args = Self::parse_call_args::<SyncArgs>(&request, "sync_readonly")?;
Self::sync_result(args).await
}
other => Err(McpError::invalid_params(
format!("unknown tool: {other}"),
None,
Expand Down
Loading