🏗️ 아키텍처

Inbox Pattern

인박스 패턴 · Idempotent Consumer Pattern

Inbox Pattern은 메시지 기반 분산 시스템에서 중복 메시지 처리를 방지하는 멱등성(Idempotency) 패턴입니다. 수신된 메시지의 고유 ID를 별도 테이블(Inbox)에 저장하여, 동일한 메시지가 다시 도착해도 중복 처리하지 않습니다.

📖 상세 설명

왜 Inbox Pattern이 필요한가?

분산 메시지 시스템에서 At-Least-Once 전달 보장은 메시지 손실을 방지하지만, 네트워크 장애나 Consumer 재시작 시 동일 메시지가 여러 번 전달될 수 있습니다. 이로 인해:

  • 주문 중복 생성: 같은 주문이 2번 처리됨
  • 결제 중복 차감: 한 번의 결제 요청에 금액이 2배로 차감
  • 재고 과다 차감: 동일 재고 감소가 여러 번 실행
  • 알림 중복 발송: 사용자에게 같은 알림이 여러 번 전송

Inbox Pattern의 동작 원리

Inbox Pattern은 수신된 메시지의 고유 식별자(Message ID)를 영속 저장소(Inbox 테이블)에 기록하여 중복 여부를 판단합니다:

┌─────────────────────────────────────────────────────────────────────────────┐ │ Inbox Pattern Flow │ ├─────────────────────────────────────────────────────────────────────────────┤ │ │ │ Message Queue Consumer Service Database │ │ ┌──────────┐ ┌──────────────┐ ┌────────────┐ │ │ │ │ │ │ │ │ │ │ │ Msg A │────Consume─────▶│ 1. Extract │ │ INBOX │ │ │ │ (ID:123)│ │ Msg ID │ │ TABLE │ │ │ │ │ │ │ │ │ │ │ │ Msg A │ │ 2. Check │──EXISTS?────▶│ ┌──────┐ │ │ │ │ (ID:123)│ │ Inbox │ │ │ID:123│ │ │ │ │ (retry) │ │ │◀─YES/NO──────│ │ID:456│ │ │ │ │ │ │ │ │ │ID:789│ │ │ │ └──────────┘ │ 3. If NEW: │ │ └──────┘ │ │ │ │ - Insert │──INSERT─────▶│ │ │ │ │ - Process │ │ │ │ │ │ │ │ BUSINESS │ │ │ │ 4. If DUP: │ │ TABLE │ │ │ │ - Skip │ │ ┌──────┐ │ │ │ │ - ACK │ │ │Orders│ │ │ │ │ │ │ │Items │ │ │ │ └──────────────┘ │ └──────┘ │ │ │ └────────────┘ │ │ │ │ 핵심: Message ID를 Inbox 테이블에 저장하여 중복 체크 │ │ 전략: INSERT와 비즈니스 로직을 하나의 트랜잭션으로 묶음 │ └─────────────────────────────────────────────────────────────────────────────┘

처리 단계

  1. 메시지 수신: Consumer가 Message Queue에서 메시지를 받음
  2. ID 추출: 메시지에서 고유 식별자(Message ID, Idempotency Key) 추출
  3. 중복 체크: Inbox 테이블에서 해당 ID 존재 여부 확인
  4. 조건부 처리:
    • 신규 메시지: Inbox에 ID 삽입 + 비즈니스 로직 실행 (동일 트랜잭션)
    • 중복 메시지: 비즈니스 로직 건너뛰고 ACK만 전송
  5. ACK 전송: Message Queue에 처리 완료 알림
💡 핵심 원칙 Inbox INSERT와 비즈니스 로직은 반드시 동일 트랜잭션에서 실행되어야 합니다. 그래야 "Inbox에는 기록됐지만 비즈니스 로직은 실패한" 불일치 상태를 방지할 수 있습니다.

Inbox 테이블 설계

Inbox 테이블은 간단한 구조로 설계할 수 있습니다:

-- 기본 Inbox 테이블
CREATE TABLE inbox (
    message_id    VARCHAR(255) PRIMARY KEY,  -- 메시지 고유 ID (UUID, Correlation ID 등)
    message_type  VARCHAR(100) NOT NULL,      -- 메시지 타입 (OrderCreated, PaymentCompleted 등)
    received_at   TIMESTAMP DEFAULT NOW(),    -- 수신 시각
    processed_at  TIMESTAMP,                  -- 처리 완료 시각 (nullable)
    payload       JSONB                       -- 원본 메시지 저장 (디버깅/재처리용, optional)
);

