QuestDB
QuestDB Time-Series Database
초고속 시계열 데이터 처리에 특화된 오픈소스 데이터베이스. InfluxDB 프로토콜과 PostgreSQL 와이어 프로토콜을 지원하며, 익숙한 SQL로 쿼리할 수 있습니다.
QuestDB Time-Series Database
초고속 시계열 데이터 처리에 특화된 오픈소스 데이터베이스. InfluxDB 프로토콜과 PostgreSQL 와이어 프로토콜을 지원하며, 익숙한 SQL로 쿼리할 수 있습니다.
QuestDB는 Java로 작성된 고성능 시계열 데이터베이스입니다. 핵심 강점은 속도입니다. 컬럼 지향 스토리지, SIMD 벡터화 연산, 메모리 매핑 파일 등을 활용해 초당 수백만 행의 데이터를 처리할 수 있습니다. InfluxDB, TimescaleDB 대비 벤치마크에서 종종 더 빠른 성능을 보여줍니다.
개발자 친화적인 인터페이스가 특징입니다. PostgreSQL 와이어 프로토콜을 지원해 pgAdmin, DBeaver 같은 기존 도구를 그대로 사용할 수 있고, 표준 SQL에 시계열 특화 함수(SAMPLE BY, LATEST ON 등)가 추가되어 있습니다. InfluxDB Line Protocol(ILP)도 지원해 기존 InfluxDB 클라이언트 코드를 수정 없이 마이그레이션할 수 있습니다.
주요 사용 사례는 IoT 센서 데이터, 금융 틱 데이터, 인프라 모니터링 메트릭입니다. 특히 고빈도 데이터 수집(밀리초 단위 타임스탬프)과 실시간 대시보드 쿼리가 동시에 필요한 환경에서 강점을 발휘합니다. Grafana와의 통합이 잘 되어 있어 시각화도 쉽습니다.
Apache 2.0 라이선스의 오픈소스이며, QuestDB Cloud도 제공됩니다. 단일 노드에서 매우 높은 성능을 내지만, 분산 클러스터링은 엔터프라이즈 기능으로 제공됩니다. 테이블 파티셔닝과 자동 데이터 보존 정책으로 대용량 시계열 데이터를 효율적으로 관리합니다.
# QuestDB 시계열 데이터 처리 예제
import questdb.ingress as qdb
import psycopg2
from datetime import datetime, timedelta
import random
import time
# ==========================================
# 1. ILP (InfluxDB Line Protocol)로 데이터 수집
# ==========================================
def ingest_sensor_data():
"""고속 데이터 수집 (ILP over TCP)"""
with qdb.Sender('localhost', 9009) as sender:
# 배치로 센서 데이터 전송
for i in range(100000):
sender.row(
'sensors',
symbols={'device_id': f'device_{i % 100}', 'location': 'seoul'},
columns={
'temperature': 20.0 + random.random() * 10,
'humidity': 40.0 + random.random() * 30,
'pressure': 1013.0 + random.random() * 20
},
at=datetime.utcnow()
)
# 1000행마다 flush (성능 최적화)
if i % 1000 == 0:
sender.flush()
print("100,000 rows ingested")
# ==========================================
# 2. PostgreSQL 프로토콜로 쿼리
# ==========================================
def query_with_psycopg2():
"""PostgreSQL 클라이언트로 SQL 쿼리"""
conn = psycopg2.connect(
host='localhost',
port=8812,
user='admin',
password='quest',
database='qdb'
)
with conn.cursor() as cur:
# 테이블 생성 (시계열 최적화)
cur.execute("""
CREATE TABLE IF NOT EXISTS trades (
timestamp TIMESTAMP,
symbol SYMBOL,
price DOUBLE,
volume LONG
) timestamp(timestamp) PARTITION BY DAY WAL;
""")
# 최근 1시간 데이터 조회
cur.execute("""
SELECT * FROM trades
WHERE timestamp > dateadd('h', -1, now())
ORDER BY timestamp DESC
LIMIT 100;
""")
rows = cur.fetchall()
# SAMPLE BY로 시간 집계 (시계열 특화 기능)
cur.execute("""
SELECT
timestamp,
symbol,
avg(price) as avg_price,
sum(volume) as total_volume,
first(price) as open,
last(price) as close,
max(price) as high,
min(price) as low
FROM trades
WHERE timestamp > dateadd('d', -1, now())
SAMPLE BY 1h
ALIGN TO CALENDAR;
""")
ohlc_data = cur.fetchall()
print(f"OHLC candles: {len(ohlc_data)}")
# LATEST ON으로 각 심볼의 최신 데이터 조회
cur.execute("""
SELECT * FROM trades
LATEST ON timestamp PARTITION BY symbol;
""")
latest = cur.fetchall()
print(f"Latest per symbol: {latest}")
conn.close()
# ==========================================
# 3. REST API로 데이터 조회
# ==========================================
import requests
def query_via_rest():
"""HTTP REST API로 쿼리"""
query = """
SELECT
timestamp,
device_id,
avg(temperature) as avg_temp
FROM sensors
WHERE timestamp > dateadd('h', -6, now())
SAMPLE BY 15m
"""
response = requests.get(
'http://localhost:9000/exec',
params={'query': query}
)
if response.status_code == 200:
data = response.json()
print(f"Columns: {data['columns']}")
print(f"Row count: {data['count']}")
for row in data['dataset'][:5]:
print(row)
# ==========================================
# 4. 데이터 보존 정책 설정
# ==========================================
def setup_retention_policy():
"""오래된 데이터 자동 삭제 설정"""
conn = psycopg2.connect(host='localhost', port=8812, user='admin', password='quest')
with conn.cursor() as cur:
# 파티션 삭제 (30일 이전 데이터)
cur.execute("""
ALTER TABLE sensors DROP PARTITION
WHERE timestamp < dateadd('d', -30, now());
""")
# 테이블 설정 변경
cur.execute("""
ALTER TABLE sensors SET PARAM maxUncommittedRows = 100000;
""")
conn.close()
# ==========================================
# 5. Grafana 대시보드용 쿼리
# ==========================================
def grafana_compatible_queries():
"""Grafana에서 사용할 시계열 쿼리 예시"""
# 시간 범위 변수를 사용한 쿼리 (Grafana에서 $__timeFilter 대체)
grafana_query = """
SELECT
timestamp as time,
device_id,
temperature,
humidity
FROM sensors
WHERE timestamp BETWEEN '2024-01-01' AND '2024-01-02'
SAMPLE BY $__interval
"""
# 히트맵용 쿼리
heatmap_query = """
SELECT
timestamp as time,
floor(temperature) as temp_bucket,
count() as count
FROM sensors
WHERE timestamp > dateadd('d', -7, now())
SAMPLE BY 1h
"""
# 알림용 집계 쿼리
alert_query = """
SELECT
device_id,
avg(temperature) as avg_temp,
max(temperature) as max_temp
FROM sensors
WHERE timestamp > dateadd('m', -5, now())
GROUP BY device_id
HAVING avg(temperature) > 30
"""
return grafana_query, heatmap_query, alert_query
# 실행
if __name__ == "__main__":
ingest_sensor_data()
query_with_psycopg2()
query_via_rest()
데이터 엔지니어: "IoT 센서 데이터가 초당 10만 건인데, InfluxDB가 버티질 못해요."
개발자: "QuestDB 검토해보세요. ILP 프로토콜 호환이라 클라이언트 코드 거의 안 바꿔도 되고, 같은 하드웨어에서 훨씬 빠르게 처리할 수 있어요."
데이터 엔지니어: "SQL 쓸 수 있는 거 맞죠? 팀원들이 InfluxQL 적응 못 해서..."
개발자: "네, 표준 SQL에 SAMPLE BY 같은 시계열 함수만 추가된 형태예요. PostgreSQL 드라이버 그대로 씁니다."
면접관: "QuestDB와 TimescaleDB의 차이점은 뭔가요?"
지원자: "TimescaleDB는 PostgreSQL 확장이라 PostgreSQL의 모든 기능(조인, 트랜잭션 등)을 쓸 수 있지만, QuestDB는 시계열 전용으로 설계되어 순수 시계열 워크로드에서 더 빠릅니다. QuestDB는 Append-only라 UPDATE/DELETE가 제한적이지만, 수집 성능은 압도적입니다."
리뷰어: "ILP로 데이터 보낼 때 배치 크기 좀 키워요. row마다 flush 하면 성능 다 날아갑니다."
작성자: "아, 1000행마다 flush 하도록 수정하고, auto_flush_rows 설정도 추가할게요."