🏗️ 아키텍처

Outbox Pattern

아웃박스 패턴

분산 시스템에서 데이터베이스 변경과 메시지 발행의 원자성을 보장하는 패턴입니다. 비즈니스 데이터 변경 시 이벤트를 같은 트랜잭션 내 Outbox 테이블에 저장하고, 별도 프로세스가 이를 읽어 메시지 브로커로 발행합니다. "이중 쓰기(Dual Write)" 문제를 해결하여 데이터 일관성을 보장합니다.

📖 상세 설명

패턴 원리: 분산 시스템에서 "DB 업데이트 후 Kafka에 메시지 발행"처럼 두 시스템에 동시에 쓰는 것은 위험합니다. DB 커밋은 성공했는데 Kafka 발행이 실패하면 데이터 불일치가 발생합니다. Outbox Pattern은 메시지를 DB의 outbox 테이블에 먼저 저장하고(단일 트랜잭션), 별도 프로세스가 이를 읽어 메시지 브로커로 "at-least-once" 발행합니다.

사용 시나리오: 마이크로서비스 간 이벤트 기반 통신, 주문-결제-배송 같은 Saga 패턴 구현, CDC(Change Data Capture) 기반 데이터 동기화에 활용됩니다. 이커머스에서 주문 생성 시 "주문 생성 이벤트"를 안전하게 발행하거나, 결제 서비스가 결제 완료 이벤트를 발행할 때 사용합니다.

장점: 데이터 일관성이 보장되고 메시지 유실이 없습니다. 메시지 브로커 장애 시에도 outbox에 저장되어 있어 복구 가능합니다. 기존 RDBMS 트랜잭션을 활용하므로 익숙한 방식으로 구현할 수 있습니다. 폴링 방식 대신 Debezium 같은 CDC 도구로 실시간 발행도 가능합니다.

단점: 추가 테이블과 프로세스(폴러 또는 CDC)가 필요합니다. 메시지가 중복 발행될 수 있어 수신자가 멱등성(Idempotency)을 처리해야 합니다. Outbox 테이블이 커지면 성능 저하가 있어 주기적 정리가 필요합니다. CDC 도구 도입 시 운영 복잡도가 증가합니다.

Debezium + Kafka Connect를 활용하면 폴링 없이 binlog/WAL을 감지하여 실시간으로 이벤트를 발행할 수 있습니다. AWS에서는 DynamoDB Streams + Lambda, GCP에서는 Cloud Spanner Change Streams로 유사하게 구현합니다.

💻 코드 예제

# Outbox Pattern 구현 - Python + SQLAlchemy
from sqlalchemy import create_engine, Column, String, DateTime, Text, Boolean
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime
import json
import uuid
import threading
import time

Base = declarative_base()

# Outbox 테이블 정의
class OutboxMessage(Base):
    """이벤트를 저장하는 Outbox 테이블"""
    __tablename__ = 'outbox'

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    aggregate_type = Column(String(100), nullable=False)  # e.g., 'Order'
    aggregate_id = Column(String(100), nullable=False)    # e.g., order_id
    event_type = Column(String(100), nullable=False)      # e.g., 'OrderCreated'
    payload = Column(Text, nullable=False)                # JSON 데이터
    created_at = Column(DateTime, default=datetime.utcnow)
    published = Column(Boolean, default=False)
    published_at = Column(DateTime, nullable=True)


class Order(Base):
    """주문 도메인 모델"""
    __tablename__ = 'orders'

    id = Column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    customer_id = Column(String(100), nullable=False)
    total_amount = Column(String(20), nullable=False)
    status = Column(String(50), default='PENDING')
    created_at = Column(DateTime, default=datetime.utcnow)


class OrderService:
    """주문 서비스 - Outbox 패턴 적용"""

    def __init__(self, session_factory):
        self.session_factory = session_factory

    def create_order(self, customer_id: str, items: list, total_amount: float) -> Order:
        """
        주문 생성 + 이벤트를 Outbox에 저장 (단일 트랜잭션)
        """
        session = self.session_factory()

        try:
            # 1. 주문 생성
            order = Order(
                customer_id=customer_id,
                total_amount=str(total_amount),
                status='CREATED'
            )
            session.add(order)

            # 2. Outbox에 이벤트 저장 (같은 트랜잭션!)
            event_payload = {
                'order_id': order.id,
                'customer_id': customer_id,
                'items': items,
                'total_amount': total_amount,
                'status': 'CREATED',
                'created_at': datetime.utcnow().isoformat()
            }

            outbox_message = OutboxMessage(
                aggregate_type='Order',
                aggregate_id=order.id,
                event_type='OrderCreated',
                payload=json.dumps(event_payload)
            )
            session.add(outbox_message)

            # 3. 단일 트랜잭션으로 커밋
            # DB 저장과 이벤트 저장이 원자적으로 처리됨!
            session.commit()

            print(f"✅ 주문 생성 완료: {order.id}")
            print(f"✅ Outbox 이벤트 저장: OrderCreated")

            return order

        except Exception as e:
            session.rollback()
            print(f"❌ 주문 생성 실패: {e}")
            raise
        finally:
            session.close()


