Apache Pinot
Apache Pinot
실시간 분산 OLAP 데이터스토어. LinkedIn 개발.
Apache Pinot
실시간 분산 OLAP 데이터스토어. LinkedIn 개발.
Apache Pinot는 LinkedIn에서 개발한 실시간 분산 OLAP 데이터스토어로, 초저지연(sub-second) 분석 쿼리를 대규모 데이터셋에서 제공합니다. 사용자 대면 분석 애플리케이션, 실시간 대시보드, 이상 탐지 시스템 등 밀리초 단위 응답이 필요한 워크로드에 최적화되어 있습니다.
Pinot의 아키텍처는 세 가지 핵심 컴포넌트로 구성됩니다. Controller는 클러스터 메타데이터와 리소스 관리를 담당하고, Broker는 쿼리 라우팅과 결과 병합을 처리하며, Server는 실제 데이터를 저장하고 쿼리를 실행합니다. 이러한 분산 아키텍처를 통해 수평적 확장이 가능하며, LinkedIn에서는 수천 개의 노드로 구성된 클러스터를 운영하고 있습니다.
Pinot는 실시간(REALTIME)과 오프라인(OFFLINE) 두 가지 테이블 타입을 지원합니다. 실시간 테이블은 Kafka나 Kinesis 같은 스트리밍 소스에서 데이터를 직접 수집하여 즉시 쿼리 가능하게 하고, 오프라인 테이블은 배치로 적재된 히스토리 데이터를 제공합니다. 하이브리드 테이블은 두 타입을 결합하여 실시간 데이터와 히스토리 데이터를 통합 쿼리할 수 있습니다.
고성능의 핵심은 인덱싱 전략에 있습니다. Pinot는 Inverted Index, Sorted Index, Star-Tree Index, Range Index, Text Index, JSON Index 등 다양한 인덱스를 제공하며, 쿼리 패턴에 따라 적절한 인덱스를 조합하여 성능을 최적화합니다. 특히 Star-Tree Index는 사전 집계를 통해 고차원 분석 쿼리를 극적으로 가속화합니다.
# Apache Pinot 테이블 스키마 및 설정
# ============================================
# 1. 스키마 정의 (events_schema.json)
# ============================================
{
"schemaName": "user_events",
"dimensionFieldSpecs": [
{
"name": "event_id",
"dataType": "STRING"
},
{
"name": "user_id",
"dataType": "STRING"
},
{
"name": "event_type",
"dataType": "STRING"
},
{
"name": "device_type",
"dataType": "STRING"
},
{
"name": "country",
"dataType": "STRING"
},
{
"name": "city",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "event_count",
"dataType": "LONG"
},
{
"name": "revenue",
"dataType": "DOUBLE"
},
{
"name": "session_duration_ms",
"dataType": "LONG"
}
],
"dateTimeFieldSpecs": [
{
"name": "event_timestamp",
"dataType": "TIMESTAMP",
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
# ============================================
# 2. 실시간 테이블 설정 (events_realtime_table.json)
# ============================================
{
"tableName": "user_events_REALTIME",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "event_timestamp",
"timeType": "MILLISECONDS",
"schemaName": "user_events",
"replicasPerPartition": "2",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "7"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"invertedIndexColumns": [
"user_id",
"event_type",
"device_type",
"country"
],
"rangeIndexColumns": [
"event_timestamp",
"revenue"
],
"sortedColumn": [
"event_timestamp"
],
"bloomFilterColumns": [
"user_id",
"event_id"
],
"noDictionaryColumns": [
"event_id",
"session_duration_ms"
],
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "lowLevel",
"stream.kafka.topic.name": "user-events",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.broker.list": "kafka-broker:9092",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"realtime.segment.flush.threshold.rows": "500000",
"realtime.segment.flush.threshold.time": "6h"
}
},
"routing": {
"instanceSelectorType": "strictReplicaGroup"
},
"upsertConfig": {
"mode": "FULL",
"hashFunction": "MURMUR3"
},
"metadata": {
"customConfigs": {}
}
}
# ============================================
# 3. Star-Tree 인덱스 설정 (고성능 집계용)
# ============================================
{
"tableIndexConfig": {
"starTreeIndexConfigs": [
{
"dimensionsSplitOrder": [
"country",
"device_type",
"event_type"
],
"skipStarNodeCreationForDimensions": [],
"functionColumnPairs": [
"COUNT__*",
"SUM__revenue",
"SUM__event_count",
"AVG__session_duration_ms"
],
"maxLeafRecords": 10000
}
]
}
}
# ============================================
# 4. 실시간 분석 쿼리 예제 (SQL)
# ============================================
-- 실시간 이벤트 집계 (지난 1시간)
SELECT
event_type,
country,
COUNT(*) as event_count,
SUM(revenue) as total_revenue,
AVG(session_duration_ms) as avg_session_ms
FROM user_events
WHERE event_timestamp > ago('PT1H')
GROUP BY event_type, country
ORDER BY event_count DESC
LIMIT 100
-- 시간대별 트렌드 분석
SELECT
DATETIMECONVERT(event_timestamp, '1:MILLISECONDS:EPOCH',
'1:MINUTES:EPOCH', '5:MINUTES') as time_bucket,
event_type,
COUNT(*) as events,
DISTINCTCOUNT(user_id) as unique_users
FROM user_events
WHERE event_timestamp BETWEEN
FromDateTime('2024-01-15 00:00:00', 'yyyy-MM-dd HH:mm:ss')
AND FromDateTime('2024-01-15 23:59:59', 'yyyy-MM-dd HH:mm:ss')
GROUP BY time_bucket, event_type
ORDER BY time_bucket
-- 실시간 Top-K 사용자
SELECT
user_id,
COUNT(*) as activity_count,
SUM(revenue) as total_spent
FROM user_events
WHERE event_timestamp > ago('PT24H')
GROUP BY user_id
ORDER BY activity_count DESC
LIMIT 10
-- 퍼널 분석 (근사)
SELECT
SUM(CASE WHEN event_type = 'page_view' THEN 1 ELSE 0 END) as step1_views,
SUM(CASE WHEN event_type = 'add_to_cart' THEN 1 ELSE 0 END) as step2_cart,
SUM(CASE WHEN event_type = 'checkout' THEN 1 ELSE 0 END) as step3_checkout,
SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) as step4_purchase
FROM user_events
WHERE event_timestamp > ago('PT7D')
-- 지역별 실시간 대시보드
SELECT
country,
city,
COUNT(*) as events,
DISTINCTCOUNTHLL(user_id) as approx_users,
PERCENTILEEST(session_duration_ms, 50) as median_session,
PERCENTILEEST(session_duration_ms, 95) as p95_session
FROM user_events
WHERE event_timestamp > ago('PT1H')
GROUP BY country, city
HAVING events > 100
ORDER BY events DESC
# ============================================
# 5. Python 클라이언트 예제
# ============================================
from pinotdb import connect
import pandas as pd
# Pinot 연결
conn = connect(
host='pinot-broker',
port=8099,
path='/query/sql',
scheme='http'
)
# 쿼리 실행
cursor = conn.cursor()
query = """
SELECT
event_type,
COUNT(*) as count,
SUM(revenue) as revenue
FROM user_events
WHERE event_timestamp > ago('PT1H')
GROUP BY event_type
ORDER BY count DESC
LIMIT 10
"""
cursor.execute(query)
results = cursor.fetchall()
# Pandas DataFrame으로 변환
df = pd.DataFrame(results, columns=[desc[0] for desc in cursor.description])
print(df)
# ============================================
# 6. 테이블 관리 CLI 명령어
# ============================================
# 스키마 추가
curl -X POST "http://localhost:9000/schemas" \
-H "Content-Type: application/json" \
-d @events_schema.json
# 테이블 생성
curl -X POST "http://localhost:9000/tables" \
-H "Content-Type: application/json" \
-d @events_realtime_table.json
# 세그먼트 상태 확인
curl "http://localhost:9000/segments/user_events_REALTIME"
# 테이블 리로드
curl -X POST "http://localhost:9000/segments/user_events_REALTIME/reload"
# 쿼리 실행
curl -X POST "http://localhost:8099/query/sql" \
-H "Content-Type: application/json" \
-d '{"sql": "SELECT COUNT(*) FROM user_events"}'
프로덕트 매니저: "실시간 대시보드 응답 시간이 5초 넘게 걸리는데, 사용자들이 불만을 제기하고 있어요."
데이터 엔지니어: "현재 Elasticsearch로 처리 중인데, 대규모 집계 쿼리에는 한계가 있어요. Apache Pinot를 도입하면 밀리초 단위 응답이 가능해요."
프로덕트 매니저: "실시간 데이터도 바로 조회 가능한가요?"
데이터 엔지니어: "네, Kafka에서 실시간으로 수집해서 바로 쿼리할 수 있어요. 하이브리드 테이블로 구성하면 실시간 데이터와 히스토리 데이터를 같이 조회할 수 있고요."
면접관: "Apache Pinot의 Star-Tree 인덱스가 어떻게 동작하고, 어떤 쿼리에 효과적인지 설명해주세요."
지원자: "Star-Tree는 데이터를 사전 집계하여 트리 구조로 저장하는 인덱스입니다. 예를 들어 country, device, event_type 차원에 대해 SUM(revenue)를 미리 계산해두면, 런타임에 원본 데이터를 스캔하지 않고 바로 결과를 반환할 수 있습니다. GROUP BY 절의 차원과 집계 함수가 Star-Tree 설정과 일치할 때 효과적이며, 차원 순서는 카디널리티가 낮은 것부터 높은 순으로 설정하는 것이 좋습니다. 다만 Star-Tree는 추가 저장 공간이 필요하고 수집 시간이 늘어나므로 자주 사용되는 쿼리 패턴에만 적용하는 것이 좋습니다."
리뷰어: "user_id 컬럼에 dictionary 인코딩을 사용하고 있는데, 고유값이 수억 개라면 메모리 이슈가 생길 수 있어요."
개발자: "그럼 noDictionaryColumns에 추가해야 하나요?"
리뷰어: "네, 고카디널리티 컬럼은 dictionary 없이 저장하는 게 좋아요. 대신 Bloom Filter 인덱스를 추가하면 point lookup 성능은 유지할 수 있어요. 그리고 DISTINCTCOUNT 대신 DISTINCTCOUNTHLL을 사용하면 메모리 효율도 좋아져요."