Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions codex-rs/analytics/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,22 @@ impl AnalyticsEventsClient {
)));
}

pub fn track_request(&self, connection_id: u64, request_id: RequestId, request: ClientRequest) {
pub fn track_request(
&self,
connection_id: u64,
request_id: RequestId,
request: &ClientRequest,
) {
if !matches!(
request,
ClientRequest::TurnStart { .. } | ClientRequest::TurnSteer { .. }
) {
return;
}
self.record_fact(AnalyticsFact::ClientRequest {
connection_id,
request_id,
request: Box::new(request),
request: Box::new(request.clone()),
});
}

Expand Down Expand Up @@ -324,6 +335,10 @@ impl AnalyticsEventsClient {
}
}

#[cfg(test)]
#[path = "client_tests.rs"]
mod tests;

async fn send_track_events(
auth_manager: &AuthManager,
base_url: &str,
Expand Down
79 changes: 79 additions & 0 deletions codex-rs/analytics/src/client_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use super::AnalyticsEventsClient;
use super::AnalyticsEventsQueue;
use crate::facts::AnalyticsFact;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::RequestId;
use codex_app_server_protocol::ThreadArchiveParams;
use codex_app_server_protocol::TurnStartParams;
use codex_app_server_protocol::TurnSteerParams;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::Mutex;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TryRecvError;

fn client_with_receiver() -> (AnalyticsEventsClient, mpsc::Receiver<AnalyticsFact>) {
let (sender, receiver) = mpsc::channel(4);
let queue = AnalyticsEventsQueue {
sender,
app_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
plugin_used_emitted_keys: Arc::new(Mutex::new(HashSet::new())),
};
(AnalyticsEventsClient { queue: Some(queue) }, receiver)
}

fn sample_turn_start_request() -> ClientRequest {
ClientRequest::TurnStart {
request_id: RequestId::Integer(1),
params: TurnStartParams {
thread_id: "thread-1".to_string(),
input: Vec::new(),
..Default::default()
},
}
}

fn sample_turn_steer_request() -> ClientRequest {
ClientRequest::TurnSteer {
request_id: RequestId::Integer(2),
params: TurnSteerParams {
thread_id: "thread-1".to_string(),
expected_turn_id: "turn-1".to_string(),
input: Vec::new(),
responsesapi_client_metadata: None,
},
}
}

fn sample_thread_archive_request() -> ClientRequest {
ClientRequest::ThreadArchive {
request_id: RequestId::Integer(3),
params: ThreadArchiveParams {
thread_id: "thread-1".to_string(),
},
}
}

#[test]
fn track_request_only_enqueues_analytics_relevant_requests() {
let (client, mut receiver) = client_with_receiver();

for (request_id, request) in [
(RequestId::Integer(1), sample_turn_start_request()),
(RequestId::Integer(2), sample_turn_steer_request()),
] {
client.track_request(/*connection_id*/ 7, request_id, &request);
assert!(matches!(
receiver.try_recv(),
Ok(AnalyticsFact::ClientRequest { .. })
));
}

let ignored_request = sample_thread_archive_request();
client.track_request(
/*connection_id*/ 7,
RequestId::Integer(3),
&ignored_request,
);
assert!(matches!(receiver.try_recv(), Err(TryRecvError::Empty)));
}
110 changes: 110 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ macro_rules! client_request_definitions {
params: $(#[$params_meta:meta])* $params:ty,
$(inspect_params: $inspect_params:tt,)?
serialization: $serialization:ident $( ( $($serialization_args:tt)* ) )?,
$(manual_payload_conversion: $manual_payload_conversion:ident,)?
response: $response:ty,
}
),* $(,)?
Expand Down Expand Up @@ -243,8 +244,100 @@ macro_rules! client_request_definitions {
})
.unwrap_or_else(|| "<unknown>".to_string())
}

pub fn into_jsonrpc_parts(
self,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant { request_id, response } => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
}
}
}

#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum ClientResponsePayload {
$( $variant($response), )*
InterruptConversation(v1::InterruptConversationResponse),
}

impl ClientResponsePayload {
pub fn into_jsonrpc_parts_and_payload(
self,
request_id: RequestId,
) -> std::result::Result<
(RequestId, crate::Result, Option<ClientResponsePayload>),
serde_json::Error,
> {
match self {
$(
Self::$variant(response) => {
let result = serde_json::to_value(&response)?;
Ok((request_id, result, Some(Self::$variant(response))))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result, None))
}
}
}

pub fn into_client_response(self, request_id: RequestId) -> Option<ClientResponse> {
match self {
$(
Self::$variant(response) => {
Some(ClientResponse::$variant {
request_id,
response,
})
}
)*
Self::InterruptConversation(_) => None,
}
}

pub fn into_jsonrpc_parts(
self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
self.to_jsonrpc_parts(request_id)
}

pub fn to_jsonrpc_parts(
&self,
request_id: RequestId,
) -> std::result::Result<(RequestId, crate::Result), serde_json::Error> {
match self {
$(
Self::$variant(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
)*
Self::InterruptConversation(response) => {
serde_json::to_value(response).map(|result| (request_id, result))
}
}
}
}

impl From<v1::InterruptConversationResponse> for ClientResponsePayload {
fn from(response: v1::InterruptConversationResponse) -> Self {
Self::InterruptConversation(response)
}
}

$(
client_response_payload_from_impl!(
$variant,
$response
$(, $manual_payload_conversion)?
);
)*

