📊 데이터공학

Fivetran

Fivetran

자동화된 데이터 통합 플랫폼. 커넥터 기반 ELT.

상세 설명

Fivetran은 자동화된 데이터 통합 플랫폼으로, 다양한 데이터 소스에서 데이터 웨어하우스/레이크로 데이터를 자동으로 추출하고 로드하는 ELT(Extract, Load, Transform) 서비스입니다.

Fivetran의 핵심 특징

  • 완전 관리형 커넥터: 500개 이상의 사전 구축된 커넥터 제공
  • 자동 스키마 감지: 소스 스키마 변경 자동 감지 및 반영
  • 증분 동기화: 변경된 데이터만 효율적으로 동기화
  • 데이터 정규화: 자동 데이터 정규화 및 분석 준비
  • 높은 신뢰성: 99.9% SLA 보장

지원하는 주요 커넥터

카테고리커넥터기능
데이터베이스PostgreSQL, MySQL, MongoDBCDC 기반 실시간 동기화
SaaSSalesforce, HubSpot, StripeAPI 기반 자동 동기화
클라우드 스토리지S3, GCS, Azure Blob파일 기반 데이터 수집
이벤트Kafka, Kinesis, Webhooks실시간 스트리밍 수집

코드 예제

Fivetran API를 통한 커넥터 설정

# Fivetran API를 사용한 커넥터 설정 및 관리
import requests
import json

class FivetranConnectorManager:
    """Fivetran 커넥터 관리 클래스"""

    def __init__(self, api_key, api_secret):
        self.base_url = "https://api.fivetran.com/v1"
        self.auth = (api_key, api_secret)

    def create_connector(self, group_id, service, config):
        """새 커넥터 생성"""
        payload = {
            "service": service,
            "group_id": group_id,
            "config": config,
            "paused": False,
            "trust_certificates": True,
            "run_setup_tests": True
        }

        response = requests.post(
            f"{self.base_url}/connectors",
            auth=self.auth,
            json=payload
        )
        return response.json()

    def get_connector_status(self, connector_id):
        """커넥터 상태 조회"""
        response = requests.get(
            f"{self.base_url}/connectors/{connector_id}",
            auth=self.auth
        )
        return response.json()

    def sync_connector(self, connector_id):
        """수동 동기화 트리거"""
        response = requests.post(
            f"{self.base_url}/connectors/{connector_id}/sync",
            auth=self.auth
        )
        return response.json()

    def update_connector_config(self, connector_id, new_config):
        """커넥터 설정 업데이트"""
        response = requests.patch(
            f"{self.base_url}/connectors/{connector_id}",
            auth=self.auth,
            json={"config": new_config}
        )
        return response.json()


# 사용 예제
fivetran = FivetranConnectorManager(
    api_key="your_api_key",
    api_secret="your_api_secret"
)

# PostgreSQL 커넥터 설정
postgres_config = {
    "host": "db.example.com",
    "port": 5432,
    "user": "fivetran_user",
    "password": "secure_password",
    "database": "production",
    "update_method": "WAL",  # CDC 사용
    "replication_slot": "fivetran_slot",
    "publication_name": "fivetran_pub"
}

# 커넥터 생성
result = fivetran.create_connector(
    group_id="target_destination_123",
    service="postgres",
    config=postgres_config
)
print(f"Connector ID: {result['data']['id']}")

# Salesforce 커넥터 설정
salesforce_config = {
    "client_id": "your_salesforce_client_id",
    "client_secret": "your_salesforce_client_secret",
    "is_sandbox": False,
    "sync_mode": "History",
    "api_type": "REST"
}

# 동기화 상태 확인
status = fivetran.get_connector_status("connector_123")
print(f"Status: {status['data']['status']['sync_state']}")

Terraform을 사용한 커넥터 IaC 설정

# Terraform으로 Fivetran 커넥터 관리
terraform {
  required_providers {
    fivetran = {
      source  = "fivetran/fivetran"
      version = "~> 1.0"
    }
  }
}

provider "fivetran" {
  api_key    = var.fivetran_api_key
  api_secret = var.fivetran_api_secret
}

# 목적지 그룹 생성
resource "fivetran_group" "data_warehouse" {
  name = "production-snowflake"
}

# PostgreSQL 커넥터 설정
resource "fivetran_connector" "postgres_source" {
  group_id = fivetran_group.data_warehouse.id
  service  = "postgres"

  destination_schema {
    name   = "postgres_prod"
    prefix = "pg_"
  }

  config {
    host     = "db.example.com"
    port     = 5432
    user     = var.postgres_user
    password = var.postgres_password
    database = "production"

    update_method      = "WAL"
    replication_slot   = "fivetran_slot"
    publication_name   = "fivetran_pub"
  }

  run_setup_tests = true
  paused          = false
  sync_frequency  = 360  # 6시간마다
}

# Salesforce 커넥터 설정
resource "fivetran_connector" "salesforce_source" {
  group_id = fivetran_group.data_warehouse.id
  service  = "salesforce"

  destination_schema {
    name = "salesforce_prod"
  }

  config {
    is_sandbox = false
    api_type   = "REST"
  }

  sync_frequency = 60  # 1시간마다
}

# dbt 변환 연동 (변환 계층)
resource "fivetran_dbt_transformation" "customer_360" {
  dbt_project_id = var.dbt_project_id
  run_tests      = true

  schedule {
    schedule_type = "INTEGRATED"
    connector_ids = [
      fivetran_connector.postgres_source.id,
      fivetran_connector.salesforce_source.id
    ]
  }
}

실무 대화 예시

데이터 엔지니어: "현재 수동으로 Salesforce 데이터 추출하는데 매일 2시간씩 걸려요. Fivetran 도입하면 어떨까요?"
데이터 팀장: "좋은 제안이야. Fivetran은 Salesforce 커넥터가 잘 되어 있어서 설정만 하면 자동 동기화돼. 스키마 변경도 자동 감지하고."
데이터 엔지니어: "CDC 방식으로 PostgreSQL 실시간 동기화도 가능한가요?"
데이터 팀장: "WAL 기반 CDC 지원해. Logical Replication 설정하면 5분 이내 지연으로 동기화 가능해. 하지만 대용량 초기 스냅샷은 시간이 좀 걸릴 수 있어."
데이터 엔지니어: "비용은 어떻게 되나요? MAR(Monthly Active Rows) 기준이라던데..."
데이터 팀장: "맞아, MAR 기준으로 과금돼. 그래서 자주 변경되는 로그성 데이터는 비용이 많이 나올 수 있어. 이런 건 Kafka나 직접 구축이 나을 수도 있어."

주의사항

비용 관리

  • MAR(Monthly Active Rows) 기준 과금 - 자주 변경되는 데이터는 비용 급증 가능
  • 초기 히스토리컬 동기화 시 대량 MAR 소비
  • 불필요한 테이블/컬럼은 동기화에서 제외하여 비용 절감

스키마 변경 관리

  • 자동 스키마 감지가 항상 완벽하지 않음 - 주기적 검증 필요
  • 컬럼 삭제 시 목적지에서 자동 삭제되지 않음 (soft delete)
  • 데이터 타입 변경 시 호환성 문제 발생 가능

성능 고려사항

  • 대용량 테이블 초기 동기화는 소스 DB에 부하 줄 수 있음
  • API 기반 커넥터는 Rate Limit에 영향받음
  • 동기화 주기가 너무 짧으면 비용과 부하 증가

관련 용어

더 배우기