🏗️ 아키텍처

RabbitMQ

RabbitMQ

오픈소스 메시지 브로커. AMQP 프로토콜, 유연한 라우팅.

📖 상세 설명

RabbitMQ는 Erlang/OTP로 작성된 오픈소스 메시지 브로커입니다. AMQP(Advanced Message Queuing Protocol)를 기본 프로토콜로 사용하며, MQTT, STOMP, HTTP 등 다양한 프로토콜도 지원합니다. 2007년 Rabbit Technologies에서 처음 개발되었고, 현재는 VMware(Broadcom)에서 관리합니다.

핵심 구성요소:

  • Producer: 메시지를 생성하여 Exchange에 전송
  • Exchange: 라우팅 규칙에 따라 Queue로 메시지 분배 (Direct, Topic, Fanout, Headers)
  • Queue: 메시지가 실제로 저장되는 버퍼
  • Binding: Exchange와 Queue를 연결하는 규칙
  • Consumer: Queue에서 메시지를 소비

Exchange 타입:

  • Direct: 정확한 routing key 매칭 (point-to-point)
  • Topic: 패턴 매칭 (*.logs, error.#)
  • Fanout: 연결된 모든 큐에 브로드캐스트
  • Headers: 메시지 헤더 기반 라우팅

주요 특징:

  • 신뢰성: 메시지 영속성, Publisher Confirms, Consumer Acknowledgments
  • 클러스터링: 여러 노드를 클러스터로 구성하여 고가용성 확보
  • 미러링: Queue Mirroring으로 데이터 복제 (Classic) 또는 Quorum Queues (권장)
  • Management UI: 웹 기반 관리 콘솔 제공 (포트 15672)
  • 플러그인 시스템: Federation, Shovel, Delayed Message 등 확장 가능

Kafka vs RabbitMQ: RabbitMQ는 복잡한 라우팅, 요청-응답 패턴, 작업 큐에 적합합니다. Kafka는 대용량 스트리밍, 로그 수집, 이벤트 소싱에 더 적합합니다. RabbitMQ는 메시지를 "푸시"하고, Kafka는 Consumer가 "풀"하는 방식입니다.

💻 코드 예제

Python - pika 라이브러리

import pika
import json

# 연결 설정
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(
    pika.ConnectionParameters(
        host='localhost',
        port=5672,
        credentials=credentials,
        heartbeat=600,
        blocked_connection_timeout=300
    )
)
channel = connection.channel()

# Exchange 및 Queue 선언
channel.exchange_declare(
    exchange='orders',
    exchange_type='topic',
    durable=True  # 브로커 재시작 시에도 유지
)

channel.queue_declare(
    queue='order_processing',
    durable=True,
    arguments={
        'x-message-ttl': 86400000,  # 24시간 후 만료
        'x-dead-letter-exchange': 'orders_dlx',  # 실패 메시지 처리
        'x-max-priority': 10  # 우선순위 지원
    }
)

# Binding 설정 (Topic Exchange)
channel.queue_bind(
    exchange='orders',
    queue='order_processing',
    routing_key='order.*.created'  # order.pizza.created, order.book.created 등
)

# Producer: 메시지 발행
def publish_order(order_data, order_type):
    channel.basic_publish(
        exchange='orders',
        routing_key=f'order.{order_type}.created',
        body=json.dumps(order_data),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 메시지 영속성
            content_type='application/json',
            priority=order_data.get('priority', 0),
            headers={'source': 'web-app'}
        )
    )
    print(f"Published order: {order_data['id']}")

# Consumer: 메시지 소비
def on_message(ch, method, properties, body):
    try:
        order = json.loads(body)
        print(f"Processing order: {order['id']}")

        # 비즈니스 로직 처리
        process_order(order)

        # 성공 시 ACK
        ch.basic_ack(delivery_tag=method.delivery_tag)
    except Exception as e:
        print(f"Error: {e}")
        # 실패 시 NACK (재시도 또는 DLQ로 이동)
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

# QoS 설정: 동시에 처리할 메시지 수 제한
channel.basic_qos(prefetch_count=10)

# 소비 시작
channel.basic_consume(
    queue='order_processing',
    on_message_callback=on_message,
    auto_ack=False  # 수동 ACK 모드
)

print("Waiting for messages...")
channel.start_consuming()

Node.js - amqplib

const amqp = require('amqplib');

class RabbitMQService {
  constructor() {
    this.connection = null;
    this.channel = null;
  }

  async connect() {
    this.connection = await amqp.connect({
      protocol: 'amqp',
      hostname: 'localhost',
      port: 5672,
      username: 'guest',
      password: 'guest',
      vhost: '/',
    });

    // 연결 에러 핸들링
    this.connection.on('error', (err) => {
      console.error('Connection error:', err);
      setTimeout(() => this.connect(), 5000);
    });

    this.channel = await this.connection.createChannel();

    // Prefetch 설정
    await this.channel.prefetch(10);

    return this;
  }

  async setupQueues() {
    // Dead Letter Exchange 설정
    await this.channel.assertExchange('dlx', 'direct', { durable: true });
    await this.channel.assertQueue('dead_letters', { durable: true });
    await this.channel.bindQueue('dead_letters', 'dlx', 'dead');

    // 메인 큐 설정
    await this.channel.assertExchange('tasks', 'direct', { durable: true });
    await this.channel.assertQueue('task_queue', {
      durable: true,
      arguments: {
        'x-dead-letter-exchange': 'dlx',
        'x-dead-letter-routing-key': 'dead',
      },
    });
    await this.channel.bindQueue('task_queue', 'tasks', 'process');
  }

