Saga
사가 패턴 (Saga Pattern)
마이크로서비스 환경에서 분산 트랜잭션을 관리하는 패턴입니다. 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션(Compensating Transaction)을 통해 일관성을 유지합니다.
사가 패턴 (Saga Pattern)
마이크로서비스 환경에서 분산 트랜잭션을 관리하는 패턴입니다. 각 서비스의 로컬 트랜잭션을 순차적으로 실행하고, 실패 시 보상 트랜잭션(Compensating Transaction)을 통해 일관성을 유지합니다.
Saga 패턴은 1987년 데이터베이스 논문에서 처음 제안된 개념으로, 마이크로서비스 아키텍처에서 여러 서비스에 걸친 트랜잭션을 관리합니다. 전통적인 2PC(Two-Phase Commit)와 달리 각 서비스는 자신의 로컬 트랜잭션만 관리하며, 전체 프로세스는 일련의 이벤트로 조율됩니다.
두 가지 조율 방식이 있습니다. Choreography(코레오그래피)는 각 서비스가 이벤트를 발행하고 다른 서비스가 구독하여 반응하는 분산 방식입니다. 중앙 조율자가 없어 결합도가 낮지만 흐름 파악이 어렵습니다. Orchestration(오케스트레이션)은 중앙의 Saga Orchestrator가 전체 흐름을 제어합니다. 흐름 파악이 쉽지만 단일 실패점이 될 수 있습니다.
보상 트랜잭션이 핵심입니다. Saga의 어느 단계에서든 실패하면 이미 완료된 트랜잭션들을 역순으로 취소해야 합니다. 예를 들어 주문 -> 결제 -> 배송 Saga에서 배송이 실패하면, 결제 취소 -> 주문 취소 순서로 보상 트랜잭션을 실행합니다.
멱등성(Idempotency)이 중요합니다. 네트워크 장애로 인해 같은 요청이 여러 번 도달할 수 있으므로, 각 단계와 보상 트랜잭션은 여러 번 실행해도 결과가 동일해야 합니다. 고유 트랜잭션 ID를 사용하여 중복 처리를 방지합니다.
주요 구현체로는 Temporal, Camunda, Axon Framework, Apache Camel, AWS Step Functions 등이 있습니다. 이벤트 소싱(Event Sourcing)과 함께 사용하면 모든 상태 변경을 추적할 수 있어 디버깅과 감사에 유리합니다.
# Saga 패턴 - Python Orchestration 방식
from dataclasses import dataclass
from enum import Enum
from typing import List, Callable, Optional
import uuid
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SagaStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
COMPENSATING = "compensating"
FAILED = "failed"
@dataclass
class SagaStep:
"""Saga의 개별 단계"""
name: str
action: Callable # 실행 함수
compensation: Callable # 보상 함수
status: SagaStatus = SagaStatus.PENDING
class SagaOrchestrator:
"""Saga Orchestrator - 전체 흐름 제어"""
def __init__(self, saga_id: str = None):
self.saga_id = saga_id or str(uuid.uuid4())
self.steps: List[SagaStep] = []
self.completed_steps: List[SagaStep] = []
self.status = SagaStatus.PENDING
self.context = {} # 단계 간 데이터 공유
def add_step(self, name: str, action: Callable, compensation: Callable):
"""Saga 단계 추가"""
self.steps.append(SagaStep(name, action, compensation))
return self
def execute(self, initial_context: dict = None) -> dict:
"""Saga 실행"""
self.context = initial_context or {}
self.status = SagaStatus.RUNNING
logger.info(f"🚀 Saga 시작: {self.saga_id}")
try:
for step in self.steps:
logger.info(f" ▶ 단계 실행: {step.name}")
step.status = SagaStatus.RUNNING
# 멱등성을 위한 트랜잭션 ID
tx_id = f"{self.saga_id}-{step.name}"
self.context['current_tx_id'] = tx_id
# 단계 실행
result = step.action(self.context)
self.context.update(result or {})
step.status = SagaStatus.COMPLETED
self.completed_steps.append(step)
logger.info(f" ✅ 단계 완료: {step.name}")
self.status = SagaStatus.COMPLETED
logger.info(f"🎉 Saga 완료: {self.saga_id}")
return {"status": "success", "saga_id": self.saga_id, "context": self.context}
except Exception as e:
logger.error(f" ❌ 단계 실패: {e}")
self._compensate()
return {"status": "failed", "saga_id": self.saga_id, "error": str(e)}
def _compensate(self):
"""보상 트랜잭션 실행 (역순)"""
self.status = SagaStatus.COMPENSATING
logger.info(f"🔄 보상 트랜잭션 시작 (완료된 {len(self.completed_steps)}개 단계)")
for step in reversed(self.completed_steps):
try:
logger.info(f" ↩ 보상 실행: {step.name}")
step.compensation(self.context)
logger.info(f" ✅ 보상 완료: {step.name}")
except Exception as e:
# 보상 실패 시 수동 개입 필요
logger.error(f" ⚠️ 보상 실패: {step.name} - {e}")
self._save_for_manual_intervention(step, e)
self.status = SagaStatus.FAILED
logger.info(f"🔚 Saga 롤백 완료: {self.saga_id}")
def _save_for_manual_intervention(self, step: SagaStep, error: Exception):
"""수동 개입이 필요한 케이스 저장"""
logger.warning(f"📋 수동 처리 필요: Saga={self.saga_id}, Step={step.name}")
# ============================================
# 주문 처리 Saga 예시
# ============================================
class OrderService:
"""주문 서비스"""
def create_order(self, ctx: dict) -> dict:
order_id = f"ORD-{uuid.uuid4().hex[:8]}"
logger.info(f" 주문 생성: {order_id}")
return {"order_id": order_id}
def cancel_order(self, ctx: dict):
logger.info(f" 주문 취소: {ctx['order_id']}")
class PaymentService:
"""결제 서비스"""
def process_payment(self, ctx: dict) -> dict:
payment_id = f"PAY-{uuid.uuid4().hex[:8]}"
logger.info(f" 결제 처리: {payment_id}, 금액: {ctx.get('amount', 0)}원")
# 실패 시뮬레이션
# raise Exception("결제 실패: 잔액 부족")
return {"payment_id": payment_id}
def refund_payment(self, ctx: dict):
logger.info(f" 결제 환불: {ctx['payment_id']}")
class InventoryService:
"""재고 서비스"""
def reserve_inventory(self, ctx: dict) -> dict:
logger.info(f" 재고 예약: {ctx.get('product_id')}")
return {"inventory_reserved": True}
def release_inventory(self, ctx: dict):
logger.info(f" 재고 해제: {ctx.get('product_id')}")
class ShippingService:
"""배송 서비스"""
def create_shipment(self, ctx: dict) -> dict:
shipment_id = f"SHIP-{uuid.uuid4().hex[:8]}"
logger.info(f" 배송 생성: {shipment_id}")
return {"shipment_id": shipment_id}
def cancel_shipment(self, ctx: dict):
logger.info(f" 배송 취소: {ctx['shipment_id']}")
# Saga 실행
if __name__ == "__main__":
# 서비스 인스턴스
order_svc = OrderService()
payment_svc = PaymentService()
inventory_svc = InventoryService()
shipping_svc = ShippingService()
# Saga 정의
order_saga = SagaOrchestrator()
order_saga.add_step("create_order", order_svc.create_order, order_svc.cancel_order)
order_saga.add_step("reserve_inventory", inventory_svc.reserve_inventory, inventory_svc.release_inventory)
order_saga.add_step("process_payment", payment_svc.process_payment, payment_svc.refund_payment)
order_saga.add_step("create_shipment", shipping_svc.create_shipment, shipping_svc.cancel_shipment)
# Saga 실행
result = order_saga.execute({
"customer_id": "CUST-001",
"product_id": "PROD-001",
"amount": 50000
})
print(f"\n결과: {result}")
// Saga 패턴 - Node.js Choreography 방식 (이벤트 기반)
const EventEmitter = require('events');
// 이벤트 버스 (실제로는 Kafka, RabbitMQ 등 사용)
const eventBus = new EventEmitter();
// ============================================
// 각 서비스 정의 (이벤트 기반 반응)
// ============================================
class OrderService {
constructor() {
// 이벤트 구독
eventBus.on('ORDER_REQUESTED', this.createOrder.bind(this));
eventBus.on('PAYMENT_FAILED', this.cancelOrder.bind(this));
eventBus.on('INVENTORY_FAILED', this.cancelOrder.bind(this));
}
async createOrder(data) {
console.log('📦 OrderService: 주문 생성 시작');
try {
const order = {
orderId: `ORD-${Date.now()}`,
customerId: data.customerId,
items: data.items,
status: 'CREATED'
};
// DB 저장 (생략)
console.log(` ✅ 주문 생성 완료: ${order.orderId}`);
// 다음 단계 이벤트 발행
eventBus.emit('ORDER_CREATED', { ...data, ...order });
} catch (error) {
eventBus.emit('ORDER_FAILED', { ...data, error: error.message });
}
}
async cancelOrder(data) {
console.log(` ↩ OrderService: 주문 취소 - ${data.orderId}`);
// 주문 상태를 CANCELLED로 변경
eventBus.emit('ORDER_CANCELLED', data);
}
}
class InventoryService {
constructor() {
eventBus.on('ORDER_CREATED', this.reserveInventory.bind(this));
eventBus.on('PAYMENT_FAILED', this.releaseInventory.bind(this));
}
async reserveInventory(data) {
console.log('📦 InventoryService: 재고 예약 시작');
try {
// 재고 확인 및 예약 로직
const available = true; // 실제로는 DB 확인
if (!available) {
throw new Error('재고 부족');
}
console.log(` ✅ 재고 예약 완료`);
eventBus.emit('INVENTORY_RESERVED', { ...data, inventoryReserved: true });
} catch (error) {
console.log(` ❌ 재고 예약 실패: ${error.message}`);
eventBus.emit('INVENTORY_FAILED', { ...data, error: error.message });
}
}
async releaseInventory(data) {
console.log(` ↩ InventoryService: 재고 해제 - ${data.orderId}`);
// 재고 복구 로직
}
}
class PaymentService {
constructor() {
eventBus.on('INVENTORY_RESERVED', this.processPayment.bind(this));
}
async processPayment(data) {
console.log('💳 PaymentService: 결제 처리 시작');
try {
// 결제 처리 로직
const paymentResult = {
paymentId: `PAY-${Date.now()}`,
amount: data.amount,
status: 'COMPLETED'
};
// 실패 시뮬레이션 (주석 해제하면 롤백 테스트)
// throw new Error('잔액 부족');
console.log(` ✅ 결제 완료: ${paymentResult.paymentId}`);
eventBus.emit('PAYMENT_COMPLETED', { ...data, ...paymentResult });
} catch (error) {
console.log(` ❌ 결제 실패: ${error.message}`);
eventBus.emit('PAYMENT_FAILED', { ...data, error: error.message });
}
}
async refundPayment(data) {
console.log(` ↩ PaymentService: 환불 처리 - ${data.paymentId}`);
}
}
class ShippingService {
constructor() {
eventBus.on('PAYMENT_COMPLETED', this.createShipment.bind(this));
}
async createShipment(data) {
console.log('🚚 ShippingService: 배송 생성 시작');
try {
const shipment = {
shipmentId: `SHIP-${Date.now()}`,
orderId: data.orderId,
status: 'PREPARING'
};
console.log(` ✅ 배송 생성 완료: ${shipment.shipmentId}`);
eventBus.emit('SHIPMENT_CREATED', { ...data, ...shipment });
// 최종 완료
eventBus.emit('SAGA_COMPLETED', data);
} catch (error) {
eventBus.emit('SHIPMENT_FAILED', { ...data, error: error.message });
}
}
}
// Saga 완료/실패 핸들러
eventBus.on('SAGA_COMPLETED', (data) => {
console.log('\n🎉 Saga 완료!', data);
});
eventBus.on('ORDER_CANCELLED', (data) => {
console.log('\n🔚 Saga 롤백 완료', data);
});
// ============================================
// 서비스 초기화 및 Saga 시작
// ============================================
const orderService = new OrderService();
const inventoryService = new InventoryService();
const paymentService = new PaymentService();
const shippingService = new ShippingService();
// Saga 시작 - ORDER_REQUESTED 이벤트 발행
console.log('🚀 주문 Saga 시작\n');
eventBus.emit('ORDER_REQUESTED', {
customerId: 'CUST-001',
items: [{ productId: 'PROD-001', quantity: 2 }],
amount: 50000
});
# Saga 패턴 - Temporal Workflow (권장 프로덕션 방식)
from datetime import timedelta
from temporalio import workflow, activity
from temporalio.common import RetryPolicy
from dataclasses import dataclass
@dataclass
class OrderData:
customer_id: str
product_id: str
amount: int
order_id: str = None
payment_id: str = None
shipment_id: str = None
# ============================================
# Activity 정의 (실제 비즈니스 로직)
# ============================================
@activity.defn
async def create_order(data: OrderData) -> str:
"""주문 생성"""
order_id = f"ORD-{activity.info().workflow_id[:8]}"
# DB에 주문 저장
return order_id
@activity.defn
async def cancel_order(order_id: str) -> None:
"""주문 취소 (보상)"""
# DB에서 주문 상태 변경
@activity.defn
async def reserve_inventory(product_id: str) -> bool:
"""재고 예약"""
# 재고 시스템 호출
return True
@activity.defn
async def release_inventory(product_id: str) -> None:
"""재고 해제 (보상)"""
# 재고 복구
@activity.defn
async def process_payment(data: OrderData) -> str:
"""결제 처리"""
# 결제 게이트웨이 호출
payment_id = f"PAY-{activity.info().workflow_id[:8]}"
return payment_id
@activity.defn
async def refund_payment(payment_id: str) -> None:
"""환불 처리 (보상)"""
# 결제 취소 API 호출
@activity.defn
async def create_shipment(order_id: str) -> str:
"""배송 생성"""
shipment_id = f"SHIP-{order_id[-8:]}"
return shipment_id
@activity.defn
async def cancel_shipment(shipment_id: str) -> None:
"""배송 취소 (보상)"""
# 배송 취소 API 호출
# ============================================
# Saga Workflow 정의
# ============================================
@workflow.defn
class OrderSagaWorkflow:
"""주문 처리 Saga Workflow"""
@workflow.run
async def run(self, data: OrderData) -> dict:
# 재시도 정책
retry_policy = RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=10),
maximum_attempts=3
)
compensations = [] # 보상 작업 스택
try:
# 1. 주문 생성
data.order_id = await workflow.execute_activity(
create_order,
data,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy
)
compensations.append((cancel_order, data.order_id))
workflow.logger.info(f"주문 생성 완료: {data.order_id}")
# 2. 재고 예약
await workflow.execute_activity(
reserve_inventory,
data.product_id,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy
)
compensations.append((release_inventory, data.product_id))
workflow.logger.info("재고 예약 완료")
# 3. 결제 처리
data.payment_id = await workflow.execute_activity(
process_payment,
data,
start_to_close_timeout=timedelta(seconds=60),
retry_policy=retry_policy
)
compensations.append((refund_payment, data.payment_id))
workflow.logger.info(f"결제 완료: {data.payment_id}")
# 4. 배송 생성
data.shipment_id = await workflow.execute_activity(
create_shipment,
data.order_id,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy
)
workflow.logger.info(f"배송 생성 완료: {data.shipment_id}")
return {
"status": "completed",
"order_id": data.order_id,
"payment_id": data.payment_id,
"shipment_id": data.shipment_id
}
except Exception as e:
workflow.logger.error(f"Saga 실패: {e}, 보상 트랜잭션 시작")
# 보상 트랜잭션 실행 (역순)
for compensation_fn, arg in reversed(compensations):
try:
await workflow.execute_activity(
compensation_fn,
arg,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=retry_policy
)
workflow.logger.info(f"보상 완료: {compensation_fn.__name__}")
except Exception as comp_error:
workflow.logger.error(f"보상 실패: {compensation_fn.__name__} - {comp_error}")
# 보상 실패 시 알림 또는 수동 처리 큐에 저장
return {
"status": "failed",
"error": str(e),
"compensated": True
}
# Workflow 실행 예시 (Client 코드)
"""
from temporalio.client import Client
async def main():
client = await Client.connect("localhost:7233")
result = await client.execute_workflow(
OrderSagaWorkflow.run,
OrderData(
customer_id="CUST-001",
product_id="PROD-001",
amount=50000
),
id="order-saga-001",
task_queue="order-saga-queue"
)
print(f"Saga 결과: {result}")
"""
"주문-결제-배송이 각각 다른 서비스인데 2PC는 성능 문제가 있어요. Saga 패턴으로 각 서비스가 로컬 트랜잭션만 처리하고, 결제 실패 시 주문 취소하는 보상 트랜잭션을 구현합시다. 일관성은 Eventually Consistent로 가져가죠."
"서비스가 5개 이상 연결되니까 Choreography보다 Orchestration이 나을 것 같아요. 흐름 추적이 쉽고 보상 로직도 한 곳에서 관리할 수 있습니다. Temporal 쓰면 내구성 있는 Workflow로 구현 가능해요."
"결제는 됐는데 배송 서비스 장애로 Saga가 중간에 멈췄어요. 보상 트랜잭션이 멱등하게 구현되어 있어서 재시도해도 안전합니다. Temporal 대시보드에서 현재 상태 확인하고 수동으로 resume 하거나 강제 보상 실행할 수 있어요."
모든 단계에는 반드시 보상 트랜잭션이 있어야 합니다. 보상이 불가능한 작업(이메일 발송 등)은 Saga 외부에서 처리하거나 마지막 단계에 배치하세요.
네트워크 장애로 같은 요청이 여러 번 올 수 있습니다. 각 단계와 보상 트랜잭션은 고유 ID 기반으로 중복 실행을 방지해야 합니다.
Saga 진행 중 다른 트랜잭션이 같은 데이터를 변경할 수 있습니다(Dirty Read). 시맨틱 락 또는 낙관적 락으로 동시성 문제를 해결하세요.
Temporal/Camunda 같은 전용 프레임워크 사용, 이벤트 소싱과 결합, 모든 상태 변경 로깅, 실패한 Saga 모니터링 대시보드, 수동 개입 UI 제공.