
Python CQRS: Building distributed systems without the pain (Sagas, Outbox, Event‑Driven)

Did you know that over 70% of Python-based microservice projects stumble on data-consistency bugs within their first three months? Most teams try to "just add a queue" and end up with tangled callbacks, lost messages, and endless debugging sessions. Enter **CQRS**, a pattern that separates reads from writes. Paired with **Sagas**, the **Outbox** pattern, and an event-driven backbone, it gives you a clean, testable architecture without the usual headaches.

Why CQRS Matters for Modern Python Apps

Python developers love simplicity, but as a codebase grows, complexity sneaks in. I've seen teams hit the "write once, read twice" wall, where a single table becomes a bottleneck. CQRS gives you two truths: the **write side** can be a fine-tuned, transaction-heavy model, while the **read side** can be a read-optimized, eventually consistent view.

* **Business value:** read models can scale horizontally behind a caching layer, while write models keep their consistency guarantees.
* **Technical payoff:** less lock contention, easy sharding, independent deployment.
* **Real-world impact:** one e-commerce order processor dropped latency by 40% after moving inventory queries to a separate read replica.

Sound familiar? You're probably juggling read/write conflicts without realizing it.

Core Concepts: Commands, Queries, Events & the Outbox Pattern

Python's flexibility lets you express these ideas with dataclasses and type hints. Note that fields with defaults must come after required fields, and a per-instance default needs `default_factory` (otherwise the id would be evaluated once at class definition and shared by every command):

```python
from dataclasses import dataclass, field
from uuid import uuid4


@dataclass
class CreateOrderCommand:
    user_id: int
    items: list[int]
    order_id: str = field(default_factory=lambda: uuid4().hex)
```

Commands are **intentional** actions and never read data. Queries, on the other hand, are **pure reads**; they never mutate state. Events are immutable snapshots of something that happened. In Python, you might serialize them to JSON and push them onto a Redis Stream or Kafka topic. The outbox table sits in the same database as your domain tables, so it can share their transaction:

```sql
CREATE TABLE outbox (
    id SERIAL PRIMARY KEY,
    aggregate_id TEXT,
    event_type TEXT,
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT now(),
    sent_at TIMESTAMPTZ
);
```

When a command handler commits the main data, it inserts an event into this table in the same transaction. A background worker later picks up unsent rows, publishes them, and stamps `sent_at`. This guarantees **at-least-once** delivery without a distributed transaction; consumers must be idempotent to absorb the occasional duplicate.
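To make the worker side concrete, here is a minimal relay sketch. SQLite and a plain callable stand in for PostgreSQL and Redis/Kafka; the table shape follows the outbox above, but the function name and schema details are illustrative:

```python
import json
import sqlite3
from datetime import datetime, timezone


def relay_outbox(conn: sqlite3.Connection, publish) -> int:
    """Publish unsent outbox rows, then mark them sent. Returns rows processed."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox WHERE sent_at IS NULL ORDER BY id"
    ).fetchall()
    for row_id, event_type, payload in rows:
        publish({"type": event_type, **json.loads(payload)})  # may raise; row stays unsent
        conn.execute(
            "UPDATE outbox SET sent_at = ? WHERE id = ?",
            (datetime.now(timezone.utc).isoformat(), row_id),
        )
        conn.commit()  # commit per row so a crash re-delivers at most one duplicate
    return len(rows)


# Demo: one pending event gets published on the first pass, none on the second.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE outbox (id INTEGER PRIMARY KEY, event_type TEXT, payload TEXT, sent_at TEXT)"
)
conn.execute(
    "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
    ("OrderCreated", json.dumps({"order_id": "abc123"})),
)
published = []
relay_outbox(conn, published.append)
```

Because `sent_at` is committed only after the publish succeeds, a crash between the two steps re-publishes the row on the next pass; that is exactly the at-least-once behavior the consumers have to tolerate.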

Coordinating Distributed Transactions with Sagas (Practical Walkthrough)

Sagas replace the classic two-phase commit. There are two coordination styles:

* **Choreography**: services react to events and emit new ones.
* **Orchestration**: a central saga manager sends commands and reacts to responses.

In my past projects, the orchestration model felt cleaner because it let me write compensation logic explicitly. Here's a step-by-step walkthrough:

1. A **CreateOrderCommand** arrives at the order service.
2. The handler persists the row in `orders` and writes an `OrderCreated` event to the outbox.
3. A Celery worker, `process_outbox`, reads the outbox, pushes to Redis Streams, and marks rows as sent.
4. The `order_saga` listens for `OrderCreated`.
5. It calls `reserve_inventory` on the inventory service.
6. If inventory succeeds, it calls `charge_payment`.
7. Any failure triggers a compensating `CancelOrder` command that rolls back the inventory reservation or the payment.

Below is a minimal saga orchestrator (`cancel_order`, `cancel_inventory`, and `publish_event` are assumed to be defined elsewhere; blocking on `.get()` inside a task is fine for a sketch but discouraged in production Celery):

```python
# order_saga.py
from celery import Celery
from pydantic import BaseModel

celery = Celery('order_saga', broker='redis://localhost:6379/0')


class InventoryResult(BaseModel):
    success: bool
    reservation_id: str | None = None


class PaymentResult(BaseModel):
    success: bool
    transaction_id: str | None = None


@celery.task
def reserve_inventory(order_id: str, items: list[int]):
    # pretend to call the external inventory service
    return InventoryResult(success=True, reservation_id="resv123")


@celery.task
def charge_payment(order_id: str, amount: float):
    # pretend to call the payment gateway
    return PaymentResult(success=True, transaction_id="txn456")


@celery.task
def orchestrate(order_id: str, items: list[int], amount: float):
    inv = reserve_inventory.delay(order_id, items).get()
    if not inv.success:
        cancel_order.delay(order_id)
        return
    pay = charge_payment.delay(order_id, amount).get()
    if not pay.success:
        cancel_inventory.delay(inv.reservation_id)
        cancel_order.delay(order_id)
        return
    # all good – publish the OrderCompleted event
    publish_event({"type": "OrderCompleted", "order_id": order_id})
```

The saga orchestrates the flow, and because the outbox only guarantees at-least-once delivery, each step must be written to be idempotent.
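Since events can arrive more than once, consumers typically dedupe on an event id. A minimal sketch of that idea (the in-memory `handled` set and handler names are illustrative; in production the set would be a `processed_events` table updated in the same transaction as the state change):

```python
def make_idempotent(handler):
    """Wrap an event handler so repeated deliveries of the same event are no-ops."""
    handled: set[str] = set()  # in production: a processed_events table

    def wrapper(event: dict) -> bool:
        event_id = event["event_id"]
        if event_id in handled:
            return False  # duplicate delivery, skip silently
        handler(event)
        handled.add(event_id)
        return True

    return wrapper


reserved = []
reserve = make_idempotent(lambda e: reserved.append(e["order_id"]))
reserve({"event_id": "evt-1", "order_id": "abc123"})
reserve({"event_id": "evt-1", "order_id": "abc123"})  # duplicate, ignored
```

With this wrapper in place, the relay worker can safely re-deliver after a crash: the side effect happens once no matter how many copies of the event arrive.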

Putting It All Together: Building an Event‑Driven Microservice with FastAPI, Pandas & NumPy

Now let's combine everything. The read side uses **pandas** to materialize event streams into a DataFrame that can be served via an analytical endpoint, and NumPy helps crunch aggregate metrics.

```python
# read_model.py
import json

import pandas as pd
import redis

# decode_responses=True so xrange yields str keys/values instead of bytes
redis_client = redis.Redis(host='localhost', port=6379, db=1, decode_responses=True)


def build_view():
    events = redis_client.xrange('order_events', count=1000)
    data = [json.loads(fields['payload']) for _stream_id, fields in events]
    df = pd.DataFrame(data)
    # compute total sales per day
    df['created_at'] = pd.to_datetime(df['created_at'])
    daily = df.groupby(df['created_at'].dt.date)['amount'].sum().reset_index()
    return daily


# FastAPI endpoint
from fastapi import FastAPI

app = FastAPI()


@app.get("/analytics/sales")
async def sales():
    return build_view().to_dict(orient="records")
```

The write side remains simple:

```python
# command_handlers.py
from dataclasses import asdict

from fastapi import APIRouter
from sqlmodel import Session

from .db import engine  # your SQLModel engine, created at app startup
from .models import Order, OutboxEvent
from .commands import CreateOrderCommand

router = APIRouter()


@router.post("/orders")
def create_order(cmd: CreateOrderCommand):
    with Session(engine) as session:
        order = Order(user_id=cmd.user_id, items=cmd.items)
        session.add(order)
        session.flush()  # assigns order.id
        event = OutboxEvent(
            aggregate_id=order.id,
            event_type="OrderCreated",
            payload=asdict(cmd),
        )
        session.add(event)
        session.commit()  # order and outbox row commit atomically
        return {"order_id": order.id}
```

A Dockerfile and a minimal `docker-compose.yml` let you spin up PostgreSQL, Redis, and Celery workers with a single `docker compose up`. For local debugging, open a Jupyter notebook, import the modules, and watch the saga in action with `%%timeit` blocks to measure latency.
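NumPy earns its place on the read side for aggregate metrics. A small sketch of the kind of summary a dashboard endpoint might serve (the function name and the latency numbers are illustrative):

```python
import numpy as np


def latency_summary(latencies_ms: list[float]) -> dict:
    """Aggregate request latencies into the usual dashboard metrics."""
    arr = np.asarray(latencies_ms, dtype=float)
    return {
        "count": int(arr.size),
        "mean_ms": float(arr.mean()),
        "p50_ms": float(np.percentile(arr, 50)),
        "p95_ms": float(np.percentile(arr, 95)),
        "max_ms": float(arr.max()),
    }


summary = latency_summary([12.0, 15.0, 11.0, 240.0, 14.0])
```

Because the read model is eventually consistent anyway, computing these numbers from the event stream instead of the write database costs nothing in correctness and keeps analytical load off the transactional tables.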

Actionable Takeaways & Next Steps

**Checklist for refactoring an existing monolith:**

- Create an **outbox** table in your current DB.
- Wrap every write operation in a transaction that also writes to the outbox.
- Spin up a lightweight Celery worker that reads unsent outbox rows.
- Design **command** and **event** dataclasses for each domain change.
- Sketch a saga diagram: define compensation commands for each step.
- Build a read model with pandas; expose it via FastAPI for quick analytics.

**Tooling recommendations:**

- `pydantic` for DTOs (validation + serialization).
- `SQLModel` (or `SQLAlchemy`) for models; it works nicely with FastAPI.
- `aiokafka` or `redis-py` for event streaming; pick what fits your stack.

**Learning path:**

1. Build a tiny e-commerce order service following the above pattern.
2. Replace the inventory service with a mock that randomly fails.
3. Observe how the saga rolls back.
4. Add a second saga for shipment, triggered after payment.

Measure success in terms of reduced latency, lower error rates, and the ability to deploy read and write services independently.

---
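The "command and event dataclasses" step can be sketched with the standard library alone (pydantic, recommended above, adds validation on top; the event shape and helper name here are illustrative):

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from uuid import uuid4


@dataclass(frozen=True)  # events are immutable snapshots
class OrderCreated:
    order_id: str
    user_id: int
    amount: float
    event_id: str = field(default_factory=lambda: uuid4().hex)
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def to_outbox_payload(event) -> str:
    """Serialize an event dataclass to the JSON string stored in the outbox row."""
    return json.dumps({"type": type(event).__name__, **asdict(event)})


payload = to_outbox_payload(OrderCreated(order_id="abc123", user_id=7, amount=49.9))
```

The `event_id` generated here is what the idempotent consumers dedupe on, and `frozen=True` enforces the "events are immutable" rule at the type level.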

Frequently Asked Questions

What is CQRS and how does it differ from classic MVC in Python?

CQRS (Command Query Responsibility Segregation) splits the *write* model (commands) from the *read* model (queries). Unlike MVC, where a single controller often handles both, CQRS lets you optimize each side independently, leading to better scalability and clearer domain boundaries.
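A tiny sketch of that split, with in-memory dicts standing in for the write store and the denormalized read store (all names illustrative):

```python
# Write side: commands mutate the source of truth and emit events.
orders: dict[str, dict] = {}               # write store (source of truth)
order_count_by_user: dict[int, int] = {}   # read store (denormalized projection)


def handle_create_order(order_id: str, user_id: int) -> dict:
    orders[order_id] = {"order_id": order_id, "user_id": user_id}
    return {"type": "OrderCreated", "order_id": order_id, "user_id": user_id}


def project(event: dict) -> None:
    """Apply an event to the read model; normally runs in a separate consumer."""
    if event["type"] == "OrderCreated":
        user = event["user_id"]
        order_count_by_user[user] = order_count_by_user.get(user, 0) + 1


# Query side: pure reads, no mutation.
def query_order_count(user_id: int) -> int:
    return order_count_by_user.get(user_id, 0)


project(handle_create_order("o-1", user_id=7))
project(handle_create_order("o-2", user_id=7))
```

In MVC, one controller would touch both dicts directly; here the only bridge between the two models is the event, which is what lets each side scale and deploy on its own.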

How can I implement the Outbox pattern with Python and PostgreSQL?

Store outgoing events in an “outbox” table inside the same DB transaction that persists the business change. A background worker (Celery, RQ, or a simple asyncio loop) reads pending rows, publishes them to Kafka/Redis, and marks them as sent—ensuring atomicity without distributed transactions.

When should I use a Saga instead of a traditional two‑phase commit?

Use Sagas when you have micro‑services that own separate databases and cannot share a global transaction manager. Sagas coordinate a series of local transactions with compensating actions, providing eventual consistency without locking resources across services.

Is CQRS compatible with data‑science libraries like pandas or NumPy?

Absolutely. The *read* side can materialize event streams into analytical tables or DataFrames using pandas/NumPy, giving data‑science teams instant access to up‑to‑date, query‑optimized data without hitting the write DB.

How do I test CQRS components (commands, events, sagas) in Python?

Leverage pytest together with pytest‑asyncio for async handlers, mock the outbox with an in‑memory SQLite DB, and assert that the correct events are emitted. For sagas, use a test harness that simulates failures to verify compensation logic.
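A sketch of that failure-simulation idea, using a synchronous stand-in for the Celery orchestrator so the compensation path can be asserted directly (all names are illustrative; in a real suite each assertion lives in a pytest test function):

```python
def run_order_saga(order_id: str, reserve, charge) -> tuple[bool, list[str]]:
    """Synchronous stand-in for the saga; returns (success, compensations executed)."""
    compensations: list = []   # (name, undo-callable) pairs, newest last
    executed: list[str] = []

    def fail() -> tuple[bool, list[str]]:
        # run compensations in reverse order of registration
        for name, undo in reversed(compensations):
            undo()
            executed.append(name)
        return False, executed

    if not reserve(order_id):
        return fail()  # nothing to undo yet
    compensations.append(("cancel_inventory", lambda: None))
    if not charge(order_id):
        return fail()  # undoes the reservation
    return True, executed


# Inject a failing payment step and observe the rollback.
ok, undone = run_order_saga("o-1", reserve=lambda _: True, charge=lambda _: False)
```

Swapping `lambda _: False` in for any step is the whole trick: the test harness controls exactly which local transaction fails, so every compensation branch gets exercised deterministically.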

