🏗️ 아키텍처

Bulkhead

Bulkhead Pattern, 격벽 패턴

선박의 격벽(Bulkhead)처럼 시스템 리소스를 격리하여 한 부분의 장애가 전체 시스템으로 확산되는 것을 방지하는 분산 시스템 회복탄력성(Resilience) 패턴입니다.

📖 상세 설명

Bulkhead 패턴의 이름은 선박 설계에서 유래했습니다. 선박은 격벽으로 여러 구획을 나누어 한 구획에 물이 차더라도 다른 구획으로 침수가 확산되지 않도록 설계합니다. 소프트웨어에서도 동일한 원리를 적용하여 리소스(스레드 풀, 커넥션 풀, 메모리 등)를 격리합니다.

구현 방식에는 크게 두 가지가 있습니다. 첫째, 스레드 풀 격리는 각 서비스 호출에 별도의 스레드 풀을 할당합니다. 외부 API A가 느려져도 API B 호출용 스레드에는 영향이 없습니다. 둘째, 세마포어 격리는 동시 실행 개수를 제한합니다. 더 가볍지만 타임아웃 제어가 어렵습니다.

적용 시나리오를 예로 들면, 결제 서비스가 3개의 외부 시스템(PG사 A, PG사 B, 포인트 시스템)을 호출한다고 가정합니다. PG사 A가 응답 지연을 일으키면 해당 스레드 풀만 영향을 받고, PG사 B와 포인트 시스템 호출은 정상 동작합니다.

Circuit Breaker와의 차이점은 다음과 같습니다. Bulkhead는 "리소스 격리"에 집중하고, Circuit Breaker는 "장애 탐지 및 빠른 실패"에 집중합니다. 실무에서는 두 패턴을 함께 사용하여 더 강력한 회복탄력성을 확보합니다.

Netflix의 Hystrix가 Bulkhead 패턴을 대중화시켰고, 현재는 Resilience4j가 Java 생태계의 표준입니다. Spring Cloud, Kubernetes, Istio 등에서도 다양한 형태로 Bulkhead를 지원합니다.

💻 코드 예제

// Bulkhead 패턴 예제 - Java + Resilience4j
import io.github.resilience4j.bulkhead.*;
import io.github.resilience4j.bulkhead.ThreadPoolBulkhead;
import java.util.concurrent.CompletableFuture;
import java.time.Duration;

public class PaymentService {

    // 스레드 풀 Bulkhead - PG사 A 전용
    private final ThreadPoolBulkhead pgABulkhead = ThreadPoolBulkhead.of(
        "pgA-bulkhead",
        ThreadPoolBulkheadConfig.custom()
            .maxThreadPoolSize(10)        // 최대 10개 스레드
            .coreThreadPoolSize(5)        // 기본 5개 스레드
            .queueCapacity(20)            // 대기열 20개
            .keepAliveDuration(Duration.ofSeconds(30))
            .build()
    );

    // 스레드 풀 Bulkhead - PG사 B 전용
    private final ThreadPoolBulkhead pgBBulkhead = ThreadPoolBulkhead.of(
        "pgB-bulkhead",
        ThreadPoolBulkheadConfig.custom()
            .maxThreadPoolSize(10)
            .coreThreadPoolSize(5)
            .queueCapacity(20)
            .build()
    );

    // 세마포어 Bulkhead - 포인트 시스템 전용
    private final Bulkhead pointsBulkhead = Bulkhead.of(
        "points-bulkhead",
        BulkheadConfig.custom()
            .maxConcurrentCalls(15)       // 동시 15개 호출 제한
            .maxWaitDuration(Duration.ofMillis(500))  // 500ms 대기
            .build()
    );

    /**
     * PG사 A 결제 - 스레드 풀 격리
     * PG사 A가 느려져도 다른 결제 수단에 영향 없음
     */
    public CompletableFuture payWithPgA(PaymentRequest request) {
        return pgABulkhead.executeSupplier(() -> {
            // PG사 A API 호출
            return callPgAApi(request);
        });
    }

