🔒 보안

SOC

Security Operations Center

보안 운영 센터. 24/7 보안 모니터링 및 대응.

📖 상세 설명

SOC(Security Operations Center, 보안 운영 센터)는 조직의 IT 인프라, 네트워크, 애플리케이션을 24시간 365일 모니터링하고 사이버 위협을 탐지, 분석, 대응하는 중앙 집중식 보안 조직입니다. SOC는 보안 분석가, 보안 엔지니어, SOC 매니저로 구성된 팀이 SIEM, SOAR, EDR, 위협 인텔리전스 플랫폼 등 다양한 보안 도구를 활용하여 조직을 보호합니다.

SOC 운영은 일반적으로 3계층(Tier) 구조로 나뉩니다. Tier 1(초급 분석가)은 SIEM 알림을 모니터링하고 초기 분류(Triage)를 수행합니다. 매일 수천 건의 알림 중 실제 위협을 식별하여 Tier 2로 에스컬레이션합니다. Tier 2(중급 분석가)는 심층 조사, 침해 범위 파악, 대응 조치를 담당합니다. Tier 3(고급 분석가/위협 헌터)는 고급 위협 분석, 포렌식, 위협 헌팅, 보안 도구 튜닝을 수행합니다.

현대 SOC는 AI/ML 기반 자동화를 적극 도입하고 있습니다. SIEM에서 수집되는 로그 데이터를 머신러닝 모델로 분석하여 이상 징후를 탐지하고, SOAR(Security Orchestration, Automation and Response)를 통해 반복적인 대응 작업을 자동화합니다. 이를 통해 MTTR(Mean Time To Respond, 평균 대응 시간)을 줄이고 분석가의 Alert Fatigue(알림 피로)를 완화합니다. UEBA(User and Entity Behavior Analytics)는 사용자 및 엔터티의 정상 행동 패턴을 학습하여 내부 위협을 탐지합니다.

SOC 성숙도는 인력, 프로세스, 기술의 세 축으로 평가됩니다. 성숙한 SOC는 MITRE ATT&CK 프레임워크에 매핑된 탐지 규칙, 문서화된 플레이북(Playbook), 정기적인 레드팀/퍼플팀 훈련, KPI 기반 성과 측정을 갖추고 있습니다. 또한 위협 인텔리전스(TI)를 통합하여 최신 공격 기법과 IoC(Indicator of Compromise)를 탐지 규칙에 반영합니다. 클라우드 환경 확대에 따라 CSPM, CWPP 등 클라우드 보안 도구와의 통합도 필수입니다.

💻 코드 예제

SIEM 알림 자동 분류 (Python)

import json
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
from typing import Optional, List

