📊데이터공학

빅데이터

Big Data

대용량, 고속, 다양한 형태의 데이터. 3V(Volume, Velocity, Variety)로 정의. Hadoop, Spark로 처리.

상세 설명

빅데이터(Big Data)는 전통적인 데이터 처리 도구로는 관리하기 어려운 대규모 데이터 집합을 의미합니다. 3V로 정의되는 특성을 가집니다: Volume(대용량, 페타바이트 이상), Velocity(고속 생성, 실시간 스트리밍), Variety(다양한 형식, 정형/비정형/반정형). 최근에는 Veracity(정확성), Value(가치)를 추가한 5V로 확장됩니다.

빅데이터 처리의 핵심 기술은 분산 컴퓨팅입니다. Apache Hadoop은 HDFS(분산 파일 시스템)와 MapReduce(분산 처리 프레임워크)로 페타바이트 규모 데이터를 처리합니다. Apache Spark는 인메모리 처리로 Hadoop MapReduce보다 100배 이상 빠른 성능을 제공하며, 현재 배치/스트리밍 분석의 사실상 표준입니다.

실시간 스트리밍 처리에는 Apache Kafka(메시지 큐), Apache Flink(스트림 프로세싱), Spark Streaming이 사용됩니다. 클라우드 환경에서는 AWS EMR, Google Dataproc, Azure HDInsight 같은 관리형 서비스로 인프라 운영 부담 없이 빅데이터를 처리할 수 있습니다. Databricks는 Spark 기반의 통합 분석 플랫폼으로 인기를 얻고 있습니다.

빅데이터는 AI/ML 모델 학습의 기반이 됩니다. 대규모 학습 데이터가 모델 성능을 결정하며, GPT 같은 대규모 언어 모델은 수천억 토큰의 텍스트 데이터로 학습됩니다. IoT, 소셜 미디어, 금융 거래, 의료 기록 등 다양한 분야에서 빅데이터 분석이 비즈니스 의사결정과 혁신을 주도하고 있습니다.

코드 예제

PySpark 빅데이터 처리
# Apache Spark를 활용한 빅데이터 처리 예시
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Spark 세션 생성 (클러스터 모드)
spark = SparkSession.builder \
    .appName("BigDataAnalytics") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.cores", "4") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .getOrCreate()

# 1. 대용량 데이터 로드 (페타바이트 규모)
# Parquet 형식으로 효율적 컬럼 읽기
events_df = spark.read \
    .parquet("s3://datalake/events/year=2024/") \
    .filter(col("event_date") >= "2024-01-01")

print(f"총 레코드 수: {events_df.count():,}")
print(f"파티션 수: {events_df.rdd.getNumPartitions()}")

# 2. 3V 특성 활용
# Volume: 수십억 건 데이터 병렬 처리
# Velocity: 시간 단위 파티션으로 증분 처리
# Variety: JSON, 로그, 트랜잭션 데이터 통합

# 복잡한 집계 연산 (분산 처리)
user_metrics = events_df \
    .groupBy("user_id", "event_date") \
    .agg(
        count("*").alias("event_count"),
        countDistinct("session_id").alias("session_count"),
        sum("revenue").alias("total_revenue"),
        collect_set("event_type").alias("event_types"),
        avg("page_load_time").alias("avg_load_time")
    )

# 3. 윈도우 함수로 시계열 분석
window_spec = Window \
    .partitionBy("user_id") \
    .orderBy("event_date") \
    .rowsBetween(-6, 0)  # 7일 이동 윈도우

user_trends = user_metrics \
    .withColumn("rolling_avg_revenue",
                avg("total_revenue").over(window_spec)) \
    .withColumn("revenue_growth",
                (col("total_revenue") - lag("total_revenue", 1).over(
                    Window.partitionBy("user_id").orderBy("event_date")
                )) / lag("total_revenue", 1).over(
                    Window.partitionBy("user_id").orderBy("event_date")
                ))

# 4. 대규모 ML 파이프라인 (분산 학습)
# 특성 벡터 생성
assembler = VectorAssembler(
    inputCols=["event_count", "session_count", "total_revenue", "avg_load_time"],
    outputCol="features"
)

