In this tutorial, I will walk you through how I built a fully functional ETL pipeline using Apache Airflow to extract stock data from the AlphaVantage API, transform it with pandas, and load it into a PostgreSQL database. The pipeline is modular and scalable, and it makes a great introduction to Airflow DAGs, task scheduling, and orchestration.
You can access the GitHub repository here: https://github.com/dkkinyua/stock-data-pipeline
Project Overview
The pipeline pulls daily, weekly and monthly stock data for IBM from AlphaVantage and performs three key steps:
- Extract: Retrieve data from the API using the requests library.
- Transform: Clean and convert the data into a structured pandas DataFrame.
- Load: Store the cleaned data in a PostgreSQL database using SQLAlchemy's create_engine.
Requirements
The following packages are required to run this project:
- apache-airflow==2.10.5
- python
- pandas
- requests
- sqlalchemy
- psycopg2-binary
- python-dotenv
Project Structure
stock-data-pipeline/
├── dags/
│ ├── daily_pipeline.py
│ ├── weekly_pipeline.py
│ ├── monthly_pipeline.py
├── airflow.cfg
├── airflow.db
├── webserver_pid.py
├── .gitignore
├── .env
├── requirements.txt
└── README.md
Setup Instructions
NOTE for Windows users: Airflow does not support Windows natively. Use WSL (Windows Subsystem for Linux) to run this project.
1. Clone the repository
Open your terminal and run the following commands to clone the repository and move into it:
git clone https://github.com/dkkinyua/stock-data-pipeline.git
cd stock-data-pipeline
2. Set up a virtual environment
Install and activate a virtual environment using venv or any other virtual environment provider, such as virtualenv.
Creating a separate virtual environment for each project is good practice: it prevents packages from different projects mixing and lets each project work with compatible package versions.
python -m venv your_env
source your_env/bin/activate  # Windows users will be running Airflow on WSL, so use this same command to activate your environment.
When your virtual environment is active, its name appears next to your directory path in the terminal prompt, as shown in the snapshot below:
3. Install dependencies
The project includes a requirements.txt file containing all the packages required for this project.
To install these packages, run the following command in your terminal.
pip install -r requirements.txt
4. Configure Apache Airflow
a. Set your Airflow home path and initialize Airflow's SQLite database:
export AIRFLOW_HOME=$(pwd)/airflow
airflow db init
The airflow db init command creates an airflow/ folder in the project's root directory.
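By default, Airflow looks for DAG files in $AIRFLOW_HOME/dags. If you keep the dags/ folder at the repository root (as in the project structure above) while AIRFLOW_HOME points at airflow/, one option, shown here as an assumption rather than a step from the original setup, is to point Airflow at the root-level folder explicitly:

export AIRFLOW__CORE__DAGS_FOLDER=$(pwd)/dags  # optional: only needed if your DAGs live outside $AIRFLOW_HOME/dags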
b. Create an admin user:
airflow users create \
--username admin \
--firstname YourName \
--lastname Admin \
--role Admin \
--email [email protected] \
--password admin
c. Start the webserver and scheduler:
airflow webserver & airflow scheduler
The Airflow UI will be available at http://localhost:8080
Exploring the DAG: daily_pipeline.py
This DAG extracts IBM’s daily stock data using AlphaVantage's API and runs at midnight daily.
DAG setup
from datetime import datetime, timedelta
from airflow.decorators import dag, task

default_args = {
    'owner': 'deecodes',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

@dag(dag_id='daily_data_pipeline_dag', default_args=default_args, start_date=datetime(2025, 5, 13), schedule='@daily')
def extract_data():
    # tasks are defined and wired together here
a. extract_daily_data
def extract_daily_data():
    url = f"https://www.alphavantage.co/query?function=TIME_SERIES_DAILY&symbol=IBM&apikey={API_KEY}"
    response = requests.get(url)
    return response.json() if response.status_code == 200 else None
TIP: Store your API keys and credentials in a .env file and add that file to .gitignore. A .gitignore file tells Git not to track or push files and folders that might contain sensitive information.
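As a quick sketch of that tip, here is one way to load those values with python-dotenv (already listed in requirements.txt). The .env key names ALPHAVANTAGE_API_KEY and DB_URL are placeholders for illustration, not necessarily the names used in the repository:

import os
from dotenv import load_dotenv

load_dotenv()  # reads key=value pairs from the .env file into the environment

API_KEY = os.getenv("ALPHAVANTAGE_API_KEY")  # placeholder key name
DB_URL = os.getenv("DB_URL")                 # placeholder key name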
b. transform_daily_data
def transform_daily_data(data):
    df = pd.DataFrame(...)
    df["Date"] = pd.to_datetime(df["Date"])
    df.set_index("Date", inplace=True)
    ...
    return df
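For readers who want to fill in the elided parts, here is a minimal sketch of one possible transform, assuming the standard AlphaVantage TIME_SERIES_DAILY response layout (a "Time Series (Daily)" object keyed by date). It is an illustration, not the repository's exact code:

import pandas as pd

def transform_daily_data_sketch(data: dict) -> pd.DataFrame:
    # The daily payload maps dates to {"1. open": ..., "2. high": ..., ...} entries.
    series = data["Time Series (Daily)"]
    df = pd.DataFrame.from_dict(series, orient="index")
    df = df.rename(columns=lambda c: c.split(". ", 1)[1].title())  # "1. open" -> "Open"
    df = df.astype(float)
    df.index = pd.to_datetime(df.index)
    df.index.name = "Date"
    return df.sort_index()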
c. load_to_db
def load_to_db(df):
    engine = create_engine(DB_URL)
    df.to_sql(name='daily_stock_data', con=engine, if_exists='append')
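For context, DB_URL is a standard SQLAlchemy connection string; with psycopg2-binary installed it usually takes the form below. The host, credentials, and database name here are placeholders, not the project's actual values:

from sqlalchemy import create_engine

# postgresql+psycopg2://<user>:<password>@<host>:<port>/<database>
DB_URL = "postgresql+psycopg2://user:password@localhost:5432/stocks"  # placeholder values
engine = create_engine(DB_URL)

Note that if_exists='append' keeps adding new rows on every run; 'replace' would drop and recreate the table instead.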
Setting Task Sequence with Airflow's XCom
Each task returns a value when it completes. We use Airflow's XCom feature to share this data between tasks; chaining the calls so that one task receives another's output also defines the task sequence.
data = extract_daily_data()
df = transform_daily_data(data)
load_to_db(df)
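Putting the pieces together, here is a condensed sketch of how the TaskFlow-style DAG might look end to end. The task bodies are elided, and this is an illustration of the pattern rather than the repository's exact code:

from datetime import datetime, timedelta
from airflow.decorators import dag, task

default_args = {
    'owner': 'deecodes',
    'retries': 5,
    'retry_delay': timedelta(minutes=5)
}

@dag(dag_id='daily_data_pipeline_dag', default_args=default_args,
     start_date=datetime(2025, 5, 13), schedule='@daily')
def extract_data():
    @task
    def extract_daily_data():
        ...  # call the AlphaVantage API and return the JSON payload

    @task
    def transform_daily_data(data):
        ...  # clean the payload and return a pandas DataFrame

    @task
    def load_to_db(df):
        ...  # write the DataFrame to PostgreSQL

    # Each return value travels through XCom; chaining the calls defines the order.
    load_to_db(transform_daily_data(extract_daily_data()))

extract_data()

One design note: Airflow's default XCom backend serializes values as JSON, so passing a full DataFrame between tasks typically requires enabling XCom pickling in airflow.cfg or using a custom XCom backend; alternatively, the transform task can return a JSON-serializable structure and the load task can rebuild the DataFrame.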
Trigger the DAG from the Airflow UI:
Check success status:
Inspect task flow in the graph view:
Confirm the data loaded in PostgreSQL:
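If you prefer the command line to a GUI client, a quick check could look like this. The table name daily_stock_data comes from the load step; the connection details are whatever you configured for your own database:

-- Quick sanity checks inside psql
SELECT COUNT(*) FROM daily_stock_data;
SELECT * FROM daily_stock_data LIMIT 5;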
✅ Conclusion
We’ve walked through:
- How to extract and clean stock data from AlphaVantage.
- How to configure and run DAGs with Apache Airflow.
- How to load structured data into a PostgreSQL database.
If you have questions or suggestions, feel free to open a pull request or email me at [email protected].
Also, please like, share, comment, and follow me for more data engineering blogs and projects, and to stay in the know about the data engineering world.