Apache Flink
스트림 및 배치 처리 프레임워크
스트림 및 배치 처리 프레임워크
Apache Flink는 무한 스트림(unbounded stream)과 유한 스트림(bounded stream, 배치)을 통합적으로 처리하는 분산 처리 엔진입니다. "스트림 우선(stream-first)" 아키텍처를 채택하여, 배치 처리도 스트림의 특수한 형태로 취급합니다. 이를 통해 실시간 처리와 배치 처리를 동일한 API와 런타임으로 수행할 수 있습니다.
Flink의 핵심 강점은 이벤트 시간(event time) 처리와 exactly-once 보장입니다. 워터마크(watermark) 메커니즘을 통해 늦게 도착하는 이벤트를 정확하게 처리하고, 체크포인팅(checkpointing)을 통해 장애 발생 시에도 데이터 손실 없이 복구할 수 있습니다.
상태 관리(stateful processing)에서도 탁월합니다. 키별 상태(keyed state)와 연산자 상태(operator state)를 지원하며, RocksDB 백엔드를 사용하면 메모리보다 큰 상태도 관리할 수 있습니다. 세이브포인트(savepoint)를 통해 애플리케이션 업그레이드 시에도 상태를 보존할 수 있습니다.
Flink SQL과 Table API를 제공하여 SQL 쿼리로도 스트림 처리가 가능합니다. 동적 테이블(dynamic table) 개념을 통해 스트림 데이터를 테이블처럼 다룰 수 있으며, CDC(Change Data Capture) 커넥터를 통해 데이터베이스 변경사항을 실시간으로 캡처하여 처리할 수 있습니다.
# PyFlink 스트림 처리 예제
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
)
from pyflink.datastream.functions import MapFunction, KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common import Types, WatermarkStrategy, Duration
from pyflink.common.serialization import SimpleStringSchema
import json
# 실행 환경 설정
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
# 체크포인팅 설정 (exactly-once 보장)
env.enable_checkpointing(60000) # 60초마다 체크포인트
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)
# ==========================================
# 1. Kafka Source 설정
# ==========================================
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("user-events") \
.set_group_id("flink-processor") \
.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
# 워터마크 전략 (이벤트 시간 기반)
watermark_strategy = WatermarkStrategy \
.for_bounded_out_of_orderness(Duration.of_seconds(10)) \
.with_timestamp_assigner(lambda event, ts: json.loads(event)['timestamp'])
# 소스 스트림 생성
events = env.from_source(
kafka_source,
watermark_strategy,
"Kafka Source"
)
# ==========================================
# 2. 사용자 세션 분석 (Stateful Processing)
# ==========================================
class SessionAggregator(KeyedProcessFunction):
"""사용자별 세션 집계 (상태 유지)"""
def open(self, runtime_context: RuntimeContext):
# 상태 초기화
self.session_state = runtime_context.get_state(
ValueStateDescriptor("session", Types.STRING())
)
self.event_count = runtime_context.get_state(
ValueStateDescriptor("count", Types.INT())
)
def process_element(self, event, ctx):
data = json.loads(event)
user_id = data['user_id']
# 현재 상태 가져오기
current_session = self.session_state.value()
count = self.event_count.value() or 0
# 세션 타임아웃 타이머 등록 (30분)
ctx.timer_service().register_event_time_timer(
ctx.timestamp() + 30 * 60 * 1000
)
# 상태 업데이트
self.event_count.update(count + 1)
if current_session is None:
# 새 세션 시작
self.session_state.update(data.get('session_id'))
yield json.dumps({
'user_id': user_id,
'event': 'session_start',
'timestamp': data['timestamp']
})
yield json.dumps({
'user_id': user_id,
'event_type': data['event_type'],
'session_events': count + 1
})
def on_timer(self, timestamp, ctx):
# 세션 종료
count = self.event_count.value()
yield json.dumps({
'user_id': ctx.get_current_key(),
'event': 'session_end',
'total_events': count,
'timestamp': timestamp
})
# 상태 클리어
self.session_state.clear()
self.event_count.clear()
# 스트림 처리 파이프라인
processed = events \
.map(lambda x: json.loads(x)) \
.key_by(lambda x: x['user_id']) \
.process(SessionAggregator())
# ==========================================
# 3. Flink SQL로 실시간 집계
# ==========================================
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
# Table 환경 설정
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, settings)
# Kafka 테이블 정의
t_env.execute_sql("""
CREATE TABLE user_events (
user_id STRING,
event_type STRING,
amount DECIMAL(10, 2),
event_time TIMESTAMP(3),
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql',
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
)
""")
# 윈도우 집계 쿼리
t_env.execute_sql("""
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
COUNT(*) AS event_count,
SUM(amount) AS total_amount
FROM user_events
WHERE event_type = 'purchase'
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '1' MINUTE)
""")
# 실행
env.execute("User Session Analysis")
데이터 엔지니어: "현재 Spark Streaming으로 처리하는데 지연이 1분 정도 발생해요. 더 줄일 수 있을까요?"
테크리드: "Flink로 전환하면 밀리초 단위 지연이 가능해요. 진정한 스트림 처리라서 마이크로배치 오버헤드가 없습니다."
데이터 엔지니어: "상태 관리는 어떻게 되나요? 현재 Redis에 중간 상태를 저장하고 있거든요."
테크리드: "Flink는 내장 상태 관리가 있어서 외부 저장소 없이도 돼요. RocksDB 백엔드 쓰면 TB급 상태도 처리 가능합니다."
면접관: "Flink의 exactly-once 보장은 어떻게 구현되나요?"
지원자: "분산 스냅샷 알고리즘인 Chandy-Lamport를 기반으로 합니다. 체크포인트 배리어가 스트림을 통해 전파되면서 각 연산자가 상태를 스냅샷합니다. 장애 시 마지막 체크포인트부터 재처리합니다."
면접관: "워터마크가 진행되지 않으면 어떻게 되나요?"
지원자: "특정 파티션에서 이벤트가 오지 않으면 워터마크가 멈춥니다. Idleness 설정으로 유휴 파티션을 감지하거나, Processing Time 폴백을 설정할 수 있습니다."
리뷰어: "체크포인트 간격이 10초인데, 상태 크기가 크면 백프레셔가 발생할 수 있어요."
개발자: "적절한 간격은 어느 정도인가요?"
리뷰어: "상태 크기와 처리량에 따라 다르지만, 보통 1-5분 정도가 적당해요. 또한 incremental checkpointing을 활성화하면 체크포인트 시간이 크게 줄어듭니다."