📊
데이터공학
Great Expectations
Great Expectations
데이터 품질 검증 프레임워크. 기대치 정의, 테스트.
Great Expectations
데이터 품질 검증 프레임워크. 기대치 정의, 테스트.
Great Expectations (GX)는 데이터 품질을 검증하고 문서화하기 위한 오픈소스 Python 라이브러리입니다. 데이터에 대한 "기대치(Expectations)"를 정의하고, 실제 데이터가 이를 충족하는지 자동으로 검증합니다.
| 카테고리 | Expectation | 설명 |
|---|---|---|
| 컬럼 존재 | expect_column_to_exist | 특정 컬럼 존재 확인 |
| Null 검사 | expect_column_values_to_not_be_null | Null 값 없음 확인 |
| 유일성 | expect_column_values_to_be_unique | 중복 값 없음 확인 |
| 범위 | expect_column_values_to_be_between | 값 범위 확인 |
| 집합 | expect_column_values_to_be_in_set | 허용 값 집합 확인 |
| 패턴 | expect_column_values_to_match_regex | 정규식 패턴 매칭 |
# Great Expectations를 사용한 데이터 품질 검증
import great_expectations as gx
from great_expectations.core.expectation_configuration import ExpectationConfiguration
import pandas as pd
# 컨텍스트 초기화
context = gx.get_context()
# 데이터 소스 설정
datasource = context.sources.add_pandas("my_pandas_datasource")
# 데이터 에셋 추가
data_asset = datasource.add_dataframe_asset(name="user_data")
# 샘플 데이터로 배치 요청
df = pd.DataFrame({
'user_id': [1, 2, 3, 4, 5],
'email': ['a@test.com', 'b@test.com', 'c@test.com', 'd@test.com', 'e@test.com'],
'age': [25, 30, 35, 40, -5], # -5는 이상값
'status': ['active', 'inactive', 'active', 'pending', 'unknown']
})
batch_request = data_asset.build_batch_request(dataframe=df)
# Expectation Suite 생성
suite_name = "user_data_quality_suite"
context.add_or_update_expectation_suite(expectation_suite_name=suite_name)
# Validator 생성
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=suite_name
)
# Expectations 정의
# 1. user_id는 null이 아니어야 함
validator.expect_column_values_to_not_be_null(column="user_id")
# 2. user_id는 유일해야 함
validator.expect_column_values_to_be_unique(column="user_id")
# 3. email은 이메일 형식이어야 함
validator.expect_column_values_to_match_regex(
column="email",
regex=r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
)
# 4. age는 0~120 사이여야 함
validator.expect_column_values_to_be_between(
column="age",
min_value=0,
max_value=120
)
# 5. status는 특정 값들 중 하나여야 함
validator.expect_column_values_to_be_in_set(
column="status",
value_set=["active", "inactive", "pending"]
)
# Suite 저장
validator.save_expectation_suite(discard_failed_expectations=False)
# 검증 실행
checkpoint = context.add_or_update_checkpoint(
name="user_data_checkpoint",
validations=[
{
"batch_request": batch_request,
"expectation_suite_name": suite_name
}
]
)
result = checkpoint.run()
print(f"검증 성공: {result.success}")
# Airflow DAG에서 Great Expectations 사용
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
import great_expectations as gx
def validate_source_data(**context):
"""소스 데이터 검증"""
gx_context = gx.get_context()
checkpoint_result = gx_context.run_checkpoint(
checkpoint_name="source_data_checkpoint"
)
if not checkpoint_result.success:
# 검증 실패 시 상세 정보 로깅
for validation_result in checkpoint_result.run_results.values():
for result in validation_result["validation_result"]["results"]:
if not result["success"]:
print(f"실패: {result['expectation_config']['expectation_type']}")
print(f"세부사항: {result['result']}")
raise ValueError("데이터 품질 검증 실패!")
return "검증 통과"
def validate_transformed_data(**context):
"""변환된 데이터 검증"""
gx_context = gx.get_context()
checkpoint_result = gx_context.run_checkpoint(
checkpoint_name="transformed_data_checkpoint"
)
if not checkpoint_result.success:
raise ValueError("변환 데이터 품질 검증 실패!")
return "변환 데이터 검증 통과"
with DAG(
dag_id="data_quality_pipeline",
start_date=days_ago(1),
schedule_interval="@daily",
catchup=False
) as dag:
validate_source = PythonOperator(
task_id="validate_source_data",
python_callable=validate_source_data
)
# ETL 작업 (생략)
# transform_data = ...
validate_output = PythonOperator(
task_id="validate_transformed_data",
python_callable=validate_transformed_data
)
validate_source >> validate_output
# great_expectations/expectations/user_data_suite.json
{
"expectation_suite_name": "user_data_suite",
"ge_cloud_id": null,
"expectations": [
{
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {
"min_value": 1,
"max_value": 1000000
},
"meta": {
"notes": "테이블에 최소 1개, 최대 100만 개의 행이 있어야 함"
}
},
{
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"column": "user_id"
},
"meta": {
"notes": "user_id는 필수 필드"
}
},
{
"expectation_type": "expect_column_values_to_be_unique",
"kwargs": {
"column": "user_id"
},
"meta": {
"notes": "user_id는 기본 키로 유일해야 함"
}
},
{
"expectation_type": "expect_column_values_to_be_between",
"kwargs": {
"column": "age",
"min_value": 0,
"max_value": 120,
"mostly": 0.99
},
"meta": {
"notes": "age는 0-120 사이, 99% 이상 충족 필요"
}
},
{
"expectation_type": "expect_column_values_to_match_strftime_format",
"kwargs": {
"column": "created_at",
"strftime_format": "%Y-%m-%d %H:%M:%S"
},
"meta": {
"notes": "날짜 형식 검증"
}
}
],
"data_asset_type": null
}
# 커스텀 Expectation 정의
from great_expectations.expectations.expectation import ColumnMapExpectation
from great_expectations.execution_engine import PandasExecutionEngine
from great_expectations.expectations.metrics import ColumnMapMetricProvider, column_condition_partial
class ExpectColumnValuesToBeValidKoreanPhoneNumber(ColumnMapExpectation):
"""한국 전화번호 형식 검증 Expectation"""
expectation_type = "expect_column_values_to_be_valid_korean_phone_number"
# 설명
map_metric = "column_values.valid_korean_phone"
success_keys = ("mostly",)
default_kwarg_values = {
"mostly": 1.0, # 기본적으로 100% 충족 필요
}
class ColumnValuesValidKoreanPhone(ColumnMapMetricProvider):
condition_metric_name = "column_values.valid_korean_phone"
@column_condition_partial(engine=PandasExecutionEngine)
def _pandas(cls, column, **kwargs):
import re
# 한국 전화번호 패턴: 010-1234-5678 또는 02-123-4567
pattern = r'^(01[016789]-\d{3,4}-\d{4}|0\d{1,2}-\d{3,4}-\d{4})$'
return column.str.match(pattern, na=False)
# 사용 예시
validator.expect_column_values_to_be_valid_korean_phone_number(
column="phone_number",
mostly=0.95 # 95% 이상 충족
)