-- 조회 성능을 위한 인덱스
CREATE INDEX idx_inbox_received_at ON inbox(received_at);
CREATE INDEX idx_inbox_message_type ON inbox(message_type);

-- 오래된 레코드 정리를 위한 파티셔닝 (선택사항)
-- 메시지 보관 기간에 따라 일별/주별 파티션 구성

Inbox Pattern vs Outbox Pattern

InboxOutbox는 메시지 처리의 양 끝단에서 신뢰성을 보장하는 상보적 패턴입니다:

구분 Inbox Pattern Outbox Pattern
위치 Consumer (수신측) Producer (발신측)
목적 중복 메시지 처리 방지 메시지 손실 방지
동작 메시지 ID로 중복 체크 후 처리 비즈니스 로직과 메시지를 한 트랜잭션에 저장
해결 문제 At-Least-Once로 인한 중복 처리 DB 커밋 후 메시지 발행 실패
테이블 역할 처리 완료된 메시지 ID 기록 발행할 메시지 임시 저장
함께 사용 Outbox → Message Queue → Inbox (End-to-End 신뢰성)

💻 코드 예제

TypeScript + PostgreSQL 구현

Node.js 환경에서 Inbox Pattern을 구현한 완전한 예제입니다:

// inbox/inbox.entity.ts
import { Entity, PrimaryColumn, Column, CreateDateColumn } from 'typeorm';

@Entity('inbox')
export class InboxEntity {
  @PrimaryColumn('varchar', { length: 255 })
  messageId: string;

  @Column('varchar', { length: 100 })
  messageType: string;

  @CreateDateColumn()
  receivedAt: Date;

  @Column('timestamp', { nullable: true })
  processedAt: Date | null;

  @Column('jsonb', { nullable: true })
  payload: Record<string, unknown>;
}

// inbox/inbox.repository.ts
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { InboxEntity } from './inbox.entity';

@Injectable()
export class InboxRepository {
  constructor(private dataSource: DataSource) {}

  /**
   * 메시지가 이미 처리되었는지 확인
   */
  async isProcessed(messageId: string): Promise<boolean> {
    const existing = await this.dataSource.manager.findOne(InboxEntity, {
      where: { messageId }
    });
    return existing !== null;
  }

  /**
   * Inbox에 메시지 등록 (트랜잭션 내에서 사용)
   */
  async recordMessage(
    manager: EntityManager,
    messageId: string,
    messageType: string,
    payload?: Record<string, unknown>
  ): Promise<void> {
    const inbox = manager.create(InboxEntity, {
      messageId,
      messageType,
      payload,
      processedAt: null
    });
    await manager.save(inbox);
  }

  /**
   * 처리 완료 시각 업데이트
   */
  async markAsProcessed(
    manager: EntityManager,
    messageId: string
  ): Promise<void> {
    await manager.update(InboxEntity, { messageId }, {
      processedAt: new Date()
    });
  }
}

// message/message-handler.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { InboxRepository } from '../inbox/inbox.repository';

interface OrderCreatedEvent {
  orderId: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  totalAmount: number;
}

@Injectable()
export class MessageHandlerService {
  private readonly logger = new Logger(MessageHandlerService.name);

  constructor(
    private dataSource: DataSource,
    private inboxRepository: InboxRepository
  ) {}

