데이터 파이프라인
Data Pipeline
데이터를 수집, 변환, 저장하는 자동화된 워크플로우. ETL/ELT 프로세스를 구성.
Data Pipeline
데이터를 수집, 변환, 저장하는 자동화된 워크플로우. ETL/ELT 프로세스를 구성.
데이터 파이프라인(Data Pipeline)은 데이터를 소스에서 추출하여 변환하고 목적지에 저장하는 자동화된 워크플로우입니다. ETL(Extract-Transform-Load) 또는 ELT(Extract-Load-Transform) 패턴으로 구현되며, 배치 처리와 실시간 스트리밍을 모두 지원합니다. 데이터 엔지니어링의 핵심 인프라로, 데이터 웨어하우스, 데이터 레이크, ML 학습 데이터 생성에 필수적입니다.
파이프라인의 핵심 개념은 DAG(Directed Acyclic Graph)입니다. 작업(Task)들과 의존성을 방향성 비순환 그래프로 정의하여, 어떤 작업이 완료되어야 다음 작업이 실행될 수 있는지 명시합니다. 이를 통해 병렬 처리, 재시도, 백필(Backfill)이 가능해집니다. Apache Airflow가 사실상 표준이며, Prefect, Dagster 같은 현대적 대안도 있습니다.
ETL은 전통적 방식으로 변환 후 적재하고, ELT는 클라우드 데이터 웨어하우스(Snowflake, BigQuery)의 강력한 컴퓨팅을 활용해 적재 후 변환합니다. 최근에는 dbt를 활용한 Transform 레이어 분리, Great Expectations를 통한 데이터 품질 검증 통합이 일반화되고 있습니다. 스트리밍 파이프라인은 Kafka, Flink, Spark Streaming으로 구현합니다.
좋은 파이프라인은 멱등성(Idempotent), 원자성(Atomic), 모니터링 가능성을 갖춥니다. 멱등성은 동일한 입력에 대해 여러 번 실행해도 같은 결과를 보장합니다. 실패 시 재시도가 안전하려면 필수입니다. 데이터 엔지니어링 팀은 파이프라인의 신뢰성, 효율성, 유지보수성을 지속적으로 개선해야 합니다.
# Apache Airflow DAG 예시 - 일별 데이터 파이프라인
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.utils.task_group import TaskGroup
# DAG 기본 설정
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email': ['alerts@company.com'],
'retries': 3,
'retry_delay': timedelta(minutes=5),
'execution_timeout': timedelta(hours=2),
}
dag = DAG(
dag_id='daily_sales_pipeline',
default_args=default_args,
description='일별 매출 데이터 ETL 파이프라인',
schedule_interval='0 2 * * *', # 매일 02:00 UTC
start_date=datetime(2024, 1, 1),
catchup=False,
tags=['sales', 'etl', 'daily'],
max_active_runs=1,
)
# 1. 데이터 추출 (Extract)
def extract_from_api(**context):
"""API에서 매출 데이터 추출"""
import requests
execution_date = context['ds']
response = requests.get(
f"https://api.internal/sales/{execution_date}",
headers={"Authorization": "Bearer {{ var.value.api_token }}"}
)
data = response.json()
# XCom으로 다음 태스크에 전달
context['ti'].xcom_push(key='record_count', value=len(data['records']))
return len(data['records'])
extract_task = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_from_api,
provide_context=True,
dag=dag,
)
# 2. 데이터 품질 검증 (Validation)
validate_task = BashOperator(
task_id='validate_data_quality',
bash_command='''
cd /opt/data-quality && \
great_expectations checkpoint run sales_checkpoint \
--data-asset-name raw_sales_{{ ds_nodash }} \
|| exit 1
''',
dag=dag,
)
# 3. Transform + Load (dbt 활용)
with TaskGroup(group_id='transform_load', dag=dag) as transform_group:
# Staging 모델
dbt_staging = BashOperator(
task_id='dbt_staging',
bash_command='''
cd /opt/dbt/sales_project && \
dbt run --select staging.stg_sales \
--vars '{"run_date": "{{ ds }}"}'
''',
)
# Mart 모델
dbt_marts = BashOperator(
task_id='dbt_marts',
bash_command='''
cd /opt/dbt/sales_project && \
dbt run --select marts.fct_daily_sales marts.dim_products \
--vars '{"run_date": "{{ ds }}"}'
''',
)
# dbt 테스트
dbt_test = BashOperator(
task_id='dbt_test',
bash_command='''
cd /opt/dbt/sales_project && \
dbt test --select staging marts
''',
)
dbt_staging >> dbt_marts >> dbt_test
# 4. 집계 테이블 업데이트 (Snowflake MERGE)
aggregate_task = SnowflakeOperator(
task_id='update_aggregates',
snowflake_conn_id='snowflake_prod',
sql='''
MERGE INTO gold.daily_sales_summary target
USING (
SELECT
sale_date,
SUM(amount) as total_sales,
COUNT(*) as transaction_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM gold.fct_daily_sales
WHERE sale_date = '{{ ds }}'
GROUP BY sale_date
) source
ON target.sale_date = source.sale_date
WHEN MATCHED THEN UPDATE SET
total_sales = source.total_sales,
transaction_count = source.transaction_count,
unique_customers = source.unique_customers,
updated_at = CURRENT_TIMESTAMP()
WHEN NOT MATCHED THEN INSERT (
sale_date, total_sales, transaction_count, unique_customers, updated_at
) VALUES (
source.sale_date, source.total_sales,
source.transaction_count, source.unique_customers, CURRENT_TIMESTAMP()
);
''',
dag=dag,
)
# 5. 완료 알림
success_alert = SlackWebhookOperator(
task_id='slack_success_alert',
slack_webhook_conn_id='slack_webhook',
message='''
:white_check_mark: *일별 매출 파이프라인 완료*
- 실행일: {{ ds }}
- 처리 건수: {{ ti.xcom_pull(task_ids='extract_sales_data', key='record_count') }}
- 소요 시간: {{ task_instance.duration }}초
''',
dag=dag,
)
# DAG 의존성 정의 (ETL 흐름)
extract_task >> validate_task >> transform_group >> aggregate_task >> success_alert