Data Lake
데이터 레이크
원시 데이터를 원형 그대로 저장하는 저장소. 정형/비정형 모두 수용. S3 기반.
데이터 레이크
원시 데이터를 원형 그대로 저장하는 저장소. 정형/비정형 모두 수용. S3 기반.
Data Lake는 구조화된 데이터, 반구조화된 데이터, 비구조화된 데이터를 원시 형태 그대로 저장할 수 있는 중앙 집중식 저장소입니다. 기존의 데이터 웨어하우스가 미리 정의된 스키마에 따라 정형 데이터만 저장하는 것과 달리, 데이터 레이크는 Schema-on-Read 방식을 채택하여 데이터를 먼저 저장하고 분석 시점에 스키마를 적용합니다. 이러한 유연성 덕분에 다양한 소스에서 발생하는 대용량 데이터를 빠르게 수집하고 저장할 수 있습니다.
데이터 레이크 아키텍처는 일반적으로 Bronze(Raw), Silver(Cleansed), Gold(Curated)의 3계층 구조로 설계됩니다. Bronze 레이어에는 원시 데이터가 그대로 저장되고, Silver 레이어에서 정제 및 변환 작업이 수행되며, Gold 레이어에는 비즈니스 분석에 즉시 사용 가능한 집계 데이터가 저장됩니다. 이러한 계층화된 접근 방식을 Medallion Architecture라고 하며, 데이터 품질 관리와 거버넌스를 체계적으로 수행할 수 있게 합니다.
주요 구현 기술로는 AWS S3, Azure Data Lake Storage(ADLS), Google Cloud Storage(GCS) 등의 객체 스토리지가 사용됩니다. 이들은 저렴한 비용으로 페타바이트 규모의 데이터를 저장할 수 있으며, 컴퓨팅과 스토리지를 분리하여 각각 독립적으로 확장할 수 있습니다. 데이터 포맷으로는 Parquet, ORC, Avro 같은 컬럼형 포맷이 널리 사용되며, 이를 통해 분석 쿼리 성능을 크게 향상시킬 수 있습니다.
데이터 레이크의 성공적인 운영을 위해서는 데이터 카탈로그, 메타데이터 관리, 접근 제어, 데이터 품질 모니터링이 필수적입니다. AWS Glue Data Catalog, Apache Hive Metastore, Unity Catalog 등을 통해 데이터를 검색하고 관리할 수 있습니다. 적절한 거버넌스 없이 운영되는 데이터 레이크는 "데이터 늪(Data Swamp)"이 될 수 있으므로, 초기 설계 단계부터 데이터 품질과 거버넌스 정책을 수립하는 것이 중요합니다.
AWS S3 기반 Data Lake에서 Medallion Architecture를 구현하고 PySpark로 데이터 처리하는 예제입니다.
# Data Lake Medallion Architecture 구현
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import * # Delta Lake 사용 시
import boto3
# Spark Session 생성 (S3 연동)
spark = SparkSession.builder \
.appName("DataLakePipeline") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.aws.credentials.provider",
"com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
.getOrCreate()
# Data Lake 경로 정의
DATA_LAKE_BUCKET = "s3a://company-data-lake"
BRONZE_PATH = f"{DATA_LAKE_BUCKET}/bronze"
SILVER_PATH = f"{DATA_LAKE_BUCKET}/silver"
GOLD_PATH = f"{DATA_LAKE_BUCKET}/gold"
# ============================================
# Bronze Layer: 원시 데이터 수집
# ============================================
def ingest_to_bronze(source_path: str, table_name: str):
"""원시 데이터를 Bronze 레이어에 적재"""
# 다양한 포맷 지원
if source_path.endswith('.json'):
df = spark.read.json(source_path)
elif source_path.endswith('.csv'):
df = spark.read.option("header", "true").csv(source_path)
elif source_path.endswith('.parquet'):
df = spark.read.parquet(source_path)
else:
# 스트리밍 데이터 (Kafka 등)
df = spark.read.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", source_path) \
.load()
# 메타데이터 추가
df_with_metadata = df \
.withColumn("_ingestion_timestamp", current_timestamp()) \
.withColumn("_source_file", lit(source_path)) \
.withColumn("_ingestion_date", current_date())
# Bronze 레이어에 저장 (파티션: 날짜별)
bronze_table_path = f"{BRONZE_PATH}/{table_name}"
df_with_metadata.write \
.format("delta") \
.mode("append") \
.partitionBy("_ingestion_date") \
.save(bronze_table_path)
print(f"Ingested {df_with_metadata.count()} records to Bronze: {table_name}")
return bronze_table_path
# ============================================
# Silver Layer: 데이터 정제 및 변환
# ============================================
def transform_to_silver(bronze_table: str, silver_table: str):
"""Bronze 데이터를 정제하여 Silver 레이어로 변환"""
bronze_path = f"{BRONZE_PATH}/{bronze_table}"
silver_path = f"{SILVER_PATH}/{silver_table}"
# Bronze 데이터 읽기
df_bronze = spark.read.format("delta").load(bronze_path)
# 데이터 정제 및 변환
df_silver = df_bronze \
.dropDuplicates(["id"]) \
.filter(col("id").isNotNull()) \
.withColumn("email", lower(trim(col("email")))) \
.withColumn("phone", regexp_replace(col("phone"), "[^0-9]", "")) \
.withColumn("created_at", to_timestamp(col("created_at"))) \
.withColumn("_processed_timestamp", current_timestamp())
# 데이터 품질 검증
quality_checks = {
"null_check": df_silver.filter(col("id").isNull()).count() == 0,
"email_format": df_silver.filter(~col("email").rlike("^[a-z0-9._%+-]+@[a-z0-9.-]+\\.[a-z]{2,}$")).count() == 0,
"duplicate_check": df_silver.count() == df_silver.dropDuplicates(["id"]).count()
}
print("Data Quality Checks:", quality_checks)
# Silver 레이어에 Merge (Upsert)
if DeltaTable.isDeltaTable(spark, silver_path):
delta_table = DeltaTable.forPath(spark, silver_path)
delta_table.alias("target").merge(
df_silver.alias("source"),
"target.id = source.id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
else:
df_silver.write.format("delta").mode("overwrite").save(silver_path)
return silver_path
# ============================================
# Gold Layer: 비즈니스 집계 데이터
# ============================================
def aggregate_to_gold(silver_tables: list, gold_table: str):
"""Silver 데이터를 집계하여 Gold 레이어 생성"""
# 여러 Silver 테이블 조인
df_users = spark.read.format("delta").load(f"{SILVER_PATH}/users")
df_orders = spark.read.format("delta").load(f"{SILVER_PATH}/orders")
df_products = spark.read.format("delta").load(f"{SILVER_PATH}/products")
# 비즈니스 메트릭 집계
df_gold = df_orders \
.join(df_users, "user_id") \
.join(df_products, "product_id") \
.groupBy(
date_trunc("month", col("order_date")).alias("month"),
col("category"),
col("region")
) \
.agg(
countDistinct("user_id").alias("unique_customers"),
count("order_id").alias("total_orders"),
sum("amount").alias("total_revenue"),
avg("amount").alias("avg_order_value"),
percentile_approx("amount", 0.5).alias("median_order_value")
) \
.withColumn("_aggregation_timestamp", current_timestamp())
# Gold 레이어에 저장
gold_path = f"{GOLD_PATH}/{gold_table}"
df_gold.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("month") \
.save(gold_path)
return gold_path
# ============================================
# 데이터 카탈로그 등록 (AWS Glue)
# ============================================
def register_to_glue_catalog(table_path: str, database: str, table_name: str):
"""Glue Data Catalog에 테이블 등록"""
# Glue 카탈로그에 테이블 생성
spark.sql(f"""
CREATE TABLE IF NOT EXISTS glue_catalog.{database}.{table_name}
USING DELTA
LOCATION '{table_path}'
""")
# 테이블 통계 수집
spark.sql(f"ANALYZE TABLE glue_catalog.{database}.{table_name} COMPUTE STATISTICS")
print(f"Registered table: {database}.{table_name}")
# ============================================
# 데이터 레이크 파이프라인 실행
# ============================================
if __name__ == "__main__":
# Bronze: 원시 데이터 수집
ingest_to_bronze("s3a://raw-data/users/*.json", "users")
ingest_to_bronze("s3a://raw-data/orders/*.csv", "orders")
ingest_to_bronze("s3a://raw-data/products/*.parquet", "products")
# Silver: 데이터 정제
transform_to_silver("users", "users")
transform_to_silver("orders", "orders")
transform_to_silver("products", "products")
# Gold: 비즈니스 집계
aggregate_to_gold(
["users", "orders", "products"],
"monthly_sales_summary"
)
# Glue Catalog 등록
register_to_glue_catalog(f"{GOLD_PATH}/monthly_sales_summary", "analytics", "monthly_sales")
print("Data Lake Pipeline completed successfully!")