[2/4] Implement executor HTTP request runner #18582
This change is part of the following stack: Change managed by git-spice.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1a1b28d67b
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
    let body = response.bytes().await.map_err(|err| {
        internal_error(format!("failed to read http/request response body: {err}"))
    })?;
Enforce a max size when buffering HTTP response bodies
response.bytes().await reads the entire payload into memory with no limit before returning bodyBase64. A remote endpoint can return an arbitrarily large body and force the exec-server process into excessive memory use or OOM, making this an availability bug. Add a bounded read (or require streaming) for buffered mode.
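A minimal sketch of the bounded read this comment asks for, using only the standard library so it stays self-contained. The names `read_bounded` and `BoundedReadError` are hypothetical and not part of this PR. With reqwest the same check would presumably run per chunk (for example around `Response::chunk()`) rather than on a blocking `Read`.

```rust
use std::io::{self, Read};

/// Hypothetical error type for this sketch (not from the PR).
#[derive(Debug, PartialEq)]
pub enum BoundedReadError {
    /// The body was larger than the configured cap.
    TooLarge { limit: usize },
    /// The underlying read failed.
    Io(String),
}

/// Buffers at most `limit` bytes from `reader` and fails if more are available.
pub fn read_bounded<R: Read>(reader: R, limit: usize) -> Result<Vec<u8>, BoundedReadError> {
    let mut body = Vec::new();
    // Allow one byte past the limit so an oversized body can be detected
    // without ever buffering more than `limit + 1` bytes.
    let mut limited = reader.take((limit as u64).saturating_add(1));
    limited
        .read_to_end(&mut body)
        .map_err(|err: io::Error| BoundedReadError::Io(err.to_string()))?;
    if body.len() > limit {
        return Err(BoundedReadError::TooLarge { limit });
    }
    Ok(body)
}
```

The cap itself is a policy choice; the point is only that the buffered path rejects an oversized payload instead of growing without limit.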
d0f4fc9 to 1fc4cd7
1a1b28d to afcf571
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: afcf57105c
    let timeout = params
        .timeout_ms
        .map(Duration::from_millis)
        .unwrap_or(DEFAULT_HTTP_REQUEST_TIMEOUT);
    let headers = build_headers(params.headers)?;
    let client = reqwest::Client::builder()
        .timeout(timeout)
        .build()
Disable total timeout for streamed HTTP responses
Client::builder().timeout(timeout) sets a whole-request deadline, including reading the entire body. In streamResponse mode this truncates long-lived/SSE responses at the default 30s when timeoutMs is omitted, emitting a terminal error even if upstream is healthy. That breaks the intended streaming behavior for remote MCP HTTP flows.
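One possible policy, sketched under assumptions: keep a default whole-request deadline for buffered requests, but apply no total deadline to streamed responses unless the caller supplies `timeoutMs`. The function `total_timeout` and the constant `DEFAULT_BUFFERED_TIMEOUT` are hypothetical names; the 30s value is taken from the comment above, not from the code.

```rust
use std::time::Duration;

/// Hypothetical default for buffered requests (30s, as cited in the review comment).
const DEFAULT_BUFFERED_TIMEOUT: Duration = Duration::from_secs(30);

/// Picks the whole-request deadline, or `None` for no total deadline.
pub fn total_timeout(stream_response: bool, timeout_ms: Option<u64>) -> Option<Duration> {
    match (stream_response, timeout_ms) {
        // An explicit caller value always wins.
        (_, Some(ms)) => Some(Duration::from_millis(ms)),
        // Streamed bodies (for example SSE) may legitimately outlive any default.
        (true, None) => None,
        // Buffered bodies keep a default so a stalled peer cannot hang the call.
        (false, None) => Some(DEFAULT_BUFFERED_TIMEOUT),
    }
}
```

A `None` result would map to building the client without calling `.timeout(...)`.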
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: afcf57105c
        response: reqwest::Response,
        notifications: RpcNotificationSender,
    ) {
        tokio::spawn(async move {
Cancel body-stream task when RPC connection closes
spawn_body_stream detaches a task that owns RpcNotificationSender. If the JSON-RPC transport disconnects, that sender keeps the outbound channel alive until the HTTP body stream ends. Because timeout_ms is caller-controlled, a peer can request a very long timeout (or a never-ending stream), causing run_connection teardown to hang and leak per-connection tasks/resources.
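To illustrate the shape of the fix, here is a synchronous, std-only stand-in: the forwarding loop checks a shared cancellation flag between chunks and also stops as soon as the receiving side is gone. `forward_body` is a hypothetical name. In the actual async code a cancellation token raced against the stream would be needed to interrupt a pending read, which this blocking sketch does not do.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Sender;

/// Forwards chunks until the source ends, `cancelled` is set, or the
/// receiver has been dropped. Returns how many chunks were forwarded.
pub fn forward_body<I>(chunks: I, outbound: &Sender<Vec<u8>>, cancelled: &AtomicBool) -> usize
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut forwarded = 0;
    for chunk in chunks {
        // Cooperative cancellation: checked once per chunk.
        if cancelled.load(Ordering::Acquire) {
            break;
        }
        // A send error means the connection side dropped its receiver.
        if outbound.send(chunk).is_err() {
            break;
        }
        forwarded += 1;
    }
    forwarded
}
```

The connection teardown path would set the flag (or cancel the token) before waiting for its tasks, so a long-lived upstream body cannot keep the connection alive.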
1fc4cd7 to 5b3829a
afcf571 to 81ff713
5b3829a to 53977b8
81ff713 to d6a38e3
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d6a38e3191
    // The JSON-RPC response carries status and headers; the body follows as
    // ordered notifications so callers can begin parsing streaming content
    // without waiting for the server to close the HTTP response.
    spawn_body_stream(request_id, response, notifications);
Send http/request response before bodyDelta notifications
run_http_request starts spawn_body_stream before the handler enqueues the JSON-RPC response. The spawned task can push http/request/bodyDelta messages first, so headers/status may be delayed behind body frames. With bounded channels, large/fast streams can fill notification queues and stall http_request_stream() while it is still waiting for the response.
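The ordering being requested can be sketched with a plain channel: enqueue the response frame first, then the body frames with sequence numbers starting at 1. `Outbound`, `respond_then_stream`, and the `done` marker are hypothetical simplifications for illustration, not the PR's wire types.

```rust
use std::sync::mpsc::{SendError, Sender};

/// Hypothetical, simplified outbound frames for this sketch.
#[derive(Debug, PartialEq)]
pub enum Outbound {
    /// Stands in for the JSON-RPC response carrying status and headers.
    Response { status: u16 },
    /// Stands in for an http/request/bodyDelta notification.
    BodyDelta { seq: u64, bytes: Vec<u8>, done: bool },
}

/// Enqueues the response before any body frame, then the ordered deltas.
pub fn respond_then_stream(
    tx: &Sender<Outbound>,
    status: u16,
    chunks: Vec<Vec<u8>>,
) -> Result<(), SendError<Outbound>> {
    // The response goes first so status and headers are never queued behind body data.
    tx.send(Outbound::Response { status })?;
    let mut seq = 0u64;
    for bytes in chunks {
        seq += 1;
        tx.send(Outbound::BodyDelta { seq, bytes, done: false })?;
    }
    // A terminal frame (an assumption of this sketch) marks the end of the body.
    tx.send(Outbound::BodyDelta { seq: seq + 1, bytes: Vec::new(), done: true })?;
    Ok(())
}
```

In the handler this corresponds to starting the body task only after the response has been handed to the outbound queue.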
53977b8 to 8edc173
d6a38e3 to 0a24aa2
8edc173 to 1a9358f
0a24aa2 to 4690cae
1a9358f to 3018783
4690cae to b85c91f
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b85c91f52b
    let request_id = if params.stream_response {
        Some(params.request_id.clone().ok_or_else(|| {
            invalid_params("http/request streamResponse requires requestId".to_string())
        })?)
    } else {
Reject duplicate stream request IDs before spawning streams
Validate requestId uniqueness for active streamed requests. run_http_request accepts any caller-supplied requestId, and each spawn_body_stream starts seq at 1. If two in-flight streamed calls reuse the same requestId, their http/request/bodyDelta frames can interleave under one key with colliding sequence numbers, violating ordered-stream semantics and causing consumers to corrupt or reject the body.
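A minimal sketch of the uniqueness check, assuming a per-connection set of active ids. `ActiveStreamIds` is a hypothetical type; real code would hold it behind whatever lock the connection handler already uses.

```rust
use std::collections::HashSet;

/// Hypothetical per-connection registry of in-flight streamed request ids.
#[derive(Default)]
pub struct ActiveStreamIds {
    ids: HashSet<String>,
}

impl ActiveStreamIds {
    /// Reserves `request_id`, rejecting it if a stream with that id is still active.
    pub fn reserve(&mut self, request_id: &str) -> Result<(), String> {
        // `insert` returns false when the id was already present.
        if !self.ids.insert(request_id.to_string()) {
            return Err(format!("requestId `{request_id}` is already active"));
        }
        Ok(())
    }

    /// Releases the id once its body stream has finished; returns whether it was active.
    pub fn release(&mut self, request_id: &str) -> bool {
        self.ids.remove(request_id)
    }
}
```

Releasing on every exit path (success, error, cancellation) matters as much as the reservation, otherwise an id stays blocked for the rest of the connection.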
3018783 to c3159bc
b85c91f to f14bc16
c3159bc to 888b4d7
f14bc16 to dbee178
888b4d7 to f33c92e
dbee178 to 5baf804
f33c92e to b003868
5baf804 to 941b42e
Co-authored-by: Codex <noreply@openai.com>
        params: HttpRequestParams,
    ) -> Result<HttpRequestResponse, JSONRPCErrorError> {
        self.require_initialized_for("http")?;
        let request_id = if params.stream_response {
let's just make request_id always required?
    Ok((response, pending_stream)) => {
        if let Some(pending_stream) = pending_stream {
            let mut http_body_streams = self.http_body_streams.lock().await;
            http_body_streams.pending.push_back(pending_stream);
can we just drive the body stream with a tokio task here? why do we enqueue the stream and drive them all separately?
if it's to ensure we clean-up successfully, maybe use a JoinSet + CancellationToken
    let status = response.status().as_u16();
    let headers = response_headers(response.headers());

    if let Some(request_id) = request_id {
same as above - I think making request_id required simplifies the code a bit and is reasonable to always have
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: fa029b87db
    async fn reserve_http_body_stream(&self, request_id: &str) -> Result<(), JSONRPCErrorError> {
        let mut body_streams = self.body_streams.lock().await;
        if body_streams.reserved_ids.contains_key(request_id) {
            return Err(invalid_params(format!(
                "http/request streamResponse requestId `{request_id}` is already active"
            )));
        }
        body_streams
            .reserved_ids
            .insert(request_id.to_string(), None);
        Ok(())
Cap concurrent streamed HTTP requests per connection
reserve_http_body_stream only rejects duplicate requestIds and accepts every unique id, and each accepted stream later gets its own spawned body task. With timeoutMs allowed to be null, a client can open many long-lived streams and keep them active indefinitely, leading to unbounded task/socket growth and potential executor DoS. Enforce a per-connection active-stream limit.
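One way to express a per-connection cap, sketched with an atomic counter and an RAII permit so the slot is returned when the stream task ends for any reason. `StreamLimiter` and `StreamPermit` are hypothetical names. In async code a `tokio::sync::Semaphore` with `try_acquire_owned` would likely serve the same purpose.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical limiter on concurrently active streamed requests.
#[derive(Clone)]
pub struct StreamLimiter {
    active: Arc<AtomicUsize>,
    max: usize,
}

/// Held for the lifetime of one stream; dropping it frees the slot.
pub struct StreamPermit {
    active: Arc<AtomicUsize>,
}

impl StreamLimiter {
    pub fn new(max: usize) -> Self {
        Self { active: Arc::new(AtomicUsize::new(0)), max }
    }

    /// Returns a permit, or `None` when `max` streams are already active.
    pub fn try_acquire(&self) -> Option<StreamPermit> {
        let mut current = self.active.load(Ordering::Acquire);
        loop {
            if current >= self.max {
                return None;
            }
            // Compare-and-swap so concurrent callers cannot exceed the cap.
            match self.active.compare_exchange(
                current,
                current + 1,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => return Some(StreamPermit { active: Arc::clone(&self.active) }),
                Err(observed) => current = observed,
            }
        }
    }

    /// Number of streams currently holding a permit.
    pub fn active(&self) -> usize {
        self.active.load(Ordering::Acquire)
    }
}

impl Drop for StreamPermit {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::AcqRel);
    }
}
```

The handler would acquire a permit next to the requestId reservation and move it into the body task, rejecting the request when `try_acquire` returns `None`.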
    session_registry: Arc<SessionRegistry>,
    notifications: RpcNotificationSender,
    session: StdMutex<Option<SessionHandle>>,
    body_streams: Mutex<HashMap<String, Option<JoinHandle<()>>>>,
from a quick read of the rust docs, if we can use a tokio_util::task::TaskTracker and don't need the key here, that might generalize so we can just use this for any background coros.
so e.g. background_tasks: tokio_util::task::TaskTracker
then you can add the background body tailing tasks there and don't need the explicit cleanup in the request handler.
if it's not easy to adapt/use, the current PR is fine tho
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: cb86b7a6bc
        None => reqwest::Client::builder(),
        Some(timeout_ms) => {
            reqwest::Client::builder().timeout(Duration::from_millis(timeout_ms))
        }
    }
    .build()
    .map_err(|err| internal_error(format!("failed to build http/request client: {err}")))?;
Build HTTP runner client with shared custom-CA policy
ExecutorHttpRequestRunner::new builds a raw reqwest::Client, which bypasses Codex’s shared CA policy (CODEX_CA_CERTIFICATE / SSL_CERT_FILE). As a result, http/request can fail TLS to enterprise/intercepting proxies even when other Codex HTTP paths succeed. This outbound path should use the shared reqwest-client construction helper instead of direct Client::builder().build().
    tokio::select! {
        _ = shutdown.cancelled() => {}
        _ = ExecutorHttpRequestRunner::stream_body(pending_stream, notifications) => {}
    }
Cancel HTTP body streams immediately on session eviction
Stream tasks only terminate on background_task_shutdown in shutdown(). But eviction (is_session_attached == false) is checked only after incoming_rx.recv() returns, so an otherwise idle old connection may never reach shutdown. Existing http/request stream tasks can keep emitting bodyDelta notifications on the evicted socket after session resume.
Why
Remote streamable HTTP MCP needs the executor to perform ordinary HTTP requests on the executor side. This keeps network placement aligned with experimental_environment = "remote" without adding MCP-specific executor APIs.
What
- http/request runner backed by reqwest.
- http/request/bodyDelta notifications for streaming responses.
Stack
Verification
- just fmt
- cargo check -p codex-exec-server -p codex-rmcp-client --tests
- cargo check -p codex-core --test all compile-only
- git diff --check
- full-ci branch, including the remote Rust test job.
Co-authored-by: Codex <noreply@openai.com>