Python offers powerful tools for building event-driven microservices that scale effectively and stay resilient under load and partial failure. I've worked extensively with these technologies in production systems and found several approaches that consistently deliver results.
Event-driven architecture fundamentally changes how we build distributed systems. Instead of direct service-to-service communication, components interact by producing and consuming events through message brokers. This creates loosely coupled systems where services can evolve independently.
The asyncio library forms the foundation for modern event processing in Python. It enables non-blocking I/O operations through an event loop that efficiently manages concurrent tasks.
import asyncio
import time

async def process_event(event_id):
    print(f"Processing event {event_id}")
    # Simulate I/O operation like database or API call
    await asyncio.sleep(1)
    print(f"Completed event {event_id}")
    return f"Result-{event_id}"

async def main():
    start = time.time()
    # Process multiple events concurrently
    tasks = [process_event(i) for i in range(5)]
    results = await asyncio.gather(*tasks)
    end = time.time()
    print(f"Processed 5 events in {end-start:.2f} seconds")
    print(f"Results: {results}")

if __name__ == "__main__":
    asyncio.run(main())
The code above demonstrates how asyncio processes multiple events concurrently. While sequential processing would take 5 seconds, asyncio completes the work in just over 1 second by efficiently switching between tasks during I/O waits.
Message brokers are essential for reliable communication between microservices. RabbitMQ, Kafka, and Redis Streams each have unique strengths for different use cases.
For RabbitMQ integration, the Pika library provides a robust interface:
import pika
import json
import uuid

class OrderService:
    def __init__(self):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        self.channel = self.connection.channel()
        # Declare exchanges and queues
        self.channel.exchange_declare(exchange='orders', exchange_type='topic')
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.callback_queue = result.method.queue
        self.channel.queue_bind(
            exchange='orders',
            queue=self.callback_queue,
            routing_key='order.created'
        )
        # Set up consumer
        self.channel.basic_consume(
            queue=self.callback_queue,
            on_message_callback=self.on_order_created,
            auto_ack=False
        )

    def on_order_created(self, ch, method, props, body):
        order = json.loads(body)
        print(f"Processing new order: {order}")
        # Business logic
        self.process_inventory(order)
        # Acknowledge message
        ch.basic_ack(delivery_tag=method.delivery_tag)
        # Publish a follow-up event
        self.publish_event('order.processed', {
            'order_id': order['id'],
            'status': 'processing'
        })

    def process_inventory(self, order):
        # Implementation of inventory logic
        print(f"Updating inventory for items: {order['items']}")

    def publish_event(self, routing_key, data):
        self.channel.basic_publish(
            exchange='orders',
            routing_key=routing_key,
            body=json.dumps(data),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
                message_id=str(uuid.uuid4())
            )
        )
        print(f"Published event {routing_key}: {data}")

    def start_consuming(self):
        print("Starting consumer, press CTRL+C to exit")
        self.channel.start_consuming()

if __name__ == "__main__":
    service = OrderService()
    service.start_consuming()
For systems requiring higher throughput, Kafka provides excellent scaling capabilities. The aiokafka library enables async interaction with Kafka:
import asyncio
import json
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

class AnalyticsService:
    def __init__(self):
        self.consumer = None
        self.producer = None

    async def initialize(self):
        # Set up the producer
        self.producer = AIOKafkaProducer(
            bootstrap_servers='localhost:9092',
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        await self.producer.start()
        # Set up the consumer
        self.consumer = AIOKafkaConsumer(
            'user-events',
            bootstrap_servers='localhost:9092',
            group_id='analytics-service',
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest'
        )
        await self.consumer.start()

    async def consume_events(self):
        try:
            async for message in self.consumer:
                event = message.value
                print(f"Processing event: {event}")
                # Extract metrics and process the event
                await self.process_event(event)
                # Publish derived metrics
                if event['type'] == 'page_view':
                    await self.publish_metrics(event)
        finally:
            await self.consumer.stop()

    async def process_event(self, event):
        # Apply analytics logic
        print(f"Analyzing event type: {event['type']}")
        await asyncio.sleep(0.05)  # Simulate processing

    async def publish_metrics(self, event):
        metrics = {
            'page': event['page'],
            'user_segment': self.determine_segment(event),
            'timestamp': event['timestamp']
        }
        await self.producer.send_and_wait(
            'derived-metrics',
            metrics
        )

    def determine_segment(self, event):
        # Logic to segment users based on behavior
        return 'new_visitor' if event.get('first_visit') else 'returning'

    async def shutdown(self):
        if self.producer:
            await self.producer.stop()
        if self.consumer:
            await self.consumer.stop()

async def main():
    service = AnalyticsService()
    await service.initialize()
    try:
        await service.consume_events()
    finally:
        await service.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
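Redis Streams, the third broker mentioned earlier, fits lighter-weight pipelines where Redis is already part of the stack. Here is a minimal sketch using the redis-py asyncio client with a consumer group; the stream, group, and consumer names are illustrative, and error handling is kept to the bare minimum:

import asyncio
import json

from redis.asyncio import Redis
from redis.exceptions import ResponseError

async def main():
    redis = Redis(host="localhost", port=6379, decode_responses=True)
    stream, group, consumer = "orders", "billing", "billing-1"

    # Create the consumer group, ignoring the error if it already exists
    try:
        await redis.xgroup_create(stream, group, id="0", mkstream=True)
    except ResponseError:
        pass

    # Produce an event
    await redis.xadd(stream, {"payload": json.dumps({"order_id": "42", "total": 99.5})})

    # Consume undelivered events for this group and acknowledge them
    entries = await redis.xreadgroup(group, consumer, {stream: ">"}, count=10, block=1000)
    for _, messages in entries:
        for message_id, fields in messages:
            event = json.loads(fields["payload"])
            print(f"Handling {message_id}: {event}")
            await redis.xack(stream, group, message_id)

    await redis.close()

if __name__ == "__main__":
    asyncio.run(main())

Unacknowledged entries stay in the group's pending list, so a crashed consumer's work can be reclaimed later, which is what makes Streams usable for reliable processing rather than fire-and-forget pub/sub.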
Event sourcing represents another powerful pattern where all changes to application state are stored as a sequence of events. This approach provides complete audit trails and enables temporal queries.
Here's how to implement a basic event sourcing system:
import uuid
import json
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any, Optional

@dataclass
class Event:
    id: str
    type: str
    data: Dict[str, Any]
    timestamp: str
    aggregate_id: str
    version: int

class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        self.subscribers = []

    def append(self, event: Event):
        self.events.append(event)
        # Notify subscribers
        for subscriber in self.subscribers:
            subscriber(event)

    def get_events(self, aggregate_id: str) -> List[Event]:
        return [e for e in self.events if e.aggregate_id == aggregate_id]

    def subscribe(self, callback):
        self.subscribers.append(callback)
        return lambda: self.subscribers.remove(callback)

class Aggregate:
    def __init__(self, id: Optional[str] = None):
        self.id = id or str(uuid.uuid4())
        self.version = 0

    def apply_event(self, event: Event):
        # To be implemented by subclasses
        pass

    def load_from_history(self, events: List[Event]):
        for event in events:
            self.apply_event(event)
            self.version = event.version

class ShoppingCart(Aggregate):
    def __init__(self, id: Optional[str] = None):
        super().__init__(id)
        self.items = {}
        self.is_checked_out = False

    def add_item(self, product_id: str, quantity: int, event_store: EventStore):
        if self.is_checked_out:
            raise Exception("Cannot modify checked out cart")
        event = Event(
            id=str(uuid.uuid4()),
            type="ItemAdded",
            data={"product_id": product_id, "quantity": quantity},
            timestamp=datetime.utcnow().isoformat(),
            aggregate_id=self.id,
            version=self.version + 1
        )
        # Apply and store
        self.apply_event(event)
        event_store.append(event)

    def remove_item(self, product_id: str, event_store: EventStore):
        if self.is_checked_out:
            raise Exception("Cannot modify checked out cart")
        if product_id not in self.items:
            raise Exception(f"Product {product_id} not in cart")
        event = Event(
            id=str(uuid.uuid4()),
            type="ItemRemoved",
            data={"product_id": product_id},
            timestamp=datetime.utcnow().isoformat(),
            aggregate_id=self.id,
            version=self.version + 1
        )
        # Apply and store
        self.apply_event(event)
        event_store.append(event)

    def checkout(self, event_store: EventStore):
        if self.is_checked_out:
            raise Exception("Cart already checked out")
        if not self.items:
            raise Exception("Cannot checkout empty cart")
        event = Event(
            id=str(uuid.uuid4()),
            type="CartCheckedOut",
            data={},
            timestamp=datetime.utcnow().isoformat(),
            aggregate_id=self.id,
            version=self.version + 1
        )
        # Apply and store
        self.apply_event(event)
        event_store.append(event)

    def apply_event(self, event: Event):
        if event.type == "ItemAdded":
            product_id = event.data["product_id"]
            quantity = event.data["quantity"]
            if product_id in self.items:
                self.items[product_id] += quantity
            else:
                self.items[product_id] = quantity
        elif event.type == "ItemRemoved":
            product_id = event.data["product_id"]
            if product_id in self.items:
                del self.items[product_id]
        elif event.type == "CartCheckedOut":
            self.is_checked_out = True
        self.version = event.version

# Demo usage
def run_demo():
    event_store = EventStore()
    # Create a projection that maintains cart counts
    cart_counts = {"active": 0, "checked_out": 0}

    def update_cart_counts(event):
        if event.type == "CartCheckedOut":
            cart_counts["active"] -= 1
            cart_counts["checked_out"] += 1

    # Subscribe to events
    unsubscribe = event_store.subscribe(update_cart_counts)
    # Create and use a cart
    cart = ShoppingCart()
    cart_counts["active"] += 1
    cart.add_item("product-1", 2, event_store)
    cart.add_item("product-2", 1, event_store)
    cart.remove_item("product-1", event_store)
    cart.checkout(event_store)
    print(f"Cart state: {cart.items}, Checked out: {cart.is_checked_out}")
    print(f"Cart counts: {cart_counts}")
    # Reconstruct cart from events
    reconstructed_cart = ShoppingCart(cart.id)
    events = event_store.get_events(cart.id)
    reconstructed_cart.load_from_history(events)
    print(f"Reconstructed cart: {reconstructed_cart.items}, Checked out: {reconstructed_cart.is_checked_out}")
    # Cleanup
    unsubscribe()

if __name__ == "__main__":
    run_demo()
Circuit breakers are crucial for preventing cascade failures when services degrade. The pybreaker library implements this pattern elegantly:
import pybreaker
import requests
import time
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure the circuit breaker
inventory_breaker = pybreaker.CircuitBreaker(
    fail_max=3,        # Number of failures before opening
    reset_timeout=30,  # Seconds before attempting to close
    exclude=[ConnectionError],  # Exceptions that do not count as failures
    state_storage=pybreaker.CircuitMemoryStorage()
)

class InventoryClient:
    def __init__(self, base_url="http://inventory-service:8080"):
        self.base_url = base_url

    @inventory_breaker
    def check_availability(self, product_id, quantity):
        """Check if a product is available in requested quantity"""
        try:
            response = requests.get(
                f"{self.base_url}/availability/{product_id}",
                params={"quantity": quantity},
                timeout=2.0
            )
            response.raise_for_status()
            return response.json()["available"]
        except requests.exceptions.RequestException as e:
            logger.error(f"Inventory service error: {str(e)}")
            raise

    def reserve_inventory(self, order_id, items):
        """Reserve inventory for an order"""
        try:
            # Use the circuit breaker for the critical reservation call
            return self._reserve_inventory_protected(order_id, items)
        except pybreaker.CircuitBreakerError:
            # Circuit is open, use fallback strategy
            logger.warning("Circuit open - using inventory cache for order %s", order_id)
            return self._fallback_reservation(order_id, items)
        except requests.exceptions.RequestException:
            # The call itself failed; the breaker records the failure, we fall back
            logger.warning("Reservation call failed - using fallback for order %s", order_id)
            return self._fallback_reservation(order_id, items)

    @inventory_breaker
    def _reserve_inventory_protected(self, order_id, items):
        response = requests.post(
            f"{self.base_url}/reservations",
            json={"order_id": order_id, "items": items},
            timeout=3.0
        )
        response.raise_for_status()
        return response.json()

    def _fallback_reservation(self, order_id, items):
        """Fallback strategy when inventory service is unavailable"""
        # Option 1: Use cached inventory data to make best-effort decision
        # Option 2: Conditionally accept the order and reconcile later
        logger.info("Using fallback reservation for order %s", order_id)
        return {
            "order_id": order_id,
            "status": "pending_verification",
            "message": "Reservation accepted with pending verification"
        }

# Example usage
def process_order(order, client=None):
    # Reuse a shared client when provided so breaker state carries across orders
    client = client or InventoryClient()
    # First check availability
    all_available = True
    try:
        for item in order["items"]:
            available = client.check_availability(
                item["product_id"],
                item["quantity"]
            )
            if not available:
                all_available = False
                logger.warning(
                    "Product %s not available in quantity %d",
                    item["product_id"], item["quantity"]
                )
    except pybreaker.CircuitBreakerError:
        logger.error("Inventory service circuit open during availability check")
        all_available = False
    if all_available:
        # If all items are available, reserve inventory
        reservation = client.reserve_inventory(
            order["id"],
            order["items"]
        )
        if reservation["status"] == "confirmed":
            return {"status": "processing", "inventory_confirmed": True}
        else:
            return {"status": "pending", "inventory_confirmed": False}
    else:
        return {"status": "backorder", "inventory_confirmed": False}

# Simulate some orders with service degradation
def simulation():
    orders = [
        {"id": "order-1", "items": [{"product_id": "product-1", "quantity": 2}]},
        {"id": "order-2", "items": [{"product_id": "product-2", "quantity": 1}]},
        {"id": "order-3", "items": [{"product_id": "product-3", "quantity": 3}]}
    ]
    client = InventoryClient()
    # Break the inventory service after the first order
    original_check = client._reserve_inventory_protected

    @inventory_breaker
    def simulate_failure(*args, **kwargs):
        time.sleep(5)  # Simulate timeout
        raise requests.exceptions.Timeout("Simulated timeout")

    # Process orders, sharing one client so the breaker sees every call
    for i, order in enumerate(orders):
        if i >= 1:  # Break after the first order
            client._reserve_inventory_protected = simulate_failure
        result = process_order(order, client)
        logger.info(f"Order {order['id']} result: {result}")
        # Brief pause between orders
        time.sleep(1)

    # Service recovers
    client._reserve_inventory_protected = original_check
    logger.info("Inventory service recovered")
    # Process one more order after recovery
    recovery_order = {"id": "order-4", "items": [{"product_id": "product-4", "quantity": 1}]}
    result = process_order(recovery_order, client)
    logger.info(f"Recovery order result: {result}")

if __name__ == "__main__":
    simulation()
FastAPI provides excellent capabilities for building real-time event streams using WebSockets:
import asyncio
import json
import uuid
from typing import Dict, List, Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from pydantic import BaseModel

app = FastAPI()

# Event models
class Event(BaseModel):
    id: str
    type: str
    data: Dict[str, Any]

# In-memory event bus
class EventBus:
    def __init__(self):
        self.subscribers: Dict[str, List[WebSocket]] = {}

    async def publish(self, event_type: str, data: Dict[str, Any]):
        """Publish an event to all subscribers"""
        event = Event(
            id=str(uuid.uuid4()),
            type=event_type,
            data=data
        )
        if event_type in self.subscribers:
            # Convert to dict for JSON serialization
            event_data = json.dumps(event.dict())
            # Send to all subscribers
            disconnected = []
            for websocket in self.subscribers[event_type]:
                try:
                    await websocket.send_text(event_data)
                except RuntimeError:
                    disconnected.append(websocket)
            # Clean up disconnected subscribers
            for ws in disconnected:
                self.subscribers[event_type].remove(ws)
        return event

    def subscribe(self, event_type: str, websocket: WebSocket):
        """Subscribe a websocket to events of a given type"""
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(websocket)

    def unsubscribe(self, event_type: str, websocket: WebSocket):
        """Unsubscribe a websocket from events"""
        if event_type in self.subscribers and websocket in self.subscribers[event_type]:
            self.subscribers[event_type].remove(websocket)

# Create a single instance of the event bus
event_bus = EventBus()

# Dependency to get the event bus
def get_event_bus():
    return event_bus

# Endpoint to publish an event
@app.post("/events/{event_type}")
async def publish_event(
    event_type: str,
    data: Dict[str, Any],
    event_bus: EventBus = Depends(get_event_bus)
):
    event = await event_bus.publish(event_type, data)
    return {"status": "published", "event_id": event.id}

# WebSocket endpoint for subscribing to events
@app.websocket("/ws/events/{event_type}")
async def event_websocket(
    websocket: WebSocket,
    event_type: str,
    event_bus: EventBus = Depends(get_event_bus)
):
    await websocket.accept()
    # Subscribe to events
    event_bus.subscribe(event_type, websocket)
    try:
        # Keep connection alive and handle client messages
        while True:
            # Wait for any messages from the client
            _ = await websocket.receive_text()
    except WebSocketDisconnect:
        # Unsubscribe on disconnect
        event_bus.unsubscribe(event_type, websocket)

# Background task to generate periodic events
@app.on_event("startup")
async def startup_event_generator():
    asyncio.create_task(generate_heartbeat_events())

async def generate_heartbeat_events():
    """Generate heartbeat events every 30 seconds"""
    while True:
        await event_bus.publish("system.heartbeat", {
            "timestamp": str(asyncio.get_event_loop().time()),
            "status": "healthy"
        })
        await asyncio.sleep(30)

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
The Command Query Responsibility Segregation (CQRS) pattern works well with event-driven architectures by separating read and write operations:
import uuid
import asyncio
from dataclasses import dataclass
from typing import Dict, List, Any, Optional
from datetime import datetime
import json

# Command models
@dataclass
class CreateProductCommand:
    name: str
    price: float
    stock: int

@dataclass
class UpdateProductPriceCommand:
    product_id: str
    new_price: float

# Event models
@dataclass
class Event:
    id: str
    type: str
    data: Dict[str, Any]
    timestamp: str
    aggregate_id: str
    version: int

# Product aggregate
class Product:
    def __init__(self, id: Optional[str] = None):
        self.id = id or str(uuid.uuid4())
        self.name = ""
        self.price = 0.0
        self.stock = 0
        self.version = 0

    def apply_event(self, event: Event):
        if event.type == "ProductCreated":
            self.name = event.data["name"]
            self.price = event.data["price"]
            self.stock = event.data["stock"]
        elif event.type == "ProductPriceUpdated":
            self.price = event.data["new_price"]
        self.version = event.version

# Command handlers
class ProductCommandHandler:
    def __init__(self, event_store, event_bus):
        self.event_store = event_store
        self.event_bus = event_bus

    async def handle_create_product(self, cmd: CreateProductCommand) -> str:
        product_id = str(uuid.uuid4())
        event = Event(
            id=str(uuid.uuid4()),
            type="ProductCreated",
            data={
                "name": cmd.name,
                "price": cmd.price,
                "stock": cmd.stock
            },
            timestamp=datetime.utcnow().isoformat(),
            aggregate_id=product_id,
            version=1
        )
        await self.event_store.append(event)
        await self.event_bus.publish(event)
        return product_id

    async def handle_update_price(self, cmd: UpdateProductPriceCommand) -> bool:
        # Load the product aggregate
        events = await self.event_store.get_events(cmd.product_id)
        if not events:
            return False
        product = Product(cmd.product_id)
        for event in events:
            product.apply_event(event)
        # Create the update event
        event = Event(
            id=str(uuid.uuid4()),
            type="ProductPriceUpdated",
            data={"new_price": cmd.new_price},
            timestamp=datetime.utcnow().isoformat(),
            aggregate_id=cmd.product_id,
            version=product.version + 1
        )
        await self.event_store.append(event)
        await self.event_bus.publish(event)
        return True

# Read model repositories
class ProductReadRepository:
    def __init__(self):
        self.products = {}

    def add_product(self, product_id, name, price, stock):
        self.products[product_id] = {
            "id": product_id,
            "name": name,
            "price": price,
            "stock": stock,
            "last_updated": datetime.utcnow().isoformat()
        }

    def update_product_price(self, product_id, new_price):
        if product_id in self.products:
            self.products[product_id]["price"] = new_price
            self.products[product_id]["last_updated"] = datetime.utcnow().isoformat()

    def get_product(self, product_id):
        return self.products.get(product_id)

    def get_all_products(self):
        return list(self.products.values())

# Event handlers for read models
class ProductEventHandler:
    def __init__(self, repository):
        self.repository = repository

    async def handle_event(self, event: Event):
        if event.type == "ProductCreated":
            self.repository.add_product(
                product_id=event.aggregate_id,
                name=event.data["name"],
                price=event.data["price"],
                stock=event.data["stock"]
            )
        elif event.type == "ProductPriceUpdated":
            self.repository.update_product_price(
                product_id=event.aggregate_id,
                new_price=event.data["new_price"]
            )

# Event store and bus
class EventStore:
    def __init__(self):
        self.events = []

    async def append(self, event: Event):
        self.events.append(event)

    async def get_events(self, aggregate_id: str) -> List[Event]:
        return [e for e in self.events if e.aggregate_id == aggregate_id]

class EventBus:
    def __init__(self):
        self.handlers = []

    def register_handler(self, handler):
        self.handlers.append(handler)

    async def publish(self, event: Event):
        for handler in self.handlers:
            await handler.handle_event(event)

# Application service that orchestrates the components
class ProductService:
    def __init__(self):
        # Initialize components
        self.event_store = EventStore()
        self.event_bus = EventBus()
        # Read model
        self.product_repository = ProductReadRepository()
        product_event_handler = ProductEventHandler(self.product_repository)
        self.event_bus.register_handler(product_event_handler)
        # Command handler
        self.command_handler = ProductCommandHandler(self.event_store, self.event_bus)

    async def create_product(self, name, price, stock):
        cmd = CreateProductCommand(name=name, price=price, stock=stock)
        return await self.command_handler.handle_create_product(cmd)

    async def update_product_price(self, product_id, new_price):
        cmd = UpdateProductPriceCommand(product_id=product_id, new_price=new_price)
        return await self.command_handler.handle_update_price(cmd)

    def get_product(self, product_id):
        return self.product_repository.get_product(product_id)

    def get_all_products(self):
        return self.product_repository.get_all_products()

# Demo application
async def run_demo():
    # Initialize the service
    product_service = ProductService()
    # Create some products
    laptop_id = await product_service.create_product("Laptop", 999.99, 10)
    phone_id = await product_service.create_product("Smartphone", 699.99, 20)
    # Update a price
    await product_service.update_product_price(laptop_id, 899.99)
    # Query products
    laptop = product_service.get_product(laptop_id)
    all_products = product_service.get_all_products()
    print(f"Laptop details: {laptop}")
    print(f"All products: {json.dumps(all_products, indent=2)}")

if __name__ == "__main__":
    asyncio.run(run_demo())
I've found that combining these techniques creates microservices that can handle high loads while maintaining responsiveness. The event-driven approach allows for independent scaling of components based on their specific resource needs.
When implementing these patterns, pay close attention to error handling and message delivery guarantees. The "at least once" delivery model often provides the best balance between complexity and reliability, but requires your services to handle duplicate messages properly.
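A common safeguard is an idempotent consumer that remembers which event IDs it has already handled and silently drops redeliveries. Here is a minimal in-memory sketch; a real service would persist the processed IDs (for example in Redis or alongside its own database writes) so the guarantee survives restarts:

class IdempotentConsumer:
    """Skips events whose IDs have already been handled (at-least-once safety)."""

    def __init__(self, handler):
        self.handler = handler
        self.processed_ids = set()  # persist this in real deployments

    def handle(self, event):
        event_id = event["id"]
        if event_id in self.processed_ids:
            print(f"Duplicate event {event_id} ignored")
            return
        self.handler(event)
        self.processed_ids.add(event_id)

# Usage: the same event delivered twice is only processed once
consumer = IdempotentConsumer(lambda e: print(f"Charging order {e['data']['order_id']}"))
event = {"id": "evt-1", "type": "order.created", "data": {"order_id": "42"}}
consumer.handle(event)
consumer.handle(event)  # redelivery is ignored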
As systems grow, event schemas become critical. Consider using a schema registry to manage event formats and ensure backward compatibility as your services evolve.
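Even without a full registry such as Confluent Schema Registry or Apicurio, validating each event against a versioned JSON Schema at the service boundary catches incompatible changes early. The sketch below uses the jsonschema package; the event shape and the schemas themselves are illustrative:

from jsonschema import ValidationError, validate

# Versioned schemas for an order.created event (illustrative)
ORDER_CREATED_SCHEMAS = {
    1: {
        "type": "object",
        "required": ["order_id", "items"],
        "properties": {
            "order_id": {"type": "string"},
            "items": {"type": "array"},
        },
    },
    2: {
        "type": "object",
        "required": ["order_id", "items", "currency"],
        "properties": {
            "order_id": {"type": "string"},
            "items": {"type": "array"},
            "currency": {"type": "string"},
        },
    },
}

def validate_event(event):
    """Reject events that do not match the schema for their declared version."""
    schema = ORDER_CREATED_SCHEMAS.get(event.get("schema_version", 1))
    if schema is None:
        raise ValueError(f"Unknown schema version: {event.get('schema_version')}")
    try:
        validate(instance=event["data"], schema=schema)
    except ValidationError as exc:
        raise ValueError(f"Event failed schema validation: {exc.message}") from exc

validate_event({
    "schema_version": 2,
    "data": {"order_id": "42", "items": [], "currency": "EUR"},
})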
The techniques shown here form the foundation for building resilient, scalable microservice architectures in Python. By embracing event-driven design, you'll create systems that can adapt to changing requirements while maintaining operational stability.