📊데이터공학

데이터 파이프라인

Data Pipeline

데이터를 수집, 변환, 저장하는 자동화된 워크플로우. ETL/ELT 프로세스를 구성.

상세 설명

데이터 파이프라인(Data Pipeline)은 데이터를 소스에서 추출하여 변환하고 목적지에 저장하는 자동화된 워크플로우입니다. ETL(Extract-Transform-Load) 또는 ELT(Extract-Load-Transform) 패턴으로 구현되며, 배치 처리와 실시간 스트리밍을 모두 지원합니다. 데이터 엔지니어링의 핵심 인프라로, 데이터 웨어하우스, 데이터 레이크, ML 학습 데이터 생성에 필수적입니다.

파이프라인의 핵심 개념은 DAG(Directed Acyclic Graph)입니다. 작업(Task)들과 의존성을 방향성 비순환 그래프로 정의하여, 어떤 작업이 완료되어야 다음 작업이 실행될 수 있는지 명시합니다. 이를 통해 병렬 처리, 재시도, 백필(Backfill)이 가능해집니다. Apache Airflow가 사실상 표준이며, Prefect, Dagster 같은 현대적 대안도 있습니다.

ETL은 전통적 방식으로 변환 후 적재하고, ELT는 클라우드 데이터 웨어하우스(Snowflake, BigQuery)의 강력한 컴퓨팅을 활용해 적재 후 변환합니다. 최근에는 dbt를 활용한 Transform 레이어 분리, Great Expectations를 통한 데이터 품질 검증 통합이 일반화되고 있습니다. 스트리밍 파이프라인은 Kafka, Flink, Spark Streaming으로 구현합니다.

좋은 파이프라인은 멱등성(Idempotent), 원자성(Atomic), 모니터링 가능성을 갖춥니다. 멱등성은 동일한 입력에 대해 여러 번 실행해도 같은 결과를 보장합니다. 실패 시 재시도가 안전하려면 필수입니다. 데이터 엔지니어링 팀은 파이프라인의 신뢰성, 효율성, 유지보수성을 지속적으로 개선해야 합니다.

코드 예제

Apache Airflow DAG
# Apache Airflow DAG 예시 - 일별 데이터 파이프라인
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.task_group import TaskGroup

# DAG 기본 설정
default_args = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'email_on_failure': True,
    'email': ['alerts@company.com'],
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

dag = DAG(
    dag_id='daily_sales_pipeline',
    default_args=default_args,
    description='일별 매출 데이터 ETL 파이프라인',
    schedule_interval='0 2 * * *',  # 매일 02:00 UTC
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['sales', 'etl', 'daily'],
    max_active_runs=1,
)

# 1. 데이터 추출 (Extract)
def extract_from_api(**context):
    """API에서 매출 데이터 추출"""
    import requests
    execution_date = context['ds']
    response = requests.get(
        f"https://api.internal/sales/{execution_date}",
        headers={"Authorization": "Bearer {{ var.value.api_token }}"}
    )
    data = response.json()
    # XCom으로 다음 태스크에 전달
    context['ti'].xcom_push(key='record_count', value=len(data['records']))
    return len(data['records'])

extract_task = PythonOperator(
    task_id='extract_sales_data',
    python_callable=extract_from_api,
    provide_context=True,
    dag=dag,
)

# 2. 데이터 품질 검증 (Validation)
validate_task = BashOperator(
    task_id='validate_data_quality',
    bash_command='''
    cd /opt/data-quality && \
    great_expectations checkpoint run sales_checkpoint \
    --data-asset-name raw_sales_{{ ds_nodash }} \
    || exit 1
    ''',
    dag=dag,
)

# 3. Transform + Load (dbt 활용)
with TaskGroup(group_id='transform_load', dag=dag) as transform_group:
    # Staging 모델
    dbt_staging = BashOperator(
        task_id='dbt_staging',
        bash_command='''
        cd /opt/dbt/sales_project && \
        dbt run --select staging.stg_sales \
        --vars '{"run_date": "{{ ds }}"}'
        ''',
    )

    # Mart 모델
    dbt_marts = BashOperator(
        task_id='dbt_marts',
        bash_command='''
        cd /opt/dbt/sales_project && \
        dbt run --select marts.fct_daily_sales marts.dim_products \
        --vars '{"run_date": "{{ ds }}"}'
        ''',
    )

    # dbt 테스트
    dbt_test = BashOperator(
        task_id='dbt_test',
        bash_command='''
        cd /opt/dbt/sales_project && \
        dbt test --select staging marts
        ''',
    )

    dbt_staging >> dbt_marts >> dbt_test

