Apache Arrow
Apache Arrow
언어 간 컬럼 메모리 포맷. 제로카피 데이터 교환.
Apache Arrow
언어 간 컬럼 메모리 포맷. 제로카피 데이터 교환.
Apache Arrow는 고성능 분석 애플리케이션을 위한 언어 독립적인 컬럼형 메모리 포맷입니다. 다양한 프로그래밍 언어(Python, Java, C++, Rust, Go 등)에서 동일한 메모리 레이아웃을 사용하여 데이터를 표현함으로써, 직렬화/역직렬화 오버헤드 없이 제로카피(zero-copy) 데이터 교환이 가능합니다.
컬럼형 메모리 포맷의 핵심 장점은 분석 쿼리에서의 성능입니다. 특정 컬럼만 읽을 때 불필요한 데이터를 건너뛸 수 있고, SIMD(Single Instruction Multiple Data) 명령어를 활용한 벡터화 연산이 가능하며, CPU 캐시 효율성이 극대화됩니다.
Arrow는 단순히 메모리 포맷만이 아니라 완전한 데이터 처리 생태계를 제공합니다. Arrow Flight는 고성능 RPC 프레임워크로 네트워크를 통한 대용량 데이터 전송에 최적화되어 있고, Arrow Compute는 다양한 분석 함수 라이브러리를, Arrow Dataset은 Parquet, CSV, ORC 등 다양한 파일 포맷의 읽기/쓰기를 지원합니다.
현재 Pandas 2.0, Spark, DuckDB, Polars, DataFusion 등 주요 데이터 처리 도구들이 Arrow를 내부적으로 사용하거나 Arrow와의 상호 운용을 지원합니다. 이를 통해 서로 다른 시스템 간 데이터 교환 시 변환 비용을 최소화할 수 있습니다.
# PyArrow를 사용한 고성능 데이터 처리 예제
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow.flight as flight
import pandas as pd
import numpy as np
# ==========================================
# 1. Arrow Table 생성 및 기본 연산
# ==========================================
# Python 리스트에서 Arrow Table 생성
data = {
'user_id': [1, 2, 3, 4, 5],
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'age': [25, 30, 35, 28, 32],
'salary': [50000.0, 60000.0, 75000.0, 55000.0, 80000.0],
'department': ['Engineering', 'Marketing', 'Engineering', 'Sales', 'Engineering']
}
# Arrow Table 생성
table = pa.table(data)
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}, Columns: {table.num_columns}")
# Arrow Compute 함수를 사용한 벡터화 연산
# 급여 10% 인상 계산
salary_increase = pc.multiply(table.column('salary'), 1.1)
print(f"Salary after 10% raise: {salary_increase}")
# 조건 필터링 (Engineering 부서)
mask = pc.equal(table.column('department'), 'Engineering')
engineering_team = pc.filter(table, mask)
print(f"Engineering team: {engineering_team.to_pandas()}")
# 집계 연산
avg_salary = pc.mean(table.column('salary'))
max_age = pc.max(table.column('age'))
print(f"Average salary: {avg_salary}, Max age: {max_age}")
# ==========================================
# 2. Pandas와의 제로카피 변환
# ==========================================
# 대용량 데이터 생성
n_rows = 10_000_000
large_data = {
'id': np.arange(n_rows),
'value': np.random.randn(n_rows),
'category': np.random.choice(['A', 'B', 'C'], n_rows)
}
# Pandas DataFrame으로 생성
df = pd.DataFrame(large_data)
# Arrow Table로 변환 (제로카피 가능한 경우)
arrow_table = pa.Table.from_pandas(df, preserve_index=False)
# 다시 Pandas로 변환 (제로카피)
df_back = arrow_table.to_pandas(self_destruct=True) # 메모리 효율적
# ==========================================
# 3. Parquet 파일 읽기/쓰기
# ==========================================
# Parquet 파일 쓰기 (압축 및 파티셔닝)
pq.write_table(
table,
'employees.parquet',
compression='snappy',
row_group_size=1000000
)
# Parquet 파일 읽기 (특정 컬럼만)
table_read = pq.read_table(
'employees.parquet',
columns=['name', 'salary']
)
# 파티션된 데이터셋 쓰기
pq.write_to_dataset(
arrow_table,
root_path='partitioned_data',
partition_cols=['category']
)
# ==========================================
# 4. Dataset API로 대용량 데이터 처리
# ==========================================
# 여러 Parquet 파일을 하나의 Dataset으로
dataset = ds.dataset(
'partitioned_data',
format='parquet',
partitioning='hive'
)
# 푸시다운 필터링과 프로젝션
scanner = dataset.scanner(
columns=['id', 'value'],
filter=ds.field('category') == 'A'
)
# 배치 단위로 스트리밍 처리
for batch in scanner.to_batches():
# 각 배치를 처리 (메모리 효율적)
result = pc.mean(batch.column('value'))
print(f"Batch mean: {result}")
# ==========================================
# 5. Arrow Flight로 고성능 데이터 전송
# ==========================================
class FlightServer(flight.FlightServerBase):
def __init__(self, location, data):
super().__init__(location)
self.data = data
def do_get(self, context, ticket):
"""데이터 스트리밍 제공"""
return flight.RecordBatchStream(self.data)
def get_flight_info(self, context, descriptor):
"""데이터셋 메타데이터 제공"""
schema = self.data.schema
endpoints = [flight.FlightEndpoint(
descriptor.path[0],
[flight.Location.for_grpc_tcp("localhost", 8815)]
)]
return flight.FlightInfo(
schema, descriptor, endpoints,
self.data.num_rows, -1
)
# 클라이언트에서 데이터 가져오기
# client = flight.connect("grpc://localhost:8815")
# reader = client.do_get(ticket)
# table = reader.read_all()
데이터 엔지니어: "Python에서 Spark로 데이터 넘길 때 직렬화 오버헤드가 너무 커요. 1GB 데이터인데 변환에 30초나 걸려요."
테크리드: "Arrow 포맷 사용하면 제로카피로 넘길 수 있어요. Spark 3.0부터 Arrow 기반 pandas UDF가 기본이에요."
데이터 엔지니어: "Pandas도 Arrow 지원하나요?"
테크리드: "Pandas 2.0부터 Arrow 백엔드를 선택할 수 있어요. 문자열 처리가 특히 빨라지고, nullable 타입도 제대로 지원됩니다."
면접관: "Arrow가 기존 Parquet와 어떻게 다른가요?"
지원자: "Parquet는 디스크 저장용 컬럼형 포맷이고, Arrow는 메모리상 컬럼형 포맷입니다. Parquet 파일을 읽으면 Arrow Table로 로드되고, Arrow Table을 Parquet로 저장할 수 있어서 상호보완적입니다."
면접관: "제로카피의 장점과 제약사항은요?"
지원자: "장점은 메모리 복사 없이 포인터만 전달하므로 매우 빠르고 메모리 효율적입니다. 제약은 양쪽 시스템이 동일한 Arrow 메모리 레이아웃을 이해해야 하고, 데이터가 immutable해야 합니다."
리뷰어: "여기서 to_pandas() 호출할 때 self_destruct=True 옵션을 고려해보세요."
개발자: "그게 뭐하는 옵션인가요?"
리뷰어: "Arrow Table을 Pandas로 변환하면서 Arrow 메모리를 점진적으로 해제해요. 대용량 데이터에서 메모리 피크를 줄일 수 있습니다. 단, Arrow Table을 다시 사용할 수 없으니 주의하세요."