📊
데이터공학
Materialize
스트리밍 데이터베이스
Materialize는 스트리밍 데이터에 대해 표준 SQL 쿼리를 실시간으로 실행할 수 있는 스트리밍 데이터베이스입니다. 증분 뷰 유지보수(Incremental View Maintenance) 기술을 사용하여 데이터 변경 시 쿼리 결과를 즉시 업데이트합니다.
| 특성 | Materialize | Flink SQL | ksqlDB |
|---|---|---|---|
| SQL 호환성 | PostgreSQL | ANSI SQL | KSQL |
| Join 복잡도 | 다중 테이블 | 제한적 | 제한적 |
| 지연 시간 | 밀리초 | 밀리초~초 | 밀리초~초 |
| 상태 관리 | 내장 | RocksDB | RocksDB |
| 사용 편의성 | 높음 (SQL) | 중간 | 높음 |
-- Materialize streaming SQL examples
-- 1. Create a Kafka connection.
--    SASL credentials are referenced as Materialize SECRET objects;
--    kafka_username / kafka_password must already exist (CREATE SECRET)
--    so no plaintext credential appears in this DDL.
CREATE CONNECTION kafka_connection TO KAFKA (
BROKER 'kafka:9092',
SECURITY PROTOCOL = 'SASL_SSL',
SASL MECHANISMS = 'PLAIN',
SASL USERNAME = SECRET kafka_username,
SASL PASSWORD = SECRET kafka_password
);
-- 2. Create the order-event stream source from the 'orders' topic.
--    FORMAT JSON exposes each message as a single jsonb column named
--    "data" (the typed views below extract fields from it).
-- NOTE(review): the WITH options (TIMESTAMP FIELD, RETENTION) do not
-- appear in current Materialize CREATE SOURCE documentation -- verify
-- they are accepted by the targeted Materialize version.
CREATE SOURCE orders_source
FROM KAFKA CONNECTION kafka_connection (
TOPIC 'orders'
)
FORMAT JSON
WITH (
TIMESTAMP FIELD = 'order_time',
RETENTION = '7d'
);
-- 3. Create the product-catalog stream source from the 'products' topic.
--    As above, FORMAT JSON yields one jsonb "data" column per message.
CREATE SOURCE products_source
FROM KAFKA CONNECTION kafka_connection (
TOPIC 'products'
)
FORMAT JSON;
-- 4. Typed view over raw order events: extract each field from the
--    jsonb "data" column produced by FORMAT JSON and cast it.
CREATE MATERIALIZED VIEW orders AS
SELECT
    CAST(data ->> 'order_id' AS int) AS order_id,
    CAST(data ->> 'customer_id' AS int) AS customer_id,
    CAST(data ->> 'product_id' AS int) AS product_id,
    CAST(data ->> 'quantity' AS int) AS quantity,
    CAST(data ->> 'price' AS decimal) AS price,
    CAST(data ->> 'order_time' AS timestamp) AS order_time
FROM orders_source;
-- 5. Typed view over the product catalog stream, mirroring the
--    extraction pattern used for orders.
CREATE MATERIALIZED VIEW products AS
SELECT
    CAST(data ->> 'product_id' AS int) AS product_id,
    data ->> 'product_name' AS product_name,
    data ->> 'category' AS category,
    CAST(data ->> 'unit_price' AS decimal) AS unit_price
FROM products_source;
-- 6. Real-time revenue per hour and category over the trailing 24 hours.
-- FIX: Materialize rejects now() inside materialized views; sliding-window
-- (temporal) filters must use mz_now(), and the interval arithmetic must
-- go on the column side because mz_now() cannot appear in expressions.
CREATE MATERIALIZED VIEW hourly_sales AS
SELECT
    date_trunc('hour', o.order_time) AS hour,
    p.category,
    COUNT(*) AS order_count,
    SUM(o.quantity) AS total_quantity,
    SUM(o.quantity * o.price) AS total_revenue
FROM orders o
INNER JOIN products p ON o.product_id = p.product_id
-- Temporal filter: keep rows newer than 24h; Materialize retires them
-- automatically as mz_now() advances.
WHERE o.order_time + INTERVAL '24 hours' > mz_now()
GROUP BY date_trunc('hour', o.order_time), p.category;
-- 7. Per-customer purchase metrics over a rolling 30-day window.
-- FIX: now() is not allowed in Materialize materialized views; use an
-- mz_now() temporal filter with the interval added on the column side.
CREATE MATERIALIZED VIEW customer_metrics AS
SELECT
    customer_id,
    COUNT(*) AS total_orders,
    SUM(quantity * price) AS total_spent,
    AVG(quantity * price) AS avg_order_value,
    MAX(order_time) AS last_order_time
