📊 데이터공학

Airflow

Apache Airflow

워크플로우 오케스트레이션 플랫폼. DAG로 파이프라인 정의.

상세 설명

Apache Airflow는 프로그래밍 방식으로 워크플로우를 작성, 스케줄링, 모니터링할 수 있는 오픈소스 플랫폼입니다. Airbnb에서 개발되어 2016년 Apache 재단에 기증되었으며, 현재 데이터 엔지니어링 분야에서 가장 널리 사용되는 워크플로우 오케스트레이션 도구입니다.

Airflow의 핵심 개념은 DAG(Directed Acyclic Graph)입니다. DAG는 태스크들의 의존관계를 방향성 비순환 그래프로 표현하며, Python 코드로 정의됩니다. 이를 통해 복잡한 데이터 파이프라인의 실행 순서, 재시도 정책, 알림 설정 등을 코드로 관리할 수 있어 버전 관리와 협업이 용이합니다.

Airflow는 다양한 시스템과의 연동을 위한 풍부한 Operator를 제공합니다. BashOperator, PythonOperator 같은 기본 Operator부터 BigQueryOperator, S3ToRedshiftOperator 같은 클라우드 서비스 전용 Operator까지 있어, 다양한 데이터 소스와 목적지를 연결하는 파이프라인을 쉽게 구축할 수 있습니다.

실행 환경으로는 SequentialExecutor(개발용), LocalExecutor(소규모), CeleryExecutor(분산처리), KubernetesExecutor(컨테이너 기반) 등을 지원합니다. AWS MWAA, Google Cloud Composer, Astronomer 등 관리형 서비스도 제공되어 운영 부담을 줄일 수 있습니다.

코드 예제

Python 클릭하여 복사
# dags/etl_pipeline.py
# 일일 ETL 파이프라인 DAG 예제

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCreateEmptyTableOperator,
    BigQueryInsertJobOperator
)
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator
)
from airflow.utils.task_group import TaskGroup
from airflow.models import Variable

# DAG 기본 설정
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email': ['data-alerts@company.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'execution_timeout': timedelta(hours=2),
}

# DAG 정의
with DAG(
    dag_id='daily_sales_etl',
    default_args=default_args,
    description='일일 매출 데이터 ETL 파이프라인',
    schedule_interval='0 2 * * *',  # 매일 오전 2시
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=['etl', 'sales', 'production'],
    max_active_runs=1,
) as dag:

    # 1. 데이터 추출 태스크
    def extract_sales_data(**context):
        """소스 시스템에서 매출 데이터 추출"""
        import pandas as pd
        from google.cloud import storage

        execution_date = context['ds']

        # API에서 데이터 추출 (예시)
        # sales_data = fetch_from_api(execution_date)

        # GCS에 저장
        bucket_name = Variable.get('gcs_bucket')
        blob_path = f'raw/sales/{execution_date}/data.parquet'

        print(f"Extracted sales data for {execution_date}")
        return blob_path

    extract_task = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,
        provide_context=True,
    )

    # 2. 데이터 검증 태스크 그룹
    with TaskGroup(group_id='data_validation') as validation_group:

        def validate_row_count(**context):
            """행 수 검증"""
            ti = context['ti']
            blob_path = ti.xcom_pull(task_ids='extract_sales_data')
            # 실제 검증 로직
            print(f"Validating row count for {blob_path}")

        def validate_schema(**context):
            """스키마 검증"""
            ti = context['ti']
            blob_path = ti.xcom_pull(task_ids='extract_sales_data')
            print(f"Validating schema for {blob_path}")

        row_count_check = PythonOperator(
            task_id='check_row_count',
            python_callable=validate_row_count,
        )

        schema_check = PythonOperator(
            task_id='check_schema',
            python_callable=validate_schema,
        )

        [row_count_check, schema_check]

    # 3. BigQuery 로드 태스크
    load_to_bigquery = GCSToBigQueryOperator(
        task_id='load_to_bigquery',
        bucket='{{ var.value.gcs_bucket }}',
        source_objects=['raw/sales/{{ ds }}/data.parquet'],
        destination_project_dataset_table='analytics.raw_sales',
        source_format='PARQUET',
        write_disposition='WRITE_APPEND',
        create_disposition='CREATE_IF_NEEDED',
    )

    # 4. 변환 쿼리 실행
    transform_query = """
    INSERT INTO `analytics.sales_daily_summary`
    SELECT
        DATE('{{ ds }}') as report_date,
        product_category,
        SUM(amount) as total_sales,
        COUNT(DISTINCT customer_id) as unique_customers,
        COUNT(*) as transaction_count
    FROM `analytics.raw_sales`
    WHERE DATE(created_at) = '{{ ds }}'
    GROUP BY product_category
    """

    transform_task = BigQueryInsertJobOperator(
        task_id='transform_data',
        configuration={
            'query': {
                'query': transform_query,
                'useLegacySql': False,
            }
        },
    )

    # 5. dbt 실행 (선택적)
    run_dbt = BashOperator(
        task_id='run_dbt_models',
        bash_command='cd /opt/dbt && dbt run --models sales_marts --target prod',
    )

    # 6. 완료 알림
    def send_completion_notification(**context):
        """Slack 알림 전송"""
        from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook

        slack_hook = SlackWebhookHook(
            slack_webhook_conn_id='slack_webhook'
        )
        slack_hook.send(
            text=f"Daily Sales ETL completed for {context['ds']}"
        )

    notify_task = PythonOperator(
        task_id='send_notification',
        python_callable=send_completion_notification,
        trigger_rule='all_success',
    )

    # 태스크 의존관계 정의
    extract_task >> validation_group >> load_to_bigquery >> transform_task >> run_dbt >> notify_task

실무에서 이렇게 사용됩니다

데이터 엔지니어: "현재 cron으로 관리하는 배치 작업이 50개가 넘어서 의존관계 파악이 어렵습니다."

테크리드: "Airflow로 마이그레이션하면 DAG UI에서 전체 파이프라인 상태를 한눈에 볼 수 있어요. 실패 시 자동 재시도도 되고요."

데이터 엔지니어: "운영은 어떻게 하죠? 직접 설치해야 하나요?"

테크리드: "Cloud Composer 쓰면 관리형으로 편하게 운영할 수 있어요. DAG 코드만 GCS에 올리면 자동 배포됩니다."

면접관: "Airflow에서 대량의 태스크를 병렬 처리할 때 주의할 점은 무엇인가요?"

지원자: "parallelism, dag_concurrency, max_active_runs 등의 설정을 조절해야 합니다. 또한 외부 시스템의 Rate Limit도 고려해서 Pool을 설정하는 것이 좋습니다."

면접관: "Backfill 시 주의사항은요?"

지원자: "depends_on_past가 True면 순차 실행되어 오래 걸릴 수 있고, catchup=True 설정 시 과거 날짜부터 모든 DAG Run이 생성되므로 신중하게 사용해야 합니다."

리뷰어: "이 DAG에서 PythonOperator 내부에 무거운 라이브러리 import가 있네요. 스케줄러 성능에 영향을 줄 수 있어요."

개발자: "어떻게 개선하면 될까요?"

리뷰어: "import를 함수 내부로 옮기거나, 무거운 작업은 KubernetesPodOperator로 분리하는 게 좋아요. 또한 execution_timeout 설정도 추가해서 태스크가 무한정 실행되는 것을 방지하세요."

주의사항

관련 용어

더 배우기