🏗️
아키텍처
Inbox Pattern
인박스 패턴 · Idempotent Consumer Pattern
Inbox Pattern은 메시지 기반 분산 시스템에서 중복 메시지 처리를 방지하는 멱등성(Idempotency) 패턴입니다. 수신된 메시지의 고유 ID를 별도 테이블(Inbox)에 저장하여, 동일한 메시지가 다시 도착해도 중복 처리하지 않습니다.
인박스 패턴 · Idempotent Consumer Pattern
Inbox Pattern은 메시지 기반 분산 시스템에서 중복 메시지 처리를 방지하는 멱등성(Idempotency) 패턴입니다. 수신된 메시지의 고유 ID를 별도 테이블(Inbox)에 저장하여, 동일한 메시지가 다시 도착해도 중복 처리하지 않습니다.
분산 메시지 시스템에서 At-Least-Once 전달 보장은 메시지 손실을 방지하지만, 네트워크 장애나 Consumer 재시작 시 동일 메시지가 여러 번 전달될 수 있습니다. 이로 인해:
Inbox Pattern은 수신된 메시지의 고유 식별자(Message ID)를 영속 저장소(Inbox 테이블)에 기록하여 중복 여부를 판단합니다:
Inbox 테이블은 간단한 구조로 설계할 수 있습니다:
-- 기본 Inbox 테이블
CREATE TABLE inbox (
message_id VARCHAR(255) PRIMARY KEY, -- 메시지 고유 ID (UUID, Correlation ID 등)
message_type VARCHAR(100) NOT NULL, -- 메시지 타입 (OrderCreated, PaymentCompleted 등)
received_at TIMESTAMP DEFAULT NOW(), -- 수신 시각
processed_at TIMESTAMP, -- 처리 완료 시각 (nullable)
payload JSONB -- 원본 메시지 저장 (디버깅/재처리용, optional)
);
-- 조회 성능을 위한 인덱스
CREATE INDEX idx_inbox_received_at ON inbox(received_at);
CREATE INDEX idx_inbox_message_type ON inbox(message_type);
-- 오래된 레코드 정리를 위한 파티셔닝 (선택사항)
-- 메시지 보관 기간에 따라 일별/주별 파티션 구성
Inbox와 Outbox는 메시지 처리의 양 끝단에서 신뢰성을 보장하는 상보적 패턴입니다:
| 구분 | Inbox Pattern | Outbox Pattern |
|---|---|---|
| 위치 | Consumer (수신측) | Producer (발신측) |
| 목적 | 중복 메시지 처리 방지 | 메시지 손실 방지 |
| 동작 | 메시지 ID로 중복 체크 후 처리 | 비즈니스 로직과 메시지를 한 트랜잭션에 저장 |
| 해결 문제 | At-Least-Once로 인한 중복 처리 | DB 커밋 후 메시지 발행 실패 |
| 테이블 역할 | 처리 완료된 메시지 ID 기록 | 발행할 메시지 임시 저장 |
| 함께 사용 | Outbox → Message Queue → Inbox (End-to-End 신뢰성) | |
Node.js 환경에서 Inbox Pattern을 구현한 완전한 예제입니다:
// inbox/inbox.entity.ts
import { Entity, PrimaryColumn, Column, CreateDateColumn } from 'typeorm';
@Entity('inbox')
export class InboxEntity {
@PrimaryColumn('varchar', { length: 255 })
messageId: string;
@Column('varchar', { length: 100 })
messageType: string;
@CreateDateColumn()
receivedAt: Date;
@Column('timestamp', { nullable: true })
processedAt: Date | null;
@Column('jsonb', { nullable: true })
payload: Record<string, unknown>;
}
// inbox/inbox.repository.ts
import { Injectable } from '@nestjs/common';
import { DataSource, EntityManager } from 'typeorm';
import { InboxEntity } from './inbox.entity';
@Injectable()
export class InboxRepository {
constructor(private dataSource: DataSource) {}
/**
* 메시지가 이미 처리되었는지 확인
*/
async isProcessed(messageId: string): Promise<boolean> {
const existing = await this.dataSource.manager.findOne(InboxEntity, {
where: { messageId }
});
return existing !== null;
}
/**
* Inbox에 메시지 등록 (트랜잭션 내에서 사용)
*/
async recordMessage(
manager: EntityManager,
messageId: string,
messageType: string,
payload?: Record<string, unknown>
): Promise<void> {
const inbox = manager.create(InboxEntity, {
messageId,
messageType,
payload,
processedAt: null
});
await manager.save(inbox);
}
/**
* 처리 완료 시각 업데이트
*/
async markAsProcessed(
manager: EntityManager,
messageId: string
): Promise<void> {
await manager.update(InboxEntity, { messageId }, {
processedAt: new Date()
});
}
}
// message/message-handler.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { DataSource } from 'typeorm';
import { InboxRepository } from '../inbox/inbox.repository';
interface OrderCreatedEvent {
orderId: string;
customerId: string;
items: Array<{ productId: string; quantity: number; price: number }>;
totalAmount: number;
}
@Injectable()
export class MessageHandlerService {
private readonly logger = new Logger(MessageHandlerService.name);
constructor(
private dataSource: DataSource,
private inboxRepository: InboxRepository
) {}
/**
* Inbox Pattern을 적용한 메시지 처리
*/
async handleOrderCreated(
messageId: string,
event: OrderCreatedEvent
): Promise<void> {
// 1. 중복 체크 (빠른 실패)
const alreadyProcessed = await this.inboxRepository.isProcessed(messageId);
if (alreadyProcessed) {
this.logger.log(`Duplicate message detected: ${messageId}, skipping...`);
return; // ACK 전송 (메시지 큐에서 제거)
}
// 2. 트랜잭션으로 Inbox 기록 + 비즈니스 로직 실행
await this.dataSource.transaction(async (manager) => {
// 2-1. Inbox에 메시지 등록 (Race condition 방지를 위해 트랜잭션 내에서)
try {
await this.inboxRepository.recordMessage(
manager,
messageId,
'OrderCreated',
event as Record<string, unknown>
);
} catch (error: any) {
// UNIQUE 제약 조건 위반 = 다른 인스턴스가 먼저 처리함
if (error.code === '23505') { // PostgreSQL unique violation
this.logger.log(`Concurrent duplicate: ${messageId}, skipping...`);
return;
}
throw error;
}
// 2-2. 비즈니스 로직 실행
await this.processOrderCreated(manager, event);
// 2-3. 처리 완료 표시
await this.inboxRepository.markAsProcessed(manager, messageId);
});
this.logger.log(`Successfully processed OrderCreated: ${messageId}`);
}
private async processOrderCreated(
manager: any,
event: OrderCreatedEvent
): Promise<void> {
// 실제 비즈니스 로직 구현
// - 주문 엔티티 생성/업데이트
// - 재고 감소
// - 포인트 적립
// - 알림 발송 이벤트 생성 (Outbox Pattern 사용)
this.logger.log(`Processing order: ${event.orderId} for customer: ${event.customerId}`);
// 예시: 주문 저장
await manager.query(
`INSERT INTO orders (id, customer_id, total_amount, status, created_at)
VALUES ($1, $2, $3, 'CREATED', NOW())`,
[event.orderId, event.customerId, event.totalAmount]
);
// 예시: 주문 아이템 저장
for (const item of event.items) {
await manager.query(
`INSERT INTO order_items (order_id, product_id, quantity, price)
VALUES ($1, $2, $3, $4)`,
[event.orderId, item.productId, item.quantity, item.price]
);
}
}
}
// message/kafka-consumer.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Kafka, Consumer, EachMessagePayload } from 'kafkajs';
import { MessageHandlerService } from './message-handler.service';
@Injectable()
export class KafkaConsumer implements OnModuleInit {
private consumer: Consumer;
constructor(private messageHandler: MessageHandlerService) {
const kafka = new Kafka({
clientId: 'order-service',
brokers: ['kafka:9092']
});
this.consumer = kafka.consumer({ groupId: 'order-service-group' });
}
async onModuleInit() {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'order-events', fromBeginning: false });
await this.consumer.run({
eachMessage: async (payload: EachMessagePayload) => {
const { message } = payload;
// Message ID 추출 (Kafka에서는 보통 헤더에 포함)
const messageId = message.headers?.['message-id']?.toString()
?? message.key?.toString()
?? `${payload.topic}-${payload.partition}-${message.offset}`;
const event = JSON.parse(message.value?.toString() ?? '{}');
// Inbox Pattern이 적용된 핸들러 호출
await this.messageHandler.handleOrderCreated(messageId, event);
}
});
}
}
Spring Boot와 JPA를 사용한 Inbox Pattern 구현입니다:
// entity/InboxMessage.java
package com.example.inbox.entity;
import jakarta.persistence.*;
import java.time.Instant;
@Entity
@Table(name = "inbox")
public class InboxMessage {
@Id
@Column(name = "message_id", length = 255)
private String messageId;
@Column(name = "message_type", length = 100, nullable = false)
private String messageType;
@Column(name = "received_at", nullable = false)
private Instant receivedAt = Instant.now();
@Column(name = "processed_at")
private Instant processedAt;
@Column(name = "payload", columnDefinition = "jsonb")
private String payload;
// Getters, setters, constructors...
public static InboxMessage create(String messageId, String messageType, String payload) {
InboxMessage inbox = new InboxMessage();
inbox.messageId = messageId;
inbox.messageType = messageType;
inbox.payload = payload;
return inbox;
}
public void markAsProcessed() {
this.processedAt = Instant.now();
}
}
// repository/InboxRepository.java
package com.example.inbox.repository;
import com.example.inbox.entity.InboxMessage;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
@Repository
public interface InboxRepository extends JpaRepository<InboxMessage, String> {
boolean existsByMessageId(String messageId);
}
// service/IdempotentMessageHandler.java
package com.example.inbox.service;
import com.example.inbox.entity.InboxMessage;
import com.example.inbox.repository.InboxRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.function.Consumer;
@Service
public class IdempotentMessageHandler {
private static final Logger log = LoggerFactory.getLogger(IdempotentMessageHandler.class);
private final InboxRepository inboxRepository;
public IdempotentMessageHandler(InboxRepository inboxRepository) {
this.inboxRepository = inboxRepository;
}
/**
* 멱등성이 보장된 메시지 처리 템플릿
*
* @param messageId 메시지 고유 ID
* @param messageType 메시지 타입
* @param payload 원본 페이로드 (JSON 문자열)
* @param handler 실제 비즈니스 로직
*/
@Transactional
public void handleIdempotently(
String messageId,
String messageType,
String payload,
Consumer<String> handler
) {
// 1. 빠른 중복 체크 (트랜잭션 외부)
if (inboxRepository.existsByMessageId(messageId)) {
log.info("Duplicate message detected: {}, skipping", messageId);
return;
}
// 2. Inbox 기록 시도 (트랜잭션 내부)
InboxMessage inbox = InboxMessage.create(messageId, messageType, payload);
try {
inboxRepository.save(inbox);
} catch (DataIntegrityViolationException e) {
// 동시성 경쟁에서 다른 인스턴스가 먼저 INSERT 성공
log.info("Concurrent duplicate detected: {}, skipping", messageId);
return;
}
// 3. 비즈니스 로직 실행
handler.accept(payload);
// 4. 처리 완료 표시
inbox.markAsProcessed();
inboxRepository.save(inbox);
log.info("Successfully processed message: {}", messageId);
}
}
// listener/OrderEventListener.java
package com.example.order.listener;
import com.example.inbox.service.IdempotentMessageHandler;
import com.example.order.service.OrderService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
@Component
public class OrderEventListener {
private final IdempotentMessageHandler idempotentHandler;
private final OrderService orderService;
private final ObjectMapper objectMapper;
public OrderEventListener(
IdempotentMessageHandler idempotentHandler,
OrderService orderService,
ObjectMapper objectMapper
) {
this.idempotentHandler = idempotentHandler;
this.orderService = orderService;
this.objectMapper = objectMapper;
}
@KafkaListener(topics = "order-events", groupId = "order-service")
public void handleOrderEvent(
@Payload String payload,
@Header(KafkaHeaders.RECEIVED_KEY) String messageKey,
@Header(value = "message-id", required = false) String messageId
) {
// Message ID 결정 (헤더 우선, 없으면 Kafka key 사용)
String effectiveMessageId = messageId != null ? messageId : messageKey;
// Inbox Pattern 적용
idempotentHandler.handleIdempotently(
effectiveMessageId,
"OrderCreated",
payload,
(p) -> {
try {
OrderCreatedEvent event = objectMapper.readValue(p, OrderCreatedEvent.class);
orderService.processOrderCreated(event);
} catch (Exception e) {
throw new RuntimeException("Failed to process order event", e);
}
}
);
}
}
Go 언어에서의 Inbox Pattern 구현입니다:
// inbox/handler.go
package inbox
import (
"context"
"database/sql"
"encoding/json"
"errors"
"log"
"time"
"github.com/lib/pq"
)
type InboxHandler struct {
db *sql.DB
}
func NewInboxHandler(db *sql.DB) *InboxHandler {
return &InboxHandler{db: db}
}
// HandleIdempotently wraps message processing with inbox pattern
func (h *InboxHandler) HandleIdempotently(
ctx context.Context,
messageID string,
messageType string,
payload json.RawMessage,
handler func(ctx context.Context, tx *sql.Tx) error,
) error {
// 1. Quick duplicate check
exists, err := h.checkExists(ctx, messageID)
if err != nil {
return err
}
if exists {
log.Printf("Duplicate message: %s, skipping", messageID)
return nil
}
// 2. Start transaction
tx, err := h.db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
if err != nil {
return err
}
defer tx.Rollback()
// 3. Try to insert into inbox (handles race condition)
err = h.insertInbox(ctx, tx, messageID, messageType, payload)
if err != nil {
var pqErr *pq.Error
if errors.As(err, &pqErr) && pqErr.Code == "23505" {
log.Printf("Concurrent duplicate: %s, skipping", messageID)
return nil
}
return err
}
// 4. Execute business logic
if err := handler(ctx, tx); err != nil {
return err
}
// 5. Mark as processed and commit
if err := h.markProcessed(ctx, tx, messageID); err != nil {
return err
}
return tx.Commit()
}
func (h *InboxHandler) checkExists(ctx context.Context, messageID string) (bool, error) {
var exists bool
err := h.db.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM inbox WHERE message_id = $1)",
messageID,
).Scan(&exists)
return exists, err
}
func (h *InboxHandler) insertInbox(
ctx context.Context,
tx *sql.Tx,
messageID, messageType string,
payload json.RawMessage,
) error {
_, err := tx.ExecContext(ctx,
`INSERT INTO inbox (message_id, message_type, payload, received_at)
VALUES ($1, $2, $3, $4)`,
messageID, messageType, payload, time.Now(),
)
return err
}
func (h *InboxHandler) markProcessed(ctx context.Context, tx *sql.Tx, messageID string) error {
_, err := tx.ExecContext(ctx,
"UPDATE inbox SET processed_at = $1 WHERE message_id = $2",
time.Now(), messageID,
)
return err
}
Inbox Pattern 외에도 중복 처리를 방지하는 다양한 전략이 있습니다:
| 전략 | 방식 | 장점 | 단점 |
|---|---|---|---|
| Inbox Pattern | Message ID를 별도 테이블에 저장 | 범용적, 모든 메시지 타입 적용 가능 | 추가 저장 공간, 테이블 관리 필요 |
| Upsert | INSERT ON CONFLICT UPDATE | 단순, 별도 테이블 불필요 | 특정 엔티티에만 적용, 삭제 연산 어려움 |
| Version Check | 엔티티 버전으로 상태 체크 | Optimistic Lock과 자연스럽게 결합 | 버전 관리 로직 필요, 복잡도 증가 |
| Natural Idempotency | 연산 자체가 멱등 (SET 연산 등) | 추가 구현 불필요 | 모든 연산에 적용 불가 (증감 연산 등) |
| Deduplication Service | Redis/Memcached로 ID 캐싱 | 매우 빠른 체크, 분산 환경 용이 | TTL 만료 후 중복 가능, 외부 의존성 |
시간이 지나면 Inbox 테이블은 계속 커집니다. 정리 전략이 필요합니다:
-- 1. 시간 기반 정리 (30일 이상 오래된 레코드 삭제)
DELETE FROM inbox
WHERE processed_at IS NOT NULL
AND processed_at < NOW() - INTERVAL '30 days';
-- 2. 파티셔닝 적용 (PostgreSQL 예시)
CREATE TABLE inbox (
message_id VARCHAR(255) NOT NULL,
message_type VARCHAR(100) NOT NULL,
received_at TIMESTAMP NOT NULL DEFAULT NOW(),
processed_at TIMESTAMP,
payload JSONB,
PRIMARY KEY (message_id, received_at)
) PARTITION BY RANGE (received_at);
-- 월별 파티션 생성
CREATE TABLE inbox_2025_01 PARTITION OF inbox
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE inbox_2025_02 PARTITION OF inbox
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- 오래된 파티션 삭제 (빠름)
DROP TABLE inbox_2024_12;
// 1. Redis를 이용한 2단계 중복 체크 (Hot Cache + Cold Storage)
class OptimizedInboxHandler {
constructor(
private redis: Redis,
private db: DataSource,
private inboxRepository: InboxRepository
) {}
async handleMessage(messageId: string, handler: () => Promise<void>) {
// Stage 1: Redis 캐시 체크 (매우 빠름)
const cached = await this.redis.get(`inbox:${messageId}`);
if (cached) {
return; // Cache hit = 이미 처리됨
}
// Stage 2: DB 체크 (캐시 미스 시에만)
const exists = await this.inboxRepository.isProcessed(messageId);
if (exists) {
// DB에 있으면 캐시에도 추가 (다음 중복 시 빠른 응답)
await this.redis.setex(`inbox:${messageId}`, 86400, '1'); // 24시간 TTL
return;
}
// Stage 3: 트랜잭션 처리
await this.db.transaction(async (manager) => {
await this.inboxRepository.recordMessage(manager, messageId, 'event', {});
await handler();
});
// Stage 4: 캐시 갱신
await this.redis.setex(`inbox:${messageId}`, 86400, '1');
}
}
// 2. Batch 처리로 처리량 향상
class BatchInboxHandler {
private batch: Array<{ messageId: string; handler: () => Promise<void> }> = [];
private readonly BATCH_SIZE = 100;
private readonly FLUSH_INTERVAL = 100; // ms
async addMessage(messageId: string, handler: () => Promise<void>) {
this.batch.push({ messageId, handler });
if (this.batch.length >= this.BATCH_SIZE) {
await this.flush();
}
}
async flush() {
if (this.batch.length === 0) return;
const currentBatch = [...this.batch];
this.batch = [];
// 1. Batch로 중복 체크
const messageIds = currentBatch.map(b => b.messageId);
const existingIds = await this.checkBatch(messageIds);
// 2. 중복 아닌 것만 처리
const newMessages = currentBatch.filter(
b => !existingIds.has(b.messageId)
);
// 3. Batch INSERT + 개별 핸들러 실행
await this.processBatch(newMessages);
}
private async checkBatch(ids: string[]): Promise<Set<string>> {
const results = await this.db.query(
`SELECT message_id FROM inbox WHERE message_id = ANY($1)`,
[ids]
);
return new Set(results.map((r: any) => r.message_id));
}
}
processed_at 컬럼을 추가하면 "처리 중" 상태를 구분할 수 있습니다.
비정상 종료로 처리가 미완료된 메시지를 식별하고 재처리하는 데 유용합니다.