📊 데이터공학

Great Expectations

Great Expectations

데이터 품질 검증 프레임워크. 기대치 정의, 테스트.

상세 설명

Great Expectations (GX)는 데이터 품질을 검증하고 문서화하기 위한 오픈소스 Python 라이브러리입니다. 데이터에 대한 "기대치(Expectations)"를 정의하고, 실제 데이터가 이를 충족하는지 자동으로 검증합니다.

Great Expectations의 핵심 개념

  • Expectation: 데이터에 대한 검증 규칙 (예: null 없음, 범위 내 값)
  • Expectation Suite: Expectation들의 집합
  • Checkpoint: 검증 실행 단위
  • Data Docs: 자동 생성되는 문서화 페이지
  • Validation Result: 검증 결과 리포트

주요 Expectation 유형

카테고리Expectation설명
컬럼 존재expect_column_to_exist특정 컬럼 존재 확인
Null 검사expect_column_values_to_not_be_nullNull 값 없음 확인
유일성expect_column_values_to_be_unique중복 값 없음 확인
범위expect_column_values_to_be_between값 범위 확인
집합expect_column_values_to_be_in_set허용 값 집합 확인
패턴expect_column_values_to_match_regex정규식 패턴 매칭

코드 예제

기본 데이터 검증 설정

# Great Expectations를 사용한 데이터 품질 검증
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration
import pandas as pd

# 컨텍스트 초기화
context = gx.get_context()

# 데이터 소스 설정
datasource = context.sources.add_pandas("my_pandas_datasource")

# 데이터 에셋 추가
data_asset = datasource.add_dataframe_asset(name="user_data")

# 샘플 데이터로 배치 요청
df = pd.DataFrame({
    'user_id': [1, 2, 3, 4, 5],
    'email': ['a@test.com', 'b@test.com', 'c@test.com', 'd@test.com', 'e@test.com'],
    'age': [25, 30, 35, 40, -5],  # -5는 이상값
    'status': ['active', 'inactive', 'active', 'pending', 'unknown']
})

batch_request = data_asset.build_batch_request(dataframe=df)

# Expectation Suite 생성
suite_name = "user_data_quality_suite"
context.add_or_update_expectation_suite(expectation_suite_name=suite_name)

# Validator 생성
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=suite_name
)

# Expectations 정의
# 1. user_id는 null이 아니어야 함
validator.expect_column_values_to_not_be_null(column="user_id")

# 2. user_id는 유일해야 함
validator.expect_column_values_to_be_unique(column="user_id")

# 3. email은 이메일 형식이어야 함
validator.expect_column_values_to_match_regex(
    column="email",
    regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)

# 4. age는 0~120 사이여야 함
validator.expect_column_values_to_be_between(
    column="age",
    min_value=0,
    max_value=120
)

# 5. status는 특정 값들 중 하나여야 함
validator.expect_column_values_to_be_in_set(
    column="status",
    value_set=["active", "inactive", "pending"]
)

# Suite 저장
validator.save_expectation_suite(discard_failed_expectations=False)

# 검증 실행
checkpoint = context.add_or_update_checkpoint(
    name="user_data_checkpoint",
    validations=[
        {
            "batch_request": batch_request,
            "expectation_suite_name": suite_name
        }
    ]
)

result = checkpoint.run()
print(f"검증 성공: {result.success}")

ETL 파이프라인과 통합

# Airflow DAG에서 Great Expectations 사용
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import great_expectations as gx

def validate_source_data(**context):
    """소스 데이터 검증"""
    gx_context = gx.get_context()

    checkpoint_result = gx_context.run_checkpoint(
        checkpoint_name="source_data_checkpoint"
    )

    if not checkpoint_result.success:
        # 검증 실패 시 상세 정보 로깅
        for validation_result in checkpoint_result.run_results.values():
            for result in validation_result["validation_result"]["results"]:
                if not result["success"]:
                    print(f"실패: {result['expectation_config']['expectation_type']}")
                    print(f"세부사항: {result['result']}")

        raise ValueError("데이터 품질 검증 실패!")

    return "검증 통과"

def validate_transformed_data(**context):
    """변환된 데이터 검증"""
    gx_context = gx.get_context()

    checkpoint_result = gx_context.run_checkpoint(
        checkpoint_name="transformed_data_checkpoint"
    )

    if not checkpoint_result.success:
        raise ValueError("변환 데이터 품질 검증 실패!")

    return "변환 데이터 검증 통과"


