CQRS Design Pattern and Python Flask Microservices Implementation Guide
This diagram illustrates the Command Query Responsibility Segregation (CQRS) design pattern applied to a microservices architecture. Here's a breakdown of its components:
Key Elements:
Microservices:
- Order Microservice:
  - Handles commands (e.g., create, update, delete orders) through the Command Service and Command Model.
  - Handles queries (e.g., fetching order details) via the Query Service and Query Model.
- Customer Microservice:
  - Handles customer-related commands through its Command Service and Command Model.
  - Handles customer queries via its Query Service and Query Model.
Endpoints:
- Command Endpoint: Processes write operations.
- Query Endpoint: Processes read operations.
Event Store:
- Centralized storage for events generated by the command operations.
- Provides the durable event history that event sourcing relies on, so state can be audited or rebuilt by replaying events.
Read Storage:
- Optimized for query operations, providing fast access to read-only data.
Message Broker:
- Facilitates communication between microservices by publishing and consuming events.
Event Handling:
- Event Publisher: Sends events to the message broker after a command operation.
- Event Consumer: Processes events from the message broker to update the read storage.
Flask Microservices:
- Both the Order and Customer microservices are implemented using Python Flask, as indicated by the Flask logos.
Workflow:
Command Handling:
- Commands are sent to the Command Endpoint.
- The Command Service processes the request and updates the Command Model.
- Events are generated and stored in the Event Store.
- The Event Publisher sends events to the Message Broker.
Query Handling:
- Queries are sent to the Query Endpoint.
- The Query Service retrieves data from the Read Storage using the Query Model.
Event Propagation:
- Events are consumed by Event Consumers, which update the Read Storage for efficient query responses.
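The three flows above can be traced end to end in a few lines. The sketch below is a single-process stand-in, assuming a Python list for the Event Store, a `queue.Queue` for the Message Broker, and a dict for the Read Storage. The function names (`handle_create_order`, `run_consumer_once`, `get_order`) are hypothetical and do not come from the diagram.

```python
import itertools
import queue

# In-memory stand-ins (assumptions for illustration only).
command_model = {}          # write-side state, keyed by order id
event_store = []            # append-only log of every event
broker = queue.Queue()      # stands in for the message broker
read_storage = {}           # query-optimized copy of the data
_next_id = itertools.count(1)


def handle_create_order(customer_id, status, total_price):
    """Command handling: update the command model, then record and publish an event."""
    order_id = next(_next_id)
    order = {"id": order_id, "customer_id": customer_id,
             "status": status, "total_price": total_price}
    command_model[order_id] = order
    event = {"type": "OrderCreated", "data": dict(order)}
    event_store.append(event)   # Event Store
    broker.put(event)           # Event Publisher -> Message Broker
    return order_id


def run_consumer_once():
    """Event propagation: drain the broker and update the read storage."""
    while not broker.empty():
        event = broker.get()
        if event["type"] == "OrderCreated":
            read_storage[event["data"]["id"]] = event["data"]


def get_order(order_id):
    """Query handling: read only from the read storage."""
    return read_storage.get(order_id)


order_id = handle_create_order(customer_id=1, status="pending", total_price=100.0)
before = get_order(order_id)   # None: the read side has not caught up yet
run_consumer_once()
after = get_order(order_id)    # the projected order is now visible
```

The gap between `before` and `after` is the eventual consistency that the asynchronous read-side update introduces.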
Use Case:
This design is ideal for scenarios where read and write operations have distinct performance and scaling requirements, such as e-commerce platforms or customer management systems.
This guide explains the step-by-step implementation of the Command Query Responsibility Segregation (CQRS) design pattern using Python Flask for microservices. The architecture is based on the provided diagram, with Order and Customer microservices as examples.
1. Introduction to CQRS
CQRS separates the handling of commands (write operations) from the handling of queries (read operations). Because each side has its own model, the two can be scaled and optimized independently, which speeds up data retrieval and keeps the separation of concerns clean.
2. Core Components
- Command Model: Handles write operations (create, update, delete).
- Query Model: Handles read operations (fetch data).
- Event Store: Logs all changes as events for consistency.
- Message Broker: Ensures communication between services.
- Read Storage: Optimized for queries and updated asynchronously.
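The guide lists event_store/store.py in its directory structure but does not show its contents. As a rough illustration of what "logs all changes as events" could look like, here is a minimal sketch of an append-only store. It assumes SQLite through the standard-library `sqlite3` module. The `EventStore` class, its `append` and `load` methods, and the `OrderUpdated` event type are all hypothetical.

```python
import json
import sqlite3


class EventStore:
    """Append-only event log (hypothetical; not the guide's actual store.py)."""

    def __init__(self, path=":memory:"):
        self._conn = sqlite3.connect(path)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS events ("
            " seq INTEGER PRIMARY KEY AUTOINCREMENT,"
            " type TEXT NOT NULL,"
            " data TEXT NOT NULL)"
        )

    def append(self, event_type, event_data):
        """Record one event and return its sequence number."""
        cur = self._conn.execute(
            "INSERT INTO events (type, data) VALUES (?, ?)",
            (event_type, json.dumps(event_data)),
        )
        self._conn.commit()
        return cur.lastrowid

    def load(self, after_seq=0):
        """Return events with seq > after_seq, oldest first."""
        rows = self._conn.execute(
            "SELECT seq, type, data FROM events WHERE seq > ? ORDER BY seq",
            (after_seq,),
        )
        return [{"seq": s, "type": t, "data": json.loads(d)} for s, t, d in rows]


store = EventStore()
first = store.append("OrderCreated", {"id": 1, "status": "pending"})
second = store.append("OrderUpdated", {"id": 1, "status": "shipped"})  # hypothetical event type
replayed = store.load()
```

Events are only ever inserted, never updated or deleted, so `load` can replay them in order to rebuild a read model from scratch.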
3. Setting Up the Environment
Dependencies
Install the required libraries:
pip install flask flask-restful flask-sqlalchemy flask-migrate kafka-python
Directory Structure
project-root/
├── order_service/
│   ├── app.py
│   ├── command/
│   │   ├── endpoints.py
│   │   ├── models.py
│   │   └── services.py
│   └── query/
│       ├── endpoints.py
│       ├── models.py
│       └── services.py
├── customer_service/
│   ├── app.py
│   ├── command/
│   └── query/
├── message_broker/
│   ├── producer.py
│   └── consumer.py
└── event_store/
    └── store.py
4. Implementing the Order Microservice
Command Side
Handles operations like creating and updating orders.
order_service/command/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class Order(db.Model):
    """Write-side (command) model for orders."""
    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer, nullable=False)
    status = db.Column(db.String(50), nullable=False)
    total_price = db.Column(db.Float, nullable=False)
order_service/command/services.py
from .models import Order, db
from message_broker.producer import publish_event


def create_order(data):
    """Persist a new order, then publish an OrderCreated event."""
    new_order = Order(
        customer_id=data['customer_id'],
        status=data['status'],
        total_price=data['total_price']
    )
    db.session.add(new_order)
    db.session.commit()  # commit first so new_order.id is populated

    publish_event('OrderCreated', {
        'id': new_order.id,
        'customer_id': new_order.customer_id,
        'status': new_order.status,
        'total_price': new_order.total_price
    })
    return new_order
order_service/command/endpoints.py
from flask_restful import Resource, reqparse
from .services import create_order

# Validate and coerce the arguments of incoming create requests.
parser = reqparse.RequestParser()
parser.add_argument('customer_id', type=int, required=True)
parser.add_argument('status', type=str, required=True)
parser.add_argument('total_price', type=float, required=True)


class OrderCommandEndpoint(Resource):
    def post(self):
        """Create an order and return its id."""
        args = parser.parse_args()
        order = create_order(args)
        return {'message': 'Order created', 'order_id': order.id}, 201
Query Side
Handles fetching order details.
order_service/query/models.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


class OrderReadModel(db.Model):
    """Read-side (query) projection of an order."""
    __tablename__ = 'order_read_model'

    id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer, nullable=False)
    status = db.Column(db.String(50), nullable=False)
    total_price = db.Column(db.Float, nullable=False)
order_service/query/endpoints.py
from flask_restful import Resource
from .models import OrderReadModel, db


class OrderQueryEndpoint(Resource):
    def get(self, order_id):
        """Return one order from the read model, or 404 if it is not there yet."""
        # Session.get replaces the legacy Query.get lookup by primary key.
        order = db.session.get(OrderReadModel, order_id)
        if not order:
            return {'message': 'Order not found'}, 404
        return {
            'id': order.id,
            'customer_id': order.customer_id,
            'status': order.status,
            'total_price': order.total_price
        }
5. Implementing the Kafka Message Broker
Producer
message_broker/producer.py
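from kafka import KafkaProducer
import json


def publish_event(event_type, event_data):
    """Publish one JSON-encoded event to the 'events' topic."""
    # A new producer is created per call for simplicity; a long-running
    # service would normally create one producer and reuse it.
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    event = {'type': event_type, 'data': event_data}
    producer.send('events', value=event)
    producer.flush()  # block until the buffered event has been sent
    producer.close()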
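The producer wraps every event in a `{'type': ..., 'data': ...}` envelope and encodes it as UTF-8 JSON, and the consumer has to reverse exactly that encoding. The round trip can be checked without a running broker. In this sketch, `serialize` and `deserialize` are hypothetical helper names that mirror the lambdas passed as `value_serializer` and `value_deserializer`.

```python
import json


def serialize(value):
    """Same transformation as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


def deserialize(raw):
    """Same transformation as the consumer's value_deserializer."""
    return json.loads(raw.decode("utf-8"))


event = {
    "type": "OrderCreated",
    "data": {"id": 1, "customer_id": 1, "status": "pending", "total_price": 100.0},
}
wire_bytes = serialize(event)     # what is written to the 'events' topic
restored = deserialize(wire_bytes)  # what the consumer sees as message.value
```

Only JSON-compatible values survive this round trip, so types such as `datetime` or `Decimal` would need to be converted before publishing.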
Consumer
message_broker/consumer.py
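from kafka import KafkaConsumer
import json

from order_service.query.models import OrderReadModel, db


def consume_events():
    """Read events from the 'events' topic and project them into the read model."""
    consumer = KafkaConsumer(
        'events',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )
    # Note: db.session needs an active Flask application context bound to
    # the read database; set one up before calling this function.
    for message in consumer:
        event = message.value
        if event['type'] == 'OrderCreated':
            data = event['data']
            new_order = OrderReadModel(
                id=data['id'],
                customer_id=data['customer_id'],
                status=data['status'],
                total_price=data['total_price']
            )
            db.session.add(new_order)
            db.session.commit()


if __name__ == '__main__':
    # Without this guard, running the file as a script would do nothing.
    consume_events()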
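A Kafka consumer can receive the same message more than once, for example after a restart that happens before its offsets are committed. A plain insert into the read model would then fail on the duplicate primary key. One common way to make the projection idempotent is an upsert. The sketch below assumes a SQLite read store accessed through the standard-library `sqlite3` module rather than the guide's SQLAlchemy model, and `apply_event` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE order_read_model ("
    " id INTEGER PRIMARY KEY,"
    " customer_id INTEGER NOT NULL,"
    " status TEXT NOT NULL,"
    " total_price REAL NOT NULL)"
)


def apply_event(conn, event):
    """Project one event into the read model; safe to call repeatedly."""
    if event["type"] != "OrderCreated":
        return False  # unknown event types are ignored
    data = event["data"]
    # INSERT OR REPLACE overwrites an existing row with the same id,
    # so a redelivered event leaves exactly one row behind.
    conn.execute(
        "INSERT OR REPLACE INTO order_read_model"
        " (id, customer_id, status, total_price) VALUES (?, ?, ?, ?)",
        (data["id"], data["customer_id"], data["status"], data["total_price"]),
    )
    conn.commit()
    return True


event = {
    "type": "OrderCreated",
    "data": {"id": 1, "customer_id": 1, "status": "pending", "total_price": 100.0},
}
apply_event(conn, event)
apply_event(conn, event)  # redelivery: no error, no duplicate row
row_count = conn.execute("SELECT COUNT(*) FROM order_read_model").fetchone()[0]
```

With SQLAlchemy, `db.session.merge(...)` offers a comparable insert-or-update behavior keyed on the primary key.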
6. Deploying the Microservices
Running the Order Microservice
export FLASK_APP=order_service/app.py
flask run --port=5000
Running the Customer Microservice
Repeat the steps above for the customer microservice, running it on a different port (for example, 5001) so that it does not clash with the order microservice.
Running the Kafka Consumer
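Run the consumer as a module from project-root/ so that the order_service package it imports can be found:
python -m message_broker.consumer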
7. Testing the System
Create an Order
curl -X POST http://localhost:5000/command/orders -H "Content-Type: application/json" -d '{
"customer_id": 1,
"status": "pending",
"total_price": 100.0
}'
Fetch an Order
curl http://localhost:5000/query/orders/1
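Because the read storage is updated asynchronously, a fetch issued immediately after the create may return 404 until the consumer has processed the event. A test script can poll with a timeout instead of failing on the first miss. In this sketch, `wait_for` is a hypothetical helper, and the `fetch` callable stands in for the HTTP GET shown above, returning `None` while the order is not yet visible.

```python
import time


def wait_for(fetch, timeout=5.0, interval=0.1):
    """Call fetch() until it returns a non-None value or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        result = fetch()
        if result is not None:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError("read model did not catch up in time")
        time.sleep(interval)


# Stand-in for the HTTP GET: misses twice, then returns the order.
responses = iter([None, None, {"id": 1, "status": "pending"}])
order = wait_for(lambda: next(responses), timeout=1.0, interval=0.01)
```

In a real test, `fetch` would issue the GET request and return `None` on a 404 response.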
8. Conclusion
By implementing CQRS with Flask and Kafka, we achieved:
- Separation of read and write operations.
- Event-driven communication for eventual consistency.
- Scalable and maintainable microservices.