🏗️ 아키텍처

Event Sourcing

이벤트 소싱

Event Sourcing은 상태 변경을 이벤트 스트림으로 저장하는 아키텍처 패턴입니다. 현재 상태 대신 발생한 모든 이벤트를 순서대로 기록하고, 이를 재생(replay)하여 상태를 복원합니다. 완전한 감사 추적과 시간 여행 디버깅이 가능합니다.

📖 상세 설명

Event Sourcing은 전통적인 CRUD 방식과 달리, 데이터의 현재 상태가 아닌 상태 변경 이벤트의 시퀀스를 저장합니다. 은행 원장이 잔액만 기록하는 것이 아니라 모든 입출금 내역을 기록하는 것과 같은 원리입니다.

전통적 방식 vs Event Sourcing

전통적 CRUD Event Sourcing
현재 상태만 저장 (balance: 10000) 모든 이벤트 저장 (입금 5000, 출금 3000, ...)
과거 상태 복원 불가 특정 시점의 상태로 복원 가능 (Time Travel)
변경 이력 추적 어려움 완전한 감사 추적 (Audit Trail)
데이터 손실 가능 이벤트는 불변, 삭제 없음

핵심 개념

Event Sourcing의 장점

💻 코드 예제

Domain Event 정의

// domain/events/AccountEvents.ts
interface DomainEvent {
  readonly eventId: string;
  readonly aggregateId: string;
  readonly version: number;
  readonly occurredAt: Date;
  readonly eventType: string;
}

export class AccountOpened implements DomainEvent {
  readonly eventType = 'AccountOpened';

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly occurredAt: Date,
    public readonly ownerId: string,
    public readonly initialBalance: number
  ) {}
}

export class MoneyDeposited implements DomainEvent {
  readonly eventType = 'MoneyDeposited';

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly occurredAt: Date,
    public readonly amount: number,
    public readonly description: string
  ) {}
}

export class MoneyWithdrawn implements DomainEvent {
  readonly eventType = 'MoneyWithdrawn';

  constructor(
    public readonly eventId: string,
    public readonly aggregateId: string,
    public readonly version: number,
    public readonly occurredAt: Date,
    public readonly amount: number,
    public readonly description: string
  ) {}
}

type AccountEvent = AccountOpened | MoneyDeposited | MoneyWithdrawn;

Event Sourced Aggregate

// domain/aggregates/Account.ts
import { AccountOpened, MoneyDeposited, MoneyWithdrawn } from '../events/AccountEvents';

export class Account {
  private _id: string = '';
  private _ownerId: string = '';
  private _balance: number = 0;
  private _version: number = 0;
  private _uncommittedEvents: DomainEvent[] = [];

  private constructor() {}

  // Factory: 새 계좌 생성
  static open(ownerId: string, initialDeposit: number = 0): Account {
    const account = new Account();

    // 이벤트 발생 (상태 변경 아님)
    account.apply(new AccountOpened(
      crypto.randomUUID(),
      crypto.randomUUID(),
      1,
      new Date(),
      ownerId,
      initialDeposit
    ));

    return account;
  }

  // 이벤트 스트림에서 Aggregate 재구성
  static fromHistory(events: DomainEvent[]): Account {
    const account = new Account();
    for (const event of events) {
      account.applyFromHistory(event);
    }
    return account;
  }

  get id(): string { return this._id; }
  get balance(): number { return this._balance; }
  get version(): number { return this._version; }

  // 명령: 입금
  deposit(amount: number, description: string): void {
    if (amount <= 0) {
      throw new Error('Deposit amount must be positive');
    }

    this.apply(new MoneyDeposited(
      crypto.randomUUID(),
      this._id,
      this._version + 1,
      new Date(),
      amount,
      description
    ));
  }

  // 명령: 출금
  withdraw(amount: number, description: string): void {
    if (amount <= 0) {
      throw new Error('Withdrawal amount must be positive');
    }
    if (this._balance < amount) {
      throw new Error('Insufficient balance');
    }

    this.apply(new MoneyWithdrawn(
      crypto.randomUUID(),
      this._id,
      this._version + 1,
      new Date(),
      amount,
      description
    ));
  }

  // 이벤트 적용 (새 이벤트)
  private apply(event: DomainEvent): void {
    this.mutate(event);
    this._uncommittedEvents.push(event);
  }

