Introduction

This project implements an ETL (Extract, Transform, Load) pipeline that fetches real-time weather data from the OpenWeatherMap API, processes it through Apache Kafka, and stores it in a Cassandra database. The pipeline monitors weather conditions across multiple cities around the world.

System Architecture

The pipeline consists of two main components:

  1. Data Producer (weather_df.py): extracts weather data from the OpenWeatherMap API and publishes it to a Kafka topic.
  2. Data Consumer (weather_consumer.py): subscribes to the Kafka topic, processes the incoming messages, and loads the data into a Cassandra database.

Implementation

Step 1: Creating the Producer Script

The first component is weather_df.py, which handles data extraction and publishing:

import os
import json
import logging

import requests
from dotenv import load_dotenv
from confluent_kafka import Producer

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()


own_url = 'https://api.openweathermap.org/data/2.5/weather'
own_api_key = os.getenv('WEATHER_API_KEY')
cities = [
    "Milan",
    "Tokyo",
    "London",
    "Managua",
    "Sydney"
]
def weather_extract(city):
    """Fetch current weather for a city; return None if the request fails."""
    url = f"{own_url}?q={city}&appid={own_api_key}&units=metric"
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Request failed for {city}: {e}")
        return None
    data = response.json()
    data['extracted_city'] = city
    return data

def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


kafka_config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'broker.address.family': 'v4',
    'message.send.max.retries': 5,
    'retry.backoff.ms': 500,
}

producer = Producer(kafka_config)
topic = 'weather-data'

def produce_weather_data():
    for city in cities:
        data = weather_extract(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)
        else:
            logger.error(f"Failed to fetch data for {city}")
    producer.flush()

if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")

This script:

  • Fetches current weather data for Milan, Tokyo, London, Managua, and Sydney

  • Tags each API response with the city it was requested for (the extracted_city field)

  • Sends the data as JSON to a Confluent Kafka topic named weather-data

  • Uses environment variables to manage the API key and Kafka credentials (a sample .env is shown below)
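
Both scripts load their credentials from a .env file in the project root via python-dotenv. A minimal sketch of that file, using the variable names referenced in the code (the values are placeholders; substitute your own OpenWeatherMap and Confluent Cloud credentials):

# .env — placeholder values, do not commit real credentials
WEATHER_API_KEY=your_openweathermap_api_key
BOOTSTRAP_SERVER=your-cluster.your-region.confluent.cloud:9092
CONFLUENT_API_KEY=your_confluent_api_key
CONFLUENT_SECRET_KEY=your_confluent_api_secret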

Step 2: Creating the Consumer Script

The consumer, weather_consumer.py, subscribes to the Kafka topic and polls for messages published by the producer before loading them into the database.

import os
from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from cassandra.cluster import Cluster
from json import loads
from datetime import datetime
import uuid

# --- Load environment variables ---
load_dotenv()

# --- Confluent Kafka Consumer Configuration ---
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'weather-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer

consumer = Consumer(conf)
topic = 'weather-data'  # Topic name
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Cassandra Setup (Azure Server) ---
try:
    cluster = Cluster(['127.0.0.1'])  # Replace with your Cassandra host (e.g., the Azure server's IP)
    session = cluster.connect()

    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS city_weather_data
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    session.set_keyspace("city_weather_data")

    session.execute("""
        CREATE TABLE IF NOT EXISTS city_weather_data (
            id UUID PRIMARY KEY,
            city_name TEXT,
            weather_main TEXT,
            weather_description TEXT,
            temperature FLOAT,
            timestamp TIMESTAMP
        )
    """)
    print("Cassandra table ready")
except Exception as e:
    print("Error setting up Cassandra: {e}")
    session = None

# --- Read from Kafka and Insert into Cassandra ---
if session:
    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            else:
                try:
                    data = loads(msg.value().decode('utf-8'))

                    # Extract required fields
                    record = {
                        "id": uuid.uuid4(),
                        "city_name": data.get("extracted_city", "Unknown"),
                        "weather_main": data["weather"][0]["main"],
                        "weather_description": data["weather"][0]["description"],
                        "temperature": data["main"]["temp"],
                        "timestamp": datetime.fromtimestamp(data["dt"])
                    }

                    # Insert into Cassandra
                    session.execute("""
                        INSERT INTO city_weather_data (id, city_name, weather_main, weather_description, temperature, timestamp)
                        VALUES (%(id)s, %(city_name)s, %(weather_main)s, %(weather_description)s, %(temperature)s, %(timestamp)s)
                    """, record)

                    print(f"Inserted weather for {record['city_name']} at {record['timestamp']}")

                except Exception as e:
                    print(f"Error processing message: {e}")

    except KeyboardInterrupt:
        print("Consumer stopped manually")

    finally:
        consumer.close()
        print("Kafka consumer closed")

This script:

  • Subscribes to and polls messages from Kafka

  • Extracts relevant fields from the weather data

  • Inserts processed records into a Cassandra database (a quick verification query is sketched below)
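
To confirm that rows are actually landing, you can query the table directly with the same driver. A minimal sketch (127.0.0.1 is an assumption; substitute your Cassandra host):

from cassandra.cluster import Cluster

# Connect to the keyspace the consumer creates
cluster = Cluster(['127.0.0.1'])
session = cluster.connect('city_weather_data')

# Pull a handful of rows to verify the inserts
rows = session.execute(
    "SELECT city_name, weather_main, temperature, timestamp FROM city_weather_data LIMIT 10"
)
for row in rows:
    print(row.city_name, row.weather_main, row.temperature, row.timestamp)

cluster.shutdown()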

Step 3: Setting Up the Environment

Creating a Virtual Environment:

python3 -m venv venv
source venv/bin/activate

Install required packages:

pip install requests python-dotenv confluent-kafka cassandra-driver

Step 4: Running the Pipeline

Ensure your Cassandra instance is running; the consumer will automatically create the keyspace and table if they don't exist.
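
If you don't have a Cassandra instance available, one quick way to start one locally is the official Docker image (assuming Docker is installed; the container name and version tag here are just examples):

docker run -d --name cassandra -p 9042:9042 cassandra:4.1
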
Run the consumer to begin listening for messages:

python3 weather_consumer.py

In a separate terminal, run the producer to fetch and publish weather data:

python3 weather_df.py

The producer will:

  • Fetch weather data for each configured city
  • Publish messages to Kafka
  • Log the status of each operation

Data Flow

  • The producer calls OpenWeatherMap API for each city
  • Weather data is serialized to JSON and published to Kafka
  • The consumer continuously polls the Kafka topic
  • Incoming messages are deserialized and transformed
  • Data is inserted into the Cassandra database for persistence
  • The process repeats as new data becomes available (for example, by re-running the producer on an interval, as sketched below)
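
As written, weather_df.py performs a single pass over the city list and exits. A minimal way to keep the cycle going is to wrap the producer call in a loop (a sketch; the 10-minute interval is arbitrary and assumes weather_df.py is importable from the working directory):

import time

from weather_df import produce_weather_data, logger

# Re-run the extract-and-publish cycle on a fixed interval (10 minutes here)
if __name__ == "__main__":
    while True:
        produce_weather_data()
        logger.info("Cycle complete; sleeping before the next run")
        time.sleep(600)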

Future Enhancements

  • Scheduling: Implement Apache Airflow to schedule regular data collection (a minimal DAG sketch follows this list)
  • Data Validation: Add schema validation to ensure data quality
  • Monitoring: Implement metrics collection for pipeline performance
  • Scaling: Configure multiple consumer instances for parallel processing
  • Analytics: Build data visualization dashboards with the collected weather data
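
As a rough illustration of the scheduling idea, the existing produce_weather_data function could be wrapped in an Airflow task. A minimal sketch, assuming Airflow 2.x is installed and weather_df.py is on the scheduler's Python path (the DAG id and 30-minute interval are arbitrary choices):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from weather_df import produce_weather_data

with DAG(
    dag_id="weather_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(minutes=30),
    catchup=False,
) as dag:
    # Single task that runs one extract-and-publish pass per schedule interval
    PythonOperator(
        task_id="produce_weather_data",
        python_callable=produce_weather_data,
    )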

Conclusion

This ETL pipeline demonstrates how to build a real-time data processing system using Kafka. It provides a foundation that can be extended for various use cases, from weather analytics to environmental monitoring systems.