📊 데이터공학

Hudi

Apache Hudi

데이터 레이크 테이블 포맷. 증분 처리, 업서트 지원.

상세 설명

Apache Hudi (Hadoop Upserts Deletes and Incrementals)는 데이터 레이크에서 레코드 수준의 업데이트, 삭제, 증분 처리를 지원하는 오픈소스 테이블 포맷입니다. Uber에서 개발되어 대규모 분석 워크로드에서 사용됩니다.

Hudi의 핵심 특징

  • Upsert 지원: 레코드 수준의 업데이트 및 삽입 처리
  • 증분 쿼리: 변경된 데이터만 효율적으로 조회
  • ACID 트랜잭션: 데이터 일관성 보장
  • 타임 트래블: 과거 시점 데이터 조회
  • 자동 컴팩션: 파일 최적화 자동화

Hudi 테이블 유형

유형특징사용 사례
Copy on Write (CoW)쓰기 시 전체 파일 재작성읽기 중심, 배치 처리
Merge on Read (MoR)델타 로그로 쓰기, 읽기 시 병합쓰기 중심, 실시간 처리

Hudi vs Delta Lake vs Iceberg

기능HudiDelta LakeIceberg
Upsert 성능우수 (인덱싱)양호양호
증분 쿼리네이티브 지원CDF 필요제한적
스트리밍Spark/FlinkSpark 중심Flink 우수
인덱싱다양한 인덱스제한적제한적

코드 예제

PySpark로 Hudi Upsert 처리

# PySpark로 Hudi 테이블 Upsert
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp

# Spark 세션 생성 (Hudi 패키지 포함)
spark = SparkSession.builder \
    .appName("HudiUpsertExample") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .getOrCreate()

# Hudi 테이블 설정
hudi_options = {
    'hoodie.table.name': 'user_events',
    'hoodie.datasource.write.recordkey.field': 'event_id',
    'hoodie.datasource.write.partitionpath.field': 'event_date',
    'hoodie.datasource.write.precombine.field': 'updated_at',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.upsert.shuffle.parallelism': 100,
    'hoodie.insert.shuffle.parallelism': 100,
    # 인덱싱 설정 (Upsert 성능 향상)
    'hoodie.index.type': 'BLOOM',
    'hoodie.bloom.index.filter.dynamic.max.entries': 100000
}

# 초기 데이터 삽입
initial_data = [
    ('evt_001', 'user_1', 'click', '2024-01-15', '2024-01-15 10:00:00'),
    ('evt_002', 'user_2', 'purchase', '2024-01-15', '2024-01-15 11:00:00'),
    ('evt_003', 'user_1', 'view', '2024-01-15', '2024-01-15 12:00:00')
]

df_initial = spark.createDataFrame(
    initial_data,
    ['event_id', 'user_id', 'event_type', 'event_date', 'updated_at']
)

# Hudi 테이블에 쓰기
df_initial.write \
    .format('hudi') \
    .options(**hudi_options) \
    .mode('overwrite') \
    .save('/data/hudi/user_events')

print("초기 데이터 삽입 완료")

# Upsert 데이터 (기존 레코드 업데이트 + 신규 삽입)
upsert_data = [
    # 기존 레코드 업데이트 (event_type 변경)
    ('evt_001', 'user_1', 'purchase', '2024-01-15', '2024-01-15 14:00:00'),
    # 신규 레코드 삽입
    ('evt_004', 'user_3', 'signup', '2024-01-15', '2024-01-15 15:00:00'),
    ('evt_005', 'user_2', 'view', '2024-01-15', '2024-01-15 16:00:00')
]

df_upsert = spark.createDataFrame(
    upsert_data,
    ['event_id', 'user_id', 'event_type', 'event_date', 'updated_at']
)

# Upsert 실행
df_upsert.write \
    .format('hudi') \
    .options(**hudi_options) \
    .mode('append') \
    .save('/data/hudi/user_events')

print("Upsert 완료")

# 결과 확인
df_result = spark.read.format('hudi').load('/data/hudi/user_events')
df_result.show()

증분 쿼리 (Incremental Query)

# Hudi 증분 쿼리 - 변경된 데이터만 조회
from datetime import datetime, timedelta

# 최근 커밋 타임스탬프 조회
commits_df = spark.read \
    .format('hudi') \
    .load('/data/hudi/user_events') \
    .select('_hoodie_commit_time') \
    .distinct() \
    .orderBy('_hoodie_commit_time')

commits_df.show()

# 특정 시점 이후 변경 데이터 조회 (증분 쿼리)
begin_time = '20240115100000'  # yyyyMMddHHmmss 형식

incremental_df = spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type', 'incremental') \
    .option('hoodie.datasource.read.begin.instanttime', begin_time) \
    .load('/data/hudi/user_events')

print(f"{begin_time} 이후 변경된 레코드:")
incremental_df.show()