    /**
     * PG사 B 결제 - 별도 스레드 풀
     */
    public CompletableFuture payWithPgB(PaymentRequest request) {
        return pgBBulkhead.executeSupplier(() -> {
            return callPgBApi(request);
        });
    }

    /**
     * 포인트 사용 - 세마포어 격리
     * 더 가볍고 빠르지만 타임아웃 제어 어려움
     */
    public PointResult usePoints(String userId, int points) {
        return Bulkhead.decorateSupplier(pointsBulkhead, () -> {
            return callPointsApi(userId, points);
        }).get();
    }

    /**
     * 모니터링 - Bulkhead 상태 확인
     */
    public void printBulkheadMetrics() {
        // 스레드 풀 Bulkhead 메트릭
        ThreadPoolBulkhead.Metrics tpMetrics = pgABulkhead.getMetrics();
        System.out.println("PG-A Bulkhead:");
        System.out.println("  - Available: " + tpMetrics.getRemainingQueueCapacity());
        System.out.println("  - Active: " + tpMetrics.getActiveThreadCount());

        // 세마포어 Bulkhead 메트릭
        Bulkhead.Metrics semMetrics = pointsBulkhead.getMetrics();
        System.out.println("Points Bulkhead:");
        System.out.println("  - Available: " + semMetrics.getAvailableConcurrentCalls());
        System.out.println("  - Max: " + semMetrics.getMaxAllowedConcurrentCalls());
    }

    private PaymentResult callPgAApi(PaymentRequest req) { /* API 호출 */ }
    private PaymentResult callPgBApi(PaymentRequest req) { /* API 호출 */ }
    private PointResult callPointsApi(String userId, int points) { /* API 호출 */ }
}


// Spring Boot + Resilience4j 어노테이션 방식
@Service
public class OrderService {

    @Bulkhead(name = "inventory", type = Bulkhead.Type.THREADPOOL)
    public CompletableFuture checkInventory(String productId) {
        return CompletableFuture.supplyAsync(() -> {
            return inventoryClient.getStock(productId);
        });
    }

    @Bulkhead(name = "shipping", type = Bulkhead.Type.SEMAPHORE,
              fallbackMethod = "shippingFallback")
    public ShippingCost calculateShipping(Address address) {
        return shippingClient.calculate(address);
    }

    // Bulkhead 가득 찼을 때 fallback
    public ShippingCost shippingFallback(Address address, BulkheadFullException e) {
        return ShippingCost.estimated();  // 예상 배송비 반환
    }
}
# Bulkhead 패턴 예제 - Python + asyncio
import asyncio
from asyncio import Semaphore
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Callable, TypeVar, Generic
from functools import wraps
import time

T = TypeVar('T')


class BulkheadFullError(Exception):
    """Bulkhead 용량 초과 에러"""
    pass


@dataclass
class BulkheadConfig:
    """Bulkhead 설정"""
    max_concurrent_calls: int = 10
    max_wait_time: float = 0.5  # seconds


class SemaphoreBulkhead:
    """세마포어 기반 Bulkhead"""

    def __init__(self, name: str, config: BulkheadConfig = None):
        self.name = name
        self.config = config or BulkheadConfig()
        self._semaphore = Semaphore(self.config.max_concurrent_calls)
        self._active_calls = 0

    async def execute(self, func: Callable, *args, **kwargs):
        """Bulkhead로 보호된 함수 실행"""
        try:
            # 대기 시간 내에 세마포어 획득 시도
            acquired = await asyncio.wait_for(
                self._semaphore.acquire(),
                timeout=self.config.max_wait_time
            )
        except asyncio.TimeoutError:
            raise BulkheadFullError(
                f"Bulkhead '{self.name}' full. "
                f"Max concurrent: {self.config.max_concurrent_calls}"
            )

        self._active_calls += 1
        try:
            if asyncio.iscoroutinefunction(func):
                return await func(*args, **kwargs)
            else:
                return func(*args, **kwargs)
        finally:
            self._active_calls -= 1
            self._semaphore.release()

    @property
    def metrics(self):
        return {
            "active_calls": self._active_calls,
            "available": self.config.max_concurrent_calls - self._active_calls,
            "max_concurrent": self.config.max_concurrent_calls
        }


