Introduction

This article walks through building an automated ETL (Extract, Transform, Load) pipeline that retrieves daily Bitcoin price data from the Polygon.io API, applies the necessary transformations, and loads the result into a PostgreSQL database. The workflow is orchestrated with Apache Airflow to ensure reliable daily execution.

This project demonstrates several key data engineering concepts:

  • API data extraction
  • Data transformation using pandas
  • Database integration with PostgreSQL
  • Workflow orchestration with Apache Airflow
  • Deployment to a cloud environment

System Architecture

The pipeline consists of the following components:

  1. Data Source: Polygon.io API providing cryptocurrency price data
  2. ETL Script: Python script that handles extraction, transformation, and loading
  3. Database: PostgreSQL for data storage
  4. Orchestration: Apache Airflow for scheduling and monitoring
  5. Infrastructure: Cloud VM for hosting the pipeline

The system flows linearly: Airflow triggers the ETL script once a day; the script extracts the previous day's BTC open and close prices, transforms the data into a suitable format, and loads it into the PostgreSQL database.
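
At a high level, the daily run can be sketched as three small stages chained together; the function names below are purely illustrative, since the actual implementation in btc_prices.py is a single linear script:

# Illustrative outline of the pipeline stages; the real logic lives in btc_prices.py below
import pandas as pd


def extract() -> dict:
    """Call the Polygon.io daily open/close endpoint and return the JSON payload."""
    ...


def transform(payload: dict) -> pd.DataFrame:
    """Keep symbol, open, close and date, and normalise the date to YYYY-MM-DD."""
    ...


def load(df: pd.DataFrame) -> None:
    """Append the single-row DataFrame to dataengineering.crypto_prices."""
    ...


def run_daily_etl() -> None:
    load(transform(extract()))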

Detailed Implementation

Step 1: Creating the ETL Script

The first component is btc_prices.py, which handles the core ETL functionality:

import requests
import os
from sqlalchemy import create_engine
import pandas as pd
from datetime import datetime, timedelta
from dotenv import load_dotenv

# Define API endpoint for the previous day's open/close
# (the daily endpoint only returns data for completed days)
target_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
url = f'https://api.polygon.io/v1/open-close/crypto/BTC/USD/{target_date}?adjusted=true&apiKey=YOUR_API_KEY'
response = requests.get(url)

if response.status_code == 200:
    data = response.json()
    open_price = data.get('open')
    close_price = data.get('close')
    date = data.get('day')
    symbol = data.get('symbol')
else:
    print(f"Failed to retrieve data: {response.status}")
    exit()

# Prepare data for insertion
data_df = {
    'symbol': symbol,
    'open_price': open_price,
    'close_price': close_price,
    'date': date
}
df = pd.DataFrame(data_df, index=[0])
df['date'] = pd.to_datetime(df['date']).dt.strftime('%Y-%m-%d')

# Load environment variables
load_dotenv()
dbname = os.getenv('dbname')
user = os.getenv('user')
password = os.getenv('password')
host = os.getenv('host')
port = os.getenv('port')

# Create database connection
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{dbname}')

df.to_sql("crypto_prices", con=engine, if_exists="append", index=False, schema="dataengineering")
print(f"Successfully loaded crypto data for {df['date'][0]}")

This script:

  • Extracts Bitcoin price data from the Polygon.io API
  • Transforms and structures the data using pandas
  • Loads the data into PostgreSQL
  • Uses environment variables for secure database connection management
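
Because Airflow may retry or re-run a task (the DAG below sets retries=2), the same day's row could be inserted more than once. As an optional addition that is not part of the original script, one way to guard against this is to check for the date before appending; the sketch below reuses the df and engine objects defined in btc_prices.py:

# Hypothetical duplicate-row guard; reuses df and engine from btc_prices.py
from sqlalchemy import text

with engine.connect() as conn:
    already_loaded = conn.execute(
        text("SELECT 1 FROM dataengineering.crypto_prices WHERE date = :d"),
        {"d": df['date'][0]},
    ).first() is not None

if already_loaded:
    print(f"Skipping insert, a row for {df['date'][0]} already exists")
else:
    df.to_sql("crypto_prices", con=engine, if_exists="append",
              index=False, schema="dataengineering")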

Step 2: Creating the Airflow DAG

Next, btc_dag.py defines the Airflow DAG (Directed Acyclic Graph) that orchestrates the workflow:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator  # Airflow 2.x import path

# DAG default arguments
default_args = {
    "owner": "data_engineer",
    "depends_on_past": False,
    "start_date": datetime(2025, 3, 31),
    "email_on_failure": False,
    "email_on_retry": True,
    "retries": 2,
    "retry_delay": timedelta(minutes=2)
}

with DAG(
    'polygon_btc_data',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,  # run only the latest interval instead of backfilling from start_date
) as dag:

    # Each BashOperator runs in its own shell, so activating the virtual
    # environment in one task does not carry over to the next. The execution
    # task therefore sources the venv and runs the script in a single command.
    activate_venv = BashOperator(
        task_id='activate_virtual_env',
        bash_command='source /home/user/crypto_price/venv/bin/activate',
    )

    execute_file = BashOperator(
        task_id='execute_python_file',
        bash_command=(
            'source /home/user/crypto_price/venv/bin/activate && '
            'python /home/user/crypto_price/btc_prices.py'
        ),
    )

    activate_venv >> execute_file

This DAG:

  • Defines the execution schedule
  • Activates the virtual environment
  • Executes the ETL script
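
For a quick local check before relying on the scheduler, the DAG file can also be executed directly. A minimal sketch, assuming Airflow 2.5+ (which provides dag.test()), is to append the following to btc_dag.py:

# Run a single manual execution of the DAG for debugging (Airflow 2.5+),
# e.g. with: python ~/airflow/dags/btc_dag.py
if __name__ == "__main__":
    dag.test()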

Step 3: Setting Up the Environment

  1. Creating a Virtual Environment:
     python -m venv venv
     source venv/bin/activate
  2. Installing Dependencies:
     pip install requests pandas sqlalchemy python-dotenv psycopg2-binary apache-airflow
  3. Setting Up Environment Variables:
     echo "dbname=your_database_name" >> .env
     echo "user=your_database_user" >> .env
     echo "password=your_database_password" >> .env
     echo "host=your_database_host" >> .env
     echo "port=your_database_port" >> .env

Step 4: Server Deployment

  1. SSH into the cloud VM:
     ssh user@your_server_ip
  2. Create the necessary directories:
     mkdir -p ~/crypto_price
     mkdir -p ~/airflow/dags
  3. Transfer the scripts, along with the .env file the ETL script reads its credentials from, to the server:
     scp btc_prices.py user@your_server_ip:~/crypto_price/
     scp .env user@your_server_ip:~/crypto_price/
     scp btc_dag.py user@your_server_ip:~/airflow/dags/
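
With the files in place, Airflow itself must be running on the VM before the DAG in ~/airflow/dags is picked up and scheduled, for example via airflow standalone, or a separate scheduler and webserver started inside the virtual environment.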

Step 5: PostgreSQL Configuration

  1. Creating Database Schema:
     CREATE SCHEMA IF NOT EXISTS dataengineering;

     CREATE TABLE IF NOT EXISTS dataengineering.crypto_prices (
         id SERIAL PRIMARY KEY,
         symbol VARCHAR(10) NOT NULL,
         open_price NUMERIC(20, 8) NOT NULL,
         close_price NUMERIC(20, 8) NOT NULL,
         date DATE NOT NULL,
         created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
     );
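
Once a few days of prices have accumulated, the table can be pulled back into pandas for analysis. The sketch below reuses the same .env connection settings; daily_change_pct is just an illustrative derived metric:

# Read the stored daily prices back and compute a simple day-over-day metric
import os

import pandas as pd
from dotenv import load_dotenv
from sqlalchemy import create_engine

load_dotenv()
engine = create_engine(
    f"postgresql://{os.getenv('user')}:{os.getenv('password')}"
    f"@{os.getenv('host')}:{os.getenv('port')}/{os.getenv('dbname')}"
)

prices = pd.read_sql(
    "SELECT date, open_price, close_price FROM dataengineering.crypto_prices ORDER BY date",
    engine,
)
prices["daily_change_pct"] = (
    (prices["close_price"].astype(float) - prices["open_price"].astype(float))
    / prices["open_price"].astype(float) * 100
)
print(prices.tail())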

Conclusion

The architecture follows best practices for data engineering:

  • Separation of extraction, transformation, and loading concerns
  • Secure credential management
  • Basic error handling for failed API requests
  • Automated scheduling
  • Cloud-based deployment

The combination of Python, Airflow, and PostgreSQL provides a powerful foundation for financial data analysis, enabling timely insights into cryptocurrency market trends.
