🗄️ 데이터베이스

Debezium

Change Data Capture Platform

오픈소스 CDC(Change Data Capture) 플랫폼입니다. Kafka Connect 기반으로 데이터베이스 변경 사항을 실시간 스트리밍하여 마이크로서비스 간 데이터 동기화, 캐시 무효화, 검색 인덱스 업데이트 등을 구현합니다.

📖 상세 설명

Debezium은 데이터베이스의 변경 로그(WAL, binlog 등)를 읽어 변경 이벤트를 Kafka 토픽으로 스트리밍하는 CDC 플랫폼입니다. Red Hat이 주도하는 오픈소스 프로젝트로, 쿼리 기반 폴링 없이 로우 레벨의 변경 캡처가 가능합니다.

동작 원리: 데이터베이스의 트랜잭션 로그(PostgreSQL의 WAL, MySQL의 binlog, MongoDB의 oplog)를 직접 읽어 INSERT, UPDATE, DELETE 이벤트를 감지합니다. 소스 DB에 부하를 주지 않으면서 밀리초 단위의 지연시간으로 변경을 캡처합니다.

지원 데이터베이스: PostgreSQL, MySQL/MariaDB, MongoDB, SQL Server, Oracle, Cassandra, Db2 등 다양한 데이터베이스를 커넥터로 지원합니다. 각 커넥터는 해당 DB의 복제 프로토콜을 네이티브하게 구현합니다.

사용 사례: (1) 마이크로서비스 간 데이터 동기화 - 주문 서비스 DB 변경을 재고 서비스가 구독, (2) Elasticsearch/Redis 캐시 실시간 업데이트, (3) 데이터 웨어하우스 실시간 ETL, (4) 이벤트 소싱/CQRS 패턴 구현, (5) 레거시 모놀리스에서 마이크로서비스로 점진적 마이그레이션.

Debezium 이벤트는 변경 전/후 데이터(before/after), 소스 메타데이터(테이블명, 트랜잭션 ID), 작업 타입(c/u/d/r) 등을 포함한 구조화된 JSON 형식으로 제공되어 다운스트림 처리가 용이합니다.

💻 코드 예제

# Debezium CDC 이벤트 Consumer - Python
from kafka import KafkaConsumer
import json

class DebeziumConsumer:
    """Debezium CDC 이벤트를 처리하는 Consumer"""

    def __init__(self, bootstrap_servers: str, topic: str):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            group_id='my-cdc-consumer'
        )

    def process_events(self):
        """CDC 이벤트 처리"""
        for message in self.consumer:
            event = message.value

            # Debezium 이벤트 구조
            payload = event.get('payload', event)
            operation = payload.get('op')  # c=create, u=update, d=delete, r=read(snapshot)
            before = payload.get('before')  # 변경 전 데이터
            after = payload.get('after')    # 변경 후 데이터
            source = payload.get('source')  # 소스 메타데이터

            # 작업 타입별 처리
            if operation == 'c':
                self.handle_insert(after, source)
            elif operation == 'u':
                self.handle_update(before, after, source)
            elif operation == 'd':
                self.handle_delete(before, source)
            elif operation == 'r':
                # 스냅샷 읽기 (초기 동기화)
                self.handle_snapshot(after, source)

    def handle_insert(self, data: dict, source: dict):
        """INSERT 이벤트 처리"""
        table = source.get('table')
        print(f"INSERT on {table}: {data}")

        # 예: Elasticsearch 인덱싱
        # es.index(index=table, id=data['id'], body=data)

        # 예: Redis 캐시 업데이트
        # redis.hset(f"{table}:{data['id']}", mapping=data)

    def handle_update(self, before: dict, after: dict, source: dict):
        """UPDATE 이벤트 처리 - 변경된 필드만 추출"""
        table = source.get('table')
        changed_fields = {
            k: after[k] for k in after
            if before.get(k) != after.get(k)
        }
        print(f"UPDATE on {table}, changed: {changed_fields}")

        # 예: 주문 상태 변경 시 알림 발송
        if table == 'orders' and 'status' in changed_fields:
            self.send_order_notification(after)

    def handle_delete(self, data: dict, source: dict):
        """DELETE 이벤트 처리"""
        table = source.get('table')
        print(f"DELETE on {table}: id={data.get('id')}")

        # 예: 캐시에서 삭제
        # redis.delete(f"{table}:{data['id']}")