  // 이벤트 적용 (히스토리 재생)
  private applyFromHistory(event: DomainEvent): void {
    this.mutate(event);
  }

  // 상태 변경 (이벤트 타입별 처리)
  private mutate(event: DomainEvent): void {
    if (event instanceof AccountOpened) {
      this._id = event.aggregateId;
      this._ownerId = event.ownerId;
      this._balance = event.initialBalance;
      this._version = event.version;
    } else if (event instanceof MoneyDeposited) {
      this._balance += event.amount;
      this._version = event.version;
    } else if (event instanceof MoneyWithdrawn) {
      this._balance -= event.amount;
      this._version = event.version;
    }
  }

  // 커밋되지 않은 이벤트 가져오기
  getUncommittedEvents(): DomainEvent[] {
    return [...this._uncommittedEvents];
  }

  clearUncommittedEvents(): void {
    this._uncommittedEvents = [];
  }
}

Event Store 인터페이스 및 구현

// infrastructure/eventstore/EventStore.ts
interface IEventStore {
  append(streamId: string, events: DomainEvent[], expectedVersion: number): Promise<void>;
  getStream(streamId: string): Promise<DomainEvent[]>;
  getStreamFromVersion(streamId: string, fromVersion: number): Promise<DomainEvent[]>;
}

// PostgreSQL 기반 Event Store 구현
export class PostgresEventStore implements IEventStore {
  constructor(private db: Pool) {}

