CDC
Change Data Capture - Real-time Data Synchronization
CDC(Change Data Capture)는 데이터베이스의 변경 사항(INSERT, UPDATE, DELETE)을 실시간으로 감지하고 캡처하는 기술입니다. Debezium, Kafka Connect 등을 통해 시스템 간 데이터 동기화, 이벤트 소싱, 실시간 ETL에 핵심적으로 사용됩니다.
Change Data Capture - Real-time Data Synchronization
CDC(Change Data Capture)는 데이터베이스의 변경 사항(INSERT, UPDATE, DELETE)을 실시간으로 감지하고 캡처하는 기술입니다. Debezium, Kafka Connect 등을 통해 시스템 간 데이터 동기화, 이벤트 소싱, 실시간 ETL에 핵심적으로 사용됩니다.
CDC의 핵심 원리 - CDC는 데이터베이스의 트랜잭션 로그(WAL, Binlog)를 읽어 변경 사항을 추출합니다. 폴링 방식과 달리 원본 DB에 부하를 주지 않으며, 삭제된 데이터도 캡처할 수 있습니다. 변경 순서가 보장되어 데이터 일관성을 유지합니다.
Debezium의 역할 - Red Hat이 개발한 오픈소스 CDC 플랫폼 Debezium은 MySQL, PostgreSQL, MongoDB, SQL Server 등 다양한 DB를 지원합니다. Kafka Connect 기반으로 동작하며, 변경 이벤트를 Kafka 토픽으로 스트리밍합니다. 스키마 레지스트리와 통합해 스키마 진화를 관리합니다.
CDC 활용 사례 - 마이크로서비스 간 데이터 동기화, CQRS 패턴의 읽기 모델 업데이트, 데이터 웨어하우스 실시간 적재, 캐시 무효화, 감사 로그 생성, 검색 인덱스 동기화(Elasticsearch) 등에 활용됩니다. 이벤트 드리븐 아키텍처의 핵심 구성 요소입니다.
로그 기반 vs 쿼리 기반 - 로그 기반 CDC(Debezium)는 트랜잭션 로그를 읽어 실시간성이 높고 부하가 적습니다. 쿼리 기반 CDC(타임스탬프 폴링)는 구현이 간단하지만 삭제 감지가 어렵고 지연이 있습니다. 대부분의 프로덕션 환경에서는 로그 기반을 권장합니다.
Outbox 패턴과의 결합 - 트랜잭션과 이벤트 발행의 원자성을 보장하기 위해 Outbox 패턴과 CDC를 함께 사용합니다. 비즈니스 테이블 변경과 Outbox 테이블 삽입을 하나의 트랜잭션으로 묶고, CDC가 Outbox 테이블을 감시하여 이벤트를 발행합니다.
// Debezium MySQL Connector 설정 (Kafka Connect REST API)
// POST /connectors
{
"name": "mysql-orders-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql-primary",
"database.port": "3306",
"database.user": "debezium",
"database.password": "${secrets:mysql-password}",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "ecommerce",
"table.include.list": "ecommerce.orders,ecommerce.order_items",
// 스키마 히스토리 (DDL 변경 추적)
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schema-changes.ecommerce",
// 변환 설정
"transforms": "route,unwrap",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "cdc.$3",
// Outbox 패턴 지원
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "rewrite",
// 스냅샷 설정
"snapshot.mode": "initial",
"snapshot.locking.mode": "minimal",
// 오프셋 저장
"offset.storage.topic": "connect-offsets",
"offset.flush.interval.ms": "10000"
}
}
// 토픽 메시지 예시 (orders 테이블 UPDATE)
/*
{
"before": {
"id": 1001,
"status": "pending",
"total": 50000
},
"after": {
"id": 1001,
"status": "shipped",
"total": 50000
},
"source": {
"version": "2.4.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1706277600000,
"db": "ecommerce",
"table": "orders",
"server_id": 184054,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 12345
},
"op": "u", // c=create, u=update, d=delete, r=read(snapshot)
"ts_ms": 1706277600100
}
*/
// Kafka Consumer로 CDC 이벤트 처리 (Java)
import org.apache.kafka.clients.consumer.*;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CDCEventConsumer {
private final ObjectMapper mapper = new ObjectMapper();
public void consumeCDCEvents() {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "order-sync-service");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "false");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("cdc.orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processCDCEvent(record.value());
}
consumer.commitSync(); // 처리 완료 후 커밋
}
}
}
private void processCDCEvent(String eventJson) throws Exception {
JsonNode event = mapper.readTree(eventJson);
String operation = event.get("op").asText();
switch (operation) {
case "c" -> handleCreate(event.get("after"));
case "u" -> handleUpdate(event.get("before"), event.get("after"));
case "d" -> handleDelete(event.get("before"));
case "r" -> handleSnapshot(event.get("after"));
}
}
private void handleUpdate(JsonNode before, JsonNode after) {
Long orderId = after.get("id").asLong();
String oldStatus = before.get("status").asText();
String newStatus = after.get("status").asText();
if (!oldStatus.equals(newStatus)) {
System.out.println("Order " + orderId + ": " + oldStatus + " → " + newStatus);
// 읽기 모델 업데이트, 알림 발송 등
updateReadModel(orderId, after);
sendNotification(orderId, newStatus);
}
}
}
# Python으로 CDC 이벤트 처리
from kafka import KafkaConsumer
from dataclasses import dataclass
from typing import Optional, Dict, Any
import json
@dataclass
class CDCEvent:
operation: str # c, u, d, r
before: Optional[Dict[str, Any]]
after: Optional[Dict[str, Any]]
source: Dict[str, Any]
timestamp: int
@classmethod
def from_json(cls, data: dict) -> "CDCEvent":
return cls(
operation=data.get("op"),
before=data.get("before"),
after=data.get("after"),
source=data.get("source", {}),
timestamp=data.get("ts_ms", 0)
)
class OrderCDCProcessor:
def __init__(self, kafka_servers: str):
self.consumer = KafkaConsumer(
"cdc.orders",
bootstrap_servers=kafka_servers,
group_id="order-analytics",
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
auto_offset_reset="earliest",
enable_auto_commit=False
)
def process_events(self):
"""CDC 이벤트 처리 루프"""
for message in self.consumer:
try:
event = CDCEvent.from_json(message.value)
self.handle_event(event)
self.consumer.commit()
except Exception as e:
print(f"Error processing event: {e}")
# Dead Letter Queue로 전송
def handle_event(self, event: CDCEvent):
"""이벤트 타입별 처리"""
handlers = {
"c": self.on_create,
"u": self.on_update,
"d": self.on_delete,
"r": self.on_snapshot
}
handler = handlers.get(event.operation)
if handler:
handler(event)
def on_create(self, event: CDCEvent):
"""새 주문 생성"""
order = event.after
print(f"✅ New order: #{order['id']} - {order['total']}원")
# 분석 시스템에 전송
self.send_to_analytics("order_created", order)
def on_update(self, event: CDCEvent):
"""주문 상태 변경"""
before, after = event.before, event.after
# 변경된 필드 감지
changes = {
k: (before.get(k), after.get(k))
for k in after.keys()
if before.get(k) != after.get(k)
}
if "status" in changes:
old, new = changes["status"]
print(f"📦 Order #{after['id']}: {old} → {new}")
if new == "shipped":
self.notify_shipping(after)
elif new == "delivered":
self.update_inventory(after)
def on_delete(self, event: CDCEvent):
"""주문 삭제 (Soft delete 권장)"""
order = event.before
print(f"🗑️ Order deleted: #{order['id']}")
# 인덱스에서 제거
self.remove_from_search_index(order['id'])
def send_to_analytics(self, event_type: str, data: dict):
"""분석 시스템으로 전송"""
# ClickHouse, BigQuery 등으로 적재
pass
if __name__ == "__main__":
processor = OrderCDCProcessor("kafka:9092")
processor.process_events()
"Elasticsearch 동기화를 폴링으로 하고 있는데, 삭제 이벤트를 놓치고 있어요. Debezium으로 CDC 파이프라인을 구축하면 binlog에서 DELETE까지 캡처할 수 있습니다. 지연도 수십 밀리초로 줄어들어요."
"주문 서비스에서 배송 서비스로 이벤트를 보내야 하는데, 분산 트랜잭션은 피하고 싶어요. Outbox 패턴이랑 CDC를 조합해서 최종 일관성을 보장하는 게 어떨까요? 비즈니스 로직과 메시징이 깔끔하게 분리됩니다."
"Debezium이 MySQL의 binlog를 읽는 건 슬레이브 복제와 같은 방식이라 메인 DB 부하가 거의 없어요. 단, binlog_format은 ROW로 설정해야 하고, 보관 기간도 충분히 잡아야 장애 복구가 수월합니다."
테이블 DDL 변경 시 CDC 커넥터가 중단될 수 있습니다. 스키마 레지스트리로 버전 관리하고, 호환 가능한 변경만 적용하세요.
대용량 테이블의 초기 스냅샷은 수 시간이 걸릴 수 있습니다. 점진적 스냅샷이나 병렬 처리 옵션을 활용하세요.
장애 복구 시 이벤트가 재전송될 수 있습니다. 컨슈머에서 멱등성을 보장하거나 중복 체크 로직을 구현하세요.
binlog/WAL 보관 기간이 짧으면 커넥터 재시작 시 누락이 발생합니다. 최소 7일 이상 보관을 권장합니다.