📊 데이터공학

Materialize

스트리밍 데이터베이스

상세 설명

Materialize는 스트리밍 데이터에 대해 표준 SQL 쿼리를 실시간으로 실행할 수 있는 스트리밍 데이터베이스입니다. 증분 뷰 유지보수(Incremental View Maintenance) 기술을 사용하여 데이터 변경 시 쿼리 결과를 즉시 업데이트합니다.

Materialize의 핵심 특징

  • 스트리밍 SQL: 표준 PostgreSQL 호환 SQL 지원
  • 증분 계산: 변경된 데이터만 재계산하여 효율적 업데이트
  • Materialized Views: 쿼리 결과를 실시간으로 유지
  • Kafka 네이티브: Kafka 스트림 직접 연결
  • 밀리초 지연: 실시간 분석 쿼리 지원

Materialize vs 다른 솔루션

특성MaterializeFlink SQLksqlDB
SQL 호환성PostgreSQLANSI SQLKSQL
Join 복잡도다중 테이블제한적제한적
지연 시간밀리초밀리초~초밀리초~초
상태 관리내장RocksDBRocksDB
사용 편의성높음 (SQL)중간높음

코드 예제

Kafka 스트림 연결 및 실시간 집계

-- Materialize 스트리밍 SQL 예제

-- 1. Kafka 소스 연결 생성
CREATE CONNECTION kafka_connection TO KAFKA (
    BROKER 'kafka:9092',
    SECURITY PROTOCOL = 'SASL_SSL',
    SASL MECHANISMS = 'PLAIN',
    SASL USERNAME = SECRET kafka_username,
    SASL PASSWORD = SECRET kafka_password
);

-- 2. 주문 이벤트 스트림 소스 생성
CREATE SOURCE orders_source
FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'orders'
)
FORMAT JSON
WITH (
    TIMESTAMP FIELD = 'order_time',
    RETENTION = '7d'
);

-- 3. 상품 정보 스트림 소스 생성
CREATE SOURCE products_source
FROM KAFKA CONNECTION kafka_connection (
    TOPIC 'products'
)
FORMAT JSON;

-- 4. 실시간 주문 뷰 생성
CREATE MATERIALIZED VIEW orders AS
SELECT
    (data->>'order_id')::int AS order_id,
    (data->>'customer_id')::int AS customer_id,
    (data->>'product_id')::int AS product_id,
    (data->>'quantity')::int AS quantity,
    (data->>'price')::decimal AS price,
    (data->>'order_time')::timestamp AS order_time
FROM orders_source;

-- 5. 상품 정보 뷰 생성
CREATE MATERIALIZED VIEW products AS
SELECT
    (data->>'product_id')::int AS product_id,
    data->>'product_name' AS product_name,
    data->>'category' AS category,
    (data->>'unit_price')::decimal AS unit_price
FROM products_source;

-- 6. 실시간 매출 집계 (1시간 윈도우)
CREATE MATERIALIZED VIEW hourly_sales AS
SELECT
    date_trunc('hour', order_time) AS hour,
    p.category,
    COUNT(*) AS order_count,
    SUM(o.quantity) AS total_quantity,
    SUM(o.quantity * o.price) AS total_revenue
FROM orders o
JOIN products p ON o.product_id = p.product_id
WHERE order_time > now() - INTERVAL '24 hours'
GROUP BY date_trunc('hour', order_time), p.category;

-- 7. 실시간 고객별 구매 현황
CREATE MATERIALIZED VIEW customer_metrics AS
SELECT
    customer_id,
    COUNT(*) AS total_orders,
    SUM(quantity * price) AS total_spent,
    AVG(quantity * price) AS avg_order_value,
    MAX(order_time) AS last_order_time
FROM orders
WHERE order_time > now() - INTERVAL '30 days'
GROUP BY customer_id;

실시간 이상 탐지 쿼리

-- 실시간 이상 탐지를 위한 Materialized Views

-- 1분 단위 거래량 모니터링
CREATE MATERIALIZED VIEW transaction_rate AS
SELECT
    date_trunc('minute', order_time) AS minute,
    COUNT(*) AS transaction_count,
    SUM(price * quantity) AS total_value
FROM orders
WHERE order_time > now() - INTERVAL '1 hour'
GROUP BY date_trunc('minute', order_time);

-- 이상 거래 탐지 (평균의 3배 이상)
CREATE MATERIALIZED VIEW anomaly_alerts AS
WITH stats AS (
    SELECT
        AVG(transaction_count) AS avg_count,
        STDDEV(transaction_count) AS stddev_count
    FROM transaction_rate
    WHERE minute > now() - INTERVAL '24 hours'
)
SELECT
    t.minute,
    t.transaction_count,
    t.total_value,
    'HIGH_VOLUME' AS alert_type
FROM transaction_rate t, stats s
WHERE t.minute > now() - INTERVAL '5 minutes'
  AND t.transaction_count > s.avg_count + 3 * s.stddev_count;

-- 고객별 이상 행동 탐지 (5분 내 10건 이상 주문)
CREATE MATERIALIZED VIEW suspicious_customers AS
SELECT
    customer_id,
    COUNT(*) AS order_count_5min,
    SUM(price * quantity) AS total_value_5min
FROM orders
WHERE order_time > now() - INTERVAL '5 minutes'
GROUP BY customer_id
HAVING COUNT(*) > 10;

Python으로 Materialize 연결

# Python에서 Materialize 사용하기 (psycopg2)
import psycopg2
import pandas as pd
from psycopg2 import sql
import time

