📊
데이터공학
Fivetran
Fivetran
자동화된 데이터 통합 플랫폼. 커넥터 기반 ELT.
Fivetran
자동화된 데이터 통합 플랫폼. 커넥터 기반 ELT.
Fivetran은 자동화된 데이터 통합 플랫폼으로, 다양한 데이터 소스에서 데이터 웨어하우스/레이크로 데이터를 자동으로 추출하고 로드하는 ELT(Extract, Load, Transform) 서비스입니다.
| 카테고리 | 커넥터 | 기능 |
|---|---|---|
| 데이터베이스 | PostgreSQL, MySQL, MongoDB | CDC 기반 실시간 동기화 |
| SaaS | Salesforce, HubSpot, Stripe | API 기반 자동 동기화 |
| 클라우드 스토리지 | S3, GCS, Azure Blob | 파일 기반 데이터 수집 |
| 이벤트 | Kafka, Kinesis, Webhooks | 실시간 스트리밍 수집 |
# Fivetran API를 사용한 커넥터 설정 및 관리
import requests
import json
class FivetranConnectorManager:
"""Fivetran 커넥터 관리 클래스"""
def __init__(self, api_key, api_secret):
self.base_url = "https://api.fivetran.com/v1"
self.auth = (api_key, api_secret)
def create_connector(self, group_id, service, config):
"""새 커넥터 생성"""
payload = {
"service": service,
"group_id": group_id,
"config": config,
"paused": False,
"trust_certificates": True,
"run_setup_tests": True
}
response = requests.post(
f"{self.base_url}/connectors",
auth=self.auth,
json=payload
)
return response.json()
def get_connector_status(self, connector_id):
"""커넥터 상태 조회"""
response = requests.get(
f"{self.base_url}/connectors/{connector_id}",
auth=self.auth
)
return response.json()
def sync_connector(self, connector_id):
"""수동 동기화 트리거"""
response = requests.post(
f"{self.base_url}/connectors/{connector_id}/sync",
auth=self.auth
)
return response.json()
def update_connector_config(self, connector_id, new_config):
"""커넥터 설정 업데이트"""
response = requests.patch(
f"{self.base_url}/connectors/{connector_id}",
auth=self.auth,
json={"config": new_config}
)
return response.json()
# 사용 예제
fivetran = FivetranConnectorManager(
api_key="your_api_key",
api_secret="your_api_secret"
)
# PostgreSQL 커넥터 설정
postgres_config = {
"host": "db.example.com",
"port": 5432,
"user": "fivetran_user",
"password": "secure_password",
"database": "production",
"update_method": "WAL", # CDC 사용
"replication_slot": "fivetran_slot",
"publication_name": "fivetran_pub"
}
# 커넥터 생성
result = fivetran.create_connector(
group_id="target_destination_123",
service="postgres",
config=postgres_config
)
print(f"Connector ID: {result['data']['id']}")
# Salesforce 커넥터 설정
salesforce_config = {
"client_id": "your_salesforce_client_id",
"client_secret": "your_salesforce_client_secret",
"is_sandbox": False,
"sync_mode": "History",
"api_type": "REST"
}
# 동기화 상태 확인
status = fivetran.get_connector_status("connector_123")
print(f"Status: {status['data']['status']['sync_state']}")
# Terraform으로 Fivetran 커넥터 관리
terraform {
required_providers {
fivetran = {
source = "fivetran/fivetran"
version = "~> 1.0"
}
}
}
provider "fivetran" {
api_key = var.fivetran_api_key
api_secret = var.fivetran_api_secret
}
# 목적지 그룹 생성
resource "fivetran_group" "data_warehouse" {
name = "production-snowflake"
}
# PostgreSQL 커넥터 설정
resource "fivetran_connector" "postgres_source" {
group_id = fivetran_group.data_warehouse.id
service = "postgres"
destination_schema {
name = "postgres_prod"
prefix = "pg_"
}
config {
host = "db.example.com"
port = 5432
user = var.postgres_user
password = var.postgres_password
database = "production"
update_method = "WAL"
replication_slot = "fivetran_slot"
publication_name = "fivetran_pub"
}
run_setup_tests = true
paused = false
sync_frequency = 360 # 6시간마다
}
# Salesforce 커넥터 설정
resource "fivetran_connector" "salesforce_source" {
group_id = fivetran_group.data_warehouse.id
service = "salesforce"
destination_schema {
name = "salesforce_prod"
}
config {
is_sandbox = false
api_type = "REST"
}
sync_frequency = 60 # 1시간마다
}
# dbt 변환 연동 (변환 계층)
resource "fivetran_dbt_transformation" "customer_360" {
dbt_project_id = var.dbt_project_id
run_tests = true
schedule {
schedule_type = "INTEGRATED"
connector_ids = [
fivetran_connector.postgres_source.id,
fivetran_connector.salesforce_source.id
]
}
}