RabbitMQ
RabbitMQ
오픈소스 메시지 브로커. AMQP 프로토콜, 유연한 라우팅.
RabbitMQ
오픈소스 메시지 브로커. AMQP 프로토콜, 유연한 라우팅.
RabbitMQ는 Erlang/OTP로 작성된 오픈소스 메시지 브로커입니다. AMQP(Advanced Message Queuing Protocol)를 기본 프로토콜로 사용하며, MQTT, STOMP, HTTP 등 다양한 프로토콜도 지원합니다. 2007년 Rabbit Technologies에서 처음 개발되었고, 현재는 VMware(Broadcom)에서 관리합니다.
핵심 구성요소:
Exchange 타입:
주요 특징:
Kafka vs RabbitMQ: RabbitMQ는 복잡한 라우팅, 요청-응답 패턴, 작업 큐에 적합합니다. Kafka는 대용량 스트리밍, 로그 수집, 이벤트 소싱에 더 적합합니다. RabbitMQ는 메시지를 "푸시"하고, Kafka는 Consumer가 "풀"하는 방식입니다.
import pika
import json
# 연결 설정
credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='localhost',
port=5672,
credentials=credentials,
heartbeat=600,
blocked_connection_timeout=300
)
)
channel = connection.channel()
# Exchange 및 Queue 선언
channel.exchange_declare(
exchange='orders',
exchange_type='topic',
durable=True # 브로커 재시작 시에도 유지
)
channel.queue_declare(
queue='order_processing',
durable=True,
arguments={
'x-message-ttl': 86400000, # 24시간 후 만료
'x-dead-letter-exchange': 'orders_dlx', # 실패 메시지 처리
'x-max-priority': 10 # 우선순위 지원
}
)
# Binding 설정 (Topic Exchange)
channel.queue_bind(
exchange='orders',
queue='order_processing',
routing_key='order.*.created' # order.pizza.created, order.book.created 등
)
# Producer: 메시지 발행
def publish_order(order_data, order_type):
channel.basic_publish(
exchange='orders',
routing_key=f'order.{order_type}.created',
body=json.dumps(order_data),
properties=pika.BasicProperties(
delivery_mode=2, # 메시지 영속성
content_type='application/json',
priority=order_data.get('priority', 0),
headers={'source': 'web-app'}
)
)
print(f"Published order: {order_data['id']}")
# Consumer: 메시지 소비
def on_message(ch, method, properties, body):
try:
order = json.loads(body)
print(f"Processing order: {order['id']}")
# 비즈니스 로직 처리
process_order(order)
# 성공 시 ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"Error: {e}")
# 실패 시 NACK (재시도 또는 DLQ로 이동)
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
# QoS 설정: 동시에 처리할 메시지 수 제한
channel.basic_qos(prefetch_count=10)
# 소비 시작
channel.basic_consume(
queue='order_processing',
on_message_callback=on_message,
auto_ack=False # 수동 ACK 모드
)
print("Waiting for messages...")
channel.start_consuming()
const amqp = require('amqplib');
class RabbitMQService {
constructor() {
this.connection = null;
this.channel = null;
}
async connect() {
this.connection = await amqp.connect({
protocol: 'amqp',
hostname: 'localhost',
port: 5672,
username: 'guest',
password: 'guest',
vhost: '/',
});
// 연결 에러 핸들링
this.connection.on('error', (err) => {
console.error('Connection error:', err);
setTimeout(() => this.connect(), 5000);
});
this.channel = await this.connection.createChannel();
// Prefetch 설정
await this.channel.prefetch(10);
return this;
}
async setupQueues() {
// Dead Letter Exchange 설정
await this.channel.assertExchange('dlx', 'direct', { durable: true });
await this.channel.assertQueue('dead_letters', { durable: true });
await this.channel.bindQueue('dead_letters', 'dlx', 'dead');
// 메인 큐 설정
await this.channel.assertExchange('tasks', 'direct', { durable: true });
await this.channel.assertQueue('task_queue', {
durable: true,
arguments: {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'dead',
},
});
await this.channel.bindQueue('task_queue', 'tasks', 'process');
}
async publish(exchange, routingKey, message, options = {}) {
const content = Buffer.from(JSON.stringify(message));
// Publisher Confirms 활성화
await this.channel.confirmSelect();
return new Promise((resolve, reject) => {
this.channel.publish(exchange, routingKey, content, {
persistent: true,
contentType: 'application/json',
timestamp: Date.now(),
...options,
}, (err) => {
if (err) reject(err);
else resolve(true);
});
});
}
async consume(queue, handler) {
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const content = JSON.parse(msg.content.toString());
await handler(content, msg.properties);
this.channel.ack(msg);
} catch (error) {
console.error('Processing error:', error);
// 3번까지 재시도 후 DLQ로
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
await this.publish(msg.fields.exchange, msg.fields.routingKey,
JSON.parse(msg.content.toString()), {
headers: { 'x-retry-count': retryCount }
});
}
this.channel.nack(msg, false, false);
}
});
}
}
// 사용 예시
(async () => {
const rabbit = await new RabbitMQService().connect();
await rabbit.setupQueues();
// Producer
await rabbit.publish('tasks', 'process', { taskId: 123, type: 'email' });
// Consumer
await rabbit.consume('task_queue', async (task) => {
console.log('Processing:', task);
});
})();
# docker-compose.yml
version: '3.8'
services:
rabbitmq1:
image: rabbitmq:3.12-management
hostname: rabbitmq1
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=secure_password
ports:
- "5672:5672" # AMQP
- "15672:15672" # Management UI
volumes:
- rabbitmq1_data:/var/lib/rabbitmq
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
healthcheck:
test: rabbitmq-diagnostics -q ping
interval: 30s
timeout: 30s
retries: 3
rabbitmq2:
image: rabbitmq:3.12-management
hostname: rabbitmq2
environment:
- RABBITMQ_ERLANG_COOKIE=secret_cookie
depends_on:
- rabbitmq1
volumes:
- rabbitmq2_data:/var/lib/rabbitmq
volumes:
rabbitmq1_data:
rabbitmq2_data:
# rabbitmq.conf
# cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
# cluster_formation.classic_config.nodes.1 = rabbit@rabbitmq1
# cluster_formation.classic_config.nodes.2 = rabbit@rabbitmq2
# Quorum Queue (권장)
# quorum_queue.default_replication_factor = 3
시니어: "메시지 브로커로 Kafka 대신 RabbitMQ 선택한 이유가 뭐야?"
개발자: "저희 서비스는 복잡한 라우팅이 필요해요. 주문 유형별로 다른 큐로 분배하고, 일부는 브로드캐스트해야 하는데 RabbitMQ의 Exchange 타입들이 딱 맞아요. 또 메시지당 처리 보장이 중요한데, RabbitMQ ACK/NACK가 직관적이고요."
시니어: "트래픽 피크 시 처리량은 괜찮아?"
개발자: "초당 2-3만 메시지 정도면 3노드 클러스터로 충분해요. Kafka처럼 초당 수십만은 안 되지만, 저희 워크로드엔 과분하죠. 오히려 메시지 라우팅 유연성이랑 관리 편의성이 더 중요했어요."
면접관: "RabbitMQ에서 메시지 손실을 방지하려면 어떻게 해야 하나요?"
지원자: "세 가지 레벨에서 보장해야 합니다. 첫째, Producer 측에서 Publisher Confirms로 메시지가 브로커에 도착했는지 확인합니다. 둘째, 메시지 자체를 persistent로 설정하고 Queue도 durable로 선언해서 브로커 재시작에도 유지되게 합니다. 셋째, Consumer는 auto_ack=False로 설정하고 처리 완료 후 수동 ACK를 보냅니다."
면접관: "Consumer가 처리 중 죽으면요?"
지원자: "ACK 전에 연결이 끊기면 메시지는 unacked 상태가 되고 다른 Consumer에게 requeue됩니다. 무한 재시도를 막으려면 DLX(Dead Letter Exchange)를 설정해서 일정 횟수 실패 시 별도 큐로 이동시키고 나중에 분석하거나 수동 처리합니다."
운영팀: "RabbitMQ 메모리 알람 떴는데 Consumer가 계속 죽어요!"
개발자: "Management UI에서 큐별 메시지 수량 확인해볼게요. unacked 메시지가 쌓이고 있으면 Consumer가 처리를 못 따라가는 거예요. prefetch_count가 너무 높거나 Consumer 로직이 느릴 수 있어요."
운영팀: "ready 메시지가 50만 개나 쌓여있어요."
개발자: "일단 Consumer 인스턴스를 스케일아웃 하고, 급한 메시지부터 처리하게 priority 설정 확인해요. 근본적으론 Producer 속도를 늦추거나 lazy queue로 전환해서 메모리 대신 디스크에 저장하게 설정해야 해요."