class Severity(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class AlertStatus(Enum):
    NEW = "new"
    INVESTIGATING = "investigating"
    ESCALATED = "escalated"
    FALSE_POSITIVE = "false_positive"
    RESOLVED = "resolved"

@dataclass
class SecurityAlert:
    alert_id: str
    source: str  # SIEM, EDR, Firewall, etc.
    timestamp: datetime
    severity: Severity
    title: str
    description: str
    source_ip: Optional[str] = None
    dest_ip: Optional[str] = None
    user: Optional[str] = None
    mitre_tactic: Optional[str] = None
    mitre_technique: Optional[str] = None
    ioc: Optional[List[str]] = None
    status: AlertStatus = AlertStatus.NEW

class SOCAlertTriager:
    """Tier 1 자동 분류 시스템"""

    def __init__(self):
        # 알려진 양성 패턴 (False Positive 필터링)
        self.known_benign = {
            "scheduled_scan": ["Vulnerability Scanner", "Nessus", "Qualys"],
            "backup_traffic": ["backup.internal.com", "10.0.100."],
            "monitoring": ["prometheus", "grafana", "nagios"]
        }

        # 고위험 패턴 (즉시 에스컬레이션)
        self.high_risk_patterns = {
            "ransomware_extension": [".encrypted", ".locked", ".crypto"],
            "c2_domains": ["evil.com", "malware.xyz"],  # TI 연동 필요
            "lateral_movement": ["psexec", "wmic", "mimikatz"]
        }

        # MITRE ATT&CK 매핑
        self.critical_techniques = [
            "T1486",  # Data Encrypted for Impact (Ransomware)
            "T1003",  # OS Credential Dumping
            "T1021",  # Remote Services
            "T1071",  # Application Layer Protocol (C2)
        ]

    def triage(self, alert: SecurityAlert) -> dict:
        """알림 자동 분류 및 대응 권고"""
        result = {
            "alert_id": alert.alert_id,
            "original_severity": alert.severity.value,
            "adjusted_severity": alert.severity.value,
            "action": "investigate",
            "reason": [],
            "auto_remediation": None
        }

        # 1. False Positive 필터링
        if self._is_known_benign(alert):
            result["action"] = "auto_close"
            result["adjusted_severity"] = Severity.LOW.value
            result["reason"].append("Known benign activity")
            alert.status = AlertStatus.FALSE_POSITIVE
            return result

        # 2. 고위험 패턴 탐지 -> 자동 에스컬레이션
        high_risk_match = self._check_high_risk(alert)
        if high_risk_match:
            result["action"] = "escalate_tier2"
            result["adjusted_severity"] = Severity.CRITICAL.value
            result["reason"].append(f"High-risk pattern: {high_risk_match}")
            alert.status = AlertStatus.ESCALATED
            return result

        # 3. MITRE ATT&CK 기반 우선순위 조정
        if alert.mitre_technique in self.critical_techniques:
            result["adjusted_severity"] = Severity.HIGH.value
            result["action"] = "prioritize"
            result["reason"].append(f"Critical MITRE technique: {alert.mitre_technique}")

        # 4. 자동 대응 추천
        result["auto_remediation"] = self._suggest_remediation(alert)

        return result

    def _is_known_benign(self, alert: SecurityAlert) -> bool:
        """알려진 양성 활동 확인"""
        for category, patterns in self.known_benign.items():
            for pattern in patterns:
                if pattern.lower() in alert.description.lower():
                    return True
                if alert.source_ip and pattern in alert.source_ip:
                    return True
        return False

    def _check_high_risk(self, alert: SecurityAlert) -> Optional[str]:
        """고위험 패턴 확인"""
        for category, patterns in self.high_risk_patterns.items():
            for pattern in patterns:
                if pattern.lower() in alert.description.lower():
                    return f"{category}: {pattern}"
        return None

    def _suggest_remediation(self, alert: SecurityAlert) -> Optional[dict]:
        """자동 대응 액션 추천"""
        if "brute_force" in alert.title.lower():
            return {
                "type": "block_ip",
                "target": alert.source_ip,
                "duration": "24h",
                "requires_approval": False
            }
        if "malware" in alert.title.lower() and alert.dest_ip:
            return {
                "type": "isolate_host",
                "target": alert.dest_ip,
                "requires_approval": True  # Tier 2 승인 필요
            }
        return None

# 사용 예제
triager = SOCAlertTriager()

alert = SecurityAlert(
    alert_id="ALERT-2024-12345",
    source="CrowdStrike EDR",
    timestamp=datetime.now(),
    severity=Severity.MEDIUM,
    title="Suspicious Process Execution",
    description="mimikatz.exe executed by SYSTEM",
    source_ip="10.0.50.100",
    user="SYSTEM",
    mitre_tactic="Credential Access",
    mitre_technique="T1003"
)

result = triager.triage(alert)
print(json.dumps(result, indent=2))
# {
#   "alert_id": "ALERT-2024-12345",
#   "original_severity": "medium",
#   "adjusted_severity": "critical",
#   "action": "escalate_tier2",
#   "reason": ["High-risk pattern: lateral_movement: mimikatz"],
#   ...
# }

SOAR Playbook 실행 예제

import asyncio
from abc import ABC, abstractmethod
from typing import List, Dict, Any

class PlaybookAction(ABC):
    """Playbook 액션 기본 클래스"""

    @abstractmethod
    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        pass

class EnrichWithThreatIntel(PlaybookAction):
    """위협 인텔리전스로 IoC 조회"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        ioc = context.get("source_ip")
        # VirusTotal, AbuseIPDB, AlienVault OTX 등 조회
        ti_result = await self._query_threat_intel(ioc)
        return {"threat_intel": ti_result, "ioc_malicious": ti_result.get("score", 0) > 70}

    async def _query_threat_intel(self, ioc: str) -> dict:
        # 실제로는 TI API 호출
        await asyncio.sleep(0.1)
        return {"ioc": ioc, "score": 85, "categories": ["malware", "c2"]}

class QuerySIEMLogs(PlaybookAction):
    """SIEM에서 관련 로그 조회"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        source_ip = context.get("source_ip")
        timerange = context.get("timerange", "24h")

        # Elasticsearch/Splunk 쿼리
        query = f"source_ip:{source_ip} AND @timestamp:[now-{timerange} TO now]"
        results = await self._search_siem(query)

        return {
            "related_events": len(results),
            "affected_hosts": list(set(r.get("dest_ip") for r in results)),
            "users_involved": list(set(r.get("user") for r in results))
        }

    async def _search_siem(self, query: str) -> List[dict]:
        await asyncio.sleep(0.2)
        return [{"dest_ip": "10.0.1.50", "user": "admin"}]

class IsolateHost(PlaybookAction):
    """EDR을 통한 호스트 격리"""

    async def execute(self, context: Dict[str, Any]) -> Dict[str, Any]:
        host = context.get("target_host")
        if not context.get("approved", False):
            return {"status": "pending_approval", "host": host}

        # CrowdStrike/Defender ATP API 호출
        result = await self._isolate_via_edr(host)
        return {"status": "isolated", "host": host, "edr_response": result}

    async def _isolate_via_edr(self, host: str) -> dict:
        await asyncio.sleep(0.5)
        return {"success": True, "isolation_id": "ISO-12345"}

class SOARPlaybook:
    """SOAR Playbook 실행기"""

    def __init__(self, name: str, actions: List[PlaybookAction]):
        self.name = name
        self.actions = actions
        self.execution_log = []

    async def execute(self, initial_context: Dict[str, Any]) -> Dict[str, Any]:
        context = initial_context.copy()

        for action in self.actions:
            action_name = action.__class__.__name__
            print(f"[{self.name}] Executing: {action_name}")

            try:
                result = await action.execute(context)
                context.update(result)
                self.execution_log.append({
                    "action": action_name,
                    "status": "success",
                    "result": result
                })
            except Exception as e:
                self.execution_log.append({
                    "action": action_name,
                    "status": "failed",
                    "error": str(e)
                })
                raise

        return context

# Phishing Response Playbook
phishing_playbook = SOARPlaybook(
    name="Phishing Incident Response",
    actions=[
        EnrichWithThreatIntel(),
        QuerySIEMLogs(),
        # BlockSenderInEmailGateway(),
        # NotifyAffectedUsers(),
        # CreateJiraTicket(),
    ]
)

# 실행
async def main():
    result = await phishing_playbook.execute({
        "alert_id": "ALERT-2024-67890",
        "source_ip": "203.0.113.100",
        "email_subject": "Urgent: Password Reset Required"
    })
    print(f"Playbook completed. Affected hosts: {result.get('affected_hosts')}")

asyncio.run(main())

SOC 대시보드 메트릭 수집

from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List
import statistics

@dataclass
class SOCMetrics:
    """SOC KPI 메트릭"""
    period_start: datetime
    period_end: datetime

    # 볼륨 메트릭
    total_alerts: int = 0
    alerts_by_severity: Dict[str, int] = None
    alerts_by_source: Dict[str, int] = None

    # 효율성 메트릭
    mttd: float = 0.0  # Mean Time To Detect (분)
    mttr: float = 0.0  # Mean Time To Respond (분)
    mttc: float = 0.0  # Mean Time To Close (분)

    # 품질 메트릭
    false_positive_rate: float = 0.0
    true_positive_rate: float = 0.0
    escalation_rate: float = 0.0

    # 커버리지 메트릭
    mitre_coverage: float = 0.0  # ATT&CK 기법 탐지율

class SOCMetricsCollector:
    """SOC 메트릭 수집기"""

    def __init__(self, siem_client, ticketing_client):
        self.siem = siem_client
        self.ticketing = ticketing_client

    def collect_daily_metrics(self, date: datetime) -> SOCMetrics:
        period_start = date.replace(hour=0, minute=0, second=0)
        period_end = period_start + timedelta(days=1)

        # 알림 데이터 조회
        alerts = self.siem.get_alerts(period_start, period_end)
        incidents = self.ticketing.get_incidents(period_start, period_end)

        metrics = SOCMetrics(
            period_start=period_start,
            period_end=period_end,
            total_alerts=len(alerts),
            alerts_by_severity=self._count_by_field(alerts, "severity"),
            alerts_by_source=self._count_by_field(alerts, "source")
        )

        # MTTD 계산: 공격 발생 → 탐지 알림
        detection_times = [
            (a.detected_at - a.attack_started_at).total_seconds() / 60
            for a in alerts if a.attack_started_at
        ]
        if detection_times:
            metrics.mttd = statistics.mean(detection_times)

        # MTTR 계산: 알림 → 대응 시작
        response_times = [
            (i.response_started_at - i.created_at).total_seconds() / 60
            for i in incidents if i.response_started_at
        ]
        if response_times:
            metrics.mttr = statistics.mean(response_times)

        # MTTC 계산: 알림 → 종료
        close_times = [
            (i.closed_at - i.created_at).total_seconds() / 60
            for i in incidents if i.closed_at
        ]
        if close_times:
            metrics.mttc = statistics.mean(close_times)

        # False Positive Rate
        false_positives = sum(1 for a in alerts if a.status == "false_positive")
        metrics.false_positive_rate = false_positives / len(alerts) * 100 if alerts else 0

        # MITRE ATT&CK 커버리지
        metrics.mitre_coverage = self._calculate_mitre_coverage()

        return metrics

    def _count_by_field(self, items: List, field: str) -> Dict[str, int]:
        counts = {}
        for item in items:
            value = getattr(item, field, "unknown")
            counts[value] = counts.get(value, 0) + 1
        return counts

    def _calculate_mitre_coverage(self) -> float:
        """탐지 규칙의 MITRE ATT&CK 커버리지 계산"""
        total_techniques = 201  # ATT&CK Enterprise 기법 수
        detection_rules = self.siem.get_detection_rules()
        covered_techniques = set()

        for rule in detection_rules:
            if rule.mitre_techniques:
                covered_techniques.update(rule.mitre_techniques)

        return len(covered_techniques) / total_techniques * 100

    def generate_report(self, metrics: SOCMetrics) -> str:
        """일간 SOC 리포트 생성"""
        return f"""
=== SOC Daily Report ===
Period: {metrics.period_start.date()}

📊 Alert Volume
- Total Alerts: {metrics.total_alerts}
- By Severity: {metrics.alerts_by_severity}

⏱️ Response Metrics
- MTTD (Mean Time To Detect): {metrics.mttd:.1f} minutes
- MTTR (Mean Time To Respond): {metrics.mttr:.1f} minutes
- MTTC (Mean Time To Close): {metrics.mttc:.1f} minutes

📈 Quality Metrics
- False Positive Rate: {metrics.false_positive_rate:.1f}%
- MITRE ATT&CK Coverage: {metrics.mitre_coverage:.1f}%

🎯 Goals
- MTTD Target: < 15 minutes {'✅' if metrics.mttd < 15 else '❌'}
- MTTR Target: < 60 minutes {'✅' if metrics.mttr < 60 else '❌'}
- FP Rate Target: < 30% {'✅' if metrics.false_positive_rate < 30 else '❌'}
"""

🗣️ 실무에서 이렇게 말해요

  • "어젯밤 SOC에서 랜섬웨어 의심 알림이 떴는데, Tier 2가 바로 대응해서 확산 전에 격리했습니다."
  • "SIEM 알림 중 70%가 False Positive라서 분석가들이 힘들어해요. SOAR 플레이북으로 자동 필터링 규칙 추가합시다."
  • "이번 달 MTTR이 90분인데, 목표가 60분입니다. 자동 대응 범위를 늘려야 할 것 같아요."
  • "MITRE ATT&CK 커버리지가 40%밖에 안 돼요. Credential Access 쪽 탐지 규칙을 보강해야 합니다."
  • "SOC의 3-Tier 구조를 설명하고, 각 Tier의 역할과 필요한 역량은 무엇인가요?"
  • "MTTD, MTTR, MTTC의 차이점과 각각을 줄이기 위한 전략은 무엇인가요?"
  • "Alert Fatigue란 무엇이고, SOC에서 이를 완화하기 위한 방법은 무엇인가요?"
  • "SOAR의 역할과 SOC 운영에 미치는 영향을 설명해 주세요."
  • "이 탐지 규칙은 MITRE ATT&CK에 매핑되어 있지 않아요. T1059 (Command and Scripting) 태그를 추가해 주세요."
  • "플레이북에 롤백 절차가 없어요. 호스트 격리가 잘못된 경우를 대비해 복구 액션을 추가합시다."
  • "이 SIEM 쿼리는 False Positive가 많이 발생할 것 같아요. 더 구체적인 필터 조건이 필요합니다."

⚠️ 주의사항

Alert Fatigue 관리

하루 수천 건의 알림 중 실제 위협은 일부입니다. 과도한 False Positive는 분석가를 지치게 하고 진짜 위협을 놓치게 합니다. 정기적인 탐지 규칙 튜닝, 위험 기반 우선순위화, SOAR 자동화가 필수입니다.

인력 번아웃 방지

24/7 교대 근무, 고압적인 환경, 스킬 정체는 SOC 인력 이직률을 높입니다. 적절한 교대 일정, 지속적인 교육 기회, 자동화를 통한 반복 업무 감소, 명확한 에스컬레이션 경로가 중요합니다.

도구 통합과 가시성

SIEM, EDR, 방화벽, 클라우드 보안 도구가 사일로화되면 위협을 놓칩니다. XDR(Extended Detection and Response)이나 통합 데이터 레이크를 통해 전체 환경에 대한 통합 가시성을 확보해야 합니다.

🔗 관련 용어

📚 더 배우기