📖 상세 설명
NATS는 Derek Collison이 2010년 Ruby로 처음 개발하고, 이후 Go로 재작성한 오픈소스 메시징 시스템입니다. "Always on and available"이라는 철학 아래, 단순성과 성능을 최우선으로 설계되었습니다. 현재 CNCF 인큐베이팅 프로젝트로, Kubernetes 환경에서 서비스 간 통신의 표준으로 자리잡아가고 있습니다.
NATS의 핵심 철학은 "Simplicity is a Feature"입니다. 복잡한 설정 없이 단일 바이너리를 실행하면 즉시 메시징 서버가 구동됩니다. At-most-once 전달을 기본으로 하는 Core NATS는 초저지연(sub-millisecond)을 제공하고, JetStream을 활성화하면 At-least-once/Exactly-once 전달과 메시지 영속성을 지원합니다.
내부적으로 NATS는 텍스트 기반 프로토콜을 사용하여 디버깅이 용이하고, Subject 기반 주소 지정으로 유연한 메시지 라우팅이 가능합니다. 클러스터링, 슈퍼클러스터(멀티 리전), Leaf Node를 통해 글로벌 규모의 메시징 인프라를 구축할 수 있습니다.
실무에서 NATS는 IoT 디바이스 통신, 마이크로서비스 이벤트 버스, 실시간 알림 시스템, 분산 시스템 조율 등에 널리 사용됩니다. Kafka 대비 훨씬 가벼우면서도 대부분의 메시징 요구사항을 충족하여, "the connective tissue for cloud native applications"로 불립니다.
NATS의 핵심 메시징 패턴
Pub/Sub (발행/구독)
Subject에 메시지를 발행하면 해당 Subject를 구독한 모든 클라이언트가 수신. 1:N 브로드캐스트.
Queue Groups
같은 큐 그룹에 속한 구독자 중 하나에게만 메시지 전달. 로드 밸런싱된 워커 패턴.
Request/Reply
동기적 요청-응답 패턴. 클라이언트가 요청하고 서버가 응답. 타임아웃 내장.
JetStream
영속성 레이어. 메시지 저장, 재생, Consumer 관리. Stream과 Consumer로 구성.
💻 코드 예제
Go - NATS 기본 Pub/Sub
package main
import (
"fmt"
"github.com/nats-io/nats.go"
"time"
)
func main() {
// NATS 서버 연결
nc, err := nats.Connect(nats.DefaultURL) // nats://localhost:4222
if err != nil {
panic(err)
}
defer nc.Close()
// 구독자 설정 (비동기)
sub, _ := nc.Subscribe("orders.created", func(msg *nats.Msg) {
fmt.Printf("📬 Received: %s\n", string(msg.Data))
})
defer sub.Unsubscribe()
// 메시지 발행
nc.Publish("orders.created", []byte(`{"orderId": "12345", "amount": 99000}`))
// 동기 구독 (Request/Reply 패턴)
nc.Subscribe("api.users.get", func(msg *nats.Msg) {
nc.Publish(msg.Reply, []byte(`{"userId": "1", "name": "Kim"}`))
})
// Request/Reply 요청
response, _ := nc.Request("api.users.get", []byte(`{"id": "1"}`), 2*time.Second)
fmt.Printf("📨 Response: %s\n", string(response.Data))
time.Sleep(100 * time.Millisecond)
}
Go - JetStream (영속성 메시징)
package main
import (
"context"
"fmt"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
)
func main() {
nc, _ := nats.Connect(nats.DefaultURL)
defer nc.Close()
js, _ := jetstream.New(nc)
ctx := context.Background()
// Stream 생성 (메시지 저장소)
stream, _ := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: "ORDERS",
Subjects: []string{"orders.>"}, // orders.* 패턴 매칭
Storage: jetstream.FileStorage, // 파일 기반 영속성
Retention: jetstream.LimitsPolicy,
MaxMsgs: 10000,
MaxAge: 24 * time.Hour,
})
fmt.Printf("✅ Stream created: %s\n", stream.CachedInfo().Config.Name)
// 메시지 발행 (영속적)
js.Publish(ctx, "orders.created", []byte(`{"orderId": "ORD-001"}`))
js.Publish(ctx, "orders.paid", []byte(`{"orderId": "ORD-001"}`))
// Consumer 생성 (메시지 소비자)
consumer, _ := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
Durable: "order-processor",
FilterSubject: "orders.created",
AckPolicy: jetstream.AckExplicitPolicy,
})
// 메시지 Fetch (Pull 기반)
batch, _ := consumer.Fetch(10, jetstream.FetchMaxWait(5*time.Second))
for msg := range batch.Messages() {
fmt.Printf("📦 Processing: %s\n", string(msg.Data()))
msg.Ack() // 명시적 ACK
}
}
Python - NATS 비동기 클라이언트
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
# NATS 서버 연결
nc = await nats.connect("nats://localhost:4222")
# 구독 핸들러
async def message_handler(msg):
subject = msg.subject
data = msg.data.decode()
print(f"📬 [{subject}] {data}")
# Request/Reply인 경우 응답
if msg.reply:
await nc.publish(msg.reply, b'{"status": "processed"}')
# 와일드카드 구독
# '*' = 단일 토큰, '>' = 다중 토큰
await nc.subscribe("events.>", cb=message_handler)
# Queue Group 구독 (로드 밸런싱)
await nc.subscribe(
"tasks.process",
queue="workers", # 같은 그룹 내 하나의 워커만 수신
cb=message_handler
)
# 메시지 발행
await nc.publish("events.user.signup", b'{"userId": "123"}')
await nc.publish("events.order.created", b'{"orderId": "456"}')
# Request/Reply (타임아웃 2초)
try:
response = await nc.request(
"api.health",
b'ping',
timeout=2.0
)
print(f"📨 Health: {response.data.decode()}")
except TimeoutError:
print("❌ Request timed out")
await asyncio.sleep(1)
await nc.drain()
asyncio.run(main())
Docker Compose - NATS 클러스터
# docker-compose.yml
version: '3.8'
services:
nats-1:
image: nats:2.10-alpine
container_name: nats-1
ports:
- "4222:4222" # 클라이언트 포트
- "8222:8222" # 모니터링 포트
command: >
-js # JetStream 활성화
-sd /data # 데이터 디렉토리
--cluster_name NATS_CLUSTER
--cluster nats://0.0.0.0:6222
--routes nats://nats-2:6222,nats://nats-3:6222
volumes:
- nats-1-data:/data
nats-2:
image: nats:2.10-alpine
container_name: nats-2
command: >
-js -sd /data
--cluster_name NATS_CLUSTER
--cluster nats://0.0.0.0:6222
--routes nats://nats-1:6222,nats://nats-3:6222
volumes:
- nats-2-data:/data
nats-3:
image: nats:2.10-alpine
container_name: nats-3
command: >
-js -sd /data
--cluster_name NATS_CLUSTER
--cluster nats://0.0.0.0:6222
--routes nats://nats-1:6222,nats://nats-2:6222
volumes:
- nats-3-data:/data
volumes:
nats-1-data:
nats-2-data:
nats-3-data: