📊 데이터공학

Apache Pulsar

분산 메시징 및 스트리밍 플랫폼

상세 설명

Apache Pulsar는 Yahoo에서 개발한 클라우드 네이티브 분산 메시징 및 스트리밍 플랫폼으로, 메시지 큐와 스트림 처리를 단일 시스템에서 통합 제공합니다. Kafka의 대안으로 주목받고 있으며, 멀티 테넌시, 지역 복제(Geo-replication), 계층형 스토리지 등 엔터프라이즈급 기능을 기본 제공합니다.

Pulsar의 핵심 아키텍처는 서빙 레이어(Broker)와 스토리지 레이어(BookKeeper)를 분리한 것입니다. Broker는 프로듀서와 컨슈머의 요청을 처리하고, Apache BookKeeper는 메시지를 영구 저장합니다. 이 분리 덕분에 각 레이어를 독립적으로 확장할 수 있어, 컴퓨트 집약적 또는 스토리지 집약적 워크로드에 유연하게 대응할 수 있습니다.

멀티 테넌시는 Pulsar의 차별화된 기능으로, 단일 클러스터에서 여러 팀이나 애플리케이션을 격리하여 운영할 수 있습니다. 테넌트(Tenant), 네임스페이스(Namespace), 토픽(Topic)의 계층 구조를 통해 리소스 할당량, 보안 정책, 메시지 보존 정책을 세밀하게 제어할 수 있습니다.

Pulsar는 Exclusive, Shared, Failover, Key_Shared 네 가지 구독 모드를 지원합니다. 특히 Key_Shared 모드는 동일한 키를 가진 메시지가 항상 같은 컨슈머로 전달되어 순서를 보장하면서도 병렬 처리가 가능합니다. Pulsar Functions과 Pulsar IO를 통해 서버리스 방식의 스트림 처리와 커넥터 기능도 제공합니다.

코드 예제

# Apache Pulsar Producer/Consumer 예제 (Python)
import pulsar
from pulsar.schema import *
import json
import time

# ============================================
# 1. 기본 Producer/Consumer
# ============================================

# 클라이언트 생성
client = pulsar.Client(
    'pulsar://localhost:6650',
    authentication=pulsar.AuthenticationToken('your-token'),
    operation_timeout_seconds=30,
    connection_timeout_seconds=30
)

# Producer 생성
producer = client.create_producer(
    'persistent://public/default/user-events',
    producer_name='event-producer-1',
    send_timeout_millis=30000,
    batching_enabled=True,
    batching_max_messages=1000,
    batching_max_publish_delay_ms=10,
    compression_type=pulsar.CompressionType.ZSTD
)

# 메시지 전송
for i in range(100):
    event = {
        'event_id': f'evt_{i}',
        'user_id': f'user_{i % 10}',
        'event_type': 'click',
        'timestamp': int(time.time() * 1000)
    }

    # 동기 전송
    message_id = producer.send(
        json.dumps(event).encode('utf-8'),
        properties={'source': 'web', 'version': '1.0'},
        partition_key=event['user_id']  # 파티션 키로 순서 보장
    )
    print(f"Sent message: {message_id}")

# 비동기 전송
def send_callback(result, msg_id):
    if result == pulsar.Result.Ok:
        print(f"Message sent: {msg_id}")
    else:
        print(f"Failed to send: {result}")

producer.send_async(
    b'async message',
    callback=send_callback
)

producer.flush()  # 배치 메시지 즉시 전송

# ============================================
# 2. Consumer 구독 모드
# ============================================

# Exclusive 구독 (단일 컨슈머)
exclusive_consumer = client.subscribe(
    'persistent://public/default/user-events',
    subscription_name='exclusive-sub',
    consumer_type=pulsar.ConsumerType.Exclusive,
    initial_position=pulsar.InitialPosition.Earliest
)

