Outbox Pattern
아웃박스 패턴
분산 시스템에서 데이터베이스 변경과 메시지 발행의 원자성을 보장하는 패턴입니다. 비즈니스 데이터 변경 시 이벤트를 같은 트랜잭션 내 Outbox 테이블에 저장하고, 별도 프로세스가 이를 읽어 메시지 브로커로 발행합니다. "이중 쓰기(Dual Write)" 문제를 해결하여 데이터 일관성을 보장합니다.
아웃박스 패턴
분산 시스템에서 데이터베이스 변경과 메시지 발행의 원자성을 보장하는 패턴입니다. 비즈니스 데이터 변경 시 이벤트를 같은 트랜잭션 내 Outbox 테이블에 저장하고, 별도 프로세스가 이를 읽어 메시지 브로커로 발행합니다. "이중 쓰기(Dual Write)" 문제를 해결하여 데이터 일관성을 보장합니다.
패턴 원리: 분산 시스템에서 "DB 업데이트 후 Kafka에 메시지 발행"처럼 두 시스템에 동시에 쓰는 것은 위험합니다. DB 커밋은 성공했는데 Kafka 발행이 실패하면 데이터 불일치가 발생합니다. Outbox Pattern은 메시지를 DB의 outbox 테이블에 먼저 저장하고(단일 트랜잭션), 별도 프로세스가 이를 읽어 메시지 브로커로 "at-least-once" 발행합니다.
사용 시나리오: 마이크로서비스 간 이벤트 기반 통신, 주문-결제-배송 같은 Saga 패턴 구현, CDC(Change Data Capture) 기반 데이터 동기화에 활용됩니다. 이커머스에서 주문 생성 시 "주문 생성 이벤트"를 안전하게 발행하거나, 결제 서비스가 결제 완료 이벤트를 발행할 때 사용합니다.
장점: 데이터 일관성이 보장되고 메시지 유실이 없습니다. 메시지 브로커 장애 시에도 outbox에 저장되어 있어 복구 가능합니다. 기존 RDBMS 트랜잭션을 활용하므로 익숙한 방식으로 구현할 수 있습니다. 폴링 방식 대신 Debezium 같은 CDC 도구로 실시간 발행도 가능합니다.
단점: 추가 테이블과 프로세스(폴러 또는 CDC)가 필요합니다. 메시지가 중복 발행될 수 있어 수신자가 멱등성(Idempotency)을 처리해야 합니다. Outbox 테이블이 커지면 성능 저하가 있어 주기적 정리가 필요합니다. CDC 도구 도입 시 운영 복잡도가 증가합니다.
Debezium + Kafka Connect를 활용하면 폴링 없이 binlog/WAL을 감지하여 실시간으로 이벤트를 발행할 수 있습니다. AWS에서는 DynamoDB Streams + Lambda, GCP에서는 Cloud Spanner Change Streams로 유사하게 구현합니다.
# Outbox Pattern 구현 - Python + SQLAlchemy
from sqlalchemy import create_engine, Column, String, DateTime, Text, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
import uuid
import threading
import time
Base = declarative_base()
# Outbox 테이블 정의
class OutboxMessage(Base):
"""이벤트를 저장하는 Outbox 테이블"""
__tablename__ = 'outbox'
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
aggregate_type = Column(String(100), nullable=False) # e.g., 'Order'
aggregate_id = Column(String(100), nullable=False) # e.g., order_id
event_type = Column(String(100), nullable=False) # e.g., 'OrderCreated'
payload = Column(Text, nullable=False) # JSON 데이터
created_at = Column(DateTime, default=datetime.utcnow)
published = Column(Boolean, default=False)
published_at = Column(DateTime, nullable=True)
class Order(Base):
"""주문 도메인 모델"""
__tablename__ = 'orders'
id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
customer_id = Column(String(100), nullable=False)
total_amount = Column(String(20), nullable=False)
status = Column(String(50), default='PENDING')
created_at = Column(DateTime, default=datetime.utcnow)
class OrderService:
"""주문 서비스 - Outbox 패턴 적용"""
def __init__(self, session_factory):
self.session_factory = session_factory
def create_order(self, customer_id: str, items: list, total_amount: float) -> Order:
"""
주문 생성 + 이벤트를 Outbox에 저장 (단일 트랜잭션)
"""
session = self.session_factory()
try:
# 1. 주문 생성
order = Order(
customer_id=customer_id,
total_amount=str(total_amount),
status='CREATED'
)
session.add(order)
# 2. Outbox에 이벤트 저장 (같은 트랜잭션!)
event_payload = {
'order_id': order.id,
'customer_id': customer_id,
'items': items,
'total_amount': total_amount,
'status': 'CREATED',
'created_at': datetime.utcnow().isoformat()
}
outbox_message = OutboxMessage(
aggregate_type='Order',
aggregate_id=order.id,
event_type='OrderCreated',
payload=json.dumps(event_payload)
)
session.add(outbox_message)
# 3. 단일 트랜잭션으로 커밋
# DB 저장과 이벤트 저장이 원자적으로 처리됨!
session.commit()
print(f"✅ 주문 생성 완료: {order.id}")
print(f"✅ Outbox 이벤트 저장: OrderCreated")
return order
except Exception as e:
session.rollback()
print(f"❌ 주문 생성 실패: {e}")
raise
finally:
session.close()
class OutboxPoller:
"""
Outbox 폴러 - 주기적으로 Outbox를 읽어 메시지 발행
실제 환경에서는 Debezium CDC 사용 권장
"""
def __init__(self, session_factory, message_publisher):
self.session_factory = session_factory
self.publisher = message_publisher
self.running = False
def start(self, poll_interval: float = 1.0):
"""폴링 시작"""
self.running = True
thread = threading.Thread(target=self._poll_loop, args=(poll_interval,))
thread.daemon = True
thread.start()
def stop(self):
self.running = False
def _poll_loop(self, interval: float):
while self.running:
self._process_outbox()
time.sleep(interval)
def _process_outbox(self):
session = self.session_factory()
try:
# 미발행 메시지 조회 (FOR UPDATE로 락)
messages = session.query(OutboxMessage).filter(
OutboxMessage.published == False
).order_by(OutboxMessage.created_at).limit(100).with_for_update().all()
for msg in messages:
try:
# 메시지 브로커로 발행
self.publisher.publish(
topic=f"{msg.aggregate_type.lower()}-events",
key=msg.aggregate_id,
value=msg.payload,
headers={
'event_type': msg.event_type,
'message_id': msg.id
}
)
# 발행 완료 표시
msg.published = True
msg.published_at = datetime.utcnow()
print(f"📤 이벤트 발행: {msg.event_type} ({msg.aggregate_id})")
except Exception as e:
print(f"❌ 발행 실패, 재시도 예정: {e}")
# 실패해도 다음 폴링에서 재시도
session.commit()
except Exception as e:
session.rollback()
print(f"❌ Outbox 처리 오류: {e}")
finally:
session.close()
# Kafka 발행자 예시
class KafkaMessagePublisher:
def __init__(self, bootstrap_servers: str):
from kafka import KafkaProducer
self.producer = KafkaProducer(
bootstrap_servers=bootstrap_servers,
value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else v
)
def publish(self, topic: str, key: str, value: str, headers: dict = None):
header_list = [(k, v.encode()) for k, v in (headers or {}).items()]
self.producer.send(topic, key=key.encode(), value=value, headers=header_list)
self.producer.flush()
# 사용 예시
if __name__ == "__main__":
engine = create_engine('postgresql://localhost/orders')
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
# 서비스 생성
order_service = OrderService(Session)
# 폴러 시작
publisher = KafkaMessagePublisher('localhost:9092')
poller = OutboxPoller(Session, publisher)
poller.start()
# 주문 생성 (DB + Outbox 원자적 저장)
order = order_service.create_order(
customer_id='CUST-001',
items=[{'product_id': 'PROD-1', 'quantity': 2}],
total_amount=50000
)
// Outbox Pattern - Node.js + Prisma + TypeScript
import { PrismaClient, Prisma } from '@prisma/client';
import { Kafka, Producer } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';
const prisma = new PrismaClient();
// Prisma 스키마 (schema.prisma)
/*
model Order {
id String @id @default(uuid())
customerId String
totalAmount Decimal
status String @default("PENDING")
createdAt DateTime @default(now())
}
model OutboxMessage {
id String @id @default(uuid())
aggregateType String
aggregateId String
eventType String
payload Json
published Boolean @default(false)
publishedAt DateTime?
createdAt DateTime @default(now())
@@index([published, createdAt])
}
*/
interface OrderCreatedEvent {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number }>;
totalAmount: number;
status: string;
createdAt: string;
}
class OrderService {
/**
* 주문 생성 - Outbox 패턴으로 이벤트 저장
*/
async createOrder(
customerId: string,
items: Array<{ productId: string; quantity: number }>,
totalAmount: number
) {
// Prisma 트랜잭션으로 원자성 보장
const result = await prisma.$transaction(async (tx) => {
// 1. 주문 생성
const order = await tx.order.create({
data: {
customerId,
totalAmount,
status: 'CREATED',
},
});
// 2. Outbox에 이벤트 저장 (같은 트랜잭션!)
const eventPayload: OrderCreatedEvent = {
orderId: order.id,
customerId,
items,
totalAmount,
status: 'CREATED',
createdAt: new Date().toISOString(),
};
await tx.outboxMessage.create({
data: {
aggregateType: 'Order',
aggregateId: order.id,
eventType: 'OrderCreated',
payload: eventPayload as unknown as Prisma.JsonObject,
},
});
console.log(`✅ 주문 생성: ${order.id}`);
console.log(`✅ Outbox 저장: OrderCreated`);
return order;
});
return result;
}
/**
* 주문 취소 - 상태 변경 + 이벤트 저장
*/
async cancelOrder(orderId: string, reason: string) {
return await prisma.$transaction(async (tx) => {
const order = await tx.order.update({
where: { id: orderId },
data: { status: 'CANCELLED' },
});
await tx.outboxMessage.create({
data: {
aggregateType: 'Order',
aggregateId: orderId,
eventType: 'OrderCancelled',
payload: {
orderId,
reason,
cancelledAt: new Date().toISOString(),
} as Prisma.JsonObject,
},
});
return order;
});
}
}
/**
* Outbox 폴러 - 미발행 메시지를 Kafka로 발행
*/
class OutboxPublisher {
private producer: Producer;
private isRunning = false;
constructor(kafkaBrokers: string[]) {
const kafka = new Kafka({
clientId: 'outbox-publisher',
brokers: kafkaBrokers,
});
this.producer = kafka.producer();
}
async start(pollIntervalMs: number = 1000) {
await this.producer.connect();
this.isRunning = true;
console.log('🚀 Outbox 폴러 시작');
while (this.isRunning) {
await this.processOutbox();
await this.sleep(pollIntervalMs);
}
}
async stop() {
this.isRunning = false;
await this.producer.disconnect();
}
private async processOutbox() {
// 미발행 메시지 조회 및 처리
const messages = await prisma.outboxMessage.findMany({
where: { published: false },
orderBy: { createdAt: 'asc' },
take: 100,
});
for (const msg of messages) {
try {
// Kafka로 발행
await this.producer.send({
topic: `${msg.aggregateType.toLowerCase()}-events`,
messages: [
{
key: msg.aggregateId,
value: JSON.stringify(msg.payload),
headers: {
eventType: msg.eventType,
messageId: msg.id,
},
},
],
});
// 발행 완료 표시
await prisma.outboxMessage.update({
where: { id: msg.id },
data: {
published: true,
publishedAt: new Date(),
},
});
console.log(`📤 발행: ${msg.eventType} (${msg.aggregateId})`);
} catch (error) {
console.error(`❌ 발행 실패: ${msg.id}`, error);
// 다음 폴링에서 재시도
}
}
}
private sleep(ms: number): Promise {
return new Promise((resolve) => setTimeout(resolve, ms));
}
}
/**
* Outbox 클리너 - 오래된 발행 완료 메시지 삭제
*/
async function cleanupOutbox(retentionDays: number = 7) {
const cutoffDate = new Date();
cutoffDate.setDate(cutoffDate.getDate() - retentionDays);
const result = await prisma.outboxMessage.deleteMany({
where: {
published: true,
publishedAt: { lt: cutoffDate },
},
});
console.log(`🧹 ${result.count}개 오래된 Outbox 메시지 삭제`);
}
// 사용 예시
async function main() {
const orderService = new OrderService();
// 주문 생성 (원자적으로 DB + Outbox 저장)
const order = await orderService.createOrder(
'customer-123',
[{ productId: 'prod-1', quantity: 2 }],
50000
);
// Outbox 폴러 시작 (별도 프로세스로 실행 권장)
const publisher = new OutboxPublisher(['localhost:9092']);
await publisher.start();
}
main();
// Debezium CDC를 사용한 Outbox Pattern
// 폴링 대신 DB 변경 로그(binlog/WAL)를 실시간 캡처
// 1. Outbox 테이블 생성 (PostgreSQL)
/*
CREATE TABLE outbox (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
aggregate_type VARCHAR(100) NOT NULL,
aggregate_id VARCHAR(100) NOT NULL,
event_type VARCHAR(100) NOT NULL,
payload JSONB NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- Debezium이 읽은 후 자동 삭제를 위한 인덱스
CREATE INDEX idx_outbox_created ON outbox(created_at);
*/
// 2. Debezium Connector 설정 (Kafka Connect)
{
"name": "outbox-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "debezium",
"database.password": "secret",
"database.dbname": "orders_db",
"database.server.name": "orders",
// Outbox 테이블만 캡처
"table.include.list": "public.outbox",
// Outbox 이벤트 라우터 사용
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
// 토픽 라우팅 설정
"transforms.outbox.table.field.event.key": "aggregate_id",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "payload",
"transforms.outbox.route.by.field": "aggregate_type",
"transforms.outbox.route.topic.replacement": "${routedByValue}-events",
// 발행 후 레코드 삭제 (선택사항)
"transforms.outbox.table.expand.json.payload": "true",
// 스냅샷 설정
"snapshot.mode": "initial",
// 오프셋 저장
"offset.storage.topic": "debezium-offsets",
"offset.flush.interval.ms": "1000"
}
}
// 3. 결과: aggregate_type에 따라 자동으로 토픽 생성
// - Order → order-events 토픽
// - Payment → payment-events 토픽
// 4. Kafka Consumer에서 이벤트 수신
/*
{
"key": "order-12345",
"value": {
"orderId": "order-12345",
"customerId": "cust-001",
"totalAmount": 50000,
"status": "CREATED",
"createdAt": "2024-01-15T10:30:00Z"
},
"headers": {
"id": "msg-uuid-xxx",
"eventType": "OrderCreated"
}
}
*/
// 5. Docker Compose 예시
/*
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
connect:
image: debezium/connect:2.4
depends_on:
- kafka
- postgres
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: outbox-connect
CONFIG_STORAGE_TOPIC: connect-configs
OFFSET_STORAGE_TOPIC: connect-offsets
STATUS_STORAGE_TOPIC: connect-status
ports:
- "8083:8083"
postgres:
image: postgres:15
environment:
POSTGRES_USER: debezium
POSTGRES_PASSWORD: secret
POSTGRES_DB: orders_db
command:
- "postgres"
- "-c"
- "wal_level=logical" # CDC 필수 설정
*/
// 6. Connector 등록 API 호출
/*
curl -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @outbox-connector.json
*/
"주문 생성하고 Kafka에 직접 발행하면 Dual Write 문제가 생깁니다. DB 커밋 후 Kafka 장애 나면 이벤트 유실이에요. Outbox 패턴으로 이벤트를 같은 트랜잭션에 저장하고, Debezium이 CDC로 읽어서 발행하면 at-least-once 보장됩니다."
"Outbox 패턴은 at-least-once라서 중복 발행될 수 있어요. 수신자 측에서 message_id로 멱등성 처리해야 합니다. Inbox 패턴을 같이 쓰면 수신 측도 중복 처리를 DB 레벨에서 보장할 수 있습니다."
"Outbox 테이블이 계속 쌓이면 성능이 떨어집니다. 발행 완료된 레코드는 7일 정도 보관 후 삭제하는 배치를 돌리세요. 또는 Debezium의 delete 모드를 켜면 캡처 후 자동 삭제됩니다."
여러 인스턴스가 폴링하면 순서가 뒤바뀔 수 있습니다. 순서가 중요하면 aggregate_id 기준 파티셔닝하거나 단일 폴러를 사용하세요.
발행 완료된 레코드를 삭제하지 않으면 테이블이 커져 쿼리 성능이 저하됩니다. 주기적인 정리(retention) 정책을 반드시 구현하세요.
at-least-once 발행이므로 수신자가 중복을 처리하지 않으면 데이터가 꼬입니다. message_id 기반 멱등성 또는 Inbox 패턴을 적용하세요.
CDC(Debezium) 활용으로 실시간 발행, message_id로 멱등성, 발행 완료 레코드 정리, 모니터링으로 미발행 메시지 감지.