Cassandra
Apache Cassandra
Apache Cassandra는 분산형 NoSQL 데이터베이스로, 고가용성과 선형적 확장성을 제공합니다. 페이스북에서 개발되어 Apache 재단에 기부되었으며, 대규모 시계열 데이터, IoT, 로그 저장에 최적화되어 있습니다.
Apache Cassandra
Apache Cassandra는 분산형 NoSQL 데이터베이스로, 고가용성과 선형적 확장성을 제공합니다. 페이스북에서 개발되어 Apache 재단에 기부되었으며, 대규모 시계열 데이터, IoT, 로그 저장에 최적화되어 있습니다.
분산 아키텍처: Cassandra는 마스터-슬레이브 구조 없이 모든 노드가 동등한 피어(Peer-to-Peer) 방식으로 동작합니다. 링(Ring) 구조로 데이터를 분산 저장하며, 일관된 해싱(Consistent Hashing)을 통해 파티션 키 기반으로 데이터를 분배합니다. 단일 장애점(SPOF)이 없어 고가용성을 보장합니다.
데이터 모델: Wide Column Store 모델을 사용합니다. 키스페이스(Keyspace) > 테이블 > 파티션 > 로우 > 컬럼 구조로 데이터를 저장합니다. 파티션 키는 데이터 분산의 핵심이며, 클러스터링 키로 파티션 내 정렬 순서를 정의합니다. CQL(Cassandra Query Language)로 SQL과 유사하게 쿼리할 수 있습니다.
일관성 레벨: 튜너블 일관성(Tunable Consistency)을 지원합니다. ONE, QUORUM, ALL 등 다양한 일관성 레벨을 쿼리별로 설정할 수 있어, 일관성과 가용성 사이의 트레이드오프를 조절할 수 있습니다. 쓰기는 빠르게, 읽기 시 복제본을 비교하는 Read Repair로 최종 일관성을 보장합니다.
복제와 내결함성: 복제 계수(Replication Factor)를 설정하여 여러 노드에 데이터를 복제합니다. NetworkTopologyStrategy로 데이터센터별 복제 전략을 다르게 설정할 수 있어, 지역 간 재해 복구(DR)에 효과적입니다. 힌트 핸드오프(Hinted Handoff)로 일시적 장애 시에도 데이터 손실을 방지합니다.
주요 사용 사례: Netflix(스트리밍 메타데이터), Apple(10만+ 노드 클러스터), Instagram(메시지 저장), Discord(채팅 메시지), IoT 센서 데이터, 시계열 분석, 로그 저장 등 대규모 쓰기 워크로드에 적합합니다.
# Cassandra Python 예제 - cassandra-driver
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra.query import SimpleStatement, ConsistencyLevel
from datetime import datetime
import uuid
class CassandraService:
"""Cassandra 데이터베이스 서비스"""
def __init__(self, contact_points: list, keyspace: str):
# 인증 설정 (선택적)
auth_provider = PlainTextAuthProvider(
username='cassandra',
password='cassandra'
)
# 클러스터 연결 (로드 밸런싱 정책 포함)
self.cluster = Cluster(
contact_points=contact_points,
auth_provider=auth_provider,
load_balancing_policy=DCAwareRoundRobinPolicy(local_dc='datacenter1')
)
self.session = self.cluster.connect(keyspace)
def create_schema(self):
"""키스페이스 및 테이블 생성"""
# 키스페이스 생성 (복제 계수 3)
self.session.execute("""
CREATE KEYSPACE IF NOT EXISTS iot_data
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3
}
""")
# 시계열 테이블 생성 (파티션 키 + 클러스터링 키)
self.session.execute("""
CREATE TABLE IF NOT EXISTS iot_data.sensor_readings (
device_id UUID,
reading_date DATE,
reading_time TIMESTAMP,
temperature FLOAT,
humidity FLOAT,
PRIMARY KEY ((device_id, reading_date), reading_time)
) WITH CLUSTERING ORDER BY (reading_time DESC)
""")
print("스키마 생성 완료")
def insert_sensor_data(self, device_id: uuid.UUID, temp: float, humidity: float):
"""센서 데이터 삽입 (배치 처리)"""
now = datetime.now()
# Prepared Statement 사용 (성능 향상)
insert_stmt = self.session.prepare("""
INSERT INTO sensor_readings
(device_id, reading_date, reading_time, temperature, humidity)
VALUES (?, ?, ?, ?, ?)
""")
# 일관성 레벨 설정
bound_stmt = insert_stmt.bind([
device_id,
now.date(),
now,
temp,
humidity
])
bound_stmt.consistency_level = ConsistencyLevel.QUORUM
self.session.execute(bound_stmt)
print(f"데이터 삽입: device={device_id}, temp={temp}")
def get_recent_readings(self, device_id: uuid.UUID, date, limit: int = 100):
"""최근 센서 데이터 조회"""
query = SimpleStatement(
"""
SELECT reading_time, temperature, humidity
FROM sensor_readings
WHERE device_id = %s AND reading_date = %s
LIMIT %s
""",
consistency_level=ConsistencyLevel.LOCAL_QUORUM
)
rows = self.session.execute(query, [device_id, date, limit])
return list(rows)
def batch_insert(self, readings: list):
"""배치 삽입 (대량 데이터)"""
from cassandra.query import BatchStatement, BatchType
batch = BatchStatement(batch_type=BatchType.UNLOGGED)
insert_stmt = self.session.prepare("""
INSERT INTO sensor_readings
(device_id, reading_date, reading_time, temperature, humidity)
VALUES (?, ?, ?, ?, ?)
""")
for reading in readings:
batch.add(insert_stmt, reading)
self.session.execute(batch)
print(f"{len(readings)}건 배치 삽입 완료")
def close(self):
"""연결 종료"""
self.cluster.shutdown()
# 사용 예시
if __name__ == "__main__":
# 클러스터 연결
cassandra = CassandraService(
contact_points=['127.0.0.1', '127.0.0.2', '127.0.0.3'],
keyspace='iot_data'
)
# 스키마 생성
cassandra.create_schema()
# 데이터 삽입
device_id = uuid.uuid4()
cassandra.insert_sensor_data(device_id, 25.5, 60.0)
# 데이터 조회
from datetime import date
readings = cassandra.get_recent_readings(device_id, date.today())
for row in readings:
print(f"시간: {row.reading_time}, 온도: {row.temperature}°C")
cassandra.close()
// Cassandra Node.js 예제 - cassandra-driver
const cassandra = require('cassandra-driver');
class CassandraService {
constructor(contactPoints, keyspace) {
// 클러스터 연결
this.client = new cassandra.Client({
contactPoints: contactPoints,
localDataCenter: 'datacenter1',
keyspace: keyspace,
credentials: {
username: 'cassandra',
password: 'cassandra'
},
pooling: {
coreConnectionsPerHost: {
[cassandra.types.distance.local]: 2,
[cassandra.types.distance.remote]: 1
}
}
});
}
async connect() {
await this.client.connect();
console.log('Cassandra 연결 성공');
}
async createSchema() {
// 키스페이스 생성
await this.client.execute(`
CREATE KEYSPACE IF NOT EXISTS iot_data
WITH replication = {
'class': 'NetworkTopologyStrategy',
'datacenter1': 3
}
`);
// 테이블 생성
await this.client.execute(`
CREATE TABLE IF NOT EXISTS iot_data.sensor_readings (
device_id UUID,
reading_date DATE,
reading_time TIMESTAMP,
temperature FLOAT,
humidity FLOAT,
PRIMARY KEY ((device_id, reading_date), reading_time)
) WITH CLUSTERING ORDER BY (reading_time DESC)
`);
console.log('스키마 생성 완료');
}
async insertSensorData(deviceId, temperature, humidity) {
const query = `
INSERT INTO sensor_readings
(device_id, reading_date, reading_time, temperature, humidity)
VALUES (?, ?, ?, ?, ?)
`;
const now = new Date();
const params = [
deviceId,
cassandra.types.LocalDate.fromDate(now),
now,
temperature,
humidity
];
// Prepared Statement + 일관성 레벨
await this.client.execute(query, params, {
prepare: true,
consistency: cassandra.types.consistencies.quorum
});
console.log(`데이터 삽입: device=${deviceId}, temp=${temperature}`);
}
async getRecentReadings(deviceId, date, limit = 100) {
const query = `
SELECT reading_time, temperature, humidity
FROM sensor_readings
WHERE device_id = ? AND reading_date = ?
LIMIT ?
`;
const result = await this.client.execute(query, [deviceId, date, limit], {
prepare: true,
consistency: cassandra.types.consistencies.localQuorum
});
return result.rows;
}
async batchInsert(readings) {
// 배치 쿼리 생성
const queries = readings.map(reading => ({
query: `
INSERT INTO sensor_readings
(device_id, reading_date, reading_time, temperature, humidity)
VALUES (?, ?, ?, ?, ?)
`,
params: reading
}));
await this.client.batch(queries, { prepare: true });
console.log(`${readings.length}건 배치 삽입 완료`);
}
async streamLargeData(deviceId, date) {
// 대용량 데이터 스트리밍
const query = `
SELECT * FROM sensor_readings
WHERE device_id = ? AND reading_date = ?
`;
return this.client.stream(query, [deviceId, date], {
prepare: true,
fetchSize: 1000
});
}
async shutdown() {
await this.client.shutdown();
console.log('연결 종료');
}
}
// 사용 예시
async function main() {
const service = new CassandraService(
['127.0.0.1', '127.0.0.2'],
'iot_data'
);
await service.connect();
await service.createSchema();
// 데이터 삽입
const deviceId = cassandra.types.Uuid.random();
await service.insertSensorData(deviceId, 25.5, 60.0);
// 데이터 조회
const today = cassandra.types.LocalDate.now();
const readings = await service.getRecentReadings(deviceId, today);
readings.forEach(row => {
console.log(`시간: ${row.reading_time}, 온도: ${row.temperature}°C`);
});
// 스트리밍 조회 (대용량)
const stream = await service.streamLargeData(deviceId, today);
stream.on('readable', function() {
let row;
while ((row = this.read()) !== null) {
console.log('스트림:', row.temperature);
}
});
await service.shutdown();
}
main().catch(console.error);
-- Cassandra CQL 예제
-- ============================================
-- 1. 키스페이스 생성 (복제 전략 설정)
-- ============================================
-- 단일 데이터센터
CREATE KEYSPACE IF NOT EXISTS my_app
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 3
};
-- 멀티 데이터센터 (운영 환경 권장)
CREATE KEYSPACE IF NOT EXISTS production_app
WITH replication = {
'class': 'NetworkTopologyStrategy',
'dc-east': 3,
'dc-west': 3
};
USE my_app;
-- ============================================
-- 2. 테이블 설계 (파티션 키가 핵심!)
-- ============================================
-- 시계열 데이터 테이블
-- 파티션 키: (user_id, event_date) - 하루 단위로 파티션
-- 클러스터링 키: event_time - 시간순 정렬
CREATE TABLE user_events (
user_id UUID,
event_date DATE,
event_time TIMESTAMP,
event_type TEXT,
event_data MAP,
PRIMARY KEY ((user_id, event_date), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC)
AND compaction = {'class': 'TimeWindowCompactionStrategy',
'compaction_window_unit': 'DAYS',
'compaction_window_size': 1};
-- 채팅 메시지 테이블 (Discord 패턴)
CREATE TABLE messages (
channel_id UUID,
bucket INT, -- 버킷팅으로 파티션 크기 제한
message_id TIMEUUID,
author_id UUID,
content TEXT,
PRIMARY KEY ((channel_id, bucket), message_id)
) WITH CLUSTERING ORDER BY (message_id DESC);
-- ============================================
-- 3. 데이터 삽입
-- ============================================
INSERT INTO user_events (user_id, event_date, event_time, event_type, event_data)
VALUES (
uuid(),
'2024-01-15',
toTimestamp(now()),
'page_view',
{'page': '/home', 'duration': '30s'}
);
-- TTL 설정 (30일 후 자동 삭제)
INSERT INTO user_events (user_id, event_date, event_time, event_type, event_data)
VALUES (uuid(), '2024-01-15', toTimestamp(now()), 'click', {'button': 'signup'})
USING TTL 2592000;
-- ============================================
-- 4. 데이터 조회 (파티션 키 필수!)
-- ============================================
-- 특정 사용자의 특정 날짜 이벤트 조회
SELECT * FROM user_events
WHERE user_id = 123e4567-e89b-12d3-a456-426614174000
AND event_date = '2024-01-15'
LIMIT 100;
-- 시간 범위 조회
SELECT * FROM user_events
WHERE user_id = 123e4567-e89b-12d3-a456-426614174000
AND event_date = '2024-01-15'
AND event_time >= '2024-01-15 09:00:00'
AND event_time < '2024-01-15 18:00:00';
-- 특정 이벤트 타입 필터링 (ALLOW FILTERING 주의!)
SELECT * FROM user_events
WHERE user_id = 123e4567-e89b-12d3-a456-426614174000
AND event_date = '2024-01-15'
AND event_type = 'click'
ALLOW FILTERING; -- 대용량에서는 피해야 함
-- ============================================
-- 5. 인덱스 (신중하게 사용)
-- ============================================
-- Secondary Index (카디널리티 낮은 컬럼에만)
CREATE INDEX ON user_events (event_type);
-- SASI Index (텍스트 검색)
CREATE CUSTOM INDEX ON user_events (content)
USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'CONTAINS'};
-- ============================================
-- 6. 배치 작업 (같은 파티션에서만!)
-- ============================================
BEGIN BATCH
INSERT INTO user_events (user_id, event_date, event_time, event_type)
VALUES (123e4567-e89b-12d3-a456-426614174000, '2024-01-15', toTimestamp(now()), 'view');
INSERT INTO user_events (user_id, event_date, event_time, event_type)
VALUES (123e4567-e89b-12d3-a456-426614174000, '2024-01-15', toTimestamp(now()), 'click');
APPLY BATCH;
-- ============================================
-- 7. 운영 명령어
-- ============================================
-- 테이블 통계
DESCRIBE TABLE user_events;
-- 파티션 크기 확인 (nodetool)
-- nodetool tablehistograms my_app user_events
-- 압축 상태 확인
-- nodetool compactionstats
-- 복구 실행
-- nodetool repair my_app user_events
"IoT 센서 데이터를 초당 수만 건씩 저장해야 하니까 Cassandra가 적합할 것 같아요. 쓰기 성능이 뛰어나고 시계열 데이터에 최적화되어 있거든요. 파티션 키를 device_id와 날짜 조합으로 설계하면 균등하게 분산됩니다."
"ALLOW FILTERING 쿼리가 너무 많이 쓰이고 있어요. Cassandra에서는 파티션 키 없이 조회하면 전체 클러스터를 스캔하게 됩니다. 조회 패턴에 맞는 별도 테이블을 만들거나 Materialized View를 검토해야 합니다."
"노드 3개 중 1개가 죽었는데 서비스는 정상입니다. Cassandra는 복제 계수 3, QUORUM 일관성이라 2개 노드만 응답해도 됩니다. 다만 장시간 다운되면 nodetool repair를 돌려서 데이터 일관성을 맞춰야 해요."
단일 파티션 키에 데이터가 몰리면 특정 노드만 과부하됩니다. 복합 파티션 키(예: user_id + date)나 버킷팅을 사용해 균등 분산하세요.
파티션 키 없는 조회는 전체 스캔을 유발합니다. 조회 패턴별로 별도 테이블을 설계하세요. "쿼리가 데이터 모델을 결정한다"가 Cassandra 철학입니다.
파티션당 100MB, 10만 행을 넘지 않도록 설계하세요. 시계열 데이터는 시간 단위로 파티션을 분할하고, 메시지 같은 데이터는 버킷팅을 적용하세요.
Prepared Statement 사용, 적절한 TTL 설정으로 데이터 정리, 정기적인 nodetool repair 실행, LeveledCompactionStrategy로 읽기 성능 개선.