FROM orders
WHERE order_time + INTERVAL '30 days' > mz_now()
GROUP BY customer_id;
-- Materialized views for real-time anomaly detection.
-- Per-minute transaction volume over the trailing hour.
-- FIX: now() is not allowed in Materialize materialized views; the
-- sliding window is expressed as an mz_now() temporal filter.
CREATE MATERIALIZED VIEW transaction_rate AS
SELECT
    date_trunc('minute', order_time) AS minute,
    COUNT(*) AS transaction_count,
    SUM(price * quantity) AS total_value
FROM orders
WHERE order_time + INTERVAL '1 hour' > mz_now()
GROUP BY date_trunc('minute', order_time);
-- Anomalous-volume detection: flag minutes whose transaction count is
-- more than 3 standard deviations above the trailing-24h average.
-- FIXES: now() replaced with mz_now() temporal filters (now() is invalid
-- in Materialize materialized views), and the implicit comma join is
-- rewritten as an explicit CROSS JOIN.
CREATE MATERIALIZED VIEW anomaly_alerts AS
WITH stats AS (
    -- Baseline statistics over the trailing 24 hours of per-minute rates.
    SELECT
        AVG(transaction_count) AS avg_count,
        STDDEV(transaction_count) AS stddev_count
    FROM transaction_rate
    WHERE minute + INTERVAL '24 hours' > mz_now()
)
SELECT
    t.minute,
    t.transaction_count,
    t.total_value,
    'HIGH_VOLUME' AS alert_type
FROM transaction_rate t
-- stats is a single-row baseline, so a cross join attaches it to every
-- candidate minute.
CROSS JOIN stats s
WHERE t.minute + INTERVAL '5 minutes' > mz_now()
AND t.transaction_count > s.avg_count + 3 * s.stddev_count;
-- Per-customer anomalous behavior: more than 10 orders within 5 minutes.
-- FIX: now() replaced with an mz_now() temporal filter (now() is invalid
-- in Materialize materialized views).
CREATE MATERIALIZED VIEW suspicious_customers AS
SELECT
    customer_id,
    COUNT(*) AS order_count_5min,
    SUM(price * quantity) AS total_value_5min
FROM orders
WHERE order_time + INTERVAL '5 minutes' > mz_now()
GROUP BY customer_id
-- HAVING filters the aggregated groups (a row filter could not see the count).
HAVING COUNT(*) > 10;
# Using Materialize from Python (psycopg2)
import psycopg2
import pandas as pd
from psycopg2 import sql
import time
# Connect to Materialize -- it speaks the PostgreSQL wire protocol, so a
# stock psycopg2 connection works (default Materialize port is 6875).
# NOTE(review): credentials are hard-coded here; move them to environment
# variables or a secret store before any real deployment.
conn = psycopg2.connect(
host="materialize.example.com",
port=6875,
user="materialize",
password="secret",
database="materialize"
)
# Autocommit so each execute() takes effect immediately (no explicit
# transaction management for the simple query helpers below).
conn.autocommit = True
def execute_query(query):
    """Run one SQL statement on the shared Materialize connection.

    Returns a pandas DataFrame for statements that produce a result set,
    or None for statements without one (e.g. DDL).
    """
    with conn.cursor() as cur:
        cur.execute(query)
        # cursor.description is only populated when rows were produced.
        if not cur.description:
            return None
        column_names = [col_meta[0] for col_meta in cur.description]
        return pd.DataFrame(cur.fetchall(), columns=column_names)
# Create the dashboard materialized view.
# FIXES vs. the naive version:
#  - Materialize rejects now() inside materialized views; the sliding
#    window is expressed as an mz_now() temporal filter with the interval
#    added on the column side.
#  - ORDER BY inside a materialized view definition is not supported in
#    Materialize (views have no stored order); ordering belongs at query
#    time (see get_dashboard_data).
create_view_sql = """
CREATE MATERIALIZED VIEW IF NOT EXISTS realtime_dashboard AS
SELECT
    date_trunc('minute', order_time) AS minute,
    COUNT(*) AS orders,
    SUM(price * quantity) AS revenue
FROM orders
WHERE order_time + INTERVAL '1 hour' > mz_now()
GROUP BY date_trunc('minute', order_time)
"""
execute_query(create_view_sql)
# Point-in-time query of the live view.
def get_dashboard_data():
    """Fetch the 60 most recent per-minute dashboard rows as a DataFrame.

    FIX: LIMIT without ORDER BY returns an arbitrary subset -- row order
    out of a view is never guaranteed -- so the ordering is applied here,
    at query time, to reliably select the latest minutes.
    """
    return execute_query(
        "SELECT * FROM realtime_dashboard ORDER BY minute DESC LIMIT 60"
    )
