🗄️ 데이터베이스

Redis

Remote Dictionary Server

인메모리 키-값 저장소. 캐싱, 세션 관리, 메시지 큐로 활용. 초당 수십만 연산 처리.

📖 상세 설명

Redis란?

Redis는 "Remote Dictionary Server"의 약자로, 2009년 Salvatore Sanfilippo가 개발한 오픈소스 인메모리 데이터 구조 저장소입니다. 모든 데이터를 메모리에 저장하여 디스크 기반 데이터베이스보다 수십~수백 배 빠른 읽기/쓰기 성능을 제공합니다.

핵심 데이터 구조

Redis는 단순한 키-값 저장소를 넘어 다양한 데이터 구조를 지원합니다:

  • String: 기본 문자열, 숫자 카운터, 바이너리 데이터
  • Hash: 필드-값 쌍의 컬렉션 (사용자 프로필 등)
  • List: 삽입 순서가 유지되는 문자열 리스트 (큐, 스택)
  • Set: 중복 없는 문자열 컬렉션 (태그, 유니크 방문자)
  • Sorted Set (ZSet): 점수로 정렬된 Set (리더보드, 랭킹)
  • Stream: 로그 형태의 데이터 구조 (이벤트 소싱)
  • HyperLogLog: 대용량 카디널리티 추정

주요 활용 사례

  • 캐싱: DB 쿼리 결과, API 응답, 세션 데이터 캐시
  • 세션 스토어: 분산 환경에서 사용자 세션 관리
  • Rate Limiting: API 호출 빈도 제한
  • 실시간 리더보드: 게임 랭킹, 실시간 순위
  • Pub/Sub: 실시간 메시징, 이벤트 브로커
  • 분산 락: Redlock 알고리즘 기반 동시성 제어

영속성 (Persistence)

인메모리지만 데이터 영속성을 위한 옵션을 제공합니다:

  • RDB (Snapshotting): 특정 시점의 전체 데이터 스냅샷 저장
  • AOF (Append Only File): 모든 쓰기 연산을 로그로 기록
  • RDB + AOF: 두 방식 조합으로 안정성 극대화

고가용성과 확장성

  • Redis Sentinel: 자동 장애 조치(Failover)와 모니터링
  • Redis Cluster: 데이터 샤딩으로 수평 확장
  • Master-Replica: 읽기 분산 및 데이터 복제

💻 코드 예제

Python (redis-py) 기본 사용

Python
import redis
from datetime import timedelta

# Redis 연결
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)

# String 기본 연산
r.set('user:1001:name', 'Alice')
r.set('user:1001:visits', 0)
r.incr('user:1001:visits')  # 원자적 증가

# TTL 설정 (캐싱)
r.setex('session:abc123', timedelta(hours=1), 'user_data_here')

# Hash - 사용자 프로필
r.hset('user:1001', mapping={
    'name': 'Alice',
    'email': 'alice@example.com',
    'plan': 'premium'
})
profile = r.hgetall('user:1001')

# List - 최근 활동 (최근 10개만 유지)
r.lpush('user:1001:activity', 'login')
r.lpush('user:1001:activity', 'view_page')
r.ltrim('user:1001:activity', 0, 9)

# Sorted Set - 리더보드
r.zadd('leaderboard', {'alice': 1500, 'bob': 1200, 'charlie': 1800})
top3 = r.zrevrange('leaderboard', 0, 2, withscores=True)
# [('charlie', 1800), ('alice', 1500), ('bob', 1200)]

캐시 패턴 (Cache-Aside)

Python
import redis
import json

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def get_user(user_id):
    cache_key = f'user:{user_id}'

    # 1. 캐시에서 먼저 조회
    cached = r.get(cache_key)
    if cached:
        return json.loads(cached)

    # 2. 캐시 미스 → DB 조회
    user = db.query(f"SELECT * FROM users WHERE id = {user_id}")

    # 3. 캐시에 저장 (TTL 5분)
    if user:
        r.setex(cache_key, 300, json.dumps(user))

    return user

def update_user(user_id, data):
    # DB 업데이트
    db.update('users', user_id, data)

    # 캐시 무효화
    r.delete(f'user:{user_id}')

Rate Limiting (슬라이딩 윈도우)

Python
import time

