📊 데이터공학

Data Lakehouse

데이터 레이크하우스

데이터 레이크와 웨어하우스를 결합한 아키텍처. ACID 트랜잭션과 스키마 관리를 지원하는 통합 데이터 플랫폼.

상세 설명

Data Lakehouse는 Data Lake의 유연성과 확장성, Data Warehouse의 데이터 관리 기능과 성능을 결합한 차세대 데이터 아키텍처입니다. 기존에는 원시 데이터를 Data Lake에 저장하고, 분석용 데이터를 별도의 Data Warehouse로 ETL하는 2단계 아키텍처가 일반적이었습니다. Lakehouse는 이러한 중복 저장과 복잡한 데이터 이동을 제거하고, 단일 플랫폼에서 모든 데이터 워크로드를 처리할 수 있게 합니다.

Lakehouse의 핵심 기술은 오픈 테이블 포맷인 Delta Lake, Apache Iceberg, Apache Hudi입니다. 이들은 객체 스토리지(S3, ADLS, GCS) 위에서 ACID 트랜잭션, 스키마 진화, 타임 트래블, 동시성 제어 같은 웨어하우스 수준의 기능을 제공합니다. 파일 기반 스토리지의 비용 효율성을 유지하면서도 안정적인 데이터 관리가 가능해진 것입니다. 특히 메타데이터 레이어를 통해 Parquet 파일들을 테이블처럼 관리할 수 있습니다.

Lakehouse 아키텍처는 BI 분석, 실시간 스트리밍, 머신러닝을 단일 데이터 복사본에서 수행할 수 있어 데이터 일관성이 보장됩니다. 과거에는 데이터 사이언티스트가 Lake의 원시 데이터로, 분석가가 Warehouse의 정제 데이터로 각각 작업했는데, 이로 인해 동일한 지표가 다르게 계산되는 문제가 빈번했습니다. Lakehouse는 Single Source of Truth를 실현하여 이러한 데이터 사일로 문제를 해결합니다.

주요 Lakehouse 플랫폼으로는 Databricks (Delta Lake 기반), Snowflake (Iceberg 지원), AWS Lake Formation, Google BigLake 등이 있습니다. 이들은 SQL 쿼리 엔진(Spark SQL, Trino, Presto), 스트리밍 처리(Structured Streaming), ML 워크로드(MLflow, Feature Store)를 통합 제공합니다. 최근에는 Unity Catalog, Nessie 같은 통합 카탈로그를 통해 데이터 거버넌스와 접근 제어도 강화되고 있습니다.

코드 예제

Delta Lake 기반 Data Lakehouse에서 ACID 트랜잭션, 스키마 진화, 타임 트래블을 활용하는 예제입니다.

Python (PySpark + Delta Lake - Lakehouse Architecture)
# Data Lakehouse 구현 (Delta Lake 기반)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
from delta.tables import DeltaTable

# Spark Session with Delta Lake
spark = SparkSession.builder \
    .appName("DataLakehouse") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
    .getOrCreate()

LAKEHOUSE_PATH = "s3a://company-lakehouse"

