TimescaleDB
PostgreSQL Time-Series Extension
PostgreSQL 기반 시계열 데이터베이스입니다. 표준 SQL을 그대로 사용하면서 하이퍼테이블(Hypertable)로 자동 파티셔닝, 압축, 연속 집계를 지원하여 시계열 데이터를 효율적으로 처리합니다.
PostgreSQL Time-Series Extension
PostgreSQL 기반 시계열 데이터베이스입니다. 표준 SQL을 그대로 사용하면서 하이퍼테이블(Hypertable)로 자동 파티셔닝, 압축, 연속 집계를 지원하여 시계열 데이터를 효율적으로 처리합니다.
TimescaleDB는 PostgreSQL의 확장(Extension)으로 설치되는 시계열 데이터베이스입니다. PostgreSQL의 모든 기능(ACID 트랜잭션, JOIN, 인덱스, 확장성)을 그대로 사용하면서 시계열 데이터에 최적화된 기능을 추가합니다. 새로운 쿼리 언어를 배울 필요 없이 표준 SQL로 시계열 분석이 가능합니다.
하이퍼테이블(Hypertable)은 TimescaleDB의 핵심 개념입니다. 일반 테이블처럼 보이지만 내부적으로 시간 기준으로 자동 파티셔닝(청크)됩니다. 수십억 행의 데이터도 청크 단위로 분산되어 쿼리 성능이 유지됩니다. 공간 인덱스와 결합하면 공간-시간 데이터도 효율적으로 처리할 수 있습니다.
연속 집계(Continuous Aggregates)는 시계열 집계를 자동으로 유지하는 기능입니다. 1분 데이터를 1시간 평균으로 미리 계산해두면 대시보드 쿼리가 수백 배 빨라집니다. 새 데이터가 들어오면 자동으로 집계가 업데이트되어 항상 최신 상태를 유지합니다.
네이티브 압축은 시계열 데이터의 특성을 이용해 최대 95%까지 저장 공간을 줄입니다. 압축된 데이터도 SQL로 직접 조회할 수 있으며, 압축 정책을 설정하면 오래된 청크를 자동으로 압축합니다. 이를 통해 수년간의 데이터를 경제적으로 보관할 수 있습니다.
데이터 보존 정책으로 오래된 데이터를 자동 삭제하거나 콜드 스토리지로 이동할 수 있습니다. 핫 데이터는 빠른 SSD에, 오래된 데이터는 저렴한 스토리지에 저장하는 계층형 스토리지 전략이 가능합니다. PostgreSQL 에코시스템의 모든 도구(pg_dump, 리플리케이션 등)와 호환됩니다.
# TimescaleDB Python 예제 (psycopg2)
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime, timedelta
import random
# PostgreSQL + TimescaleDB 연결
conn = psycopg2.connect(
host="localhost",
database="tsdb",
user="postgres",
password="password"
)
conn.autocommit = True
cur = conn.cursor()
# ============================================
# 하이퍼테이블 생성
# ============================================
cur.execute("""
CREATE TABLE IF NOT EXISTS sensor_data (
time TIMESTAMPTZ NOT NULL,
sensor_id TEXT NOT NULL,
location TEXT,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION,
pressure DOUBLE PRECISION
);
""")
# 일반 테이블을 하이퍼테이블로 변환
cur.execute("""
SELECT create_hypertable('sensor_data', 'time',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 day'
);
""")
print("하이퍼테이블 생성 완료")
# ============================================
# 데이터 삽입 (배치)
# ============================================
data = []
base_time = datetime.utcnow() - timedelta(days=7)
for i in range(10000):
time = base_time + timedelta(minutes=i)
sensor_id = f"sensor-{i % 10}"
location = ["factory-a", "factory-b", "warehouse"][i % 3]
temp = 20 + random.random() * 15
humidity = 40 + random.random() * 30
pressure = 1000 + random.random() * 50
data.append((time, sensor_id, location, temp, humidity, pressure))
# execute_values로 빠른 배치 삽입
execute_values(cur, """
INSERT INTO sensor_data (time, sensor_id, location, temperature, humidity, pressure)
VALUES %s
""", data)
print(f"{len(data)}행 삽입 완료")
# ============================================
# 시계열 쿼리 - time_bucket
# ============================================
cur.execute("""
SELECT
time_bucket('1 hour', time) AS hour,
sensor_id,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp
FROM sensor_data
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, sensor_id
ORDER BY hour DESC, sensor_id
LIMIT 20;
""")
print("\n시간별 온도 통계:")
for row in cur.fetchall():
print(f" {row[0]} | {row[1]} | 평균: {row[2]:.1f}C, 최대: {row[3]:.1f}C")
# ============================================
# 연속 집계 (Continuous Aggregates)
# ============================================
cur.execute("""
CREATE MATERIALIZED VIEW IF NOT EXISTS hourly_sensor_stats
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
sensor_id,
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity,
COUNT(*) as readings
FROM sensor_data
GROUP BY hour, sensor_id
WITH NO DATA;
""")
# 연속 집계 정책 추가 (1시간마다 자동 갱신)
cur.execute("""
SELECT add_continuous_aggregate_policy('hourly_sensor_stats',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour',
if_not_exists => TRUE
);
""")
print("\n연속 집계 뷰 생성 완료")
# ============================================
# 압축 정책 설정
# ============================================
cur.execute("""
ALTER TABLE sensor_data SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'sensor_id'
);
""")
cur.execute("""
SELECT add_compression_policy('sensor_data',
INTERVAL '7 days',
if_not_exists => TRUE
);
""")
print("7일 이후 데이터 자동 압축 정책 추가 완료")
# ============================================
# 데이터 보존 정책 (자동 삭제)
# ============================================
cur.execute("""
SELECT add_retention_policy('sensor_data',
INTERVAL '90 days',
if_not_exists => TRUE
);
""")
print("90일 이후 데이터 자동 삭제 정책 추가 완료")
cur.close()
conn.close()
// TimescaleDB Node.js 예제 (pg)
const { Pool } = require('pg');
const pool = new Pool({
host: 'localhost',
database: 'tsdb',
user: 'postgres',
password: 'password',
max: 20
});
// ============================================
// 하이퍼테이블 생성
// ============================================
async function createHypertable() {
const client = await pool.connect();
try {
// 테이블 생성
await client.query(`
CREATE TABLE IF NOT EXISTS metrics (
time TIMESTAMPTZ NOT NULL,
host TEXT NOT NULL,
service TEXT NOT NULL,
cpu_usage DOUBLE PRECISION,
memory_mb INTEGER,
requests INTEGER
);
`);
// 하이퍼테이블로 변환
await client.query(`
SELECT create_hypertable('metrics', 'time',
if_not_exists => TRUE,
chunk_time_interval => INTERVAL '1 day'
);
`);
console.log('하이퍼테이블 생성 완료');
} finally {
client.release();
}
}
// ============================================
// 배치 삽입
// ============================================
async function insertMetrics(metrics) {
const client = await pool.connect();
try {
const values = metrics.map((m, i) => {
const offset = i * 6;
return `($${offset+1}, $${offset+2}, $${offset+3}, $${offset+4}, $${offset+5}, $${offset+6})`;
}).join(', ');
const params = metrics.flatMap(m => [
m.time, m.host, m.service, m.cpu_usage, m.memory_mb, m.requests
]);
await client.query(`
INSERT INTO metrics (time, host, service, cpu_usage, memory_mb, requests)
VALUES ${values}
`, params);
console.log(`${metrics.length}개 메트릭 삽입 완료`);
} finally {
client.release();
}
}
// ============================================
// 시계열 쿼리 - time_bucket
// ============================================
async function queryHourlyStats() {
const client = await pool.connect();
try {
const result = await client.query(`
SELECT
time_bucket('1 hour', time) AS hour,
host,
AVG(cpu_usage) as avg_cpu,
MAX(memory_mb) as max_memory,
SUM(requests) as total_requests
FROM metrics
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, host
ORDER BY hour DESC
LIMIT 20;
`);
console.log('\n시간별 호스트 통계:');
result.rows.forEach(row => {
console.log(` ${row.hour} | ${row.host} | CPU: ${row.avg_cpu?.toFixed(1)}%`);
});
return result.rows;
} finally {
client.release();
}
}
// ============================================
// 마지막 값 조회 (last())
// ============================================
async function getLastReadings() {
const client = await pool.connect();
try {
const result = await client.query(`
SELECT
host,
last(cpu_usage, time) as latest_cpu,
last(memory_mb, time) as latest_memory,
last(time, time) as last_seen
FROM metrics
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY host;
`);
console.log('\n호스트별 최신 상태:');
result.rows.forEach(row => {
console.log(` ${row.host}: CPU ${row.latest_cpu}%, Memory ${row.latest_memory}MB`);
});
return result.rows;
} finally {
client.release();
}
}
// ============================================
// 연속 집계 조회
// ============================================
async function queryFromContinuousAggregate() {
const client = await pool.connect();
try {
// 연속 집계 뷰에서 조회 (미리 계산된 데이터)
const result = await client.query(`
SELECT * FROM hourly_metrics_stats
WHERE hour > NOW() - INTERVAL '7 days'
ORDER BY hour DESC
LIMIT 50;
`);
console.log(`\n연속 집계에서 ${result.rowCount}행 조회`);
return result.rows;
} finally {
client.release();
}
}
// ============================================
// 청크 정보 조회
// ============================================
async function getChunkInfo() {
const client = await pool.connect();
try {
const result = await client.query(`
SELECT
hypertable_name,
chunk_name,
range_start,
range_end,
is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_name = 'metrics'
ORDER BY range_start DESC
LIMIT 10;
`);
console.log('\n청크 정보:');
result.rows.forEach(row => {
const status = row.is_compressed ? '압축됨' : '미압축';
console.log(` ${row.chunk_name}: ${row.range_start} ~ ${row.range_end} [${status}]`);
});
} finally {
client.release();
}
}
// 실행
(async () => {
await createHypertable();
// 테스트 데이터 생성
const metrics = [];
for (let i = 0; i < 100; i++) {
metrics.push({
time: new Date(Date.now() - i * 60000),
host: `server-${i % 5}`,
service: ['web', 'api', 'worker'][i % 3],
cpu_usage: 20 + Math.random() * 60,
memory_mb: 1000 + Math.floor(Math.random() * 3000),
requests: Math.floor(Math.random() * 1000)
});
}
await insertMetrics(metrics);
await queryHourlyStats();
await getLastReadings();
await getChunkInfo();
await pool.end();
})();
-- ============================================
-- TimescaleDB 설치 및 확장 활성화
-- ============================================
CREATE EXTENSION IF NOT EXISTS timescaledb;
-- 버전 확인
SELECT extversion FROM pg_extension WHERE extname = 'timescaledb';
-- ============================================
-- 하이퍼테이블 생성
-- ============================================
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
device_id TEXT NOT NULL,
location TEXT,
temperature DOUBLE PRECISION,
humidity DOUBLE PRECISION
);
-- 하이퍼테이블로 변환 (1일 단위 청크)
SELECT create_hypertable('conditions', 'time',
chunk_time_interval => INTERVAL '1 day'
);
-- 인덱스 추가
CREATE INDEX ON conditions (device_id, time DESC);
-- ============================================
-- time_bucket을 이용한 집계
-- ============================================
-- 1시간 단위 평균
SELECT
time_bucket('1 hour', time) AS hour,
device_id,
AVG(temperature) as avg_temp,
AVG(humidity) as avg_humidity
FROM conditions
WHERE time > NOW() - INTERVAL '24 hours'
GROUP BY hour, device_id
ORDER BY hour DESC;
-- 다양한 시간 버킷
SELECT time_bucket('15 minutes', time) AS bucket, ... -- 15분
SELECT time_bucket('1 day', time) AS bucket, ... -- 1일
SELECT time_bucket('1 week', time) AS bucket, ... -- 1주
-- ============================================
-- 연속 집계 (Continuous Aggregates)
-- ============================================
CREATE MATERIALIZED VIEW hourly_conditions
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
device_id,
AVG(temperature) as avg_temp,
MAX(temperature) as max_temp,
MIN(temperature) as min_temp,
COUNT(*) as readings
FROM conditions
GROUP BY hour, device_id
WITH NO DATA;
-- 자동 새로고침 정책
SELECT add_continuous_aggregate_policy('hourly_conditions',
start_offset => INTERVAL '3 hours',
end_offset => INTERVAL '1 hour',
schedule_interval => INTERVAL '1 hour'
);
-- 수동 새로고침
CALL refresh_continuous_aggregate('hourly_conditions',
NOW() - INTERVAL '24 hours', NOW());
-- ============================================
-- 압축 설정
-- ============================================
-- 압축 활성화
ALTER TABLE conditions SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'device_id',
timescaledb.compress_orderby = 'time DESC'
);
-- 7일 이후 자동 압축 정책
SELECT add_compression_policy('conditions', INTERVAL '7 days');
-- 수동 압축
SELECT compress_chunk(c.chunk_name)
FROM timescaledb_information.chunks c
WHERE c.hypertable_name = 'conditions'
AND c.range_end < NOW() - INTERVAL '7 days'
AND NOT c.is_compressed;
-- 압축 통계 확인
SELECT
hypertable_name,
before_compression_total_bytes,
after_compression_total_bytes,
(1 - after_compression_total_bytes::float / before_compression_total_bytes) * 100 as ratio
FROM timescaledb_information.compression_stats;
-- ============================================
-- 데이터 보존 정책
-- ============================================
-- 90일 이후 자동 삭제
SELECT add_retention_policy('conditions', INTERVAL '90 days');
-- 정책 확인
SELECT * FROM timescaledb_information.jobs;
-- 수동 삭제
SELECT drop_chunks('conditions', older_than => INTERVAL '90 days');
-- ============================================
-- 유용한 시계열 함수
-- ============================================
-- 첫 번째/마지막 값
SELECT
device_id,
first(temperature, time) as first_temp,
last(temperature, time) as last_temp
FROM conditions
GROUP BY device_id;
-- 시간 가중 평균
SELECT
device_id,
time_weight('Linear', time, temperature) as weighted_avg
FROM conditions
WHERE time > NOW() - INTERVAL '1 hour'
GROUP BY device_id;
-- 갭 채우기 (time_bucket_gapfill)
SELECT
time_bucket_gapfill('1 hour', time) AS hour,
device_id,
COALESCE(AVG(temperature), locf(AVG(temperature))) as temp
FROM conditions
WHERE time BETWEEN '2024-01-01' AND '2024-01-02'
GROUP BY hour, device_id;
"이미 PostgreSQL을 쓰고 있으니 TimescaleDB 확장을 설치하면 됩니다. 새로운 쿼리 언어 없이 SQL로 모든 시계열 분석이 가능하고, 기존 ORM이나 도구도 그대로 사용할 수 있어요. JOIN이 필요한 복잡한 쿼리에서 특히 강점이 있습니다."
"대시보드가 느린 건 매번 raw 데이터를 집계해서 그래요. 연속 집계(Continuous Aggregate)를 만들어두면 미리 계산된 데이터를 조회해서 수백 배 빨라집니다. 1시간 단위, 1일 단위 뷰를 만들어두세요."
"TimescaleDB 압축을 활성화하면 스토리지를 90% 이상 줄일 수 있어요. 일주일 지난 청크는 자동 압축하고, 3개월 지나면 자동 삭제하는 정책을 설정합시다. 압축된 데이터도 SQL로 바로 조회 가능해요."
청크 간격이 너무 작으면 청크가 너무 많아지고, 너무 크면 압축/삭제 단위가 커집니다. 보통 1일~1주일이 적당하며, 데이터 볼륨에 맞게 조정하세요.
압축된 청크는 UPDATE/DELETE가 불가능합니다. 수정이 필요하면 압축 해제 후 작업해야 합니다. 최신 데이터 청크는 압축하지 마세요.
연속 집계는 start_offset보다 오래된 데이터만 처리합니다. 너무 짧으면 최신 데이터가 누락되고, 너무 길면 집계 지연이 발생합니다.
time 컬럼에 인덱스 필수, segmentby로 압축 효율 극대화, 연속 집계로 대시보드 성능 향상, 보존 정책으로 자동 데이터 관리를 구현하세요.