def is_rate_limited(user_id, limit=100, window=60):
    """
    1분에 100회 요청 제한
    """
    key = f'ratelimit:{user_id}'
    now = time.time()

    pipe = r.pipeline()

    # 윈도우 밖의 요청 제거
    pipe.zremrangebyscore(key, 0, now - window)

    # 현재 요청 수 확인
    pipe.zcard(key)

    # 현재 요청 추가
    pipe.zadd(key, {str(now): now})

    # TTL 설정
    pipe.expire(key, window)

    results = pipe.execute()
    request_count = results[1]

    return request_count >= limit

분산 락 (Distributed Lock)

Python
import uuid
import time

class RedisLock:
    def __init__(self, redis_client, lock_name, expire_time=10):
        self.redis = redis_client
        self.lock_name = f'lock:{lock_name}'
        self.expire_time = expire_time
        self.lock_id = str(uuid.uuid4())

    def acquire(self, timeout=10):
        end_time = time.time() + timeout
        while time.time() < end_time:
            if self.redis.set(self.lock_name, self.lock_id,
                             ex=self.expire_time, nx=True):
                return True
            time.sleep(0.1)
        return False

    def release(self):
        # Lua 스크립트로 원자적 해제 (자신의 락만)
        script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(script, 1, self.lock_name, self.lock_id)

# 사용 예
lock = RedisLock(r, 'payment:user:1001')
if lock.acquire():
    try:
        process_payment()
    finally:
        lock.release()

Pub/Sub 실시간 메시징

Python
# Publisher
def publish_event(channel, message):
    r.publish(channel, json.dumps(message))

publish_event('notifications', {
    'user_id': 1001,
    'type': 'order_complete',
    'order_id': 'ORD-12345'
})

# Subscriber
def subscribe_handler():
    pubsub = r.pubsub()
    pubsub.subscribe('notifications')

    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"Received: {data}")

💬 현업 대화 예시

백엔드 개발자
"API 응답 속도가 너무 느려요. DB 쿼리가 복잡해서 500ms나 걸리는데..."
시니어 개발자
"자주 조회되는 데이터면 Redis 캐싱 적용하자. 캐시 히트면 1ms 이내로 응답 가능해."
백엔드 개발자
"캐시 무효화는 어떻게 해요? 데이터 변경될 때마다 동기화해야 하잖아요."
시니어 개발자
"Cache-Aside 패턴 쓰고, TTL 짧게 가져가자. 일관성 중요하면 Write-Through도 고려하고. 아예 CDC로 자동 동기화할 수도 있어."
DevOps 엔지니어
"Redis가 죽으면 전체 서비스 장애 나는데, 어떻게 대응해요?"
아키텍트
"Redis Sentinel로 자동 페일오버 구성하고, 애플리케이션에서는 캐시 미스 시 DB 폴백 로직 꼭 넣어둬. 캐시가 없어도 느리지만 동작하도록."
주니어 개발자
"세션을 Redis에 저장하라는데, 기존 서버 메모리 세션이랑 뭐가 달라요?"
시니어 개발자
"서버가 여러 대면 세션 공유가 안 돼. Redis 쓰면 모든 서버가 같은 세션 데이터 접근 가능해서 로드밸런서 뒤에서도 문제없어."

⚠️ 주의사항

⚠️ 메모리 용량 관리
Redis는 모든 데이터를 메모리에 저장합니다. maxmemory 설정과 eviction 정책(LRU, LFU 등)을 반드시 구성하세요. 메모리 부족 시 쓰기 거부나 예기치 않은 데이터 손실이 발생할 수 있습니다.
⚠️ 캐시 일관성 문제
DB와 캐시 간 데이터 불일치(Cache Inconsistency) 주의. TTL 기반 만료, Write-Through/Write-Behind 패턴 적절히 선택하고, 중요 데이터는 캐시 무효화 로직을 철저히 구현하세요.
⚠️ 키 네이밍 컨벤션
일관된 키 네이밍 규칙을 정의하세요 (예: "service:entity:id:field"). 네임스페이스 없이 사용하면 키 충돌과 관리 어려움이 발생합니다. KEYS 명령은 프로덕션에서 절대 사용 금지 (블로킹 발생).
⚠️ 영속성 설정 확인
기본 설정의 RDB 스냅샷 주기가 적절한지 확인하세요. 중요 데이터는 AOF를 활성화하고, appendfsync 옵션으로 데이터 손실 허용 범위를 조절하세요. 완전 무손실이 필요하면 "always" 사용 (성능 저하 감안).

🔗 관련 용어

📚 더 배우기