XDR
Extended Detection and Response (확장된 탐지 및 대응)
엔드포인트, 네트워크, 클라우드, 이메일, 아이덴티티 등 다양한 보안 계층의 데이터를 단일 플랫폼에서 통합 수집하고, 크로스-레이어 상관 분석을 통해 위협을 탐지하며, 통합된 대응을 제공하는 차세대 보안 솔루션입니다.
Extended Detection and Response (확장된 탐지 및 대응)
엔드포인트, 네트워크, 클라우드, 이메일, 아이덴티티 등 다양한 보안 계층의 데이터를 단일 플랫폼에서 통합 수집하고, 크로스-레이어 상관 분석을 통해 위협을 탐지하며, 통합된 대응을 제공하는 차세대 보안 솔루션입니다.
XDR(Extended Detection and Response)은 EDR(엔드포인트)의 개념을 확장하여 네트워크, 클라우드 워크로드, 이메일, 아이덴티티 등 전체 IT 인프라에서 발생하는 보안 이벤트를 통합 수집하고 분석하는 플랫폼입니다. 개별 보안 도구가 각자의 사일로에서 알림을 생성하는 기존 방식과 달리, XDR은 모든 데이터를 단일 데이터 레이크에 정규화하여 저장하고, 크로스-레이어 상관 분석(Cross-layer Correlation)을 통해 공격의 전체 그림을 파악합니다.
XDR의 등장 배경은 보안 도구의 파편화와 알림 피로(Alert Fatigue)입니다. 평균적인 기업은 40개 이상의 보안 도구를 운영하며, SOC 분석가는 하루 수천 건의 알림을 처리해야 합니다. 각 도구가 부분적인 정보만 제공하기 때문에 공격의 전체 경로를 추적하려면 여러 콘솔을 오가며 수동으로 데이터를 연결해야 합니다. 2018년 Palo Alto Networks가 처음 제안한 XDR 개념은 이러한 "도구 스프롤(Tool Sprawl)"과 "피벗 세금(Pivot Tax)"을 해결하고자 했습니다.
XDR의 핵심 가치는 세 가지입니다. 첫째, 통합 가시성(Unified Visibility)은 엔드포인트의 프로세스 실행, 네트워크의 트래픽 흐름, 클라우드의 API 호출, 이메일의 첨부파일 분석을 단일 뷰에서 제공합니다. 둘째, 자동 상관 분석(Automated Correlation)은 개별적으로는 정상처럼 보이는 이벤트들을 연결하여 공격 체인을 재구성합니다. 예를 들어 "피싱 이메일 수신 → 악성 첨부파일 실행 → 자격증명 탈취 → 측면 이동"의 전체 경로를 하나의 인시던트로 묶어줍니다. 셋째, 통합 대응(Unified Response)은 엔드포인트 격리, 네트워크 차단, 계정 비활성화를 단일 플랫폼에서 실행합니다.
XDR은 Native XDR과 Open XDR로 구분됩니다. Native XDR은 단일 벤더의 EDR, NDR, 이메일 보안 등을 긴밀하게 통합한 형태로, 데이터 정규화와 상관 분석이 원활하지만 벤더 종속성이 높습니다. Open XDR은 여러 벤더의 보안 도구를 API로 연동하여 기존 투자를 보존하면서 XDR 기능을 구현하지만, 통합 깊이가 얕을 수 있습니다. 선택은 조직의 기존 보안 스택과 전략에 따라 달라집니다.
# XDR 크로스-레이어 상관 분석 엔진 - Python
# 엔드포인트, 네트워크, 이메일, 클라우드 이벤트를 통합 분석
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set
from collections import defaultdict
from dataclasses import dataclass, field
import json
import hashlib
@dataclass
class SecurityEvent:
"""정규화된 보안 이벤트"""
timestamp: datetime
source_type: str # endpoint, network, email, cloud, identity
event_type: str # process_create, network_connection, email_received, etc.
severity: str # info, low, medium, high, critical
host: Optional[str] = None
user: Optional[str] = None
source_ip: Optional[str] = None
dest_ip: Optional[str] = None
process_name: Optional[str] = None
file_hash: Optional[str] = None
url: Optional[str] = None
email_subject: Optional[str] = None
raw_data: Dict = field(default_factory=dict)
@dataclass
class CorrelatedIncident:
"""상관 분석된 인시던트"""
incident_id: str
title: str
severity: str
events: List[SecurityEvent]
attack_chain: List[str]
mitre_techniques: List[str]
affected_assets: Set[str]
recommendations: List[str]
created_at: datetime
class XDRCorrelationEngine:
"""XDR 상관 분석 엔진 - 크로스 레이어 위협 탐지"""
def __init__(self):
self.events: List[SecurityEvent] = []
self.incidents: List[CorrelatedIncident] = []
self.correlation_rules = self._load_correlation_rules()
def _load_correlation_rules(self) -> List[Dict]:
"""상관 분석 규칙 로드"""
return [
{
"name": "Phishing Attack Chain",
"description": "피싱 이메일 → 악성 첨부파일 실행 → 후속 활동",
"pattern": [
{"source_type": "email", "event_type": "suspicious_attachment"},
{"source_type": "endpoint", "event_type": "process_create",
"conditions": {"parent_name": ["outlook.exe", "thunderbird.exe"]}},
{"source_type": "endpoint", "event_type": "process_create",
"conditions": {"name": ["powershell.exe", "cmd.exe", "wscript.exe"]}}
],
"time_window_minutes": 30,
"severity": "high",
"mitre": ["T1566.001", "T1204.002", "T1059"]
},
{
"name": "Credential Theft and Lateral Movement",
"description": "자격증명 탈취 후 측면 이동",
"pattern": [
{"source_type": "endpoint", "event_type": "lsass_access"},
{"source_type": "identity", "event_type": "auth_failure",
"conditions": {"count_threshold": 5}},
{"source_type": "network", "event_type": "smb_connection",
"conditions": {"internal_only": True}}
],
"time_window_minutes": 60,
"severity": "critical",
"mitre": ["T1003.001", "T1110", "T1021.002"]
},
{
"name": "Cloud Data Exfiltration",
"description": "클라우드 저장소로 대량 데이터 유출",
"pattern": [
{"source_type": "endpoint", "event_type": "file_access",
"conditions": {"count_threshold": 100}},
{"source_type": "network", "event_type": "dns_query",
"conditions": {"domains": ["dropbox.com", "drive.google.com",
"onedrive.live.com"]}},
{"source_type": "network", "event_type": "https_upload",
"conditions": {"size_threshold_mb": 50}}
],
"time_window_minutes": 120,
"severity": "critical",
"mitre": ["T1567.002", "T1041"]
},
{
"name": "C2 Communication Detection",
"description": "C2 서버와의 비콘 통신 탐지",
"pattern": [
{"source_type": "endpoint", "event_type": "process_create",
"conditions": {"suspicious_parent": True}},
{"source_type": "network", "event_type": "periodic_connection",
"conditions": {"interval_variance": "low", "dest_unknown": True}},
{"source_type": "network", "event_type": "dns_query",
"conditions": {"entropy": "high"}}
],
"time_window_minutes": 1440, # 24시간
"severity": "high",
"mitre": ["T1071", "T1568", "T1573"]
}
]
def ingest_event(self, event: SecurityEvent):
"""이벤트 수집 및 실시간 상관 분석 트리거"""
self.events.append(event)
# 실시간 상관 분석 실행
self._run_correlation(event)
def _run_correlation(self, trigger_event: SecurityEvent):
"""상관 분석 규칙 실행"""
for rule in self.correlation_rules:
matched_events = self._match_pattern(rule, trigger_event)
if matched_events:
incident = self._create_incident(rule, matched_events)
self.incidents.append(incident)
print(f"[INCIDENT] {incident.title}")
print(f" Severity: {incident.severity}")
print(f" Events: {len(incident.events)}")
print(f" Attack Chain: {' → '.join(incident.attack_chain)}")
def _match_pattern(self, rule: Dict, trigger_event: SecurityEvent) -> List[SecurityEvent]:
"""패턴 매칭 - 시간 윈도우 내 관련 이벤트 검색"""
time_window = timedelta(minutes=rule["time_window_minutes"])
pattern = rule["pattern"]
# 트리거 이벤트가 패턴의 마지막 단계와 일치하는지 확인
last_step = pattern[-1]
if not self._event_matches_step(trigger_event, last_step):
return []
# 시간 윈도우 내 이전 단계들 검색
matched_chain = [trigger_event]
for step in reversed(pattern[:-1]):
window_start = trigger_event.timestamp - time_window
# 같은 호스트/사용자의 이벤트 검색
candidates = [
e for e in self.events
if window_start <= e.timestamp <= trigger_event.timestamp
and self._event_matches_step(e, step)
and self._events_related(e, trigger_event)
]
if candidates:
matched_chain.insert(0, candidates[-1]) # 가장 최근 이벤트
else:
return [] # 패턴 불일치
return matched_chain if len(matched_chain) == len(pattern) else []
def _event_matches_step(self, event: SecurityEvent, step: Dict) -> bool:
"""개별 이벤트가 패턴 단계와 일치하는지 확인"""
if event.source_type != step["source_type"]:
return False
if event.event_type != step["event_type"]:
return False
# 추가 조건 확인
conditions = step.get("conditions", {})
for key, value in conditions.items():
if key == "parent_name":
if event.raw_data.get("parent_name", "").lower() not in [v.lower() for v in value]:
return False
elif key == "name":
if event.process_name and event.process_name.lower() not in [v.lower() for v in value]:
return False
# ... 추가 조건 처리
return True
def _events_related(self, event1: SecurityEvent, event2: SecurityEvent) -> bool:
"""두 이벤트가 같은 엔티티(호스트/사용자)와 관련되어 있는지 확인"""
# 같은 호스트
if event1.host and event2.host and event1.host == event2.host:
return True
# 같은 사용자
if event1.user and event2.user and event1.user == event2.user:
return True
# IP 관련성
if event1.source_ip and event2.source_ip and event1.source_ip == event2.source_ip:
return True
return False
def _create_incident(self, rule: Dict, events: List[SecurityEvent]) -> CorrelatedIncident:
"""인시던트 생성"""
affected_assets = set()
attack_chain = []
for event in events:
if event.host:
affected_assets.add(f"host:{event.host}")
if event.user:
affected_assets.add(f"user:{event.user}")
attack_chain.append(f"{event.source_type}:{event.event_type}")
incident_id = hashlib.md5(
f"{rule['name']}-{events[0].timestamp}".encode()
).hexdigest()[:12]
return CorrelatedIncident(
incident_id=f"XDR-{incident_id}",
title=rule["name"],
severity=rule["severity"],
events=events,
attack_chain=attack_chain,
mitre_techniques=rule.get("mitre", []),
affected_assets=affected_assets,
recommendations=self._generate_recommendations(rule),
created_at=datetime.utcnow()
)
def _generate_recommendations(self, rule: Dict) -> List[str]:
"""인시던트 유형별 대응 권장사항 생성"""
recommendations = []
if "Phishing" in rule["name"]:
recommendations.extend([
"의심스러운 이메일 발신자 차단",
"영향받은 엔드포인트 격리",
"사용자 비밀번호 재설정",
"조직 내 유사 이메일 검색 및 삭제"
])
elif "Credential" in rule["name"]:
recommendations.extend([
"영향받은 계정 즉시 비활성화",
"모든 세션 강제 종료",
"측면 이동 대상 시스템 점검",
"특권 계정 감사"
])
elif "Exfiltration" in rule["name"]:
recommendations.extend([
"네트워크 연결 즉시 차단",
"유출 데이터 범위 파악",
"법무팀/경영진 에스컬레이션",
"포렌식 데이터 보존"
])
return recommendations
def query_events(self, query: Dict, time_range_hours: int = 24) -> List[SecurityEvent]:
"""이벤트 검색 쿼리"""
cutoff = datetime.utcnow() - timedelta(hours=time_range_hours)
results = []
for event in self.events:
if event.timestamp < cutoff:
continue
match = True
for key, value in query.items():
if hasattr(event, key):
if getattr(event, key) != value:
match = False
break
elif key in event.raw_data:
if event.raw_data[key] != value:
match = False
break
if match:
results.append(event)
return results
# 사용 예시
if __name__ == "__main__":
xdr = XDRCorrelationEngine()
# 시뮬레이션: 피싱 공격 체인
base_time = datetime.utcnow()
# 1. 의심스러운 이메일 수신
xdr.ingest_event(SecurityEvent(
timestamp=base_time,
source_type="email",
event_type="suspicious_attachment",
severity="medium",
user="john.doe@company.com",
email_subject="Invoice_2024.xlsm",
raw_data={"attachment_type": "xlsm", "sender": "attacker@malicious.com"}
))
# 2. Outlook에서 프로세스 생성
xdr.ingest_event(SecurityEvent(
timestamp=base_time + timedelta(minutes=5),
source_type="endpoint",
event_type="process_create",
severity="medium",
host="WORKSTATION-001",
user="john.doe@company.com",
process_name="excel.exe",
raw_data={"parent_name": "outlook.exe", "cmdline": "excel.exe Invoice_2024.xlsm"}
))
# 3. PowerShell 실행 (매크로)
xdr.ingest_event(SecurityEvent(
timestamp=base_time + timedelta(minutes=6),
source_type="endpoint",
event_type="process_create",
severity="high",
host="WORKSTATION-001",
user="john.doe@company.com",
process_name="powershell.exe",
raw_data={"parent_name": "excel.exe", "cmdline": "powershell -enc ..."}
))
print(f"\n총 인시던트: {len(xdr.incidents)}")
// XDR 위협 헌팅 쿼리 - KQL (Kusto Query Language)
// Microsoft Defender XDR, Azure Sentinel 등에서 사용
// ==============================================
// 1. 크로스-레이어 피싱 공격 체인 탐지
// ==============================================
// 이메일 → 엔드포인트 → 네트워크 상관 분석
let timeWindow = 1h;
let suspiciousEmails = EmailEvents
| where Timestamp > ago(timeWindow)
| where AttachmentCount > 0
| where ThreatTypes has_any ("Phish", "Malware")
| project EmailTimestamp=Timestamp, RecipientEmailAddress,
SenderFromAddress, Subject, AttachmentCount;
let endpointActivity = DeviceProcessEvents
| where Timestamp > ago(timeWindow)
| where InitiatingProcessFileName in~
("outlook.exe", "thunderbird.exe", "excel.exe", "word.exe")
| where FileName in~
("powershell.exe", "cmd.exe", "wscript.exe", "mshta.exe")
| project ProcessTimestamp=Timestamp, DeviceName, AccountName,
FileName, ProcessCommandLine, InitiatingProcessFileName;
suspiciousEmails
| join kind=inner (endpointActivity)
on $left.RecipientEmailAddress == $right.AccountName
| where ProcessTimestamp between (EmailTimestamp .. (EmailTimestamp + timeWindow))
| project EmailTimestamp, ProcessTimestamp, RecipientEmailAddress,
Subject, DeviceName, FileName, ProcessCommandLine
| extend AttackChain = "Email → Endpoint Execution"
// ==============================================
// 2. 자격증명 탈취 후 측면 이동 탐지
// ==============================================
let credentialTheft = DeviceProcessEvents
| where Timestamp > ago(24h)
| where FileName =~ "lsass.exe" or
ProcessCommandLine has_any ("mimikatz", "sekurlsa", "procdump")
| project CredTheftTime=Timestamp, DeviceName, AccountName;
let lateralMovement = DeviceNetworkEvents
| where Timestamp > ago(24h)
| where RemotePort in (445, 135, 3389, 5985) // SMB, RPC, RDP, WinRM
| where RemoteIPType == "Private"
| summarize ConnectionCount=count(),
TargetHosts=make_set(RemoteIP)
by DeviceName, AccountName, bin(Timestamp, 1h);
credentialTheft
| join kind=inner (lateralMovement)
on DeviceName, AccountName
| where Timestamp > CredTheftTime
| project CredTheftTime, Timestamp, DeviceName, AccountName,
ConnectionCount, TargetHosts
| extend AttackStage = "Post-Credential Theft Lateral Movement"
| where ConnectionCount > 5
// ==============================================
// 3. C2 비콘 통신 패턴 탐지
// ==============================================
let beaconDetection = DeviceNetworkEvents
| where Timestamp > ago(24h)
| where RemoteIPType == "Public"
| summarize
ConnectionCount = count(),
AvgInterval = avg(datetime_diff('second', Timestamp,
prev(Timestamp, 1) over (partition by DeviceName,
RemoteIP order by Timestamp))),
IntervalStdDev = stdev(datetime_diff('second', Timestamp,
prev(Timestamp, 1) over (partition by DeviceName,
RemoteIP order by Timestamp))),
FirstSeen = min(Timestamp),
LastSeen = max(Timestamp),
TotalBytes = sum(SentBytes + ReceivedBytes)
by DeviceName, RemoteIP, RemotePort, InitiatingProcessFileName
| where ConnectionCount > 10
| where IntervalStdDev < 60 // 규칙적인 간격 (편차 60초 미만)
| where AvgInterval between (30 .. 3600) // 30초 ~ 1시간 간격
| extend BeaconScore = 100 - (IntervalStdDev / AvgInterval * 100)
| where BeaconScore > 70
| project DeviceName, RemoteIP, RemotePort, InitiatingProcessFileName,
ConnectionCount, AvgInterval, BeaconScore, FirstSeen, LastSeen
// ==============================================
// 4. 클라우드 서비스로 데이터 유출 탐지
// ==============================================
let cloudUploadDomains = dynamic([
"dropbox.com", "drive.google.com", "onedrive.live.com",
"mega.nz", "wetransfer.com", "box.com"
]);
let largeFileAccess = DeviceFileEvents
| where Timestamp > ago(4h)
| where ActionType == "FileRead"
| where FolderPath has_any ("Documents", "Desktop", "Downloads",
"Confidential", "Finance", "HR")
| summarize FileCount=count(), TotalSize=sum(FileSize)
by DeviceName, AccountName, bin(Timestamp, 15m)
| where FileCount > 50 or TotalSize > 100000000; // 50+ files or 100MB+
let cloudUploads = DeviceNetworkEvents
| where Timestamp > ago(4h)
| where RemoteUrl has_any (cloudUploadDomains)
| where SentBytes > 1000000 // 1MB+
| summarize UploadCount=count(), TotalUploaded=sum(SentBytes)
by DeviceName, AccountName, RemoteUrl, bin(Timestamp, 15m);
largeFileAccess
| join kind=inner (cloudUploads)
on DeviceName, AccountName
| where Timestamp1 between ((Timestamp - 15m) .. (Timestamp + 15m))
| project Timestamp, DeviceName, AccountName, FileCount, TotalSize,
RemoteUrl, TotalUploaded
| extend AlertTitle = "Potential Data Exfiltration to Cloud Storage",
Severity = "Critical"
// ==============================================
// 5. 통합 인시던트 뷰 - 공격 타임라인
// ==============================================
let incidentId = "INC-2024-001";
let targetUser = "john.doe@company.com";
let timeRange = 7d;
union
(EmailEvents
| where RecipientEmailAddress =~ targetUser
| where Timestamp > ago(timeRange)
| project Timestamp, Source="Email",
Activity=strcat("Received: ", Subject),
Details=strcat("From: ", SenderFromAddress)),
(DeviceProcessEvents
| where AccountName =~ targetUser
| where Timestamp > ago(timeRange)
| project Timestamp, Source="Endpoint",
Activity=strcat("Process: ", FileName),
Details=ProcessCommandLine),
(DeviceNetworkEvents
| where InitiatingProcessAccountName =~ targetUser
| where Timestamp > ago(timeRange)
| project Timestamp, Source="Network",
Activity=strcat("Connection to ", RemoteIP),
Details=strcat("Port: ", RemotePort)),
(IdentityLogonEvents
| where AccountUpn =~ targetUser
| where Timestamp > ago(timeRange)
| project Timestamp, Source="Identity",
Activity=strcat("Logon: ", LogonType),
Details=strcat("From: ", DeviceName))
| order by Timestamp asc
| extend IncidentId = incidentId
| project-reorder Timestamp, Source, Activity, Details
// XDR API 연동 및 자동화된 대응 - Node.js
// 여러 보안 도구를 통합하여 XDR 기능 구현
const axios = require('axios');
const { EventEmitter } = require('events');
class XDRPlatform extends EventEmitter {
constructor(config) {
super();
this.config = config;
// 연동된 보안 솔루션 클라이언트
this.edr = new EDRClient(config.edr);
this.ndr = new NDRClient(config.ndr);
this.email = new EmailSecurityClient(config.emailSecurity);
this.casb = new CASBClient(config.casb);
this.identity = new IdentityClient(config.identity);
// 통합 데이터 레이크
this.dataLake = [];
// 상관 분석 엔진
this.correlationEngine = new CorrelationEngine();
}
// 모든 소스에서 이벤트 수집 시작
async startCollection() {
console.log('Starting XDR event collection from all sources...');
// 각 소스에서 이벤트 스트림 설정
this.edr.on('event', (event) => this.ingestEvent('endpoint', event));
this.ndr.on('event', (event) => this.ingestEvent('network', event));
this.email.on('event', (event) => this.ingestEvent('email', event));
this.casb.on('event', (event) => this.ingestEvent('cloud', event));
this.identity.on('event', (event) => this.ingestEvent('identity', event));
// 연결 시작
await Promise.all([
this.edr.connect(),
this.ndr.connect(),
this.email.connect(),
this.casb.connect(),
this.identity.connect()
]);
console.log('All security sources connected');
}
// 이벤트 정규화 및 수집
ingestEvent(sourceType, rawEvent) {
const normalizedEvent = this.normalizeEvent(sourceType, rawEvent);
this.dataLake.push(normalizedEvent);
// 실시간 상관 분석
const incidents = this.correlationEngine.analyze(normalizedEvent, this.dataLake);
for (const incident of incidents) {
this.emit('incident', incident);
this.handleIncident(incident);
}
}
// 이벤트 정규화 - 통합 스키마로 변환
normalizeEvent(sourceType, raw) {
const normalized = {
id: this.generateEventId(),
timestamp: new Date(raw.timestamp || Date.now()).toISOString(),
sourceType: sourceType,
eventType: this.mapEventType(sourceType, raw),
severity: raw.severity || 'info',
// 공통 엔티티
entities: {
host: raw.hostname || raw.deviceName || raw.computerName,
user: raw.user || raw.accountName || raw.userPrincipalName,
sourceIp: raw.sourceIp || raw.srcIp || raw.clientIp,
destIp: raw.destIp || raw.remoteIp || raw.targetIp,
process: raw.processName || raw.fileName,
fileHash: raw.sha256 || raw.md5 || raw.fileHash,
url: raw.url || raw.requestUrl,
domain: raw.domain || raw.remoteDomain
},
// 원본 데이터 보존
rawData: raw,
// 메타데이터
meta: {
source: sourceType,
ingestedAt: new Date().toISOString(),
enriched: false
}
};
// 위협 인텔리전스 강화
this.enrichEvent(normalized);
return normalized;
}
mapEventType(sourceType, raw) {
const mappings = {
endpoint: {
'ProcessCreate': 'process_create',
'FileCreate': 'file_create',
'NetworkConnect': 'network_connection',
'RegistryModification': 'registry_modify'
},
network: {
'dns': 'dns_query',
'http': 'http_request',
'flow': 'network_flow',
'alert': 'ids_alert'
},
email: {
'received': 'email_received',
'attachment': 'attachment_detected',
'link_click': 'link_clicked'
},
identity: {
'logon': 'auth_logon',
'logoff': 'auth_logoff',
'failed': 'auth_failure',
'mfa': 'mfa_challenge'
}
};
const sourceMap = mappings[sourceType] || {};
return sourceMap[raw.eventType] || raw.eventType || 'unknown';
}
async enrichEvent(event) {
// 위협 인텔리전스 조회
const iocs = [];
if (event.entities.fileHash) {
const hashReputation = await this.checkHashReputation(event.entities.fileHash);
if (hashReputation.malicious) {
iocs.push({ type: 'hash', value: event.entities.fileHash, ...hashReputation });
}
}
if (event.entities.destIp) {
const ipReputation = await this.checkIPReputation(event.entities.destIp);
if (ipReputation.malicious) {
iocs.push({ type: 'ip', value: event.entities.destIp, ...ipReputation });
}
}
if (event.entities.domain) {
const domainReputation = await this.checkDomainReputation(event.entities.domain);
if (domainReputation.malicious) {
iocs.push({ type: 'domain', value: event.entities.domain, ...domainReputation });
}
}
event.threatIntel = {
iocs: iocs,
enrichedAt: new Date().toISOString()
};
event.meta.enriched = true;
}
async checkHashReputation(hash) {
// VirusTotal, Hybrid Analysis 등 연동
try {
const response = await axios.get(
`${this.config.threatIntel.url}/hash/${hash}`,
{ headers: { 'Authorization': `Bearer ${this.config.threatIntel.apiKey}` }}
);
return response.data;
} catch {
return { malicious: false };
}
}
async checkIPReputation(ip) {
// AbuseIPDB, GreyNoise 등 연동
try {
const response = await axios.get(
`${this.config.threatIntel.url}/ip/${ip}`,
{ headers: { 'Authorization': `Bearer ${this.config.threatIntel.apiKey}` }}
);
return response.data;
} catch {
return { malicious: false };
}
}
async checkDomainReputation(domain) {
try {
const response = await axios.get(
`${this.config.threatIntel.url}/domain/${domain}`,
{ headers: { 'Authorization': `Bearer ${this.config.threatIntel.apiKey}` }}
);
return response.data;
} catch {
return { malicious: false };
}
}
// 인시던트 자동 대응
async handleIncident(incident) {
console.log(`\n[INCIDENT] ${incident.id}: ${incident.title}`);
console.log(` Severity: ${incident.severity}`);
console.log(` Affected: ${Array.from(incident.affectedAssets).join(', ')}`);
// 심각도별 자동 대응
if (incident.severity === 'critical') {
await this.executeCriticalResponse(incident);
} else if (incident.severity === 'high') {
await this.executeHighResponse(incident);
}
// SOAR/Ticketing 시스템 연동
await this.createTicket(incident);
// 알림 발송
await this.sendAlerts(incident);
}
async executeCriticalResponse(incident) {
const actions = [];
for (const asset of incident.affectedAssets) {
const [type, value] = asset.split(':');
if (type === 'host') {
// 엔드포인트 격리
console.log(` [ACTION] Isolating endpoint: ${value}`);
actions.push(this.edr.isolateEndpoint(value));
// 네트워크 차단
console.log(` [ACTION] Blocking network for: ${value}`);
actions.push(this.ndr.blockHost(value));
}
if (type === 'user') {
// 계정 비활성화
console.log(` [ACTION] Disabling account: ${value}`);
actions.push(this.identity.disableAccount(value));
// 모든 세션 종료
console.log(` [ACTION] Revoking sessions for: ${value}`);
actions.push(this.identity.revokeSessions(value));
}
}
const results = await Promise.allSettled(actions);
incident.responseActions = results.map((r, i) => ({
action: actions[i],
status: r.status,
result: r.value || r.reason
}));
}
async executeHighResponse(incident) {
// 고위험: 모니터링 강화 + 승인 후 조치
for (const asset of incident.affectedAssets) {
const [type, value] = asset.split(':');
if (type === 'host') {
console.log(` [ACTION] Enhanced monitoring for: ${value}`);
await this.edr.enableEnhancedMonitoring(value);
}
if (type === 'user') {
console.log(` [ACTION] Requiring MFA for: ${value}`);
await this.identity.requireMFA(value);
}
}
}
async createTicket(incident) {
// ServiceNow, Jira 등 연동
const ticket = {
title: `[XDR] ${incident.id}: ${incident.title}`,
severity: incident.severity,
description: this.formatIncidentDescription(incident),
assignee: this.config.ticketing.defaultAssignee,
labels: ['xdr', 'security-incident', incident.severity]
};
console.log(` [TICKET] Created: ${ticket.title}`);
}
async sendAlerts(incident) {
// Slack, Email, PagerDuty 연동
if (incident.severity === 'critical') {
// 긴급 알림
console.log(` [ALERT] Paging on-call: ${incident.title}`);
}
}
formatIncidentDescription(incident) {
return `
## XDR Incident Report
**ID:** ${incident.id}
**Severity:** ${incident.severity}
**Created:** ${incident.createdAt}
### Attack Chain
${incident.attackChain.join(' → ')}
### MITRE ATT&CK
${incident.mitreTechniques.join(', ')}
### Affected Assets
${Array.from(incident.affectedAssets).join('\n')}
### Events
${incident.events.length} related events detected
### Recommendations
${incident.recommendations.join('\n')}
`;
}
generateEventId() {
return `evt-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
}
}
// 클라이언트 클래스 (간략화)
class EDRClient extends EventEmitter {
constructor(config) { super(); this.config = config; }
async connect() { console.log('EDR connected'); }
async isolateEndpoint(host) { return { success: true, host }; }
async enableEnhancedMonitoring(host) { return { success: true, host }; }
}
class NDRClient extends EventEmitter {
constructor(config) { super(); this.config = config; }
async connect() { console.log('NDR connected'); }
async blockHost(host) { return { success: true, host }; }
}
class EmailSecurityClient extends EventEmitter {
constructor(config) { super(); this.config = config; }
async connect() { console.log('Email Security connected'); }
}
class CASBClient extends EventEmitter {
constructor(config) { super(); this.config = config; }
async connect() { console.log('CASB connected'); }
}
class IdentityClient extends EventEmitter {
constructor(config) { super(); this.config = config; }
async connect() { console.log('Identity Provider connected'); }
async disableAccount(user) { return { success: true, user }; }
async revokeSessions(user) { return { success: true, user }; }
async requireMFA(user) { return { success: true, user }; }
}
class CorrelationEngine {
analyze(event, dataLake) {
// 상관 분석 로직 (앞선 Python 예제 참조)
return [];
}
}
// 사용 예시
const xdr = new XDRPlatform({
edr: { url: 'https://edr.company.com/api' },
ndr: { url: 'https://ndr.company.com/api' },
emailSecurity: { url: 'https://email-sec.company.com/api' },
casb: { url: 'https://casb.company.com/api' },
identity: { url: 'https://identity.company.com/api' },
threatIntel: { url: 'https://threatintel.company.com/api', apiKey: 'xxx' },
ticketing: { defaultAssignee: 'soc-team' }
});
xdr.on('incident', (incident) => {
console.log(`New incident detected: ${incident.id}`);
});
xdr.startCollection();
"현재 EDR, NDR, SIEM을 별도로 운영하면서 알림 피로가 심각합니다. 지난달 한 건의 피싱 공격을 추적하는데 세 개 콘솔을 오가며 6시간이 걸렸습니다. XDR을 도입하면 크로스-레이어 상관 분석으로 공격 체인이 자동으로 연결되고, MTTD(평균 탐지 시간)와 MTTR(평균 대응 시간)을 대폭 줄일 수 있습니다."
"XDR에서 자동으로 상관 분석된 인시던트입니다. 공격 체인을 보면, 마케팅팀 직원이 피싱 이메일 첨부파일을 열었고, Excel 매크로가 PowerShell을 실행했으며, 이후 내부 네트워크로 SMB 연결을 시도했습니다. 이메일, 엔드포인트, 네트워크 이벤트가 하나의 인시던트로 묶여서 전체 그림을 한눈에 파악할 수 있었습니다."
"Native XDR과 Open XDR 중 선택해야 합니다. 우리는 이미 CrowdStrike EDR, Palo Alto 방화벽, Microsoft 365를 사용 중이라 Open XDR이 기존 투자를 보존하면서 통합할 수 있습니다. 단, API 연동 품질과 상관 분석 깊이를 PoC에서 반드시 검증해야 합니다."
많은 벤더가 기존 제품에 "XDR" 라벨만 붙이고 있습니다. 진정한 XDR은 다중 소스 데이터 통합, 자동 상관 분석, 통합 대응이 핵심입니다. PoC에서 실제 크로스-레이어 탐지 시나리오를 테스트하세요.
XDR을 도입해도 기존 도구의 데이터를 통합하지 않으면 효과가 없습니다. 모든 관련 로그 소스(엔드포인트, 네트워크, 클라우드, 이메일, ID)를 XDR에 연결하고 정규화된 스키마로 통합해야 합니다.
기본 규칙만으로는 환경 특성을 반영한 탐지가 어렵습니다. 조직의 정상 행위 베이스라인을 학습시키고, 커스텀 탐지 규칙을 지속적으로 개발하며, 오탐률을 모니터링하여 튜닝하세요.
단계적 통합(핵심 소스부터), 위협 헌팅 역량 개발, SOAR 연동으로 대응 자동화, 정기적인 탐지 규칙 개발 사이클, 벤더 종속성 관리 전략을 수립하세요. XDR은 도구가 아닌 통합된 보안 운영 모델입니다.