📊
데이터공학
데이터 리니지
Data Lineage
데이터 흐름 추적. 원본부터 최종 결과까지 경로.
Data Lineage
데이터 흐름 추적. 원본부터 최종 결과까지 경로.
데이터 리니지(Data Lineage)는 데이터의 생성, 이동, 변환 과정을 추적하여 원본부터 최종 결과까지의 흐름을 시각화하는 기술입니다. 데이터가 '어디서 왔고, 어떻게 변환되었으며, 어디로 갔는지'를 투명하게 파악할 수 있게 합니다.
데이터 거버넌스, 규정 준수(GDPR, SOX, HIPAA), 영향도 분석, 디버깅에 필수적인 기능입니다. 스키마 변경이나 데이터 품질 이슈 발생 시 어떤 하류 시스템에 영향을 주는지 즉시 파악할 수 있어 장애 대응 시간을 크게 단축합니다.
OpenLineage가 업계 표준 프로토콜로 자리 잡았으며, Airflow, Spark, dbt 등 주요 데이터 도구들이 OpenLineage 이벤트를 자동 발행합니다. DataHub, Marquez, Apache Atlas 같은 플랫폼이 리니지 그래프를 수집하고 시각화합니다.
컬럼 레벨 리니지(Column-Level Lineage)는 개별 필드가 어떤 변환을 거쳐 생성되었는지까지 추적합니다. SQL 파싱을 통해 SELECT, JOIN, GROUP BY 연산에서 필드 간 매핑 관계를 자동 추출합니다.
# OpenLineage 기반 데이터 리니지 추적
from openlineage.client import OpenLineageClient
from openlineage.client.run import (
RunEvent, RunState, Run, Job,
InputDataset, OutputDataset
)
from openlineage.client.facet import (
SqlJobFacet, SchemaDatasetFacet, SchemaField,
ColumnLineageDatasetFacet
)
import uuid
from datetime import datetime
# OpenLineage 클라이언트 초기화
client = OpenLineageClient(url="http://marquez:5000")
namespace = "data_platform"
# Job과 Run 정의
job = Job(namespace=namespace, name="daily_customer_aggregation")
run = Run(runId=str(uuid.uuid4()))
# 리니지 이벤트 발행: 작업 시작
start_event = RunEvent(
eventType=RunState.START,
eventTime=datetime.now().isoformat(),
job=job,
run=run,
inputs=[
InputDataset(
namespace=namespace,
name="bronze.orders",
facets={
"schema": SchemaDatasetFacet(fields=[
SchemaField(name="order_id", type="STRING"),
SchemaField(name="customer_id", type="STRING"),
SchemaField(name="amount", type="DECIMAL")
])
}
),
InputDataset(namespace=namespace, name="bronze.customers")
],
outputs=[
OutputDataset(
namespace=namespace,
name="gold.customer_metrics",
facets={
"columnLineage": ColumnLineageDatasetFacet(fields={
"total_revenue": {
"inputFields": [
{"namespace": namespace,
"name": "bronze.orders",
"field": "amount"}
],
"transformationType": "AGGREGATION",
"transformationDescription": "SUM(amount)"
}
})
}
)
],
producer="https://github.com/company/data-pipeline"
)
client.emit(start_event)
# 작업 완료 이벤트
complete_event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=datetime.now().isoformat(),
job=job, run=run
)
client.emit(complete_event)
print(f"Lineage event emitted for run: {run.runId}")