📊 데이터공학

Data Lake

데이터 레이크

원시 데이터를 원형 그대로 저장하는 저장소. 정형/비정형 모두 수용. S3 기반.

상세 설명

Data Lake는 구조화된 데이터, 반구조화된 데이터, 비구조화된 데이터를 원시 형태 그대로 저장할 수 있는 중앙 집중식 저장소입니다. 기존의 데이터 웨어하우스가 미리 정의된 스키마에 따라 정형 데이터만 저장하는 것과 달리, 데이터 레이크는 Schema-on-Read 방식을 채택하여 데이터를 먼저 저장하고 분석 시점에 스키마를 적용합니다. 이러한 유연성 덕분에 다양한 소스에서 발생하는 대용량 데이터를 빠르게 수집하고 저장할 수 있습니다.

데이터 레이크 아키텍처는 일반적으로 Bronze(Raw), Silver(Cleansed), Gold(Curated)의 3계층 구조로 설계됩니다. Bronze 레이어에는 원시 데이터가 그대로 저장되고, Silver 레이어에서 정제 및 변환 작업이 수행되며, Gold 레이어에는 비즈니스 분석에 즉시 사용 가능한 집계 데이터가 저장됩니다. 이러한 계층화된 접근 방식을 Medallion Architecture라고 하며, 데이터 품질 관리와 거버넌스를 체계적으로 수행할 수 있게 합니다.

주요 구현 기술로는 AWS S3, Azure Data Lake Storage(ADLS), Google Cloud Storage(GCS) 등의 객체 스토리지가 사용됩니다. 이들은 저렴한 비용으로 페타바이트 규모의 데이터를 저장할 수 있으며, 컴퓨팅과 스토리지를 분리하여 각각 독립적으로 확장할 수 있습니다. 데이터 포맷으로는 Parquet, ORC, Avro 같은 컬럼형 포맷이 널리 사용되며, 이를 통해 분석 쿼리 성능을 크게 향상시킬 수 있습니다.

데이터 레이크의 성공적인 운영을 위해서는 데이터 카탈로그, 메타데이터 관리, 접근 제어, 데이터 품질 모니터링이 필수적입니다. AWS Glue Data Catalog, Apache Hive Metastore, Unity Catalog 등을 통해 데이터를 검색하고 관리할 수 있습니다. 적절한 거버넌스 없이 운영되는 데이터 레이크는 "데이터 늪(Data Swamp)"이 될 수 있으므로, 초기 설계 단계부터 데이터 품질과 거버넌스 정책을 수립하는 것이 중요합니다.

코드 예제

AWS S3 기반 Data Lake에서 Medallion Architecture를 구현하고 PySpark로 데이터 처리하는 예제입니다.

Python (PySpark - Data Lake Architecture)
# Data Lake Medallion Architecture 구현
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta import *  # Delta Lake 사용 시
import boto3

# Spark Session 생성 (S3 연동)
spark = SparkSession.builder \
    .appName("DataLakePipeline") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
    .getOrCreate()

# Data Lake 경로 정의
DATA_LAKE_BUCKET = "s3a://company-data-lake"
BRONZE_PATH = f"{DATA_LAKE_BUCKET}/bronze"
SILVER_PATH = f"{DATA_LAKE_BUCKET}/silver"
GOLD_PATH = f"{DATA_LAKE_BUCKET}/gold"

# ============================================
# Bronze Layer: 원시 데이터 수집
# ============================================
def ingest_to_bronze(source_path: str, table_name: str):
    """원시 데이터를 Bronze 레이어에 적재"""

    # 다양한 포맷 지원
    if source_path.endswith('.json'):
        df = spark.read.json(source_path)
    elif source_path.endswith('.csv'):
        df = spark.read.option("header", "true").csv(source_path)
    elif source_path.endswith('.parquet'):
        df = spark.read.parquet(source_path)
    else:
        # 스트리밍 데이터 (Kafka 등)
        df = spark.read.format("kafka") \
            .option("kafka.bootstrap.servers", "kafka:9092") \
            .option("subscribe", source_path) \
            .load()

    # 메타데이터 추가
    df_with_metadata = df \
        .withColumn("_ingestion_timestamp", current_timestamp()) \
        .withColumn("_source_file", lit(source_path)) \
        .withColumn("_ingestion_date", current_date())

    # Bronze 레이어에 저장 (파티션: 날짜별)
    bronze_table_path = f"{BRONZE_PATH}/{table_name}"
    df_with_metadata.write \
        .format("delta") \
        .mode("append") \
        .partitionBy("_ingestion_date") \
        .save(bronze_table_path)

    print(f"Ingested {df_with_metadata.count()} records to Bronze: {table_name}")
    return bronze_table_path