  /**
   * Inbox Pattern을 적용한 메시지 처리
   */
  async handleOrderCreated(
    messageId: string,
    event: OrderCreatedEvent
  ): Promise<void> {
    // 1. 중복 체크 (빠른 실패)
    const alreadyProcessed = await this.inboxRepository.isProcessed(messageId);
    if (alreadyProcessed) {
      this.logger.log(`Duplicate message detected: ${messageId}, skipping...`);
      return; // ACK 전송 (메시지 큐에서 제거)
    }

    // 2. 트랜잭션으로 Inbox 기록 + 비즈니스 로직 실행
    await this.dataSource.transaction(async (manager) => {
      // 2-1. Inbox에 메시지 등록 (Race condition 방지를 위해 트랜잭션 내에서)
      try {
        await this.inboxRepository.recordMessage(
          manager,
          messageId,
          'OrderCreated',
          event as Record<string, unknown>
        );
      } catch (error: any) {
        // UNIQUE 제약 조건 위반 = 다른 인스턴스가 먼저 처리함
        if (error.code === '23505') { // PostgreSQL unique violation
          this.logger.log(`Concurrent duplicate: ${messageId}, skipping...`);
          return;
        }
        throw error;
      }

      // 2-2. 비즈니스 로직 실행
      await this.processOrderCreated(manager, event);

      // 2-3. 처리 완료 표시
      await this.inboxRepository.markAsProcessed(manager, messageId);
    });

    this.logger.log(`Successfully processed OrderCreated: ${messageId}`);
  }

  private async processOrderCreated(
    manager: any,
    event: OrderCreatedEvent
  ): Promise<void> {
    // 실제 비즈니스 로직 구현
    // - 주문 엔티티 생성/업데이트
    // - 재고 감소
    // - 포인트 적립
    // - 알림 발송 이벤트 생성 (Outbox Pattern 사용)

    this.logger.log(`Processing order: ${event.orderId} for customer: ${event.customerId}`);

    // 예시: 주문 저장
    await manager.query(
      `INSERT INTO orders (id, customer_id, total_amount, status, created_at)
       VALUES ($1, $2, $3, 'CREATED', NOW())`,
      [event.orderId, event.customerId, event.totalAmount]
    );

    // 예시: 주문 아이템 저장
    for (const item of event.items) {
      await manager.query(
        `INSERT INTO order_items (order_id, product_id, quantity, price)
         VALUES ($1, $2, $3, $4)`,
        [event.orderId, item.productId, item.quantity, item.price]
      );
    }
  }
}

// message/kafka-consumer.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { MessageHandlerService } from './message-handler.service';

@Injectable()
export class KafkaConsumer implements OnModuleInit {
  private consumer: Consumer;

  constructor(private messageHandler: MessageHandlerService) {
    const kafka = new Kafka({
      clientId: 'order-service',
      brokers: ['kafka:9092']
    });
    this.consumer = kafka.consumer({ groupId: 'order-service-group' });
  }

  async onModuleInit() {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'order-events', fromBeginning: false });

    await this.consumer.run({
      eachMessage: async (payload: EachMessagePayload) => {
        const { message } = payload;

        // Message ID 추출 (Kafka에서는 보통 헤더에 포함)
        const messageId = message.headers?.['message-id']?.toString()
          ?? message.key?.toString()
          ?? `${payload.topic}-${payload.partition}-${message.offset}`;

        const event = JSON.parse(message.value?.toString() ?? '{}');

        // Inbox Pattern이 적용된 핸들러 호출
        await this.messageHandler.handleOrderCreated(messageId, event);
      }
    });
  }
}

Java + Spring 구현

Spring Boot와 JPA를 사용한 Inbox Pattern 구현입니다:

// entity/InboxMessage.java
package com.example.inbox.entity;

import jakarta.persistence.*;
import java.time.Instant;

@Entity
@Table(name = "inbox")
public class InboxMessage {

    @Id
    @Column(name = "message_id", length = 255)
    private String messageId;

    @Column(name = "message_type", length = 100, nullable = false)
    private String messageType;

    @Column(name = "received_at", nullable = false)
    private Instant receivedAt = Instant.now();

    @Column(name = "processed_at")
    private Instant processedAt;

    @Column(name = "payload", columnDefinition = "jsonb")
    private String payload;

    // Getters, setters, constructors...

    public static InboxMessage create(String messageId, String messageType, String payload) {
        InboxMessage inbox = new InboxMessage();
        inbox.messageId = messageId;
        inbox.messageType = messageType;
        inbox.payload = payload;
        return inbox;
    }

    public void markAsProcessed() {
        this.processedAt = Instant.now();
    }
}

// repository/InboxRepository.java
package com.example.inbox.repository;

import com.example.inbox.entity.InboxMessage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface InboxRepository extends JpaRepository<InboxMessage, String> {
    boolean existsByMessageId(String messageId);
}

// service/IdempotentMessageHandler.java
package com.example.inbox.service;