# Materialize 연결 (PostgreSQL 프로토콜 사용)
conn = psycopg2.connect(
    host="materialize.example.com",
    port=6875,
    user="materialize",
    password="secret",
    database="materialize"
)
conn.autocommit = True

def execute_query(query):
    """쿼리 실행 및 결과 반환"""
    with conn.cursor() as cur:
        cur.execute(query)
        if cur.description:
            columns = [desc[0] for desc in cur.description]
            return pd.DataFrame(cur.fetchall(), columns=columns)
        return None

# Materialized View 생성
create_view_sql = """
CREATE MATERIALIZED VIEW IF NOT EXISTS realtime_dashboard AS
SELECT
    date_trunc('minute', order_time) AS minute,
    COUNT(*) AS orders,
    SUM(price * quantity) AS revenue
FROM orders
WHERE order_time > now() - INTERVAL '1 hour'
GROUP BY date_trunc('minute', order_time)
ORDER BY minute DESC
"""
execute_query(create_view_sql)

# 실시간 쿼리 결과 조회
def get_dashboard_data():
    """대시보드 데이터 조회"""
    df = execute_query("SELECT * FROM realtime_dashboard LIMIT 60")
    return df

# SUBSCRIBE를 사용한 실시간 스트리밍 (변경 사항 푸시)
def subscribe_to_changes():
    """뷰 변경사항을 실시간으로 구독"""
    with conn.cursor() as cur:
        # SUBSCRIBE 시작
        cur.execute("SUBSCRIBE (SELECT * FROM realtime_dashboard)")

        print("실시간 구독 시작...")
        while True:
            # 변경사항 수신
            cur.execute("FETCH ALL FROM realtime_dashboard")
            rows = cur.fetchall()

            for row in rows:
                mz_timestamp, mz_diff, minute, orders, revenue = row
                action = "INSERT" if mz_diff > 0 else "DELETE"
                print(f"[{action}] minute={minute}, orders={orders}, revenue={revenue}")

# 연결 종료
def cleanup():
    conn.close()

dbt + Materialize 통합

# dbt_project.yml
name: 'streaming_analytics'
version: '1.0.0'
profile: 'materialize'

models:
  streaming_analytics:
    +materialized: materializedview  # Materialize 전용


# profiles.yml
materialize:
  target: dev
  outputs:
    dev:
      type: materialize
      host: materialize.example.com
      port: 6875
      user: materialize
      password: "{{ env_var('MZ_PASSWORD') }}"
      database: materialize
      schema: analytics
-- models/customer_lifetime_value.sql
-- dbt 모델: 실시간 고객 생애 가치

{{ config(materialized='materializedview') }}

WITH customer_orders AS (
    SELECT
        customer_id,
        order_id,
        order_time,
        price * quantity AS order_value
    FROM {{ source('kafka', 'orders') }}
),

customer_stats AS (
    SELECT
        customer_id,
        COUNT(DISTINCT order_id) AS total_orders,
        SUM(order_value) AS total_revenue,
        MIN(order_time) AS first_order,
        MAX(order_time) AS last_order,
        AVG(order_value) AS avg_order_value
    FROM customer_orders
    GROUP BY customer_id
)

SELECT
    customer_id,
    total_orders,
    total_revenue,
    first_order,
    last_order,
    avg_order_value,
    -- 간단한 CLV 계산 (실제로는 더 복잡한 모델 사용)
    total_revenue *
        GREATEST(1, EXTRACT(EPOCH FROM (now() - first_order)) /
                    EXTRACT(EPOCH FROM (last_order - first_order + INTERVAL '1 day')))
        AS estimated_clv
FROM customer_stats
WHERE total_orders > 0

실무 대화 예시

데이터 엔지니어: "실시간 대시보드를 만들어야 하는데, Kafka 데이터를 SQL로 쿼리하고 싶어요. Flink SQL은 설정이 복잡하던데..."
시니어 엔지니어: "Materialize 써봐. PostgreSQL 프로토콜이라 기존 도구들이랑 호환되고, SQL만 알면 스트리밍 처리를 할 수 있어."
데이터 엔지니어: "Materialized View랑 일반 View 차이가 뭔가요?"
시니어 엔지니어: "Materialized View는 결과를 저장하고 있어서 쿼리 시 즉시 반환돼. 데이터가 변경되면 Materialize가 자동으로 증분 업데이트해. 그래서 복잡한 집계도 밀리초 지연으로 볼 수 있어."
데이터 엔지니어: "ksqlDB랑 비교하면 어떤가요?"
시니어 엔지니어: "ksqlDB는 Kafka 전용이고 SQL 문법이 조금 달라. Materialize는 PostgreSQL 호환이라 복잡한 JOIN이나 서브쿼리도 잘 돼. 근데 Materialize가 리소스를 좀 더 써."

주의사항

리소스 관리

  • Materialized View는 메모리에 상태 유지 - 많은 뷰는 메모리 부담
  • 복잡한 JOIN은 메모리 사용량 급증 가능
  • 사용하지 않는 뷰는 DROP하여 리소스 해제

쿼리 설계

  • INTERVAL 없는 집계는 무한 상태 증가
  • 항상 시간 윈도우 조건 추가 권장
  • 인덱스 생성으로 쿼리 성능 향상

운영 고려사항

  • Kafka 소스 오프셋 관리 - 재시작 시 데이터 중복 주의
  • 스키마 변경 시 소스/뷰 재생성 필요
  • 상용 버전은 클라우드 관리형 서비스 제공

관련 용어

더 배우기