Introduction
This project implements an ETL (Extract, Transform, Load) pipeline that fetches real-time weather data from the OpenWeatherMap API, processes it through Apache Kafka, and stores it in a Cassandra database. The pipeline monitors weather conditions across multiple cities around the world.
System Architecture
The pipeline consists of two main components:
- Data Producer (weather_df.py): Extracts weather data from the OpenWeatherMap API and publishes it to a Kafka topic.
- Data Consumer (weather_consumer.py): Subscribes to the Kafka topic, processes the incoming messages, and loads the data into a Cassandra database.
Implementation
Step 1: Creating the Scripts
The first component is weather_df.py, which handles data extraction and publishing:
import os
import json
import logging

import requests
from dotenv import load_dotenv
from confluent_kafka import Producer

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

load_dotenv()

own_url = 'https://api.openweathermap.org/data/2.5/weather'
own_api_key = os.getenv('WEATHER_API_KEY')

cities = [
    "Milan",
    "Tokyo",
    "London",
    "Managua",
    "Sydney"
]

def weather_extract(city):
    """Fetch current weather for a city; return None if the request fails."""
    url = f"{own_url}?q={city}&appid={own_api_key}&units=metric"
    response = requests.get(url)
    if response.status_code != 200:
        logger.error(f"OpenWeatherMap returned {response.status_code} for {city}")
        return None
    data = response.json()
    data['extracted_city'] = city  # tag the record with the queried city
    return data

def delivery_report(err, msg):
    """Callback for Kafka message delivery status."""
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")

kafka_config = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'broker.address.family': 'v4',
    'message.send.max.retries': 5,
    'retry.backoff.ms': 500,
}

producer = Producer(kafka_config)
topic = 'weather-data'

def produce_weather_data():
    for city in cities:
        data = weather_extract(city)
        if data:
            producer.produce(topic, key=city, value=json.dumps(data), callback=delivery_report)
            producer.poll(0)  # serve delivery callbacks
        else:
            logger.error(f"Failed to fetch data for {city}")
    producer.flush()  # wait for all outstanding messages to be delivered

if __name__ == "__main__":
    produce_weather_data()
    logger.info("Data extraction and production complete")
This script:
- Fetches current weather data for Milan, Tokyo, London, Managua, and Sydney
- Tags each API response with the city it was requested for
- Publishes the JSON-serialized data to a Confluent Kafka topic named weather-data
- Uses environment variables to keep the API key and Kafka credentials out of the code
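For reference, the message published for each city is the raw OpenWeatherMap response plus the extracted_city tag. A trimmed, illustrative sketch of that payload, showing only the fields the consumer later reads (real responses contain many more fields, and the values here are invented):
# Illustrative shape of one Kafka message value (values are made up);
# only the fields weather_consumer.py reads are shown.
sample_message = {
    "weather": [{"main": "Clouds", "description": "scattered clouds"}],
    "main": {"temp": 18.4},      # Celsius, because the request uses units=metric
    "dt": 1718000000,            # Unix timestamp of the observation
    "extracted_city": "Milan"    # added by weather_extract()
}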
Step 2: Running the Consumer
The consumer, weather_consumer.py, subscribes to the Kafka topic, polls for the messages published by the producer, and loads them into the database.
import os
import uuid
from datetime import datetime
from json import loads

from dotenv import load_dotenv
from confluent_kafka import Consumer, KafkaException
from cassandra.cluster import Cluster

# --- Load environment variables ---
load_dotenv()

# --- Confluent Kafka Consumer Configuration ---
conf = {
    'bootstrap.servers': os.getenv('BOOTSTRAP_SERVER'),
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': os.getenv('CONFLUENT_API_KEY'),
    'sasl.password': os.getenv('CONFLUENT_SECRET_KEY'),
    'group.id': 'weather-group-id',
    'auto.offset.reset': 'earliest'
}

# Initialize Kafka consumer
consumer = Consumer(conf)
topic = 'weather-data'  # Topic name
consumer.subscribe([topic])
print(f"Subscribed to topic: {topic}")

# --- Cassandra Setup (Azure Server) ---
try:
    cluster = Cluster(['127.0.0.1'])  # Replace with the Azure server's IP address
    session = cluster.connect()
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS city_weather_data
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
    """)
    session.set_keyspace("city_weather_data")
    session.execute("""
        CREATE TABLE IF NOT EXISTS city_weather_data (
            id UUID PRIMARY KEY,
            city_name TEXT,
            weather_main TEXT,
            weather_description TEXT,
            temperature FLOAT,
            timestamp TIMESTAMP
        )
    """)
    print("Cassandra table ready")
except Exception as e:
    print(f"Error setting up Cassandra: {e}")
    session = None

# --- Read from Kafka and Insert into Cassandra ---
if session:
    try:
        while True:
            msg = consumer.poll(1.0)  # wait up to 1 second for a message
            if msg is None:
                continue
            if msg.error():
                raise KafkaException(msg.error())
            try:
                data = loads(msg.value().decode('utf-8'))
                # Extract required fields
                record = {
                    "id": uuid.uuid4(),
                    "city_name": data.get("extracted_city", "Unknown"),
                    "weather_main": data["weather"][0]["main"],
                    "weather_description": data["weather"][0]["description"],
                    "temperature": data["main"]["temp"],
                    "timestamp": datetime.fromtimestamp(data["dt"])
                }
                # Insert into Cassandra
                session.execute("""
                    INSERT INTO city_weather_data (id, city_name, weather_main, weather_description, temperature, timestamp)
                    VALUES (%(id)s, %(city_name)s, %(weather_main)s, %(weather_description)s, %(temperature)s, %(timestamp)s)
                """, record)
                print(f"Inserted weather for {record['city_name']} at {record['timestamp']}")
            except Exception as e:
                print(f"Error processing message: {e}")
    except KeyboardInterrupt:
        print("Consumer stopped manually")
    finally:
        consumer.close()
        print("Kafka consumer closed")
This script:
- Subscribes to and polls messages from Kafka
- Extracts relevant fields from the weather data
- Inserts processed records into a Cassandra database
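Once a few messages have been consumed, the load step can be verified directly against Cassandra. A minimal sketch, assuming the consumer has already created the keyspace and table and that the same contact point is reachable:
# Read back a few of the rows the consumer inserted.
from cassandra.cluster import Cluster

cluster = Cluster(['127.0.0.1'])  # same contact point the consumer uses
session = cluster.connect('city_weather_data')

rows = session.execute(
    "SELECT city_name, weather_main, temperature, timestamp FROM city_weather_data LIMIT 10"
)
for row in rows:
    print(row.city_name, row.weather_main, row.temperature, row.timestamp)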
Step 3: Setting Up the Environment
Create a virtual environment:
python3 -m venv venv
source venv/bin/activate
Install required packages:
pip install requests python-dotenv confluent-kafka cassandra-driver
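Both scripts load their credentials from a .env file via python-dotenv, so it is worth confirming that file is complete before running anything. A small sanity-check sketch, using the exact variable names the two scripts pass to os.getenv:
# Quick sanity check for the .env file both scripts load with python-dotenv.
import os
from dotenv import load_dotenv

load_dotenv()

# Variable names taken from weather_df.py and weather_consumer.py
required = ["WEATHER_API_KEY", "BOOTSTRAP_SERVER", "CONFLUENT_API_KEY", "CONFLUENT_SECRET_KEY"]
missing = [name for name in required if not os.getenv(name)]
if missing:
    raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
print("All required environment variables are set")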
Step 4: Running the Pipeline
Ensure your Cassandra instance is running. The consumer will automatically create the keyspace and table if they don't exist.
Run the consumer to begin listening for messages:
python3 weather_consumer.py
In a separate terminal, run the producer to fetch and publish weather data:
python3 weather_df.py
The producer will:
- Fetch weather data for each configured city
- Publish messages to Kafka
- Log the status of each operation
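Note that weather_df.py makes a single pass over the city list and then exits. For testing without a scheduler, one option is to re-run it periodically with a small wrapper loop; a sketch, assuming weather_df.py is importable from the working directory and using an arbitrary 10-minute interval:
# Hypothetical wrapper: re-run the producer every 10 minutes for testing.
import time
from weather_df import produce_weather_data

while True:
    produce_weather_data()
    time.sleep(600)  # pause 10 minutes between fetches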
Data Flow
- The producer calls the OpenWeatherMap API for each city
- Weather data is serialized to JSON and published to Kafka
- The consumer continuously polls the Kafka topic
- Incoming messages are deserialized and transformed
- Data is inserted into the Cassandra database for persistence
- The process repeats as new data becomes available
Future Enhancements
- Scheduling: Implement Apache Airflow to schedule regular data collection (a sketch follows this list)
- Data Validation: Add schema validation to ensure data quality
- Monitoring: Implement metrics collection for pipeline performance
- Scaling: Configure multiple consumer instances for parallel processing
- Analytics: Build data visualization dashboards with the collected weather data
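As a sketch of the scheduling enhancement (assuming Airflow 2.x, that weather_df.py sits in the DAGs folder so produce_weather_data is importable, and that its environment variables are available to the workers; the hourly interval is only an example):
# Hypothetical Airflow 2.x DAG that runs the producer once per hour.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

from weather_df import produce_weather_data  # assumed importable from the DAGs folder

with DAG(
    dag_id="weather_etl",
    start_date=datetime(2024, 1, 1),
    schedule_interval=timedelta(hours=1),
    catchup=False,
) as dag:
    PythonOperator(
        task_id="produce_weather_data",
        python_callable=produce_weather_data,
    )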
Conclusion
This ETL pipeline demonstrates how to build a real-time data processing system using Kafka. It provides a foundation that can be extended for various use cases, from weather analytics to environmental monitoring systems.