Apache Iceberg
Apache Iceberg
테이블 포맷 for 대규모 분석 데이터셋. 스키마 진화.
Apache Iceberg
테이블 포맷 for 대규모 분석 데이터셋. 스키마 진화.
Apache Iceberg는 Netflix에서 개발한 오픈 테이블 포맷으로, 대규모 분석 데이터셋을 위한 고성능 테이블 관리를 제공합니다. 기존 Hive 테이블의 한계를 극복하여 ACID 트랜잭션, 스키마 진화, 파티션 진화, 타임 트래블 등 현대적인 데이터 레이크하우스 기능을 지원합니다.
Iceberg의 핵심은 메타데이터 레이어로, 스냅샷 기반의 버전 관리를 통해 데이터 일관성을 보장합니다. 각 스냅샷은 테이블의 완전한 상태를 나타내며, 매니페스트 파일을 통해 데이터 파일의 위치와 통계 정보를 효율적으로 관리합니다. 이 구조 덕분에 파일 리스팅 없이도 쿼리 계획이 가능하여 S3 같은 오브젝트 스토리지에서 성능이 크게 향상됩니다.
스키마 진화(Schema Evolution)는 Iceberg의 강력한 기능 중 하나로, 컬럼 추가, 삭제, 이름 변경, 타입 변경을 데이터 재작성 없이 수행할 수 있습니다. 파티션 진화(Partition Evolution) 역시 기존 데이터를 건드리지 않고 파티셔닝 전략을 변경할 수 있어, 데이터 구조 변경에 따른 운영 부담을 최소화합니다.
Iceberg는 Spark, Flink, Trino, Presto, Hive, Dremio 등 다양한 컴퓨트 엔진과 호환되며, 동일한 테이블에 대해 여러 엔진이 동시에 읽기/쓰기를 수행할 수 있습니다. AWS, Azure, GCP의 주요 클라우드 서비스에서 네이티브 지원을 제공하며, 데이터 레이크하우스 아키텍처의 핵심 구성 요소로 자리잡고 있습니다.
# Apache Iceberg 테이블 생성 및 관리 (PySpark)
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, lit, current_timestamp
# Spark 세션 설정 (Iceberg 카탈로그 포함)
spark = SparkSession.builder \
.appName("IcebergExample") \
.config("spark.sql.extensions",
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.catalog.iceberg_catalog",
"org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.iceberg_catalog.type", "hadoop") \
.config("spark.sql.catalog.iceberg_catalog.warehouse",
"s3://data-lake/iceberg/warehouse") \
.config("spark.sql.defaultCatalog", "iceberg_catalog") \
.getOrCreate()
# ============================================
# 테이블 생성 (SQL)
# ============================================
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg_catalog.analytics.user_events (
event_id STRING,
user_id STRING,
event_type STRING,
event_data STRING,
event_timestamp TIMESTAMP,
created_at TIMESTAMP
)
USING iceberg
PARTITIONED BY (days(event_timestamp))
TBLPROPERTIES (
'format-version' = '2',
'write.format.default' = 'parquet',
'write.parquet.compression-codec' = 'zstd',
'write.metadata.compression-codec' = 'gzip',
'write.target-file-size-bytes' = '134217728'
)
""")
# ============================================
# 데이터 삽입
# ============================================
data = [
("evt001", "user_1", "click", '{"page": "home"}',
"2024-01-15 10:00:00", "2024-01-15 10:00:00"),
("evt002", "user_2", "purchase", '{"product_id": "P001", "amount": 100}',
"2024-01-15 10:05:00", "2024-01-15 10:05:00"),
("evt003", "user_1", "view", '{"page": "product"}',
"2024-01-15 10:10:00", "2024-01-15 10:10:00"),
]
schema = StructType([
StructField("event_id", StringType(), False),
StructField("user_id", StringType(), False),
StructField("event_type", StringType(), False),
StructField("event_data", StringType(), True),
StructField("event_timestamp", TimestampType(), False),
StructField("created_at", TimestampType(), False),
])
df = spark.createDataFrame(data, schema)
df.writeTo("iceberg_catalog.analytics.user_events").append()
# ============================================
# 스키마 진화 (Schema Evolution)
# ============================================
# 새 컬럼 추가
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
ADD COLUMN session_id STRING AFTER user_id
""")
# 컬럼 이름 변경
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
RENAME COLUMN event_data TO event_payload
""")
# 컬럼 타입 변경 (widening만 가능)
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
ALTER COLUMN event_payload TYPE STRING
""")
# 컬럼 삭제
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
DROP COLUMN created_at
""")
# ============================================
# 파티션 진화 (Partition Evolution)
# ============================================
# 기존 파티션 전략 변경 (days -> months)
# 기존 데이터는 그대로 유지되고, 새 데이터만 새 전략 적용
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
ADD PARTITION FIELD months(event_timestamp)
""")
# 히든 파티션 추가 (user_id 기반 버킷)
spark.sql("""
ALTER TABLE iceberg_catalog.analytics.user_events
ADD PARTITION FIELD bucket(16, user_id)
""")
# ============================================
# MERGE INTO (Upsert)
# ============================================
spark.sql("""
MERGE INTO iceberg_catalog.analytics.user_events target
USING (
SELECT 'evt001' as event_id, 'user_1' as user_id,
'click_confirmed' as event_type,
'{"page": "home", "confirmed": true}' as event_payload,
TIMESTAMP '2024-01-15 10:00:00' as event_timestamp
) source
ON target.event_id = source.event_id
WHEN MATCHED THEN
UPDATE SET event_type = source.event_type,
event_payload = source.event_payload
WHEN NOT MATCHED THEN
INSERT (event_id, user_id, event_type, event_payload, event_timestamp)
VALUES (source.event_id, source.user_id, source.event_type,
source.event_payload, source.event_timestamp)
""")
# ============================================
# 타임 트래블 (Time Travel)
# ============================================
# 스냅샷 히스토리 조회
spark.sql("""
SELECT * FROM iceberg_catalog.analytics.user_events.history
""").show()
# 특정 스냅샷 ID로 조회
spark.sql("""
SELECT * FROM iceberg_catalog.analytics.user_events
VERSION AS OF 1234567890123456789
""")
# 특정 시간으로 조회
spark.sql("""
SELECT * FROM iceberg_catalog.analytics.user_events
TIMESTAMP AS OF '2024-01-15 10:00:00'
""")
# 이전 스냅샷으로 롤백
spark.sql("""
CALL iceberg_catalog.system.rollback_to_snapshot(
'analytics.user_events',
1234567890123456789
)
""")
# ============================================
# 메타데이터 쿼리
# ============================================
# 스냅샷 정보
spark.sql("SELECT * FROM iceberg_catalog.analytics.user_events.snapshots").show()
# 매니페스트 파일 정보
spark.sql("SELECT * FROM iceberg_catalog.analytics.user_events.manifests").show()
# 데이터 파일 정보
spark.sql("SELECT * FROM iceberg_catalog.analytics.user_events.files").show()
# 파티션 정보
spark.sql("SELECT * FROM iceberg_catalog.analytics.user_events.partitions").show()
# ============================================
# 테이블 유지보수 (Maintenance)
# ============================================
# 오래된 스냅샷 만료
spark.sql("""
CALL iceberg_catalog.system.expire_snapshots(
'analytics.user_events',
TIMESTAMP '2024-01-01 00:00:00',
100 -- retain_last
)
""")
# 고아 파일 정리
spark.sql("""
CALL iceberg_catalog.system.remove_orphan_files(
'analytics.user_events',
TIMESTAMP '2024-01-01 00:00:00'
)
""")
# 매니페스트 파일 리라이트 (쿼리 성능 최적화)
spark.sql("""
CALL iceberg_catalog.system.rewrite_manifests('analytics.user_events')
""")
# 데이터 파일 컴팩션
spark.sql("""
CALL iceberg_catalog.system.rewrite_data_files(
table => 'analytics.user_events',
options => map(
'target-file-size-bytes', '134217728',
'min-file-size-bytes', '104857600',
'max-file-size-bytes', '180355072'
)
)
""")
# ============================================
# Row-Level Delete (Position Delete vs Equality Delete)
# ============================================
# 조건부 삭제
spark.sql("""
DELETE FROM iceberg_catalog.analytics.user_events
WHERE user_id = 'user_1' AND event_type = 'view'
""")
# 조건부 업데이트
spark.sql("""
UPDATE iceberg_catalog.analytics.user_events
SET event_type = 'click_v2'
WHERE event_type = 'click'
""")
print("Iceberg 테이블 작업 완료")
데이터 엔지니어: "Hive 테이블의 스키마를 변경하려면 전체 데이터를 다시 적재해야 하는데, 이 작업이 너무 오래 걸려요."
데이터 아키텍트: "Iceberg로 마이그레이션하면 스키마 진화가 메타데이터 레벨에서 처리돼서 데이터 재작성 없이 컬럼 추가나 변경이 가능해요. 파티션 전략도 기존 데이터 유지하면서 변경할 수 있고요."
데이터 엔지니어: "그러면 기존 Spark 쿼리는 수정해야 하나요?"
데이터 아키텍트: "Iceberg Spark 확장만 추가하면 대부분 SQL 그대로 사용 가능해요. 오히려 MERGE INTO 같은 SQL 표준 문법을 쓸 수 있어서 더 편해질 거예요."
면접관: "Apache Iceberg의 스냅샷 기반 아키텍처가 어떻게 동작하고, 이것이 왜 중요한지 설명해주세요."
지원자: "Iceberg는 각 쓰기 작업마다 새로운 스냅샷을 생성합니다. 스냅샷은 메타데이터 파일, 매니페스트 리스트, 매니페스트 파일로 구성되며, 각 매니페스트는 데이터 파일의 위치와 컬럼별 통계(min/max, null count)를 포함합니다. 이 구조 덕분에 쿼리 시 파일 시스템 리스팅 없이 필요한 파일만 정확히 찾을 수 있어 S3에서 성능이 크게 향상됩니다. 또한 스냅샷 격리를 통해 읽기와 쓰기가 서로 간섭하지 않고, 타임 트래블과 롤백 기능도 자연스럽게 지원됩니다."
리뷰어: "테이블 생성 시 format-version을 1로 설정했는데, v2로 변경하는 게 좋을 것 같아요."
개발자: "v2에서 뭐가 달라지나요?"
리뷰어: "v2는 row-level delete를 position delete로 처리해서 삭제 성능이 좋아요. v1은 copy-on-write라서 삭제 시 전체 파일을 다시 써야 하거든요. 그리고 v2에서는 정렬 순서 지정도 가능해서 쿼리 성능 최적화에 도움이 돼요."