import com.example.inbox.entity.InboxMessage;
import com.example.inbox.repository.InboxRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.function.Consumer;

@Service
public class IdempotentMessageHandler {

    private static final Logger log = LoggerFactory.getLogger(IdempotentMessageHandler.class);

    private final InboxRepository inboxRepository;

    public IdempotentMessageHandler(InboxRepository inboxRepository) {
        this.inboxRepository = inboxRepository;
    }

    /**
     * 멱등성이 보장된 메시지 처리 템플릿
     *
     * @param messageId    메시지 고유 ID
     * @param messageType  메시지 타입
     * @param payload      원본 페이로드 (JSON 문자열)
     * @param handler      실제 비즈니스 로직
     */
    @Transactional
    public void handleIdempotently(
            String messageId,
            String messageType,
            String payload,
            Consumer<String> handler
    ) {
        // 1. 빠른 중복 체크 (트랜잭션 외부)
        if (inboxRepository.existsByMessageId(messageId)) {
            log.info("Duplicate message detected: {}, skipping", messageId);
            return;
        }

        // 2. Inbox 기록 시도 (트랜잭션 내부)
        InboxMessage inbox = InboxMessage.create(messageId, messageType, payload);

        try {
            inboxRepository.save(inbox);
        } catch (DataIntegrityViolationException e) {
            // 동시성 경쟁에서 다른 인스턴스가 먼저 INSERT 성공
            log.info("Concurrent duplicate detected: {}, skipping", messageId);
            return;
        }

        // 3. 비즈니스 로직 실행
        handler.accept(payload);

        // 4. 처리 완료 표시
        inbox.markAsProcessed();
        inboxRepository.save(inbox);

        log.info("Successfully processed message: {}", messageId);
    }
}

// listener/OrderEventListener.java
package com.example.order.listener;

import com.example.inbox.service.IdempotentMessageHandler;
import com.example.order.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
public class OrderEventListener {

    private final IdempotentMessageHandler idempotentHandler;
    private final OrderService orderService;
    private final ObjectMapper objectMapper;

    public OrderEventListener(
            IdempotentMessageHandler idempotentHandler,
            OrderService orderService,
            ObjectMapper objectMapper
    ) {
        this.idempotentHandler = idempotentHandler;
        this.orderService = orderService;
        this.objectMapper = objectMapper;
    }

    @KafkaListener(topics = "order-events", groupId = "order-service")
    public void handleOrderEvent(
            @Payload String payload,
            @Header(KafkaHeaders.RECEIVED_KEY) String messageKey,
            @Header(value = "message-id", required = false) String messageId
    ) {
        // Message ID 결정 (헤더 우선, 없으면 Kafka key 사용)
        String effectiveMessageId = messageId != null ? messageId : messageKey;

        // Inbox Pattern 적용
        idempotentHandler.handleIdempotently(
            effectiveMessageId,
            "OrderCreated",
            payload,
            (p) -> {
                try {
                    OrderCreatedEvent event = objectMapper.readValue(p, OrderCreatedEvent.class);
                    orderService.processOrderCreated(event);
                } catch (Exception e) {
                    throw new RuntimeException("Failed to process order event", e);
                }
            }
        );
    }
}

Go 구현 (간결한 버전)

Go 언어에서의 Inbox Pattern 구현입니다:

// inbox/handler.go
package inbox

import (
    "context"
    "database/sql"
    "encoding/json"
    "errors"
    "log"
    "time"

    "github.com/lib/pq"
)

type InboxHandler struct {
    db *sql.DB
}

func NewInboxHandler(db *sql.DB) *InboxHandler {
    return &InboxHandler{db: db}
}

// HandleIdempotently wraps message processing with inbox pattern
func (h *InboxHandler) HandleIdempotently(
    ctx context.Context,
    messageID string,
    messageType string,
    payload json.RawMessage,
    handler func(ctx context.Context, tx *sql.Tx) error,
) error {
    // 1. Quick duplicate check
    exists, err := h.checkExists(ctx, messageID)
    if err != nil {
        return err
    }
    if exists {
        log.Printf("Duplicate message: %s, skipping", messageID)
        return nil
    }

    // 2. Start transaction
    tx, err := h.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
    if err != nil {
        return err
    }
    defer tx.Rollback()

    // 3. Try to insert into inbox (handles race condition)
    err = h.insertInbox(ctx, tx, messageID, messageType, payload)
    if err != nil {
        var pqErr *pq.Error
        if errors.As(err, &pqErr) && pqErr.Code == "23505" {
            log.Printf("Concurrent duplicate: %s, skipping", messageID)
            return nil
        }
        return err
    }

    // 4. Execute business logic
    if err := handler(ctx, tx); err != nil {
        return err
    }

    // 5. Mark as processed and commit
    if err := h.markProcessed(ctx, tx, messageID); err != nil {
        return err
    }

    return tx.Commit()
}

