Skip to content

[v3-2-test] Pre-assign external_executor_id at queuing time to prevent duplicate execution on scheduler crash (#65594)#65711

Merged
potiuk merged 1 commit into
v3-2-testfrom
backport-3b188b9-v3-2-test
Apr 26, 2026
Merged

[v3-2-test] Pre-assign external_executor_id at queuing time to prevent duplicate execution on scheduler crash (#65594)#65711
potiuk merged 1 commit into
v3-2-testfrom
backport-3b188b9-v3-2-test

Conversation

@ashb
Copy link
Copy Markdown
Member

@ashb ashb commented Apr 23, 2026

When a scheduler crashes between dispatching a task to Celery and
processing the QUEUED event that persists external_executor_id, the
replacement scheduler cannot adopt the in-flight task. Without the
Celery task ID in the database, try_adopt_task_instances has no
AsyncResult to look up, so the task is reset and re-queued — causing
duplicate execution of an already-running task.

Fix this by generating external_executor_id via a DB-side UUID
function (gen_random_uuid on PostgreSQL, UUID() on MySQL, a
Python uuid4 registered on SQLite) in the same bulk UPDATE that
sets state=QUEUED. The ID is committed atomically with the state
transition — no second write, no race window. RETURNING is used on
PostgreSQL and SQLite to read back the generated UUIDs without a
second round-trip; MySQL falls back to a SELECT.

The CeleryExecutor passes the pre-assigned ID to apply_async() as
the Celery task_id, making it deterministic from DB state. Other
executors ignore it and overwrite with their own ID (e.g. ECS task
ARN) during event processing.

This also fixes the separate race in #55004 where external_executor_id
is lost when the task instance row is locked during event processing.
process_executor_events uses skip_locked=True, and
get_event_buffer() flushes the executor's in-memory buffer into a
local variable. If a TI is locked and skipped, its QUEUED event is
consumed from the buffer but never processed — the event and its
task ID are silently dropped. With the ID now written to the database
before the task is even sent to Celery, adoption no longer depends on
the event being processed.

Closes: #55004
Closes: #58570
Closes: #64971

(cherry picked from commit 3b188b9)

@boring-cyborg boring-cyborg Bot added area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler provider:celery labels Apr 23, 2026
@ashb ashb force-pushed the backport-3b188b9-v3-2-test branch from fcba678 to ea6a41d Compare April 25, 2026 20:16
…execution on scheduler crash (#65594)

When a scheduler crashes between dispatching a task to Celery and
processing the QUEUED event that persists `external_executor_id`, the
replacement scheduler cannot adopt the in-flight task. Without the
Celery task ID in the database, `try_adopt_task_instances` has no
`AsyncResult` to look up, so the task is reset and re-queued — causing
duplicate execution of an already-running task.

Fix this by generating `external_executor_id` via a DB-side UUID
function (`gen_random_uuid` on PostgreSQL, `UUID()` on MySQL, a
Python `uuid4` registered on SQLite) in the same bulk UPDATE that
sets state=QUEUED. The ID is committed atomically with the state
transition — no second write, no race window. RETURNING is used on
PostgreSQL and SQLite to read back the generated UUIDs without a
second round-trip; MySQL falls back to a SELECT.

The CeleryExecutor passes the pre-assigned ID to `apply_async()` as
the Celery `task_id`, making it deterministic from DB state. Other
executors ignore it and overwrite with their own ID (e.g. ECS task
ARN) during event processing.

This also fixes the separate race in #55004 where `external_executor_id`
is lost when the task instance row is locked during event processing.
`process_executor_events` uses `skip_locked=True`, and
`get_event_buffer()` flushes the executor's in-memory buffer into a
local variable. If a TI is locked and skipped, its QUEUED event is
consumed from the buffer but never processed — the event and its
task ID are silently dropped. With the ID now written to the database
before the task is even sent to Celery, adoption no longer depends on
the event being processed.

Closes: #55004
Closes: #58570
Closes: #64971

(cherry picked from commit 3b188b9)
@potiuk potiuk force-pushed the backport-3b188b9-v3-2-test branch from ea6a41d to c78850b Compare April 26, 2026 16:58
@potiuk potiuk merged commit f82b765 into v3-2-test Apr 26, 2026
77 checks passed
@potiuk potiuk deleted the backport-3b188b9-v3-2-test branch April 26, 2026 19:45
vatsrahul1001 pushed a commit that referenced this pull request Apr 27, 2026
…execution on scheduler crash (#65594) (#65711)

When a scheduler crashes between dispatching a task to Celery and
processing the QUEUED event that persists `external_executor_id`, the
replacement scheduler cannot adopt the in-flight task. Without the
Celery task ID in the database, `try_adopt_task_instances` has no
`AsyncResult` to look up, so the task is reset and re-queued — causing
duplicate execution of an already-running task.

Fix this by generating `external_executor_id` via a DB-side UUID
function (`gen_random_uuid` on PostgreSQL, `UUID()` on MySQL, a
Python `uuid4` registered on SQLite) in the same bulk UPDATE that
sets state=QUEUED. The ID is committed atomically with the state
transition — no second write, no race window. RETURNING is used on
PostgreSQL and SQLite to read back the generated UUIDs without a
second round-trip; MySQL falls back to a SELECT.

The CeleryExecutor passes the pre-assigned ID to `apply_async()` as
the Celery `task_id`, making it deterministic from DB state. Other
executors ignore it and overwrite with their own ID (e.g. ECS task
ARN) during event processing.

This also fixes the separate race in #55004 where `external_executor_id`
is lost when the task instance row is locked during event processing.
`process_executor_events` uses `skip_locked=True`, and
`get_event_buffer()` flushes the executor's in-memory buffer into a
local variable. If a TI is locked and skipped, its QUEUED event is
consumed from the buffer but never processed — the event and its
task ID are silently dropped. With the ID now written to the database
before the task is even sent to Celery, adoption no longer depends on
the event being processed.

Closes: #55004
Closes: #58570
Closes: #64971

(cherry picked from commit 3b188b9)
@vatsrahul1001 vatsrahul1001 added this to the Airflow 3.2.2 milestone May 18, 2026
@vatsrahul1001 vatsrahul1001 added the type:bug-fix Changelog: Bug Fixes label May 18, 2026
vatsrahul1001 pushed a commit that referenced this pull request May 20, 2026
…execution on scheduler crash (#65594) (#65711)

When a scheduler crashes between dispatching a task to Celery and
processing the QUEUED event that persists `external_executor_id`, the
replacement scheduler cannot adopt the in-flight task. Without the
Celery task ID in the database, `try_adopt_task_instances` has no
`AsyncResult` to look up, so the task is reset and re-queued — causing
duplicate execution of an already-running task.

Fix this by generating `external_executor_id` via a DB-side UUID
function (`gen_random_uuid` on PostgreSQL, `UUID()` on MySQL, a
Python `uuid4` registered on SQLite) in the same bulk UPDATE that
sets state=QUEUED. The ID is committed atomically with the state
transition — no second write, no race window. RETURNING is used on
PostgreSQL and SQLite to read back the generated UUIDs without a
second round-trip; MySQL falls back to a SELECT.

The CeleryExecutor passes the pre-assigned ID to `apply_async()` as
the Celery `task_id`, making it deterministic from DB state. Other
executors ignore it and overwrite with their own ID (e.g. ECS task
ARN) during event processing.

This also fixes the separate race in #55004 where `external_executor_id`
is lost when the task instance row is locked during event processing.
`process_executor_events` uses `skip_locked=True`, and
`get_event_buffer()` flushes the executor's in-memory buffer into a
local variable. If a TI is locked and skipped, its QUEUED event is
consumed from the buffer but never processed — the event and its
task ID are silently dropped. With the ID now written to the database
before the task is even sent to Celery, adoption no longer depends on
the event being processed.

Closes: #55004
Closes: #58570
Closes: #64971

(cherry picked from commit 3b188b9)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:Executors-core LocalExecutor & SequentialExecutor area:providers area:Scheduler including HA (high availability) scheduler provider:celery type:bug-fix Changelog: Bug Fixes

3 participants