📊 데이터공학

Apache Hudi

스트리밍 데이터 레이크 플랫폼

상세 설명

Apache Hudi(Hadoop Upserts Deletes and Incrementals)는 Uber에서 개발한 오픈소스 데이터 레이크 플랫폼으로, 대규모 분석 데이터셋에서 레코드 수준의 삽입, 업데이트, 삭제 작업을 효율적으로 처리합니다. 기존 데이터 레이크의 한계인 변경 불가능성(immutability)을 극복하여, ACID 트랜잭션과 증분 데이터 처리를 지원합니다.

Hudi는 Copy-on-Write(CoW)와 Merge-on-Read(MoR) 두 가지 테이블 타입을 제공합니다. CoW는 쓰기 시점에 전체 파일을 다시 작성하여 읽기 성능을 최적화하고, MoR은 변경 사항을 델타 로그에 저장한 후 읽기 시점에 병합하여 쓰기 지연 시간을 최소화합니다. 워크로드 특성에 따라 적절한 타입을 선택할 수 있습니다.

증분 처리(Incremental Processing)는 Hudi의 핵심 기능으로, 변경된 데이터만 추출하여 다운스트림 파이프라인으로 전달합니다. 이를 통해 전체 테이블을 스캔하는 비효율을 제거하고, 실시간에 가까운 데이터 처리가 가능합니다. Spark, Flink, Presto, Trino 등 다양한 쿼리 엔진과 통합되어 있습니다.

Hudi는 타임 트래블(Time Travel), 롤백, 동시성 제어, 클러스터링, 컴팩션 등 엔터프라이즈급 기능을 제공합니다. AWS, Azure, GCP의 클라우드 스토리지와 완벽하게 호환되며, Debezium 등 CDC 도구와 연동하여 데이터베이스 변경 사항을 실시간으로 데이터 레이크에 반영할 수 있습니다.

코드 예제

# Apache Hudi 테이블 설정 및 upsert 예제 (PySpark)
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp

