Apache Kafka
아파치 카프카
분산 이벤트 스트리밍 플랫폼. 고처리량, 내구성, 수평 확장성을 갖춘 실시간 데이터 파이프라인의 핵심 기술입니다.
아파치 카프카
분산 이벤트 스트리밍 플랫폼. 고처리량, 내구성, 수평 확장성을 갖춘 실시간 데이터 파이프라인의 핵심 기술입니다.
Apache Kafka는 LinkedIn에서 개발하고 Apache Software Foundation에 기증한 오픈소스 분산 이벤트 스트리밍 플랫폼입니다. 초당 수백만 건의 메시지를 처리할 수 있는 고처리량, 디스크 기반 저장으로 데이터 손실 없는 내구성, 브로커 추가만으로 확장 가능한 수평 확장성이 핵심 특징입니다.
핵심 개념:
• Topic: 메시지가 저장되는 카테고리. 여러 Partition으로 분할됩니다.
• Partition: Topic을 병렬 처리 단위로 분할. 각 파티션 내 메시지는 순서가 보장됩니다.
• Producer: 메시지를 Topic에 발행하는 클라이언트.
• Consumer: Topic에서 메시지를 구독하는 클라이언트.
• Consumer Group: 여러 Consumer가 협력하여 하나의 Topic을 소비. 각 Partition은 그룹 내 한 Consumer에게만 할당됩니다.
• Offset: 파티션 내 메시지의 위치. Consumer가 어디까지 읽었는지 추적합니다.
Kafka vs 일반 메시지 큐: RabbitMQ 같은 전통적 메시지 큐는 메시지 소비 후 삭제하지만, Kafka는 설정된 보존 기간(retention) 동안 메시지를 유지합니다. 이로 인해 여러 Consumer가 같은 메시지를 독립적으로 소비할 수 있고, 필요시 과거 데이터를 재처리(replay)할 수 있습니다.
사용 사례: 실시간 로그 수집, 이벤트 드리븐 마이크로서비스 통신, CDC(Change Data Capture), 실시간 분석, ML 피처 파이프라인 등에 널리 사용됩니다. Netflix, Uber, LinkedIn 등 대규모 서비스에서 핵심 인프라로 활용합니다.
// === Node.js: KafkaJS (가장 인기 있는 Node Kafka 클라이언트) ===
import { Kafka, Partitioners } from 'kafkajs';
// 1. Kafka 클라이언트 생성
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092', 'localhost:9093', 'localhost:9094'],
// 프로덕션에서는 SASL/SSL 인증 추가
// ssl: true,
// sasl: { mechanism: 'scram-sha-256', username: 'user', password: 'pass' }
});
// 2. Producer: 메시지 발행
const producer = kafka.producer({
createPartitioner: Partitioners.DefaultPartitioner
});
async function produceMessages() {
await producer.connect();
// 단일 메시지
await producer.send({
topic: 'orders',
messages: [
{
key: 'order-123', // 같은 key는 같은 partition으로 (순서 보장)
value: JSON.stringify({
orderId: '123',
userId: 'user-456',
items: [{ sku: 'PROD-001', qty: 2 }],
total: 59900,
timestamp: Date.now()
}),
headers: { 'correlation-id': 'req-abc' }
}
]
});
// 배치 전송 (성능 최적화)
await producer.sendBatch({
topicMessages: [
{
topic: 'orders',
messages: Array.from({ length: 100 }, (_, i) => ({
key: `order-${i}`,
value: JSON.stringify({ orderId: i, total: Math.random() * 100000 })
}))
}
]
});
await producer.disconnect();
}
// 3. Consumer: 메시지 구독
const consumer = kafka.consumer({ groupId: 'order-processor' });
async function consumeMessages() {
await consumer.connect();
await consumer.subscribe({ topic: 'orders', fromBeginning: false });
await consumer.run({
// autoCommit: true (기본값), 수동 커밋이 필요하면 false
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
console.log(`[P${partition}] Order ${order.orderId}: ${order.total}원`);
// 비즈니스 로직 처리
await processOrder(order);
// 수동 커밋 시: await consumer.commitOffsets([{ topic, partition, offset: message.offset }]);
}
});
}
// 4. 관리자 기능: Topic 생성
const admin = kafka.admin();
async function createTopics() {
await admin.connect();
await admin.createTopics({
topics: [{
topic: 'orders',
numPartitions: 6, // 병렬 처리 수준
replicationFactor: 3, // 복제본 수 (브로커 수 이하)
configEntries: [
{ name: 'retention.ms', value: '604800000' }, // 7일 보존
{ name: 'cleanup.policy', value: 'delete' }
]
}]
});
await admin.disconnect();
}
# === Python: kafka-python 또는 confluent-kafka ===
from kafka import KafkaProducer, KafkaConsumer
from kafka.admin import KafkaAdminClient, NewTopic
import json
# 1. Producer 생성
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: k.encode('utf-8') if k else None,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
acks='all', # 모든 복제본이 기록 확인 후 응답
retries=3,
batch_size=16384, # 배치 크기 (bytes)
linger_ms=10 # 배치 대기 시간 (ms)
)
# 메시지 발행
def send_order(order):
future = producer.send(
topic='orders',
key=order['order_id'],
value=order,
headers=[('source', b'order-service')]
)
# 동기 전송 (응답 대기)
result = future.get(timeout=10)
print(f"Sent to partition {result.partition}, offset {result.offset}")
send_order({
'order_id': 'order-123',
'user_id': 'user-456',
'total': 59900,
'items': [{'sku': 'PROD-001', 'qty': 2}]
})
producer.flush() # 모든 메시지 전송 완료 대기
# 2. Consumer 생성
consumer = KafkaConsumer(
'orders',
bootstrap_servers=['localhost:9092'],
group_id='order-processor',
auto_offset_reset='earliest', # 처음부터 읽기 (latest: 최신부터)
enable_auto_commit=True,
auto_commit_interval_ms=5000,
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
# 메시지 소비
def process_orders():
for message in consumer:
order = message.value
print(f"[P{message.partition}:O{message.offset}] Order: {order['order_id']}")
# 비즈니스 로직 처리
# consumer.commit() # 수동 커밋 시
# 3. Admin: Topic 생성
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic = NewTopic(
name='orders',
num_partitions=6,
replication_factor=3,
topic_configs={
'retention.ms': '604800000', # 7일
'cleanup.policy': 'delete'
}
)
admin.create_topics([topic])
# === docker-compose.yml: Kafka + Zookeeper 로컬 환경 ===
version: "3.9"
services:
# ZooKeeper (Kafka 메타데이터 관리) - KRaft 모드로 대체 가능
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
hostname: zookeeper
container_name: zookeeper
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
# Kafka Broker
kafka:
image: confluentinc/cp-kafka:7.5.0
hostname: kafka
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
- "9101:9101"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_JMX_PORT: 9101
KAFKA_JMX_HOSTNAME: localhost
# 로그 보존 설정
KAFKA_LOG_RETENTION_HOURS: 168 # 7일
KAFKA_LOG_SEGMENT_BYTES: 1073741824 # 1GB
KAFKA_LOG_RETENTION_CHECK_INTERVAL_MS: 300000
# Kafka UI (웹 관리 도구)
kafka-ui:
image: provectuslabs/kafka-ui:latest
container_name: kafka-ui
ports:
- "8080:8080"
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
# === CLI 명령어 ===
# Topic 생성
# docker exec kafka kafka-topics --create --topic orders --partitions 6 --replication-factor 1 --bootstrap-server localhost:9092
# Topic 목록 확인
# docker exec kafka kafka-topics --list --bootstrap-server localhost:9092
# Consumer Group 상태 확인
# docker exec kafka kafka-consumer-groups --describe --group order-processor --bootstrap-server localhost:9092
# 메시지 확인 (콘솔 컨슈머)
# docker exec kafka kafka-console-consumer --topic orders --from-beginning --bootstrap-server localhost:9092
"주문 서비스와 배송 서비스 사이에 Kafka를 넣으면 디커플링이 됩니다. 주문이 들어오면 Kafka에 이벤트를 발행하고, 배송 서비스는 구독해서 처리하면 돼요. 주문 서비스 장애가 배송 서비스에 전파되지 않고, 배송 서비스 장애 시에도 이벤트는 Kafka에 쌓여있어서 복구 후 재처리됩니다."
"Kafka에서 메시지 순서를 보장하려면 같은 키를 사용해야 합니다. 예를 들어 주문 ID를 키로 사용하면 같은 주문의 이벤트들은 같은 파티션으로 가서 순서가 보장됩니다. 단, Consumer Group 내에서 각 파티션은 한 Consumer에게만 할당되므로, 파티션 수가 Consumer 수의 상한이 됩니다."
"Consumer lag이 급증하고 있어요. kafka-consumer-groups 명령으로 확인해보니 특정 파티션에서 처리가 느려지고 있네요. 일단 Consumer 인스턴스를 늘리면 부하 분산되고, 근본적으로는 메시지 처리 로직 병목을 찾아야 합니다. 배치 처리나 비동기 I/O로 개선 가능한지 확인해 볼게요."
"직접 운영하면 인프라 비용은 낮지만 운영 부담이 커요. AWS MSK나 Confluent Cloud 같은 관리형 서비스를 쓰면 운영 부담은 줄지만 메시지 처리량당 비용이 발생합니다. 월 10억 메시지 정도면 관리형이 ROI가 좋고, 그 이상이면 직접 운영 검토해볼 만합니다."
파티션 수를 늘리면 기존 키-파티션 매핑이 깨집니다. 같은 키가 다른 파티션으로 갈 수 있어 순서 보장이 무너집니다. 초기에 충분히 설계하고, 변경이 필요하면 새 토픽 생성 + 마이그레이션을 권장합니다.
새 Consumer Group이 처음 구독할 때 어디서부터 읽을지 설정입니다. 'earliest'면 처음부터, 'latest'면 최신부터. 미설정 시 offset이 없으면 에러가 발생합니다. 용도에 맞게 명시적으로 설정하세요.
기본 메시지 크기 제한은 1MB입니다. 큰 파일을 Kafka로 전송하지 마세요. 대신 S3에 저장하고 URL만 Kafka로 전달하는 패턴을 사용하세요.
Consumer가 같은 메시지를 여러 번 받을 수 있습니다 (재시도, 리밸런싱 시). 메시지 처리 로직이 멱등하도록 설계하세요. 예: DB에 이벤트 ID로 중복 체크 후 처리.