func (h *InboxHandler) checkExists(ctx context.Context, messageID string) (bool, error) {
    var exists bool
    err := h.db.QueryRowContext(ctx,
        "SELECT EXISTS(SELECT 1 FROM inbox WHERE message_id = $1)",
        messageID,
    ).Scan(&exists)
    return exists, err
}

func (h *InboxHandler) insertInbox(
    ctx context.Context,
    tx *sql.Tx,
    messageID, messageType string,
    payload json.RawMessage,
) error {
    _, err := tx.ExecContext(ctx,
        `INSERT INTO inbox (message_id, message_type, payload, received_at)
         VALUES ($1, $2, $3, $4)`,
        messageID, messageType, payload, time.Now(),
    )
    return err
}

func (h *InboxHandler) markProcessed(ctx context.Context, tx *sql.Tx, messageID string) error {
    _, err := tx.ExecContext(ctx,
        "UPDATE inbox SET processed_at = $1 WHERE message_id = $2",
        time.Now(), messageID,
    )
    return err
}
💡 Message ID 생성 전략 메시지 ID는 다음 중 하나를 사용합니다:
  • UUID v4: Producer에서 생성, 글로벌 유니크
  • Idempotency Key: 클라이언트가 생성한 요청별 고유 키
  • Business Key: 주문ID+버전 조합 등 비즈니스 의미가 있는 키
  • Kafka Offset: topic-partition-offset 조합 (Kafka 전용)

⚖️ 멱등성 구현 전략 비교

Inbox Pattern 외에도 중복 처리를 방지하는 다양한 전략이 있습니다:

전략 방식 장점 단점
Inbox Pattern Message ID를 별도 테이블에 저장 범용적, 모든 메시지 타입 적용 가능 추가 저장 공간, 테이블 관리 필요
Upsert INSERT ON CONFLICT UPDATE 단순, 별도 테이블 불필요 특정 엔티티에만 적용, 삭제 연산 어려움
Version Check 엔티티 버전으로 상태 체크 Optimistic Lock과 자연스럽게 결합 버전 관리 로직 필요, 복잡도 증가
Natural Idempotency 연산 자체가 멱등 (SET 연산 등) 추가 구현 불필요 모든 연산에 적용 불가 (증감 연산 등)
Deduplication Service Redis/Memcached로 ID 캐싱 매우 빠른 체크, 분산 환경 용이 TTL 만료 후 중복 가능, 외부 의존성

언제 어떤 전략을 선택해야 하나?

🎯 선택 가이드
  • Inbox Pattern: 범용적인 메시지 처리, 다양한 이벤트 타입, 감사 로그 필요 시
  • Upsert: 단순 엔티티 생성/수정, 빠른 구현이 필요할 때
  • Version Check: 이미 버전 관리하는 애그리거트, DDD 환경
  • Redis 캐싱: 초고속 처리 필요, 일시적 중복만 방지해도 되는 경우

🛠️ 실무 적용 가이드

Inbox 테이블 관리

시간이 지나면 Inbox 테이블은 계속 커집니다. 정리 전략이 필요합니다:

-- 1. 시간 기반 정리 (30일 이상 오래된 레코드 삭제)
DELETE FROM inbox
WHERE processed_at IS NOT NULL
  AND processed_at < NOW() - INTERVAL '30 days';

-- 2. 파티셔닝 적용 (PostgreSQL 예시)
CREATE TABLE inbox (
    message_id VARCHAR(255) NOT NULL,
    message_type VARCHAR(100) NOT NULL,
    received_at TIMESTAMP NOT NULL DEFAULT NOW(),
    processed_at TIMESTAMP,
    payload JSONB,
    PRIMARY KEY (message_id, received_at)
) PARTITION BY RANGE (received_at);

