Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 101 additions & 49 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,6 @@ use codex_core::plugins::PluginInstallRequest;
use codex_core::plugins::PluginReadRequest;
use codex_core::plugins::PluginUninstallError as CorePluginUninstallError;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::sandboxing::SandboxPermissions;
use codex_core::windows_sandbox::WindowsSandboxLevelExt;
use codex_core::windows_sandbox::WindowsSandboxSetupMode as CoreWindowsSandboxSetupMode;
Expand Down Expand Up @@ -3898,16 +3897,48 @@ impl CodexMessageProcessor {
include_turns: bool,
) -> Result<Thread, ThreadReadViewError> {
let loaded_thread = self.thread_manager.get_thread(thread_id).await.ok();
let mut thread = if let Some(thread) = self
let mut thread = if include_turns {
if let Some(loaded_thread) = loaded_thread.as_ref() {
// Loaded thread with turns: use persisted metadata when it exists,
// but reconstruct turns from the live ThreadStore history.
let persisted_thread = self
.load_persisted_thread_for_read(thread_id, /*include_turns*/ false)
.await?;
self.load_live_thread_view(
thread_id,
include_turns,
loaded_thread,
persisted_thread,
)
.await?
} else if let Some(thread) = self
.load_persisted_thread_for_read(thread_id, include_turns)
.await?
{
// Unloaded thread with turns: load metadata and history together
// from the ThreadStore.
thread
} else {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread not loaded: {thread_id}"
)));
}
} else if let Some(thread) = self
.load_persisted_thread_for_read(thread_id, include_turns)
.await?
{
// Persisted metadata-only read: no live thread state is needed.
thread
} else if let Some(thread) = self
.load_live_thread_view(thread_id, include_turns, loaded_thread.as_ref())
} else if let Some(loaded_thread) = loaded_thread.as_ref() {
// Loaded metadata-only read before persistence is materialized: build
// the response from the live thread snapshot.
self.load_live_thread_view(
thread_id,
include_turns,
loaded_thread,
/*persisted_thread*/ None,
)
.await?
{
thread
} else {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread not loaded: {thread_id}"
Expand Down Expand Up @@ -3973,65 +4004,51 @@ impl CodexMessageProcessor {
}
}

/// Builds a `thread/read` view from a loaded thread plus optional persisted metadata.
async fn load_live_thread_view(
&self,
thread_id: ThreadId,
include_turns: bool,
loaded_thread: Option<&Arc<CodexThread>>,
) -> Result<Option<Thread>, ThreadReadViewError> {
let Some(thread) = loaded_thread else {
return Ok(None);
};
let config_snapshot = thread.config_snapshot().await;
let loaded_rollout_path = thread.rollout_path();
if include_turns && loaded_rollout_path.is_none() {
loaded_thread: &CodexThread,
persisted_thread: Option<Thread>,
) -> Result<Thread, ThreadReadViewError> {
let config_snapshot = loaded_thread.config_snapshot().await;
if include_turns && config_snapshot.ephemeral {
return Err(ThreadReadViewError::InvalidRequest(
"ephemeral threads do not support includeTurns".to_string(),
));
}
let mut thread =
build_thread_from_snapshot(thread_id, &config_snapshot, loaded_rollout_path.clone());
self.apply_thread_read_rollout_fields(
thread_id,
&mut thread,
loaded_rollout_path.as_deref(),
include_turns,
)
.await?;
Ok(Some(thread))
let fallback_thread =
build_thread_from_loaded_snapshot(thread_id, &config_snapshot, loaded_thread);
let mut thread = if let Some(mut thread) = persisted_thread {
if thread.path.is_none() {
thread.path = fallback_thread.path.clone();
}
thread.ephemeral = fallback_thread.ephemeral;
thread
} else {
fallback_thread
};
self.apply_thread_read_store_fields(thread_id, &mut thread, include_turns, loaded_thread)
.await?;
Ok(thread)
}

async fn apply_thread_read_rollout_fields(
async fn apply_thread_read_store_fields(
&self,
thread_id: ThreadId,
thread: &mut Thread,
rollout_path: Option<&Path>,
include_turns: bool,
loaded_thread: &CodexThread,
) -> Result<(), ThreadReadViewError> {
if thread.forked_from_id.is_none()
&& let Some(rollout_path) = rollout_path
{
thread.forked_from_id = forked_from_id_from_rollout(rollout_path).await;
}
self.attach_thread_name(thread_id, thread).await;

if include_turns && let Some(rollout_path) = rollout_path {
match read_rollout_items_from_rollout(rollout_path).await {
Ok(items) => {
thread.turns = build_turns_from_rollout_items(&items);
}
Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
return Err(ThreadReadViewError::InvalidRequest(format!(
"thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
)));
}
Err(err) => {
return Err(ThreadReadViewError::Internal(format!(
"failed to load rollout `{}` for thread {thread_id}: {err}",
rollout_path.display()
)));
}
}
if include_turns {
let history = loaded_thread
.load_history(/*include_archived*/ true)
.await
.map_err(|err| thread_read_history_load_error(thread_id, err))?;
thread.turns = build_turns_from_rollout_items(&history.items);
}

Ok(())
Expand Down Expand Up @@ -9074,6 +9091,32 @@ fn thread_turns_list_history_load_error(
}
}

fn thread_read_history_load_error(
thread_id: ThreadId,
err: ThreadStoreError,
) -> ThreadReadViewError {
match err {
ThreadStoreError::InvalidRequest { message }
if message.starts_with("failed to resolve rollout path `") =>
{
ThreadReadViewError::InvalidRequest(format!(
"thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
))
}
ThreadStoreError::ThreadNotFound {
thread_id: missing_thread_id,
} if missing_thread_id == thread_id => ThreadReadViewError::InvalidRequest(format!(
"thread {thread_id} is not materialized yet; includeTurns is unavailable before first user message"
)),
ThreadStoreError::InvalidRequest { message } => {
ThreadReadViewError::InvalidRequest(message)
}
err => ThreadReadViewError::Internal(format!(
"failed to load thread history for thread {thread_id}: {err}"
Comment thread
wiltzius-openai marked this conversation as resolved.
)),
}
}

fn conversation_summary_thread_id_read_error(
conversation_id: ThreadId,
err: ThreadStoreError,
Expand Down Expand Up @@ -9493,8 +9536,9 @@ fn map_git_info(git_info: &CoreGitInfo) -> ConversationGitInfo {
}
}

#[cfg(test)]
async fn forked_from_id_from_rollout(path: &Path) -> Option<String> {
read_session_meta_line(path)
codex_core::read_session_meta_line(path)
.await
.ok()
.and_then(|meta_line| meta_line.meta.forked_from_id)
Expand Down Expand Up @@ -9672,6 +9716,14 @@ fn build_thread_from_snapshot(
}
}

fn build_thread_from_loaded_snapshot(
thread_id: ThreadId,
config_snapshot: &ThreadConfigSnapshot,
loaded_thread: &CodexThread,
) -> Thread {
build_thread_from_snapshot(thread_id, config_snapshot, loaded_thread.rollout_path())
}

fn thread_started_notification(mut thread: Thread) -> ThreadStartedNotification {
thread.turns.clear();
ThreadStartedNotification { thread }
Expand Down
82 changes: 82 additions & 0 deletions codex-rs/app-server/tests/suite/v2/thread_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,88 @@ async fn thread_turns_list_reads_store_history_without_rollout_path() -> Result<
Ok(())
}

#[tokio::test]
async fn thread_read_loaded_include_turns_reads_store_history_without_rollout_path() -> Result<()> {
let codex_home = TempDir::new()?;
let store_id = Uuid::new_v4().to_string();
create_config_toml_with_thread_store(codex_home.path(), &store_id)?;
let store = InMemoryThreadStore::for_id(store_id.clone());
let _in_memory_store = InMemoryThreadStoreId { store_id };

let loader_overrides = LoaderOverrides::without_managed_config_for_tests();
let config = ConfigBuilder::default()
.codex_home(codex_home.path().to_path_buf())
.fallback_cwd(Some(codex_home.path().to_path_buf()))
.loader_overrides(loader_overrides.clone())
.build()
.await?;
let client = in_process::start(InProcessStartArgs {
arg0_paths: Arg0DispatchPaths::default(),
config: Arc::new(config),
cli_overrides: Vec::new(),
loader_overrides,
cloud_requirements: CloudRequirementsLoader::default(),
thread_config_loader: Arc::new(codex_config::NoopThreadConfigLoader),
feedback: CodexFeedback::new(),
log_db: None,
environment_manager: Arc::new(EnvironmentManager::default_for_tests()),
config_warnings: Vec::new(),
session_source: SessionSource::Cli.into(),
enable_codex_api_key_env: false,
initialize: InitializeParams {
client_info: ClientInfo {
name: "codex-app-server-tests".to_string(),
title: None,
version: "0.1.0".to_string(),
},
capabilities: Some(InitializeCapabilities {
experimental_api: true,
..Default::default()
}),
},
channel_capacity: in_process::DEFAULT_IN_PROCESS_CHANNEL_CAPACITY,
})
.await?;

let result = client
.request(ClientRequest::ThreadStart {
request_id: RequestId::Integer(1),
params: ThreadStartParams {
model: Some("mock-model".to_string()),
..Default::default()
},
})
.await?
.expect("thread/start should succeed");
let ThreadStartResponse { thread, .. } = serde_json::from_value(result)?;
assert_eq!(thread.path, None);

let thread_id = codex_protocol::ThreadId::from_string(&thread.id)?;
store
.append_items(AppendThreadItemsParams {
thread_id,
items: store_history_items(),
})
.await?;

let result = client
.request(ClientRequest::ThreadRead {
request_id: RequestId::Integer(2),
params: ThreadReadParams {
thread_id: thread.id,
include_turns: true,
},
})
.await?
.expect("thread/read should succeed");
let ThreadReadResponse { thread, .. } = serde_json::from_value(result)?;

assert_eq!(turn_user_texts(&thread.turns), vec!["history from store"]);

client.shutdown().await?;
Ok(())
}

#[tokio::test]
async fn thread_list_includes_store_thread_without_rollout_path() -> Result<()> {
let codex_home = TempDir::new()?;
Expand Down
Loading