Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions codex-rs/core/src/codex_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,20 @@ impl CodexThread {
self.codex.session_loop_termination.clone().await;
}

pub(crate) fn emit_thread_resume_lifecycle(&self) {
pub(crate) async fn emit_thread_resume_lifecycle(&self) {
for contributor in self
.codex
.session
.services
.extensions
.thread_lifecycle_contributors()
{
contributor.on_thread_resume(codex_extension_api::ThreadResumeInput {
session_store: &self.codex.session.services.session_extension_data,
thread_store: &self.codex.session.services.thread_extension_data,
});
contributor
.on_thread_resume(codex_extension_api::ThreadResumeInput {
session_store: &self.codex.session.services.session_extension_data,
thread_store: &self.codex.session.services.thread_extension_data,
})
.await;
}
}

Expand Down
16 changes: 9 additions & 7 deletions codex-rs/core/src/session/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -638,12 +638,14 @@ async fn shutdown_session_runtime(sess: &Arc<Session>) {
sess.guardian_review_session.shutdown().await;
}

fn emit_thread_stop_lifecycle(sess: &Session) {
async fn emit_thread_stop_lifecycle(sess: &Session) {
for contributor in sess.services.extensions.thread_lifecycle_contributors() {
contributor.on_thread_stop(codex_extension_api::ThreadStopInput {
session_store: &sess.services.session_extension_data,
thread_store: &sess.services.thread_extension_data,
});
contributor
.on_thread_stop(codex_extension_api::ThreadStopInput {
session_store: &sess.services.session_extension_data,
thread_store: &sess.services.thread_extension_data,
})
.await;
}
}

Expand All @@ -662,7 +664,7 @@ pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
&[],
);

emit_thread_stop_lifecycle(sess.as_ref());
emit_thread_stop_lifecycle(sess.as_ref()).await;

// Gracefully flush and shutdown thread persistence on session end so tests
// that inspect durable state do not race with the background writer.
Expand Down Expand Up @@ -917,7 +919,7 @@ pub(super) async fn submission_loop(
// explicit shutdown op, still run session teardown.
if !shutdown_received {
shutdown_session_runtime(&sess).await;
emit_thread_stop_lifecycle(sess.as_ref());
emit_thread_stop_lifecycle(sess.as_ref()).await;
}
debug!("Agent loop exited");
}
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ impl Session {
config: config.as_ref(),
session_store: &session_extension_data,
thread_store: &thread_extension_data,
});
}).await;
}

