📊
데이터공학
Hudi
Apache Hudi
데이터 레이크 테이블 포맷. 증분 처리, 업서트 지원.
Apache Hudi
데이터 레이크 테이블 포맷. 증분 처리, 업서트 지원.
Apache Hudi (Hadoop Upserts Deletes and Incrementals)는 데이터 레이크에서 레코드 수준의 업데이트, 삭제, 증분 처리를 지원하는 오픈소스 테이블 포맷입니다. Uber에서 개발되어 대규모 분석 워크로드에서 사용됩니다.
| 유형 | 특징 | 사용 사례 |
|---|---|---|
| Copy on Write (CoW) | 쓰기 시 전체 파일 재작성 | 읽기 중심, 배치 처리 |
| Merge on Read (MoR) | 델타 로그로 쓰기, 읽기 시 병합 | 쓰기 중심, 실시간 처리 |
| 기능 | Hudi | Delta Lake | Iceberg |
|---|---|---|---|
| Upsert 성능 | 우수 (인덱싱) | 양호 | 양호 |
| 증분 쿼리 | 네이티브 지원 | CDF 필요 | 제한적 |
| 스트리밍 | Spark/Flink | Spark 중심 | Flink 우수 |
| 인덱싱 | 다양한 인덱스 | 제한적 | 제한적 |
# PySpark로 Hudi 테이블 Upsert
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp
# Spark 세션 생성 (Hudi 패키지 포함)
spark = SparkSession.builder \
.appName("HudiUpsertExample") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
.getOrCreate()
# Hudi 테이블 설정
hudi_options = {
'hoodie.table.name': 'user_events',
'hoodie.datasource.write.recordkey.field': 'event_id',
'hoodie.datasource.write.partitionpath.field': 'event_date',
'hoodie.datasource.write.precombine.field': 'updated_at',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
'hoodie.upsert.shuffle.parallelism': 100,
'hoodie.insert.shuffle.parallelism': 100,
# 인덱싱 설정 (Upsert 성능 향상)
'hoodie.index.type': 'BLOOM',
'hoodie.bloom.index.filter.dynamic.max.entries': 100000
}
# 초기 데이터 삽입
initial_data = [
('evt_001', 'user_1', 'click', '2024-01-15', '2024-01-15 10:00:00'),
('evt_002', 'user_2', 'purchase', '2024-01-15', '2024-01-15 11:00:00'),
('evt_003', 'user_1', 'view', '2024-01-15', '2024-01-15 12:00:00')
]
df_initial = spark.createDataFrame(
initial_data,
['event_id', 'user_id', 'event_type', 'event_date', 'updated_at']
)
# Hudi 테이블에 쓰기
df_initial.write \
.format('hudi') \
.options(**hudi_options) \
.mode('overwrite') \
.save('/data/hudi/user_events')
print("초기 데이터 삽입 완료")
# Upsert 데이터 (기존 레코드 업데이트 + 신규 삽입)
upsert_data = [
# 기존 레코드 업데이트 (event_type 변경)
('evt_001', 'user_1', 'purchase', '2024-01-15', '2024-01-15 14:00:00'),
# 신규 레코드 삽입
('evt_004', 'user_3', 'signup', '2024-01-15', '2024-01-15 15:00:00'),
('evt_005', 'user_2', 'view', '2024-01-15', '2024-01-15 16:00:00')
]
df_upsert = spark.createDataFrame(
upsert_data,
['event_id', 'user_id', 'event_type', 'event_date', 'updated_at']
)
# Upsert 실행
df_upsert.write \
.format('hudi') \
.options(**hudi_options) \
.mode('append') \
.save('/data/hudi/user_events')
print("Upsert 완료")
# 결과 확인
df_result = spark.read.format('hudi').load('/data/hudi/user_events')
df_result.show()
# Hudi 증분 쿼리 - 변경된 데이터만 조회
from datetime import datetime, timedelta
# 최근 커밋 타임스탬프 조회
commits_df = spark.read \
.format('hudi') \
.load('/data/hudi/user_events') \
.select('_hoodie_commit_time') \
.distinct() \
.orderBy('_hoodie_commit_time')
commits_df.show()
# 특정 시점 이후 변경 데이터 조회 (증분 쿼리)
begin_time = '20240115100000' # yyyyMMddHHmmss 형식
incremental_df = spark.read \
.format('hudi') \
.option('hoodie.datasource.query.type', 'incremental') \
.option('hoodie.datasource.read.begin.instanttime', begin_time) \
.load('/data/hudi/user_events')
print(f"{begin_time} 이후 변경된 레코드:")
incremental_df.show()
# 실시간 파이프라인에서 증분 처리
def process_incremental_batch(last_commit_time):
"""마지막 처리 시점 이후 데이터만 처리"""
incremental_df = spark.read \
.format('hudi') \
.option('hoodie.datasource.query.type', 'incremental') \
.option('hoodie.datasource.read.begin.instanttime', last_commit_time) \
.load('/data/hudi/user_events')
if incremental_df.count() > 0:
# 비즈니스 로직 처리
processed_df = incremental_df.groupBy('user_id') \
.count() \
.withColumnRenamed('count', 'event_count')
# 결과 저장 또는 전송
processed_df.write.mode('append').parquet('/data/processed/events')
# 새로운 커밋 타임스탬프 반환
new_commit_time = incremental_df.select(
max('_hoodie_commit_time')
).collect()[0][0]
return new_commit_time
return last_commit_time
# Merge on Read 테이블 설정
mor_options = {
'hoodie.table.name': 'realtime_events',
'hoodie.datasource.write.recordkey.field': 'event_id',
'hoodie.datasource.write.partitionpath.field': 'event_date',
'hoodie.datasource.write.precombine.field': 'event_timestamp',
'hoodie.datasource.write.operation': 'upsert',
'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
# 비동기 컴팩션 설정
'hoodie.compact.inline': 'false',
'hoodie.compact.inline.max.delta.commits': 5,
# 클리닝 설정
'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
'hoodie.cleaner.commits.retained': 10
}
# MoR 테이블에 스트리밍 데이터 쓰기
streaming_df.writeStream \
.format('hudi') \
.options(**mor_options) \
.option('checkpointLocation', '/checkpoints/realtime_events') \
.outputMode('append') \
.start('/data/hudi/realtime_events')
# MoR 테이블 읽기 - Snapshot Query (병합된 최신 데이터)
snapshot_df = spark.read \
.format('hudi') \
.option('hoodie.datasource.query.type', 'snapshot') \
.load('/data/hudi/realtime_events')
# MoR 테이블 읽기 - Read Optimized Query (컴팩션된 데이터만)
read_optimized_df = spark.read \
.format('hudi') \
.option('hoodie.datasource.query.type', 'read_optimized') \
.load('/data/hudi/realtime_events')
# 수동 컴팩션 실행
from hudi.spark import HoodieSparkClient
hudi_client = HoodieSparkClient(spark, '/data/hudi/realtime_events')
hudi_client.schedule_compaction()
hudi_client.run_compaction()
-- Hudi 테이블 생성 (Spark SQL)
CREATE TABLE user_profiles (
user_id STRING,
name STRING,
email STRING,
status STRING,
updated_at TIMESTAMP
) USING hudi
TBLPROPERTIES (
'hoodie.table.name' = 'user_profiles',
'hoodie.datasource.write.recordkey.field' = 'user_id',
'hoodie.datasource.write.precombine.field' = 'updated_at',
'hoodie.datasource.write.operation' = 'upsert',
'type' = 'cow'
)
PARTITIONED BY (status)
LOCATION '/data/hudi/user_profiles';
-- Upsert (MERGE INTO)
MERGE INTO user_profiles AS target
USING source_updates AS source
ON target.user_id = source.user_id
WHEN MATCHED THEN
UPDATE SET
name = source.name,
email = source.email,
status = source.status,
updated_at = source.updated_at
WHEN NOT MATCHED THEN
INSERT (user_id, name, email, status, updated_at)
VALUES (source.user_id, source.name, source.email, source.status, source.updated_at);
-- 삭제
DELETE FROM user_profiles WHERE status = 'deleted';
-- Time Travel 쿼리
SELECT * FROM user_profiles TIMESTAMP AS OF '2024-01-15 10:00:00';
-- 커밋 이력 조회
CALL show_commits('user_profiles', 10);