class OutboxPoller:
    """
    Outbox 폴러 - 주기적으로 Outbox를 읽어 메시지 발행
    실제 환경에서는 Debezium CDC 사용 권장
    """

    def __init__(self, session_factory, message_publisher):
        self.session_factory = session_factory
        self.publisher = message_publisher
        self.running = False

    def start(self, poll_interval: float = 1.0):
        """폴링 시작"""
        self.running = True
        thread = threading.Thread(target=self._poll_loop, args=(poll_interval,))
        thread.daemon = True
        thread.start()

    def stop(self):
        self.running = False

    def _poll_loop(self, interval: float):
        while self.running:
            self._process_outbox()
            time.sleep(interval)

    def _process_outbox(self):
        session = self.session_factory()

        try:
            # 미발행 메시지 조회 (FOR UPDATE로 락)
            messages = session.query(OutboxMessage).filter(
                OutboxMessage.published == False
            ).order_by(OutboxMessage.created_at).limit(100).with_for_update().all()

            for msg in messages:
                try:
                    # 메시지 브로커로 발행
                    self.publisher.publish(
                        topic=f"{msg.aggregate_type.lower()}-events",
                        key=msg.aggregate_id,
                        value=msg.payload,
                        headers={
                            'event_type': msg.event_type,
                            'message_id': msg.id
                        }
                    )

                    # 발행 완료 표시
                    msg.published = True
                    msg.published_at = datetime.utcnow()

                    print(f"📤 이벤트 발행: {msg.event_type} ({msg.aggregate_id})")

                except Exception as e:
                    print(f"❌ 발행 실패, 재시도 예정: {e}")
                    # 실패해도 다음 폴링에서 재시도

            session.commit()

        except Exception as e:
            session.rollback()
            print(f"❌ Outbox 처리 오류: {e}")
        finally:
            session.close()


# Kafka 발행자 예시
class KafkaMessagePublisher:
    def __init__(self, bootstrap_servers: str):
        from kafka import KafkaProducer
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: v.encode('utf-8') if isinstance(v, str) else v
        )

    def publish(self, topic: str, key: str, value: str, headers: dict = None):
        header_list = [(k, v.encode()) for k, v in (headers or {}).items()]
        self.producer.send(topic, key=key.encode(), value=value, headers=header_list)
        self.producer.flush()


# 사용 예시
if __name__ == "__main__":
    engine = create_engine('postgresql://localhost/orders')
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    # 서비스 생성
    order_service = OrderService(Session)

    # 폴러 시작
    publisher = KafkaMessagePublisher('localhost:9092')
    poller = OutboxPoller(Session, publisher)
    poller.start()

    # 주문 생성 (DB + Outbox 원자적 저장)
    order = order_service.create_order(
        customer_id='CUST-001',
        items=[{'product_id': 'PROD-1', 'quantity': 2}],
        total_amount=50000
    )
// Outbox Pattern - Node.js + Prisma + TypeScript
import { PrismaClient, Prisma } from '@prisma/client';
import { Kafka, Producer } from 'kafkajs';
import { v4 as uuidv4 } from 'uuid';

const prisma = new PrismaClient();

// Prisma 스키마 (schema.prisma)
/*
model Order {
  id          String   @id @default(uuid())
  customerId  String
  totalAmount Decimal
  status      String   @default("PENDING")
  createdAt   DateTime @default(now())
}

model OutboxMessage {
  id            String    @id @default(uuid())
  aggregateType String
  aggregateId   String
  eventType     String
  payload       Json
  published     Boolean   @default(false)
  publishedAt   DateTime?
  createdAt     DateTime  @default(now())

  @@index([published, createdAt])
}
*/

interface OrderCreatedEvent {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number }>;
  totalAmount: number;
  status: string;
  createdAt: string;
}

