I have a set of Python scripts for automating different tasks on a Windows environment, and most of these tasks make use of the `pywin32` library. Therefore, I need a Python executable from a Windows venv in order to run these scripts.
I thought about using Apache Airflow to schedule the execution of these scripts, so I installed it on my Windows machine using Docker Desktop and the official production images. I have been trying to make `ExternalPythonOperator` use a test Windows venv that I set up and mounted into the Airflow containers, by pointing `PATH_TO_PYTHON_BINARY` at the Python executable inside this venv. To do so, I created a volume that holds all the venvs needed to run these scripts, so that they can be accessed from the Airflow containers: `${AIRFLOW_PROJ_DIR:-.}/venvs:/opt/airflow/venvs`.
For this example, I am trying to use `/opt/airflow/venvs/test_1`, so I set `PATH_TO_PYTHON_BINARY = '/opt/airflow/venvs/test_1/scripts/python.exe'`.
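Since the official Airflow images run Linux, one quick check is which kind of binary the mounted path actually points to. Windows executables (PE format, such as `python.exe` from a Windows venv) start with the bytes `MZ`, while native Linux executables (ELF) start with `\x7fELF`. The helper below is a hypothetical diagnostic sketch (`executable_format` is not part of Airflow), assuming it is run from inside one of the Airflow containers where the `venvs` volume is mounted:

```python
from pathlib import Path

# Magic bytes found at the very start of executable files.
PE_MAGIC = b"MZ"        # Windows Portable Executable, e.g. python.exe from a Windows venv
ELF_MAGIC = b"\x7fELF"  # ELF binary, the native executable format on Linux


def executable_format(path: str) -> str:
    """Hypothetical diagnostic helper: classify a binary by its header bytes."""
    with open(path, "rb") as f:
        header = f.read(4)
    if header.startswith(ELF_MAGIC):
        return "ELF (Linux)"
    if header.startswith(PE_MAGIC):
        return "PE (Windows)"
    return "unknown"


if __name__ == "__main__":
    # Path taken from the DAG; run this from inside an Airflow container.
    target = Path("/opt/airflow/venvs/test_1/scripts/python.exe")
    if target.exists():
        print(f"{target} -> {executable_format(str(target))}")
    else:
        print(f"{target} not found (is the venvs volume mounted?)")
```

If the mounted `python.exe` reports `PE (Windows)`, it is a Windows binary being handed to a Linux process, which would be consistent with the failure in the traceback below.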
This is the test DAG I am trying to run:
```python
from datetime import datetime, timedelta

from airflow.models.dag import DAG
from airflow.operators.python import ExternalPythonOperator, PythonOperator


# Test callable. ExternalPythonOperator only sends the function's source code to
# the external interpreter, so every import it needs must live inside the function.
def test_function():
    import socket
    from datetime import datetime
    from platform import platform

    print('This is a test callable.')
    print(f'Today is {datetime.today().date()}.')
    # strftime avoids slicing off microseconds, which str(time) omits when they are 0
    print(f'Execution started at {datetime.today().strftime("%H:%M:%S")}.')
    print(f'The host is {socket.gethostname()}.')
    print(f'Running on {platform()}')
    print('End of the script.')


with DAG(
    "external-python",
    default_args={
        "depends_on_past": False,
        "email": ["[email protected]"],
        "email_on_failure": False,
        "email_on_retry": False,
        "retries": 0,
        "retry_delay": timedelta(minutes=5),
    },
    description="Test DAG",
    schedule=timedelta(days=1),
    start_date=datetime(2023, 1, 1),
    catchup=False,
    tags=["python-script"],
) as dag:
    # First task: run test_function with PythonOperator to check that the code works
    t1 = PythonOperator(
        task_id="python-operator",
        python_callable=test_function,
        retries=0,
    )

    # Second task: run test_function with ExternalPythonOperator
    PATH_TO_PYTHON_BINARY = '/opt/airflow/venvs/test_1/scripts/python.exe'
    t2 = ExternalPythonOperator(
        task_id="external-python-operator",
        depends_on_past=False,
        python_callable=test_function,
        python=PATH_TO_PYTHON_BINARY,
        retries=0,
    )

    t1.set_downstream(t2)
```
Running this DAG results in a successful run of the first task, which prints the following:
```
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - This is a test script.
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - Today is 2023-12-26.
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - Execution started at 13:22:50.
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - The host is f991b6d7f20a.
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - Running on Linux-5.15.133.1-microsoft-standard-WSL2-x86_64-with-glibc2.34
[2023-12-26, 14:22:50 CET] {logging_mixin.py:188} INFO - End of the script
```
However, the second task fails with the following error:
```
[2023-12-26, 14:22:51 CET] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 855, in _get_python_version_from_environment
    result = subprocess.check_output([self.python, "--version"], text=True)
  File "/usr/local/lib/python3.8/subprocess.py", line 415, in check_output
    return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
  File "/usr/local/lib/python3.8/subprocess.py", line 516, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['/opt/airflow/venvs/test_1/scripts/python.exe', '--version']' returned non-zero exit status 1.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 400, in execute
    return super().execute(context=serializable_context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 839, in execute_callable
    python_version_as_list_of_strings = self._get_python_version_from_environment()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 858, in _get_python_version_from_environment
    raise ValueError(f"Error while executing {self.python}: {e}")
ValueError: Error while executing /opt/***/venvs/test_1/scripts/python.exe: Command '['/opt/***/venvs/test_1/scripts/python.exe', '--version']' returned non-zero exit status 1.
```
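The traceback only reports the exit status: the `CalledProcessError` message does not include anything the interpreter wrote to stderr. To see what the mounted interpreter actually prints when it fails, the same `--version` call that `_get_python_version_from_environment` makes can be reproduced by hand from a Python shell inside one of the Airflow containers. This is a hypothetical diagnostic sketch (`probe_python` is not an Airflow function), assuming the container's Python 3.8 shown in the traceback:

```python
import subprocess
import sys


def probe_python(python: str) -> str:
    """Hypothetical helper: run `<python> --version` like Airflow's version check,
    but report the failure details instead of raising."""
    try:
        result = subprocess.run(
            [python, "--version"],
            capture_output=True,  # also capture stderr, which the traceback does not show
            text=True,
            check=True,
        )
    except FileNotFoundError:
        return f"not found: {python}"
    except OSError as e:  # e.g. "Exec format error" when the binary cannot run here
        return f"cannot execute {python}: {e}"
    except subprocess.CalledProcessError as e:
        return f"exit status {e.returncode}, stderr: {e.stderr.strip()!r}"
    return result.stdout.strip()  # Python 3 prints e.g. "Python 3.8.x" to stdout


if __name__ == "__main__":
    # Compare the container's own interpreter with the mounted Windows one.
    print(probe_python(sys.executable))
    print(probe_python("/opt/airflow/venvs/test_1/scripts/python.exe"))
```

The first call should succeed with the container's interpreter; the second shows whatever error the Windows `python.exe` produces in that environment.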
Long story short: can I use a Windows venv to execute a Python callable in Apache Airflow, or am I trying to do the impossible? I am pretty new to Apache Airflow and I could not find any information about Windows venvs in the official documentation. I did not find any similar question either, so any help is appreciated!