📊데이터공학

ETL

Extract, Transform, Load

데이터 파이프라인의 3단계. 추출, 변환, 적재. 데이터 웨어하우스 구축의 기초.

📖 상세 설명

ETL(Extract, Transform, Load)은 데이터 파이프라인의 기본 패턴으로, 다양한 소스에서 데이터를 추출하고(Extract), 분석에 적합한 형태로 변환한 뒤(Transform), 목적지 시스템에 적재(Load)하는 과정입니다. 데이터 웨어하우스, 데이터 레이크, ML 파이프라인의 기초가 됩니다.

Extract 단계에서는 데이터베이스(MySQL, PostgreSQL), SaaS(Salesforce, Stripe), API, 파일(CSV, JSON) 등 다양한 소스에서 데이터를 가져옵니다. 전체 데이터를 가져오는 Full Load와 변경된 데이터만 가져오는 Incremental Load 방식이 있으며, 대용량 데이터는 Incremental이 효율적입니다.

Transform 단계에서는 데이터 정제(null 처리, 중복 제거), 형식 변환(날짜 포맷, 타입 캐스팅), 비즈니스 로직 적용(계산 필드, 집계), 조인 및 통합을 수행합니다. 이 단계가 가장 복잡하며 비즈니스 요구사항에 따라 크게 달라집니다.

최근에는 ELT(Extract, Load, Transform) 패턴도 많이 사용됩니다. 클라우드 DW의 컴퓨팅 파워를 활용하여 원시 데이터를 먼저 적재한 후 변환하는 방식입니다. Fivetran, Airbyte 같은 도구가 EL(Extract, Load)을 담당하고, dbt가 Transform을 담당하는 구조가 일반적입니다.

💻 코드 예제

# Python ETL 파이프라인 예제

import pandas as pd
from sqlalchemy import create_engine
from datetime import datetime, timedelta
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class ETLPipeline:
    def __init__(self, source_conn, target_conn):
        self.source_engine = create_engine(source_conn)
        self.target_engine = create_engine(target_conn)

    # 1. EXTRACT - 소스에서 데이터 추출
    def extract(self, table_name: str, incremental_col: str = None,
                last_value = None) -> pd.DataFrame:
        """증분 또는 전체 데이터 추출"""

        if incremental_col and last_value:
            # Incremental Load: 변경된 데이터만
            query = f"""
                SELECT * FROM {table_name}
                WHERE {incremental_col} > '{last_value}'
                ORDER BY {incremental_col}
            """
            logger.info(f"Incremental extract from {table_name} after {last_value}")
        else:
            # Full Load: 전체 데이터
            query = f"SELECT * FROM {table_name}"
            logger.info(f"Full extract from {table_name}")

        df = pd.read_sql(query, self.source_engine)
        logger.info(f"Extracted {len(df)} rows")
        return df

    # 2. TRANSFORM - 데이터 변환
    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        """데이터 정제 및 변환"""

        # 2-1. 데이터 정제
        df = df.drop_duplicates()
        df = df.dropna(subset=['user_id', 'event_type'])  # 필수 필드 null 제거

        # 2-2. 타입 변환
        df['created_at'] = pd.to_datetime(df['created_at'])
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce').fillna(0)

        # 2-3. 파생 컬럼 생성
        df['event_date'] = df['created_at'].dt.date
        df['event_hour'] = df['created_at'].dt.hour
        df['is_weekend'] = df['created_at'].dt.dayofweek >= 5

        # 2-4. 비즈니스 로직 적용
        df['amount_category'] = pd.cut(
            df['amount'],
            bins=[0, 100, 500, float('inf')],
            labels=['small', 'medium', 'large']
        )

        # 2-5. 메타데이터 추가
        df['etl_timestamp'] = datetime.now()
        df['etl_batch_id'] = datetime.now().strftime('%Y%m%d%H%M%S')

        logger.info(f"Transformed {len(df)} rows")
        return df

    # 3. LOAD - 목적지에 적재
    def load(self, df: pd.DataFrame, table_name: str,
             if_exists: str = 'append') -> None:
        """데이터 적재 (append 또는 replace)"""

        df.to_sql(
            table_name,
            self.target_engine,
            if_exists=if_exists,
            index=False,
            method='multi',  # 배치 insert로 성능 향상
            chunksize=1000
        )
        logger.info(f"Loaded {len(df)} rows to {table_name}")

    # ETL 실행
    def run(self, source_table: str, target_table: str,
            incremental_col: str = None, last_value = None):
        """ETL 파이프라인 실행"""

        logger.info(f"Starting ETL: {source_table} -> {target_table}")

        # Extract
        raw_data = self.extract(source_table, incremental_col, last_value)

        if raw_data.empty:
            logger.info("No new data to process")
            return

        # Transform
        transformed_data = self.transform(raw_data)

        # Load
        self.load(transformed_data, target_table)

        logger.info("ETL completed successfully")
        return transformed_data[incremental_col].max() if incremental_col else None


# 사용 예시
if __name__ == "__main__":
    pipeline = ETLPipeline(
        source_conn="postgresql://source_db/production",
        target_conn="postgresql://target_db/analytics"
    )

    # 증분 ETL 실행
    last_processed = pipeline.run(
        source_table="events",
        target_table="fact_events",
        incremental_col="created_at",
        last_value="2024-01-15 00:00:00"
    )

🗣️ 실무에서 이렇게 말해요

데이터엔지니어: "어제 ETL 작업이 3시간 걸렸어요. 변환 단계에서 join이 너무 많아서요."

주니어: "ELT로 바꾸면 어떨까요? 먼저 raw로 적재하고 BigQuery에서 변환하면 분산 처리되니까 빨라질 것 같은데요."

시니어: "좋은 아이디어예요. dbt로 변환 로직 관리하면 버전 관리도 되고 테스트도 쉬워집니다."

면접관: "ETL과 ELT의 차이점과 각각 언제 사용하는지 설명해주세요."

지원자: "ETL은 적재 전에 변환하므로 정제된 데이터만 저장됩니다. 온프레미스 DW에서 많이 쓰였어요. ELT는 원시 데이터를 먼저 적재하고 변환합니다. 클라우드 DW의 컴퓨팅 파워를 활용하고, 원시 데이터를 보존하여 나중에 다른 변환도 가능합니다. 요즘은 ELT가 더 유연해서 선호됩니다."

리뷰어: "이 ETL에 멱등성(idempotency)이 없네요. 같은 데이터를 두 번 실행하면 중복 적재돼요."

작성자: "merge/upsert 로직 추가하겠습니다. 또는 적재 전에 해당 날짜 데이터를 삭제하는 delete-insert 패턴으로 변경할게요."

⚠️ 주의사항

📚 더 배우기