  async publish(exchange, routingKey, message, options = {}) {
    const content = Buffer.from(JSON.stringify(message));

    // Publisher Confirms 활성화
    await this.channel.confirmSelect();

    return new Promise((resolve, reject) => {
      this.channel.publish(exchange, routingKey, content, {
        persistent: true,
        contentType: 'application/json',
        timestamp: Date.now(),
        ...options,
      }, (err) => {
        if (err) reject(err);
        else resolve(true);
      });
    });
  }

  async consume(queue, handler) {
    await this.channel.consume(queue, async (msg) => {
      if (!msg) return;

      try {
        const content = JSON.parse(msg.content.toString());
        await handler(content, msg.properties);
        this.channel.ack(msg);
      } catch (error) {
        console.error('Processing error:', error);
        // 3번까지 재시도 후 DLQ로
        const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
        if (retryCount < 3) {
          await this.publish(msg.fields.exchange, msg.fields.routingKey,
            JSON.parse(msg.content.toString()), {
              headers: { 'x-retry-count': retryCount }
            });
        }
        this.channel.nack(msg, false, false);
      }
    });
  }
}

// 사용 예시
(async () => {
  const rabbit = await new RabbitMQService().connect();
  await rabbit.setupQueues();

  // Producer
  await rabbit.publish('tasks', 'process', { taskId: 123, type: 'email' });

  // Consumer
  await rabbit.consume('task_queue', async (task) => {
    console.log('Processing:', task);
  });
})();

Docker Compose - RabbitMQ 클러스터

# docker-compose.yml
version: '3.8'

services:
  rabbitmq1:
    image: rabbitmq:3.12-management
    hostname: rabbitmq1
    environment:
      - RABBITMQ_ERLANG_COOKIE=secret_cookie
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=secure_password
    ports:
      - "5672:5672"   # AMQP
      - "15672:15672" # Management UI
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 30s
      timeout: 30s
      retries: 3

  rabbitmq2:
    image: rabbitmq:3.12-management
    hostname: rabbitmq2
    environment:
      - RABBITMQ_ERLANG_COOKIE=secret_cookie
    depends_on:
      - rabbitmq1
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq

volumes:
  rabbitmq1_data:
  rabbitmq2_data:

# rabbitmq.conf
# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
# cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
# cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2

# Quorum Queue (권장)
# quorum_queue.default_replication_factor = 3

🗣️ 실무 대화 예시

기술 미팅

시니어: "메시지 브로커로 Kafka 대신 RabbitMQ 선택한 이유가 뭐야?"

개발자: "저희 서비스는 복잡한 라우팅이 필요해요. 주문 유형별로 다른 큐로 분배하고, 일부는 브로드캐스트해야 하는데 RabbitMQ의 Exchange 타입들이 딱 맞아요. 또 메시지당 처리 보장이 중요한데, RabbitMQ ACK/NACK가 직관적이고요."

시니어: "트래픽 피크 시 처리량은 괜찮아?"

개발자: "초당 2-3만 메시지 정도면 3노드 클러스터로 충분해요. Kafka처럼 초당 수십만은 안 되지만, 저희 워크로드엔 과분하죠. 오히려 메시지 라우팅 유연성이랑 관리 편의성이 더 중요했어요."

기술 면접

면접관: "RabbitMQ에서 메시지 손실을 방지하려면 어떻게 해야 하나요?"

지원자: "세 가지 레벨에서 보장해야 합니다. 첫째, Producer 측에서 Publisher Confirms로 메시지가 브로커에 도착했는지 확인합니다. 둘째, 메시지 자체를 persistent로 설정하고 Queue도 durable로 선언해서 브로커 재시작에도 유지되게 합니다. 셋째, Consumer는 auto_ack=False로 설정하고 처리 완료 후 수동 ACK를 보냅니다."

면접관: "Consumer가 처리 중 죽으면요?"

지원자: "ACK 전에 연결이 끊기면 메시지는 unacked 상태가 되고 다른 Consumer에게 requeue됩니다. 무한 재시도를 막으려면 DLX(Dead Letter Exchange)를 설정해서 일정 횟수 실패 시 별도 큐로 이동시키고 나중에 분석하거나 수동 처리합니다."

장애 대응

운영팀: "RabbitMQ 메모리 알람 떴는데 Consumer가 계속 죽어요!"

개발자: "Management UI에서 큐별 메시지 수량 확인해볼게요. unacked 메시지가 쌓이고 있으면 Consumer가 처리를 못 따라가는 거예요. prefetch_count가 너무 높거나 Consumer 로직이 느릴 수 있어요."

운영팀: "ready 메시지가 50만 개나 쌓여있어요."

개발자: "일단 Consumer 인스턴스를 스케일아웃 하고, 급한 메시지부터 처리하게 priority 설정 확인해요. 근본적으론 Producer 속도를 늦추거나 lazy queue로 전환해서 메모리 대신 디스크에 저장하게 설정해야 해요."

⚠️ 주의사항

🔗 관련 용어

Message Queue AMQP Apache Kafka Pub-Sub Pattern Event-Driven Architecture Dead Letter Queue

📚 더 배우기