보안 정보 및 이벤트 관리 시스템으로, 전사 IT 인프라의 로그를 중앙에서 수집, 정규화, 상관관계 분석하여 보안 위협을 탐지하고 대응합니다. 2025년 글로벌 SIEM 시장은 140억 달러 규모이며, AI/ML 기반 차세대 SIEM이 연 20% 성장 중입니다. SOC(Security Operations Center)의 핵심 플랫폼으로, 실시간 위협 탐지와 규정 준수 감사의 기반입니다.
SIEM(Security Information and Event Management)은 두 가지 기능의 결합입니다. SIM(Security Information Management)은 로그 수집, 저장, 검색, 보고서 생성을 담당하고, SEM(Security Event Management)은 실시간 이벤트 모니터링, 상관관계 분석, 알림을 담당합니다. 예를 들어, 방화벽에서 차단된 IP가 VPN 로그에서 성공적 로그인으로 나타나면, SIEM이 이 두 이벤트를 상관 분석하여 "잠재적 침해" 알림을 발생시킵니다. 단일 로그만 보면 놓치는 위협을 상관관계로 탐지하는 것이 SIEM의 핵심 가치입니다.
전통적 SIEM은 규칙 기반(rule-based) 탐지에 의존했습니다. "5분 내 로그인 실패 5회 이상이면 알림"같은 정적 규칙은 알려진 공격 패턴에는 효과적이지만, 새로운 공격이나 정상 범위 내 은밀한 활동에는 한계가 있습니다. 또한 규칙이 많아질수록 오탐(false positive)이 급증하여 보안 분석가가 알림 피로(alert fatigue)에 빠집니다. 2025년 IDC 보고서에 따르면 AI 기반 SIEM 도입이 연 20% 성장 중인데, 이는 이러한 한계를 AI/ML로 극복하려는 움직임입니다.
차세대 SIEM(Next-Gen SIEM)은 AI/ML을 핵심에 둡니다. UEBA(User and Entity Behavior Analytics)는 사용자와 시스템의 정상 행동 패턴을 학습하여 이상 행동을 탐지합니다. 예를 들어 평소 업무 시간에만 접속하던 직원이 새벽 3시에 대량 데이터를 다운로드하면, 규칙 없이도 이상으로 감지합니다. 머신러닝은 정적 규칙이 놓치는 lateral movement, credential stuffing, privilege escalation 같은 스텔스 공격을 탐지합니다. AI가 알림을 심각도별로 자동 분류하여 오탐을 대폭 줄이고, 분석가는 실제 위협에 집중할 수 있습니다.
클라우드 네이티브 SIEM과 Agentic AI가 2025년 주요 트렌드입니다. 클라우드 SIEM은 온프레미스 대비 확장성, 비용 효율성, 관리 편의성이 뛰어나 조직들이 빠르게 전환 중입니다. Microsoft Sentinel, CrowdStrike Falcon, Splunk Cloud가 대표적입니다. Agentic AI는 한 발 더 나아가 검색 분석, 상관관계 규칙 생성, 데이터 변환, 워크플로우 자동화를 자율적으로 수행합니다. 2025년 Gartner Magic Quadrant에서 Microsoft와 CrowdStrike가 리더/비저너리로 선정되며 AI SIEM 경쟁이 치열합니다.
💻 코드 예제
# SIEM 스타일 로그 분석 및 위협 탐지 - Python
import json
from datetime import datetime, timedelta
from collections import defaultdict
from typing import List, Dict, Any
import hashlib
class SimpleSIEM:
"""간단한 SIEM 상관관계 분석 엔진"""
def __init__(self):
self.events = []
self.alerts = []
self.rules = []
# 사용자별 행동 기록 (UEBA 시뮬레이션)
self.user_baselines = defaultdict(lambda: {
'login_hours': set(),
'source_ips': set(),
'accessed_resources': set(),
})
def ingest_log(self, log: Dict[str, Any]):
"""로그 수집 및 정규화"""
normalized = {
'timestamp': datetime.fromisoformat(log.get('timestamp', datetime.now().isoformat())),
'source': log.get('source', 'unknown'),
'event_type': log.get('event_type', 'unknown'),
'user': log.get('user'),
'source_ip': log.get('source_ip'),
'destination_ip': log.get('destination_ip'),
'action': log.get('action'),
'status': log.get('status'),
'raw': json.dumps(log),
}
self.events.append(normalized)
self._update_baseline(normalized)
self._evaluate_rules(normalized)
return normalized
def _update_baseline(self, event: Dict):
"""사용자 행동 베이스라인 업데이트 (UEBA)"""
if not event.get('user'):
return
user = event['user']
baseline = self.user_baselines[user]
# 로그인 시간대 기록
if event['event_type'] == 'authentication':
hour = event['timestamp'].hour
baseline['login_hours'].add(hour)
# 소스 IP 기록
if event.get('source_ip'):
baseline['source_ips'].add(event['source_ip'])
def add_correlation_rule(self, rule):
"""상관관계 규칙 추가"""
self.rules.append(rule)
def _evaluate_rules(self, event: Dict):
"""모든 규칙 평가"""
for rule in self.rules:
if alert := rule(event, self.events, self.user_baselines):
self.alerts.append(alert)
self._notify_alert(alert)
def _notify_alert(self, alert: Dict):
"""알림 발송 (실제로는 Slack, PagerDuty 등 연동)"""
severity_emoji = {'critical': '🔴', 'high': '🟠', 'medium': '🟡', 'low': '🟢'}
print(f"\n{severity_emoji.get(alert['severity'], '⚪')} ALERT: {alert['title']}")
print(f" Severity: {alert['severity']}")
print(f" Description: {alert['description']}")
print(f" User: {alert.get('user', 'N/A')}")
print(f" Source IP: {alert.get('source_ip', 'N/A')}")
# 상관관계 규칙 정의
def brute_force_rule(event, events, baselines):
"""브루트포스 공격 탐지: 5분 내 동일 IP에서 로그인 실패 5회 이상"""
if event['event_type'] != 'authentication' or event['status'] != 'failed':
return None
source_ip = event.get('source_ip')
if not source_ip:
return None
# 최근 5분 내 같은 IP의 로그인 실패 횟수
window = timedelta(minutes=5)
cutoff = event['timestamp'] - window
failures = [
e for e in events
if e['event_type'] == 'authentication'
and e['status'] == 'failed'
and e.get('source_ip') == source_ip
and e['timestamp'] > cutoff
]
if len(failures) >= 5:
return {
'title': 'Brute Force Attack Detected',
'severity': 'high',
'description': f'{len(failures)} failed login attempts from {source_ip} in 5 minutes',
'source_ip': source_ip,
'rule': 'brute_force',
'timestamp': event['timestamp'],
}
return None
def impossible_travel_rule(event, events, baselines):
"""불가능한 이동 탐지: 지리적으로 불가능한 시간 내 로그인"""
if event['event_type'] != 'authentication' or event['status'] != 'success':
return None
user = event.get('user')
source_ip = event.get('source_ip')
if not user or not source_ip:
return None
# 최근 1시간 내 같은 사용자의 성공한 로그인
window = timedelta(hours=1)
cutoff = event['timestamp'] - window
recent_logins = [
e for e in events
if e['event_type'] == 'authentication'
and e['status'] == 'success'
and e.get('user') == user
and e['timestamp'] > cutoff
and e.get('source_ip') != source_ip
]
# 실제로는 GeoIP로 거리 계산, 여기서는 다른 IP면 의심
if recent_logins:
return {
'title': 'Impossible Travel Detected',
'severity': 'critical',
'description': f'User {user} logged in from multiple IPs within 1 hour',
'user': user,
'source_ip': source_ip,
'previous_ips': [e['source_ip'] for e in recent_logins],
'rule': 'impossible_travel',
'timestamp': event['timestamp'],
}
return None
def anomalous_hour_rule(event, events, baselines):
"""비정상 시간대 접속 탐지 (UEBA)"""
if event['event_type'] != 'authentication' or event['status'] != 'success':
return None
user = event.get('user')
if not user:
return None
baseline = baselines[user]
current_hour = event['timestamp'].hour
# 사용자의 정상 로그인 시간대가 10개 이상 기록되어 있고
# 현재 시간이 정상 범위를 벗어나면 알림
if len(baseline['login_hours']) >= 10 and current_hour not in baseline['login_hours']:
return {
'title': 'Anomalous Login Time Detected',
'severity': 'medium',
'description': f'User {user} logged in at unusual hour ({current_hour}:00)',
'user': user,
'normal_hours': sorted(baseline['login_hours']),
'rule': 'anomalous_hour',
'timestamp': event['timestamp'],
}
return None
# 사용 예시
if __name__ == '__main__':
siem = SimpleSIEM()
# 상관관계 규칙 등록
siem.add_correlation_rule(brute_force_rule)
siem.add_correlation_rule(impossible_travel_rule)
siem.add_correlation_rule(anomalous_hour_rule)
# 브루트포스 공격 시뮬레이션
print("=== Simulating Brute Force Attack ===")
for i in range(6):
siem.ingest_log({
'timestamp': (datetime.now() + timedelta(seconds=i*30)).isoformat(),
'source': 'auth-server',
'event_type': 'authentication',
'user': f'user_{i % 3}',
'source_ip': '192.168.1.100',
'status': 'failed',
})
# 불가능한 이동 시뮬레이션
print("\n=== Simulating Impossible Travel ===")
siem.ingest_log({
'timestamp': datetime.now().isoformat(),
'source': 'auth-server',
'event_type': 'authentication',
'user': 'admin',
'source_ip': '1.1.1.1', # 한국
'status': 'success',
})
siem.ingest_log({
'timestamp': (datetime.now() + timedelta(minutes=30)).isoformat(),
'source': 'auth-server',
'event_type': 'authentication',
'user': 'admin',
'source_ip': '8.8.8.8', # 미국 (30분 만에 이동 불가)
'status': 'success',
})
print(f"\n=== Summary ===")
print(f"Total events: {len(siem.events)}")
print(f"Total alerts: {len(siem.alerts)}")
-- Splunk SPL (Search Processing Language) 쿼리 예제
-- 1. 브루트포스 공격 탐지
-- 5분 내 동일 소스에서 로그인 실패 5회 이상
index=auth sourcetype=linux_secure action=failure
| bin _time span=5m
| stats count as failure_count,
values(user) as targeted_users,
dc(user) as unique_users
by src_ip, _time
| where failure_count >= 5
| sort -failure_count
| table _time, src_ip, failure_count, unique_users, targeted_users
-- 2. 비정상 로그인 시간 탐지 (UEBA 스타일)
index=auth action=success
| eval hour=strftime(_time,"%H")
| eval is_business_hours=if(hour>=9 AND hour<=18, "yes", "no")
| where is_business_hours="no"
| stats count as off_hours_logins,
values(src_ip) as source_ips,
earliest(_time) as first_login,
latest(_time) as last_login
by user
| where off_hours_logins >= 3
| sort -off_hours_logins
-- 3. 권한 상승 탐지
-- 일반 사용자가 관리자 명령 실행
index=linux sourcetype=linux_audit
| search (command="sudo *" OR command="su *" OR command="chmod 777 *")
| stats count as privilege_commands,
values(command) as commands
by user, host
| lookup user_roles user OUTPUT role
| where role!="admin"
| sort -privilege_commands
-- 4. 데이터 유출 의심 탐지
-- 대량 파일 다운로드 또는 외부 전송
index=network sourcetype=firewall action=allowed
| where dest_port IN (20, 21, 22, 443, 8080)
| stats sum(bytes_out) as total_bytes,
dc(dest_ip) as unique_destinations,
values(dest_ip) as destinations
by src_ip, user
| eval total_mb = round(total_bytes/1024/1024, 2)
| where total_mb > 100 -- 100MB 이상
| sort -total_mb
| table user, src_ip, total_mb, unique_destinations, destinations
-- 5. 상관관계 분석: 방화벽 차단 후 VPN 성공
-- 동일 IP가 방화벽에서 차단됐는데 VPN으로 들어왔으면 의심
index=firewall action=blocked
| rename src_ip as attacker_ip
| join attacker_ip type=inner
[search index=vpn action=success
| rename src_ip as attacker_ip]
| table _time, attacker_ip, user, firewall_reason, vpn_connection_time
-- 6. 실시간 대시보드용 통계
-- 시간별 보안 이벤트 트렌드
index=security
| timechart span=1h count by severity
| addtotals fieldname=total
-- 7. MITRE ATT&CK 매핑
-- 탐지된 공격을 ATT&CK 프레임워크에 매핑
index=alerts
| lookup mitre_attack_mapping technique_id OUTPUT tactic, technique_name
| stats count by tactic, technique_name
| sort -count
# Elastic SIEM Detection Rule 예제
# .detection-rules/brute_force.toml
[rule]
author = ["Security Team"]
description = "Detects potential brute force attacks with multiple failed login attempts"
enabled = true
false_positives = ["Users with forgotten passwords", "Automated testing"]
from = "now-5m"
index = ["logs-*", "winlogbeat-*", "filebeat-*"]
interval = "5m"
language = "kuery"
license = "Elastic License v2"
name = "Brute Force Attack Detected"
risk_score = 73
rule_id = "bf-001"
severity = "high"
tags = ["Brute Force", "Credential Access", "T1110"]
type = "threshold"
[rule.threat]
framework = "MITRE ATT&CK"
[[rule.threat.technique]]
id = "T1110"
name = "Brute Force"
reference = "https://attack.mitre.org/techniques/T1110/"
[rule.threat.tactic]
id = "TA0006"
name = "Credential Access"
reference = "https://attack.mitre.org/tactics/TA0006/"
[rule.threshold]
field = ["source.ip"]
value = 5
[rule.query]
'''
event.category:authentication AND event.outcome:failure
'''
---
# Elastic SIEM API를 통한 알림 조회 및 대응 - Python
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
import json
class ElasticSIEM:
def __init__(self, host='localhost:9200', api_key=None):
self.es = Elasticsearch(
hosts=[host],
api_key=api_key
)
def search_security_events(self, query, time_range='24h'):
"""보안 이벤트 검색"""
body = {
"query": {
"bool": {
"must": [
{"query_string": {"query": query}},
{"range": {
"@timestamp": {
"gte": f"now-{time_range}",
"lte": "now"
}
}}
]
}
},
"sort": [{"@timestamp": "desc"}],
"size": 100
}
response = self.es.search(
index=".alerts-security.alerts-*",
body=body
)
return response['hits']['hits']
def get_open_alerts(self, severity=None):
"""열린 알림 조회"""
query = {
"bool": {
"must": [
{"term": {"kibana.alert.workflow_status": "open"}}
]
}
}
if severity:
query["bool"]["must"].append(
{"term": {"kibana.alert.severity": severity}}
)
response = self.es.search(
index=".alerts-security.alerts-*",
query=query,
sort=[{"@timestamp": "desc"}],
size=50
)
return response['hits']['hits']
def acknowledge_alert(self, alert_id):
"""알림 확인 처리"""
self.es.update(
index=".alerts-security.alerts-*",
id=alert_id,
body={
"doc": {
"kibana.alert.workflow_status": "acknowledged",
"kibana.alert.workflow_user": "analyst@company.com"
}
}
)
def create_detection_rule(self, rule_config):
"""탐지 규칙 생성 (Kibana Detection Engine API)"""
# 실제로는 Kibana API 사용
# POST /api/detection_engine/rules
pass
def get_alert_timeline(self, alert_id):
"""알림 관련 이벤트 타임라인 조회"""
alert = self.es.get(index=".alerts-security.alerts-*", id=alert_id)
source_ip = alert['_source'].get('source', {}).get('ip')
if not source_ip:
return []
# 해당 IP의 최근 활동 조회
events = self.es.search(
index="logs-*",
query={
"bool": {
"must": [
{"term": {"source.ip": source_ip}},
{"range": {"@timestamp": {"gte": "now-1h"}}}
]
}
},
sort=[{"@timestamp": "asc"}],
size=200
)
return events['hits']['hits']
# 사용 예시
if __name__ == '__main__':
siem = ElasticSIEM(api_key='your-api-key')
# 열린 고위험 알림 조회
alerts = siem.get_open_alerts(severity='high')
for alert in alerts:
print(f"Alert: {alert['_source']['kibana.alert.rule.name']}")
print(f" Severity: {alert['_source']['kibana.alert.severity']}")
print(f" Source IP: {alert['_source'].get('source', {}).get('ip')}")
🗣️ 실무에서 이렇게 말하세요
💬 SOC 운영 회의에서
"알림 피로 문제가 심각합니다. 하루 1만 개 알림 중 실제 위협은 10개도 안 돼요. AI 기반 SIEM으로 전환해서 UEBA로 행동 기반 탐지하고, ML로 오탐을 자동 필터링해야 합니다. CrowdStrike나 Microsoft Sentinel 같은 차세대 SIEM은 심각도 자동 분류로 분석가가 실제 위협에 집중할 수 있게 해줍니다."
💬 보안 아키텍처 리뷰에서
"온프레미스 SIEM에서 클라우드 네이티브로 마이그레이션합시다. 현재 하루 500GB 로그인데 피크 시 2TB까지 가요. 온프렘은 용량 증설에 몇 주 걸리고, 클라우드는 자동 스케일링됩니다. Sentinel이나 Splunk Cloud는 AWS, Azure, GCP 로그를 네이티브로 수집하고, SOAR 연동으로 대응 자동화까지 됩니다."
💬 컴플라이언스 감사 대응에서
"SIEM에서 90일 로그 보관 정책 적용했고, 감사 추적 보고서는 자동 생성됩니다. 특권 계정 활동, 데이터 접근 로그, 정책 변경 이력 모두 중앙에서 검색 가능합니다. MITRE ATT&CK 프레임워크 매핑으로 탐지 커버리지 분석도 되고, 규정 준수 대시보드에서 PCI-DSS, ISMS 요건별 현황을 실시간으로 볼 수 있습니다."
⚠️ 주의사항 & 베스트 프랙티스
❌
모든 로그 무분별 수집
비용과 노이즈 때문에 모든 로그를 수집하면 안 됩니다. 보안 가치가 높은 로그(인증, 권한, 네트워크 흐름)를 우선 수집하고, 디버그 레벨 로그는 제외하세요. 로그 볼륨 급증은 비용 폭탄과 검색 성능 저하로 이어집니다.
❌
규칙만 의존하는 탐지
정적 규칙은 알려진 공격만 탐지합니다. 제로데이, 내부자 위협, APT는 규칙을 우회합니다. UEBA와 ML 기반 이상 탐지를 병행하세요. 규칙은 명확한 악성 행위에, ML은 미지의 위협에 사용합니다.
❌
알림만 발생, 대응 없음
SIEM이 알림을 발생해도 대응 프로세스가 없으면 무용지물입니다. SOAR(Security Orchestration, Automation and Response)와 연동하여 티켓 생성, 격리, 차단을 자동화하세요. 플레이북을 문서화하고 정기적으로 훈련합니다.
✅
SIEM 베스트 프랙티스
로그 소스 우선순위 정의, 정규화 및 풍부화(enrichment) 파이프라인 구축, 사용 사례 기반 탐지 규칙 개발, MITRE ATT&CK 매핑으로 커버리지 분석, 오탐 피드백 루프로 규칙 튜닝, SOAR 연동 자동 대응, 정기적인 레드팀/퍼플팀 훈련으로 탐지 효과 검증을 구현하세요.