Delta Lake
Delta Lake
데이터 레이크에 ACID 트랜잭션 제공. Databricks 개발.
Delta Lake
데이터 레이크에 ACID 트랜잭션 제공. Databricks 개발.
Delta Lake는 데이터 레이크에 ACID 트랜잭션, 스키마 진화, Time Travel 기능을 제공하는 오픈소스 스토리지 레이어입니다. 기존 데이터 레이크의 "데이터 늪(Data Swamp)" 문제를 해결하고, 데이터 웨어하우스 수준의 신뢰성을 제공합니다.
핵심 메커니즘은 트랜잭션 로그(_delta_log)입니다. 모든 데이터 변경은 JSON 형식의 커밋 로그로 기록되며, 이를 통해 ACID 트랜잭션을 보장합니다. 동시에 여러 작업이 같은 테이블을 수정해도 데이터 정합성이 유지됩니다. 또한 이 로그를 통해 과거 버전의 데이터를 조회하는 Time Travel이 가능합니다.
ACID 트랜잭션의 Atomicity는 작업이 완전히 성공하거나 완전히 실패함을 보장합니다. Consistency는 스키마 검증을 통해 데이터 일관성을 유지합니다. Isolation은 동시 작업 간의 격리를 제공하고, Durability는 커밋된 데이터의 영속성을 보장합니다.
Time Travel은 버전 번호나 타임스탬프로 과거 데이터를 조회할 수 있게 합니다. 이를 통해 잘못된 데이터 수정을 롤백하거나, 특정 시점의 데이터로 감사(audit)를 수행할 수 있습니다. 기본적으로 30일간 히스토리가 보존되며, VACUUM 명령으로 오래된 데이터를 정리합니다.
# Delta Lake 핵심 기능 예제
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp
# 1. Delta 테이블 생성
df = spark.createDataFrame([
("user1", "Alice", 100),
("user2", "Bob", 200),
("user3", "Charlie", 150)
], ["user_id", "name", "balance"])
df.write.format("delta").mode("overwrite").save("/delta/accounts")
# 2. ACID 트랜잭션 - MERGE (Upsert)
updates = spark.createDataFrame([
("user1", "Alice", 120), # 기존 사용자 업데이트
("user4", "David", 300) # 새 사용자 삽입
], ["user_id", "name", "balance"])
delta_table = DeltaTable.forPath(spark, "/delta/accounts")
delta_table.alias("target").merge(
updates.alias("source"),
"target.user_id = source.user_id"
).whenMatchedUpdate(set={
"name": col("source.name"),
"balance": col("source.balance")
}).whenNotMatchedInsert(values={
"user_id": col("source.user_id"),
"name": col("source.name"),
"balance": col("source.balance")
}).execute()
# 3. Time Travel - 버전으로 조회
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta/accounts")
print("버전 0 (초기 상태):")
df_v0.show()
# 타임스탬프로 조회
df_past = spark.read.format("delta").option("timestampAsOf", "2024-01-15 10:00:00").load("/delta/accounts")
# 4. 히스토리 조회
delta_table.history().show(truncate=False)
# 5. 롤백 - 이전 버전으로 복원
# 방법 1: RESTORE 명령
spark.sql("RESTORE TABLE delta.`/delta/accounts` TO VERSION AS OF 0")
# 방법 2: 이전 버전 덮어쓰기
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/delta/accounts")
df_v0.write.format("delta").mode("overwrite").save("/delta/accounts")
# 6. 스키마 진화 - 새 컬럼 자동 추가
new_data = spark.createDataFrame([
("user5", "Eve", 250, "VIP")
], ["user_id", "name", "balance", "tier"])
new_data.write.format("delta").mode("append").option("mergeSchema", "true").save("/delta/accounts")
# 7. 최적화 - 소규모 파일 병합
delta_table.optimize().executeCompaction()
# Z-ORDER로 쿼리 성능 최적화
delta_table.optimize().executeZOrderBy("user_id")
# 8. VACUUM - 오래된 파일 정리 (기본 7일 이상 지난 파일)
delta_table.vacuum(168) # 168시간 = 7일
print("최종 테이블 상태:")
spark.read.format("delta").load("/delta/accounts").show()
데이터엔지니어: "어제 배치 작업이 중간에 실패했는데, Delta Lake 덕분에 partial write 문제가 없었어요. 트랜잭션이 롤백되어서 데이터 정합성이 유지됐습니다."
주니어: "예전 Parquet 테이블이었으면 어떻게 됐을까요?"
시니어: "절반만 쓰여진 상태로 남아서 수동으로 정리해야 했을 거예요. ACID가 없으면 이런 상황에서 데이터가 꼬이기 쉽습니다."
면접관: "Delta Lake, Apache Iceberg, Apache Hudi 중 어떤 것을 선택하시겠어요?"
지원자: "Databricks 환경이면 Delta Lake가 최적화되어 있어서 선택하겠습니다. 멀티 엔진 환경에서 Presto, Trino, Flink 등 다양한 쿼리 엔진과의 호환성이 중요하면 Iceberg를 고려할 것 같아요. 세 가지 모두 ACID와 Time Travel을 지원하지만 생태계가 다릅니다."
리뷰어: "VACUUM retention을 0시간으로 설정했는데, Time Travel이 안 될 수 있어요."
작성자: "저장 공간 절약하려고 했는데, 최소 7일은 유지하도록 수정하겠습니다. 만약의 롤백 상황에 대비해야겠네요."