🏗️ 아키텍처

메시지 큐

Message Queue (MQ)

비동기 통신을 위한 메시지 중개 시스템입니다. Producer가 메시지를 큐에 넣으면 Consumer가 이를 처리하는 구조로, RabbitMQ, Apache Kafka, AWS SQS가 대표적입니다. 서비스 간 결합도를 낮추고 시스템 안정성을 높입니다.

📖 상세 설명

메시지 큐는 애플리케이션 간 비동기 통신을 가능하게 하는 미들웨어입니다. 발신자(Producer)가 메시지를 큐에 전송하면, 수신자(Consumer)가 자신의 속도로 메시지를 가져가 처리합니다. 이 패턴을 통해 시스템 간 시간적, 공간적 결합이 제거됩니다.

핵심 개념으로는 Producer(메시지를 생성하여 큐에 전송), Consumer(큐에서 메시지를 가져와 처리), Queue/Topic(메시지가 저장되는 버퍼), Broker(메시지 라우팅과 전달을 담당하는 서버)가 있습니다.

메시지 전달 보장 수준에는 At-most-once(최대 한 번, 메시지 손실 가능), At-least-once(최소 한 번, 중복 가능), Exactly-once(정확히 한 번, 가장 어려움)가 있습니다. 대부분의 시스템은 At-least-once를 기본으로 하며, Consumer 측에서 멱등성을 보장해야 합니다.

주요 솔루션 비교: RabbitMQ는 전통적인 메시지 브로커로 복잡한 라우팅에 강점이 있고, Apache Kafka는 분산 이벤트 스트리밍 플랫폼으로 대용량 실시간 처리에 적합하며, AWS SQS는 완전 관리형 서비스로 운영 부담이 없습니다. Redis Streams는 가벼운 메시지 큐가 필요할 때 유용합니다.

메시지 큐는 주문 처리, 이메일 발송, 로그 수집, 이벤트 소싱, 시스템 간 연동 등 다양한 시나리오에서 활용됩니다. 특히 마이크로서비스 아키텍처에서는 서비스 간 비동기 통신의 핵심 인프라로 사용됩니다.

💻 코드 예제

# 메시지 큐 예제 - RabbitMQ with Python (pika)
import pika
import json
from dataclasses import dataclass, asdict
from typing import Callable
import time

# ========================================
# Producer (발행자)
# ========================================
class MessageProducer:
    def __init__(self, host: str = 'localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()

    def declare_queue(self, queue_name: str, durable: bool = True):
        """큐 선언 - durable=True면 브로커 재시작 시에도 유지"""
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def publish(self, queue_name: str, message: dict):
        """메시지 발행"""
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # 메시지 영속성 (디스크에 저장)
                content_type='application/json'
            )
        )
        print(f"[x] Sent: {message}")

    def close(self):
        self.connection.close()


# ========================================
# Consumer (소비자)
# ========================================
class MessageConsumer:
    def __init__(self, host: str = 'localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=host)
        )
        self.channel = self.connection.channel()

    def consume(self, queue_name: str, callback: Callable):
        """
        메시지 소비 시작
        - auto_ack=False: 수동 ACK (처리 완료 후 확인)
        - prefetch_count=1: 한 번에 하나씩 처리
        """
        self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)

        def wrapper(ch, method, properties, body):
            message = json.loads(body)
            try:
                callback(message)
                # 성공 시 ACK - 메시지가 큐에서 제거됨
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"[!] Error: {e}")
                # 실패 시 NACK - 메시지를 다시 큐에 넣음
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        self.channel.basic_consume(queue=queue_name, on_message_callback=wrapper)
        print(f"[*] Waiting for messages in {queue_name}")
        self.channel.start_consuming()


# ========================================
# 실제 사용 예제 - 주문 처리 시스템
# ========================================
@dataclass
class OrderEvent:
    order_id: str
    user_id: str
    product_id: str
    quantity: int
    total_price: float


def process_order(message: dict):
    """주문 처리 핸들러 (멱등성 보장 필요)"""
    order_id = message['order_id']

    # 1. 중복 처리 방지 (멱등성)
    if is_already_processed(order_id):
        print(f"[!] Order {order_id} already processed, skipping")
        return

    # 2. 실제 처리 로직
    print(f"[>] Processing order: {order_id}")
    time.sleep(1)  # 처리 시뮬레이션

    # 3. 처리 완료 기록
    mark_as_processed(order_id)
    print(f"[v] Order {order_id} completed")


def is_already_processed(order_id: str) -> bool:
    # Redis나 DB에서 확인 (예시)
    return False

