📊 데이터공학

Flink

Apache Flink

실시간 스트림 처리 프레임워크. 이벤트 시간 처리에 강점.

상세 설명

Apache Flink는 무한 및 유한 데이터 스트림에 대한 상태 기반 연산을 위한 분산 처리 엔진입니다. 이벤트 시간(Event Time) 처리와 정확히 한 번(Exactly-Once) 의미론을 네이티브로 지원합니다.

Flink의 핵심 특징

  • 이벤트 시간 처리: Watermark 기반 지연 데이터 처리
  • 상태 관리: 대규모 상태의 일관된 관리 및 체크포인팅
  • Exactly-Once 보장: 분산 스냅샷 기반 장애 복구
  • 낮은 지연: 밀리초 수준의 실시간 처리
  • 배치/스트림 통합: 동일한 API로 배치와 스트림 처리

Flink vs Spark Streaming 비교

항목FlinkSpark Streaming
처리 모델True StreamingMicro-batch
지연 시간밀리초초 단위
이벤트 시간네이티브 지원Structured Streaming에서 지원
상태 관리내장 상태 백엔드제한적
SQL 지원Flink SQLSpark SQL

코드 예제

Python (PyFlink) 스트림 처리

# PyFlink를 사용한 실시간 스트림 처리
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
    KafkaSource, KafkaOffsetsInitializer, KafkaSink,
    KafkaRecordSerializationSchema
)
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time, Duration
from pyflink.datastream.functions import ProcessWindowFunction
import json

# 실행 환경 설정
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000)  # 1분마다 체크포인팅

# Kafka 소스 설정
kafka_source = KafkaSource.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_topics("user-events") \
    .set_group_id("flink-consumer-group") \
    .set_starting_offsets(KafkaOffsetsInitializer.latest()) \
    .set_value_only_deserializer(SimpleStringSchema()) \
    .build()

# Watermark 전략 (이벤트 시간 기반)
watermark_strategy = WatermarkStrategy \
    .for_bounded_out_of_orderness(Duration.of_seconds(5)) \
    .with_timestamp_assigner(
        lambda event, timestamp: json.loads(event)['event_time']
    )

# 스트림 생성
stream = env.from_source(
    kafka_source,
    watermark_strategy,
    "Kafka Source"
)

# 이벤트 파싱 및 키 추출
def parse_event(event_str):
    event = json.loads(event_str)
    return (event['user_id'], event)

parsed_stream = stream.map(
    parse_event,
    output_type=Types.TUPLE([Types.STRING(), Types.MAP(Types.STRING(), Types.STRING())])
)

# 윈도우 집계
class EventCountWindowFunction(ProcessWindowFunction):
    def process(self, key, context, elements):
        count = len(list(elements))
        window_start = context.window().start
        window_end = context.window().end
        yield {
            'user_id': key,
            'event_count': count,
            'window_start': window_start,
            'window_end': window_end
        }

# 5분 텀블링 윈도우로 사용자별 이벤트 집계
result_stream = parsed_stream \
    .key_by(lambda x: x[0]) \
    .window(TumblingEventTimeWindows.of(Time.minutes(5))) \
    .process(EventCountWindowFunction())

# 결과를 Kafka로 전송
kafka_sink = KafkaSink.builder() \
    .set_bootstrap_servers("kafka:9092") \
    .set_record_serializer(
        KafkaRecordSerializationSchema.builder()
            .set_topic("user-event-counts")
            .set_value_serialization_schema(SimpleStringSchema())
            .build()
    ) \
    .build()

result_stream.map(lambda x: json.dumps(x)).sink_to(kafka_sink)

# 실행
env.execute("User Event Aggregation")

Flink SQL로 실시간 집계

-- Flink SQL을 사용한 실시간 데이터 처리