# ============================================
# 1. Delta Table 생성 (ACID 트랜잭션 지원)
# ============================================
def create_delta_table():
    """파티션과 Z-Order 최적화가 적용된 Delta 테이블 생성"""

    # 스키마 정의
    schema = StructType([
        StructField("order_id", StringType(), False),
        StructField("customer_id", StringType(), False),
        StructField("product_id", StringType(), False),
        StructField("quantity", IntegerType(), True),
        StructField("amount", DecimalType(10, 2), True),
        StructField("order_date", DateType(), False),
        StructField("status", StringType(), True),
        StructField("region", StringType(), True)
    ])

    # Delta 테이블 생성 (SQL)
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS lakehouse.orders (
            order_id STRING NOT NULL,
            customer_id STRING NOT NULL,
            product_id STRING NOT NULL,
            quantity INT,
            amount DECIMAL(10, 2),
            order_date DATE NOT NULL,
            status STRING,
            region STRING
        )
        USING DELTA
        PARTITIONED BY (order_date)
        LOCATION '{LAKEHOUSE_PATH}/orders'
        TBLPROPERTIES (
            'delta.autoOptimize.optimizeWrite' = 'true',
            'delta.autoOptimize.autoCompact' = 'true',
            'delta.dataSkippingNumIndexedCols' = '8'
        )
    """)

    print("Delta table created with auto-optimization")

# ============================================
# 2. MERGE (Upsert) 작업 - CDC 처리
# ============================================
def merge_cdc_data(cdc_df):
    """Change Data Capture를 Delta 테이블에 머지"""

    delta_table = DeltaTable.forPath(spark, f"{LAKEHOUSE_PATH}/orders")

    # MERGE INTO - Insert, Update, Delete 한 번에 처리
    delta_table.alias("target").merge(
        cdc_df.alias("source"),
        "target.order_id = source.order_id"
    ).whenMatchedUpdate(
        condition="source.operation = 'UPDATE'",
        set={
            "quantity": "source.quantity",
            "amount": "source.amount",
            "status": "source.status",
            "region": "source.region"
        }
    ).whenMatchedDelete(
        condition="source.operation = 'DELETE'"
    ).whenNotMatchedInsert(
        condition="source.operation = 'INSERT'",
        values={
            "order_id": "source.order_id",
            "customer_id": "source.customer_id",
            "product_id": "source.product_id",
            "quantity": "source.quantity",
            "amount": "source.amount",
            "order_date": "source.order_date",
            "status": "source.status",
            "region": "source.region"
        }
    ).execute()

    print(f"Merged {cdc_df.count()} CDC records")

# ============================================
# 3. 스키마 진화 (Schema Evolution)
# ============================================
def evolve_schema():
    """무중단 스키마 변경"""

    # 새 컬럼 추가 (자동 스키마 머지)
    spark.sql(f"""
        ALTER TABLE delta.`{LAKEHOUSE_PATH}/orders`
        ADD COLUMNS (
            discount_rate DECIMAL(5, 2) COMMENT '할인율',
            loyalty_points INT COMMENT '적립 포인트'
        )
    """)

    # 컬럼 타입 변경 (안전한 확장만 가능)
    spark.sql(f"""
        ALTER TABLE delta.`{LAKEHOUSE_PATH}/orders`
        ALTER COLUMN amount TYPE DECIMAL(15, 2)
    """)

    # 자동 스키마 머지로 데이터 쓰기
    new_data_with_extra_column = spark.createDataFrame([
        ("ORD-999", "CUST-1", "PROD-1", 2, 150.00, "2024-01-15", "completed", "서울", 0.1, 150)
    ], ["order_id", "customer_id", "product_id", "quantity", "amount",
        "order_date", "status", "region", "discount_rate", "loyalty_points"])

    new_data_with_extra_column.write \
        .format("delta") \
        .mode("append") \
        .option("mergeSchema", "true") \
        .save(f"{LAKEHOUSE_PATH}/orders")

    print("Schema evolved successfully")

# ============================================
# 4. Time Travel - 과거 데이터 조회
# ============================================
def time_travel_queries():
    """Delta Lake 타임 트래블 기능 활용"""

    # 특정 버전의 데이터 조회
    df_version_5 = spark.read \
        .format("delta") \
        .option("versionAsOf", 5) \
        .load(f"{LAKEHOUSE_PATH}/orders")

    print(f"Version 5 record count: {df_version_5.count()}")

    # 특정 시점의 데이터 조회
    df_yesterday = spark.read \
        .format("delta") \
        .option("timestampAsOf", "2024-01-14 00:00:00") \
        .load(f"{LAKEHOUSE_PATH}/orders")

    # 버전 간 변경 사항 확인
    changes = spark.sql(f"""
        SELECT * FROM delta.`{LAKEHOUSE_PATH}/orders`
        VERSION AS OF 10
        EXCEPT
        SELECT * FROM delta.`{LAKEHOUSE_PATH}/orders`
        VERSION AS OF 9
    """)

    print(f"Changes between v9 and v10: {changes.count()} rows")

    # 테이블 히스토리 조회
    history = spark.sql(f"DESCRIBE HISTORY delta.`{LAKEHOUSE_PATH}/orders`")
    history.select("version", "timestamp", "operation", "operationMetrics").show(10)

    # 잘못된 변경 롤백
    spark.sql(f"""
        RESTORE TABLE delta.`{LAKEHOUSE_PATH}/orders`
        TO VERSION AS OF 8
    """)
    print("Table restored to version 8")

# ============================================
# 5. 성능 최적화 (Z-Order, OPTIMIZE)
# ============================================
def optimize_table():
    """쿼리 성능 최적화"""

    # 파일 병합 및 최적화
    spark.sql(f"""
        OPTIMIZE delta.`{LAKEHOUSE_PATH}/orders`
        WHERE order_date >= '2024-01-01'
    """)

    # Z-Order 클러스터링 (자주 필터링되는 컬럼)
    spark.sql(f"""
        OPTIMIZE delta.`{LAKEHOUSE_PATH}/orders`
        ZORDER BY (customer_id, product_id)
    """)

    # 불필요한 파일 정리 (Vacuum)
    spark.sql(f"""
        VACUUM delta.`{LAKEHOUSE_PATH}/orders`
        RETAIN 168 HOURS  -- 7일 보존
    """)

    # 테이블 통계 수집
    spark.sql(f"ANALYZE TABLE delta.`{LAKEHOUSE_PATH}/orders` COMPUTE STATISTICS")

    print("Table optimized with Z-Order clustering")

# ============================================
# 6. 스트리밍 + 배치 통합 (Unified Processing)
# ============================================
def unified_streaming_batch():
    """동일 테이블에서 스트리밍과 배치 동시 처리"""

    # 스트리밍 쓰기 (Kafka -> Delta)
    kafka_stream = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "kafka:9092") \
        .option("subscribe", "orders") \
        .load()

    orders_stream = kafka_stream \
        .select(from_json(col("value").cast("string"), order_schema).alias("data")) \
        .select("data.*")

    # Delta 테이블로 스트리밍 (ACID 보장)
    query = orders_stream.writeStream \
        .format("delta") \
        .outputMode("append") \
        .option("checkpointLocation", f"{LAKEHOUSE_PATH}/checkpoints/orders") \
        .trigger(processingTime="1 minute") \
        .start(f"{LAKEHOUSE_PATH}/orders")

    # 동시에 배치 읽기 가능 (스트리밍과 충돌 없음)
    batch_df = spark.read.format("delta").load(f"{LAKEHOUSE_PATH}/orders")
    daily_summary = batch_df \
        .filter(col("order_date") == current_date()) \
        .groupBy("region") \
        .agg(
            count("order_id").alias("orders"),
            sum("amount").alias("revenue")
        )

    print("Streaming and batch processing running concurrently")
    return query

# ============================================
# 7. Unity Catalog 통합 (거버넌스)
# ============================================
def setup_governance():
    """Unity Catalog로 데이터 거버넌스 설정"""

    # 카탈로그 및 스키마 생성
    spark.sql("CREATE CATALOG IF NOT EXISTS main")
    spark.sql("CREATE SCHEMA IF NOT EXISTS main.sales")

    # 관리 테이블 생성
    spark.sql("""
        CREATE TABLE IF NOT EXISTS main.sales.orders
        USING DELTA
        AS SELECT * FROM delta.`s3a://company-lakehouse/orders`
    """)

    # 열 수준 보안 (Column Masking)
    spark.sql("""
        ALTER TABLE main.sales.orders
        ALTER COLUMN customer_id SET MASK mask_customer_id
    """)

    # 행 수준 보안 (Row-Level Security)
    spark.sql("""
        ALTER TABLE main.sales.orders
        SET ROW FILTER region_filter ON (region)
    """)

    # 접근 권한 설정
    spark.sql("GRANT SELECT ON TABLE main.sales.orders TO `analysts`")
    spark.sql("GRANT ALL PRIVILEGES ON TABLE main.sales.orders TO `data_engineers`")

    print("Governance policies applied via Unity Catalog")

# ============================================
# Lakehouse 파이프라인 실행
# ============================================
if __name__ == "__main__":
    create_delta_table()
    evolve_schema()
    optimize_table()
    time_travel_queries()
    setup_governance()

    print("Data Lakehouse setup completed!")

실무에서 자주 등장하는 대화 예제

CTO
"현재 Data Lake에서 Redshift로 ETL하는 파이프라인 운영 비용이 너무 높습니다. 데이터 중복 저장도 문제고요. Lakehouse로 전환하면 어떤 이점이 있을까요?"
데이터 아키텍트
"Lakehouse로 전환하면 S3에 저장된 데이터를 직접 SQL로 분석할 수 있어 Redshift 비용을 대폭 줄일 수 있습니다. Delta Lake나 Iceberg를 적용하면 ACID 트랜잭션, 스키마 진화, 타임 트래블 기능으로 데이터 품질도 웨어하우스 수준으로 관리됩니다. ETL 대신 ELT로 전환해서 파이프라인도 단순화됩니다."
데이터 사이언티스트
"ML 팀 입장에서는 Feature Store와 학습 데이터가 분석 데이터와 같은 플랫폼에 있으면 데이터 일관성 문제가 해결됩니다. 현재는 Lake와 Warehouse 데이터가 달라서 모델 성능 검증이 어려웠거든요."
면접관
"Data Lakehouse가 기존 2-tier 아키텍처(Lake + Warehouse)를 대체할 수 있는 이유를 설명해주세요."
지원자
"기존 아키텍처에서 Data Lake는 저렴한 스토리지와 다양한 데이터 포맷 지원이 장점이지만, 트랜잭션 지원이 없고 스키마 관리가 어렵습니다. Warehouse는 ACID와 SQL 성능이 우수하지만 비용이 높고 비정형 데이터 처리가 제한적입니다. Lakehouse는 Delta Lake, Iceberg 같은 오픈 테이블 포맷으로 객체 스토리지 위에서 트랜잭션, 스키마 진화, 타임 트래블을 지원해 두 장점을 모두 취합니다."
면접관
"Delta Lake, Iceberg, Hudi 중 어떤 것을 선택할지 기준이 있나요?"
지원자
"Delta Lake는 Databricks 생태계에서 최적화되어 있고, Spark와의 통합이 가장 강력합니다. Iceberg는 벤더 중립적이고 Trino, Flink, Snowflake 등 다양한 엔진 지원이 우수합니다. 특히 파티션 진화와 Hidden Partitioning이 강점입니다. Hudi는 Uber에서 만들어 증분 처리(Incremental Processing)와 스트리밍 수집에 강점이 있습니다. 기존 스택과 호환성, 팀의 기술 역량을 고려해 선택해야 합니다."
시니어 엔지니어
"Delta 테이블에 매시간 append 하고 있는데, 작은 파일이 엄청나게 많아졌네요. 쿼리 성능이 급격히 저하되고 있어요."
주니어 엔지니어
"Auto-optimization 설정이 안 되어 있었네요. delta.autoOptimize.optimizeWrite와 autoCompact를 활성화하고, 일간 OPTIMIZE 작업도 스케줄링하겠습니다."
시니어 엔지니어
"좋아요. 그리고 customer_id로 자주 필터링하니까 Z-Order 클러스터링도 적용하세요. VACUUM도 주기적으로 돌려서 오래된 버전 파일을 정리해야 스토리지 비용도 관리됩니다. 단, VACUUM 전에 타임 트래블이 필요한 기간을 확인하고 retention 기간을 설정하세요."

주의사항

관련 용어

더 배우기