# 분산 학습 (수백만 샘플)
rf = RandomForestClassifier(
    labelCol="churn_label",
    featuresCol="features",
    numTrees=100,
    maxDepth=10,
    seed=42
)

pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(user_trends.filter(col("churn_label").isNotNull()))

# 5. 결과 저장 (최적화된 형식)
user_trends.write \
    .partitionBy("event_date") \
    .mode("overwrite") \
    .format("delta") \
    .option("mergeSchema", "true") \
    .save("s3://datalake/gold/user_metrics/")

# 6. 실시간 스트리밍 처리 (Kafka 연동)
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest") \
    .load()

# 스트림 변환
processed_stream = streaming_df \
    .select(from_json(col("value").cast("string"), schema).alias("data")) \
    .select("data.*") \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(window("timestamp", "5 minutes"), "user_id") \
    .agg(count("*").alias("event_count"))

# 스트림 출력 (Delta Lake)
query = processed_stream.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3://checkpoints/user_counts/") \
    .trigger(processingTime="1 minute") \
    .start("s3://datalake/streaming/user_counts/")

print("Streaming query started...")

실무 대화

PM: 일별 집계 작업이 8시간이 걸리는데, 더 빠르게 할 수 없나요?
데이터 엔지니어: Spark 설정을 최적화하면 됩니다. 현재 shuffle 파티션이 200인데, 데이터 크기에 비해 적어요. 2000으로 늘리고, AQE(Adaptive Query Execution)를 활성화하면 2시간 이내로 단축될 거예요.
PM: 클라우드 비용도 고려해야 하는데요.
데이터 엔지니어: 스팟 인스턴스를 활용하면 비용을 70% 절감할 수 있어요. Dynamic Allocation으로 필요한 만큼만 리소스를 할당하고, 피크 시간 외에 실행하면 됩니다.
면접관: Spark에서 Data Skew 문제를 어떻게 해결하나요?
지원자: 특정 키에 데이터가 집중되면 일부 파티션만 오래 걸립니다. salting 기법으로 키에 랜덤 접미사를 붙여 분산시키거나, AQE의 skew join optimization을 활성화합니다. broadcast join도 작은 테이블에는 효과적입니다.
면접관: Hadoop과 Spark의 차이점은?
지원자: Hadoop MapReduce는 디스크 기반으로 매 단계 결과를 저장합니다. Spark는 인메모리 처리로 중간 결과를 메모리에 유지하여 반복 연산에서 100배 이상 빠릅니다. 현재는 Spark가 배치/스트리밍 모두에서 표준입니다.
개발자: collect()를 사용했는데 OOM이 발생해요.
시니어: collect()는 전체 데이터를 드라이버 메모리로 가져와. 빅데이터에서는 절대 쓰면 안 돼. 대신 take(n)으로 샘플만 가져오거나, write()로 분산 저장해. 집계 결과만 필요하면 agg() 후 collect()해.
개발자: 파티션 수는 어떻게 정하나요?
시니어: 일반적으로 클러스터 총 코어 수의 2-4배가 적당해. 파티션당 128MB-1GB가 이상적이야. spark.sql.adaptive.enabled=true로 AQE를 켜면 자동 최적화도 돼.

주의사항

  • collect(), toPandas()는 빅데이터에서 OOM을 유발합니다. 분산 저장 또는 샘플링을 사용하세요.
  • Data Skew는 특정 파티션에 부하를 집중시킵니다. salting, broadcast join, AQE로 해결하세요.
  • 소규모 데이터에 빅데이터 스택은 오버헤드만 증가시킵니다. 데이터 크기에 맞는 도구를 선택하세요.
  • Parquet, Delta Lake 같은 컬럼형 포맷을 사용하면 스토리지와 쿼리 성능이 크게 향상됩니다.
  • Dynamic Allocation과 스팟 인스턴스를 활용하면 클라우드 비용을 70% 이상 절감할 수 있습니다.

더 배우기