class ThreadPoolBulkhead:
    """스레드 풀 기반 Bulkhead"""

    def __init__(self, name: str, max_workers: int = 10, queue_size: int = 20):
        self.name = name
        self.max_workers = max_workers
        self.queue_size = queue_size
        self._executor = ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}"
        )
        self._queue_semaphore = Semaphore(queue_size)

    async def execute(self, func: Callable, *args, **kwargs):
        """스레드 풀에서 함수 실행"""
        if not self._queue_semaphore.locked() or self._queue_semaphore._value > 0:
            await self._queue_semaphore.acquire()
            try:
                loop = asyncio.get_event_loop()
                return await loop.run_in_executor(
                    self._executor,
                    lambda: func(*args, **kwargs)
                )
            finally:
                self._queue_semaphore.release()
        else:
            raise BulkheadFullError(f"Bulkhead '{self.name}' queue full")


def bulkhead(bulkhead_instance):
    """Bulkhead 데코레이터"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            return await bulkhead_instance.execute(func, *args, **kwargs)
        return wrapper
    return decorator


# 사용 예시
class PaymentService:
    """결제 서비스 - 외부 시스템별 Bulkhead 적용"""

    def __init__(self):
        # PG사별 격리된 Bulkhead
        self.pg_a_bulkhead = SemaphoreBulkhead(
            "pg-a",
            BulkheadConfig(max_concurrent_calls=10, max_wait_time=0.5)
        )
        self.pg_b_bulkhead = SemaphoreBulkhead(
            "pg-b",
            BulkheadConfig(max_concurrent_calls=10, max_wait_time=0.5)
        )
        self.points_bulkhead = SemaphoreBulkhead(
            "points",
            BulkheadConfig(max_concurrent_calls=15, max_wait_time=0.3)
        )

    async def pay_with_pg_a(self, order_id: str, amount: int):
        """PG사 A 결제 - 별도 Bulkhead"""
        return await self.pg_a_bulkhead.execute(
            self._call_pg_a_api, order_id, amount
        )

    async def pay_with_pg_b(self, order_id: str, amount: int):
        """PG사 B 결제 - 별도 Bulkhead"""
        return await self.pg_b_bulkhead.execute(
            self._call_pg_b_api, order_id, amount
        )

    async def use_points(self, user_id: str, points: int):
        """포인트 사용 - 별도 Bulkhead"""
        try:
            return await self.points_bulkhead.execute(
                self._call_points_api, user_id, points
            )
        except BulkheadFullError:
            # Fallback: 포인트 사용 실패해도 결제는 진행
            print(f"Points bulkhead full, skipping points for {user_id}")
            return {"success": False, "reason": "bulkhead_full"}

    async def _call_pg_a_api(self, order_id: str, amount: int):
        await asyncio.sleep(0.1)  # API 호출 시뮬레이션
        return {"pg": "A", "order_id": order_id, "status": "success"}

    async def _call_pg_b_api(self, order_id: str, amount: int):
        await asyncio.sleep(0.1)
        return {"pg": "B", "order_id": order_id, "status": "success"}

    async def _call_points_api(self, user_id: str, points: int):
        await asyncio.sleep(0.05)
        return {"user_id": user_id, "points_used": points}

    def print_metrics(self):
        """Bulkhead 메트릭 출력"""
        print(f"PG-A: {self.pg_a_bulkhead.metrics}")
        print(f"PG-B: {self.pg_b_bulkhead.metrics}")
        print(f"Points: {self.points_bulkhead.metrics}")


# 테스트
async def main():
    service = PaymentService()

    # 동시에 여러 결제 요청 (PG-A만 과부하)
    tasks = []
    for i in range(20):
        tasks.append(service.pay_with_pg_a(f"order-{i}", 10000))

    # PG-A가 느려져도 PG-B는 정상 동작
    tasks.append(service.pay_with_pg_b("order-special", 50000))

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # 결과 확인
    success = sum(1 for r in results if not isinstance(r, Exception))
    failed = sum(1 for r in results if isinstance(r, BulkheadFullError))
    print(f"Success: {success}, Bulkhead Full: {failed}")

    service.print_metrics()


if __name__ == "__main__":
    asyncio.run(main())
// Bulkhead 패턴 예제 - Go
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

var ErrBulkheadFull = errors.New("bulkhead is full")

// BulkheadConfig 설정
type BulkheadConfig struct {
    MaxConcurrent int
    MaxWaitTime   time.Duration
}

// Bulkhead 세마포어 기반 격벽
type Bulkhead struct {
    name        string
    config      BulkheadConfig
    semaphore   chan struct{}
    activeCalls int64
}

// NewBulkhead 생성자
func NewBulkhead(name string, config BulkheadConfig) *Bulkhead {
    return &Bulkhead{
        name:      name,
        config:    config,
        semaphore: make(chan struct{}, config.MaxConcurrent),
    }
}

// Execute Bulkhead로 보호된 함수 실행
func (b *Bulkhead) Execute(ctx context.Context, fn func() (interface{}, error)) (interface{}, error) {
    // 타임아웃 컨텍스트
    waitCtx, cancel := context.WithTimeout(ctx, b.config.MaxWaitTime)
    defer cancel()

    // 세마포어 획득 시도
    select {
    case b.semaphore <- struct{}{}:
        // 획득 성공
        atomic.AddInt64(&b.activeCalls, 1)
        defer func() {
            atomic.AddInt64(&b.activeCalls, -1)
            <-b.semaphore
        }()
        return fn()

    case <-waitCtx.Done():
        // 타임아웃 - Bulkhead 가득 참
        return nil, fmt.Errorf("%w: %s (max: %d)",
            ErrBulkheadFull, b.name, b.config.MaxConcurrent)
    }
}

// Metrics 현재 상태
func (b *Bulkhead) Metrics() map[string]int64 {
    active := atomic.LoadInt64(&b.activeCalls)
    return map[string]int64{
        "active":    active,
        "available": int64(b.config.MaxConcurrent) - active,
        "max":       int64(b.config.MaxConcurrent),
    }
}

// PaymentService 결제 서비스
type PaymentService struct {
    pgABulkhead    *Bulkhead
    pgBBulkhead    *Bulkhead
    pointsBulkhead *Bulkhead
}

// NewPaymentService 생성자
func NewPaymentService() *PaymentService {
    return &PaymentService{
        pgABulkhead: NewBulkhead("pg-a", BulkheadConfig{
            MaxConcurrent: 10,
            MaxWaitTime:   500 * time.Millisecond,
        }),
        pgBBulkhead: NewBulkhead("pg-b", BulkheadConfig{
            MaxConcurrent: 10,
            MaxWaitTime:   500 * time.Millisecond,
        }),
        pointsBulkhead: NewBulkhead("points", BulkheadConfig{
            MaxConcurrent: 15,
            MaxWaitTime:   300 * time.Millisecond,
        }),
    }
}

// PayWithPgA PG사 A 결제
func (s *PaymentService) PayWithPgA(ctx context.Context, orderID string, amount int) (string, error) {
    result, err := s.pgABulkhead.Execute(ctx, func() (interface{}, error) {
        // PG사 A API 호출 시뮬레이션
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("PG-A: %s paid %d", orderID, amount), nil
    })
    if err != nil {
        return "", err
    }
    return result.(string), nil
}

// PayWithPgB PG사 B 결제
func (s *PaymentService) PayWithPgB(ctx context.Context, orderID string, amount int) (string, error) {
    result, err := s.pgBBulkhead.Execute(ctx, func() (interface{}, error) {
        time.Sleep(100 * time.Millisecond)
        return fmt.Sprintf("PG-B: %s paid %d", orderID, amount), nil
    })
    if err != nil {
        return "", err
    }
    return result.(string), nil
}

// UsePoints 포인트 사용
func (s *PaymentService) UsePoints(ctx context.Context, userID string, points int) (string, error) {
    result, err := s.pointsBulkhead.Execute(ctx, func() (interface{}, error) {
        time.Sleep(50 * time.Millisecond)
        return fmt.Sprintf("Points: %s used %d", userID, points), nil
    })
    if err != nil {
        // Fallback: 포인트 실패해도 계속 진행
        if errors.Is(err, ErrBulkheadFull) {
            return "points_skipped", nil
        }
        return "", err
    }
    return result.(string), nil
}

// PrintMetrics 메트릭 출력
func (s *PaymentService) PrintMetrics() {
    fmt.Printf("PG-A Bulkhead: %v\n", s.pgABulkhead.Metrics())
    fmt.Printf("PG-B Bulkhead: %v\n", s.pgBBulkhead.Metrics())
    fmt.Printf("Points Bulkhead: %v\n", s.pointsBulkhead.Metrics())
}

func main() {
    service := NewPaymentService()
    ctx := context.Background()

    var wg sync.WaitGroup
    var successCount, failCount int64

    // PG-A에 20개 동시 요청 (Max 10개이므로 일부 실패)
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go func(orderNum int) {
            defer wg.Done()
            orderID := fmt.Sprintf("order-%d", orderNum)
            _, err := service.PayWithPgA(ctx, orderID, 10000)
            if err != nil {
                atomic.AddInt64(&failCount, 1)
                fmt.Printf("Failed: %s - %v\n", orderID, err)
            } else {
                atomic.AddInt64(&successCount, 1)
            }
        }(i)
    }

    // PG-B는 별도 Bulkhead이므로 영향 없음
    wg.Add(1)
    go func() {
        defer wg.Done()
        result, _ := service.PayWithPgB(ctx, "special-order", 50000)
        fmt.Printf("PG-B Result: %s\n", result)
    }()

    wg.Wait()

    fmt.Printf("\n=== Results ===\n")
    fmt.Printf("Success: %d, Failed: %d\n", successCount, failCount)
    service.PrintMetrics()
}

🗣️ 실무에서 이렇게 말하세요

💬 장애 대응 회의에서
"어제 PG사 A 장애 때 우리 결제 전체가 멈춘 건 Bulkhead가 없었기 때문입니다. 모든 외부 호출이 같은 스레드 풀을 공유해서 PG사 A 요청이 쌓이면서 다른 결제 수단까지 영향을 받았어요. 각 PG사별로 스레드 풀을 분리하는 Bulkhead 패턴을 적용해야 합니다."
💬 아키텍처 리뷰에서
"Circuit Breaker만으로는 부족합니다. Circuit이 열리기 전까지 느린 서비스가 스레드를 계속 점유하거든요. Bulkhead로 리소스를 격리하고, Circuit Breaker로 장애를 탐지해서 빠르게 실패시키는 조합이 필요합니다. Resilience4j에서 둘 다 쉽게 적용할 수 있어요."
💬 성능 최적화 논의에서
"Bulkhead 사이즈 설정이 중요합니다. 너무 작으면 정상 트래픽도 거부되고, 너무 크면 격리 효과가 없어요. 각 서비스의 평균 응답 시간과 예상 트래픽을 기반으로 Little's Law를 적용해서 적정 풀 사이즈를 계산해야 합니다."

⚠️ 주의사항 & 베스트 프랙티스

풀 사이즈 과소 설정

Bulkhead 용량이 너무 작으면 정상 트래픽도 거부됩니다. 예상 동시 호출 수, 평균 응답 시간, 피크 트래픽을 고려하여 설정하세요.

모니터링 부재

Bulkhead 메트릭을 모니터링하지 않으면 용량 부족을 인지하지 못합니다. 활성 호출 수, 거부율, 대기 시간을 대시보드에 표시하세요.

스레드 풀 과다 생성

서비스마다 스레드 풀을 만들면 전체 스레드 수가 폭증합니다. 중요도와 SLA에 따라 그룹핑하고, 세마포어 방식도 고려하세요.

Bulkhead 베스트 프랙티스

Circuit Breaker와 함께 사용, 적절한 Fallback 구현, 메트릭 기반 동적 조정, 스레드 풀과 세마포어 방식을 상황에 맞게 선택.

🔗 관련 용어

📚 더 배우기