-- 월별 파티션 생성
CREATE TABLE inbox_2025_01 PARTITION OF inbox
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE inbox_2025_02 PARTITION OF inbox
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- 오래된 파티션 삭제 (빠름)
DROP TABLE inbox_2024_12;

성능 최적화

// 1. Redis를 이용한 2단계 중복 체크 (Hot Cache + Cold Storage)
class OptimizedInboxHandler {
  constructor(
    private redis: Redis,
    private db: DataSource,
    private inboxRepository: InboxRepository
  ) {}

  async handleMessage(messageId: string, handler: () => Promise<void>) {
    // Stage 1: Redis 캐시 체크 (매우 빠름)
    const cached = await this.redis.get(`inbox:${messageId}`);
    if (cached) {
      return; // Cache hit = 이미 처리됨
    }

    // Stage 2: DB 체크 (캐시 미스 시에만)
    const exists = await this.inboxRepository.isProcessed(messageId);
    if (exists) {
      // DB에 있으면 캐시에도 추가 (다음 중복 시 빠른 응답)
      await this.redis.setex(`inbox:${messageId}`, 86400, '1'); // 24시간 TTL
      return;
    }

    // Stage 3: 트랜잭션 처리
    await this.db.transaction(async (manager) => {
      await this.inboxRepository.recordMessage(manager, messageId, 'event', {});
      await handler();
    });

    // Stage 4: 캐시 갱신
    await this.redis.setex(`inbox:${messageId}`, 86400, '1');
  }
}

// 2. Batch 처리로 처리량 향상
class BatchInboxHandler {
  private batch: Array<{ messageId: string; handler: () => Promise<void> }> = [];
  private readonly BATCH_SIZE = 100;
  private readonly FLUSH_INTERVAL = 100; // ms

  async addMessage(messageId: string, handler: () => Promise<void>) {
    this.batch.push({ messageId, handler });

    if (this.batch.length >= this.BATCH_SIZE) {
      await this.flush();
    }
  }

  async flush() {
    if (this.batch.length === 0) return;

    const currentBatch = [...this.batch];
    this.batch = [];

    // 1. Batch로 중복 체크
    const messageIds = currentBatch.map(b => b.messageId);
    const existingIds = await this.checkBatch(messageIds);

    // 2. 중복 아닌 것만 처리
    const newMessages = currentBatch.filter(
      b => !existingIds.has(b.messageId)
    );

    // 3. Batch INSERT + 개별 핸들러 실행
    await this.processBatch(newMessages);
  }

  private async checkBatch(ids: string[]): Promise<Set<string>> {
    const results = await this.db.query(
      `SELECT message_id FROM inbox WHERE message_id = ANY($1)`,
      [ids]
    );
    return new Set(results.map((r: any) => r.message_id));
  }
}
⚠️ 분산 트랜잭션 주의 Inbox 테이블과 비즈니스 테이블이 서로 다른 데이터베이스에 있으면 단일 트랜잭션으로 묶을 수 없습니다. 이 경우 Saga 패턴이나 2PC(Two-Phase Commit)를 고려해야 합니다.

💬 실무 대화 예시

🧑‍💻
주니어 개발자
Kafka Consumer에서 메시지 처리 중 예외가 발생하면 자동으로 재시도되는데, 그 사이에 이미 DB에 데이터가 저장된 상태라 중복 데이터가 생겨요. 어떻게 해야 할까요?
👨‍🏫
시니어 개발자
전형적인 At-Least-Once 중복 문제네요. Inbox Pattern을 적용하면 됩니다. 메시지의 고유 ID를 inbox 테이블에 저장하고, 처리 전에 이미 있는지 체크해요. 핵심은 inbox INSERT와 비즈니스 로직을 같은 트랜잭션에서 실행하는 거예요. 그래야 "기록은 됐는데 처리 안 됨" 같은 불일치 상태를 막을 수 있어요.
🧑‍💻
주니어 개발자
그런데 Consumer가 여러 대 떠 있으면 동시에 같은 메시지를 처리할 수도 있지 않나요? 둘 다 "없음"을 확인하고 동시에 INSERT 시도할 것 같은데...
👨‍🏫
시니어 개발자
좋은 지적이에요! 그래서 message_id에 UNIQUE 제약조건을 걸어야 해요. 동시에 INSERT하면 하나는 성공하고, 하나는 unique violation 예외가 발생하죠. 예외를 catch해서 "이미 다른 인스턴스가 처리함"으로 간주하고 skip하면 됩니다. 이게 "check-then-act" 대신 "try-insert-catch-duplicate" 패턴이에요.
🧑‍💻
주니어 개발자
Inbox Pattern을 적용하니까 inbox 테이블이 엄청 빨리 커져요. 매일 수백만 건씩 쌓이는데 성능 문제가 걱정됩니다.
👨‍🏫
시니어 개발자
테이블 정리 전략이 필요하죠. 몇 가지 방법이 있어요:

