데이터 계보
데이터의 출처와 변환 이력을 추적하는 것
데이터의 출처와 변환 이력을 추적하는 것
데이터 계보(Data Lineage)는 데이터가 어디서 생성되어 어떤 변환 과정을 거쳐 최종 목적지에 도달했는지를 추적하는 기술입니다. EU AI Act 제10조와 제11조에서 고위험 AI의 학습 데이터 출처 및 처리 이력 문서화를 요구하면서, AI 컴플라이언스의 핵심 요소로 부상했습니다.
데이터 계보는 크게 세 가지 수준으로 구분됩니다. 컬럼 수준 계보(column-level lineage)는 개별 필드의 변환을 추적하고, 테이블 수준 계보(table-level lineage)는 데이터셋 간의 관계를 보여주며, 비즈니스 수준 계보(business-level lineage)는 데이터의 비즈니스적 의미와 활용 맥락을 기록합니다.
AI 개발에서 데이터 계보는 특히 중요합니다. 모델이 잘못된 예측을 했을 때 원인 분석(root cause analysis)이 가능하고, 편향이 발견되면 그 출처를 역추적할 수 있습니다. 또한 GDPR의 삭제권 행사 시 해당 개인정보가 포함된 학습 데이터를 식별하는 데도 필수적입니다.
실무에서는 Apache Atlas, DataHub, Amundsen 같은 메타데이터 관리 도구나, dbt의 lineage 기능, MLflow의 데이터 버전 추적 등을 활용합니다. 자동화된 계보 추적은 수동 문서화의 오류를 줄이고 실시간 가시성을 제공합니다.
# 데이터 계보 추적 시스템 구현 예제
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Dict, Optional
from enum import Enum
import json
import hashlib
class TransformationType(Enum):
EXTRACTION = "extraction" # 원본 추출
CLEANING = "cleaning" # 정제
TRANSFORMATION = "transformation" # 변환
AGGREGATION = "aggregation" # 집계
ANONYMIZATION = "anonymization" # 익명화/가명화
AUGMENTATION = "augmentation" # 데이터 증강
SPLIT = "split" # 학습/검증 분할
@dataclass
class LineageNode:
"""계보 그래프의 노드 (데이터셋 또는 변환)"""
node_id: str
node_type: str # "dataset" or "transform"
name: str
metadata: Dict = field(default_factory=dict)
@dataclass
class LineageEdge:
"""계보 그래프의 엣지 (데이터 흐름)"""
source_id: str
target_id: str
transform_type: TransformationType
transform_params: Dict
timestamp: datetime
executor: str # 실행자 또는 시스템
class DataLineageTracker:
"""AI 학습 데이터 계보 추적기"""
def __init__(self, project_name: str):
self.project_name = project_name
self.nodes: Dict[str, LineageNode] = {}
self.edges: List[LineageEdge] = []
def register_dataset(self, dataset_id: str, name: str,
source: str, record_count: int,
schema: Dict = None) -> LineageNode:
"""데이터셋 등록"""
node = LineageNode(
node_id=dataset_id,
node_type="dataset",
name=name,
metadata={
"source": source,
"record_count": record_count,
"schema": schema or {},
"registered_at": datetime.now().isoformat()
}
)
self.nodes[dataset_id] = node
return node
def record_transformation(self,
source_ids: List[str],
target_id: str,
transform_type: TransformationType,
transform_params: Dict,
executor: str = "system") -> str:
"""변환 이력 기록"""
transform_id = f"transform_{hashlib.md5(f'{source_ids}{target_id}{datetime.now()}'.encode()).hexdigest()[:8]}"
# 변환 노드 생성
transform_node = LineageNode(
node_id=transform_id,
node_type="transform",
name=f"{transform_type.value}",
metadata={"params": transform_params}
)
self.nodes[transform_id] = transform_node
# 엣지 생성 (source -> transform -> target)
for source_id in source_ids:
self.edges.append(LineageEdge(
source_id=source_id,
target_id=transform_id,
transform_type=transform_type,
transform_params=transform_params,
timestamp=datetime.now(),
executor=executor
))
self.edges.append(LineageEdge(
source_id=transform_id,
target_id=target_id,
transform_type=transform_type,
transform_params=transform_params,
timestamp=datetime.now(),
executor=executor
))
return transform_id
def trace_upstream(self, dataset_id: str) -> List[str]:
"""특정 데이터셋의 상위 계보 추적 (출처 찾기)"""
upstream = []
visited = set()
def dfs(node_id):
if node_id in visited:
return
visited.add(node_id)
for edge in self.edges:
if edge.target_id == node_id:
upstream.append(edge.source_id)
dfs(edge.source_id)
dfs(dataset_id)
return upstream
def get_gdpr_impact(self, source_dataset_id: str) -> List[str]:
"""GDPR 삭제권 영향 분석: 특정 원본에서 파생된 모든 데이터셋"""
downstream = []
visited = set()
def dfs(node_id):
if node_id in visited:
return
visited.add(node_id)
for edge in self.edges:
if edge.source_id == node_id:
if self.nodes[edge.target_id].node_type == "dataset":
downstream.append(edge.target_id)
dfs(edge.target_id)
dfs(source_dataset_id)
return downstream
def export_for_audit(self) -> str:
"""EU AI Act 감사용 계보 문서 생성"""
return json.dumps({
"project": self.project_name,
"export_timestamp": datetime.now().isoformat(),
"datasets": {k: {"name": v.name, **v.metadata}
for k, v in self.nodes.items()
if v.node_type == "dataset"},
"transformations": [
{
"source": e.source_id,
"target": e.target_id,
"type": e.transform_type.value,
"params": e.transform_params,
"timestamp": e.timestamp.isoformat(),
"executor": e.executor
} for e in self.edges
]
}, indent=2, ensure_ascii=False)
# 사용 예시
tracker = DataLineageTracker("고객이탈예측모델")
# 원본 데이터 등록
tracker.register_dataset("raw_customer", "원본 고객 데이터",
source="CRM 시스템", record_count=100000)
# 변환 이력 기록
tracker.record_transformation(
source_ids=["raw_customer"],
target_id="anonymized_customer",
transform_type=TransformationType.ANONYMIZATION,
transform_params={"method": "k-anonymity", "k": 5}
)
# 상위 계보 추적
print(f"학습데이터 출처: {tracker.trace_upstream('anonymized_customer')}")
PM: 모델이 특정 고객군에서 예측 정확도가 낮은데 원인을 파악할 수 있나요?
ML엔지니어: 데이터 계보를 추적해보니, 해당 고객군 데이터가 외부 구매 데이터셋에서 왔는데 품질 이슈가 있었습니다.
시니어: 좋아요. 이런 추적이 가능하려면 모든 전처리 단계에서 계보 로깅이 필수입니다. EU AI Act 감사 때도 이 자료가 필요해요.
면접관: GDPR 삭제권 요청이 들어왔을 때 AI 모델에 미치는 영향을 어떻게 파악하나요?
지원자: 데이터 계보 시스템을 통해 해당 개인정보가 포함된 원본 데이터셋을 식별하고, 그로부터 파생된 모든 학습 데이터셋과 모델을 추적합니다. 영향받는 모델은 재학습 또는 machine unlearning 기법을 적용해야 하며, 이 과정도 계보에 기록해서 컴플라이언스 증빙으로 활용합니다.
시니어: 데이터 파이프라인에 계보 추적 로직이 없네요.
주니어: MLflow로 모델 버전만 관리하고 있는데 부족한가요?
시니어: 모델 버전은 모델 계보고, 데이터 계보는 별도예요. 각 전처리 step에서 입력/출력 데이터셋 ID와 변환 파라미터를 기록해야 합니다. dbt나 DataHub 연동을 검토해보세요.