📊데이터공학

Apache Flink

스트림 및 배치 처리 프레임워크

상세 설명

Apache Flink는 무한 스트림(unbounded stream)과 유한 스트림(bounded stream, 배치)을 통합적으로 처리하는 분산 처리 엔진입니다. "스트림 우선(stream-first)" 아키텍처를 채택하여, 배치 처리도 스트림의 특수한 형태로 취급합니다. 이를 통해 실시간 처리와 배치 처리를 동일한 API와 런타임으로 수행할 수 있습니다.

Flink의 핵심 강점은 이벤트 시간(event time) 처리와 exactly-once 보장입니다. 워터마크(watermark) 메커니즘을 통해 늦게 도착하는 이벤트를 정확하게 처리하고, 체크포인팅(checkpointing)을 통해 장애 발생 시에도 데이터 손실 없이 복구할 수 있습니다.

상태 관리(stateful processing)에서도 탁월합니다. 키별 상태(keyed state)와 연산자 상태(operator state)를 지원하며, RocksDB 백엔드를 사용하면 메모리보다 큰 상태도 관리할 수 있습니다. 세이브포인트(savepoint)를 통해 애플리케이션 업그레이드 시에도 상태를 보존할 수 있습니다.

Flink SQL과 Table API를 제공하여 SQL 쿼리로도 스트림 처리가 가능합니다. 동적 테이블(dynamic table) 개념을 통해 스트림 데이터를 테이블처럼 다룰 수 있으며, CDC(Change Data Capture) 커넥터를 통해 데이터베이스 변경사항을 실시간으로 캡처하여 처리할 수 있습니다.

코드 예제

Python 클릭하여 복사
# PyFlink 스트림 처리 예제

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
)
from pyflink.datastream.functions import MapFunction, KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.common import Types, WatermarkStrategy, Duration
from pyflink.common.serialization import SimpleStringSchema
import json

# 실행 환경 설정
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

# 체크포인팅 설정 (exactly-once 보장)
env.enable_checkpointing(60000)  # 60초마다 체크포인트
env.get_checkpoint_config().set_min_pause_between_checkpoints(30000)

# ==========================================
# 1. Kafka Source 설정
# ==========================================

kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("user-events") \
    .set_group_id("flink-processor") \
    .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# 워터마크 전략 (이벤트 시간 기반)
watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(10)) \
    .with_timestamp_assigner(lambda event, ts: json.loads(event)['timestamp'])

# 소스 스트림 생성
events = env.from_source(
    kafka_source,
    watermark_strategy,
    "Kafka Source"
)

# ==========================================
# 2. 사용자 세션 분석 (Stateful Processing)
# ==========================================

class SessionAggregator(KeyedProcessFunction):
    """사용자별 세션 집계 (상태 유지)"""

    def open(self, runtime_context: RuntimeContext):
        # 상태 초기화
        self.session_state = runtime_context.get_state(
            ValueStateDescriptor("session", Types.STRING())
        )
        self.event_count = runtime_context.get_state(
            ValueStateDescriptor("count", Types.INT())
        )

    def process_element(self, event, ctx):
        data = json.loads(event)
        user_id = data['user_id']

        # 현재 상태 가져오기
        current_session = self.session_state.value()
        count = self.event_count.value() or 0

        # 세션 타임아웃 타이머 등록 (30분)
        ctx.timer_service().register_event_time_timer(
            ctx.timestamp() + 30 * 60 * 1000
        )

        # 상태 업데이트
        self.event_count.update(count + 1)

        if current_session is None:
            # 새 세션 시작
            self.session_state.update(data.get('session_id'))
            yield json.dumps({
                'user_id': user_id,
                'event': 'session_start',
                'timestamp': data['timestamp']
            })

        yield json.dumps({
            'user_id': user_id,
            'event_type': data['event_type'],
            'session_events': count + 1
        })

    def on_timer(self, timestamp, ctx):
        # 세션 종료
        count = self.event_count.value()
        yield json.dumps({
            'user_id': ctx.get_current_key(),
            'event': 'session_end',
            'total_events': count,
            'timestamp': timestamp
        })
        # 상태 클리어
        self.session_state.clear()
        self.event_count.clear()

# 스트림 처리 파이프라인
processed = events \
    .map(lambda x: json.loads(x)) \
    .key_by(lambda x: x['user_id']) \
    .process(SessionAggregator())

# ==========================================
# 3. Flink SQL로 실시간 집계
# ==========================================

from pyflink.table import StreamTableEnvironment, EnvironmentSettings

# Table 환경 설정
settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, settings)

# Kafka 테이블 정의
t_env.execute_sql("""
    CREATE TABLE user_events (
        user_id STRING,
        event_type STRING,
        amount DECIMAL(10, 2),
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'user-events',
        'properties.bootstrap.servers' = 'kafka:9092',
        'properties.group.id' = 'flink-sql',
        'format' = 'json',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# 윈도우 집계 쿼리
t_env.execute_sql("""
    SELECT
        user_id,
        TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
        COUNT(*) AS event_count,
        SUM(amount) AS total_amount
    FROM user_events
    WHERE event_type = 'purchase'
    GROUP BY
        user_id,
        TUMBLE(event_time, INTERVAL '1' MINUTE)
""")

# 실행
env.execute("User Session Analysis")

실무에서 이렇게 사용됩니다

데이터 엔지니어: "현재 Spark Streaming으로 처리하는데 지연이 1분 정도 발생해요. 더 줄일 수 있을까요?"

테크리드: "Flink로 전환하면 밀리초 단위 지연이 가능해요. 진정한 스트림 처리라서 마이크로배치 오버헤드가 없습니다."

데이터 엔지니어: "상태 관리는 어떻게 되나요? 현재 Redis에 중간 상태를 저장하고 있거든요."

테크리드: "Flink는 내장 상태 관리가 있어서 외부 저장소 없이도 돼요. RocksDB 백엔드 쓰면 TB급 상태도 처리 가능합니다."

면접관: "Flink의 exactly-once 보장은 어떻게 구현되나요?"

지원자: "분산 스냅샷 알고리즘인 Chandy-Lamport를 기반으로 합니다. 체크포인트 배리어가 스트림을 통해 전파되면서 각 연산자가 상태를 스냅샷합니다. 장애 시 마지막 체크포인트부터 재처리합니다."

면접관: "워터마크가 진행되지 않으면 어떻게 되나요?"

지원자: "특정 파티션에서 이벤트가 오지 않으면 워터마크가 멈춥니다. Idleness 설정으로 유휴 파티션을 감지하거나, Processing Time 폴백을 설정할 수 있습니다."

리뷰어: "체크포인트 간격이 10초인데, 상태 크기가 크면 백프레셔가 발생할 수 있어요."

개발자: "적절한 간격은 어느 정도인가요?"

리뷰어: "상태 크기와 처리량에 따라 다르지만, 보통 1-5분 정도가 적당해요. 또한 incremental checkpointing을 활성화하면 체크포인트 시간이 크게 줄어듭니다."

주의사항

관련 용어

더 배우기