Spark
Apache Spark
대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크. 인메모리 처리로 Hadoop보다 빠름.
Apache Spark
대규모 데이터 처리를 위한 분산 컴퓨팅 프레임워크. 인메모리 처리로 Hadoop보다 빠름.
Apache Spark는 2009년 UC Berkeley AMPLab에서 개발되어 2014년 Apache 최상위 프로젝트가 된 분산 컴퓨팅 프레임워크입니다. 인메모리 처리로 Hadoop MapReduce보다 100배 빠르며, 배치와 스트리밍 처리를 통합된 API로 제공합니다.
핵심 추상화는 RDD(Resilient Distributed Dataset)이지만, 현재는 DataFrame과 Dataset API를 주로 사용합니다. DataFrame은 SQL처럼 컬럼 기반 데이터를 다루며, Catalyst 쿼리 최적화와 Tungsten 실행 엔진으로 성능을 극대화합니다.
Spark SQL(SQL 쿼리), Spark Streaming(마이크로 배치 스트리밍), MLlib(머신러닝), GraphX(그래프 처리)로 구성된 통합 스택을 제공합니다. PySpark로 Python에서 Spark를 사용하며, pandas API on Spark(구 Koalas)로 Pandas 코드를 Spark에서 실행합니다.
Databricks가 Spark 상용 서비스를 제공하고, AWS EMR, Google Dataproc, Azure HDInsight에서 관리형으로 사용합니다. 페타바이트 규모 데이터 처리, ETL 파이프라인, ML 피처 엔지니어링에 사실상 표준으로 사용됩니다.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, when, lit
from pyspark.sql.window import Window
# SparkSession 생성
spark = SparkSession.builder \
.appName("DataEngineering") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.getOrCreate()
# 데이터 읽기
df = spark.read.parquet("s3://bucket/sales/*.parquet")
# df = spark.read.csv("data.csv", header=True, inferSchema=True)
# df = spark.read.json("data.json")
# 스키마 확인
df.printSchema()
df.show(5)
# DataFrame 변환
transformed = df \
.filter(col("sale_date") >= "2024-01-01") \
.withColumn("tax", col("amount") * 0.1) \
.withColumn("total", col("amount") + col("tax")) \
.withColumn("category_type",
when(col("amount") > 1000, "high")
.when(col("amount") > 100, "medium")
.otherwise("low")
)
# 그룹화 및 집계
summary = transformed \
.groupBy("region", "category_type") \
.agg(
sum("amount").alias("total_sales"),
avg("amount").alias("avg_sales"),
count("*").alias("num_transactions")
) \
.orderBy(col("total_sales").desc())
summary.show()
# 윈도우 함수
window_spec = Window.partitionBy("region").orderBy(col("sale_date"))
df_with_running = df.withColumn(
"running_total",
sum("amount").over(window_spec)
).withColumn(
"rank_in_region",
row_number().over(window_spec)
)
# SQL 쿼리 사용
df.createOrReplaceTempView("sales")
result = spark.sql("""
SELECT
region,
DATE_TRUNC('month', sale_date) AS month,
SUM(amount) AS monthly_sales
FROM sales
WHERE sale_date >= '2024-01-01'
GROUP BY region, DATE_TRUNC('month', sale_date)
ORDER BY month, monthly_sales DESC
""")
# Join
customers = spark.read.parquet("s3://bucket/customers/")
joined = df.join(customers, df.customer_id == customers.id, "left")
# 데이터 저장
summary.write \
.mode("overwrite") \
.partitionBy("region") \
.parquet("s3://bucket/output/summary/")
# Delta Lake 저장 (ACID 지원)
# summary.write.format("delta").mode("overwrite").save("s3://bucket/delta/summary")
# 캐싱 (반복 사용 시)
df.cache()
df.count() # 캐시 트리거
# 실행 계획 확인
df.explain(True)
# 종료
spark.stop()
# spark-submit으로 실행
# spark-submit --master yarn --deploy-mode cluster \
# --num-executors 10 --executor-memory 4G \
# etl_job.py
시니어: "일 100GB 로그 처리는 Spark Job으로 돌려요. EMR에서 10노드로 30분 내 완료돼요."
주니어: "Job이 자꾸 OOM 에러 나요."
시니어: "executor-memory 늘리고, 파티션 수 조정하세요. explain()으로 셔플 단계 확인하고 broadcast join 쓸 수 있는지도 봐야 해요."
면접관: "대용량 데이터 처리 경험을 말씀해주세요."
지원자: "PySpark로 하루 10TB 로그를 처리하는 ETL 파이프라인을 구축했습니다. 파티셔닝으로 셔플을 최소화하고, broadcast join으로 작은 테이블 조인을 최적화했습니다. AQE(Adaptive Query Execution)를 활성화해 런타임에 최적화하고, Delta Lake로 ACID 트랜잭션과 타임 트래블 기능을 추가했습니다."
리뷰어: "collect()는 전체 데이터를 드라이버로 가져와서 메모리 부족이 발생해요. take(100)이나 show() 쓰세요."
개발자: "디버깅용이었는데, 프로덕션 코드에서 제거하겠습니다."