📊 데이터공학

Dagster

Dagster

데이터 파이프라인 오케스트레이터. 소프트웨어 정의 자산.

상세 설명

Dagster는 현대적인 데이터 파이프라인 오케스트레이터로, "Software-Defined Assets(소프트웨어 정의 자산)" 패러다임을 통해 데이터 엔지니어링을 혁신합니다. 기존 워크플로 오케스트레이터가 "작업(task)"을 중심으로 설계된 반면, Dagster는 "자산(asset)" 즉 데이터 제품을 중심으로 파이프라인을 정의합니다.

Asset 중심 접근의 장점은 데이터 리니지가 자연스럽게 추적된다는 것입니다. 각 자산이 어떤 업스트림 자산에 의존하는지 코드에서 명시적으로 선언하므로, 변경 영향 분석, 증분 실행, 백필이 자동으로 지원됩니다. Dagster UI의 Asset Graph는 전체 데이터 플랫폼의 구조를 시각적으로 보여주어 데이터 디스커버리에도 활용됩니다.

Dagster의 핵심 개념에는 Assets, Resources, Sensors, Schedules가 있습니다. Assets은 데이터 제품(테이블, 파일, ML 모델 등)을 나타내고, Resources는 외부 시스템(데이터베이스, 클라우드 서비스)과의 연결을 추상화합니다. Sensors는 외부 이벤트(새 파일 도착, API 호출)에 반응하여 파이프라인을 트리거하고, Schedules는 시간 기반 실행을 관리합니다.

Dagster는 개발자 경험을 중시합니다. 타입 시스템을 통해 데이터 계약을 검증하고, 로컬 개발 서버로 즉각적인 피드백을 제공하며, pytest 통합으로 파이프라인 테스트를 용이하게 합니다. dbt, Spark, Snowflake, AWS, GCP 등과의 풍부한 통합을 제공하여 기존 데이터 스택에 쉽게 도입할 수 있습니다.

코드 예제

# Dagster Asset 기반 파이프라인 예제
from dagster import (
    asset, AssetIn, AssetKey, AssetExecutionContext,
    Definitions, ScheduleDefinition, define_asset_job,
    DailyPartitionsDefinition, MetadataValue, Output,
    ConfigurableResource, MaterializeResult
)
from dagster_dbt import DbtCliResource, dbt_assets
import pandas as pd
from datetime import datetime

# ============================================
# 1. 기본 Asset 정의
# ============================================

@asset(
    description="외부 API에서 원시 사용자 데이터 수집",
    group_name="raw_data",
    compute_kind="python",
)
def raw_users(context: AssetExecutionContext) -> pd.DataFrame:
    """사용자 데이터 수집 자산"""
    # 실제로는 API 호출 또는 데이터베이스 쿼리
    users = pd.DataFrame({
        'user_id': [1, 2, 3, 4, 5],
        'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
        'email': ['alice@example.com', 'bob@example.com',
                  'charlie@example.com', 'david@example.com', 'eve@example.com'],
        'signup_date': pd.to_datetime(['2024-01-01', '2024-01-02',
                                        '2024-01-03', '2024-01-04', '2024-01-05']),
        'country': ['US', 'UK', 'US', 'DE', 'FR']
    })

    context.log.info(f"수집된 사용자 수: {len(users)}")

    # 메타데이터 기록
    return Output(
        value=users,
        metadata={
            "row_count": len(users),
            "columns": list(users.columns),
            "preview": MetadataValue.md(users.head().to_markdown())
        }
    )

@asset(
    description="외부 시스템에서 주문 데이터 수집",
    group_name="raw_data",
    compute_kind="python",
)
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    """주문 데이터 수집 자산"""
    orders = pd.DataFrame({
        'order_id': range(1, 11),
        'user_id': [1, 2, 1, 3, 4, 2, 5, 1, 3, 4],
        'product_id': ['P001', 'P002', 'P001', 'P003', 'P001',
                       'P002', 'P003', 'P002', 'P001', 'P003'],
        'amount': [100, 200, 150, 300, 100, 250, 350, 200, 150, 400],
        'order_date': pd.date_range('2024-01-01', periods=10, freq='D')
    })

    return Output(
        value=orders,
        metadata={
            "row_count": len(orders),
            "total_revenue": float(orders['amount'].sum())
        }
    )