class OrderService {
  /**
   * 주문 생성 - Outbox 패턴으로 이벤트 저장
   */
  async createOrder(
    customerId: string,
    items: Array<{ productId: string; quantity: number }>,
    totalAmount: number
  ) {
    // Prisma 트랜잭션으로 원자성 보장
    const result = await prisma.$transaction(async (tx) => {
      // 1. 주문 생성
      const order = await tx.order.create({
        data: {
          customerId,
          totalAmount,
          status: 'CREATED',
        },
      });

      // 2. Outbox에 이벤트 저장 (같은 트랜잭션!)
      const eventPayload: OrderCreatedEvent = {
        orderId: order.id,
        customerId,
        items,
        totalAmount,
        status: 'CREATED',
        createdAt: new Date().toISOString(),
      };

      await tx.outboxMessage.create({
        data: {
          aggregateType: 'Order',
          aggregateId: order.id,
          eventType: 'OrderCreated',
          payload: eventPayload as unknown as Prisma.JsonObject,
        },
      });

      console.log(`✅ 주문 생성: ${order.id}`);
      console.log(`✅ Outbox 저장: OrderCreated`);

      return order;
    });

    return result;
  }

  /**
   * 주문 취소 - 상태 변경 + 이벤트 저장
   */
  async cancelOrder(orderId: string, reason: string) {
    return await prisma.$transaction(async (tx) => {
      const order = await tx.order.update({
        where: { id: orderId },
        data: { status: 'CANCELLED' },
      });

      await tx.outboxMessage.create({
        data: {
          aggregateType: 'Order',
          aggregateId: orderId,
          eventType: 'OrderCancelled',
          payload: {
            orderId,
            reason,
            cancelledAt: new Date().toISOString(),
          } as Prisma.JsonObject,
        },
      });

      return order;
    });
  }
}

/**
 * Outbox 폴러 - 미발행 메시지를 Kafka로 발행
 */
class OutboxPublisher {
  private producer: Producer;
  private isRunning = false;

  constructor(kafkaBrokers: string[]) {
    const kafka = new Kafka({
      clientId: 'outbox-publisher',
      brokers: kafkaBrokers,
    });
    this.producer = kafka.producer();
  }

  async start(pollIntervalMs: number = 1000) {
    await this.producer.connect();
    this.isRunning = true;

    console.log('🚀 Outbox 폴러 시작');

    while (this.isRunning) {
      await this.processOutbox();
      await this.sleep(pollIntervalMs);
    }
  }

  async stop() {
    this.isRunning = false;
    await this.producer.disconnect();
  }

  private async processOutbox() {
    // 미발행 메시지 조회 및 처리
    const messages = await prisma.outboxMessage.findMany({
      where: { published: false },
      orderBy: { createdAt: 'asc' },
      take: 100,
    });

    for (const msg of messages) {
      try {
        // Kafka로 발행
        await this.producer.send({
          topic: `${msg.aggregateType.toLowerCase()}-events`,
          messages: [
            {
              key: msg.aggregateId,
              value: JSON.stringify(msg.payload),
              headers: {
                eventType: msg.eventType,
                messageId: msg.id,
              },
            },
          ],
        });

        // 발행 완료 표시
        await prisma.outboxMessage.update({
          where: { id: msg.id },
          data: {
            published: true,
            publishedAt: new Date(),
          },
        });

        console.log(`📤 발행: ${msg.eventType} (${msg.aggregateId})`);
      } catch (error) {
        console.error(`❌ 발행 실패: ${msg.id}`, error);
        // 다음 폴링에서 재시도
      }
    }
  }

  private sleep(ms: number): Promise {
    return new Promise((resolve) => setTimeout(resolve, ms));
  }
}

/**
 * Outbox 클리너 - 오래된 발행 완료 메시지 삭제
 */
async function cleanupOutbox(retentionDays: number = 7) {
  const cutoffDate = new Date();
  cutoffDate.setDate(cutoffDate.getDate() - retentionDays);

  const result = await prisma.outboxMessage.deleteMany({
    where: {
      published: true,
      publishedAt: { lt: cutoffDate },
    },
  });

  console.log(`🧹 ${result.count}개 오래된 Outbox 메시지 삭제`);
}

// 사용 예시
async function main() {
  const orderService = new OrderService();

  // 주문 생성 (원자적으로 DB + Outbox 저장)
  const order = await orderService.createOrder(
    'customer-123',
    [{ productId: 'prod-1', quantity: 2 }],
    50000
  );

  // Outbox 폴러 시작 (별도 프로세스로 실행 권장)
  const publisher = new OutboxPublisher(['localhost:9092']);
  await publisher.start();
}

