📊 데이터공학

데이터 리니지

Data Lineage

데이터 흐름 추적. 원본부터 최종 결과까지 경로.

상세 설명

데이터 리니지(Data Lineage)는 데이터의 생성, 이동, 변환 과정을 추적하여 원본부터 최종 결과까지의 흐름을 시각화하는 기술입니다. 데이터가 '어디서 왔고, 어떻게 변환되었으며, 어디로 갔는지'를 투명하게 파악할 수 있게 합니다.

데이터 거버넌스, 규정 준수(GDPR, SOX, HIPAA), 영향도 분석, 디버깅에 필수적인 기능입니다. 스키마 변경이나 데이터 품질 이슈 발생 시 어떤 하류 시스템에 영향을 주는지 즉시 파악할 수 있어 장애 대응 시간을 크게 단축합니다.

OpenLineage가 업계 표준 프로토콜로 자리 잡았으며, Airflow, Spark, dbt 등 주요 데이터 도구들이 OpenLineage 이벤트를 자동 발행합니다. DataHub, Marquez, Apache Atlas 같은 플랫폼이 리니지 그래프를 수집하고 시각화합니다.

컬럼 레벨 리니지(Column-Level Lineage)는 개별 필드가 어떤 변환을 거쳐 생성되었는지까지 추적합니다. SQL 파싱을 통해 SELECT, JOIN, GROUP BY 연산에서 필드 간 매핑 관계를 자동 추출합니다.

코드 예제

Python (OpenLineage)
# OpenLineage 기반 데이터 리니지 추적
from openlineage.client import OpenLineageClient
from openlineage.client.run import (
    RunEvent, RunState, Run, Job,
    InputDataset, OutputDataset
)
from openlineage.client.facet import (
    SqlJobFacet, SchemaDatasetFacet, SchemaField,
    ColumnLineageDatasetFacet
)
import uuid
from datetime import datetime

# OpenLineage 클라이언트 초기화
client = OpenLineageClient(url="http://marquez:5000")
namespace = "data_platform"

# Job과 Run 정의
job = Job(namespace=namespace, name="daily_customer_aggregation")
run = Run(runId=str(uuid.uuid4()))

# 리니지 이벤트 발행: 작업 시작
start_event = RunEvent(
    eventType=RunState.START,
    eventTime=datetime.now().isoformat(),
    job=job,
    run=run,
    inputs=[
        InputDataset(
            namespace=namespace,
            name="bronze.orders",
            facets={
                "schema": SchemaDatasetFacet(fields=[
                    SchemaField(name="order_id", type="STRING"),
                    SchemaField(name="customer_id", type="STRING"),
                    SchemaField(name="amount", type="DECIMAL")
                ])
            }
        ),
        InputDataset(namespace=namespace, name="bronze.customers")
    ],
    outputs=[
        OutputDataset(
            namespace=namespace,
            name="gold.customer_metrics",
            facets={
                "columnLineage": ColumnLineageDatasetFacet(fields={
                    "total_revenue": {
                        "inputFields": [
                            {"namespace": namespace,
                             "name": "bronze.orders",
                             "field": "amount"}
                        ],
                        "transformationType": "AGGREGATION",
                        "transformationDescription": "SUM(amount)"
                    }
                })
            }
        )
    ],
    producer="https://github.com/company/data-pipeline"
)
client.emit(start_event)

# 작업 완료 이벤트
complete_event = RunEvent(
    eventType=RunState.COMPLETE,
    eventTime=datetime.now().isoformat(),
    job=job, run=run
)
client.emit(complete_event)

print(f"Lineage event emitted for run: {run.runId}")

실무 대화

원천 테이블 컬럼을 바꾸려는데, 어디에 영향을 주는지 모르겠어요.
리니지 그래프를 확인하세요. DataHub나 dbt docs에서 해당 컬럼의 다운스트림을 조회하면 영향받는 모든 모델과 대시보드를 확인할 수 있어요. 변경 전에 영향도 분석(Impact Analysis)을 필수로 수행해야 합니다.
GDPR 개인정보 삭제 요청 시 리니지를 어떻게 활용하나요?
리니지를 통해 특정 개인 데이터가 어떤 테이블과 파이프라인을 거쳤는지 추적합니다. email 컬럼이 있는 모든 하류 테이블을 식별하고, 삭제 대상 범위를 파악해요. 이를 '잊힐 권리' 대응에 활용합니다. 컬럼 레벨 리니지가 있으면 더 정확합니다.
Airflow DAG에서 리니지가 자동 수집되지 않는 것 같아요.
airflow.cfg에 OpenLineage 설정을 추가해야 해요. [openlineage] 섹션에 transport URL을 지정하고, 커스텀 Operator는 OpenLineage Extractor를 구현해야 합니다. 공식 Provider들은 대부분 자동 지원되니 버전을 확인해보세요.

주의사항

  • 레거시 시스템이나 수동 프로세스는 자동 리니지 수집이 어려우므로 수동 등록이 필요합니다.
  • 대규모 리니지 그래프는 시각화가 복잡해지므로 필터링과 레벨 제한 기능이 중요합니다.
  • 컬럼 레벨 리니지는 SQL 파싱 정확도에 의존하므로 복잡한 동적 SQL은 추적 누락이 발생할 수 있습니다.
  • 도구별 메타데이터 형식이 다르므로 OpenLineage 같은 표준 프로토콜 채택을 권장합니다.
  • dbt는 manifest.json에서 리니지 정보를 자동 생성하며, DataHub/Marquez와 쉽게 연동됩니다.

더 배우기