-- Kafka 소스 테이블 생성
CREATE TABLE user_events (
    user_id STRING,
    event_type STRING,
    event_time TIMESTAMP(3),
    properties MAP<STRING, STRING>,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user-events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-sql-consumer',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- 결과 싱크 테이블 (Kafka)
CREATE TABLE user_event_metrics (
    user_id STRING,
    window_start TIMESTAMP(3),
    window_end TIMESTAMP(3),
    event_count BIGINT,
    unique_event_types BIGINT,
    PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'user-metrics',
    'properties.bootstrap.servers' = 'kafka:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);

-- 5분 텀블링 윈도우 집계 쿼리
INSERT INTO user_event_metrics
SELECT
    user_id,
    TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
    TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
    COUNT(*) AS event_count,
    COUNT(DISTINCT event_type) AS unique_event_types
FROM user_events
GROUP BY
    user_id,
    TUMBLE(event_time, INTERVAL '5' MINUTE);

-- 슬라이딩 윈도우로 실시간 이상 탐지
SELECT
    user_id,
    HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
    COUNT(*) AS event_count
FROM user_events
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
HAVING COUNT(*) > 100;  -- 5분 내 100개 이상 이벤트는 이상 징후

Java DataStream API

// Java를 사용한 Flink 스트림 처리
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

public class UserEventProcessor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(60000);
        env.setParallelism(4);

        // Kafka 소스
        KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
            .setBootstrapServers("kafka:9092")
            .setTopics("user-events")
            .setGroupId("flink-java-consumer")
            .setValueOnlyDeserializer(new UserEventDeserializer())
            .build();

        // Watermark 전략
        WatermarkStrategy<UserEvent> watermarkStrategy =
            WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(
                Duration.ofSeconds(5))
            .withTimestampAssigner((event, ts) -> event.getEventTime());

        // 스트림 처리
        DataStream<UserEventCount> result = env
            .fromSource(source, watermarkStrategy, "Kafka Source")
            .keyBy(UserEvent::getUserId)
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .aggregate(new EventCountAggregator());

        result.sinkTo(kafkaSink);

        env.execute("User Event Processing");
    }
}

실무 대화 예시

데이터 엔지니어: "실시간 이상 탐지 시스템 구축해야 하는데, Spark Streaming이랑 Flink 중 뭐가 좋을까요?"
시니어 엔지니어: "지연 시간이 중요하면 Flink가 나아. Spark는 마이크로배치라 초 단위 지연이 생기는데, Flink는 밀리초 단위 처리가 가능해."
데이터 엔지니어: "이벤트 시간 기반 처리도 필요한데, 지연 도착 데이터는 어떻게 처리하나요?"
시니어 엔지니어: "Flink는 Watermark로 처리해. 예를 들어 5초 지연 허용하면, 5초 늦게 도착한 이벤트도 올바른 윈도우에 포함시킬 수 있어. 더 늦은 건 Side Output으로 별도 처리하고."
데이터 엔지니어: "상태 관리는 어떻게 되나요? 장애 발생하면 데이터 유실되나요?"
시니어 엔지니어: "Flink는 Checkpoint로 상태를 주기적으로 저장해. RocksDB 백엔드 쓰면 수 테라바이트 상태도 관리 가능하고, 장애 복구 시 Exactly-Once 보장돼."

주의사항

상태 관리 주의점

  • 상태 크기가 커지면 체크포인트 시간 증가 - TTL 설정 필수
  • RocksDB 사용 시 디스크 I/O 고려 필요
  • 키 분포가 불균형하면 특정 태스크에 상태 집중 (데이터 스큐)

Watermark 설정

  • Watermark 간격이 너무 짧으면 늦은 데이터 손실
  • 너무 길면 결과 출력 지연 및 메모리 사용 증가
  • 데이터 특성에 맞는 적절한 Out-of-orderness 설정 필요

병렬성 및 리소스

  • 병렬성 변경 시 상태 재분배에 주의
  • TaskManager 메모리 설정 - 힙/비힙 비율 조정
  • 네트워크 버퍼 크기가 처리량에 영향

관련 용어

더 배우기