# ============================================
# Silver Layer: 데이터 정제 및 변환
# ============================================
def transform_to_silver(bronze_table: str, silver_table: str):
    """Bronze 데이터를 정제하여 Silver 레이어로 변환"""

    bronze_path = f"{BRONZE_PATH}/{bronze_table}"
    silver_path = f"{SILVER_PATH}/{silver_table}"

    # Bronze 데이터 읽기
    df_bronze = spark.read.format("delta").load(bronze_path)

    # 데이터 정제 및 변환
    df_silver = df_bronze \
        .dropDuplicates(["id"]) \
        .filter(col("id").isNotNull()) \
        .withColumn("email", lower(trim(col("email")))) \
        .withColumn("phone", regexp_replace(col("phone"), "[^0-9]", "")) \
        .withColumn("created_at", to_timestamp(col("created_at"))) \
        .withColumn("_processed_timestamp", current_timestamp())

    # 데이터 품질 검증
    quality_checks = {
        "null_check": df_silver.filter(col("id").isNull()).count() == 0,
        "email_format": df_silver.filter(~col("email").rlike("^[a-z0-9._%+-]+@[a-z0-9.-]+\\.[a-z]{2,}$")).count() == 0,
        "duplicate_check": df_silver.count() == df_silver.dropDuplicates(["id"]).count()
    }

    print("Data Quality Checks:", quality_checks)

    # Silver 레이어에 Merge (Upsert)
    if DeltaTable.isDeltaTable(spark, silver_path):
        delta_table = DeltaTable.forPath(spark, silver_path)
        delta_table.alias("target").merge(
            df_silver.alias("source"),
            "target.id = source.id"
        ).whenMatchedUpdateAll() \
         .whenNotMatchedInsertAll() \
         .execute()
    else:
        df_silver.write.format("delta").mode("overwrite").save(silver_path)

    return silver_path

# ============================================
# Gold Layer: 비즈니스 집계 데이터
# ============================================
def aggregate_to_gold(silver_tables: list, gold_table: str):
    """Silver 데이터를 집계하여 Gold 레이어 생성"""

    # 여러 Silver 테이블 조인
    df_users = spark.read.format("delta").load(f"{SILVER_PATH}/users")
    df_orders = spark.read.format("delta").load(f"{SILVER_PATH}/orders")
    df_products = spark.read.format("delta").load(f"{SILVER_PATH}/products")

    # 비즈니스 메트릭 집계
    df_gold = df_orders \
        .join(df_users, "user_id") \
        .join(df_products, "product_id") \
        .groupBy(
            date_trunc("month", col("order_date")).alias("month"),
            col("category"),
            col("region")
        ) \
        .agg(
            countDistinct("user_id").alias("unique_customers"),
            count("order_id").alias("total_orders"),
            sum("amount").alias("total_revenue"),
            avg("amount").alias("avg_order_value"),
            percentile_approx("amount", 0.5).alias("median_order_value")
        ) \
        .withColumn("_aggregation_timestamp", current_timestamp())

    # Gold 레이어에 저장
    gold_path = f"{GOLD_PATH}/{gold_table}"
    df_gold.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("month") \
        .save(gold_path)

    return gold_path