with DAG(
    dag_id="data_quality_pipeline",
    start_date=days_ago(1),
    schedule_interval="@daily",
    catchup=False
) as dag:

    validate_source = PythonOperator(
        task_id="validate_source_data",
        python_callable=validate_source_data
    )

    # ETL 작업 (생략)
    # transform_data = ...

    validate_output = PythonOperator(
        task_id="validate_transformed_data",
        python_callable=validate_transformed_data
    )

    validate_source >> validate_output

YAML 기반 Expectation Suite 정의

# great_expectations/expectations/user_data_suite.json
{
  "expectation_suite_name": "user_data_suite",
  "ge_cloud_id": null,
  "expectations": [
    {
      "expectation_type": "expect_table_row_count_to_be_between",
      "kwargs": {
        "min_value": 1,
        "max_value": 1000000
      },
      "meta": {
        "notes": "테이블에 최소 1개, 최대 100만 개의 행이 있어야 함"
      }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "user_id"
      },
      "meta": {
        "notes": "user_id는 필수 필드"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "user_id"
      },
      "meta": {
        "notes": "user_id는 기본 키로 유일해야 함"
      }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": {
        "column": "age",
        "min_value": 0,
        "max_value": 120,
        "mostly": 0.99
      },
      "meta": {
        "notes": "age는 0-120 사이, 99% 이상 충족 필요"
      }
    },
    {
      "expectation_type": "expect_column_values_to_match_strftime_format",
      "kwargs": {
        "column": "created_at",
        "strftime_format": "%Y-%m-%d %H:%M:%S"
      },
      "meta": {
        "notes": "날짜 형식 검증"
      }
    }
  ],
  "data_asset_type": null
}

커스텀 Expectation 작성

# 커스텀 Expectation 정의
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.metrics import ColumnMapMetricProvider, column_condition_partial

class ExpectColumnValuesToBeValidKoreanPhoneNumber(ColumnMapExpectation):
    """한국 전화번호 형식 검증 Expectation"""

    expectation_type = "expect_column_values_to_be_valid_korean_phone_number"

    # 설명
    map_metric = "column_values.valid_korean_phone"
    success_keys = ("mostly",)

    default_kwarg_values = {
        "mostly": 1.0,  # 기본적으로 100% 충족 필요
    }

class ColumnValuesValidKoreanPhone(ColumnMapMetricProvider):
    condition_metric_name = "column_values.valid_korean_phone"

    @column_condition_partial(engine=PandasExecutionEngine)
    def _pandas(cls, column, **kwargs):
        import re
        # 한국 전화번호 패턴: 010-1234-5678 또는 02-123-4567
        pattern = r'^(01[016789]-\d{3,4}-\d{4}|0\d{1,2}-\d{3,4}-\d{4})$'
        return column.str.match(pattern, na=False)


# 사용 예시
validator.expect_column_values_to_be_valid_korean_phone_number(
    column="phone_number",
    mostly=0.95  # 95% 이상 충족
)

실무 대화 예시

데이터 엔지니어: "ETL 파이프라인에서 가끔 잘못된 데이터가 들어와서 분석 결과가 이상해져요. 사전에 막을 방법이 없을까요?"
데이터 팀장: "Great Expectations 도입해보자. 데이터에 대한 기대치를 정의하고, ETL 파이프라인에서 자동 검증하면 불량 데이터가 들어오는 걸 막을 수 있어."
데이터 엔지니어: "기대치는 어떻게 정의하나요? 하나하나 수동으로?"
데이터 팀장: "Profiler를 사용하면 기존 데이터를 분석해서 기대치를 자동으로 추천해줘. 그걸 기반으로 조정하면 돼. 예를 들어 나이 컬럼이 0-100 사이라면 자동으로 범위 기대치를 생성해."
데이터 엔지니어: "검증 실패하면 어떻게 처리하나요?"
데이터 팀장: "Airflow 태스크에서 검증 실패 시 파이프라인을 중단시키거나, Slack으로 알림 보내게 설정할 수 있어. Data Docs로 어떤 규칙이 실패했는지 시각적으로도 확인 가능해."

주의사항

Expectation 설계

  • 너무 엄격한 기대치는 정상 데이터도 거부할 수 있음
  • mostly 파라미터로 허용 비율 설정 (예: 99% 충족)
  • 비즈니스 요구사항에 맞는 기대치 정의 필요

성능 고려사항

  • 대용량 데이터셋 검증 시 샘플링 고려
  • 모든 컬럼 검증은 시간이 오래 걸림 - 중요 컬럼 우선
  • Spark/BigQuery 연동 시 푸시다운 쿼리 활용

운영 시 주의점

  • 스키마 변경 시 Expectation Suite 업데이트 필요
  • 새로운 데이터 패턴 등장 시 기대치 재검토
  • 검증 결과 이력 관리 및 모니터링 체계 구축

관련 용어

더 배우기