# ============================================
# 2. 의존성이 있는 Asset
# ============================================

@asset(
    ins={
        "raw_users": AssetIn(),
        "raw_orders": AssetIn()
    },
    description="사용자 주문 통계 집계",
    group_name="analytics",
    compute_kind="pandas",
)
def user_order_stats(
    context: AssetExecutionContext,
    raw_users: pd.DataFrame,
    raw_orders: pd.DataFrame
) -> pd.DataFrame:
    """사용자별 주문 통계"""

    # 주문 집계
    order_stats = raw_orders.groupby('user_id').agg({
        'order_id': 'count',
        'amount': ['sum', 'mean']
    }).reset_index()

    order_stats.columns = ['user_id', 'order_count', 'total_spent', 'avg_order_value']

    # 사용자 정보와 조인
    result = pd.merge(raw_users, order_stats, on='user_id', how='left')
    result = result.fillna(0)

    context.log.info(f"처리된 사용자 수: {len(result)}")

    return Output(
        value=result,
        metadata={
            "row_count": len(result),
            "users_with_orders": int((result['order_count'] > 0).sum()),
            "total_revenue": float(result['total_spent'].sum())
        }
    )

# ============================================
# 3. 파티션된 Asset
# ============================================

daily_partitions = DailyPartitionsDefinition(
    start_date="2024-01-01",
    end_date="2024-12-31"
)

@asset(
    partitions_def=daily_partitions,
    description="일별 이벤트 데이터",
    group_name="raw_data",
)
def daily_events(context: AssetExecutionContext) -> pd.DataFrame:
    """파티션 키(날짜)에 해당하는 이벤트 수집"""
    partition_date = context.partition_key

    context.log.info(f"Processing partition: {partition_date}")

    # 실제로는 해당 날짜의 데이터만 조회
    events = pd.DataFrame({
        'event_id': range(100),
        'event_date': partition_date,
        'user_id': [i % 10 for i in range(100)],
        'event_type': ['click', 'view', 'purchase'] * 33 + ['click']
    })

    return events

@asset(
    partitions_def=daily_partitions,
    ins={"daily_events": AssetIn()},
    description="일별 이벤트 집계",
    group_name="analytics",
)
def daily_event_metrics(
    context: AssetExecutionContext,
    daily_events: pd.DataFrame
) -> pd.DataFrame:
    """일별 이벤트 메트릭 계산"""
    metrics = daily_events.groupby('event_type').size().reset_index(name='count')
    metrics['date'] = context.partition_key

    return metrics

# ============================================
# 4. Resource를 사용한 외부 시스템 연동
# ============================================

class DatabaseResource(ConfigurableResource):
    """데이터베이스 연결 리소스"""
    host: str
    port: int = 5432
    database: str
    user: str
    password: str

    def get_connection(self):
        # 실제로는 psycopg2 등으로 연결
        return f"postgresql://{self.user}@{self.host}:{self.port}/{self.database}"

    def execute_query(self, query: str) -> pd.DataFrame:
        # 실제로는 쿼리 실행
        return pd.DataFrame()

@asset(
    description="데이터베이스에서 고객 세그먼트 로드",
    group_name="segments",
)
def customer_segments(
    context: AssetExecutionContext,
    database: DatabaseResource
) -> pd.DataFrame:
    """데이터베이스에서 고객 세그먼트 정보 가져오기"""
    conn_string = database.get_connection()
    context.log.info(f"Connecting to: {conn_string}")

    # 실제로는 쿼리 실행
    return pd.DataFrame({
        'user_id': [1, 2, 3],
        'segment': ['VIP', 'Regular', 'New']
    })

# ============================================
# 5. dbt 통합
# ============================================