1. 파티셔닝: received_at 기준으로 월별 파티션 만들고, 오래된 파티션은 DROP
2. 배치 삭제: 처리 완료 후 30일 지난 레코드 야간에 삭제
3. TTL 테이블: TimescaleDB나 Cassandra처럼 자동 만료 지원하는 DB 사용

조회 성능은 message_id가 Primary Key라서 O(1)이에요. 테이블이 커도 특정 ID 조회는 빠릅니다.
🧑‍💻
주니어 개발자
Redis를 중복 체크에 쓰면 더 빠르지 않을까요?
👨‍🏫
시니어 개발자
맞아요, Redis가 훨씬 빠르죠. 근데 Redis만 쓰면 문제가 있어요. Redis는 TTL 만료되면 데이터가 사라지잖아요? TTL 지난 후에 같은 메시지가 재전송되면 중복 처리됩니다.

그래서 2단계 전략을 추천해요:
1단계: Redis 체크 (빠른 경로, 대부분의 중복을 여기서 걸러냄)
2단계: Redis 미스 시 DB 체크 (느리지만 영구 보장)

이렇게 하면 속도와 정확성을 모두 잡을 수 있어요.
🧑‍💻
주니어 개발자
Outbox Pattern이랑 Inbox Pattern은 같이 써야 하나요?
👨‍🏫
시니어 개발자
End-to-End 신뢰성이 필요하면 같이 써야 해요:

Outbox: Producer에서 "메시지가 반드시 발행되도록" 보장 (손실 방지)
Inbox: Consumer에서 "메시지가 한 번만 처리되도록" 보장 (중복 방지)

둘은 메시지 파이프라인의 양 끝에서 서로 다른 문제를 해결해요. 결제나 재고처럼 정확성이 중요한 도메인에서는 둘 다 적용하는 게 안전합니다.

⚠️ 주의사항 및 팁

⚠️ 트랜잭션 범위 주의 Inbox INSERT와 비즈니스 로직이 반드시 같은 트랜잭션에 있어야 합니다. 분리되면 "Inbox에만 기록되고 비즈니스 로직은 실패"하는 불일치가 발생합니다.
⚠️ Message ID 생성 위치 Message ID는 Producer(발신자)에서 생성해야 합니다. Consumer에서 생성하면 동일한 메시지에 다른 ID가 부여되어 중복 방지가 불가능합니다.
⚠️ Kafka Offset 의존 주의 Kafka offset(topic-partition-offset)을 Message ID로 쓸 때 주의: Consumer Group 변경이나 토픽 재생성 시 offset이 리셋되면 ID가 중복될 수 있습니다.
💡 처리 완료 시각 활용 processed_at 컬럼을 추가하면 "처리 중" 상태를 구분할 수 있습니다. 비정상 종료로 처리가 미완료된 메시지를 식별하고 재처리하는 데 유용합니다.
💡 Payload 저장의 장점 원본 메시지(payload)를 Inbox에 함께 저장하면:
  • 장애 분석 시 원본 데이터 확인 가능
  • 처리 실패한 메시지 수동 재처리 가능
  • 이벤트 소싱의 감사 로그 역할
💡 테스트 팁 Inbox Pattern 테스트 시 동시성 테스트를 반드시 포함하세요:
  • 동일 메시지를 동시에 N개 Consumer가 처리 시도
  • 정확히 1번만 비즈니스 로직이 실행되는지 검증
  • UNIQUE 제약 조건 예외가 정상 처리되는지 확인

🔗 관련 용어

📚 더 배우기