# Shared 구독 (다중 컨슈머 라운드 로빈)
shared_consumer = client.subscribe(
    'persistent://public/default/user-events',
    subscription_name='shared-sub',
    consumer_type=pulsar.ConsumerType.Shared,
    receiver_queue_size=1000
)

# Key_Shared 구독 (키 기반 순서 보장 + 병렬 처리)
key_shared_consumer = client.subscribe(
    'persistent://public/default/user-events',
    subscription_name='key-shared-sub',
    consumer_type=pulsar.ConsumerType.KeyShared,
    key_shared_policy=pulsar.KeySharedPolicy(
        pulsar.KeySharedMode.AutoSplit
    )
)

# 메시지 수신 및 처리
while True:
    try:
        msg = shared_consumer.receive(timeout_millis=5000)
        data = json.loads(msg.data().decode('utf-8'))

        print(f"Received: {data}")
        print(f"Properties: {msg.properties()}")
        print(f"Message ID: {msg.message_id()}")
        print(f"Publish Time: {msg.publish_timestamp()}")

        # 처리 완료 후 ACK
        shared_consumer.acknowledge(msg)

    except pulsar.Timeout:
        print("No message received")
        continue
    except Exception as e:
        # 처리 실패 시 Negative ACK (재전송)
        shared_consumer.negative_acknowledge(msg)
        print(f"Error: {e}")

# ============================================
# 3. Schema 기반 메시징
# ============================================

class UserEvent(Record):
    event_id = String()
    user_id = String()
    event_type = String()
    timestamp = Long()
    metadata = Map(String(), String())

# Schema Producer
schema_producer = client.create_producer(
    'persistent://public/default/schema-events',
    schema=AvroSchema(UserEvent)
)

event = UserEvent(
    event_id='evt_001',
    user_id='user_1',
    event_type='purchase',
    timestamp=int(time.time() * 1000),
    metadata={'product': 'laptop', 'price': '1200'}
)

schema_producer.send(event)

# Schema Consumer
schema_consumer = client.subscribe(
    'persistent://public/default/schema-events',
    subscription_name='schema-sub',
    schema=AvroSchema(UserEvent)
)

msg = schema_consumer.receive()
user_event = msg.value()
print(f"Event: {user_event.event_type} by {user_event.user_id}")

# ============================================
# 4. Dead Letter Topic 설정
# ============================================

dlq_consumer = client.subscribe(
    'persistent://public/default/user-events',
    subscription_name='dlq-enabled-sub',
    consumer_type=pulsar.ConsumerType.Shared,
    dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(
        max_redeliver_count=3,
        dead_letter_topic='persistent://public/default/user-events-dlq'
    ),
    negative_ack_redelivery_delay_ms=1000
)

# ============================================
# 5. Reader (비구독 읽기)
# ============================================

# 특정 위치부터 읽기 (구독 상태 없음)
reader = client.create_reader(
    'persistent://public/default/user-events',
    start_message_id=pulsar.MessageId.earliest,
    reader_name='analytics-reader'
)

while reader.has_message_available():
    msg = reader.read_next(timeout_millis=1000)
    print(f"Read: {msg.data()}")

# ============================================
# 6. Pulsar Admin API (토픽 관리)
# ============================================

from pulsar import admin

# Admin 클라이언트 생성
admin_client = admin.PulsarAdmin(
    admin_url='http://localhost:8080',
    auth_token='your-token'
)

# 테넌트 생성
admin_client.tenants().create(
    tenant='my-company',
    admin_roles=['admin'],
    allowed_clusters=['standalone']
)

# 네임스페이스 생성
admin_client.namespaces().create('my-company/analytics')

# 네임스페이스 정책 설정
admin_client.namespaces().set_retention(
    'my-company/analytics',
    retention_time_minutes=60 * 24 * 7,  # 7일
    retention_size_mb=10 * 1024  # 10GB
)

# 토픽 생성 (파티션)
admin_client.topics().create_partitioned_topic(
    'persistent://my-company/analytics/events',
    num_partitions=8
)