"""
# dbt 프로젝트가 있는 경우
from pathlib import Path

dbt_project_dir = Path(__file__).parent / "dbt_project"

@dbt_assets(
    manifest=dbt_project_dir / "target" / "manifest.json",
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()
"""

# ============================================
# 6. Sensor 정의
# ============================================

from dagster import sensor, RunRequest, SensorEvaluationContext

@sensor(
    job_name="process_new_files_job",
    minimum_interval_seconds=60
)
def new_file_sensor(context: SensorEvaluationContext):
    """새 파일이 도착하면 파이프라인 트리거"""
    # 실제로는 S3, GCS 등 확인
    new_files = ["file1.csv", "file2.csv"]  # 예시

    for file in new_files:
        yield RunRequest(
            run_key=f"file_{file}",
            run_config={
                "ops": {
                    "process_file": {
                        "config": {"file_path": file}
                    }
                }
            }
        )

# ============================================
# 7. Job과 Schedule 정의
# ============================================

# Asset 기반 Job
daily_analytics_job = define_asset_job(
    name="daily_analytics_job",
    selection=["raw_users", "raw_orders", "user_order_stats"],
    description="일별 분석 데이터 갱신"
)

# Schedule
daily_analytics_schedule = ScheduleDefinition(
    name="daily_analytics_schedule",
    job=daily_analytics_job,
    cron_schedule="0 6 * * *",  # 매일 오전 6시
    execution_timezone="Asia/Seoul"
)

# ============================================
# 8. Definitions (프로젝트 진입점)
# ============================================

defs = Definitions(
    assets=[
        raw_users,
        raw_orders,
        user_order_stats,
        daily_events,
        daily_event_metrics,
        customer_segments,
    ],
    resources={
        "database": DatabaseResource(
            host="localhost",
            database="analytics",
            user="dagster",
            password="secret"
        )
    },
    schedules=[daily_analytics_schedule],
    sensors=[new_file_sensor],
    jobs=[daily_analytics_job]
)

실무에서 이렇게 쓰여요

데이터 엔지니어: "Airflow DAG가 너무 복잡해지고, 의존성 관리가 어려워졌어요. dbt 모델과의 연동도 깔끔하지 않고요."

테크 리드: "Dagster로 전환하면 Asset 기반으로 구조화할 수 있어요. 각 테이블이 Asset이 되고, 의존성이 코드에서 명시적으로 선언되니까 리니지도 자동으로 추적돼요."

데이터 엔지니어: "dbt와는 어떻게 통합하나요?"

테크 리드: "dagster-dbt 라이브러리로 dbt 모델을 Dagster Asset으로 자동 변환할 수 있어요. Python 전처리와 dbt 변환을 하나의 Asset Graph에서 관리하면 전체 파이프라인이 한눈에 보여요."

면접관: "Dagster의 Software-Defined Assets 개념이 기존 워크플로 오케스트레이터와 어떻게 다른가요?"

지원자: "기존 오케스트레이터는 'task'를 중심으로 설계되어 '무엇을 실행할지'에 초점을 맞춥니다. 반면 Dagster의 Software-Defined Assets는 '무엇을 생성할지'에 초점을 맞춥니다. 각 Asset이 데이터 제품(테이블, 파일, ML 모델)을 나타내고, 의존성을 선언적으로 정의합니다. 이로 인해 데이터 리니지가 자동 추적되고, 증분 실행이나 백필이 자연스럽게 지원됩니다. 또한 Asset Catalog 기능으로 데이터 디스커버리도 가능해집니다."

리뷰어: "Asset에서 직접 DB 연결을 생성하고 있는데, Resource로 분리하는 게 좋겠어요."

개발자: "Resource를 왜 써야 하나요?"

리뷰어: "테스트할 때 mock으로 대체하기 쉽고, 환경별로 다른 설정을 주입할 수 있어요. 예를 들어 로컬에서는 SQLite, 프로덕션에서는 PostgreSQL을 사용하도록 Resource만 바꾸면 되죠. 코드 재사용성도 높아지고요."

주의사항

관련 용어

더 배우기