Data Lakehouse
데이터 레이크하우스
데이터 레이크와 웨어하우스를 결합한 아키텍처. ACID 트랜잭션과 스키마 관리를 지원하는 통합 데이터 플랫폼.
데이터 레이크하우스
데이터 레이크와 웨어하우스를 결합한 아키텍처. ACID 트랜잭션과 스키마 관리를 지원하는 통합 데이터 플랫폼.
Data Lakehouse는 Data Lake의 유연성과 확장성, Data Warehouse의 데이터 관리 기능과 성능을 결합한 차세대 데이터 아키텍처입니다. 기존에는 원시 데이터를 Data Lake에 저장하고, 분석용 데이터를 별도의 Data Warehouse로 ETL하는 2단계 아키텍처가 일반적이었습니다. Lakehouse는 이러한 중복 저장과 복잡한 데이터 이동을 제거하고, 단일 플랫폼에서 모든 데이터 워크로드를 처리할 수 있게 합니다.
Lakehouse의 핵심 기술은 오픈 테이블 포맷인 Delta Lake, Apache Iceberg, Apache Hudi입니다. 이들은 객체 스토리지(S3, ADLS, GCS) 위에서 ACID 트랜잭션, 스키마 진화, 타임 트래블, 동시성 제어 같은 웨어하우스 수준의 기능을 제공합니다. 파일 기반 스토리지의 비용 효율성을 유지하면서도 안정적인 데이터 관리가 가능해진 것입니다. 특히 메타데이터 레이어를 통해 Parquet 파일들을 테이블처럼 관리할 수 있습니다.
Lakehouse 아키텍처는 BI 분석, 실시간 스트리밍, 머신러닝을 단일 데이터 복사본에서 수행할 수 있어 데이터 일관성이 보장됩니다. 과거에는 데이터 사이언티스트가 Lake의 원시 데이터로, 분석가가 Warehouse의 정제 데이터로 각각 작업했는데, 이로 인해 동일한 지표가 다르게 계산되는 문제가 빈번했습니다. Lakehouse는 Single Source of Truth를 실현하여 이러한 데이터 사일로 문제를 해결합니다.
주요 Lakehouse 플랫폼으로는 Databricks (Delta Lake 기반), Snowflake (Iceberg 지원), AWS Lake Formation, Google BigLake 등이 있습니다. 이들은 SQL 쿼리 엔진(Spark SQL, Trino, Presto), 스트리밍 처리(Structured Streaming), ML 워크로드(MLflow, Feature Store)를 통합 제공합니다. 최근에는 Unity Catalog, Nessie 같은 통합 카탈로그를 통해 데이터 거버넌스와 접근 제어도 강화되고 있습니다.
Delta Lake 기반 Data Lakehouse에서 ACID 트랜잭션, 스키마 진화, 타임 트래블을 활용하는 예제입니다.
# Data Lakehouse 구현 (Delta Lake 기반)
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *
from delta.tables import DeltaTable
# Spark Session with Delta Lake
spark = SparkSession.builder \
.appName("DataLakehouse") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false") \
.getOrCreate()
LAKEHOUSE_PATH = "s3a://company-lakehouse"
# ============================================
# 1. Delta Table 생성 (ACID 트랜잭션 지원)
# ============================================
def create_delta_table():
"""파티션과 Z-Order 최적화가 적용된 Delta 테이블 생성"""
# 스키마 정의
schema = StructType([
StructField("order_id", StringType(), False),
StructField("customer_id", StringType(), False),
StructField("product_id", StringType(), False),
StructField("quantity", IntegerType(), True),
StructField("amount", DecimalType(10, 2), True),
StructField("order_date", DateType(), False),
StructField("status", StringType(), True),
StructField("region", StringType(), True)
])
# Delta 테이블 생성 (SQL)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS lakehouse.orders (
order_id STRING NOT NULL,
customer_id STRING NOT NULL,
product_id STRING NOT NULL,
quantity INT,
amount DECIMAL(10, 2),
order_date DATE NOT NULL,
status STRING,
region STRING
)
USING DELTA
PARTITIONED BY (order_date)
LOCATION '{LAKEHOUSE_PATH}/orders'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true',
'delta.dataSkippingNumIndexedCols' = '8'
)
""")
print("Delta table created with auto-optimization")
# ============================================
# 2. MERGE (Upsert) 작업 - CDC 처리
# ============================================
def merge_cdc_data(cdc_df):
"""Change Data Capture를 Delta 테이블에 머지"""
delta_table = DeltaTable.forPath(spark, f"{LAKEHOUSE_PATH}/orders")
# MERGE INTO - Insert, Update, Delete 한 번에 처리
delta_table.alias("target").merge(
cdc_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(
condition="source.operation = 'UPDATE'",
set={
"quantity": "source.quantity",
"amount": "source.amount",
"status": "source.status",
"region": "source.region"
}
).whenMatchedDelete(
condition="source.operation = 'DELETE'"
).whenNotMatchedInsert(
condition="source.operation = 'INSERT'",
values={
"order_id": "source.order_id",
"customer_id": "source.customer_id",
"product_id": "source.product_id",
"quantity": "source.quantity",
"amount": "source.amount",
"order_date": "source.order_date",
"status": "source.status",
"region": "source.region"
}
).execute()
print(f"Merged {cdc_df.count()} CDC records")
# ============================================
# 3. 스키마 진화 (Schema Evolution)
# ============================================
def evolve_schema():
"""무중단 스키마 변경"""
# 새 컬럼 추가 (자동 스키마 머지)
spark.sql(f"""
ALTER TABLE delta.`{LAKEHOUSE_PATH}/orders`
ADD COLUMNS (
discount_rate DECIMAL(5, 2) COMMENT '할인율',
loyalty_points INT COMMENT '적립 포인트'
)
""")
# 컬럼 타입 변경 (안전한 확장만 가능)
spark.sql(f"""
ALTER TABLE delta.`{LAKEHOUSE_PATH}/orders`
ALTER COLUMN amount TYPE DECIMAL(15, 2)
""")
# 자동 스키마 머지로 데이터 쓰기
new_data_with_extra_column = spark.createDataFrame([
("ORD-999", "CUST-1", "PROD-1", 2, 150.00, "2024-01-15", "completed", "서울", 0.1, 150)
], ["order_id", "customer_id", "product_id", "quantity", "amount",
"order_date", "status", "region", "discount_rate", "loyalty_points"])
new_data_with_extra_column.write \
.format("delta") \
.mode("append") \
.option("mergeSchema", "true") \
.save(f"{LAKEHOUSE_PATH}/orders")
print("Schema evolved successfully")
# ============================================
# 4. Time Travel - 과거 데이터 조회
# ============================================
def time_travel_queries():
"""Delta Lake 타임 트래블 기능 활용"""
# 특정 버전의 데이터 조회
df_version_5 = spark.read \
.format("delta") \
.option("versionAsOf", 5) \
.load(f"{LAKEHOUSE_PATH}/orders")
print(f"Version 5 record count: {df_version_5.count()}")
# 특정 시점의 데이터 조회
df_yesterday = spark.read \
.format("delta") \
.option("timestampAsOf", "2024-01-14 00:00:00") \
.load(f"{LAKEHOUSE_PATH}/orders")
# 버전 간 변경 사항 확인
changes = spark.sql(f"""
SELECT * FROM delta.`{LAKEHOUSE_PATH}/orders`
VERSION AS OF 10
EXCEPT
SELECT * FROM delta.`{LAKEHOUSE_PATH}/orders`
VERSION AS OF 9
""")
print(f"Changes between v9 and v10: {changes.count()} rows")
# 테이블 히스토리 조회
history = spark.sql(f"DESCRIBE HISTORY delta.`{LAKEHOUSE_PATH}/orders`")
history.select("version", "timestamp", "operation", "operationMetrics").show(10)
# 잘못된 변경 롤백
spark.sql(f"""
RESTORE TABLE delta.`{LAKEHOUSE_PATH}/orders`
TO VERSION AS OF 8
""")
print("Table restored to version 8")
# ============================================
# 5. 성능 최적화 (Z-Order, OPTIMIZE)
# ============================================
def optimize_table():
"""쿼리 성능 최적화"""
# 파일 병합 및 최적화
spark.sql(f"""
OPTIMIZE delta.`{LAKEHOUSE_PATH}/orders`
WHERE order_date >= '2024-01-01'
""")
# Z-Order 클러스터링 (자주 필터링되는 컬럼)
spark.sql(f"""
OPTIMIZE delta.`{LAKEHOUSE_PATH}/orders`
ZORDER BY (customer_id, product_id)
""")
# 불필요한 파일 정리 (Vacuum)
spark.sql(f"""
VACUUM delta.`{LAKEHOUSE_PATH}/orders`
RETAIN 168 HOURS -- 7일 보존
""")
# 테이블 통계 수집
spark.sql(f"ANALYZE TABLE delta.`{LAKEHOUSE_PATH}/orders` COMPUTE STATISTICS")
print("Table optimized with Z-Order clustering")
# ============================================
# 6. 스트리밍 + 배치 통합 (Unified Processing)
# ============================================
def unified_streaming_batch():
"""동일 테이블에서 스트리밍과 배치 동시 처리"""
# 스트리밍 쓰기 (Kafka -> Delta)
kafka_stream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "orders") \
.load()
orders_stream = kafka_stream \
.select(from_json(col("value").cast("string"), order_schema).alias("data")) \
.select("data.*")
# Delta 테이블로 스트리밍 (ACID 보장)
query = orders_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", f"{LAKEHOUSE_PATH}/checkpoints/orders") \
.trigger(processingTime="1 minute") \
.start(f"{LAKEHOUSE_PATH}/orders")
# 동시에 배치 읽기 가능 (스트리밍과 충돌 없음)
batch_df = spark.read.format("delta").load(f"{LAKEHOUSE_PATH}/orders")
daily_summary = batch_df \
.filter(col("order_date") == current_date()) \
.groupBy("region") \
.agg(
count("order_id").alias("orders"),
sum("amount").alias("revenue")
)
print("Streaming and batch processing running concurrently")
return query
# ============================================
# 7. Unity Catalog 통합 (거버넌스)
# ============================================
def setup_governance():
"""Unity Catalog로 데이터 거버넌스 설정"""
# 카탈로그 및 스키마 생성
spark.sql("CREATE CATALOG IF NOT EXISTS main")
spark.sql("CREATE SCHEMA IF NOT EXISTS main.sales")
# 관리 테이블 생성
spark.sql("""
CREATE TABLE IF NOT EXISTS main.sales.orders
USING DELTA
AS SELECT * FROM delta.`s3a://company-lakehouse/orders`
""")
# 열 수준 보안 (Column Masking)
spark.sql("""
ALTER TABLE main.sales.orders
ALTER COLUMN customer_id SET MASK mask_customer_id
""")
# 행 수준 보안 (Row-Level Security)
spark.sql("""
ALTER TABLE main.sales.orders
SET ROW FILTER region_filter ON (region)
""")
# 접근 권한 설정
spark.sql("GRANT SELECT ON TABLE main.sales.orders TO `analysts`")
spark.sql("GRANT ALL PRIVILEGES ON TABLE main.sales.orders TO `data_engineers`")
print("Governance policies applied via Unity Catalog")
# ============================================
# Lakehouse 파이프라인 실행
# ============================================
if __name__ == "__main__":
create_delta_table()
evolve_schema()
optimize_table()
time_travel_queries()
setup_governance()
print("Data Lakehouse setup completed!")