Apache Airflow
워크플로우 오케스트레이션 플랫폼
워크플로우 오케스트레이션 플랫폼
Apache Airflow는 데이터 파이프라인을 프로그래밍 방식으로 작성, 스케줄링, 모니터링할 수 있는 오픈소스 워크플로우 관리 플랫폼입니다. 2014년 Airbnb에서 개발을 시작하여 2016년 Apache 재단의 인큐베이터 프로젝트가 되었고, 현재는 데이터 엔지니어링의 표준 도구로 자리잡았습니다.
Airflow의 핵심은 DAG(Directed Acyclic Graph) 개념입니다. 각 워크플로우는 방향성 비순환 그래프로 표현되며, 노드는 개별 태스크를, 엣지는 태스크 간 의존관계를 나타냅니다. DAG는 순수 Python 코드로 정의되어 동적 파이프라인 생성, 조건부 실행, 반복문 활용 등이 가능합니다.
아키텍처는 Scheduler, Executor, Worker, Web Server, Metadata Database로 구성됩니다. Scheduler는 DAG를 파싱하고 태스크 실행을 조율하며, Executor는 실제 태스크 실행 방식(Local, Celery, Kubernetes 등)을 결정합니다. Web Server는 모니터링과 관리를 위한 UI를 제공합니다.
Airflow는 풍부한 Operator 생태계를 갖추고 있습니다. BashOperator, PythonOperator 같은 범용 Operator부터 AWS, GCP, Azure 등 클라우드 서비스 전용 Operator, 그리고 Spark, Hadoop 같은 빅데이터 도구와의 연동 Operator까지 다양합니다. 또한 Hook과 Connection을 통해 외부 시스템과의 인증을 안전하게 관리합니다.
# dags/ml_training_pipeline.py
# ML 모델 학습 파이프라인 DAG
from datetime import datetime, timedelta
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.s3 import S3CreateObjectOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
# TaskFlow API를 사용한 현대적인 DAG 작성
@dag(
dag_id='ml_training_pipeline',
schedule_interval='@weekly',
start_date=datetime(2024, 1, 1),
catchup=False,
default_args={
'owner': 'ml-team',
'retries': 2,
'retry_delay': timedelta(minutes=10),
},
tags=['ml', 'training'],
)
def ml_training_pipeline():
@task
def prepare_training_data(execution_date: str) -> dict:
"""학습 데이터 준비"""
import pandas as pd
from sklearn.model_selection import train_test_split
# 데이터 로드 (예시)
s3_hook = S3Hook(aws_conn_id='aws_default')
data_key = f'raw/features/{execution_date}/data.parquet'
# 데이터 전처리
# df = pd.read_parquet(...)
# X_train, X_test, y_train, y_test = train_test_split(...)
return {
'train_path': f's3://ml-bucket/processed/{execution_date}/train.parquet',
'test_path': f's3://ml-bucket/processed/{execution_date}/test.parquet',
'feature_count': 150,
'train_samples': 100000,
}
@task
def train_model(data_info: dict) -> dict:
"""모델 학습"""
import mlflow
from sklearn.ensemble import RandomForestClassifier
mlflow.set_tracking_uri('http://mlflow:5000')
mlflow.set_experiment('weekly-training')
with mlflow.start_run():
# 모델 학습
model = RandomForestClassifier(n_estimators=100)
# model.fit(X_train, y_train)
# 메트릭 기록
mlflow.log_params({
'n_estimators': 100,
'feature_count': data_info['feature_count'],
})
mlflow.log_metrics({
'accuracy': 0.95,
'f1_score': 0.93,
})
# 모델 저장
model_uri = mlflow.sklearn.log_model(model, 'model').model_uri
return {
'model_uri': model_uri,
'accuracy': 0.95,
'f1_score': 0.93,
}
@task
def evaluate_model(model_info: dict) -> dict:
"""모델 평가 및 검증"""
# A/B 테스트용 기준 모델과 비교
baseline_accuracy = 0.90
if model_info['accuracy'] > baseline_accuracy:
return {
'approved': True,
'model_uri': model_info['model_uri'],
'improvement': model_info['accuracy'] - baseline_accuracy,
}
else:
raise ValueError(f"Model accuracy {model_info['accuracy']} below baseline {baseline_accuracy}")
@task.branch
def check_deployment_decision(evaluation: dict) -> str:
"""배포 여부 결정"""
if evaluation['approved'] and evaluation['improvement'] > 0.02:
return 'deploy_to_production'
elif evaluation['approved']:
return 'deploy_to_staging'
else:
return 'notify_failure'
@task
def deploy_to_production(evaluation: dict):
"""프로덕션 배포"""
import mlflow.sagemaker
print(f"Deploying model {evaluation['model_uri']} to production")
# mlflow.sagemaker.deploy(...)
@task
def deploy_to_staging(evaluation: dict):
"""스테이징 배포"""
print(f"Deploying model {evaluation['model_uri']} to staging")
@task(trigger_rule=TriggerRule.ONE_FAILED)
def notify_failure():
"""실패 알림"""
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
hook = SlackWebhookHook(slack_webhook_conn_id='slack_ml_alerts')
hook.send(text="ML Training Pipeline Failed!")
@task(trigger_rule=TriggerRule.ONE_SUCCESS)
def notify_success(**context):
"""성공 알림"""
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
hook = SlackWebhookHook(slack_webhook_conn_id='slack_ml_alerts')
hook.send(text=f"ML Training Pipeline Completed! Run: {context['run_id']}")
# DAG 흐름 정의
data_info = prepare_training_data(execution_date='{{ ds }}')
model_info = train_model(data_info)
evaluation = evaluate_model(model_info)
decision = check_deployment_decision(evaluation)
# 조건부 분기
prod_deploy = deploy_to_production(evaluation)
staging_deploy = deploy_to_staging(evaluation)
failure_notify = notify_failure()
success_notify = notify_success()
decision >> [prod_deploy, staging_deploy, failure_notify]
[prod_deploy, staging_deploy] >> success_notify
# DAG 인스턴스 생성
dag = ml_training_pipeline()
PM: "매일 아침 데이터 리포트가 늦게 나온다는 민원이 많습니다."
데이터 엔지니어: "Airflow에서 확인해보니 업스트림 태스크가 새벽 3시에 실패해서 자동 재시도 중이었어요. SLA 설정해서 알림 받았으면 빨리 대응했을 텐데요."
테크리드: "주요 DAG에 SLA Miss 알림을 추가하고, 실패 시 온콜 담당자에게 PagerDuty로 알림 가도록 설정합시다."
면접관: "Airflow 2.x의 TaskFlow API에 대해 설명해주세요."
지원자: "@task 데코레이터를 사용해서 Python 함수를 직접 태스크로 정의할 수 있습니다. XCom 데이터 전달이 암묵적으로 처리되어 코드가 훨씬 간결해지고, 타입 힌트도 지원됩니다. 기존 Operator와도 혼용 가능합니다."
면접관: "대규모 DAG 운영 경험이 있으시면 공유해주세요."
지원자: "500개 이상의 DAG를 운영했는데, DAG 파일 파싱 시간 최적화가 중요했습니다. Dynamic DAG 생성을 최소화하고, 무거운 import를 지연시키고, dag_dir_list_interval 튜닝을 했습니다."
리뷰어: "이 DAG에 depends_on_past=True가 설정되어 있는데, 필요한 이유가 있나요?"
개발자: "이전 날짜 데이터가 있어야 증분 계산이 가능해서요."
리뷰어: "그럼 wait_for_downstream도 True로 설정해야 할 것 같아요. 그리고 backfill 시 병목이 될 수 있으니 최초 실행 시 manual trigger로 초기 데이터를 먼저 적재하는 방법도 고려해보세요."