CQRS
Command Query Responsibility Segregation
명령(Command, 쓰기)과 조회(Query, 읽기) 모델을 분리하는 아키텍처 패턴입니다. 복잡한 도메인에서 읽기와 쓰기 요구사항을 독립적으로 최적화할 수 있으며, Event Sourcing과 함께 자주 사용됩니다.
Command Query Responsibility Segregation
명령(Command, 쓰기)과 조회(Query, 읽기) 모델을 분리하는 아키텍처 패턴입니다. 복잡한 도메인에서 읽기와 쓰기 요구사항을 독립적으로 최적화할 수 있으며, Event Sourcing과 함께 자주 사용됩니다.
CQRS의 핵심 개념은 Bertrand Meyer의 CQS(Command Query Separation) 원칙에서 발전했습니다. CQS는 메서드가 상태를 변경하거나(Command) 값을 반환하거나(Query) 둘 중 하나만 해야 한다는 원칙입니다. CQRS는 이를 아키텍처 수준으로 확장하여 읽기와 쓰기 모델을 완전히 분리합니다.
Command(명령)는 시스템의 상태를 변경하는 작업입니다. "주문 생성", "상품 수량 변경", "결제 처리" 등이 해당됩니다. Command는 void를 반환하거나 성공/실패 결과만 반환합니다. Command Handler가 비즈니스 로직을 수행하고 도메인 모델을 업데이트합니다.
Query(조회)는 시스템 상태를 변경하지 않고 데이터를 조회합니다. "주문 목록 조회", "상품 상세 조회" 등이 해당됩니다. Query는 읽기에 최적화된 별도의 Read Model을 사용합니다. 복잡한 조인이나 집계 연산을 미리 계산해둔 비정규화된 뷰를 사용할 수 있습니다.
두 모델의 동기화는 주로 이벤트를 통해 이루어집니다. Write Model에서 Command가 처리되면 도메인 이벤트가 발행되고, 이 이벤트를 구독하여 Read Model이 업데이트됩니다. 이로 인해 읽기와 쓰기 사이에 최종 일관성(Eventual Consistency)이 발생합니다.
CQRS의 장점은 읽기/쓰기 워크로드를 독립적으로 확장 가능하고, 복잡한 조회를 최적화할 수 있으며, 도메인 로직이 명확해집니다. 단점으로는 시스템 복잡도 증가, 최종 일관성으로 인한 설계 난이도, 인프라 비용 증가가 있습니다.
# CQRS 패턴 구현 예제 - Python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
from uuid import uuid4
import asyncio
# ============================================
# Commands (명령) - 상태를 변경하는 작업
# ============================================
@dataclass
class Command(ABC):
"""명령 기본 클래스"""
command_id: str = field(default_factory=lambda: str(uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class CreateOrderCommand(Command):
"""주문 생성 명령"""
user_id: str
items: List[Dict[str, Any]]
shipping_address: str
@dataclass
class UpdateOrderStatusCommand(Command):
"""주문 상태 변경 명령"""
order_id: str
new_status: str
# ============================================
# Command Handlers (명령 핸들러)
# ============================================
class CommandHandler(ABC):
@abstractmethod
async def handle(self, command: Command) -> Any:
pass
class CreateOrderCommandHandler(CommandHandler):
def __init__(self, order_repository, event_bus):
self.order_repository = order_repository
self.event_bus = event_bus
async def handle(self, command: CreateOrderCommand) -> str:
"""주문 생성 - Write Model에 저장"""
# 1. 도메인 로직 수행
order_id = str(uuid4())
total_amount = sum(item["price"] * item["quantity"] for item in command.items)
order = Order(
id=order_id,
user_id=command.user_id,
items=command.items,
total_amount=total_amount,
status="PENDING",
shipping_address=command.shipping_address,
created_at=datetime.now()
)
# 2. Write Model(DB)에 저장
await self.order_repository.save(order)
# 3. 도메인 이벤트 발행 (Read Model 업데이트용)
event = OrderCreatedEvent(
order_id=order_id,
user_id=command.user_id,
total_amount=total_amount,
status="PENDING"
)
await self.event_bus.publish(event)
return order_id
# ============================================
# Queries (조회) - 데이터를 읽는 작업
# ============================================
@dataclass
class Query(ABC):
"""조회 기본 클래스"""
pass
@dataclass
class GetOrderByIdQuery(Query):
"""주문 단건 조회"""
order_id: str
@dataclass
class GetOrdersByUserQuery(Query):
"""사용자별 주문 목록 조회"""
user_id: str
page: int = 1
page_size: int = 20
# ============================================
# Query Handlers (조회 핸들러)
# ============================================
class QueryHandler(ABC):
@abstractmethod
async def handle(self, query: Query) -> Any:
pass
class GetOrdersByUserQueryHandler(QueryHandler):
def __init__(self, read_model_repository):
# Read Model 전용 저장소 (비정규화된 뷰)
self.read_model = read_model_repository
async def handle(self, query: GetOrdersByUserQuery) -> List[Dict]:
"""
Read Model에서 조회 - 읽기에 최적화된 비정규화 데이터
복잡한 조인 없이 바로 반환 가능
"""
orders = await self.read_model.find_by_user(
user_id=query.user_id,
skip=(query.page - 1) * query.page_size,
limit=query.page_size
)
return orders
# ============================================
# Events (이벤트) - Write→Read 동기화
# ============================================
@dataclass
class DomainEvent:
event_id: str = field(default_factory=lambda: str(uuid4()))
timestamp: datetime = field(default_factory=datetime.now)
@dataclass
class OrderCreatedEvent(DomainEvent):
order_id: str = ""
user_id: str = ""
total_amount: float = 0.0
status: str = ""
# Event Handler - Read Model 업데이트
class OrderCreatedEventHandler:
def __init__(self, read_model_repository):
self.read_model = read_model_repository
async def handle(self, event: OrderCreatedEvent):
"""이벤트를 받아 Read Model 업데이트"""
# 읽기에 최적화된 형태로 저장
read_model_data = {
"order_id": event.order_id,
"user_id": event.user_id,
"total_amount": event.total_amount,
"status": event.status,
"status_display": "주문 접수", # 미리 계산
"created_at": event.timestamp.isoformat()
}
await self.read_model.upsert(event.order_id, read_model_data)
# ============================================
# CQRS Mediator (명령/조회 라우터)
# ============================================
class CQRSMediator:
"""명령과 조회를 적절한 핸들러로 라우팅"""
def __init__(self):
self.command_handlers: Dict[type, CommandHandler] = {}
self.query_handlers: Dict[type, QueryHandler] = {}
def register_command(self, command_type: type, handler: CommandHandler):
self.command_handlers[command_type] = handler
def register_query(self, query_type: type, handler: QueryHandler):
self.query_handlers[query_type] = handler
async def send_command(self, command: Command) -> Any:
"""명령 전송 - Write 작업"""
handler = self.command_handlers.get(type(command))
if not handler:
raise ValueError(f"No handler for command: {type(command)}")
return await handler.handle(command)
async def send_query(self, query: Query) -> Any:
"""조회 전송 - Read 작업"""
handler = self.query_handlers.get(type(query))
if not handler:
raise ValueError(f"No handler for query: {type(query)}")
return await handler.handle(query)
# ============================================
# 사용 예시
# ============================================
async def main():
mediator = CQRSMediator()
# 핸들러 등록 (의존성 주입)
# mediator.register_command(CreateOrderCommand, CreateOrderCommandHandler(...))
# mediator.register_query(GetOrdersByUserQuery, GetOrdersByUserQueryHandler(...))
# Command: 주문 생성 (Write)
command = CreateOrderCommand(
user_id="user-123",
items=[{"product_id": "prod-1", "price": 10000, "quantity": 2}],
shipping_address="서울시 강남구"
)
order_id = await mediator.send_command(command)
# Query: 주문 목록 조회 (Read)
query = GetOrdersByUserQuery(user_id="user-123", page=1)
orders = await mediator.send_query(query)
// CQRS 패턴 구현 예제 - Node.js/TypeScript
const { v4: uuidv4 } = require('uuid');
// ============================================
// Commands (명령)
// ============================================
class CreateOrderCommand {
constructor(userId, items, shippingAddress) {
this.commandId = uuidv4();
this.timestamp = new Date();
this.userId = userId;
this.items = items;
this.shippingAddress = shippingAddress;
}
}
class CancelOrderCommand {
constructor(orderId, reason) {
this.commandId = uuidv4();
this.orderId = orderId;
this.reason = reason;
}
}
// ============================================
// Command Handlers
// ============================================
class CreateOrderCommandHandler {
constructor(writeRepository, eventBus) {
this.writeRepository = writeRepository;
this.eventBus = eventBus;
}
async handle(command) {
// 1. 비즈니스 검증
const totalAmount = command.items.reduce(
(sum, item) => sum + item.price * item.quantity, 0
);
// 2. Write Model (정규화된 도메인 모델) 저장
const order = {
id: uuidv4(),
userId: command.userId,
items: command.items,
totalAmount,
status: 'PENDING',
shippingAddress: command.shippingAddress,
createdAt: new Date(),
version: 1 // Optimistic Locking
};
await this.writeRepository.save(order);
// 3. 도메인 이벤트 발행 → Read Model 업데이트
await this.eventBus.publish('OrderCreated', {
orderId: order.id,
userId: order.userId,
totalAmount: order.totalAmount,
status: order.status,
itemCount: command.items.length
});
return { orderId: order.id };
}
}
// ============================================
// Queries (조회)
// ============================================
class GetOrderSummaryQuery {
constructor(orderId) {
this.orderId = orderId;
}
}
class GetUserOrderHistoryQuery {
constructor(userId, options = {}) {
this.userId = userId;
this.page = options.page || 1;
this.pageSize = options.pageSize || 10;
this.statusFilter = options.statusFilter;
}
}
// ============================================
// Query Handlers (Read Model 사용)
// ============================================
class GetUserOrderHistoryQueryHandler {
constructor(readModelRepository) {
// Read Model: 비정규화된 조회 전용 저장소
this.readModel = readModelRepository;
}
async handle(query) {
/**
* Read Model은 이미 비정규화되어 있어
* 복잡한 JOIN 없이 빠르게 조회 가능
*/
const result = await this.readModel.aggregate([
{ $match: { userId: query.userId } },
{ $sort: { createdAt: -1 } },
{ $skip: (query.page - 1) * query.pageSize },
{ $limit: query.pageSize },
{
$project: {
orderId: 1,
totalAmount: 1,
status: 1,
statusDisplay: 1, // 미리 계산된 한글 상태
itemCount: 1, // 미리 계산된 상품 수
createdAt: 1
}
}
]);
return {
orders: result,
page: query.page,
pageSize: query.pageSize
};
}
}
// ============================================
// Event Handler (Write → Read 동기화)
// ============================================
class OrderProjectionHandler {
constructor(readModelRepository) {
this.readModel = readModelRepository;
}
// 이벤트를 받아 Read Model 업데이트
async handleOrderCreated(event) {
const statusMap = {
'PENDING': '주문 접수',
'PAID': '결제 완료',
'SHIPPED': '배송 중',
'DELIVERED': '배송 완료'
};
// 읽기에 최적화된 비정규화 데이터 저장
await this.readModel.insertOne({
orderId: event.orderId,
userId: event.userId,
totalAmount: event.totalAmount,
status: event.status,
statusDisplay: statusMap[event.status],
itemCount: event.itemCount,
createdAt: new Date(),
// 조회 시 자주 필요한 파생 데이터 미리 계산
formattedAmount: `${event.totalAmount.toLocaleString()}원`,
isRecent: true
});
}
async handleOrderStatusChanged(event) {
await this.readModel.updateOne(
{ orderId: event.orderId },
{
$set: {
status: event.newStatus,
statusDisplay: this.getStatusDisplay(event.newStatus),
updatedAt: new Date()
}
}
);
}
}
// ============================================
// CQRS Dispatcher
// ============================================
class CQRSDispatcher {
constructor() {
this.commandHandlers = new Map();
this.queryHandlers = new Map();
}
registerCommand(CommandClass, handler) {
this.commandHandlers.set(CommandClass.name, handler);
}
registerQuery(QueryClass, handler) {
this.queryHandlers.set(QueryClass.name, handler);
}
async dispatch(commandOrQuery) {
const name = commandOrQuery.constructor.name;
if (name.includes('Command')) {
const handler = this.commandHandlers.get(name);
if (!handler) throw new Error(`No handler for ${name}`);
return await handler.handle(commandOrQuery);
}
if (name.includes('Query')) {
const handler = this.queryHandlers.get(name);
if (!handler) throw new Error(`No handler for ${name}`);
return await handler.handle(commandOrQuery);
}
throw new Error(`Unknown type: ${name}`);
}
}
// ============================================
// Express API 예시
// ============================================
const express = require('express');
const app = express();
app.post('/orders', async (req, res) => {
// Command (Write) - POST, PUT, DELETE
const command = new CreateOrderCommand(
req.body.userId,
req.body.items,
req.body.shippingAddress
);
const result = await dispatcher.dispatch(command);
res.status(201).json(result);
});
app.get('/orders', async (req, res) => {
// Query (Read) - GET
const query = new GetUserOrderHistoryQuery(req.query.userId, {
page: parseInt(req.query.page) || 1
});
const result = await dispatcher.dispatch(query);
res.json(result);
});
┌─────────────────────────────────────────────────────────────────────────┐
│ CQRS Architecture │
└─────────────────────────────────────────────────────────────────────────┘
┌─────────────┐
│ Client │
└──────┬──────┘
│
┌────────────────┴────────────────┐
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ Command │ │ Query │
│ (Write) │ │ (Read) │
│ POST/PUT/DEL│ │ GET │
└──────┬──────┘ └──────┬──────┘
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ Command │ │ Query │
│ Handler │ │ Handler │
│ (비즈니스 │ │ (조회 최적화)│
│ 로직) │ │ │
└──────┬──────┘ └──────┬──────┘
│ │
┌──────▼──────┐ ┌──────▼──────┐
│ Write Model │ │ Read Model │
│ (정규화 DB) │ │(비정규화 뷰)│
│ │ │ │
│ PostgreSQL │ │ MongoDB / │
│ MySQL │ │ Redis │
└──────┬──────┘ └──────▲──────┘
│ │
│ ┌─────────────┐ │
└────────►│ Event │─────────┘
│ Bus │
│ (Kafka) │
└─────────────┘
═══════════════════════════════════════════════════════════════════════════
데이터 흐름:
1. Command Flow (쓰기)
Client → Command → Handler → Write Model → Event 발행
2. Event Flow (동기화)
Event Bus → Event Handler → Read Model 업데이트
3. Query Flow (읽기)
Client → Query → Handler → Read Model → 결과 반환
═══════════════════════════════════════════════════════════════════════════
Write Model (정규화) Read Model (비정규화)
┌─────────────────────┐ ┌─────────────────────────────┐
│ orders │ │ order_summaries │
├─────────────────────┤ ├─────────────────────────────┤
│ id │ │ order_id │
│ user_id (FK) │ ───► │ user_name (복사됨) │
│ status │ Event │ status │
│ created_at │ │ status_display (계산됨) │
│ shipping_address_id │ │ total_amount │
└─────────────────────┘ │ formatted_amount (계산됨) │
│ item_count (계산됨) │
┌─────────────────────┐ │ shipping_address (복사됨) │
│ order_items │ │ created_at │
├─────────────────────┤ └─────────────────────────────┘
│ id │
│ order_id (FK) │ 장점: 조인 없이 즉시 반환
│ product_id (FK) │ 단점: 데이터 중복, 동기화 지연
│ quantity │
│ price │
└─────────────────────┘
"주문 조회 API가 너무 느려요. 매번 5개 테이블을 조인하고 있거든요. CQRS를 도입해서 조회용 비정규화 뷰를 만들면, 조인 없이 바로 반환할 수 있어 응답 시간을 크게 줄일 수 있습니다."
"CQRS에서 Write와 Read Model 사이에는 최종 일관성이 적용됩니다. 주문 생성 직후 목록에서 바로 안 보일 수 있어요. 이게 비즈니스적으로 문제가 된다면, 생성 직후에는 Write Model에서 직접 조회하는 방법도 있습니다."
"Write Model은 트랜잭션이 중요하니까 PostgreSQL을 쓰고, Read Model은 빠른 조회가 목적이니까 Redis나 Elasticsearch를 쓰면 좋겠어요. 이벤트 동기화는 Kafka로 하면 안정적입니다."
단순한 CRUD 애플리케이션에 CQRS는 과설계입니다. 읽기/쓰기 요구사항이 크게 다르고 복잡한 도메인에서만 도입을 검토하세요.
Read Model은 Write Model과 즉시 동기화되지 않습니다. UI에서 "생성했는데 목록에 없다"는 문제를 사용자에게 설명하거나, 생성 직후만 예외 처리하세요.
이벤트가 유실되면 Read Model이 영구적으로 불일치합니다. 이벤트 스토어에 저장하고, 재생(Replay) 기능을 구현하세요.
명확한 Command/Query 분리, 이벤트 기반 동기화, Read Model Rebuild 기능 구현, 모니터링으로 동기화 지연 감지, 점진적 도입(일부 기능부터).