main();
// Debezium CDC를 사용한 Outbox Pattern
// 폴링 대신 DB 변경 로그(binlog/WAL)를 실시간 캡처

// 1. Outbox 테이블 생성 (PostgreSQL)
/*
CREATE TABLE outbox (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    aggregate_type VARCHAR(100) NOT NULL,
    aggregate_id VARCHAR(100) NOT NULL,
    event_type VARCHAR(100) NOT NULL,
    payload JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Debezium이 읽은 후 자동 삭제를 위한 인덱스
CREATE INDEX idx_outbox_created ON outbox(created_at);
*/

// 2. Debezium Connector 설정 (Kafka Connect)
{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "secret",
    "database.dbname": "orders_db",
    "database.server.name": "orders",

    // Outbox 테이블만 캡처
    "table.include.list": "public.outbox",

    // Outbox 이벤트 라우터 사용
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",

    // 토픽 라우팅 설정
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",

    // 발행 후 레코드 삭제 (선택사항)
    "transforms.outbox.table.expand.json.payload": "true",

    // 스냅샷 설정
    "snapshot.mode": "initial",

    // 오프셋 저장
    "offset.storage.topic": "debezium-offsets",
    "offset.flush.interval.ms": "1000"
  }
}

// 3. 결과: aggregate_type에 따라 자동으로 토픽 생성
// - Order → order-events 토픽
// - Payment → payment-events 토픽

// 4. Kafka Consumer에서 이벤트 수신
/*
{
  "key": "order-12345",
  "value": {
    "orderId": "order-12345",
    "customerId": "cust-001",
    "totalAmount": 50000,
    "status": "CREATED",
    "createdAt": "2024-01-15T10:30:00Z"
  },
  "headers": {
    "id": "msg-uuid-xxx",
    "eventType": "OrderCreated"
  }
}
*/

// 5. Docker Compose 예시
/*
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: outbox-connect
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
    ports:
      - "8083:8083"

  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: debezium
      POSTGRES_PASSWORD: secret
      POSTGRES_DB: orders_db
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"  # CDC 필수 설정
*/

// 6. Connector 등록 API 호출
/*
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @outbox-connector.json
*/

🗣️ 실무에서 이렇게 말하세요

💬 이벤트 기반 아키텍처 설계 시
"주문 생성하고 Kafka에 직접 발행하면 Dual Write 문제가 생깁니다. DB 커밋 후 Kafka 장애 나면 이벤트 유실이에요. Outbox 패턴으로 이벤트를 같은 트랜잭션에 저장하고, Debezium이 CDC로 읽어서 발행하면 at-least-once 보장됩니다."
💬 메시지 중복 처리 논의에서
"Outbox 패턴은 at-least-once라서 중복 발행될 수 있어요. 수신자 측에서 message_id로 멱등성 처리해야 합니다. Inbox 패턴을 같이 쓰면 수신 측도 중복 처리를 DB 레벨에서 보장할 수 있습니다."
💬 Outbox 테이블 관리 논의에서
"Outbox 테이블이 계속 쌓이면 성능이 떨어집니다. 발행 완료된 레코드는 7일 정도 보관 후 삭제하는 배치를 돌리세요. 또는 Debezium의 delete 모드를 켜면 캡처 후 자동 삭제됩니다."

⚠️ 주의사항 & 베스트 프랙티스

메시지 순서 보장 안함

여러 인스턴스가 폴링하면 순서가 뒤바뀔 수 있습니다. 순서가 중요하면 aggregate_id 기준 파티셔닝하거나 단일 폴러를 사용하세요.

Outbox 테이블 비대화

발행 완료된 레코드를 삭제하지 않으면 테이블이 커져 쿼리 성능이 저하됩니다. 주기적인 정리(retention) 정책을 반드시 구현하세요.

중복 메시지 미처리

at-least-once 발행이므로 수신자가 중복을 처리하지 않으면 데이터가 꼬입니다. message_id 기반 멱등성 또는 Inbox 패턴을 적용하세요.

Outbox 패턴 베스트 프랙티스

CDC(Debezium) 활용으로 실시간 발행, message_id로 멱등성, 발행 완료 레코드 정리, 모니터링으로 미발행 메시지 감지.

🔗 관련 용어

📚 더 배우기