Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 115 additions & 159 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,13 @@ enum ThreadReadViewError {
mod thread_goal_handlers;
use self::thread_goal_handlers::api_thread_goal_from_state;

fn thread_read_view_error(err: ThreadReadViewError) -> JSONRPCErrorError {
match err {
ThreadReadViewError::InvalidRequest(message) => invalid_request(message),
ThreadReadViewError::Internal(message) => internal_error(message),
}
}

impl Drop for ActiveLogin {
fn drop(&mut self) {
self.cancel();
Expand Down Expand Up @@ -3639,6 +3646,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: ThreadLoadedListParams,
) {
let result = self.thread_loaded_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}

async fn thread_loaded_list_response(
&self,
params: ThreadLoadedListParams,
) -> Result<ThreadLoadedListResponse, JSONRPCErrorError> {
let ThreadLoadedListParams { cursor, limit } = params;
let mut data = self
.thread_manager
Expand All @@ -3649,12 +3664,10 @@ impl CodexMessageProcessor {
.collect::<Vec<_>>();

if data.is_empty() {
let response = ThreadLoadedListResponse {
return Ok(ThreadLoadedListResponse {
data,
next_cursor: None,
};
self.outgoing.send_response(request_id, response).await;
return;
});
}

data.sort();
Expand All @@ -3663,15 +3676,7 @@ impl CodexMessageProcessor {
Some(cursor) => {
let cursor = match ThreadId::from_string(&cursor) {
Ok(id) => id.to_string(),
Err(_) => {
let error = JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!("invalid cursor: {cursor}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
}
Err(_) => return Err(invalid_request(format!("invalid cursor: {cursor}"))),
};
match data.binary_search(&cursor) {
Ok(idx) => idx + 1,
Expand All @@ -3686,41 +3691,34 @@ impl CodexMessageProcessor {
let page = data[start..end].to_vec();
let next_cursor = page.last().filter(|_| end < total).cloned();

let response = ThreadLoadedListResponse {
Ok(ThreadLoadedListResponse {
data: page,
next_cursor,
};
self.outgoing.send_response(request_id, response).await;
})
}

async fn thread_read(&self, request_id: ConnectionRequestId, params: ThreadReadParams) {
let result = self.thread_read_response(params).await;
self.outgoing.send_result(request_id, result).await;
}

async fn thread_read_response(
&self,
params: ThreadReadParams,
) -> Result<ThreadReadResponse, JSONRPCErrorError> {
let ThreadReadParams {
thread_id,
include_turns,
} = params;

let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let thread_uuid = ThreadId::from_string(&thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

let thread = match self.read_thread_view(thread_uuid, include_turns).await {
Ok(thread) => thread,
Err(ThreadReadViewError::InvalidRequest(message)) => {
self.send_invalid_request_error(request_id, message).await;
return;
}
Err(ThreadReadViewError::Internal(message)) => {
self.send_internal_error(request_id, message).await;
return;
}
};
let response = ThreadReadResponse { thread };
self.outgoing.send_response(request_id, response).await;
let thread = self
.read_thread_view(thread_uuid, include_turns)
.await
.map_err(thread_read_view_error)?;
Ok(ThreadReadResponse { thread })
}

/// Builds the API view for `thread/read` from persisted metadata plus optional live state.
Expand Down Expand Up @@ -3878,21 +3876,23 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: ThreadTurnsListParams,
) {
let result = self.thread_turns_list_response(params).await;
self.outgoing.send_result(request_id, result).await;
}

async fn thread_turns_list_response(
&self,
params: ThreadTurnsListParams,
) -> Result<ThreadTurnsListResponse, JSONRPCErrorError> {
let ThreadTurnsListParams {
thread_id,
cursor,
limit,
sort_direction,
} = params;

let thread_uuid = match ThreadId::from_string(&thread_id) {
Ok(id) => id,
Err(err) => {
self.send_invalid_request_error(request_id, format!("invalid thread id: {err}"))
.await;
return;
}
};
let thread_uuid = ThreadId::from_string(&thread_id)
.map_err(|err| invalid_request(format!("invalid thread id: {err}")))?;

let state_db_ctx = get_state_db(&self.config).await;
let mut rollout_path = self
Expand All @@ -3912,21 +3912,15 @@ impl CodexMessageProcessor {
{
Ok(path) => path,
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate archived thread id {thread_uuid}: {err}"),
)
.await;
return;
return Err(invalid_request(format!(
"failed to locate archived thread id {thread_uuid}: {err}"
)));
}
},
Err(err) => {
self.send_invalid_request_error(
request_id,
format!("failed to locate thread id {thread_uuid}: {err}"),
)
.await;
return;
return Err(invalid_request(format!(
"failed to locate thread id {thread_uuid}: {err}"
)));
}
};
}
Expand All @@ -3936,92 +3930,63 @@ impl CodexMessageProcessor {
Ok(thread) => {
rollout_path = thread.rollout_path();
if rollout_path.is_none() {
self.send_invalid_request_error(
request_id,
"ephemeral threads do not support thread/turns/list".to_string(),
)
.await;
return;
return Err(invalid_request(
"ephemeral threads do not support thread/turns/list",
));
}
}
Err(_) => {
self.send_invalid_request_error(
request_id,
format!("thread not loaded: {thread_uuid}"),
)
.await;
return;
}
Err(_) => return Err(invalid_request(format!("thread not loaded: {thread_uuid}"))),
}
}

let Some(rollout_path) = rollout_path.as_ref() else {
self.send_internal_error(
request_id,
format!("failed to locate rollout for thread {thread_uuid}"),
)
.await;
return;
return Err(internal_error(format!(
"failed to locate rollout for thread {thread_uuid}"
)));
};

match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
// This API optimizes network transfer by letting clients page through a
// thread's turns incrementally, but it still replays the entire rollout on
// every request. Rollback and compaction events can change earlier turns, so
// the server has to rebuild the full turn list until turn metadata is indexed
// separately.
let has_live_in_progress_turn =
match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
Err(_) => false,
};
let turns = reconstruct_thread_turns_from_rollout_items(
&items,
self.thread_watch_manager
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_in_progress_turn,
);
let page = match paginate_thread_turns(
turns,
cursor.as_deref(),
limit,
sort_direction.unwrap_or(SortDirection::Desc),
) {
Ok(page) => page,
Err(error) => {
self.outgoing.send_error(request_id, error).await;
return;
}
};
let response = ThreadTurnsListResponse {
data: page.turns,
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
};
self.outgoing.send_response(request_id, response).await;
}
let items = match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => items,
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
self.send_invalid_request_error(
request_id,
format!(
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
),
)
.await;
return Err(invalid_request(format!(
"thread {thread_uuid} is not materialized yet; thread/turns/list is unavailable before first user message"
)));
}
Err(err) => {
self.send_internal_error(
request_id,
format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
),
)
.await;
return Err(internal_error(format!(
"failed to load rollout `{}` for thread {thread_uuid}: {err}",
rollout_path.display()
)));
}
}
};