impl crate::experimental_api::ExperimentalApi for ClientRequest {
fn experimental_reason(&self) -> Option<&'static str> {
match self {
Expand Down Expand Up @@ -317,6 +410,17 @@ macro_rules! client_request_definitions {
};
}

macro_rules! client_response_payload_from_impl {
($variant:ident, $response:ty) => {
impl From<$response> for ClientResponsePayload {
fn from(response: $response) -> Self {
Self::$variant(response)
}
}
};
($variant:ident, $response:ty, manual) => {};
}

client_request_definitions! {
Initialize {
params: v1::InitializeParams,
Expand Down Expand Up @@ -789,11 +893,13 @@ client_request_definitions! {
ConfigValueWrite => "config/value/write" {
params: v2::ConfigValueWriteParams,
serialization: global("config"),
manual_payload_conversion: manual,
response: v2::ConfigWriteResponse,
},
ConfigBatchWrite => "config/batchWrite" {
params: v2::ConfigBatchWriteParams,
serialization: global("config"),
manual_payload_conversion: manual,
response: v2::ConfigWriteResponse,
},

Expand Down Expand Up @@ -2766,3 +2872,7 @@ mod tests {
);
}
}

#[cfg(test)]
#[path = "common_tests.rs"]
mod common_tests;
44 changes: 44 additions & 0 deletions codex-rs/app-server-protocol/src/protocol/common_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
use super::*;
use anyhow::Result;
use codex_protocol::protocol::TurnAbortReason;
use pretty_assertions::assert_eq;
use serde_json::json;

#[test]
fn client_response_payload_returns_jsonrpc_parts_and_client_response() -> Result<()> {
let (request_id, result, payload) =
ClientResponsePayload::ThreadArchive(v2::ThreadArchiveResponse {})
.into_jsonrpc_parts_and_payload(RequestId::Integer(7))?;

assert_eq!(request_id, RequestId::Integer(7));
assert_eq!(result, json!({}));

let Some(ClientResponse::ThreadArchive {
request_id,
response: _,
}) = payload.and_then(|payload| payload.into_client_response(RequestId::Integer(7)))
else {
panic!("expected thread/archive client response");
};
assert_eq!(request_id, RequestId::Integer(7));
Ok(())
}

#[test]
fn interrupt_conversation_payload_stays_jsonrpc_only() -> Result<()> {
let (request_id, result, payload) =
ClientResponsePayload::InterruptConversation(v1::InterruptConversationResponse {
abort_reason: TurnAbortReason::Interrupted,
})
.into_jsonrpc_parts_and_payload(RequestId::Integer(8))?;

assert_eq!(request_id, RequestId::Integer(8));
assert_eq!(
result,
json!({
"abortReason": "interrupted",
})
);
assert!(payload.is_none());
Ok(())
}
11 changes: 5 additions & 6 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use codex_app_server_protocol::CancelLoginAccountResponse;
use codex_app_server_protocol::CancelLoginAccountStatus;
use codex_app_server_protocol::ClientRequest;
use codex_app_server_protocol::ClientResponse;
use codex_app_server_protocol::ClientResponsePayload;
use codex_app_server_protocol::CodexErrorInfo;
use codex_app_server_protocol::CollaborationModeListParams;
use codex_app_server_protocol::CollaborationModeListResponse;
Expand Down Expand Up @@ -2118,7 +2119,7 @@ impl CodexMessageProcessor {
let result = self
.exec_one_off_command_inner(request_id.clone(), params)
.await
.map(|()| None::<serde_json::Value>);
.map(|()| None::<ClientResponsePayload>);
self.send_optional_result(request_id, result).await;
}

Expand Down Expand Up @@ -2864,7 +2865,6 @@ impl CodexMessageProcessor {
response: response.clone(),
},
);

listener_task_context
.outgoing
.send_response(request_id, response)
Expand Down Expand Up @@ -3544,7 +3544,7 @@ impl CodexMessageProcessor {
let result = self
.thread_rollback_start(&request_id, params)
.await
.map(|()| None::<serde_json::Value>);
.map(|()| None::<ClientResponsePayload>);
self.send_optional_result(request_id, result).await;
}

Expand Down Expand Up @@ -4401,14 +4401,14 @@ impl CodexMessageProcessor {
permission_profile,
reasoning_effort: session_configured.reasoning_effort,
};

self.analytics_events_client.track_response(
request_id.connection_id.0,
ClientResponse::ThreadResume {
request_id: request_id.request_id.clone(),
response: response.clone(),
},
);

let connection_id = request_id.connection_id;
let token_usage_thread = include_turns.then(|| response.thread.clone());
self.outgoing.send_response(request_id, response).await;
Expand Down Expand Up @@ -5027,7 +5027,6 @@ impl CodexMessageProcessor {
response: response.clone(),
},
);

let connection_id = request_id.connection_id;
let token_usage_thread = include_turns.then(|| response.thread.clone());
self.outgoing.send_response(request_id, response).await;
Expand Down Expand Up @@ -5811,7 +5810,7 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
result: Result<Option<T>, JSONRPCErrorError>,
) where
T: serde::Serialize,
T: Into<ClientResponsePayload>,
{
match result {
Ok(Some(response)) => self.outgoing.send_response(request_id, response).await,
Expand Down
5 changes: 4 additions & 1 deletion codex-rs/app-server/src/command_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,10 @@ mod tests {

manager
.start(StartCommandExecParams {
outgoing: Arc::new(OutgoingMessageSender::new(tx)),
outgoing: Arc::new(OutgoingMessageSender::new(
tx,
codex_analytics::AnalyticsEventsClient::disabled(),
)),
request_id: request_id.clone(),
process_id: Some("proc-101".to_string()),
exec_request: ExecRequest::new(
Expand Down
Loading
Loading