Avro
Apache Avro
행 지향 직렬화 포맷. 스키마 진화 지원.
Apache Avro
행 지향 직렬화 포맷. 스키마 진화 지원.
Apache Avro는 Hadoop 생태계에서 널리 사용되는 행 지향(row-oriented) 데이터 직렬화 시스템으로, 스키마를 데이터와 함께 저장하여 자기 기술적(self-describing) 특성을 제공합니다. Doug Cutting이 개발했으며, JSON으로 정의된 스키마를 기반으로 컴팩트한 바이너리 포맷으로 데이터를 저장합니다.
Avro의 핵심 장점은 스키마 진화(Schema Evolution)입니다. 새 필드 추가, 기본값이 있는 필드 삭제, 필드 이름 변경(alias 사용) 등이 가능하며, 쓰기 스키마와 읽기 스키마가 달라도 호환성 규칙만 충족하면 데이터를 읽을 수 있습니다. 이는 데이터 파이프라인에서 프로듀서와 컨슈머의 독립적인 배포를 가능하게 합니다.
Avro는 다양한 데이터 타입을 지원합니다. 기본 타입(null, boolean, int, long, float, double, bytes, string)과 복합 타입(record, enum, array, map, union, fixed)을 조합하여 복잡한 데이터 구조를 표현할 수 있습니다. 특히 union 타입은 nullable 필드 구현에 필수적이며, ["null", "string"]처럼 여러 타입 중 하나를 선택할 수 있습니다.
Kafka, Spark, Flink, Hive 등 빅데이터 생태계에서 표준 직렬화 포맷으로 사용됩니다. Confluent Schema Registry와 함께 사용하면 중앙에서 스키마를 관리하고 호환성을 자동 검증할 수 있습니다. RPC 프로토콜도 지원하여 Avro IPC를 통한 서비스 간 통신에도 활용됩니다.
# Apache Avro 스키마 정의 및 직렬화 예제
# ============================================
# 1. Avro 스키마 정의 (user_event.avsc)
# ============================================
{
"type": "record",
"name": "UserEvent",
"namespace": "com.example.events",
"doc": "사용자 이벤트를 나타내는 스키마",
"fields": [
{
"name": "event_id",
"type": "string",
"doc": "고유 이벤트 ID"
},
{
"name": "user_id",
"type": "string",
"doc": "사용자 ID"
},
{
"name": "event_type",
"type": {
"type": "enum",
"name": "EventType",
"symbols": ["CLICK", "VIEW", "PURCHASE", "SIGNUP"]
},
"doc": "이벤트 유형"
},
{
"name": "timestamp",
"type": "long",
"logicalType": "timestamp-millis",
"doc": "이벤트 발생 시간 (밀리초)"
},
{
"name": "metadata",
"type": ["null", {
"type": "map",
"values": "string"
}],
"default": null,
"doc": "추가 메타데이터 (선택적)"
},
{
"name": "amount",
"type": ["null", "double"],
"default": null,
"doc": "금액 (구매 이벤트용)"
},
{
"name": "tags",
"type": {
"type": "array",
"items": "string"
},
"default": [],
"doc": "태그 목록"
},
{
"name": "device_info",
"type": ["null", {
"type": "record",
"name": "DeviceInfo",
"fields": [
{"name": "device_type", "type": "string"},
{"name": "os_version", "type": "string"},
{"name": "app_version", "type": ["null", "string"], "default": null}
]
}],
"default": null,
"doc": "디바이스 정보"
}
]
}
# ============================================
# 2. Python fastavro 라이브러리 사용
# ============================================
import fastavro
from fastavro import writer, reader, parse_schema
import io
import json
# 스키마 파싱
schema = {
"type": "record",
"name": "UserEvent",
"namespace": "com.example.events",
"fields": [
{"name": "event_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "event_type", "type": {
"type": "enum",
"name": "EventType",
"symbols": ["CLICK", "VIEW", "PURCHASE", "SIGNUP"]
}},
{"name": "timestamp", "type": "long"},
{"name": "metadata", "type": ["null", {"type": "map", "values": "string"}], "default": None},
{"name": "amount", "type": ["null", "double"], "default": None},
{"name": "tags", "type": {"type": "array", "items": "string"}, "default": []}
]
}
parsed_schema = parse_schema(schema)
# 샘플 레코드 생성
records = [
{
"event_id": "evt_001",
"user_id": "user_123",
"event_type": "CLICK",
"timestamp": 1705312800000,
"metadata": {"page": "home", "button": "cta"},
"amount": None,
"tags": ["web", "mobile"]
},
{
"event_id": "evt_002",
"user_id": "user_456",
"event_type": "PURCHASE",
"timestamp": 1705312860000,
"metadata": {"product_id": "P001"},
"amount": 99.99,
"tags": ["mobile", "ios"]
}
]
# 파일로 쓰기
with open('events.avro', 'wb') as f:
writer(f, parsed_schema, records)
# 파일에서 읽기
with open('events.avro', 'rb') as f:
avro_reader = reader(f)
for record in avro_reader:
print(record)
# ============================================
# 3. 바이트 스트림 직렬화/역직렬화 (Kafka용)
# ============================================
from fastavro import schemaless_writer, schemaless_reader
def serialize_avro(record, schema):
"""레코드를 Avro 바이트로 직렬화"""
buffer = io.BytesIO()
schemaless_writer(buffer, schema, record)
return buffer.getvalue()
def deserialize_avro(data, schema):
"""Avro 바이트를 레코드로 역직렬화"""
buffer = io.BytesIO(data)
return schemaless_reader(buffer, schema)
# 직렬화
event = {
"event_id": "evt_003",
"user_id": "user_789",
"event_type": "VIEW",
"timestamp": 1705312900000,
"metadata": None,
"amount": None,
"tags": []
}
serialized = serialize_avro(event, parsed_schema)
print(f"직렬화된 크기: {len(serialized)} bytes")
# 역직렬화
deserialized = deserialize_avro(serialized, parsed_schema)
print(f"역직렬화된 레코드: {deserialized}")
# ============================================
# 4. 스키마 진화 (Schema Evolution)
# ============================================
# 원본 스키마 (v1)
schema_v1 = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"}
]
}
# 진화된 스키마 (v2) - 필드 추가, 기본값 설정
schema_v2 = {
"type": "record",
"name": "User",
"fields": [
{"name": "id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
# 새 필드 (기본값 있음 - 하위 호환)
{"name": "phone", "type": ["null", "string"], "default": None},
{"name": "created_at", "type": "long", "default": 0}
]
}
# v1으로 작성된 데이터
user_v1 = {"id": "u001", "name": "홍길동", "email": "hong@example.com"}
# v1 스키마로 직렬화
buffer = io.BytesIO()
schemaless_writer(buffer, parse_schema(schema_v1), user_v1)
data = buffer.getvalue()
# v2 스키마로 역직렬화 (기본값 적용)
buffer = io.BytesIO(data)
# writer_schema: 쓸 때 사용한 스키마, reader_schema: 읽을 때 사용할 스키마
user_v2 = schemaless_reader(
buffer,
parse_schema(schema_v1), # writer schema
parse_schema(schema_v2) # reader schema
)
print(f"스키마 진화 결과: {user_v2}")
# {'id': 'u001', 'name': '홍길동', 'email': 'hong@example.com', 'phone': None, 'created_at': 0}
# ============================================
# 5. Confluent Schema Registry 연동
# ============================================
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
# Schema Registry 클라이언트
schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)
# Avro 스키마 문자열
avro_schema_str = json.dumps(schema)
# 직렬화/역직렬화 설정
avro_serializer = AvroSerializer(
schema_registry_client,
avro_schema_str,
lambda obj, ctx: obj # dict를 그대로 사용
)
avro_deserializer = AvroDeserializer(
schema_registry_client,
avro_schema_str,
lambda obj, ctx: obj
)
# Producer 설정
producer_conf = {
'bootstrap.servers': 'localhost:9092',
'key.serializer': lambda k, ctx: k.encode('utf-8') if k else None,
'value.serializer': avro_serializer
}
producer = SerializingProducer(producer_conf)
# 메시지 전송
producer.produce(
topic='user-events',
key='user_123',
value=event,
on_delivery=lambda err, msg: print(f"Delivered: {msg.topic()}")
)
producer.flush()
# Consumer 설정
consumer_conf = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'avro-consumer-group',
'key.deserializer': lambda k, ctx: k.decode('utf-8') if k else None,
'value.deserializer': avro_deserializer,
'auto.offset.reset': 'earliest'
}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['user-events'])
# 메시지 수신
msg = consumer.poll(timeout=1.0)
if msg and not msg.error():
print(f"Received: {msg.value()}")
# ============================================
# 6. Spark에서 Avro 사용
# ============================================
"""
from pyspark.sql import SparkSession
spark = SparkSession.builder \\
.appName("AvroExample") \\
.config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.5.0") \\
.getOrCreate()
# Avro 파일 읽기
df = spark.read.format("avro").load("events.avro")
df.show()
# Avro 파일 쓰기
df.write.format("avro").save("output_events.avro")
# 스키마 지정하여 읽기
avro_schema = open("user_event.avsc").read()
df = spark.read.format("avro") \\
.option("avroSchema", avro_schema) \\
.load("events.avro")
"""
print("Avro 예제 완료")
백엔드 개발자: "Kafka 메시지 포맷을 JSON에서 바꾸려고 하는데, Avro와 Protobuf 중 뭐가 좋을까요?"
데이터 엔지니어: "스키마 진화가 빈번하면 Avro가 유리해요. Schema Registry와 함께 쓰면 호환성 검증도 자동이고, Kafka 에코시스템에서 지원이 잘 되어있어요."
백엔드 개발자: "기존 JSON 데이터와 호환은 어떻게 하죠?"
데이터 엔지니어: "새 토픽에 Avro로 시작하고, 컨슈머에서 두 포맷 다 처리하도록 하면 돼요. 점진적으로 마이그레이션 하면서 JSON 토픽은 deprecate 하면 됩니다."
면접관: "Avro에서 스키마 호환성의 종류와 각각 어떤 상황에서 사용하는지 설명해주세요."
지원자: "Avro 스키마 호환성은 세 가지입니다. BACKWARD 호환은 새 스키마로 이전 데이터를 읽을 수 있는 것으로, 필드 추가(기본값 필수)나 필드 삭제가 가능합니다. FORWARD 호환은 이전 스키마로 새 데이터를 읽을 수 있는 것으로, 필드 삭제(기본값 필수)나 필드 추가가 가능합니다. FULL 호환은 양방향 모두 가능한 것으로 가장 엄격합니다. 일반적으로 BACKWARD 호환을 권장하며, 이는 컨슈머가 프로듀서보다 먼저 배포되는 것을 허용합니다."
리뷰어: "nullable 필드를 ["string", "null"]로 정의했는데, 순서가 중요해요."
개발자: "순서가 영향을 주나요?"
리뷰어: "네, Avro는 union의 첫 번째 타입을 기본값으로 사용해요. ["null", "string"]으로 하고 default: null을 명시해야 해요. 안 그러면 필드가 없을 때 역직렬화 에러가 날 수 있어요."