Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions codex-rs/core/src/state/turn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Turn-scoped state and active turn metadata scaffolding.

use codex_extension_api::ExtensionData;
use codex_sandboxing::policy_transforms::merge_permission_profiles;
use indexmap::IndexMap;
use std::collections::HashMap;
Expand Down Expand Up @@ -75,13 +76,15 @@ pub(crate) struct RunningTask {
pub(crate) cancellation_token: CancellationToken,
pub(crate) handle: AbortOnDropHandle<()>,
pub(crate) turn_context: Arc<TurnContext>,
pub(crate) turn_extension_data: Arc<ExtensionData>,
// Timer recorded when the task drops to capture the full turn duration.
pub(crate) _timer: Option<codex_otel::Timer>,
}

pub(crate) struct RemovedTask {
pub(crate) records_turn_token_usage_on_span: bool,
pub(crate) active_turn_is_empty: bool,
pub(crate) turn_extension_data: Arc<ExtensionData>,
}

impl ActiveTurn {
Expand All @@ -97,6 +100,7 @@ impl ActiveTurn {
Some(RemovedTask {
records_turn_token_usage_on_span,
active_turn_is_empty: self.tasks.is_empty(),
turn_extension_data: task.turn_extension_data,
})
}

Expand All @@ -120,6 +124,7 @@ pub(crate) struct TurnState {
pub(crate) tool_calls: u64,
pub(crate) has_memory_citation: bool,
pub(crate) token_usage_at_turn_start: TokenUsage,
pub(crate) extension_data: Arc<ExtensionData>,
}

pub(crate) struct PendingRequestPermissions {
Expand Down
57 changes: 57 additions & 0 deletions codex-rs/core/src/tasks/lifecycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use codex_extension_api::ExtensionData;
use codex_protocol::protocol::TurnAbortReason;

use crate::session::session::Session;
use crate::session::turn_context::TurnContext;

impl Session {
pub(super) fn emit_turn_start_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_start(codex_extension_api::TurnStartInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}

pub(super) fn emit_turn_stop_lifecycle(
&self,
turn_context: &TurnContext,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_stop(codex_extension_api::TurnStopInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}

pub(super) fn emit_turn_abort_lifecycle(
&self,
turn_context: &TurnContext,
reason: TurnAbortReason,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_abort(codex_extension_api::TurnAbortInput {
thread_id: self.conversation_id,
turn_id: &turn_context.sub_id,
reason: reason.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
}
}
}
32 changes: 30 additions & 2 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod compact;
mod lifecycle;
mod regular;
mod review;
mod user_shell;
Expand Down Expand Up @@ -345,7 +346,7 @@ impl Session {
debug_assert!(turn.tasks.is_empty());
Arc::clone(&turn.turn_state)
};
{
let turn_extension_data = {
let mut turn_state = turn_state.lock().await;
turn_state.token_usage_at_turn_start = token_usage_at_turn_start;
for item in queued_response_items {
Expand All @@ -354,7 +355,9 @@ impl Session {
for item in mailbox_items {
turn_state.push_pending_input(item);
}
}
Arc::clone(&turn_state.extension_data)
};
self.emit_turn_start_lifecycle(turn_context.as_ref(), turn_extension_data.as_ref());

let mut active = self.active_turn.lock().await;
let turn = active.get_or_insert_with(ActiveTurn::default);
Expand Down Expand Up @@ -425,6 +428,7 @@ impl Session {
task,
cancellation_token,
turn_context: Arc::clone(&turn_context),
turn_extension_data,
_timer: timer,
};
turn.add_task(running_task);
Expand Down Expand Up @@ -476,10 +480,14 @@ impl Session {
let mut aborted_turn = false;
let mut active_turn_to_clear = None;
let mut turn_context = None;
let mut turn_extension_data = None;
if let Some(mut active_turn) = self.take_active_turn().await {
let tasks = active_turn.drain_tasks();
aborted_turn = !tasks.is_empty();
turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
Expand All @@ -488,6 +496,11 @@ impl Session {
}
}

if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
}
if (aborted_turn || reason == TurnAbortReason::Interrupted)
&& let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
Expand Down Expand Up @@ -530,9 +543,17 @@ impl Session {

let tasks = active_turn.drain_tasks();
let turn_context = tasks.first().map(|task| Arc::clone(&task.turn_context));
let turn_extension_data = tasks
.first()
.map(|task| Arc::clone(&task.turn_extension_data));
for task in tasks {
self.handle_task_abort(task, reason.clone()).await;
}
if let Some(turn_context) = turn_context.as_deref()
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_abort_lifecycle(turn_context, reason.clone(), turn_extension_data);
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
turn_context: turn_context.as_deref(),
Expand Down Expand Up @@ -568,6 +589,7 @@ impl Session {
let mut turn_had_memory_citation = false;
let mut turn_tool_calls = 0_u64;
let mut records_turn_token_usage_on_span = false;
let mut turn_extension_data = None;
let turn_state = {
let mut active = self.active_turn.lock().await;
if let Some(at) = active.as_mut()
Expand All @@ -576,6 +598,7 @@ impl Session {
records_turn_token_usage_on_span = removed_task.records_turn_token_usage_on_span;
if removed_task.active_turn_is_empty {
should_clear_active_turn = true;
turn_extension_data = Some(removed_task.turn_extension_data);
let turn_state = Arc::clone(&at.turn_state);
Some(turn_state)
} else {
Expand Down Expand Up @@ -733,6 +756,11 @@ impl Session {
.turn_timing_state
.time_to_first_token_ms()
.await;
if should_clear_active_turn
&& let Some(turn_extension_data) = turn_extension_data.as_deref()
{
self.emit_turn_stop_lifecycle(turn_context.as_ref(), turn_extension_data);
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnFinished {
turn_context: turn_context.as_ref(),
Expand Down
21 changes: 21 additions & 0 deletions codex-rs/ext/extension-api/src/contributors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::ExtensionData;
mod prompt;
mod thread_lifecycle;
mod tools;
mod turn_lifecycle;

pub use prompt::PromptFragment;
pub use prompt::PromptSlot;
Expand All @@ -18,6 +19,9 @@ pub use thread_lifecycle::ThreadStopInput;
pub use tools::ExtensionToolExecutor;
pub use tools::ExtensionToolFuture;
pub use tools::ExtensionToolOutput;
pub use turn_lifecycle::TurnAbortInput;
pub use turn_lifecycle::TurnStartInput;
pub use turn_lifecycle::TurnStopInput;

/// Extension contribution that adds prompt fragments during prompt assembly.
pub trait ContextContributor: Send + Sync {
Expand Down Expand Up @@ -45,6 +49,23 @@ pub trait ThreadLifecycleContributor<C>: Send + Sync {
fn on_thread_stop(&self, _input: ThreadStopInput<'_>) {}
}

/// Contributor for host-owned turn lifecycle gates.
///
/// Implementations should use these callbacks to seed, observe, or clear
/// extension-private turn state. The host exposes stable identifiers and
/// extension stores instead of core runtime objects.
pub trait TurnLifecycleContributor: Send + Sync {
/// Called after turn-scoped extension stores are created, before the task
/// for the turn starts running.
fn on_turn_start(&self, _input: TurnStartInput<'_>) {}

/// Called before the host drops the completed turn runtime and turn store.
fn on_turn_stop(&self, _input: TurnStopInput<'_>) {}

/// Called after the host aborts a running turn.
fn on_turn_abort(&self, _input: TurnAbortInput<'_>) {}
}

/// Extension contribution that exposes native tools owned by a feature.
pub trait ToolContributor: Send + Sync {
/// Returns the native tools visible for the supplied extension stores.
Expand Down
48 changes: 48 additions & 0 deletions codex-rs/ext/extension-api/src/contributors/turn_lifecycle.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use codex_protocol::ThreadId;
use codex_protocol::protocol::TurnAbortReason;

use crate::ExtensionData;

/// Input supplied when the host starts a turn.
pub struct TurnStartInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is starting.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}

/// Input supplied when the host completes a turn.
pub struct TurnStopInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is stopping.
pub turn_id: &'a str,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}

/// Input supplied when the host aborts a turn.
pub struct TurnAbortInput<'a> {
/// Identifier for the thread containing this turn.
pub thread_id: ThreadId,
/// Identifier for the turn that is aborting.
pub turn_id: &'a str,
/// Reason the host aborted the turn.
pub reason: TurnAbortReason,
/// Store scoped to the host session runtime.
pub session_store: &'a ExtensionData,
/// Store scoped to this thread runtime.
pub thread_store: &'a ExtensionData,
/// Store scoped to this turn runtime.
pub turn_store: &'a ExtensionData,
}
4 changes: 4 additions & 0 deletions codex-rs/ext/extension-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ pub use contributors::ThreadResumeInput;
pub use contributors::ThreadStartInput;
pub use contributors::ThreadStopInput;
pub use contributors::ToolContributor;
pub use contributors::TurnAbortInput;
pub use contributors::TurnItemContributionFuture;
pub use contributors::TurnItemContributor;
pub use contributors::TurnLifecycleContributor;
pub use contributors::TurnStartInput;
pub use contributors::TurnStopInput;
pub use registry::ExtensionRegistry;
pub use registry::ExtensionRegistryBuilder;
pub use registry::empty_extension_registry;
Expand Down
15 changes: 15 additions & 0 deletions codex-rs/ext/extension-api/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use crate::ExtensionData;
use crate::ThreadLifecycleContributor;
use crate::ToolContributor;
use crate::TurnItemContributor;
use crate::TurnLifecycleContributor;

/// Mutable registry used while hosts register typed runtime contributions.
pub struct ExtensionRegistryBuilder<C> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
context_contributors: Vec<Arc<dyn ContextContributor>>,
tool_contributors: Vec<Arc<dyn ToolContributor>>,
turn_item_contributors: Vec<Arc<dyn TurnItemContributor>>,
Expand All @@ -21,6 +23,7 @@ impl<C> Default for ExtensionRegistryBuilder<C> {
fn default() -> Self {
Self {
thread_lifecycle_contributors: Vec::new(),
turn_lifecycle_contributors: Vec::new(),
approval_review_contributors: Vec::new(),
context_contributors: Vec::new(),
tool_contributors: Vec::new(),
Expand Down Expand Up @@ -48,6 +51,11 @@ impl<C> ExtensionRegistryBuilder<C> {
self.thread_lifecycle_contributors.push(contributor);
}

/// Registers one turn-lifecycle contributor.
pub fn turn_lifecycle_contributor(&mut self, contributor: Arc<dyn TurnLifecycleContributor>) {
self.turn_lifecycle_contributors.push(contributor);
}

/// Registers one prompt contributor.
pub fn prompt_contributor(&mut self, contributor: Arc<dyn ContextContributor>) {
self.context_contributors.push(contributor);
Expand All @@ -67,6 +75,7 @@ impl<C> ExtensionRegistryBuilder<C> {
pub fn build(self) -> ExtensionRegistry<C> {
ExtensionRegistry {
thread_lifecycle_contributors: self.thread_lifecycle_contributors,
turn_lifecycle_contributors: self.turn_lifecycle_contributors,
approval_review_contributors: self.approval_review_contributors,
context_contributors: self.context_contributors,
tool_contributors: self.tool_contributors,
Expand All @@ -78,6 +87,7 @@ impl<C> ExtensionRegistryBuilder<C> {
/// Immutable typed registry produced after extensions are installed.
pub struct ExtensionRegistry<C> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
context_contributors: Vec<Arc<dyn ContextContributor>>,
tool_contributors: Vec<Arc<dyn ToolContributor>>,
turn_item_contributors: Vec<Arc<dyn TurnItemContributor>>,
Expand All @@ -90,6 +100,11 @@ impl<C> ExtensionRegistry<C> {
&self.thread_lifecycle_contributors
}

/// Returns the registered turn-lifecycle contributors.
pub fn turn_lifecycle_contributors(&self) -> &[Arc<dyn TurnLifecycleContributor>] {
&self.turn_lifecycle_contributors
}

/// Claims the first rendered approval-review prompt accepted by an
/// installed contributor.
pub fn approval_review<'a>(
Expand Down
Loading