Apache Pulsar
분산 메시징 및 스트리밍 플랫폼
분산 메시징 및 스트리밍 플랫폼
Apache Pulsar는 Yahoo에서 개발한 클라우드 네이티브 분산 메시징 및 스트리밍 플랫폼으로, 메시지 큐와 스트림 처리를 단일 시스템에서 통합 제공합니다. Kafka의 대안으로 주목받고 있으며, 멀티 테넌시, 지역 복제(Geo-replication), 계층형 스토리지 등 엔터프라이즈급 기능을 기본 제공합니다.
Pulsar의 핵심 아키텍처는 서빙 레이어(Broker)와 스토리지 레이어(BookKeeper)를 분리한 것입니다. Broker는 프로듀서와 컨슈머의 요청을 처리하고, Apache BookKeeper는 메시지를 영구 저장합니다. 이 분리 덕분에 각 레이어를 독립적으로 확장할 수 있어, 컴퓨트 집약적 또는 스토리지 집약적 워크로드에 유연하게 대응할 수 있습니다.
멀티 테넌시는 Pulsar의 차별화된 기능으로, 단일 클러스터에서 여러 팀이나 애플리케이션을 격리하여 운영할 수 있습니다. 테넌트(Tenant), 네임스페이스(Namespace), 토픽(Topic)의 계층 구조를 통해 리소스 할당량, 보안 정책, 메시지 보존 정책을 세밀하게 제어할 수 있습니다.
Pulsar는 Exclusive, Shared, Failover, Key_Shared 네 가지 구독 모드를 지원합니다. 특히 Key_Shared 모드는 동일한 키를 가진 메시지가 항상 같은 컨슈머로 전달되어 순서를 보장하면서도 병렬 처리가 가능합니다. Pulsar Functions과 Pulsar IO를 통해 서버리스 방식의 스트림 처리와 커넥터 기능도 제공합니다.
# Apache Pulsar Producer/Consumer 예제 (Python)
import pulsar
from pulsar.schema import *
import json
import time
# ============================================
# 1. 기본 Producer/Consumer
# ============================================
# 클라이언트 생성
client = pulsar.Client(
'pulsar://localhost:6650',
authentication=pulsar.AuthenticationToken('your-token'),
operation_timeout_seconds=30,
connection_timeout_seconds=30
)
# Producer 생성
producer = client.create_producer(
'persistent://public/default/user-events',
producer_name='event-producer-1',
send_timeout_millis=30000,
batching_enabled=True,
batching_max_messages=1000,
batching_max_publish_delay_ms=10,
compression_type=pulsar.CompressionType.ZSTD
)
# 메시지 전송
for i in range(100):
event = {
'event_id': f'evt_{i}',
'user_id': f'user_{i % 10}',
'event_type': 'click',
'timestamp': int(time.time() * 1000)
}
# 동기 전송
message_id = producer.send(
json.dumps(event).encode('utf-8'),
properties={'source': 'web', 'version': '1.0'},
partition_key=event['user_id'] # 파티션 키로 순서 보장
)
print(f"Sent message: {message_id}")
# 비동기 전송
def send_callback(result, msg_id):
if result == pulsar.Result.Ok:
print(f"Message sent: {msg_id}")
else:
print(f"Failed to send: {result}")
producer.send_async(
b'async message',
callback=send_callback
)
producer.flush() # 배치 메시지 즉시 전송
# ============================================
# 2. Consumer 구독 모드
# ============================================
# Exclusive 구독 (단일 컨슈머)
exclusive_consumer = client.subscribe(
'persistent://public/default/user-events',
subscription_name='exclusive-sub',
consumer_type=pulsar.ConsumerType.Exclusive,
initial_position=pulsar.InitialPosition.Earliest
)
# Shared 구독 (다중 컨슈머 라운드 로빈)
shared_consumer = client.subscribe(
'persistent://public/default/user-events',
subscription_name='shared-sub',
consumer_type=pulsar.ConsumerType.Shared,
receiver_queue_size=1000
)
# Key_Shared 구독 (키 기반 순서 보장 + 병렬 처리)
key_shared_consumer = client.subscribe(
'persistent://public/default/user-events',
subscription_name='key-shared-sub',
consumer_type=pulsar.ConsumerType.KeyShared,
key_shared_policy=pulsar.KeySharedPolicy(
pulsar.KeySharedMode.AutoSplit
)
)
# 메시지 수신 및 처리
while True:
try:
msg = shared_consumer.receive(timeout_millis=5000)
data = json.loads(msg.data().decode('utf-8'))
print(f"Received: {data}")
print(f"Properties: {msg.properties()}")
print(f"Message ID: {msg.message_id()}")
print(f"Publish Time: {msg.publish_timestamp()}")
# 처리 완료 후 ACK
shared_consumer.acknowledge(msg)
except pulsar.Timeout:
print("No message received")
continue
except Exception as e:
# 처리 실패 시 Negative ACK (재전송)
shared_consumer.negative_acknowledge(msg)
print(f"Error: {e}")
# ============================================
# 3. Schema 기반 메시징
# ============================================
class UserEvent(Record):
event_id = String()
user_id = String()
event_type = String()
timestamp = Long()
metadata = Map(String(), String())
# Schema Producer
schema_producer = client.create_producer(
'persistent://public/default/schema-events',
schema=AvroSchema(UserEvent)
)
event = UserEvent(
event_id='evt_001',
user_id='user_1',
event_type='purchase',
timestamp=int(time.time() * 1000),
metadata={'product': 'laptop', 'price': '1200'}
)
schema_producer.send(event)
# Schema Consumer
schema_consumer = client.subscribe(
'persistent://public/default/schema-events',
subscription_name='schema-sub',
schema=AvroSchema(UserEvent)
)
msg = schema_consumer.receive()
user_event = msg.value()
print(f"Event: {user_event.event_type} by {user_event.user_id}")
# ============================================
# 4. Dead Letter Topic 설정
# ============================================
dlq_consumer = client.subscribe(
'persistent://public/default/user-events',
subscription_name='dlq-enabled-sub',
consumer_type=pulsar.ConsumerType.Shared,
dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(
max_redeliver_count=3,
dead_letter_topic='persistent://public/default/user-events-dlq'
),
negative_ack_redelivery_delay_ms=1000
)
# ============================================
# 5. Reader (비구독 읽기)
# ============================================
# 특정 위치부터 읽기 (구독 상태 없음)
reader = client.create_reader(
'persistent://public/default/user-events',
start_message_id=pulsar.MessageId.earliest,
reader_name='analytics-reader'
)
while reader.has_message_available():
msg = reader.read_next(timeout_millis=1000)
print(f"Read: {msg.data()}")
# ============================================
# 6. Pulsar Admin API (토픽 관리)
# ============================================
from pulsar import admin
# Admin 클라이언트 생성
admin_client = admin.PulsarAdmin(
admin_url='http://localhost:8080',
auth_token='your-token'
)
# 테넌트 생성
admin_client.tenants().create(
tenant='my-company',
admin_roles=['admin'],
allowed_clusters=['standalone']
)
# 네임스페이스 생성
admin_client.namespaces().create('my-company/analytics')
# 네임스페이스 정책 설정
admin_client.namespaces().set_retention(
'my-company/analytics',
retention_time_minutes=60 * 24 * 7, # 7일
retention_size_mb=10 * 1024 # 10GB
)
# 토픽 생성 (파티션)
admin_client.topics().create_partitioned_topic(
'persistent://my-company/analytics/events',
num_partitions=8
)
# 토픽 통계 조회
stats = admin_client.topics().get_stats(
'persistent://my-company/analytics/events'
)
print(f"Messages in: {stats['msgInCounter']}")
print(f"Messages out: {stats['msgOutCounter']}")
print(f"Storage size: {stats['storageSize']}")
# ============================================
# 7. Geo-Replication 설정 (CLI)
# ============================================
"""
# 클러스터 등록
pulsar-admin clusters create cluster-us \\
--url http://us-broker:8080 \\
--broker-url pulsar://us-broker:6650
pulsar-admin clusters create cluster-eu \\
--url http://eu-broker:8080 \\
--broker-url pulsar://eu-broker:6650
# 네임스페이스에 복제 설정
pulsar-admin namespaces set-clusters \\
my-company/analytics \\
--clusters cluster-us,cluster-eu
"""
# ============================================
# 8. Tiered Storage 설정 (conf/broker.conf)
# ============================================
"""
# AWS S3 오프로더 설정
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-offload-bucket
s3ManagedLedgerOffloadRegion=ap-northeast-2
s3ManagedLedgerOffloadServiceEndpoint=https://s3.ap-northeast-2.amazonaws.com
# 오프로드 임계값 (100MB 초과 시 오프로드)
managedLedgerOffloadThresholdInBytes=104857600
# 삭제 지연 시간
managedLedgerOffloadDeletionLagInMillis=14400000
"""
# 정리
producer.close()
shared_consumer.close()
client.close()
print("Pulsar 예제 완료")
인프라 엔지니어: "현재 Kafka 클러스터가 여러 팀에서 공유하고 있는데, 리소스 격리가 잘 안 되고 있어요. 한 팀의 트래픽 급증이 다른 팀에 영향을 주고 있습니다."
아키텍트: "Pulsar로 마이그레이션하면 멀티 테넌시로 팀별 격리가 가능해요. 테넌트별로 rate limit과 quota를 설정할 수 있고, 네임스페이스별 보존 정책도 다르게 가져갈 수 있어요."
인프라 엔지니어: "스토리지 비용도 문제인데, 오래된 데이터를 S3로 옮길 수 있나요?"
아키텍트: "Pulsar의 Tiered Storage 기능으로 자동 오프로드가 돼요. 일정 크기 이상의 세그먼트는 S3로 이동하고, 쿼리할 때 자동으로 가져와서 처리해요."
면접관: "Apache Pulsar와 Kafka의 아키텍처 차이점과 각각 어떤 상황에서 선택해야 하는지 설명해주세요."
지원자: "핵심 차이는 컴퓨트-스토리지 분리입니다. Kafka는 브로커가 데이터 저장과 서빙을 모두 담당하지만, Pulsar는 Broker가 서빙만, BookKeeper가 저장만 담당합니다. 이 덕분에 Pulsar는 각 레이어를 독립적으로 확장할 수 있고, 브로커 장애 시 복구가 빨라요. 멀티 테넌시, Geo-replication, Tiered Storage가 필요하면 Pulsar가 유리하고, 에코시스템이 성숙하고 운영 경험이 풍부한 팀이라면 Kafka가 안정적입니다."
리뷰어: "Consumer에서 처리 실패 시 negative_acknowledge만 호출하고 있는데, DLQ 설정이 없어서 무한 재시도될 수 있어요."
개발자: "Dead Letter Topic을 설정해야 하나요?"
리뷰어: "네, max_redeliver_count를 설정하면 해당 횟수 초과 시 DLQ로 이동해요. 그리고 negative_ack_redelivery_delay도 설정해서 재시도 간격을 두는 게 좋아요. 일시적 오류가 바로 재시도되면 또 실패할 가능성이 높거든요."