Ambuso Dismas

Building an Automated Crypto Price ETL Pipeline with Airflow and PostgreSQL

Introduction

The world of cryptocurrency moves fast, and so should your data pipelines. In this blog post, I'll walk you through how I built an automated ETL pipeline using Apache Airflow that fetches hourly cryptocurrency price data from the CoinGecko API and stores it in a PostgreSQL database. This setup is designed to be scalable, reliable, and easy to manage.

This project demonstrates key data engineering principles:

  1. REST API extraction using Python
  2. Database integration with PostgreSQL
  3. Workflow scheduling with Apache Airflow
  4. Secure credential handling using environment variables

System Architecture

The pipeline consists of the following components:

Data Source: CoinGecko API providing cryptocurrency price data
ETL Script: Python script that handles extraction, transformation, and loading
Database: PostgreSQL for data storage
Orchestration: Apache Airflow for scheduling and monitoring
Infrastructure: Cloud VM for hosting the pipeline

The system flows in a linear fashion: Airflow triggers the ETL script hourly; the script extracts the latest prices for the tracked coins, transforms the data into a suitable format, and loads it into the PostgreSQL database.
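Sketched end to end, the flow looks like this:

CoinGecko API ──extract──> etl.py ──transform & load──> PostgreSQL
                              ▲
                              │ triggered hourly
                       Apache Airflow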

Detailed Implementation

Step 1: Creating the ETL Script
The first component is etl.py, which handles the core ETL functionality:


import requests
import psycopg2
import os
from datetime import datetime
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

coin_list = [
    'bitcoin', 'ethereum', 'tether', 'xrp', 'binancecoin',
    'solana', 'usd-coin', 'dogecoin', 'cardano', 'tron',
    'avalanche-2', 'polkadot', 'chainlink', 'litecoin', 'stellar'
]

def fetch_and_store_crypto_prices():
    url = "https://api.coingecko.com/api/v3/coins/markets"
    params = {
        'ids': ','.join(coin_list),
        'vs_currency': 'usd'
    }

    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
    except Exception as e:
        print(f"[ERROR] Failed to fetch data: {e}")
        return

    data = response.json()
    timestamp = datetime.utcnow()
    rows = [
        (
            coin['name'],
            coin['symbol'].upper(),
            coin['current_price'],
            coin['market_cap'],
            coin['total_volume'],
            timestamp
        )
        for coin in data
    ]

    # Initialize so the except/finally blocks below are safe even if connect() fails
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(
            dbname=os.getenv('DB_NAME'),
            host=os.getenv('DB_HOST'),
            user=os.getenv('DB_USER'),
            password=os.getenv('DB_PASSWORD'),
            port=os.getenv('DB_PORT')
        )
        cursor = conn.cursor()

        cursor.execute("CREATE SCHEMA IF NOT EXISTS crypto;")
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS crypto.crypto_prices (
                name TEXT,
                symbol TEXT,
                price NUMERIC,
                market_cap NUMERIC,
                total_volume NUMERIC,
                timestamp TIMESTAMP
            );
        """)
        cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_crypto_timestamp 
            ON crypto.crypto_prices (timestamp);
        """)

        insert_query = """
            INSERT INTO crypto.crypto_prices 
            (name, symbol, price, market_cap, total_volume, timestamp)
            VALUES (%s, %s, %s, %s, %s, %s)
        """
        cursor.executemany(insert_query, rows)
        conn.commit()
        print("[SUCCESS] Data inserted.")

    except Exception as e:
        print(f"[ERROR] DB insert failed: {e}")
        if conn:
            conn.rollback()
    finally:
        if cursor:
            cursor.close()
        if conn:
            conn.close()
        print("[INFO] DB connection closed.")

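For a quick local test outside Airflow, I find it handy to append a main guard at the bottom of etl.py (purely optional; the scheduled runs don't need it):

# Optional: run the ETL once with `python etl.py`
if __name__ == "__main__":
    fetch_and_store_crypto_prices()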

This script:

Extracts current price data for 15 cryptocurrencies from the CoinGecko API
Transforms the JSON response into rows ready for insertion
Loads the data into PostgreSQL, creating the schema, table, and index if they don't exist
Uses environment variables for secure database connection management (a sample .env is sketched below)
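For reference, the script reads the following variables from a .env file; the values here are placeholders, so substitute your own:

DB_NAME=crypto
DB_HOST=localhost
DB_USER=postgres
DB_PASSWORD=your_password_here
DB_PORT=5432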

Step 2: Creating the Airflow DAG
Next, crypto_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Import your custom ETL logic
from scripts.etl import fetch_and_store_crypto_prices

default_args = {
    'owner': 'Ambuso',
    'depends_on_past': False,
    'start_date': datetime(2025, 5, 20),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=2),
}

with DAG(
    dag_id='coin_price_etl_dag',
    default_args=default_args,
    description='Fetch and store hourly crypto prices',
    schedule_interval='@hourly',
    catchup=False,
) as dag:

    fetch_task = PythonOperator(
        task_id='fetch_and_store_crypto_prices',
        python_callable=fetch_and_store_crypto_prices,
    )


This DAG:

  1. Defines the Workflow – Sets up the DAG's schedule, start date, retry logic, and other configuration details.
  2. Registers the Task – Connects your Python ETL function (fetch_and_store_crypto_prices) as a task in the workflow.
  3. Orchestrates Execution – Tells Airflow to run the ETL task automatically (e.g., hourly), track its status, and handle failures or retries.
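Note that from scripts.etl import fetch_and_store_crypto_prices assumes etl.py lives in a scripts package inside the dags folder; this layout is my assumption, so adjust the import path to your setup:

dags/
├── crypto_dag.py
└── scripts/
    ├── __init__.py
    └── etl.py

Once the DAG file is picked up, you can smoke-test the task without waiting for the hourly schedule:

airflow tasks test coin_price_etl_dag fetch_and_store_crypto_prices 2025-05-20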

Data Visualization with Python
After loading cryptocurrency data into PostgreSQL, I used Python (with libraries like pandas, matplotlib, and seaborn) to explore and visualize the data. This helped uncover key patterns such as:

  1. Hourly price trends for top cryptocurrencies like Bitcoin, Ethereum, and Solana
  2. Volatility patterns by plotting market cap and trading volume over time
  3. Buy/sell signal indicators, such as local minima/maxima and moving averages

These visualizations provide deeper insight into market behavior, helping translate raw numbers into actionable intelligence. Python’s flexibility also allows for automating chart updates as new data flows in through the ETL pipeline.
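As a concrete example, here's a minimal sketch of the kind of query-and-plot code I mean. It assumes the same .env variables as the ETL script and uses SQLAlchemy for the connection; the exact charts in the project vary:

import os

import matplotlib.pyplot as plt
import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()

# Build a SQLAlchemy engine from the same environment variables the ETL uses
engine = create_engine(
    f"postgresql+psycopg2://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}"
    f"@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_NAME')}"
)

# Hourly BTC prices, oldest first
df = pd.read_sql(
    """
    SELECT timestamp, price
    FROM crypto.crypto_prices
    WHERE symbol = 'BTC'
    ORDER BY timestamp
    """,
    engine,
)

# 24-hour moving average as a simple trend/signal line
df["ma_24h"] = df["price"].rolling(window=24).mean()

plt.figure(figsize=(10, 4))
plt.plot(df["timestamp"], df["price"], label="BTC price (USD)")
plt.plot(df["timestamp"], df["ma_24h"], label="24h moving average")
plt.xlabel("Timestamp")
plt.ylabel("Price (USD)")
plt.title("Hourly BTC Price with 24h Moving Average")
plt.legend()
plt.tight_layout()
plt.show()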


Conclusion
The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.