  async append(
    streamId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    const client = await this.db.connect();

    try {
      await client.query('BEGIN');

      // 낙관적 동시성 제어
      const currentVersion = await this.getCurrentVersion(client, streamId);
      if (currentVersion !== expectedVersion) {
        throw new Error(`Concurrency conflict: expected ${expectedVersion}, got ${currentVersion}`);
      }

      // 이벤트 append (INSERT only, never UPDATE)
      for (const event of events) {
        await client.query(
          `INSERT INTO event_store
           (event_id, stream_id, version, event_type, payload, occurred_at)
           VALUES ($1, $2, $3, $4, $5, $6)`,
          [
            event.eventId,
            streamId,
            event.version,
            event.eventType,
            JSON.stringify(event),
            event.occurredAt
          ]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getStream(streamId: string): Promise<DomainEvent[]> {
    const result = await this.db.query(
      `SELECT payload FROM event_store
       WHERE stream_id = $1
       ORDER BY version ASC`,
      [streamId]
    );
    return result.rows.map(row => this.deserialize(row.payload));
  }

  private deserialize(payload: any): DomainEvent {
    // Event Type에 따라 적절한 클래스로 역직렬화
    const eventType = payload.eventType;
    switch (eventType) {
      case 'AccountOpened':
        return new AccountOpened(
          payload.eventId, payload.aggregateId, payload.version,
          new Date(payload.occurredAt), payload.ownerId, payload.initialBalance
        );
      case 'MoneyDeposited':
        return new MoneyDeposited(
          payload.eventId, payload.aggregateId, payload.version,
          new Date(payload.occurredAt), payload.amount, payload.description
        );
      // ... 다른 이벤트 타입들
      default:
        throw new Error(`Unknown event type: ${eventType}`);
    }
  }
}

Snapshot을 활용한 최적화

// infrastructure/snapshot/SnapshotStore.ts
interface Snapshot<T> {
  aggregateId: string;
  version: number;
  state: T;
  createdAt: Date;
}

export class AccountRepository {
  private readonly SNAPSHOT_INTERVAL = 100; // 100개 이벤트마다 스냅샷

  constructor(
    private eventStore: IEventStore,
    private snapshotStore: ISnapshotStore
  ) {}

  async getById(accountId: string): Promise<Account> {
    // 1. 스냅샷 조회
    const snapshot = await this.snapshotStore.get<AccountState>(accountId);

    let account: Account;
    let fromVersion = 0;

    if (snapshot) {
      // 2. 스냅샷에서 시작
      account = Account.fromSnapshot(snapshot.state);
      fromVersion = snapshot.version;
    } else {
      account = new Account();
    }

    // 3. 스냅샷 이후 이벤트만 재생
    const events = await this.eventStore.getStreamFromVersion(accountId, fromVersion);
    for (const event of events) {
      account.applyFromHistory(event);
    }

    return account;
  }

  async save(account: Account): Promise<void> {
    const events = account.getUncommittedEvents();
    if (events.length === 0) return;

    const expectedVersion = account.version - events.length;

    // 이벤트 저장
    await this.eventStore.append(account.id, events, expectedVersion);
    account.clearUncommittedEvents();

    // 스냅샷 생성 여부 확인
    if (account.version % this.SNAPSHOT_INTERVAL === 0) {
      await this.snapshotStore.save({
        aggregateId: account.id,
        version: account.version,
        state: account.toSnapshot(),
        createdAt: new Date()
      });
    }
  }
}

Projection (Read Model)

// infrastructure/projections/AccountBalanceProjection.ts
// 이벤트를 구독하여 Read Model 업데이트

export class AccountBalanceProjection {
  constructor(private db: Pool) {}

  // 이벤트 핸들러
  async handle(event: DomainEvent): Promise<void> {
    if (event instanceof AccountOpened) {
      await this.db.query(
        `INSERT INTO account_balances (account_id, owner_id, balance, updated_at)
         VALUES ($1, $2, $3, $4)`,
        [event.aggregateId, event.ownerId, event.initialBalance, event.occurredAt]
      );
    } else if (event instanceof MoneyDeposited) {
      await this.db.query(
        `UPDATE account_balances
         SET balance = balance + $1, updated_at = $2
         WHERE account_id = $3`,
        [event.amount, event.occurredAt, event.aggregateId]
      );
    } else if (event instanceof MoneyWithdrawn) {
      await this.db.query(
        `UPDATE account_balances
         SET balance = balance - $1, updated_at = $2
         WHERE account_id = $3`,
        [event.amount, event.occurredAt, event.aggregateId]
      );
    }
  }

  // Projection 리빌드 (모든 이벤트 재처리)
  async rebuild(): Promise<void> {
    await this.db.query('TRUNCATE account_balances');

    const allEvents = await this.eventStore.getAllEvents();
    for (const event of allEvents) {
      await this.handle(event);
    }
  }
}

💬 현업 대화 예시

👨‍💼 PM

"고객이 지난달 15일에 왜 출금이 실패했는지 확인해달라고 하는데요, 그 당시 잔액이 얼마였는지 알 수 있나요?"

👩‍💻 백엔드 개발자

"Event Sourcing이라 가능해요. 15일 기준으로 이벤트를 재생하면 그 시점의 정확한 잔액을 알 수 있습니다. 'MoneyWithdrawn 실패' 이벤트도 기록되어 있어서 왜 실패했는지도 추적 가능해요."

👨‍💻 주니어 개발자

"이벤트가 계속 쌓이면 재생 시간이 너무 오래 걸리지 않나요? 계좌 조회할 때마다 수천 개 이벤트를 다 읽어야 하면..."

👩‍💻 시니어 개발자

"Snapshot을 사용해요. 100개 이벤트마다 현재 상태를 저장해두고, 그 이후 이벤트만 재생하면 됩니다. 그리고 읽기용 Read Model은 Projection으로 별도 관리하니까 조회 성능은 문제없어요."

👨‍💼 아키텍트

"새로운 대시보드에 '일별 거래량' 통계가 필요하다고요? Event Sourcing 덕분에 쉬워요. 기존 이벤트를 새로운 Projection으로 리플레이하면 과거 데이터도 모두 채워넣을 수 있습니다."

⚠️ 주의사항

이벤트 스키마 변경: 이미 저장된 이벤트의 구조를 변경하기 어렵습니다. 새 필드 추가는 쉽지만, 기존 필드 변경은 마이그레이션이 복잡합니다. 이벤트 버전 관리가 필수입니다.

저장 공간: 모든 이벤트를 영구 저장하므로 데이터가 계속 증가합니다. 스토리지 비용과 아카이빙 전략을 고려해야 합니다.

복잡성: 단순 CRUD 시스템에 Event Sourcing은 과도한 복잡성입니다. 감사 추적, 시간 여행이 필수인 도메인(금융, 의료)에 적합합니다.

팁: Event Sourcing은 CQRS와 함께 사용하면 효과적입니다. 쓰기는 이벤트 스트림, 읽기는 최적화된 Projection으로 분리하세요.

🔗 관련 용어

📚 더 배우기