SOC
Security Operations Center
보안 운영 센터. 24/7 보안 모니터링 및 대응.
Security Operations Center
보안 운영 센터. 24/7 보안 모니터링 및 대응.
SOC(Security Operations Center, 보안 운영 센터)는 조직의 IT 인프라, 네트워크, 애플리케이션을 24시간 365일 모니터링하고 사이버 위협을 탐지, 분석, 대응하는 중앙 집중식 보안 조직입니다. SOC는 보안 분석가, 보안 엔지니어, SOC 매니저로 구성된 팀이 SIEM, SOAR, EDR, 위협 인텔리전스 플랫폼 등 다양한 보안 도구를 활용하여 조직을 보호합니다.
SOC 운영은 일반적으로 3계층(Tier) 구조로 나뉩니다. Tier 1(초급 분석가)은 SIEM 알림을 모니터링하고 초기 분류(Triage)를 수행합니다. 매일 수천 건의 알림 중 실제 위협을 식별하여 Tier 2로 에스컬레이션합니다. Tier 2(중급 분석가)는 심층 조사, 침해 범위 파악, 대응 조치를 담당합니다. Tier 3(고급 분석가/위협 헌터)는 고급 위협 분석, 포렌식, 위협 헌팅, 보안 도구 튜닝을 수행합니다.
현대 SOC는 AI/ML 기반 자동화를 적극 도입하고 있습니다. SIEM에서 수집되는 로그 데이터를 머신러닝 모델로 분석하여 이상 징후를 탐지하고, SOAR(Security Orchestration, Automation and Response)를 통해 반복적인 대응 작업을 자동화합니다. 이를 통해 MTTR(Mean Time To Respond, 평균 대응 시간)을 줄이고 분석가의 Alert Fatigue(알림 피로)를 완화합니다. UEBA(User and Entity Behavior Analytics)는 사용자 및 엔터티의 정상 행동 패턴을 학습하여 내부 위협을 탐지합니다.
SOC 성숙도는 인력, 프로세스, 기술의 세 축으로 평가됩니다. 성숙한 SOC는 MITRE ATT&CK 프레임워크에 매핑된 탐지 규칙, 문서화된 플레이북(Playbook), 정기적인 레드팀/퍼플팀 훈련, KPI 기반 성과 측정을 갖추고 있습니다. 또한 위협 인텔리전스(TI)를 통합하여 최신 공격 기법과 IoC(Indicator of Compromise)를 탐지 규칙에 반영합니다. 클라우드 환경 확대에 따라 CSPM, CWPP 등 클라우드 보안 도구와의 통합도 필수입니다.
import json
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
from typing import Optional, List
class Severity(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class AlertStatus(Enum):
NEW = "new"
INVESTIGATING = "investigating"
ESCALATED = "escalated"
FALSE_POSITIVE = "false_positive"
RESOLVED = "resolved"
@dataclass
class SecurityAlert:
alert_id: str
source: str # SIEM, EDR, Firewall, etc.
timestamp: datetime
severity: Severity
title: str
description: str
source_ip: Optional[str] = None
dest_ip: Optional[str] = None
user: Optional[str] = None
mitre_tactic: Optional[str] = None
mitre_technique: Optional[str] = None
ioc: Optional[List[str]] = None
status: AlertStatus = AlertStatus.NEW
class SOCAlertTriager:
"""Tier 1 자동 분류 시스템"""
def __init__(self):
# 알려진 양성 패턴 (False Positive 필터링)
self.known_benign = {
"scheduled_scan": ["Vulnerability Scanner", "Nessus", "Qualys"],
"backup_traffic": ["backup.internal.com", "10.0.100."],
"monitoring": ["prometheus", "grafana", "nagios"]
}
# 고위험 패턴 (즉시 에스컬레이션)
self.high_risk_patterns = {
"ransomware_extension": [".encrypted", ".locked", ".crypto"],
"c2_domains": ["evil.com", "malware.xyz"], # TI 연동 필요
"lateral_movement": ["psexec", "wmic", "mimikatz"]
}
# MITRE ATT&CK 매핑
self.critical_techniques = [
"T1486", # Data Encrypted for Impact (Ransomware)
"T1003", # OS Credential Dumping
"T1021", # Remote Services
"T1071", # Application Layer Protocol (C2)
]
def triage(self, alert: SecurityAlert) -> dict:
"""알림 자동 분류 및 대응 권고"""
result = {
"alert_id": alert.alert_id,
"original_severity": alert.severity.value,
"adjusted_severity": alert.severity.value,
"action": "investigate",
"reason": [],
"auto_remediation": None
}
# 1. False Positive 필터링
if self._is_known_benign(alert):
result["action"] = "auto_close"
result["adjusted_severity"] = Severity.LOW.value
result["reason"].append("Known benign activity")
alert.status = AlertStatus.FALSE_POSITIVE
return result
# 2. 고위험 패턴 탐지 -> 자동 에스컬레이션
high_risk_match = self._check_high_risk(alert)
if high_risk_match:
result["action"] = "escalate_tier2"
result["adjusted_severity"] = Severity.CRITICAL.value
result["reason"].append(f"High-risk pattern: {high_risk_match}")
alert.status = AlertStatus.ESCALATED
return result
# 3. MITRE ATT&CK 기반 우선순위 조정
if alert.mitre_technique in self.critical_techniques:
result["adjusted_severity"] = Severity.HIGH.value
result["action"] = "prioritize"
result["reason"].append(f"Critical MITRE technique: {alert.mitre_technique}")
# 4. 자동 대응 추천
result["auto_remediation"] = self._suggest_remediation(alert)
return result
def _is_known_benign(self, alert: SecurityAlert) -> bool:
"""알려진 양성 활동 확인"""
for category, patterns in self.known_benign.items():
for pattern in patterns:
if pattern.lower() in alert.description.lower():
return True
if alert.source_ip and pattern in alert.source_ip:
return True
return False
def _check_high_risk(self, alert: SecurityAlert) -> Optional[str]:
"""고위험 패턴 확인"""
for category, patterns in self.high_risk_patterns.items():
for pattern in patterns:
if pattern.lower() in alert.description.lower():
return f"{category}: {pattern}"
return None
def _suggest_remediation(self, alert: SecurityAlert) -> Optional[dict]:
"""자동 대응 액션 추천"""
if "brute_force" in alert.title.lower():
return {
"type": "block_ip",
"target": alert.source_ip,
"duration": "24h",
"requires_approval": False
}
if "malware" in alert.title.lower() and alert.dest_ip:
return {
"type": "isolate_host",
"target": alert.dest_ip,
"requires_approval": True # Tier 2 승인 필요
}
return None
# 사용 예제
triager = SOCAlertTriager()
alert = SecurityAlert(
alert_id="ALERT-2024-12345",
source="CrowdStrike EDR",
timestamp=datetime.now(),
severity=Severity.MEDIUM,
title="Suspicious Process Execution",
description="mimikatz.exe executed by SYSTEM",
source_ip="10.0.50.100",
user="SYSTEM",
mitre_tactic="Credential Access",
mitre_technique="T1003"
)
result = triager.triage(alert)
print(json.dumps(result, indent=2))
# {
# "alert_id": "ALERT-2024-12345",
# "original_severity": "medium",
# "adjusted_severity": "critical",
# "action": "escalate_tier2",
# "reason": ["High-risk pattern: lateral_movement: mimikatz"],
# ...
# }
import asyncio
from abc import ABC, abstractmethod
from typing import List, Dict, Any
class PlaybookAction(ABC):
"""Playbook 액션 기본 클래스"""
@abstractmethod
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
pass
class EnrichWithThreatIntel(PlaybookAction):
"""위협 인텔리전스로 IoC 조회"""
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
ioc = context.get("source_ip")
# VirusTotal, AbuseIPDB, AlienVault OTX 등 조회
ti_result = await self._query_threat_intel(ioc)
return {"threat_intel": ti_result, "ioc_malicious": ti_result.get("score", 0) > 70}
async def _query_threat_intel(self, ioc: str) -> dict:
# 실제로는 TI API 호출
await asyncio.sleep(0.1)
return {"ioc": ioc, "score": 85, "categories": ["malware", "c2"]}
class QuerySIEMLogs(PlaybookAction):
"""SIEM에서 관련 로그 조회"""
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
source_ip = context.get("source_ip")
timerange = context.get("timerange", "24h")
# Elasticsearch/Splunk 쿼리
query = f"source_ip:{source_ip} AND @timestamp:[now-{timerange} TO now]"
results = await self._search_siem(query)
return {
"related_events": len(results),
"affected_hosts": list(set(r.get("dest_ip") for r in results)),
"users_involved": list(set(r.get("user") for r in results))
}
async def _search_siem(self, query: str) -> List[dict]:
await asyncio.sleep(0.2)
return [{"dest_ip": "10.0.1.50", "user": "admin"}]
class IsolateHost(PlaybookAction):
"""EDR을 통한 호스트 격리"""
async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
host = context.get("target_host")
if not context.get("approved", False):
return {"status": "pending_approval", "host": host}
# CrowdStrike/Defender ATP API 호출
result = await self._isolate_via_edr(host)
return {"status": "isolated", "host": host, "edr_response": result}
async def _isolate_via_edr(self, host: str) -> dict:
await asyncio.sleep(0.5)
return {"success": True, "isolation_id": "ISO-12345"}
class SOARPlaybook:
"""SOAR Playbook 실행기"""
def __init__(self, name: str, actions: List[PlaybookAction]):
self.name = name
self.actions = actions
self.execution_log = []
async def execute(self, initial_context: Dict[str, Any]) -> Dict[str, Any]:
context = initial_context.copy()
for action in self.actions:
action_name = action.__class__.__name__
print(f"[{self.name}] Executing: {action_name}")
try:
result = await action.execute(context)
context.update(result)
self.execution_log.append({
"action": action_name,
"status": "success",
"result": result
})
except Exception as e:
self.execution_log.append({
"action": action_name,
"status": "failed",
"error": str(e)
})
raise
return context
# Phishing Response Playbook
phishing_playbook = SOARPlaybook(
name="Phishing Incident Response",
actions=[
EnrichWithThreatIntel(),
QuerySIEMLogs(),
# BlockSenderInEmailGateway(),
# NotifyAffectedUsers(),
# CreateJiraTicket(),
]
)
# 실행
async def main():
result = await phishing_playbook.execute({
"alert_id": "ALERT-2024-67890",
"source_ip": "203.0.113.100",
"email_subject": "Urgent: Password Reset Required"
})
print(f"Playbook completed. Affected hosts: {result.get('affected_hosts')}")
asyncio.run(main())
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List
import statistics
@dataclass
class SOCMetrics:
"""SOC KPI 메트릭"""
period_start: datetime
period_end: datetime
# 볼륨 메트릭
total_alerts: int = 0
alerts_by_severity: Dict[str, int] = None
alerts_by_source: Dict[str, int] = None
# 효율성 메트릭
mttd: float = 0.0 # Mean Time To Detect (분)
mttr: float = 0.0 # Mean Time To Respond (분)
mttc: float = 0.0 # Mean Time To Close (분)
# 품질 메트릭
false_positive_rate: float = 0.0
true_positive_rate: float = 0.0
escalation_rate: float = 0.0
# 커버리지 메트릭
mitre_coverage: float = 0.0 # ATT&CK 기법 탐지율
class SOCMetricsCollector:
"""SOC 메트릭 수집기"""
def __init__(self, siem_client, ticketing_client):
self.siem = siem_client
self.ticketing = ticketing_client
def collect_daily_metrics(self, date: datetime) -> SOCMetrics:
period_start = date.replace(hour=0, minute=0, second=0)
period_end = period_start + timedelta(days=1)
# 알림 데이터 조회
alerts = self.siem.get_alerts(period_start, period_end)
incidents = self.ticketing.get_incidents(period_start, period_end)
metrics = SOCMetrics(
period_start=period_start,
period_end=period_end,
total_alerts=len(alerts),
alerts_by_severity=self._count_by_field(alerts, "severity"),
alerts_by_source=self._count_by_field(alerts, "source")
)
# MTTD 계산: 공격 발생 → 탐지 알림
detection_times = [
(a.detected_at - a.attack_started_at).total_seconds() / 60
for a in alerts if a.attack_started_at
]
if detection_times:
metrics.mttd = statistics.mean(detection_times)
# MTTR 계산: 알림 → 대응 시작
response_times = [
(i.response_started_at - i.created_at).total_seconds() / 60
for i in incidents if i.response_started_at
]
if response_times:
metrics.mttr = statistics.mean(response_times)
# MTTC 계산: 알림 → 종료
close_times = [
(i.closed_at - i.created_at).total_seconds() / 60
for i in incidents if i.closed_at
]
if close_times:
metrics.mttc = statistics.mean(close_times)
# False Positive Rate
false_positives = sum(1 for a in alerts if a.status == "false_positive")
metrics.false_positive_rate = false_positives / len(alerts) * 100 if alerts else 0
# MITRE ATT&CK 커버리지
metrics.mitre_coverage = self._calculate_mitre_coverage()
return metrics
def _count_by_field(self, items: List, field: str) -> Dict[str, int]:
counts = {}
for item in items:
value = getattr(item, field, "unknown")
counts[value] = counts.get(value, 0) + 1
return counts
def _calculate_mitre_coverage(self) -> float:
"""탐지 규칙의 MITRE ATT&CK 커버리지 계산"""
total_techniques = 201 # ATT&CK Enterprise 기법 수
detection_rules = self.siem.get_detection_rules()
covered_techniques = set()
for rule in detection_rules:
if rule.mitre_techniques:
covered_techniques.update(rule.mitre_techniques)
return len(covered_techniques) / total_techniques * 100
def generate_report(self, metrics: SOCMetrics) -> str:
"""일간 SOC 리포트 생성"""
return f"""
=== SOC Daily Report ===
Period: {metrics.period_start.date()}
📊 Alert Volume
- Total Alerts: {metrics.total_alerts}
- By Severity: {metrics.alerts_by_severity}
⏱️ Response Metrics
- MTTD (Mean Time To Detect): {metrics.mttd:.1f} minutes
- MTTR (Mean Time To Respond): {metrics.mttr:.1f} minutes
- MTTC (Mean Time To Close): {metrics.mttc:.1f} minutes
📈 Quality Metrics
- False Positive Rate: {metrics.false_positive_rate:.1f}%
- MITRE ATT&CK Coverage: {metrics.mitre_coverage:.1f}%
🎯 Goals
- MTTD Target: < 15 minutes {'✅' if metrics.mttd < 15 else '❌'}
- MTTR Target: < 60 minutes {'✅' if metrics.mttr < 60 else '❌'}
- FP Rate Target: < 30% {'✅' if metrics.false_positive_rate < 30 else '❌'}
"""
하루 수천 건의 알림 중 실제 위협은 일부입니다. 과도한 False Positive는 분석가를 지치게 하고 진짜 위협을 놓치게 합니다. 정기적인 탐지 규칙 튜닝, 위험 기반 우선순위화, SOAR 자동화가 필수입니다.
24/7 교대 근무, 고압적인 환경, 스킬 정체는 SOC 인력 이직률을 높입니다. 적절한 교대 일정, 지속적인 교육 기회, 자동화를 통한 반복 업무 감소, 명확한 에스컬레이션 경로가 중요합니다.
SIEM, EDR, 방화벽, 클라우드 보안 도구가 사일로화되면 위협을 놓칩니다. XDR(Extended Detection and Response)이나 통합 데이터 레이크를 통해 전체 환경에 대한 통합 가시성을 확보해야 합니다.