Introduction
Welcome to the exciting world of data engineering! In this comprehensive tutorial, you'll learn how to build your first ETL (Extract, Transform, Load) pipeline using PySpark to fetch weather data from the OpenWeatherMap API and store it in a PostgreSQL database.
What is ETL?
- Extract: Get data from a source (in our case, OpenWeatherMap API)
- Transform: Clean, process, and structure the data
- Load: Store the processed data in a destination (PostgreSQL database)
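To make the pattern concrete before we build the real thing, here is a toy version of the three stages in plain Python. The function names and the sample record are purely illustrative:

def extract():
    # Pretend this record came from an API call
    return [{"city": "Nairobi", "temp_kelvin": 295.15}]

def transform(records):
    # Convert Kelvin to Celsius and keep only the fields we care about
    return [{"city": r["city"], "temp_c": round(r["temp_kelvin"] - 273.15, 1)} for r in records]

def load(rows):
    # Pretend this wrote to a database
    for row in rows:
        print(f"Saving {row}")

load(transform(extract()))

Our real pipeline follows exactly this shape, just with PySpark DataFrames and PostgreSQL in place of lists and print statements.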
By the end of this tutorial, you'll have hands-on experience with:
- Server management and SSH connections
- Python virtual environments
- PySpark for data processing
- API integration
- Database connections
- Project organization best practices
Prerequisites
Before we begin, make sure you have:
- Access to a Linux server (cloud instance or local machine)
- Basic knowledge of command line operations
- A free OpenWeatherMap API account (sign up at openweathermap.org)
- PostgreSQL installed on your server
Step 1: Connecting to Your Server
First, we need to establish a secure connection to our server using SSH (Secure Shell).
ssh user@your_server_ip_address
What's happening here?
- ssh is the command to establish a secure connection
- user is your username on the server
- your_server_ip_address is the IP address of your server
After entering this command, you'll be prompted to enter your password. Once authenticated, you'll see your server's command prompt, indicating you're now connected.
Step 2: Setting Up Your Project Directory
Now that we're connected to the server, let's create a dedicated folder for our weather ETL project.
For example:
mkdir navas_weather_etl
Why create a separate folder?
- Keeps your project organized
- Prevents conflicts with other projects
- Makes it easier to manage dependencies
- Follows professional development practices
Next, navigate into your newly created directory:
cd navas_weather_etl
Step 3: Creating a Python Virtual Environment
Virtual environments are crucial in Python development. Let's create one for our project:
python3 -m venv myvenv
Why Use Virtual Environments?
Virtual environments are isolated Python environments that allow you to:
- Dependency Isolation: Each project can have its own set of packages without conflicts
- Version Control: Different projects can use different versions of the same package
- Clean Development: Prevents system-wide package installations that could break other projects
- Reproducibility: Makes it easier to replicate your environment on other machines
- Professional Standard: Industry best practice for Python development
Think of a virtual environment as a separate "workspace" for each project, ensuring that what you install for one project doesn't interfere with another.
Now, let's activate our virtual environment:
source myvenv/bin/activate
You'll notice your command prompt changes to show (myvenv) at the beginning, indicating the virtual environment is active.
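If you ever want to double-check which interpreter is active, here is a quick optional check (the file name check_venv.py is just an illustration, not part of the project):

# check_venv.py: confirm which Python environment is in use
import sys

# With the venv active, both paths should point inside .../navas_weather_etl/myvenv
print(sys.prefix)
print(sys.executable)

Run it with python check_venv.py while the environment is active.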
Step 4: Creating Project Files
Let's create the essential files for our project using the touch command:
touch weather_etl.py .env requirements.txt
File Breakdown:
- weather_etl.py: Contains our main ETL code
- .env: Stores sensitive information like API keys; never commit it to version control (see the note right after this list)
- requirements.txt: Lists all Python packages our project needs
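Before moving on, open the .env file and add your OpenWeatherMap API key. The ETL script reads it later with os.getenv("API_KEY"), so the file should contain a line like:

API_KEY=your_openweathermap_api_key_here

Replace the placeholder with the actual key from your OpenWeatherMap account.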
Step 5: Setting Up Dependencies
Let's populate our requirements.txt file with the necessary packages:
certifi==2025.4.26
charset-normalizer==3.4.2
idna==3.10
psycopg2-binary==2.9.10
py4j==0.10.9.9
pyspark==4.0.0
requests==2.32.3
urllib3==2.4.0
python-dotenv==1.0.0
Package Explanations:
- pyspark: Apache Spark's Python API for big data processing
- requests: For making HTTP requests to the OpenWeatherMap API
- psycopg2-binary: PostgreSQL adapter for Python
- python-dotenv: Loads environment variables from the .env file
Step 6: Database Setup
Ensure PostgreSQL is running on your server with a database named weather_db that your database user can write to; the database name, user, and password must match the connection options used in the load() step below.
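If the weather_db database doesn't exist yet, here is a minimal sketch of how you could create it with psycopg2 (which is already in our requirements). The file name setup_db.py and the superuser credentials are assumptions; adjust them to your own setup. The target table (public.navas_weather_data) doesn't need to be created by hand, since Spark's JDBC writer can create it on the first append.

# setup_db.py: create the weather_db database used by the pipeline (illustrative sketch)
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="postgres",
                        user="postgres", password="12345")  # use your own credentials
conn.autocommit = True  # CREATE DATABASE cannot run inside a transaction block
with conn.cursor() as cur:
    cur.execute("SELECT 1 FROM pg_database WHERE datname = 'weather_db';")
    if cur.fetchone() is None:
        cur.execute("CREATE DATABASE weather_db;")
        print("Created database weather_db")
    else:
        print("Database weather_db already exists")
conn.close()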
Step 7: The Complete ETL Code
Now, let's create our main ETL script. Edit the weather_etl.py file:
import requests
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType

# Load environment variables from .env file
load_dotenv()

# Get API key from environment variable
API_KEY = os.getenv("API_KEY")
CITIES = ["Nairobi", "Mombasa", "Kisumu"]

def fetch_weather(city):
    """Fetch weather data for a specific city from OpenWeatherMap API"""
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    return requests.get(url).json()

def extract_data():
    """Extract weather data for all cities"""
    return [fetch_weather(city) for city in CITIES]

def transform(spark, data):
    """Transform raw weather data into structured DataFrame"""
    schema = StructType([
        StructField("city", StringType()),
        StructField("temp", DoubleType()),
        StructField("feels_like", DoubleType()),
        StructField("humidity", IntegerType()),
        StructField("pressure", IntegerType()),
        StructField("wind_speed", DoubleType()),
        StructField("weather_main", StringType()),
        StructField("weather_desc", StringType())
    ])
    rows = [(d["name"], d["main"]["temp"], d["main"]["feels_like"],
             d["main"]["humidity"], d["main"]["pressure"], d["wind"]["speed"],
             d["weather"][0]["main"], d["weather"][0]["description"])
            for d in data]
    return spark.createDataFrame(rows, schema)

def load(df):
    """Load DataFrame to PostgreSQL database"""
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/weather_db") \
        .option("dbtable", "public.navas_weather_data") \
        .option("user", "postgres") \
        .option("password", "12345") \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()

def main():
    """Main ETL pipeline execution"""
    # Check if API key is loaded
    if not API_KEY:
        raise ValueError("API_KEY not found in environment variables. Please check your .env file.")

    # Create Spark session
    spark = SparkSession.builder \
        .appName("WeatherETL") \
        .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
        .getOrCreate()

    try:
        # Execute ETL pipeline
        data = extract_data()
        df = transform(spark, data)
        df.show()
        load(df)
        print("ETL pipeline completed successfully!")
    except Exception as e:
        print(f"Error in ETL pipeline: {str(e)}")
    finally:
        # Stop Spark session
        spark.stop()

if __name__ == "__main__":
    main()
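One thing worth noting: fetch_weather() above assumes every API call succeeds. If you want the pipeline to fail loudly on a bad API key, a misspelled city name, or a network hiccup, a hedged variation using only features of the requests library we already installed could look like this:

def fetch_weather(city):
    """Fetch weather data for a city, raising on timeouts or HTTP errors."""
    url = f"https://api.openweathermap.org/data/2.5/weather?q={city}&appid={API_KEY}&units=metric"
    response = requests.get(url, timeout=10)  # don't hang forever if the API is unreachable
    response.raise_for_status()               # turn 401/404/429 responses into clear exceptions
    return response.json()

The try/except block in main() will then catch and report these errors.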
Step 8: Installing Dependencies
Before running our code, we need to install all the required packages:
pip install -r requirements.txt
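If you want to confirm the installation worked, here is a quick optional check from the Python interpreter:

# Both imports succeeding (with versions matching requirements.txt) means the environment is ready
import pyspark
import psycopg2

print("pyspark", pyspark.__version__)
print("psycopg2", psycopg2.__version__)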
Step 9: Running the ETL Pipeline
Now for the exciting part - running our ETL pipeline:
python weather_etl.py
If everything is set up correctly, you should see the transformed DataFrame printed by df.show(), followed by the message "ETL pipeline completed successfully!".
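To double-check that the rows actually landed in PostgreSQL, you can query the table directly. Here is a minimal sketch (not part of the original pipeline), assuming the same connection details used in load():

# verify_load.py: print the rows written by the ETL run (illustrative sketch)
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="weather_db",
                        user="postgres", password="12345")  # use your own credentials
with conn, conn.cursor() as cur:
    cur.execute("SELECT city, temp, weather_main FROM public.navas_weather_data ORDER BY city;")
    for row in cur.fetchall():
        print(row)
conn.close()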
Step 10: Version Control Best Practices
Before pushing your code to GitHub, create a .gitignore
file to exclude sensitive files:
touch .gitignore
Add the following content to .gitignore:
# Environment variables
.env
# Virtual environment
myvenv/
venv/
env/
# Python cache
__pycache__/
*.pyc
*.pyo
# IDE files
.vscode/
.idea/
# OS files
.DS_Store
Thumbs.db
Security Note: Never commit .env files to version control. They contain sensitive information! (A sketch after the next list shows how to move the database password into .env as well.)
Why use .gitignore?
- Prevents sensitive information (like API keys) from being committed
- Keeps repository clean by excluding temporary files
- Prevents virtual environment files from being tracked
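In the same spirit, the database credentials hardcoded in load() could also come from the .env file. A minimal sketch, assuming you add DB_USER and DB_PASSWORD entries to .env (those variable names are my own choice, not part of the original script):

import os
from dotenv import load_dotenv

load_dotenv()

def load(df):
    """Load DataFrame to PostgreSQL, reading credentials from environment variables."""
    df.write \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://localhost:5432/weather_db") \
        .option("dbtable", "public.navas_weather_data") \
        .option("user", os.getenv("DB_USER", "postgres")) \
        .option("password", os.getenv("DB_PASSWORD", "")) \
        .option("driver", "org.postgresql.Driver") \
        .mode("append") \
        .save()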
Conclusion
You've successfully created a complete ETL pipeline that:
- Extracts real-time weather data from an API
- Transforms it with PySpark for analysis
- Loads it into a PostgreSQL database for storage
This project demonstrates fundamental data engineering concepts and provides a solid foundation for more complex data pipelines. Remember to always follow best practices like using virtual environments, keeping secrets secure, and maintaining clean code structure.
Happy data engineering!
This tutorial was created to help beginners start their data engineering journey with practical, hands-on experience using industry-standard tools and practices.