InfluxDB
Time-Series Database
시계열 데이터 전용 데이터베이스입니다. 타임스탬프 기반 메트릭, IoT 센서 데이터, 모니터링 로그를 초당 수백만 건 처리하며, Flux 쿼리 언어로 강력한 시계열 분석을 지원합니다.
Time-Series Database
시계열 데이터 전용 데이터베이스입니다. 타임스탬프 기반 메트릭, IoT 센서 데이터, 모니터링 로그를 초당 수백만 건 처리하며, Flux 쿼리 언어로 강력한 시계열 분석을 지원합니다.
InfluxDB는 InfluxData에서 개발한 오픈소스 시계열 데이터베이스(TSDB)입니다. 시간순으로 정렬된 대량의 데이터를 빠르게 저장하고 조회하는 데 최적화되어 있으며, 서버 모니터링, IoT 센서 데이터, 금융 시장 데이터 등 시간 기반 데이터 처리에 널리 사용됩니다.
데이터 모델은 measurement(테이블), tags(인덱싱된 메타데이터), fields(실제 값), timestamp로 구성됩니다. tags는 자동으로 인덱싱되어 빠른 필터링이 가능하고, fields는 실제 측정값을 저장합니다. 이 구조는 시계열 데이터의 특성에 맞게 최적화되어 기존 RDBMS보다 훨씬 효율적입니다.
Flux 쿼리 언어는 InfluxDB 2.0부터 도입된 함수형 데이터 스크립팅 언어입니다. SQL과 달리 파이프라인 방식으로 데이터를 처리하며, 시계열 분석에 특화된 함수(이동 평균, 다운샘플링, 예측 등)를 기본 제공합니다. 이전 버전의 InfluxQL도 계속 지원됩니다.
TSM(Time-Structured Merge Tree) 엔진은 InfluxDB의 핵심 스토리지 엔진입니다. 시계열 데이터를 압축하여 저장 공간을 최대 90%까지 절약하고, 시간 범위 기반 쿼리를 매우 빠르게 처리합니다. 데이터 보존 정책(Retention Policy)으로 오래된 데이터를 자동 삭제할 수 있습니다.
TICK 스택은 Telegraf(데이터 수집), InfluxDB(저장), Chronograf(시각화), Kapacitor(알림/처리)로 구성된 완전한 모니터링 솔루션입니다. InfluxDB 2.0부터는 이 기능들이 통합되어 단일 플랫폼으로 제공됩니다. Grafana와의 연동도 매우 잘 지원됩니다.
# InfluxDB Python 클라이언트 예제
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timedelta
import random
# InfluxDB 연결 설정
url = "http://localhost:8086"
token = "your-token-here"
org = "your-org"
bucket = "metrics"
# 클라이언트 생성
client = InfluxDBClient(url=url, token=token, org=org)
# ============================================
# 데이터 쓰기
# ============================================
write_api = client.write_api(write_options=SYNCHRONOUS)
# Point 객체로 데이터 생성
point = Point("server_metrics") \
.tag("host", "server-01") \
.tag("region", "ap-northeast-2") \
.field("cpu_usage", 45.5) \
.field("memory_usage", 72.3) \
.field("disk_io", 1250) \
.time(datetime.utcnow())
# 단일 포인트 쓰기
write_api.write(bucket=bucket, record=point)
print("데이터 기록 완료")
# 배치 쓰기 (대량 데이터)
points = []
for i in range(100):
p = Point("sensor_data") \
.tag("sensor_id", f"sensor-{i % 10}") \
.tag("location", "factory-a") \
.field("temperature", 20 + random.random() * 10) \
.field("humidity", 40 + random.random() * 20) \
.time(datetime.utcnow() - timedelta(minutes=i))
points.append(p)
write_api.write(bucket=bucket, record=points)
print(f"{len(points)}개 포인트 배치 기록 완료")
# ============================================
# 데이터 조회 (Flux 쿼리)
# ============================================
query_api = client.query_api()
# 최근 1시간 CPU 사용량 조회
query = '''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "server_metrics")
|> filter(fn: (r) => r._field == "cpu_usage")
|> filter(fn: (r) => r.host == "server-01")
'''
tables = query_api.query(query)
for table in tables:
for record in table.records:
print(f"시간: {record.get_time()}, CPU: {record.get_value()}%")
# ============================================
# 집계 쿼리 - 5분 평균
# ============================================
aggregation_query = '''
from(bucket: "metrics")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "server_metrics")
|> filter(fn: (r) => r._field == "cpu_usage")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
|> yield(name: "5분 평균")
'''
result = query_api.query(aggregation_query)
print("\n5분 평균 CPU 사용량:")
for table in result:
for record in table.records:
print(f" {record.get_time()}: {record.get_value():.2f}%")
# ============================================
# DataFrame으로 변환 (Pandas 연동)
# ============================================
df = query_api.query_data_frame('''
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "server_metrics")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
''')
print("\nDataFrame 결과:")
print(df.head())
# 리소스 정리
client.close()
// InfluxDB Node.js 클라이언트 예제
const { InfluxDB, Point } = require('@influxdata/influxdb-client');
// 연결 설정
const url = 'http://localhost:8086';
const token = 'your-token-here';
const org = 'your-org';
const bucket = 'metrics';
const client = new InfluxDB({ url, token });
// ============================================
// 데이터 쓰기
// ============================================
const writeApi = client.getWriteApi(org, bucket);
// 기본 태그 설정 (모든 포인트에 적용)
writeApi.useDefaultTags({ app: 'my-app', env: 'production' });
// Point 객체로 데이터 생성
const point = new Point('api_metrics')
.tag('endpoint', '/api/users')
.tag('method', 'GET')
.floatField('response_time', 125.5)
.intField('status_code', 200)
.intField('request_size', 1024);
writeApi.writePoint(point);
// 여러 포인트 쓰기
for (let i = 0; i < 10; i++) {
const p = new Point('request_log')
.tag('service', 'user-service')
.floatField('latency', Math.random() * 100)
.intField('requests', Math.floor(Math.random() * 1000));
writeApi.writePoint(p);
}
// 버퍼 플러시 및 완료 대기
writeApi.close()
.then(() => console.log('데이터 기록 완료'))
.catch(err => console.error('쓰기 오류:', err));
// ============================================
// 데이터 조회
// ============================================
const queryApi = client.getQueryApi(org);
const fluxQuery = `
from(bucket: "${bucket}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "api_metrics")
|> filter(fn: (r) => r._field == "response_time")
`;
console.log('\n쿼리 결과:');
queryApi.queryRows(fluxQuery, {
next: (row, tableMeta) => {
const record = tableMeta.toObject(row);
console.log(` ${record._time}: ${record._value}ms (${record.endpoint})`);
},
error: (err) => console.error('쿼리 오류:', err),
complete: () => console.log('쿼리 완료'),
});
// ============================================
// Promise 기반 조회
// ============================================
async function queryMetrics() {
const query = `
from(bucket: "${bucket}")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "api_metrics")
|> aggregateWindow(every: 1h, fn: mean)
|> sort(columns: ["_time"], desc: true)
|> limit(n: 10)
`;
const results = [];
for await (const { values, tableMeta } of queryApi.iterateRows(query)) {
const record = tableMeta.toObject(values);
results.push({
time: record._time,
field: record._field,
value: record._value,
endpoint: record.endpoint
});
}
return results;
}
queryMetrics()
.then(data => {
console.log('\n시간별 평균 응답시간:');
data.forEach(r => console.log(` ${r.time}: ${r.value?.toFixed(2)}ms`));
});
// ============================================
// Health Check
// ============================================
const { HealthAPI } = require('@influxdata/influxdb-client-apis');
const healthApi = new HealthAPI(client);
healthApi.getHealth()
.then(health => console.log('\nInfluxDB 상태:', health.status))
.catch(err => console.error('Health check 실패:', err));
// ============================================
// Flux 쿼리 언어 기본
// ============================================
// 기본 조회 - 최근 1시간 데이터
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "server_metrics")
// 특정 필드와 태그 필터링
from(bucket: "metrics")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "cpu")
|> filter(fn: (r) => r._field == "usage_percent")
|> filter(fn: (r) => r.host == "server-01" or r.host == "server-02")
// ============================================
// 집계 함수
// ============================================
// 윈도우 집계 - 5분 평균
from(bucket: "metrics")
|> range(start: -6h)
|> filter(fn: (r) => r._measurement == "cpu")
|> aggregateWindow(every: 5m, fn: mean, createEmpty: false)
// 다양한 집계 함수
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "temperature")
|> aggregateWindow(every: 10m, fn: max) // max, min, sum, count, last
// 그룹별 집계
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "api_requests")
|> group(columns: ["endpoint"])
|> count()
// ============================================
// 다운샘플링 & 데이터 변환
// ============================================
// 다운샘플링 - 1분 데이터를 1시간으로
from(bucket: "raw_metrics")
|> range(start: -7d)
|> aggregateWindow(every: 1h, fn: mean)
|> to(bucket: "downsampled_metrics")
// 이동 평균 (Moving Average)
from(bucket: "metrics")
|> range(start: -2h)
|> filter(fn: (r) => r._field == "cpu_usage")
|> movingAverage(n: 5)
// 변화율 계산 (derivative)
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._field == "requests_total")
|> derivative(unit: 1m, nonNegative: true)
// ============================================
// 조인 & 피벗
// ============================================
// 두 measurement 조인
cpu = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
memory = from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "memory")
join(tables: {cpu: cpu, memory: memory}, on: ["_time", "host"])
// 피벗 - 필드를 컬럼으로 변환
from(bucket: "metrics")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "server_metrics")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
// ============================================
// 알림 조건
// ============================================
// CPU 80% 이상 감지
from(bucket: "metrics")
|> range(start: -5m)
|> filter(fn: (r) => r._field == "cpu_usage")
|> filter(fn: (r) => r._value > 80.0)
|> count()
|> map(fn: (r) => ({r with alert: r._value > 0}))
"서버 메트릭은 InfluxDB에 저장하고 Grafana로 시각화합시다. 초당 10만 건 이상의 메트릭도 문제없이 처리하고, Telegraf로 수집 에이전트 설정도 간단해요. 7일 이후 데이터는 자동 삭제되도록 Retention Policy 설정하면 스토리지도 효율적으로 관리됩니다."
"센서 데이터는 InfluxDB가 적합해요. 타임스탬프 기반으로 자동 정렬되고 압축률도 높아서 스토리지 비용이 낮습니다. 다운샘플링으로 원본은 1주일만 유지하고 1시간 평균은 1년 보관하는 식으로 계층화하면 좋겠어요."
"Flux 쿼리에서 filter를 최대한 앞에 배치하세요. range와 measurement 필터가 먼저 와야 인덱스를 타요. 그리고 태그 카디널리티가 너무 높으면 성능이 떨어지니까, UUID 같은 건 태그가 아니라 필드로 저장하세요."
태그에 UUID, 사용자ID 같은 고유값을 넣지 마세요. 태그는 인덱싱되어 메모리를 많이 사용합니다. 카디널리티가 높으면 성능이 급격히 저하됩니다.
measurement + 태그 조합이 시리즈를 만듭니다. 시리즈가 수백만 개를 넘으면 메모리 부족과 쿼리 성능 저하가 발생합니다.
InfluxDB는 최신 데이터 쓰기에 최적화되어 있습니다. 오래된 과거 데이터를 대량으로 넣으면 컴팩션 부하가 발생할 수 있습니다.
Retention Policy로 데이터 자동 삭제, 다운샘플링으로 장기 데이터 압축, 배치 쓰기 사용, Continuous Query로 실시간 집계를 구현하세요.