📊
데이터공학
Flink
Apache Flink
실시간 스트림 처리 프레임워크. 이벤트 시간 처리에 강점.
Apache Flink
실시간 스트림 처리 프레임워크. 이벤트 시간 처리에 강점.
Apache Flink는 무한 및 유한 데이터 스트림에 대한 상태 기반 연산을 위한 분산 처리 엔진입니다. 이벤트 시간(Event Time) 처리와 정확히 한 번(Exactly-Once) 의미론을 네이티브로 지원합니다.
| 항목 | Flink | Spark Streaming |
|---|---|---|
| 처리 모델 | True Streaming | Micro-batch |
| 지연 시간 | 밀리초 | 초 단위 |
| 이벤트 시간 | 네이티브 지원 | Structured Streaming에서 지원 |
| 상태 관리 | 내장 상태 백엔드 | 제한적 |
| SQL 지원 | Flink SQL | Spark SQL |
# PyFlink를 사용한 실시간 스트림 처리
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.kafka import (
KafkaSource, KafkaOffsetsInitializer, KafkaSink,
KafkaRecordSerializationSchema
)
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time, Duration
from pyflink.datastream.functions import ProcessWindowFunction
import json
# 실행 환경 설정
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
env.enable_checkpointing(60000) # 1분마다 체크포인팅
# Kafka 소스 설정
kafka_source = KafkaSource.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_topics("user-events") \
.set_group_id("flink-consumer-group") \
.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
.set_value_only_deserializer(SimpleStringSchema()) \
.build()
# Watermark 전략 (이벤트 시간 기반)
watermark_strategy = WatermarkStrategy \
.for_bounded_out_of_orderness(Duration.of_seconds(5)) \
.with_timestamp_assigner(
lambda event, timestamp: json.loads(event)['event_time']
)
# 스트림 생성
stream = env.from_source(
kafka_source,
watermark_strategy,
"Kafka Source"
)
# 이벤트 파싱 및 키 추출
def parse_event(event_str):
event = json.loads(event_str)
return (event['user_id'], event)
parsed_stream = stream.map(
parse_event,
output_type=Types.TUPLE([Types.STRING(), Types.MAP(Types.STRING(), Types.STRING())])
)
# 윈도우 집계
class EventCountWindowFunction(ProcessWindowFunction):
def process(self, key, context, elements):
count = len(list(elements))
window_start = context.window().start
window_end = context.window().end
yield {
'user_id': key,
'event_count': count,
'window_start': window_start,
'window_end': window_end
}
# 5분 텀블링 윈도우로 사용자별 이벤트 집계
result_stream = parsed_stream \
.key_by(lambda x: x[0]) \
.window(TumblingEventTimeWindows.of(Time.minutes(5))) \
.process(EventCountWindowFunction())
# 결과를 Kafka로 전송
kafka_sink = KafkaSink.builder() \
.set_bootstrap_servers("kafka:9092") \
.set_record_serializer(
KafkaRecordSerializationSchema.builder()
.set_topic("user-event-counts")
.set_value_serialization_schema(SimpleStringSchema())
.build()
) \
.build()
result_stream.map(lambda x: json.dumps(x)).sink_to(kafka_sink)
# 실행
env.execute("User Event Aggregation")
-- Flink SQL을 사용한 실시간 데이터 처리
-- Kafka 소스 테이블 생성
CREATE TABLE user_events (
user_id STRING,
event_type STRING,
event_time TIMESTAMP(3),
properties MAP<STRING, STRING>,
WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'kafka:9092',
'properties.group.id' = 'flink-sql-consumer',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- 결과 싱크 테이블 (Kafka)
CREATE TABLE user_event_metrics (
user_id STRING,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3),
event_count BIGINT,
unique_event_types BIGINT,
PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'user-metrics',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 5분 텀블링 윈도우 집계 쿼리
INSERT INTO user_event_metrics
SELECT
user_id,
TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start,
TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end,
COUNT(*) AS event_count,
COUNT(DISTINCT event_type) AS unique_event_types
FROM user_events
GROUP BY
user_id,
TUMBLE(event_time, INTERVAL '5' MINUTE);
-- 슬라이딩 윈도우로 실시간 이상 탐지
SELECT
user_id,
HOP_START(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) AS window_start,
COUNT(*) AS event_count
FROM user_events
GROUP BY
user_id,
HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE)
HAVING COUNT(*) > 100; -- 5분 내 100개 이상 이벤트는 이상 징후
// Java를 사용한 Flink 스트림 처리
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
public class UserEventProcessor {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
env.setParallelism(4);
// Kafka 소스
KafkaSource<UserEvent> source = KafkaSource.<UserEvent>builder()
.setBootstrapServers("kafka:9092")
.setTopics("user-events")
.setGroupId("flink-java-consumer")
.setValueOnlyDeserializer(new UserEventDeserializer())
.build();
// Watermark 전략
WatermarkStrategy<UserEvent> watermarkStrategy =
WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(
Duration.ofSeconds(5))
.withTimestampAssigner((event, ts) -> event.getEventTime());
// 스트림 처리
DataStream<UserEventCount> result = env
.fromSource(source, watermarkStrategy, "Kafka Source")
.keyBy(UserEvent::getUserId)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(new EventCountAggregator());
result.sinkTo(kafkaSink);
env.execute("User Event Processing");
}
}