def mark_as_processed(order_id: str):
    # Redis나 DB에 기록 (예시)
    pass


# Producer 사용
if __name__ == "__main__":
    # 주문 생성 (API 서버에서 호출)
    producer = MessageProducer()
    producer.declare_queue('orders')

    order = OrderEvent(
        order_id="ORD-12345",
        user_id="USER-001",
        product_id="PROD-100",
        quantity=2,
        total_price=59000.0
    )
    producer.publish('orders', asdict(order))
    producer.close()


# Consumer 사용 (별도 워커 프로세스)
# consumer = MessageConsumer()
# consumer.consume('orders', process_order)
// 메시지 큐 예제 - Bull (Redis 기반) with Node.js
import Bull from 'bull';
import Redis from 'ioredis';

// ========================================
// Queue 설정
// ========================================
const orderQueue = new Bull('order-processing', {
    redis: {
        host: 'localhost',
        port: 6379
    },
    defaultJobOptions: {
        attempts: 3,           // 실패 시 3회 재시도
        backoff: {
            type: 'exponential',
            delay: 1000        // 1초, 2초, 4초... 간격으로 재시도
        },
        removeOnComplete: 100, // 완료된 작업 100개만 유지
        removeOnFail: 50       // 실패한 작업 50개만 유지
    }
});

// ========================================
// Producer - 작업 추가
// ========================================
interface OrderJob {
    orderId: string;
    userId: string;
    items: Array<{ productId: string; quantity: number }>;
    totalPrice: number;
}

async function addOrderJob(order: OrderJob) {
    const job = await orderQueue.add('process-order', order, {
        priority: order.totalPrice > 100000 ? 1 : 10,  // 고액 주문 우선처리
        delay: 0,                                        // 즉시 실행
        jobId: order.orderId                            // 중복 방지
    });

    console.log(`[Producer] Added job ${job.id}`);
    return job;
}

// ========================================
// Consumer (Worker) - 작업 처리
// ========================================
orderQueue.process('process-order', 5, async (job) => {
    // 동시에 5개 작업 처리
    const order = job.data as OrderJob;

    console.log(`[Worker] Processing order ${order.orderId}`);

    // 진행 상황 업데이트
    await job.progress(10);

    // 1. 재고 확인
    await checkInventory(order.items);
    await job.progress(30);

    // 2. 결제 처리
    await processPayment(order.userId, order.totalPrice);
    await job.progress(60);

    // 3. 배송 요청
    await requestShipment(order.orderId, order.items);
    await job.progress(90);

    // 4. 완료 알림
    await sendNotification(order.userId, order.orderId);
    await job.progress(100);

    return { success: true, orderId: order.orderId };
});

// 헬퍼 함수들 (실제 구현 필요)
async function checkInventory(items: any[]) { /* ... */ }
async function processPayment(userId: string, amount: number) { /* ... */ }
async function requestShipment(orderId: string, items: any[]) { /* ... */ }
async function sendNotification(userId: string, orderId: string) { /* ... */ }

// ========================================
// 이벤트 핸들링
// ========================================
orderQueue.on('completed', (job, result) => {
    console.log(`[Event] Job ${job.id} completed:`, result);
});

orderQueue.on('failed', (job, err) => {
    console.error(`[Event] Job ${job.id} failed:`, err.message);
});

orderQueue.on('stalled', (job) => {
    console.warn(`[Event] Job ${job.id} stalled`);
});

// ========================================
// Dead Letter Queue (DLQ) 처리
// ========================================
const dlq = new Bull('order-dlq', { redis: { host: 'localhost' } });

orderQueue.on('failed', async (job, err) => {
    if (job.attemptsMade >= 3) {
        // 3회 실패 후 DLQ로 이동
        await dlq.add('failed-order', {
            originalJob: job.data,
            error: err.message,
            failedAt: new Date().toISOString()
        });
        console.log(`[DLQ] Moved job ${job.id} to dead letter queue`);
    }
});

// ========================================
// 사용 예시
// ========================================
async function main() {
    // 주문 생성
    await addOrderJob({
        orderId: 'ORD-12345',
        userId: 'USER-001',
        items: [
            { productId: 'PROD-100', quantity: 2 },
            { productId: 'PROD-200', quantity: 1 }
        ],
        totalPrice: 89000
    });
}

main();
# Apache Kafka 아키텍처 개념

# ========================================
# Kafka vs 전통적 메시지 큐 차이점
# ========================================

# 전통적 MQ (RabbitMQ, SQS):
# - Point-to-Point 또는 Pub/Sub
# - 메시지 소비 후 삭제
# - Consumer가 메시지를 "가져감"