let services = SessionServices {
Expand Down
9 changes: 6 additions & 3 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5683,8 +5683,9 @@ async fn submission_loop_channel_close_emits_thread_stop_lifecycle() {
expected_thread_id: ThreadId,
}

#[async_trait::async_trait]
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for ThreadStopRecorder {
fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
async fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
Expand Down Expand Up @@ -5728,8 +5729,9 @@ async fn submission_loop_channel_close_aborts_active_turn_before_thread_stop_lif
expected_turn_id: String,
}

#[async_trait::async_trait]
impl codex_extension_api::ThreadLifecycleContributor<crate::config::Config> for LifecycleRecorder {
fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
async fn on_thread_stop(&self, input: codex_extension_api::ThreadStopInput<'_>) {
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
Expand All @@ -5741,8 +5743,9 @@ async fn submission_loop_channel_close_aborts_active_turn_before_thread_stop_lif
}
}

#[async_trait::async_trait]
impl codex_extension_api::TurnLifecycleContributor for LifecycleRecorder {
fn on_turn_abort(&self, input: codex_extension_api::TurnAbortInput<'_>) {
async fn on_turn_abort(&self, input: codex_extension_api::TurnAbortInput<'_>) {
assert_eq!(
self.expected_thread_id.to_string(),
input.thread_store.level_id()
Expand Down
44 changes: 25 additions & 19 deletions codex-rs/core/src/tasks/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,38 +4,44 @@ use codex_protocol::protocol::TurnAbortReason;
use crate::session::session::Session;

impl Session {
pub(super) fn emit_turn_start_lifecycle(&self, turn_store: &ExtensionData) {
pub(super) async fn emit_turn_start_lifecycle(&self, turn_store: &ExtensionData) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_start(codex_extension_api::TurnStartInput {
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
contributor
.on_turn_start(codex_extension_api::TurnStartInput {
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
})
.await;
}
}

pub(super) fn emit_turn_stop_lifecycle(&self, turn_store: &ExtensionData) {
pub(super) async fn emit_turn_stop_lifecycle(&self, turn_store: &ExtensionData) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_stop(codex_extension_api::TurnStopInput {
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
contributor
.on_turn_stop(codex_extension_api::TurnStopInput {
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
})
.await;
}
}

pub(super) fn emit_turn_abort_lifecycle(
pub(super) async fn emit_turn_abort_lifecycle(
&self,
reason: TurnAbortReason,
turn_store: &ExtensionData,
) {
for contributor in self.services.extensions.turn_lifecycle_contributors() {
contributor.on_turn_abort(codex_extension_api::TurnAbortInput {
reason: reason.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
});
contributor
.on_turn_abort(codex_extension_api::TurnAbortInput {
reason: reason.clone(),
session_store: &self.services.session_extension_data,
thread_store: &self.services.thread_extension_data,
turn_store,
})
.await;
}
}
}
12 changes: 8 additions & 4 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,8 @@ impl Session {
turn_state.push_pending_input(item);
}
}
self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref());
self.emit_turn_start_lifecycle(turn_context.extension_data.as_ref())
.await;
Comment thread
jif-oai marked this conversation as resolved.

let turn_extension_data = Arc::clone(&turn_context.extension_data);
let mut active = self.active_turn.lock().await;
Expand Down Expand Up @@ -505,7 +506,8 @@ impl Session {
}

if let Some(turn_context) = turn_context.as_deref() {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref());
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref())
.await;
}
if (aborted_turn || reason == TurnAbortReason::Interrupted)
&& let Err(err) = self
Expand Down Expand Up @@ -553,7 +555,8 @@ impl Session {
self.handle_task_abort(task, reason.clone()).await;
}
if let Some(turn_context) = turn_context.as_deref() {
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref());
self.emit_turn_abort_lifecycle(reason.clone(), turn_context.extension_data.as_ref())
.await;
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TaskAborted {
Expand Down Expand Up @@ -756,7 +759,8 @@ impl Session {
.time_to_first_token_ms()
.await;
if should_clear_active_turn {
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref());
self.emit_turn_stop_lifecycle(turn_context.extension_data.as_ref())
.await;
}
if let Err(err) = self
.goal_runtime_apply(GoalRuntimeEvent::TurnFinished {
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/thread_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ impl ThreadManagerState {
.finalize_thread_spawn(codex, thread_id, tracked_session_source)
.await?;
if is_resumed_thread {
new_thread.thread.emit_thread_resume_lifecycle();
new_thread.thread.emit_thread_resume_lifecycle().await;
if let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await {
warn!("failed to apply goal resume runtime effects: {err}");
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/ext/extension-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ doctest = false
workspace = true

[dependencies]
async-trait = { workspace = true }
codex-protocol = { workspace = true }
codex-tools = { workspace = true }
16 changes: 9 additions & 7 deletions codex-rs/ext/extension-api/src/contributors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,33 +36,35 @@ pub trait ContextContributor: Send + Sync {
/// Implementations should use these callbacks to seed, rehydrate, or flush
/// extension-private thread state. Heavy dependencies belong on the extension
/// value created by the host, not in these inputs.
pub trait ThreadLifecycleContributor<C>: Send + Sync {
#[async_trait::async_trait]
pub trait ThreadLifecycleContributor<C: Sync>: Send + Sync {
/// Called after thread-scoped extension stores are created, before later
/// contributors can read from them.
fn on_thread_start(&self, _input: ThreadStartInput<'_, C>) {}
async fn on_thread_start(&self, _input: ThreadStartInput<'_, C>) {}
Comment thread
jif-oai marked this conversation as resolved.

/// Called after the host constructs a runtime from persisted history.
fn on_thread_resume(&self, _input: ThreadResumeInput<'_>) {}
async fn on_thread_resume(&self, _input: ThreadResumeInput<'_>) {}

/// Called before the host drops the thread runtime and thread-scoped store.
fn on_thread_stop(&self, _input: ThreadStopInput<'_>) {}
async fn on_thread_stop(&self, _input: ThreadStopInput<'_>) {}
}

/// Contributor for host-owned turn lifecycle gates.
///
/// Implementations should use these callbacks to seed, observe, or clear
/// extension-private turn state. The host exposes stable identifiers and
/// extension stores instead of core runtime objects.
#[async_trait::async_trait]
pub trait TurnLifecycleContributor: Send + Sync {
/// Called after turn-scoped extension stores are created, before the task
/// for the turn starts running.
fn on_turn_start(&self, _input: TurnStartInput<'_>) {}
async fn on_turn_start(&self, _input: TurnStartInput<'_>) {}

/// Called before the host drops the completed turn runtime and turn store.
fn on_turn_stop(&self, _input: TurnStopInput<'_>) {}
async fn on_turn_stop(&self, _input: TurnStopInput<'_>) {}

/// Called after the host aborts a running turn.
fn on_turn_abort(&self, _input: TurnAbortInput<'_>) {}
async fn on_turn_abort(&self, _input: TurnAbortInput<'_>) {}
}

/// Contributor for host-owned configuration changes.
Expand Down
12 changes: 6 additions & 6 deletions codex-rs/ext/extension-api/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::TurnItemContributor;
use crate::TurnLifecycleContributor;

/// Mutable registry used while hosts register typed runtime contributions.
pub struct ExtensionRegistryBuilder<C> {
pub struct ExtensionRegistryBuilder<C: Sync> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
config_contributors: Vec<Arc<dyn ConfigContributor<C>>>,
Expand All @@ -23,7 +23,7 @@ pub struct ExtensionRegistryBuilder<C> {
approval_review_contributors: Vec<Arc<dyn ApprovalReviewContributor>>,
}

impl<C> Default for ExtensionRegistryBuilder<C> {
impl<C: Sync> Default for ExtensionRegistryBuilder<C> {
fn default() -> Self {
Self {
thread_lifecycle_contributors: Vec::new(),
Expand All @@ -38,7 +38,7 @@ impl<C> Default for ExtensionRegistryBuilder<C> {
}
}

impl<C> ExtensionRegistryBuilder<C> {
impl<C: Sync> ExtensionRegistryBuilder<C> {
/// Creates an empty registry builder.
pub fn new() -> Self {
Self::default()
Expand Down Expand Up @@ -103,7 +103,7 @@ impl<C> ExtensionRegistryBuilder<C> {
}

/// Immutable typed registry produced after extensions are installed.
pub struct ExtensionRegistry<C> {
pub struct ExtensionRegistry<C: Sync> {
thread_lifecycle_contributors: Vec<Arc<dyn ThreadLifecycleContributor<C>>>,
turn_lifecycle_contributors: Vec<Arc<dyn TurnLifecycleContributor>>,
config_contributors: Vec<Arc<dyn ConfigContributor<C>>>,
Expand All @@ -114,7 +114,7 @@ pub struct ExtensionRegistry<C> {
approval_review_contributors: Vec<Arc<dyn ApprovalReviewContributor>>,
}

impl<C> ExtensionRegistry<C> {
impl<C: Sync> ExtensionRegistry<C> {
/// Returns the registered thread-lifecycle contributors.
pub fn thread_lifecycle_contributors(&self) -> &[Arc<dyn ThreadLifecycleContributor<C>>] {
&self.thread_lifecycle_contributors
Expand Down Expand Up @@ -165,6 +165,6 @@ impl<C> ExtensionRegistry<C> {
}

/// Creates an empty shared registry for hosts that do not register contributions.
pub fn empty_extension_registry<C>() -> Arc<ExtensionRegistry<C>> {
pub fn empty_extension_registry<C: Sync>() -> Arc<ExtensionRegistry<C>> {
Arc::new(ExtensionRegistryBuilder::new().build())
}
10 changes: 6 additions & 4 deletions codex-rs/ext/goal/src/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,12 @@ fn missing_backend_message() -> String {
"goal tools are not connected to host goal persistence yet".to_string()
}

#[async_trait]
impl<C> ThreadLifecycleContributor<C> for GoalExtension<C>
where
C: Send + Sync + 'static,
{
fn on_thread_start(&self, input: ThreadStartInput<'_, C>) {
async fn on_thread_start(&self, input: ThreadStartInput<'_, C>) {
input
.thread_store
.insert(GoalExtensionConfig::from_enabled((self.goals_enabled)(
Expand Down Expand Up @@ -135,11 +136,12 @@ where
}
}

#[async_trait]
impl<C> TurnLifecycleContributor for GoalExtension<C>
where
C: Send + Sync + 'static,
{
fn on_turn_start(&self, input: TurnStartInput<'_>) {
async fn on_turn_start(&self, input: TurnStartInput<'_>) {
if !goal_enabled(input.thread_store) {
return;
}
Expand All @@ -150,7 +152,7 @@ where
accounting_state(input.thread_store).start_turn(input.turn_store.level_id());
}

fn on_turn_stop(&self, input: TurnStopInput<'_>) {
async fn on_turn_stop(&self, input: TurnStopInput<'_>) {
if !goal_enabled(input.thread_store) {
return;
}
Expand All @@ -164,7 +166,7 @@ where
accounting_state(input.thread_store).stop_turn(input.turn_store.level_id());
}

fn on_turn_abort(&self, input: TurnAbortInput<'_>) {
async fn on_turn_abort(&self, input: TurnAbortInput<'_>) {
if !goal_enabled(input.thread_store) {
return;
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/ext/guardian/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ doctest = false
workspace = true

[dependencies]
async-trait = { workspace = true }
codex-core = { workspace = true }
codex-extension-api = { workspace = true }
codex-protocol = { workspace = true }
Loading
Loading