# Real-time streaming with SUBSCRIBE (change push).
def subscribe_to_changes():
    """Stream incremental changes to realtime_dashboard until interrupted.

    FIX: SUBSCRIBE results are consumed through a cursor -- DECLARE a
    cursor for the SUBSCRIBE inside a transaction, then FETCH from that
    cursor in a loop. The original fetched "FROM realtime_dashboard",
    which is a view name, not a cursor, and is invalid. Each row carries
    mz_timestamp and mz_diff (+1 insert / -1 delete) ahead of the view's
    own columns.
    """
    with conn.cursor() as cur:
        # DECLARE requires an open transaction even with autocommit on.
        cur.execute("BEGIN")
        cur.execute(
            "DECLARE dashboard_cursor CURSOR FOR "
            "SUBSCRIBE (SELECT * FROM realtime_dashboard)"
        )
        print("실시간 구독 시작...")
        while True:
            # Blocks until at least one change is available, then drains it.
            cur.execute("FETCH ALL dashboard_cursor")
            for row in cur.fetchall():
                mz_timestamp, mz_diff, minute, orders, revenue = row
                action = "INSERT" if mz_diff > 0 else "DELETE"
                print(f"[{action}] minute={minute}, orders={orders}, revenue={revenue}")
# Connection teardown.
def cleanup():
    """Close the shared Materialize connection."""
    conn.close()
# dbt_project.yml -- project config for the dbt-materialize adapter.
# NOTE(review): indentation appears to have been stripped in this snippet;
# `streaming_analytics:` and `+materialized:` must be nested under `models:`
# in valid YAML.
name: 'streaming_analytics'
version: '1.0.0'
profile: 'materialize'
models:
streaming_analytics:
+materialized: materializedview # Materialize-specific materialization; newer dbt-materialize releases spell it `materialized_view` -- verify the adapter version
# profiles.yml -- connection profile for Materialize (PostgreSQL protocol,
# default port 6875). The password is injected from the environment via
# dbt's env_var() so no secret is committed.
# NOTE(review): indentation appears stripped here too; outputs/dev keys
# must be properly nested in real YAML.
materialize:
target: dev
outputs:
dev:
type: materialize
host: materialize.example.com
port: 6875
user: materialize
password: "{{ env_var('MZ_PASSWORD') }}"
database: materialize
schema: analytics
-- models/customer_lifetime_value.sql
-- dbt model: real-time customer lifetime value.
-- NOTE(review): this model is materialized as a Materialize materialized
-- view (config below), but estimated_clv calls now(), which Materialize
-- does not permit inside materialized views -- confirm against the target
-- Materialize version; the recency scaling may need to be reworked.
{{ config(materialized='materializedview') }}
-- One row per order with its monetary value.
WITH customer_orders AS (
SELECT
customer_id,
order_id,
order_time,
price * quantity AS order_value
FROM {{ source('kafka', 'orders') }}
),
-- Per-customer aggregates over the full order history.
customer_stats AS (
SELECT
customer_id,
COUNT(DISTINCT order_id) AS total_orders,
SUM(order_value) AS total_revenue,
MIN(order_time) AS first_order,
MAX(order_time) AS last_order,
AVG(order_value) AS avg_order_value
FROM customer_orders
GROUP BY customer_id
)
SELECT
customer_id,
total_orders,
total_revenue,
first_order,
last_order,
avg_order_value,
-- Simple CLV estimate (a real model would be more sophisticated):
-- revenue scaled by (time since first order) / (active purchase span).
-- The "+ INTERVAL '1 day'" keeps the denominator non-zero for customers
-- with a single order; GREATEST(1, ...) floors the multiplier so the
-- estimate never drops below observed revenue.
total_revenue *
GREATEST(1, EXTRACT(EPOCH FROM (now() - first_order)) /
EXTRACT(EPOCH FROM (last_order - first_order + INTERVAL '1 day')))
AS estimated_clv
FROM customer_stats
-- Guards against groups where every order_id was NULL (COUNT DISTINCT = 0).
WHERE total_orders > 0