# Spark 세션 생성 (Hudi 번들 포함)
spark = SparkSession.builder \
    .appName("HudiExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions",
            "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .getOrCreate()

# Hudi 테이블 설정
hudi_options = {
    # 테이블 기본 설정
    "hoodie.table.name": "user_events",
    "hoodie.datasource.write.table.type": "COPY_ON_WRITE",  # 또는 MERGE_ON_READ

    # 레코드 키 및 파티션 설정
    "hoodie.datasource.write.recordkey.field": "event_id",
    "hoodie.datasource.write.partitionpath.field": "event_date",
    "hoodie.datasource.write.precombine.field": "updated_at",

    # 쓰기 작업 설정
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.upsert.shuffle.parallelism": 100,

    # 인덱스 설정 (Bloom Filter 기본)
    "hoodie.index.type": "BLOOM",
    "hoodie.bloom.index.update.partition.path": "true",

    # 파일 크기 최적화
    "hoodie.parquet.max.file.size": 128 * 1024 * 1024,  # 128MB
    "hoodie.parquet.small.file.limit": 100 * 1024 * 1024,  # 100MB

    # 메타데이터 테이블 활성화 (성능 향상)
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.index.column.stats.enable": "true"
}

# 샘플 데이터 생성
initial_data = [
    ("evt001", "user_1", "click", "2024-01-15", "2024-01-15 10:00:00"),
    ("evt002", "user_2", "purchase", "2024-01-15", "2024-01-15 10:05:00"),
    ("evt003", "user_1", "view", "2024-01-15", "2024-01-15 10:10:00"),
]

df = spark.createDataFrame(
    initial_data,
    ["event_id", "user_id", "event_type", "event_date", "updated_at"]
)

# Hudi 테이블에 쓰기
table_path = "s3://data-lake/hudi/user_events"

df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("overwrite") \
    .save(table_path)

print("초기 데이터 적재 완료")

# Upsert 작업 (업데이트 + 삽입)
upsert_data = [
    # 기존 레코드 업데이트
    ("evt001", "user_1", "click_confirmed", "2024-01-15", "2024-01-15 10:30:00"),
    # 새로운 레코드 삽입
    ("evt004", "user_3", "signup", "2024-01-15", "2024-01-15 10:35:00"),
]

upsert_df = spark.createDataFrame(
    upsert_data,
    ["event_id", "user_id", "event_type", "event_date", "updated_at"]
)

upsert_df.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .save(table_path)

print("Upsert 완료")

# ============================================
# 증분 쿼리 (Incremental Query)
# ============================================
# 특정 커밋 이후 변경된 데이터만 조회
incremental_options = {
    "hoodie.datasource.query.type": "incremental",
    "hoodie.datasource.read.begin.instanttime": "20240115100000000",
    "hoodie.datasource.read.end.instanttime": "20240115110000000"
}

incremental_df = spark.read.format("hudi") \
    .options(**incremental_options) \
    .load(table_path)

print("증분 변경 데이터:")
incremental_df.show()

# ============================================
# 타임 트래블 쿼리
# ============================================
# 특정 시점의 데이터 조회
time_travel_df = spark.read.format("hudi") \
    .option("as.of.instant", "20240115100500000") \
    .load(table_path)

print("특정 시점 데이터:")
time_travel_df.show()

# ============================================
# Soft Delete 구현
# ============================================
delete_options = hudi_options.copy()
delete_options["hoodie.datasource.write.operation"] = "delete"

# 삭제할 레코드
delete_df = spark.createDataFrame(
    [("evt002", "user_2", "purchase", "2024-01-15", "2024-01-15 11:00:00")],
    ["event_id", "user_id", "event_type", "event_date", "updated_at"]
)

delete_df.write.format("hudi") \
    .options(**delete_options) \
    .mode("append") \
    .save(table_path)

# ============================================
# Compaction 실행 (MoR 테이블용)
# ============================================
from pyspark.sql import functions as F

# 인라인 컴팩션 설정
mor_options = hudi_options.copy()
mor_options.update({
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits": 5,
    "hoodie.compaction.strategy": "org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy",
    "hoodie.compaction.target.io": 500 * 1024 * 1024  # 500MB
})

# ============================================
# Clustering (파일 레이아웃 최적화)
# ============================================
clustering_options = hudi_options.copy()
clustering_options.update({
    "hoodie.clustering.inline": "true",
    "hoodie.clustering.inline.max.commits": 4,
    "hoodie.clustering.plan.strategy.target.file.max.bytes": 1073741824,  # 1GB
    "hoodie.clustering.plan.strategy.sort.columns": "user_id,event_type"
})

# ============================================
# Hudi SQL (Spark SQL 사용)
# ============================================
# 테이블 생성
spark.sql("""
    CREATE TABLE IF NOT EXISTS user_events_sql (
        event_id STRING,
        user_id STRING,
        event_type STRING,
        event_date STRING,
        updated_at TIMESTAMP
    ) USING hudi
    TBLPROPERTIES (
        'primaryKey' = 'event_id',
        'preCombineField' = 'updated_at',
        'type' = 'cow'
    )
    PARTITIONED BY (event_date)
    LOCATION 's3://data-lake/hudi/user_events_sql'
""")

# MERGE INTO 문 (Upsert)
spark.sql("""
    MERGE INTO user_events_sql target
    USING source_updates source
    ON target.event_id = source.event_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")

# 타임 트래블 SQL
spark.sql("""
    SELECT * FROM user_events_sql
    TIMESTAMP AS OF '2024-01-15 10:00:00'
""")

print("Hudi 작업 완료")

실무에서 이렇게 쓰여요

데이터 엔지니어: "실시간 주문 데이터를 데이터 레이크에 반영하는데, 현재는 매일 전체 테이블을 다시 적재하고 있어요. 효율적인 방법이 없을까요?"

테크 리드: "Apache Hudi를 도입하면 증분 처리가 가능해요. CDC로 변경분만 캡처해서 Hudi 테이블에 upsert하면 되고, 다운스트림에서는 증분 쿼리로 변경된 데이터만 가져갈 수 있어요."

데이터 엔지니어: "테이블 타입은 CoW와 MoR 중 뭐가 좋을까요?"

테크 리드: "읽기가 많고 쓰기가 적으면 CoW, 쓰기가 빈번하면 MoR이 좋아요. 우리 주문 데이터는 쓰기가 많으니 MoR로 시작하고, 컴팩션 주기를 조정하면서 최적화하죠."

면접관: "Apache Hudi의 Copy-on-Write와 Merge-on-Read 테이블 타입의 차이점과 각각의 사용 시나리오를 설명해주세요."

지원자: "CoW는 쓰기 시 전체 파일을 다시 작성하므로 쓰기 지연이 크지만 읽기 성능이 우수합니다. 분석 워크로드처럼 읽기가 많고 쓰기가 적은 경우에 적합해요. MoR은 델타 로그에 변경사항을 기록하고 나중에 컴팩션하므로 쓰기가 빠르지만 읽기 시 병합 오버헤드가 있습니다. 실시간 수집이 필요한 이벤트 로그나 CDC 파이프라인에 적합합니다. 또한 MoR은 Snapshot Query와 Read Optimized Query를 제공해서 최신성과 성능 중 선택할 수 있어요."

리뷰어: "Hudi 인덱스 타입을 GLOBAL_BLOOM으로 설정했는데, 파티션 수가 많아지면 성능 이슈가 있을 수 있어요."

개발자: "맞아요. GLOBAL_BLOOM은 모든 파티션을 스캔해야 해서 느릴 수 있죠. 대신 BUCKET 인덱스를 사용하면 해시 기반으로 특정 파티션만 스캔할 수 있어요."

리뷰어: "좋아요. 그리고 hoodie.metadata.enable을 활성화하면 파일 리스팅 성능도 크게 개선돼요. S3 같은 오브젝트 스토리지에서 특히 효과적이에요."

주의사항

관련 용어

더 배우기