# Outbox 패턴 구현 (이벤트 발행 보장)
class OutboxProcessor:
    """Outbox 테이블의 CDC 이벤트를 실제 이벤트로 변환"""

    def process_outbox_event(self, payload: dict):
        """
        Outbox 테이블 구조:
        - id: UUID
        - aggregate_type: 'Order', 'User' 등
        - aggregate_id: 실제 엔티티 ID
        - event_type: 'OrderCreated', 'OrderShipped' 등
        - payload: JSON
        """
        after = payload.get('after')
        if not after:
            return

        event = {
            'type': after['event_type'],
            'aggregate_type': after['aggregate_type'],
            'aggregate_id': after['aggregate_id'],
            'data': json.loads(after['payload']),
            'timestamp': after['created_at']
        }

        # 실제 이벤트 토픽으로 발행
        self.publish_domain_event(event)


# 사용 예시
if __name__ == "__main__":
    consumer = DebeziumConsumer(
        bootstrap_servers='localhost:9092',
        topic='mydb.public.orders'  # {server}.{schema}.{table}
    )
    consumer.process_events()
// Debezium CDC 이벤트 Consumer - Node.js
const { Kafka } = require('kafkajs');

class DebeziumConsumer {
    constructor(brokers, groupId) {
        this.kafka = new Kafka({
            clientId: 'debezium-consumer',
            brokers: brokers
        });
        this.consumer = this.kafka.consumer({ groupId });
    }

    async connect(topics) {
        await this.consumer.connect();
        await this.consumer.subscribe({
            topics: topics,
            fromBeginning: false
        });
    }

    async processEvents(handlers) {
        await this.consumer.run({
            eachMessage: async ({ topic, partition, message }) => {
                const event = JSON.parse(message.value.toString());
                const payload = event.payload || event;

                const { op, before, after, source } = payload;
                const table = source?.table;

                console.log(`[${table}] Operation: ${op}`);

                // 작업 타입별 핸들러 호출
                switch (op) {
                    case 'c': // CREATE
                        await handlers.onCreate?.(table, after, source);
                        break;
                    case 'u': // UPDATE
                        await handlers.onUpdate?.(table, before, after, source);
                        break;
                    case 'd': // DELETE
                        await handlers.onDelete?.(table, before, source);
                        break;
                    case 'r': // READ (snapshot)
                        await handlers.onSnapshot?.(table, after, source);
                        break;
                }
            }
        });
    }
}

// Elasticsearch 실시간 동기화 예제
const { Client } = require('@elastic/elasticsearch');
const esClient = new Client({ node: 'http://localhost:9200' });

const handlers = {
    async onCreate(table, data, source) {
        // 새 문서 인덱싱
        await esClient.index({
            index: table,
            id: data.id.toString(),
            body: data
        });
        console.log(`Indexed new document in ${table}`);
    },

    async onUpdate(table, before, after, source) {
        // 문서 업데이트
        await esClient.update({
            index: table,
            id: after.id.toString(),
            body: { doc: after }
        });

        // 특정 필드 변경 시 추가 로직
        if (table === 'products' && before.price !== after.price) {
            console.log(`Price changed: ${before.price} -> ${after.price}`);
            // 가격 변경 알림 등
        }
    },

    async onDelete(table, data, source) {
        // 문서 삭제
        await esClient.delete({
            index: table,
            id: data.id.toString()
        });
        console.log(`Deleted document from ${table}`);
    },

    async onSnapshot(table, data, source) {
        // 초기 스냅샷 동기화
        await esClient.index({
            index: table,
            id: data.id.toString(),
            body: data
        });
    }
};

// 실행
async function main() {
    const consumer = new DebeziumConsumer(
        ['localhost:9092'],
        'es-sync-group'
    );

    await consumer.connect([
        'mydb.public.products',
        'mydb.public.orders'
    ]);

    await consumer.processEvents(handlers);
}

