Replication
레플리케이션, 데이터 복제
데이터베이스의 데이터를 여러 노드(서버)에 복제하여 고가용성(HA), 내결함성, 읽기 성능을 향상시키는 기술입니다. Primary-Replica(Master-Slave) 구조가 가장 일반적이며, 장애 시 자동 Failover를 지원합니다.
레플리케이션, 데이터 복제
데이터베이스의 데이터를 여러 노드(서버)에 복제하여 고가용성(HA), 내결함성, 읽기 성능을 향상시키는 기술입니다. Primary-Replica(Master-Slave) 구조가 가장 일반적이며, 장애 시 자동 Failover를 지원합니다.
레플리케이션(Replication)은 하나의 데이터베이스 서버(Primary/Master)의 데이터를 하나 이상의 서버(Replica/Slave)에 실시간으로 복제하는 기술입니다. 데이터의 중복 저장을 통해 단일 장애점(SPOF)을 제거하고, 읽기 요청을 분산시켜 성능을 향상시킵니다.
동기(Synchronous) 복제는 Primary가 트랜잭션을 커밋하기 전에 최소 하나의 Replica가 데이터를 받았음을 확인합니다. 데이터 손실이 없지만 지연이 발생합니다. 비동기(Asynchronous) 복제는 Primary가 즉시 커밋하고 Replica는 나중에 따라잡습니다. 빠르지만 장애 시 최근 데이터가 유실될 수 있습니다.
복제 방식에는 Statement-based(SQL 문 복제), Row-based(변경된 행 복제), Mixed(상황에 따라 선택)가 있습니다. PostgreSQL의 WAL(Write-Ahead Log) 기반 Streaming Replication, MySQL의 Binary Log 기반 복제가 대표적입니다.
Failover는 Primary 장애 시 Replica 중 하나를 새로운 Primary로 승격시키는 과정입니다. 자동 Failover를 위해 PostgreSQL은 Patroni, MySQL은 MySQL InnoDB Cluster나 Orchestrator를 사용합니다. 클라우드 환경에서는 AWS RDS, Cloud SQL 등이 관리형으로 제공합니다.
읽기 부하 분산을 위해 애플리케이션에서 쓰기는 Primary로, 읽기는 Replica로 라우팅합니다. 단, Replica는 약간의 지연(Replication Lag)이 있으므로 즉시 일관성이 필요한 읽기는 Primary에서 처리해야 합니다.
# Primary-Replica 읽기/쓰기 분리 - Python
import psycopg2
from psycopg2 import pool
from contextlib import contextmanager
from typing import Optional
import random
class ReplicaAwareDBPool:
"""
Primary-Replica 구조의 DB 연결 풀
쓰기: Primary, 읽기: Replica(s)
"""
def __init__(self, primary_config: dict, replica_configs: list):
# Primary 연결 풀 (쓰기 전용)
self.primary_pool = pool.ThreadedConnectionPool(
minconn=2,
maxconn=10,
host=primary_config['host'],
database=primary_config['database'],
user=primary_config['user'],
password=primary_config['password']
)
# Replica 연결 풀들 (읽기 전용)
self.replica_pools = []
for config in replica_configs:
replica_pool = pool.ThreadedConnectionPool(
minconn=2,
maxconn=20, # 읽기는 더 많은 연결
host=config['host'],
database=config['database'],
user=config['user'],
password=config['password']
)
self.replica_pools.append(replica_pool)
@contextmanager
def get_primary_connection(self):
"""쓰기 작업용 Primary 연결"""
conn = self.primary_pool.getconn()
try:
yield conn
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
self.primary_pool.putconn(conn)
@contextmanager
def get_replica_connection(self):
"""
읽기 작업용 Replica 연결
라운드 로빈 또는 랜덤으로 Replica 선택
"""
if not self.replica_pools:
# Replica가 없으면 Primary 사용
with self.get_primary_connection() as conn:
yield conn
return
# 랜덤하게 Replica 선택 (로드 밸런싱)
replica_pool = random.choice(self.replica_pools)
conn = replica_pool.getconn()
try:
yield conn
finally:
replica_pool.putconn(conn)
class UserService:
"""레플리케이션을 활용한 사용자 서비스"""
def __init__(self, db_pool: ReplicaAwareDBPool):
self.db = db_pool
def create_user(self, name: str, email: str) -> int:
"""
사용자 생성 - Primary에서 실행
쓰기 작업은 반드시 Primary로
"""
with self.db.get_primary_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"""INSERT INTO users (name, email, created_at)
VALUES (%s, %s, NOW()) RETURNING id""",
(name, email)
)
user_id = cur.fetchone()[0]
print(f"✅ 사용자 생성 (Primary): {user_id}")
return user_id
def get_user(self, user_id: int, require_latest: bool = False) -> Optional[dict]:
"""
사용자 조회
require_latest=True: Primary에서 조회 (즉시 일관성)
require_latest=False: Replica에서 조회 (최종 일관성)
"""
if require_latest:
# 방금 생성한 데이터 조회 등 즉시 일관성 필요
conn_manager = self.db.get_primary_connection()
source = "Primary"
else:
# 일반 조회는 Replica에서
conn_manager = self.db.get_replica_connection()
source = "Replica"
with conn_manager as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, name, email, created_at FROM users WHERE id = %s",
(user_id,)
)
row = cur.fetchone()
if row:
print(f"📖 사용자 조회 ({source}): {user_id}")
return {
'id': row[0],
'name': row[1],
'email': row[2],
'created_at': row[3]
}
return None
def list_users(self, limit: int = 100) -> list:
"""
사용자 목록 조회 - Replica에서 실행
대량 읽기는 Replica로 분산
"""
with self.db.get_replica_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"SELECT id, name, email FROM users ORDER BY created_at DESC LIMIT %s",
(limit,)
)
users = [
{'id': row[0], 'name': row[1], 'email': row[2]}
for row in cur.fetchall()
]
print(f"📖 사용자 목록 조회 (Replica): {len(users)}건")
return users
def update_user(self, user_id: int, name: str) -> bool:
"""사용자 수정 - Primary에서 실행"""
with self.db.get_primary_connection() as conn:
with conn.cursor() as cur:
cur.execute(
"UPDATE users SET name = %s, updated_at = NOW() WHERE id = %s",
(name, user_id)
)
success = cur.rowcount > 0
if success:
print(f"✏️ 사용자 수정 (Primary): {user_id}")
return success
# Replication Lag 모니터링
def check_replication_lag(primary_conn, replica_conn) -> float:
"""
복제 지연 시간 확인 (PostgreSQL)
지연이 크면 Replica 읽기에 주의 필요
"""
with replica_conn.cursor() as cur:
cur.execute("""
SELECT EXTRACT(EPOCH FROM (NOW() - pg_last_xact_replay_timestamp()))
AS lag_seconds
""")
result = cur.fetchone()
lag = result[0] if result[0] else 0
print(f"⏱️ Replication Lag: {lag:.2f}초")
return lag
# 사용 예시
if __name__ == "__main__":
db_pool = ReplicaAwareDBPool(
primary_config={
'host': 'primary.db.example.com',
'database': 'myapp',
'user': 'app_user',
'password': 'secret'
},
replica_configs=[
{'host': 'replica1.db.example.com', 'database': 'myapp',
'user': 'app_user', 'password': 'secret'},
{'host': 'replica2.db.example.com', 'database': 'myapp',
'user': 'app_user', 'password': 'secret'},
]
)
service = UserService(db_pool)
# 쓰기 → Primary
user_id = service.create_user("김철수", "kim@example.com")
# 방금 생성한 데이터 조회 → Primary (즉시 일관성)
user = service.get_user(user_id, require_latest=True)
# 일반 조회 → Replica (읽기 분산)
users = service.list_users()
// Primary-Replica 읽기/쓰기 분리 - Node.js + Sequelize
const { Sequelize, DataTypes } = require('sequelize');
/**
* Sequelize를 이용한 읽기/쓰기 분리 설정
* read: Replica(s), write: Primary
*/
const sequelize = new Sequelize('database', null, null, {
dialect: 'postgres',
replication: {
// 쓰기 전용 Primary
write: {
host: 'primary.db.example.com',
username: 'app_user',
password: 'secret',
database: 'myapp'
},
// 읽기 전용 Replica(s) - 자동 로드밸런싱
read: [
{
host: 'replica1.db.example.com',
username: 'app_user',
password: 'secret',
database: 'myapp'
},
{
host: 'replica2.db.example.com',
username: 'app_user',
password: 'secret',
database: 'myapp'
}
]
},
pool: {
max: 20,
min: 5,
acquire: 30000,
idle: 10000
}
});
// 사용자 모델 정의
const User = sequelize.define('User', {
id: {
type: DataTypes.INTEGER,
primaryKey: true,
autoIncrement: true
},
name: {
type: DataTypes.STRING(100),
allowNull: false
},
email: {
type: DataTypes.STRING(255),
allowNull: false,
unique: true
}
}, {
tableName: 'users',
timestamps: true,
createdAt: 'created_at',
updatedAt: 'updated_at'
});
class UserService {
/**
* 사용자 생성 - Primary에서 실행
*/
async createUser(name, email) {
// create()는 자동으로 write 연결 사용
const user = await User.create({ name, email });
console.log(`✅ 사용자 생성 (Primary): ${user.id}`);
return user;
}
/**
* 사용자 조회 - 기본적으로 Replica에서 실행
*/
async getUser(userId, requireLatest = false) {
const options = {};
if (requireLatest) {
// 즉시 일관성이 필요하면 Primary에서 조회
options.useMaster = true;
console.log(`📖 사용자 조회 (Primary - 즉시 일관성): ${userId}`);
} else {
console.log(`📖 사용자 조회 (Replica): ${userId}`);
}
return await User.findByPk(userId, options);
}
/**
* 사용자 목록 - Replica에서 실행
*/
async listUsers(limit = 100) {
// findAll()은 자동으로 read 연결 사용
const users = await User.findAll({
limit,
order: [['created_at', 'DESC']]
});
console.log(`📖 사용자 목록 (Replica): ${users.length}건`);
return users;
}
/**
* 사용자 수정 - Primary에서 실행
*/
async updateUser(userId, data) {
// update()는 자동으로 write 연결 사용
const [affectedRows] = await User.update(data, {
where: { id: userId }
});
console.log(`✏️ 사용자 수정 (Primary): ${userId}`);
return affectedRows > 0;
}
/**
* 트랜잭션 내에서 읽기 후 쓰기
* 트랜잭션은 항상 Primary에서 실행
*/
async transferBalance(fromUserId, toUserId, amount) {
const transaction = await sequelize.transaction();
try {
// 트랜잭션 내 모든 쿼리는 Primary에서 실행
const fromUser = await User.findByPk(fromUserId, { transaction });
const toUser = await User.findByPk(toUserId, { transaction });
if (fromUser.balance < amount) {
throw new Error('잔액 부족');
}
await fromUser.update(
{ balance: fromUser.balance - amount },
{ transaction }
);
await toUser.update(
{ balance: toUser.balance + amount },
{ transaction }
);
await transaction.commit();
console.log(`💰 이체 완료 (Primary): ${fromUserId} → ${toUserId}`);
return true;
} catch (error) {
await transaction.rollback();
console.error(`❌ 이체 실패: ${error.message}`);
throw error;
}
}
}
/**
* Replication Lag 모니터링 (PostgreSQL)
*/
async function checkReplicationLag() {
const [results] = await sequelize.query(`
SELECT
client_addr,
state,
sent_lsn,
write_lsn,
flush_lsn,
replay_lsn,
EXTRACT(EPOCH FROM (now() - reply_time)) AS lag_seconds
FROM pg_stat_replication
`, { type: Sequelize.QueryTypes.SELECT, useMaster: true });
results.forEach(replica => {
console.log(`📊 Replica ${replica.client_addr}: ${replica.lag_seconds}초 지연`);
});
return results;
}
/**
* Health Check - Primary와 Replica 상태 확인
*/
async function healthCheck() {
try {
// Primary 확인 (쓰기 테스트)
await sequelize.query('SELECT 1', { useMaster: true });
console.log('✅ Primary: 정상');
// Replica 확인 (읽기 테스트)
await sequelize.query('SELECT 1');
console.log('✅ Replica: 정상');
return { primary: 'healthy', replica: 'healthy' };
} catch (error) {
console.error('❌ Health Check 실패:', error.message);
return { error: error.message };
}
}
// 사용 예시
async function main() {
await sequelize.authenticate();
console.log('🔌 데이터베이스 연결 완료');
const service = new UserService();
// 쓰기 → Primary
const user = await service.createUser('홍길동', 'hong@example.com');
// 방금 생성한 데이터 → Primary에서 조회
const freshUser = await service.getUser(user.id, true);
// 일반 조회 → Replica
const users = await service.listUsers();
// Replication Lag 확인
await checkReplicationLag();
await sequelize.close();
}
main().catch(console.error);
-- ============================================
-- PostgreSQL Streaming Replication 설정
-- ============================================
-- [Primary 서버 설정] postgresql.conf
-- wal_level = replica
-- max_wal_senders = 10
-- wal_keep_size = 1GB
-- synchronous_commit = on -- 동기 복제 (off면 비동기)
-- synchronous_standby_names = 'replica1' -- 동기 복제 대상
-- [Primary] 복제용 사용자 생성
CREATE USER replicator WITH REPLICATION ENCRYPTED PASSWORD 'repl_password';
-- [Primary] pg_hba.conf에 Replica 허용
-- host replication replicator 10.0.0.0/24 md5
-- [Replica 서버] 기본 백업으로 시작
-- pg_basebackup -h primary.example.com -D /var/lib/postgresql/data \
-- -U replicator -P -v -R -X stream -C -S replica1_slot
-- [Replica] postgresql.conf
-- hot_standby = on
-- primary_conninfo = 'host=primary.example.com user=replicator password=repl_password'
-- primary_slot_name = 'replica1_slot'
-- ============================================
-- 복제 상태 모니터링 (Primary에서 실행)
-- ============================================
-- 연결된 Replica 목록과 지연 상태
SELECT
client_addr AS replica_ip,
state,
sync_state,
pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn) AS send_lag_bytes,
pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS replay_lag_bytes,
replay_lag
FROM pg_stat_replication;
-- 복제 슬롯 상태
SELECT
slot_name,
slot_type,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) AS lag_bytes
FROM pg_replication_slots;
-- ============================================
-- Replica에서 실행할 쿼리
-- ============================================
-- 복제 지연 시간 확인
SELECT
CASE
WHEN pg_last_wal_receive_lsn() = pg_last_wal_replay_lsn()
THEN 0
ELSE EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp())
END AS replication_lag_seconds;
-- Replica인지 확인
SELECT pg_is_in_recovery(); -- true면 Replica
-- ============================================
-- MySQL Replication 설정
-- ============================================
-- [Primary] my.cnf
-- server-id = 1
-- log_bin = mysql-bin
-- binlog_format = ROW
-- gtid_mode = ON
-- enforce_gtid_consistency = ON
-- [Primary] 복제용 사용자 생성
CREATE USER 'replicator'@'%' IDENTIFIED BY 'repl_password';
GRANT REPLICATION SLAVE ON *.* TO 'replicator'@'%';
FLUSH PRIVILEGES;
-- [Replica] my.cnf
-- server-id = 2
-- relay_log = relay-bin
-- read_only = ON
-- gtid_mode = ON
-- enforce_gtid_consistency = ON
-- [Replica] 복제 시작
CHANGE REPLICATION SOURCE TO
SOURCE_HOST = 'primary.example.com',
SOURCE_USER = 'replicator',
SOURCE_PASSWORD = 'repl_password',
SOURCE_AUTO_POSITION = 1;
START REPLICA;
-- MySQL 복제 상태 확인
SHOW REPLICA STATUS\G
-- Replica_IO_Running: Yes
-- Replica_SQL_Running: Yes
-- Seconds_Behind_Source: 0 -- 지연 시간
-- ============================================
-- Failover 관련 쿼리
-- ============================================
-- PostgreSQL: Replica를 Primary로 승격
-- pg_ctl promote -D /var/lib/postgresql/data
-- 또는 SQL로:
SELECT pg_promote();
-- 새 Primary에서 타임라인 확인
SELECT pg_walfile_name(pg_current_wal_lsn());
-- ============================================
-- 읽기 전용 확인 및 설정
-- ============================================
-- PostgreSQL: 읽기 전용 모드 확인
SHOW default_transaction_read_only;
-- Replica에서 쓰기 시도 시 에러
-- ERROR: cannot execute INSERT in a read-only transaction
-- MySQL: 읽기 전용 확인
SHOW VARIABLES LIKE 'read_only';
SHOW VARIABLES LIKE 'super_read_only';
-- ============================================
-- 동기/비동기 복제 전환 (PostgreSQL)
-- ============================================
-- 동기 복제로 전환 (데이터 무손실, 느림)
ALTER SYSTEM SET synchronous_commit = 'on';
ALTER SYSTEM SET synchronous_standby_names = 'replica1';
SELECT pg_reload_conf();
-- 비동기 복제로 전환 (빠름, 장애 시 데이터 유실 가능)
ALTER SYSTEM SET synchronous_standby_names = '';
SELECT pg_reload_conf();
-- 복제 모드 확인
SELECT name, setting FROM pg_settings
WHERE name IN ('synchronous_commit', 'synchronous_standby_names');
"읽기 트래픽이 80% 이상이니 Primary-Replica 구성으로 가겠습니다. Primary 1대, Replica 2대로 구성하고, 읽기 요청은 Replica로 분산시키면 Primary 부하를 크게 줄일 수 있어요. 다만 Replication Lag 모니터링은 필수입니다."
"Primary가 다운되어 Patroni가 Replica1을 새 Primary로 자동 승격시켰습니다. Failover에 약 30초 걸렸고, 비동기 복제였기 때문에 마지막 2-3개 트랜잭션이 유실됐을 수 있어요. 중요 데이터는 동기 복제를 고려해야 합니다."
"회원가입 직후 프로필 조회에서 간헐적으로 404가 발생하는데, Replica 조회라 Replication Lag 때문입니다. 생성 직후 조회는 useMaster 옵션으로 Primary에서 읽어야 해요. Read-after-write 일관성 문제입니다."
Replica는 Primary보다 데이터가 늦습니다. 방금 쓴 데이터를 바로 읽을 때는 Primary에서 조회하거나, 충분한 대기 시간을 두세요.
비동기 복제에서 Primary 장애 시 아직 복제되지 않은 트랜잭션이 유실됩니다. 금융 등 중요 데이터는 동기 복제를 사용하세요.
네트워크 분리로 두 노드가 모두 Primary가 되면 데이터 불일치가 발생합니다. Quorum 기반 클러스터(Patroni, etcd)로 방지하세요.
Replication Lag 모니터링 및 알림, 자동 Failover 구성, 읽기/쓰기 분리 시 일관성 요구사항 명확히, 정기적인 Failover 테스트.