# 4. 집계 테이블 업데이트 (Snowflake MERGE)
aggregate_task = SnowflakeOperator(
    task_id='update_aggregates',
    snowflake_conn_id='snowflake_prod',
    sql='''
    MERGE INTO gold.daily_sales_summary target
    USING (
        SELECT
            sale_date,
            SUM(amount) as total_sales,
            COUNT(*) as transaction_count,
            COUNT(DISTINCT customer_id) as unique_customers
        FROM gold.fct_daily_sales
        WHERE sale_date = '{{ ds }}'
        GROUP BY sale_date
    ) source
    ON target.sale_date = source.sale_date
    WHEN MATCHED THEN UPDATE SET
        total_sales = source.total_sales,
        transaction_count = source.transaction_count,
        unique_customers = source.unique_customers,
        updated_at = CURRENT_TIMESTAMP()
    WHEN NOT MATCHED THEN INSERT (
        sale_date, total_sales, transaction_count, unique_customers, updated_at
    ) VALUES (
        source.sale_date, source.total_sales,
        source.transaction_count, source.unique_customers, CURRENT_TIMESTAMP()
    );
    ''',
    dag=dag,
)

# 5. 완료 알림
success_alert = SlackWebhookOperator(
    task_id='slack_success_alert',
    slack_webhook_conn_id='slack_webhook',
    message='''
:white_check_mark: *일별 매출 파이프라인 완료*
- 실행일: {{ ds }}
- 처리 건수: {{ ti.xcom_pull(task_ids='extract_sales_data', key='record_count') }}
- 소요 시간: {{ task_instance.duration }}초
    ''',
    dag=dag,
)

# DAG 의존성 정의 (ETL 흐름)
extract_task >> validate_task >> transform_group >> aggregate_task >> success_alert

실무 대화

PM: 어제 데일리 파이프라인이 실패했는데, 원인이 뭔가요?
데이터 엔지니어: API 서버 점검으로 extract 단계에서 timeout이 발생했어요. retry 3회 후 실패했고, 오늘 아침에 백필로 복구했습니다. SLA는 맞췄어요.
PM: 이런 상황을 어떻게 예방할 수 있을까요?
데이터 엔지니어: API 팀과 점검 일정 공유 채널을 만들고, 파이프라인에 graceful degradation 로직을 추가할게요. 빈 데이터로 진행하고 다음 실행에서 catch-up하는 방식이요.
면접관: ETL과 ELT의 차이점과 언제 각각 사용하나요?
지원자: ETL은 변환 후 적재로, 데이터 웨어하우스 용량/비용이 제한적일 때 적합합니다. ELT는 적재 후 변환으로, Snowflake/BigQuery 같은 클라우드 DW의 강력한 컴퓨팅을 활용합니다. 현대적 스택에서는 ELT + dbt 조합이 일반적입니다.
면접관: 멱등성(Idempotency)이 왜 중요한가요?
지원자: 같은 입력에 여러 번 실행해도 동일한 결과를 보장합니다. 실패 후 재시도, 백필, 중복 실행 시에도 데이터 일관성을 유지할 수 있습니다. MERGE/UPSERT, 파티션 덮어쓰기 패턴으로 구현합니다.
개발자: catchup=True로 하면 과거 데이터도 자동으로 처리되나요?
시니어: 맞아, 하지만 주의해야 해. start_date부터 현재까지 모든 날짜에 대해 실행되니까, 잘못 설정하면 수천 개 태스크가 큐에 쌓여. max_active_runs=1로 제한하고, 필요한 경우에만 True로 설정해.
개발자: depends_on_past는 언제 사용하나요?
시니어: 이전 날짜 실행이 성공해야 다음 날짜가 실행되게 할 때 써. 누적 집계나 순차적 의존성이 있을 때 유용해. 하지만 한 번 실패하면 이후 모든 실행이 막히니까 신중하게 사용해야 해.

주의사항

  • 멱등성(Idempotency)을 보장하세요. 재시도, 백필 시 데이터 중복이나 불일치가 발생하지 않아야 합니다.
  • catchup=True 사용 시 start_date를 신중히 설정하세요. 과거 전체 기간이 실행될 수 있습니다.
  • XCom으로 대용량 데이터를 전달하지 마세요. 메타데이터 DB에 저장되므로 경로나 키만 전달하세요.
  • 데이터 품질 검증을 파이프라인에 통합하세요. Great Expectations, dbt tests 활용을 권장합니다.
  • 실패 알림과 SLA 모니터링을 설정하세요. 데이터 지연이 비즈니스에 미치는 영향을 최소화합니다.

더 배우기