CQRS Design Pattern and Python Flask Microservices Implementation Guide


This diagram illustrates the Command Query Responsibility Segregation (CQRS) design pattern applied to a microservices architecture. Here's a breakdown of its components:

Key Elements:

  1. Microservices:

    • Order Microservice:
      • Handles commands (e.g., create, update, delete orders) through the Command Service and Command Model.
      • Handles queries (e.g., fetching order details) via the Query Service and Query Model.
    • Customer Microservice:
      • Handles customer-related commands through its Command Service and Command Model.
      • Handles customer queries via its Query Service and Query Model.
  2. Endpoints:

    • Command Endpoint: Processes write operations.
    • Query Endpoint: Processes read operations.
  3. Event Store:

    • Centralized storage for events generated by the command operations.
    • Ensures consistency and event sourcing.
  4. Read Storage:

    • Optimized for query operations, providing fast access to read-only data.
  5. Message Broker:

    • Facilitates communication between microservices by publishing and consuming events.
  6. Event Handling:

    • Event Publisher: Sends events to the message broker after a command operation.
    • Event Consumer: Processes events from the message broker to update the read storage.
  7. Flask Microservices:

    • Both the Order and Customer microservices are implemented using Python Flask, as indicated by the Flask logos.

Workflow:

  1. Command Handling:

    • Commands are sent to the Command Endpoint.
    • The Command Service processes the request and updates the Command Model.
    • Events are generated and stored in the Event Store.
    • The Event Publisher sends events to the Message Broker.
  2. Query Handling:

    • Queries are sent to the Query Endpoint.
    • The Query Service retrieves data from the Read Storage using the Query Model.
  3. Event Propagation:

    • Events are consumed by Event Consumers, which update the Read Storage for efficient query responses.

Use Case:

This design is ideal for scenarios where read and write operations have distinct performance and scaling requirements, such as e-commerce platforms or customer management systems.


This guide explains the step-by-step implementation of the Command Query Responsibility Segregation (CQRS) design pattern using Python Flask for microservices. The architecture is based on the provided diagram, with Order and Customer microservices as examples.


1. Introduction to CQRS

CQRS separates the responsibility of handling commands (write operations) and queries (read operations). This allows better scalability, optimized data retrieval, and a clean separation of concerns.

2. Core Components

  • Command Model: Handles write operations (create, update, delete).

  • Query Model: Handles read operations (fetch data).

  • Event Store: Logs all changes as events for consistency.

  • Message Broker: Ensures communication between services.

  • Read Storage: Optimized for queries and updated asynchronously.


3. Setting Up the Environment

Dependencies

Install the required libraries:

pip install flask flask-restful flask-sqlalchemy flask-migrate kafka-python

Directory Structure

project-root/
├── order_service/
│   ├── app.py
│   ├── command/
│   │   ├── endpoints.py
│   │   ├── models.py
│   │   ├── services.py
│   └── query/
│       ├── endpoints.py
│       ├── models.py
│       ├── services.py
├── customer_service/
│   ├── app.py
│   ├── command/
│   ├── query/
├── message_broker/
│   ├── producer.py
│   ├── consumer.py
└── event_store/
    ├── store.py

4. Implementing the Order Microservice

Command Side

Handles operations like creating and updating orders.

order_service/command/models.py

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class Order(db.Model):
    __tablename__ = 'orders'
    id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer, nullable=False)
    status = db.Column(db.String(50), nullable=False)
    total_price = db.Column(db.Float, nullable=False)

order_service/command/services.py

from .models import Order, db
from message_broker.producer import publish_event

def create_order(data):
    new_order = Order(
        customer_id=data['customer_id'],
        status=data['status'],
        total_price=data['total_price']
    )
    db.session.add(new_order)
    db.session.commit()
    publish_event('OrderCreated', {
        'id': new_order.id,
        'customer_id': new_order.customer_id,
        'status': new_order.status,
        'total_price': new_order.total_price
    })
    return new_order

order_service/command/endpoints.py

from flask_restful import Resource, Api, reqparse
from .services import create_order

parser = reqparse.RequestParser()
parser.add_argument('customer_id', type=int, required=True)
parser.add_argument('status', type=str, required=True)
parser.add_argument('total_price', type=float, required=True)

class OrderCommandEndpoint(Resource):
    def post(self):
        args = parser.parse_args()
        order = create_order(args)
        return {'message': 'Order created', 'order_id': order.id}, 201

Query Side

Handles fetching order details.

order_service/query/models.py

from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class OrderReadModel(db.Model):
    __tablename__ = 'order_read_model'
    id = db.Column(db.Integer, primary_key=True)
    customer_id = db.Column(db.Integer, nullable=False)
    status = db.Column(db.String(50), nullable=False)
    total_price = db.Column(db.Float, nullable=False)

order_service/query/endpoints.py

from flask_restful import Resource, Api
from .models import OrderReadModel

class OrderQueryEndpoint(Resource):
    def get(self, order_id):
        order = OrderReadModel.query.get(order_id)
        if not order:
            return {'message': 'Order not found'}, 404
        return {
            'id': order.id,
            'customer_id': order.customer_id,
            'status': order.status,
            'total_price': order.total_price
        }

5. Implementing the Kafka Message Broker

Producer

message_broker/producer.py

from kafka import KafkaProducer
import json

def publish_event(event_type, event_data):
    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )
    event = {'type': event_type, 'data': event_data}
    producer.send('events', value=event)
    producer.flush()
    producer.close()

Consumer

message_broker/consumer.py

from kafka import KafkaConsumer
import json
from order_service.query.models import OrderReadModel, db

def consume_events():
    consumer = KafkaConsumer(
        'events',
        bootstrap_servers='localhost:9092',
        value_deserializer=lambda v: json.loads(v.decode('utf-8'))
    )

    for message in consumer:
        event = message.value
        if event['type'] == 'OrderCreated':
            data = event['data']
            new_order = OrderReadModel(
                id=data['id'],
                customer_id=data['customer_id'],
                status=data['status'],
                total_price=data['total_price']
            )
            db.session.add(new_order)
            db.session.commit()

6. Deploying the Microservices

Running the Order Microservice

export FLASK_APP=order_service/app.py
flask run --port=5000

Running the Customer Microservice

Repeat similar steps as above for the customer microservice.

Running the Kafka Consumer

python message_broker/consumer.py

7. Testing the System

Create an Order

curl -X POST http://localhost:5000/command/orders -H "Content-Type: application/json" -d '{
  "customer_id": 1,
  "status": "pending",
  "total_price": 100.0
}'

Fetch an Order

curl http://localhost:5000/query/orders/1

8. Conclusion

By implementing CQRS with Flask and Kafka, we achieved:

  • Separation of read and write operations.

  • Event-driven communication for eventual consistency.

  • Scalable and maintainable microservices.

Popular posts from this blog

Learn Java 8 streams with an example - print odd/even numbers from Array and List

Java Stream API - How to convert List of objects to another List of objects using Java streams?

Registration and Login with Spring Boot + Spring Security + Thymeleaf

Java, Spring Boot Mini Project - Library Management System - Download

ReactJS, Spring Boot JWT Authentication Example

Top 5 Java ORM tools - 2024

Java - Blowfish Encryption and decryption Example

Spring boot video streaming example-HTML5

Google Cloud Storage + Spring Boot - File Upload, Download, and Delete