빅데이터
Big Data
대용량, 고속, 다양한 형태의 데이터. 3V(Volume, Velocity, Variety)로 정의. Hadoop, Spark로 처리.
Big Data
대용량, 고속, 다양한 형태의 데이터. 3V(Volume, Velocity, Variety)로 정의. Hadoop, Spark로 처리.
빅데이터(Big Data)는 전통적인 데이터 처리 도구로는 관리하기 어려운 대규모 데이터 집합을 의미합니다. 3V로 정의되는 특성을 가집니다: Volume(대용량, 페타바이트 이상), Velocity(고속 생성, 실시간 스트리밍), Variety(다양한 형식, 정형/비정형/반정형). 최근에는 Veracity(정확성), Value(가치)를 추가한 5V로 확장됩니다.
빅데이터 처리의 핵심 기술은 분산 컴퓨팅입니다. Apache Hadoop은 HDFS(분산 파일 시스템)와 MapReduce(분산 처리 프레임워크)로 페타바이트 규모 데이터를 처리합니다. Apache Spark는 인메모리 처리로 Hadoop MapReduce보다 100배 이상 빠른 성능을 제공하며, 현재 배치/스트리밍 분석의 사실상 표준입니다.
실시간 스트리밍 처리에는 Apache Kafka(메시지 큐), Apache Flink(스트림 프로세싱), Spark Streaming이 사용됩니다. 클라우드 환경에서는 AWS EMR, Google Dataproc, Azure HDInsight 같은 관리형 서비스로 인프라 운영 부담 없이 빅데이터를 처리할 수 있습니다. Databricks는 Spark 기반의 통합 분석 플랫폼으로 인기를 얻고 있습니다.
빅데이터는 AI/ML 모델 학습의 기반이 됩니다. 대규모 학습 데이터가 모델 성능을 결정하며, GPT 같은 대규모 언어 모델은 수천억 토큰의 텍스트 데이터로 학습됩니다. IoT, 소셜 미디어, 금융 거래, 의료 기록 등 다양한 분야에서 빅데이터 분석이 비즈니스 의사결정과 혁신을 주도하고 있습니다.
# Apache Spark를 활용한 빅데이터 처리 예시
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline
# Spark 세션 생성 (클러스터 모드)
spark = SparkSession.builder \
.appName("BigDataAnalytics") \
.config("spark.sql.adaptive.enabled", "true") \
.config("spark.sql.shuffle.partitions", "200") \
.config("spark.executor.memory", "8g") \
.config("spark.executor.cores", "4") \
.config("spark.dynamicAllocation.enabled", "true") \
.getOrCreate()
# 1. 대용량 데이터 로드 (페타바이트 규모)
# Parquet 형식으로 효율적 컬럼 읽기
events_df = spark.read \
.parquet("s3://datalake/events/year=2024/") \
.filter(col("event_date") >= "2024-01-01")
print(f"총 레코드 수: {events_df.count():,}")
print(f"파티션 수: {events_df.rdd.getNumPartitions()}")
# 2. 3V 특성 활용
# Volume: 수십억 건 데이터 병렬 처리
# Velocity: 시간 단위 파티션으로 증분 처리
# Variety: JSON, 로그, 트랜잭션 데이터 통합
# 복잡한 집계 연산 (분산 처리)
user_metrics = events_df \
.groupBy("user_id", "event_date") \
.agg(
count("*").alias("event_count"),
countDistinct("session_id").alias("session_count"),
sum("revenue").alias("total_revenue"),
collect_set("event_type").alias("event_types"),
avg("page_load_time").alias("avg_load_time")
)
# 3. 윈도우 함수로 시계열 분석
window_spec = Window \
.partitionBy("user_id") \
.orderBy("event_date") \
.rowsBetween(-6, 0) # 7일 이동 윈도우
user_trends = user_metrics \
.withColumn("rolling_avg_revenue",
avg("total_revenue").over(window_spec)) \
.withColumn("revenue_growth",
(col("total_revenue") - lag("total_revenue", 1).over(
Window.partitionBy("user_id").orderBy("event_date")
)) / lag("total_revenue", 1).over(
Window.partitionBy("user_id").orderBy("event_date")
))
# 4. 대규모 ML 파이프라인 (분산 학습)
# 특성 벡터 생성
assembler = VectorAssembler(
inputCols=["event_count", "session_count", "total_revenue", "avg_load_time"],
outputCol="features"
)
# 분산 학습 (수백만 샘플)
rf = RandomForestClassifier(
labelCol="churn_label",
featuresCol="features",
numTrees=100,
maxDepth=10,
seed=42
)
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(user_trends.filter(col("churn_label").isNotNull()))
# 5. 결과 저장 (최적화된 형식)
user_trends.write \
.partitionBy("event_date") \
.mode("overwrite") \
.format("delta") \
.option("mergeSchema", "true") \
.save("s3://datalake/gold/user_metrics/")
# 6. 실시간 스트리밍 처리 (Kafka 연동)
streaming_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "user-events") \
.option("startingOffsets", "latest") \
.load()
# 스트림 변환
processed_stream = streaming_df \
.select(from_json(col("value").cast("string"), schema).alias("data")) \
.select("data.*") \
.withWatermark("timestamp", "10 minutes") \
.groupBy(window("timestamp", "5 minutes"), "user_id") \
.agg(count("*").alias("event_count"))
# 스트림 출력 (Delta Lake)
query = processed_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "s3://checkpoints/user_counts/") \
.trigger(processingTime="1 minute") \
.start("s3://datalake/streaming/user_counts/")
print("Streaming query started...")