Celery: Task executed twice on scheduler crash due to adopt_or_reset_orphaned_tasks #58570

@arkadiuszbach

Description

Apache Airflow version

3.1.3

If "Other Airflow 2/3 version" selected, which one?

3.1.1

What happened?

If the scheduler crashes after it has successfully placed the task on the Celery queue, but before it processes the QUEUED event:

if state in (TaskInstanceState.QUEUED, TaskInstanceState.RUNNING):

then external_executor_id is never set. The next time adopt_or_reset_orphaned_tasks runs, it resets the task (ti.state = None), and the scheduler queues it again. If the task was already running, it is executed twice.
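The sequence above can be modelled in a few lines. This is a minimal, self-contained sketch and not Airflow code: TI, State, adopt_or_reset and worker_start are hypothetical stand-ins, and the adoption rule (a task instance without external_executor_id cannot be adopted and is reset) is a simplification of the behaviour described in this issue.

```python
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum


class State(str, Enum):
    # Hypothetical subset of task instance states used in this sketch.
    NONE = "none"
    QUEUED = "queued"
    RUNNING = "running"


@dataclass
class TI:
    # Hypothetical stand-in for a task instance row.
    ti_id: str
    state: State = State.NONE
    external_executor_id: str | None = None
    starts: int = 0  # how many times a worker started this task instance


def adopt_or_reset(tis: list[TI]) -> list[TI]:
    """Simplified adoption: a queued/running TI without an external_executor_id
    cannot be adopted, so it is reset and will be scheduled again."""
    reset = []
    for ti in tis:
        if ti.state in (State.QUEUED, State.RUNNING) and ti.external_executor_id is None:
            ti.state = State.NONE
            reset.append(ti)
    return reset


def worker_start(ti: TI) -> None:
    """Simplified /run check: a start is accepted while the TI is queued."""
    if ti.state != State.QUEUED:
        raise RuntimeError(f"cannot start {ti.ti_id} from state {ti.state.value}")
    ti.state = State.RUNNING
    ti.starts += 1


ti = TI("019aa78f")
ti.state = State.QUEUED  # scheduler puts the task on the Celery queue
worker_start(ti)         # worker starts it: first execution
# scheduler crashes here, before the QUEUED event sets external_executor_id
adopt_or_reset([ti])     # new scheduler cannot adopt the TI, so it resets it
ti.state = State.QUEUED  # scheduler queues the task again
worker_start(ti)         # second execution while the first one is still running
print(ti.starts)  # 2
```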

The API /{task_instance_id}/run endpoint does not reject the second start, because the reset done during adoption has put the task instance back into the queued state:

In Airflow 2.10.2, from what I can see, external_executor_id was set either by the scheduler or by the starting task via check_and_change_state_before_execution. Should ti_run also set external_executor_id, to prevent the task from being reset in this scenario?
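A minimal sketch of the idea, assuming the worker can pass its executor-side id (for Celery, the task id) when it starts the task. ti_run, needs_reset and the executor_id parameter are hypothetical and do not reflect the Execution API's actual signature; the adoption rule is the same simplification as above.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class TI:
    # Hypothetical stand-in for a task instance row.
    state: str = "queued"
    external_executor_id: str | None = None


def ti_run(ti: TI, executor_id: str | None) -> None:
    """Hypothetical /run handler: besides marking the TI running, record the
    executor-side id if the scheduler never got to set it."""
    if ti.state != "queued":
        raise RuntimeError(f"cannot start from state {ti.state!r}")
    ti.state = "running"
    if ti.external_executor_id is None and executor_id is not None:
        ti.external_executor_id = executor_id


def needs_reset(ti: TI) -> bool:
    # Simplified adoption rule: without an external_executor_id the executor
    # cannot look the task up, so the TI would be reset and re-queued.
    return ti.state in ("queued", "running") and ti.external_executor_id is None


ti = TI()
ti_run(ti, executor_id="celery-task-id-123")  # hypothetical Celery task id
# scheduler crashes before processing the QUEUED event ...
print(needs_reset(ti))  # False: a new scheduler can adopt instead of re-queueing
```

The design choice is that whichever side observes the executor id first (scheduler via the QUEUED event, or the worker at start) records it, so a crash between the two no longer leaves the task instance unadoptable.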

if not self.task_instance.check_and_change_state_before_execution(

API log:

# Initial Start
2025-11-21T17:56:56.297808Z [debug    [] Starting task instance run     [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
2025-11-21T17:56:56.303505Z [debug    [] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:56:56.303923Z [info     [] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:56:56.306420Z [info     [] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d

# Heartbeat - task state is running
2025-11-21T17:58:56.984438Z [debug    [] Processing heartbeat           [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:595 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:56.990311Z [debug    [] Retrieved current task state   [airflow.api_fastapi.execution_api.routes.task_instances] current_hostname=10.0.152.49 current_pid=175 loc=task_instances.py:604 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:56.992791Z [debug    [] Heartbeat updated              [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:648 state=running ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
INFO:     10.0.152.49:47826 - "PUT /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/heartbeat HTTP/1.1" 204 No Content

# another start after adoption, previous_state=queued
2025-11-21T17:58:59.602695Z [debug    [] Starting task instance run     [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=195 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow
2025-11-21T17:58:59.609789Z [debug    [] Retrieved task instance details [airflow.api_fastapi.execution_api.routes.task_instances] dag_id=test loc=task_instances.py:148 state=queued task_id=test_12 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:59.610316Z [info     [] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
2025-11-21T17:58:59.613515Z [info     [] Task instance state updated    [airflow.api_fastapi.execution_api.routes.task_instances] loc=task_instances.py:212 rows_affected=1 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d
INFO:     10.0.152.49:47834 - "PATCH /execution/task-instances/019aa78f-1e64-7faa-a5e2-b662a0b60f2d/run HTTP/1.1" 200 OK
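To spot this pattern in API logs, one can count "Task started" events per ti_id. A minimal sketch, assuming the log line format shown in the excerpt above; count_starts and duplicated are hypothetical helpers, and any id they report is only a candidate double execution to investigate.

```python
from __future__ import annotations

import re
from collections import Counter

# Matches execution-API "Task started" lines and captures the ti_id field value.
TASK_STARTED = re.compile(r"\bTask started\b.*\bti_id=(\S+)")


def count_starts(lines: list[str]) -> Counter:
    """Count 'Task started' events per task instance id."""
    counts: Counter = Counter()
    for line in lines:
        match = TASK_STARTED.search(line)
        if match:
            counts[match.group(1)] += 1
    return counts


def duplicated(lines: list[str]) -> dict[str, int]:
    """Return task instance ids that were started more than once."""
    return {ti_id: n for ti_id, n in count_starts(lines).items() if n > 1}


sample = [
    "2025-11-21T17:56:56.297808Z [debug    [] Starting task instance run     [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:115 pid=175 ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d unixname=airflow",
    "2025-11-21T17:56:56.303923Z [info     [] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d",
    "2025-11-21T17:58:59.610316Z [info     [] Task started                   [airflow.api_fastapi.execution_api.routes.task_instances] hostname=10.0.152.49 loc=task_instances.py:199 previous_state=queued ti_id=019aa78f-1e64-7faa-a5e2-b662a0b60f2d",
]
print(duplicated(sample))  # {'019aa78f-1e64-7faa-a5e2-b662a0b60f2d': 2}
```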

What do you think should happen instead?

No response

How to reproduce

I hit this while randomly killing database connectivity, and was able to reproduce it by modifying scheduler_job_runner.py:

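  1. Added the following lines (this requires import time if the module does not already import it):
    time.sleep(30)  # make sure the task has reached the running state
    raise Exception()  # simulate a scheduler crash before external_executor_id is set

    immediately before:
    ti.external_executor_id = info
    cls.logger().info("Setting external_executor_id for %s to %s", ti, info)
    continue
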
    
  2. Started a dummy task that sleeps for 5 minutes.
  3. Restarted the scheduler to trigger adoption.

Operating System

Kubernetes

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Labels

area:Scheduler, area:core, kind:bug