# 실시간 파이프라인에서 증분 처리
def process_incremental_batch(last_commit_time):
    """마지막 처리 시점 이후 데이터만 처리"""
    incremental_df = spark.read \
        .format('hudi') \
        .option('hoodie.datasource.query.type', 'incremental') \
        .option('hoodie.datasource.read.begin.instanttime', last_commit_time) \
        .load('/data/hudi/user_events')

    if incremental_df.count() > 0:
        # 비즈니스 로직 처리
        processed_df = incremental_df.groupBy('user_id') \
            .count() \
            .withColumnRenamed('count', 'event_count')

        # 결과 저장 또는 전송
        processed_df.write.mode('append').parquet('/data/processed/events')

        # 새로운 커밋 타임스탬프 반환
        new_commit_time = incremental_df.select(
            max('_hoodie_commit_time')
        ).collect()[0][0]
        return new_commit_time

    return last_commit_time

Merge on Read (MoR) 테이블 및 컴팩션

# Merge on Read 테이블 설정
mor_options = {
    'hoodie.table.name': 'realtime_events',
    'hoodie.datasource.write.recordkey.field': 'event_id',
    'hoodie.datasource.write.partitionpath.field': 'event_date',
    'hoodie.datasource.write.precombine.field': 'event_timestamp',
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    # 비동기 컴팩션 설정
    'hoodie.compact.inline': 'false',
    'hoodie.compact.inline.max.delta.commits': 5,
    # 클리닝 설정
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.cleaner.commits.retained': 10
}

# MoR 테이블에 스트리밍 데이터 쓰기
streaming_df.writeStream \
    .format('hudi') \
    .options(**mor_options) \
    .option('checkpointLocation', '/checkpoints/realtime_events') \
    .outputMode('append') \
    .start('/data/hudi/realtime_events')

# MoR 테이블 읽기 - Snapshot Query (병합된 최신 데이터)
snapshot_df = spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type', 'snapshot') \
    .load('/data/hudi/realtime_events')

# MoR 테이블 읽기 - Read Optimized Query (컴팩션된 데이터만)
read_optimized_df = spark.read \
    .format('hudi') \
    .option('hoodie.datasource.query.type', 'read_optimized') \
    .load('/data/hudi/realtime_events')

# 수동 컴팩션 실행
from hudi.spark import HoodieSparkClient

hudi_client = HoodieSparkClient(spark, '/data/hudi/realtime_events')
hudi_client.schedule_compaction()
hudi_client.run_compaction()

Hudi SQL (Spark SQL)

-- Hudi 테이블 생성 (Spark SQL)
CREATE TABLE user_profiles (
    user_id STRING,
    name STRING,
    email STRING,
    status STRING,
    updated_at TIMESTAMP
) USING hudi
TBLPROPERTIES (
    'hoodie.table.name' = 'user_profiles',
    'hoodie.datasource.write.recordkey.field' = 'user_id',
    'hoodie.datasource.write.precombine.field' = 'updated_at',
    'hoodie.datasource.write.operation' = 'upsert',
    'type' = 'cow'
)
PARTITIONED BY (status)
LOCATION '/data/hudi/user_profiles';

-- Upsert (MERGE INTO)
MERGE INTO user_profiles AS target
USING source_updates AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN
    UPDATE SET
        name = source.name,
        email = source.email,
        status = source.status,
        updated_at = source.updated_at
WHEN NOT MATCHED THEN
    INSERT (user_id, name, email, status, updated_at)
    VALUES (source.user_id, source.name, source.email, source.status, source.updated_at);

-- 삭제
DELETE FROM user_profiles WHERE status = 'deleted';

-- Time Travel 쿼리
SELECT * FROM user_profiles TIMESTAMP AS OF '2024-01-15 10:00:00';

-- 커밋 이력 조회
CALL show_commits('user_profiles', 10);

실무 대화 예시

데이터 엔지니어: "CDC 데이터를 데이터 레이크에 적재해야 하는데, 매번 전체 데이터를 다시 쓰는 건 비효율적이에요."
시니어 엔지니어: "Hudi 쓰면 돼. Upsert 연산으로 변경된 레코드만 업데이트하고, 인덱싱으로 빠르게 대상 레코드를 찾아줘."
데이터 엔지니어: "CoW랑 MoR 중에 뭐가 좋을까요?"
시니어 엔지니어: "읽기 성능이 중요하면 CoW, 쓰기 빈도가 높으면 MoR이 좋아. MoR은 델타 로그로 빠르게 쓰고 나중에 컴팩션하거든. 근데 MoR은 읽을 때 병합 오버헤드가 있어."
데이터 엔지니어: "하루에 수백만 건 업데이트가 있는데 성능이 걱정되네요."
시니어 엔지니어: "Hudi의 BLOOM 인덱스가 도움 될 거야. 레코드 키 기반으로 어떤 파일에 있는지 빠르게 찾아. 그리고 파티셔닝도 잘 설계하면 업데이트 범위를 줄일 수 있어."

주의사항

테이블 유형 선택

  • CoW는 쓰기 시 전체 파일 재작성 - 대용량 파티션에서 느림
  • MoR은 컴팩션 지연 시 읽기 성능 저하
  • 워크로드 특성에 맞는 유형 선택 필수

인덱싱 및 성능

  • BLOOM 인덱스는 false positive 발생 가능 - 파라미터 튜닝 필요
  • 레코드 키 분포가 균일해야 효율적
  • 파티셔닝 전략이 Upsert 성능에 큰 영향

운영 고려사항

  • 컴팩션과 클리닝 스케줄링 적절히 설정
  • 작은 파일 문제 (Small Files) 모니터링 필요
  • 메타데이터 테이블 활성화로 성능 향상 가능

관련 용어

더 배우기