🗄️
데이터베이스
Redis
Remote Dictionary Server
인메모리 키-값 저장소. 캐싱, 세션 관리, 메시지 큐로 활용. 초당 수십만 연산 처리.
Remote Dictionary Server
인메모리 키-값 저장소. 캐싱, 세션 관리, 메시지 큐로 활용. 초당 수십만 연산 처리.
Redis는 "Remote Dictionary Server"의 약자로, 2009년 Salvatore Sanfilippo가 개발한 오픈소스 인메모리 데이터 구조 저장소입니다. 모든 데이터를 메모리에 저장하여 디스크 기반 데이터베이스보다 수십~수백 배 빠른 읽기/쓰기 성능을 제공합니다.
Redis는 단순한 키-값 저장소를 넘어 다양한 데이터 구조를 지원합니다:
인메모리지만 데이터 영속성을 위한 옵션을 제공합니다:
import redis
from datetime import timedelta
# Redis 연결
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
# String 기본 연산
r.set('user:1001:name', 'Alice')
r.set('user:1001:visits', 0)
r.incr('user:1001:visits') # 원자적 증가
# TTL 설정 (캐싱)
r.setex('session:abc123', timedelta(hours=1), 'user_data_here')
# Hash - 사용자 프로필
r.hset('user:1001', mapping={
'name': 'Alice',
'email': 'alice@example.com',
'plan': 'premium'
})
profile = r.hgetall('user:1001')
# List - 최근 활동 (최근 10개만 유지)
r.lpush('user:1001:activity', 'login')
r.lpush('user:1001:activity', 'view_page')
r.ltrim('user:1001:activity', 0, 9)
# Sorted Set - 리더보드
r.zadd('leaderboard', {'alice': 1500, 'bob': 1200, 'charlie': 1800})
top3 = r.zrevrange('leaderboard', 0, 2, withscores=True)
# [('charlie', 1800), ('alice', 1500), ('bob', 1200)]
import redis
import json
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
def get_user(user_id):
cache_key = f'user:{user_id}'
# 1. 캐시에서 먼저 조회
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. 캐시 미스 → DB 조회
user = db.query(f"SELECT * FROM users WHERE id = {user_id}")
# 3. 캐시에 저장 (TTL 5분)
if user:
r.setex(cache_key, 300, json.dumps(user))
return user
def update_user(user_id, data):
# DB 업데이트
db.update('users', user_id, data)
# 캐시 무효화
r.delete(f'user:{user_id}')
import time
def is_rate_limited(user_id, limit=100, window=60):
"""
1분에 100회 요청 제한
"""
key = f'ratelimit:{user_id}'
now = time.time()
pipe = r.pipeline()
# 윈도우 밖의 요청 제거
pipe.zremrangebyscore(key, 0, now - window)
# 현재 요청 수 확인
pipe.zcard(key)
# 현재 요청 추가
pipe.zadd(key, {str(now): now})
# TTL 설정
pipe.expire(key, window)
results = pipe.execute()
request_count = results[1]
return request_count >= limit
import uuid
import time
class RedisLock:
def __init__(self, redis_client, lock_name, expire_time=10):
self.redis = redis_client
self.lock_name = f'lock:{lock_name}'
self.expire_time = expire_time
self.lock_id = str(uuid.uuid4())
def acquire(self, timeout=10):
end_time = time.time() + timeout
while time.time() < end_time:
if self.redis.set(self.lock_name, self.lock_id,
ex=self.expire_time, nx=True):
return True
time.sleep(0.1)
return False
def release(self):
# Lua 스크립트로 원자적 해제 (자신의 락만)
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return self.redis.eval(script, 1, self.lock_name, self.lock_id)
# 사용 예
lock = RedisLock(r, 'payment:user:1001')
if lock.acquire():
try:
process_payment()
finally:
lock.release()
# Publisher
def publish_event(channel, message):
r.publish(channel, json.dumps(message))
publish_event('notifications', {
'user_id': 1001,
'type': 'order_complete',
'order_id': 'ORD-12345'
})
# Subscriber
def subscribe_handler():
pubsub = r.pubsub()
pubsub.subscribe('notifications')
for message in pubsub.listen():
if message['type'] == 'message':
data = json.loads(message['data'])
print(f"Received: {data}")