// This API optimizes network transfer by letting clients page through a
// thread's turns incrementally, but it still replays the entire rollout on
// every request. Rollback and compaction events can change earlier turns, so
// the server has to rebuild the full turn list until turn metadata is indexed
// separately.
let has_live_in_progress_turn = match self.thread_manager.get_thread(thread_uuid).await {
Ok(thread) => matches!(thread.agent_status().await, AgentStatus::Running),
Err(_) => false,
};
let turns = reconstruct_thread_turns_from_rollout_items(
&items,
self.thread_watch_manager
.loaded_status_for_thread(&thread_uuid.to_string())
.await,
has_live_in_progress_turn,
);
let page = paginate_thread_turns(
turns,
cursor.as_deref(),
limit,
sort_direction.unwrap_or(SortDirection::Desc),
)?;
Ok(ThreadTurnsListResponse {
data: page.turns,
next_cursor: page.next_cursor,
backwards_cursor: page.backwards_cursor,
})
}

pub(crate) fn thread_created_receiver(&self) -> broadcast::Receiver<ThreadId> {
Expand Down Expand Up @@ -5062,6 +5027,14 @@ impl CodexMessageProcessor {
request_id: ConnectionRequestId,
params: GetConversationSummaryParams,
) {
let result = self.get_thread_summary_response(params).await;
self.outgoing.send_result(request_id, result).await;
}

async fn get_thread_summary_response(
&self,
params: GetConversationSummaryParams,
) -> Result<GetConversationSummaryResponse, JSONRPCErrorError> {
let fallback_provider = self.config.model_provider_id.as_str();
let read_result = match params {
GetConversationSummaryParams::ThreadId { conversation_id } => self
Expand All @@ -5079,13 +5052,9 @@ impl CodexMessageProcessor {
.as_any()
.downcast_ref::<LocalThreadStore>()
else {
self.send_invalid_request_error(
request_id,
"rollout path queries are only supported with the local thread store"
.to_string(),
)
.await;
return;
return Err(invalid_request(
"rollout path queries are only supported with the local thread store",
));
};

local_thread_store
Expand All @@ -5099,27 +5068,14 @@ impl CodexMessageProcessor {
}
};

match read_result {
Ok(stored_thread) => {
let Some(summary) = summary_from_stored_thread(stored_thread, fallback_provider)
else {
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message:
"failed to load conversation summary: thread is missing rollout path"
.to_string(),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
};
let response = GetConversationSummaryResponse { summary };
self.outgoing.send_response(request_id, response).await;
}
Err(error) => {
self.outgoing.send_error(request_id, error).await;
}
}
let stored_thread = read_result?;
let summary =
summary_from_stored_thread(stored_thread, fallback_provider).ok_or_else(|| {
internal_error(
"failed to load conversation summary: thread is missing rollout path",
)
})?;
Ok(GetConversationSummaryResponse { summary })
}

async fn list_threads_common(
Expand Down
Loading