# ============================================
# 데이터 카탈로그 등록 (AWS Glue)
# ============================================
def register_to_glue_catalog(table_path: str, database: str, table_name: str):
    """Glue Data Catalog에 테이블 등록"""

    # Glue 카탈로그에 테이블 생성
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS glue_catalog.{database}.{table_name}
        USING DELTA
        LOCATION '{table_path}'
    """)

    # 테이블 통계 수집
    spark.sql(f"ANALYZE TABLE glue_catalog.{database}.{table_name} COMPUTE STATISTICS")

    print(f"Registered table: {database}.{table_name}")

# ============================================
# 데이터 레이크 파이프라인 실행
# ============================================
if __name__ == "__main__":
    # Bronze: 원시 데이터 수집
    ingest_to_bronze("s3a://raw-data/users/*.json", "users")
    ingest_to_bronze("s3a://raw-data/orders/*.csv", "orders")
    ingest_to_bronze("s3a://raw-data/products/*.parquet", "products")

    # Silver: 데이터 정제
    transform_to_silver("users", "users")
    transform_to_silver("orders", "orders")
    transform_to_silver("products", "products")

    # Gold: 비즈니스 집계
    aggregate_to_gold(
        ["users", "orders", "products"],
        "monthly_sales_summary"
    )

    # Glue Catalog 등록
    register_to_glue_catalog(f"{GOLD_PATH}/monthly_sales_summary", "analytics", "monthly_sales")

    print("Data Lake Pipeline completed successfully!")

실무에서 자주 등장하는 대화 예제

데이터 아키텍트
"현재 Data Lake가 Data Swamp가 되어가고 있어요. Bronze 레이어에 2년치 원시 데이터가 쌓여 있는데 메타데이터도 없고, 어떤 데이터가 어디에 있는지 파악이 안 됩니다."
데이터 엔지니어
"우선 Glue Crawler로 스키마 추론하고 Data Catalog에 등록합시다. 그리고 Medallion Architecture를 제대로 적용해서 Bronze/Silver/Gold 레이어별 책임을 명확히 하고, 각 테이블에 소유자와 SLA를 지정해야 합니다."
데이터 거버넌스 담당
"Unity Catalog나 Apache Atlas 같은 데이터 카탈로그 도입을 검토해야겠네요. 데이터 리니지 추적, 접근 제어, 품질 모니터링이 모두 필요합니다. 특히 PII 데이터 마스킹 정책도 수립해야 합니다."
면접관
"Data Lake와 Data Warehouse의 차이점을 설명하고, 언제 Data Lake를 선택해야 하는지 말씀해주세요."
지원자
"Data Warehouse는 Schema-on-Write 방식으로 사전 정의된 스키마에 맞는 정형 데이터만 저장합니다. 반면 Data Lake는 Schema-on-Read 방식으로 원시 데이터를 그대로 저장하고 분석 시점에 스키마를 적용합니다. Data Lake는 다양한 소스의 대용량 데이터를 빠르게 수집해야 하거나, 향후 활용 방안이 명확하지 않은 데이터를 보관할 때 적합합니다. ML/AI 학습 데이터, IoT 센서 데이터, 로그 데이터 같은 비정형 데이터 처리에도 유리합니다."
면접관
"Data Lake가 Data Swamp가 되는 것을 방지하기 위한 전략은 무엇인가요?"
지원자
"첫째, Medallion Architecture를 적용하여 Bronze/Silver/Gold 계층으로 데이터 품질을 단계별로 관리합니다. 둘째, Data Catalog을 통해 모든 데이터셋의 메타데이터, 스키마, 소유자를 관리합니다. 셋째, 데이터 품질 체크를 파이프라인에 내장하여 문제가 있는 데이터가 다음 레이어로 전파되지 않게 합니다. 넷째, 데이터 보존 정책을 수립하여 불필요한 데이터를 주기적으로 아카이빙하거나 삭제합니다."
시니어 엔지니어
"Bronze 레이어에 날짜 파티션 없이 데이터를 적재하고 있네요. 이러면 전체 데이터를 스캔해야 해서 쿼리 성능이 심각하게 저하됩니다."
주니어 엔지니어
"파티션 키로 ingestion_date를 추가하겠습니다. Hive 스타일 파티션(year=2024/month=01/day=15)으로 구성하면 될까요?"
시니어 엔지니어
"네, 그리고 소규모 파일 문제도 고려해주세요. 스트리밍으로 들어오는 데이터는 작은 파일이 많이 생기니까, Spark의 coalesce나 Delta Lake의 OPTIMIZE 명령으로 파일을 주기적으로 병합해야 합니다. 파일 하나당 128MB~1GB 정도가 적당합니다."

주의사항

관련 용어

더 배우기