# Kafka:
# - 분산 이벤트 스트리밍 플랫폼
# - 메시지를 일정 기간 보관 (재처리 가능)
# - Consumer가 offset으로 위치 관리
# - 파티션을 통한 병렬 처리

# ========================================
# Kafka 핵심 개념
# ========================================

# Topic: 메시지 카테고리 (예: orders, payments)
# Partition: Topic을 나눈 단위, 병렬 처리의 기본
# Producer: 메시지 발행자
# Consumer: 메시지 구독자
# Consumer Group: 같은 토픽을 구독하는 Consumer 집합
# Offset: 파티션 내 메시지 위치
# Broker: Kafka 서버 노드

# ========================================
# 파티션과 Consumer Group
# ========================================

# Topic: orders (3 파티션)
#
# Partition 0: [msg1, msg2, msg5, msg8...]
# Partition 1: [msg3, msg4, msg6, msg9...]
# Partition 2: [msg7, msg10...]
#
# Consumer Group A (주문 처리):
#   Consumer A1 -> Partition 0
#   Consumer A2 -> Partition 1
#   Consumer A3 -> Partition 2
#
# Consumer Group B (분석):
#   Consumer B1 -> 모든 Partition
#
# * 같은 그룹 내 Consumer는 파티션을 나눠 가짐
# * 다른 그룹은 독립적으로 모든 메시지 수신

# ========================================
# Kafka 설정 예시 (docker-compose.yml)
# ========================================
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_LOG_RETENTION_HOURS: 168  # 7일간 보관

  kafka-ui:
    image: provectuslabs/kafka-ui
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092

# ========================================
# Kafka CLI 명령어
# ========================================

# 토픽 생성
# kafka-topics --create --topic orders \
#   --bootstrap-server localhost:9092 \
#   --partitions 3 --replication-factor 1

# 메시지 발행
# kafka-console-producer --topic orders \
#   --bootstrap-server localhost:9092

# 메시지 소비 (처음부터)
# kafka-console-consumer --topic orders \
#   --bootstrap-server localhost:9092 \
#   --from-beginning --group order-group

# Consumer Group 상태 확인
# kafka-consumer-groups --describe --group order-group \
#   --bootstrap-server localhost:9092

# ========================================
# 언제 Kafka를 선택하나?
# ========================================
# - 대용량 실시간 데이터 처리 (초당 수만 건)
# - 이벤트 소싱 / CQRS 패턴
# - 로그 집계 및 분석
# - 스트림 처리 (Kafka Streams, Flink)
# - 메시지 재처리가 필요한 경우

# RabbitMQ가 나은 경우:
# - 복잡한 라우팅 로직
# - 전통적인 작업 큐
# - 낮은 지연 시간 (ms 단위)
# - 간단한 설정

🗣️ 실무에서 이렇게 말하세요

💬 시스템 설계 회의에서
"결제 완료 후 이메일 발송을 동기로 처리하면 API 응답이 느려집니다. 메시지 큐에 이벤트를 넣고 별도 워커가 처리하면, 사용자에게 바로 응답하면서도 이메일은 확실히 발송할 수 있어요."
💬 장애 대응 회의에서
"이메일 서버가 잠시 죽었는데 메시지가 손실되지 않았습니다. 메시지 큐에 쌓여있다가 서버가 복구되면서 밀린 건 다 처리됐어요. 메시지 영속성과 재시도 설정 덕분입니다."
💬 기술 선택 논의에서
"실시간 로그 처리에는 Kafka가 맞습니다. 하루에 수억 건을 처리해야 하고, 나중에 재처리도 필요하거든요. 단순 작업 큐 용도라면 RabbitMQ나 Redis Queue로 충분하지만요."

⚠️ 주의사항 & 베스트 프랙티스

멱등성 미보장

At-least-once 전달에서 중복 메시지가 올 수 있습니다. Consumer에서 반드시 중복 처리를 방지하는 멱등성 로직이 필요합니다.

메시지 순서 보장

여러 Consumer가 있으면 순서가 뒤바뀔 수 있습니다. 순서가 중요하면 단일 파티션이나 ordering key를 사용하세요.

Dead Letter Queue 미구성

처리 실패 메시지가 무한 재시도되면 시스템이 마비됩니다. 반드시 DLQ를 설정하고 모니터링하세요.

베스트 프랙티스

메시지에 고유 ID 포함, 처리 상태 저장, 적절한 재시도 정책, DLQ 설정, 메시지 크기 최소화, 큐 모니터링 및 알림 설정.

🔗 관련 용어

📚 더 배우기