Tutorial
Understanding Event-Driven Architecture Through Python Examples
Learn the core concepts of event-driven architecture (EDA) with practical Python examples, from a simple callback pattern to async event buses and webhook dispatchers, and understand when to apply EDA in your projects.
June 2026 · 9 min read · 1 views · 0 hearts
Advertisement
Understanding Event-Driven Architecture Through Python Examples
Event-driven architecture (EDA) has become the backbone of modern, responsive applications—from reactive user interfaces to real-time data pipelines and microservices. But what does it actually look like in code, and when should you reach for it? Let’s demystify EDA with concrete Python examples that show how events trigger actions, decouple components, and scale work.
What Is Event-Driven Architecture?
At its core, EDA is about reacting to things that happen—user clicks, sensor readings, file uploads, queue messages—rather than orchestrating a predetermined sequence of steps. Instead of a program asking "What should I do next?" and polling for updates, event-driven systems flip the script: "When something happens, here’s what to do."
This inversion is powerful: components become independent and can scale separately. A web server doesn’t wait for a payment to complete; it emits a "PaymentSubmitted" event and moves on. A worker service picks that event up, processes it, and emits a "PaymentConfirmed" event that triggers email notifications, inventory updates, or analytics.
The Simplest Event-Driven Pattern: Callbacks and Signals
Even basic Python can demonstrate the pattern. Consider a traditional synchronous function:
def process_order(order):
# This is a bad pattern: tightly coupled and blocking
charge_payment(order)
send_email(order)
update_inventory(order)
Here, process_order is coupled to payment, email, and inventory logic. If any step fails or takes too long, the whole order slows down. An event-driven approach uses callbacks:
def on_order_placed(order):
emit("payment_charge", order)
def on_payment_charged(order):
emit("email_send", order)
emit("inventory_update", order)
def on_email_sent(order):
print(f"Confirmation sent for order {order.id}")
# A mini event bus
events = {
"order_placed": [on_order_placed],
"payment_charged": [on_payment_charged],
"email_sent": [on_email_sent],
}
def emit(event_name, data):
for handler in events.get(event_name, []):
handler(data)
# Usage
emit("order_placed", order)
Not the most elegant—but it shows the core idea: instead of one function doing everything, we break work into event handlers that can run independently.
A More Realistic Event Bus in Python
Production systems need something more robust than a dictionary of lists. Here’s a lightweight event bus class you can actually use:
from collections import defaultdict
from typing import Callable, Any
class EventBus:
def __init__(self):
self._handlers = defaultdict(list)
def subscribe(self, event_type: str, handler: Callable):
self._handlers[event_type].append(handler)
def unsubscribe(self, event_type: str, handler: Callable):
self._handlers[event_type].remove(handler)
def publish(self, event_type: str, data: Any = None):
for handler in self._handlers[event_type]:
handler(data)
Now you can wire up services cleanly:
bus = EventBus()
def send_email(order):
print(f"Sending confirmation for {order.id}")
def update_inventory(order):
print(f"Reducing stock for {order.product_id}")
bus.subscribe("order_confirmed", send_email)
bus.subscribe("order_confirmed", update_inventory)
# Somewhere else in your app:
bus.publish("order_confirmed", order)
The beauty? Adding a new handler (like push notification or fraud check) requires zero changes to existing code—just bus.subscribe("order_confirmed", new_handler).
Async Event-Driven Patterns with Asyncio
Modern Python applications often need concurrency. EDA pairs beautifully with asyncio because event handlers can await I/O without blocking the whole system. Here’s an async event bus:
import asyncio
from collections import defaultdict
class AsyncEventBus:
def __init__(self):
self._handlers = defaultdict(list)
def subscribe(self, event_type: str, handler):
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, data=None):
tasks = [handler(data) for handler in self._handlers[event_type]]
await asyncio.gather(*tasks)
async def slow_payment_handler(order):
await asyncio.sleep(1) # Simulate payment processing
print(f"Payment done for {order.id}")
async def fast_log_handler(order):
print(f"Order {order.id} logged at {order.timestamp}")
bus = AsyncEventBus()
bus.subscribe("order_placed", slow_payment_handler)
bus.subscribe("order_placed", fast_log_handler)
await bus.publish("order_placed", order)
# Both handlers run concurrently, but independently
The slow payment handler doesn’t delay the log handler—they execute in parallel. This is where EDA shines: handlers that are unrelated don’t block each other.
Real-World Example: A Webhook Dispatcher
A common use case for EDA is a webhook dispatcher—where an event from your system (like "user signed up") triggers HTTP calls to third-party services. Here’s a simplified implementation using Python’s built-in http.server:
import json
import urllib.request
from http.server import HTTPServer, BaseHTTPRequestHandler
class WebhookBus:
def __init__(self):
self.webhooks = {}
def register(self, event: str, url: str):
self.webhooks.setdefault(event, []).append(url)
def dispatch(self, event: str, payload: dict):
for url in self.webhooks.get(event, []):
data = json.dumps({"event": event, "payload": payload}).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
# In production, use async HTTP or a thread pool!
urllib.request.urlopen(req, timeout=5)
# Setup
webhook_bus = WebhookBus()
webhook_bus.register("user.created", "https://third-party.com/webhook/user_created")
webhook_bus.register("payment.completed", "https://analytics.example.com/track")
# Later, when a user signs up:
user = {"id": 42, "email": "alice@example.com"}
webhook_bus.dispatch("user.created", user)
Key insight: The webhook bus doesn’t know what https://third-party.com does with the data—and it doesn’t care. That’s loose coupling in action.
When to Use Event-Driven Architecture (and When Not To)
EDA is great for:
- Decoupling components that shouldn’t depend on each other (payment processing vs. email sending)
- Scalable backends where you can add more workers per event type independently
- Real-time systems that react to user actions, sensor data, or message queues
- Auditing and replayability—events are first-class data you can store and replay later
But it adds complexity:
- Harder to debug—you can’t just follow a linear call stack
- Eventual consistency—dispatched events aren’t guaranteed to be processed immediately
- More infrastructure—you often need a message broker (RabbitMQ, Redis, Kafka) for production
Bringing It Together
You don’t need Kafka to try event-driven patterns. A Python event bus—synchronous or async—is a low-ceremony way to start decoupling your code. Start by identifying places where a "done" event could trigger multiple follow-up actions independently. Extract those actions into subscribers. Watch your code become more modular, testable, and scalable.
Event-driven architecture isn’t just for microservices—it’s a mindset that treats every action as a potential trigger for an unknown number of reactions. And in Python, that’s just a few classes and decorators away from reality.
Advertisement
Comments
Questions, corrections, and tips stay visible for everyone reading this page.
Join the discussion
No comments yet
Be the first to leave a note — it helps the next reader.