# 토픽 통계 조회
stats = admin_client.topics().get_stats(
    'persistent://my-company/analytics/events'
)
print(f"Messages in: {stats['msgInCounter']}")
print(f"Messages out: {stats['msgOutCounter']}")
print(f"Storage size: {stats['storageSize']}")

# ============================================
# 7. Geo-Replication 설정 (CLI)
# ============================================
"""
# 클러스터 등록
pulsar-admin clusters create cluster-us \\
    --url http://us-broker:8080 \\
    --broker-url pulsar://us-broker:6650

pulsar-admin clusters create cluster-eu \\
    --url http://eu-broker:8080 \\
    --broker-url pulsar://eu-broker:6650

# 네임스페이스에 복제 설정
pulsar-admin namespaces set-clusters \\
    my-company/analytics \\
    --clusters cluster-us,cluster-eu
"""

# ============================================
# 8. Tiered Storage 설정 (conf/broker.conf)
# ============================================
"""
# AWS S3 오프로더 설정
managedLedgerOffloadDriver=aws-s3
s3ManagedLedgerOffloadBucket=pulsar-offload-bucket
s3ManagedLedgerOffloadRegion=ap-northeast-2
s3ManagedLedgerOffloadServiceEndpoint=https://s3.ap-northeast-2.amazonaws.com

# 오프로드 임계값 (100MB 초과 시 오프로드)
managedLedgerOffloadThresholdInBytes=104857600

# 삭제 지연 시간
managedLedgerOffloadDeletionLagInMillis=14400000
"""

# 정리
producer.close()
shared_consumer.close()
client.close()

print("Pulsar 예제 완료")

실무에서 이렇게 쓰여요

인프라 엔지니어: "현재 Kafka 클러스터가 여러 팀에서 공유하고 있는데, 리소스 격리가 잘 안 되고 있어요. 한 팀의 트래픽 급증이 다른 팀에 영향을 주고 있습니다."

아키텍트: "Pulsar로 마이그레이션하면 멀티 테넌시로 팀별 격리가 가능해요. 테넌트별로 rate limit과 quota를 설정할 수 있고, 네임스페이스별 보존 정책도 다르게 가져갈 수 있어요."

인프라 엔지니어: "스토리지 비용도 문제인데, 오래된 데이터를 S3로 옮길 수 있나요?"

아키텍트: "Pulsar의 Tiered Storage 기능으로 자동 오프로드가 돼요. 일정 크기 이상의 세그먼트는 S3로 이동하고, 쿼리할 때 자동으로 가져와서 처리해요."

면접관: "Apache Pulsar와 Kafka의 아키텍처 차이점과 각각 어떤 상황에서 선택해야 하는지 설명해주세요."

지원자: "핵심 차이는 컴퓨트-스토리지 분리입니다. Kafka는 브로커가 데이터 저장과 서빙을 모두 담당하지만, Pulsar는 Broker가 서빙만, BookKeeper가 저장만 담당합니다. 이 덕분에 Pulsar는 각 레이어를 독립적으로 확장할 수 있고, 브로커 장애 시 복구가 빨라요. 멀티 테넌시, Geo-replication, Tiered Storage가 필요하면 Pulsar가 유리하고, 에코시스템이 성숙하고 운영 경험이 풍부한 팀이라면 Kafka가 안정적입니다."

리뷰어: "Consumer에서 처리 실패 시 negative_acknowledge만 호출하고 있는데, DLQ 설정이 없어서 무한 재시도될 수 있어요."

개발자: "Dead Letter Topic을 설정해야 하나요?"

리뷰어: "네, max_redeliver_count를 설정하면 해당 횟수 초과 시 DLQ로 이동해요. 그리고 negative_ack_redelivery_delay도 설정해서 재시도 간격을 두는 게 좋아요. 일시적 오류가 바로 재시도되면 또 실패할 가능성이 높거든요."

주의사항

관련 용어

더 배우기