main().catch(console.error);
// ============================================
// PostgreSQL Debezium Connector 설정
// ============================================
{
    "name": "postgres-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "secret",
        "database.dbname": "mydb",
        "database.server.name": "mydb",

        // 캡처할 테이블 지정
        "table.include.list": "public.orders,public.products,public.users",

        // 스냅샷 모드
        // initial: 최초 전체 스냅샷 후 스트리밍
        // never: 스냅샷 없이 바로 스트리밍
        // when_needed: 필요시 스냅샷
        "snapshot.mode": "initial",

        // PostgreSQL 복제 슬롯
        "slot.name": "debezium_slot",
        "publication.name": "dbz_publication",

        // 토픽 설정
        "topic.prefix": "mydb",

        // 변환 설정
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",

        // 스키마 레지스트리 (Avro 사용 시)
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter"
    }
}

// ============================================
// MySQL Debezium Connector 설정
// ============================================
{
    "name": "mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "secret",
        "database.server.id": "184054",
        "database.server.name": "mydb",

        // binlog 위치 (초기 시작점)
        "database.include.list": "inventory",
        "table.include.list": "inventory.orders,inventory.products",

        // GTID 모드 (고가용성 권장)
        "include.schema.changes": "true",
        "gtid.source.includes": "",

        // 스냅샷 설정
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "minimal"
    }
}

// ============================================
// Docker Compose로 Debezium 실행
// ============================================
# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

# 커넥터 등록
# curl -X POST http://localhost:8083/connectors \
#   -H "Content-Type: application/json" \
#   -d @postgres-connector.json

🗣️ 실무에서 이렇게 말하세요

💬 마이크로서비스 아키텍처 설계 회의에서
"서비스 간 데이터 동기화를 API 호출로 하면 강결합이 됩니다. Debezium으로 CDC 구성하면 주문 서비스 DB 변경이 자동으로 Kafka로 흘러가서 재고, 배송 서비스가 독립적으로 구독할 수 있어요. 소스 서비스는 다른 서비스의 존재를 몰라도 됩니다."
💬 검색 기능 개선 논의에서
"상품 DB와 Elasticsearch 동기화가 배치라 최대 1시간 지연이 있어요. Debezium 도입하면 DB 변경 즉시 Elasticsearch에 반영되어 검색 결과가 실시간으로 업데이트됩니다. 별도 동기화 로직 없이 DB만 수정하면 되니까 개발도 단순해져요."
💬 데이터 파이프라인 장애 대응에서
"Debezium이 다운됐다가 복구되면 마지막 처리 위치부터 재개됩니다. WAL 위치를 Kafka Connect 오프셋으로 관리하거든요. 단, PostgreSQL의 복제 슬롯이 WAL을 보존하고 있어야 해서 장시간 다운되면 디스크 풀 위험이 있어요. 모니터링 알람 설정이 필수입니다."

⚠️ 주의사항 & 베스트 프랙티스

복제 슬롯 모니터링 필수

PostgreSQL 복제 슬롯이 WAL을 보존합니다. Debezium 장애 시 WAL이 삭제되지 않아 디스크가 가득 찰 수 있습니다. slot lag 모니터링과 알람을 설정하세요.

스키마 변경 시 주의

ALTER TABLE 시 Debezium이 새 스키마를 인식해야 합니다. 호환되지 않는 변경(컬럼 삭제, 타입 변경)은 Consumer 장애를 유발할 수 있으니 점진적으로 진행하세요.

대용량 스냅샷 처리

수억 건 테이블의 initial 스냅샷은 수 시간이 걸리고 소스 DB에 부하를 줍니다. snapshot.mode를 신중히 선택하고, 필요시 분할 스냅샷을 사용하세요.

Outbox 패턴 활용

비즈니스 로직과 이벤트 발행의 원자성을 위해 Outbox 테이블을 사용하세요. 트랜잭션으로 데이터와 이벤트를 함께 저장하고, Debezium이 이벤트를 캡처합니다.

🔗 관련 용어

📚 더 배우기