Leader Election
리더 선출 / Leader Selection
분산 시스템에서 하나의 노드를 리더로 선출하는 알고리즘. 리더는 작업 조율, 요청 처리 순서 결정, 데이터 복제 관리 등의 역할을 담당합니다. Raft 합의, Bully Algorithm, ZooKeeper/etcd를 통한 분산 락 등으로 구현하며, 분산 데이터베이스, 메시지 큐, 클러스터 조율의 핵심입니다.
리더 선출 / Leader Selection
분산 시스템에서 하나의 노드를 리더로 선출하는 알고리즘. 리더는 작업 조율, 요청 처리 순서 결정, 데이터 복제 관리 등의 역할을 담당합니다. Raft 합의, Bully Algorithm, ZooKeeper/etcd를 통한 분산 락 등으로 구현하며, 분산 데이터베이스, 메시지 큐, 클러스터 조율의 핵심입니다.
Leader Election(리더 선출)은 분산 시스템에서 가장 근본적인 문제 중 하나입니다. 여러 노드가 동시에 같은 작업을 수행하면 충돌이 발생하므로, 하나의 노드를 리더로 선출하여 작업을 조율하게 합니다. 리더는 클라이언트 요청을 받아 처리하고, 결과를 다른 노드에 복제합니다.
리더 선출이 필요한 이유는 크게 세 가지입니다. 첫째, 쓰기 순서 보장입니다. 동시 쓰기 요청이 들어오면 리더가 순서를 결정하여 일관성을 유지합니다. 둘째, 복제 조율입니다. 리더가 데이터 변경을 팔로워에게 전파하여 모든 노드의 상태를 동기화합니다. 셋째, 장애 복구입니다. 리더가 실패하면 새 리더를 선출하여 서비스 연속성을 보장합니다.
대표적인 리더 선출 알고리즘으로는 Raft가 있습니다. Raft는 Heartbeat 타임아웃과 과반수 투표를 통해 리더를 선출하며, etcd, Consul, TiKV 등에서 사용됩니다. 또 다른 방법은 ZooKeeper/etcd의 Ephemeral 노드와 분산 락을 활용하는 것입니다. 가장 먼저 락을 획득한 노드가 리더가 되고, 락을 잃으면 다른 노드가 리더가 됩니다.
실무에서 리더 선출은 Kafka의 파티션 리더, Kubernetes 컨트롤러 매니저, 분산 데이터베이스의 Primary 노드, 스케줄러의 마스터 등에서 사용됩니다. 리더 선출 알고리즘의 선택은 장애 복구 시간, 네트워크 토폴로지, 일관성 요구사항에 따라 결정됩니다.
쓰기 요청을 처리하고 Replica에 복제. MySQL Group Replication, PostgreSQL Patroni.
Kafka 파티션당 하나의 Leader 브로커가 Producer/Consumer 요청 처리.
하나의 마스터만 작업 할당. 중복 실행 방지. Kubernetes controller-manager.
글로벌 리소스 접근 조율. 한 번에 하나의 클라이언트만 임계 영역 진입.
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
func main() {
nodeID := os.Getenv("NODE_ID")
if nodeID == "" {
nodeID = "node-1"
}
// etcd 클라이언트 연결
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 세션 생성 (TTL 10초 - 리더 장애 감지 시간)
session, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {
log.Fatal(err)
}
defer session.Close()
// Election 객체 생성
election := concurrency.NewElection(session, "/leader/scheduler")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// 시그널 핸들링
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigCh
fmt.Printf("[%s] 종료 신호 수신, 리더 사임\n", nodeID)
election.Resign(ctx)
cancel()
}()
// 리더 선출 시도 (블로킹)
fmt.Printf("[%s] 리더 선출 대기 중...\n", nodeID)
if err := election.Campaign(ctx, nodeID); err != nil {
log.Fatal(err)
}
// 리더로 선출됨!
fmt.Printf("👑 [%s] 리더로 선출됨!\n", nodeID)
// 리더 작업 수행
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
// 현재 리더 확인
resp, _ := election.Leader(ctx)
if resp != nil && len(resp.Kvs) > 0 {
fmt.Printf("[%s] 현재 리더: %s (Lease: %d)\n",
nodeID, string(resp.Kvs[0].Value), resp.Kvs[0].Lease)
}
// 리더 작업 (예: 스케줄링)
fmt.Printf("[%s] 🔧 리더 작업 수행 중...\n", nodeID)
}
}
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.retry.ExponentialBackoffRetry;
public class LeaderElectionExample extends LeaderSelectorListenerAdapter {
private final String nodeId;
private final LeaderSelector leaderSelector;
public LeaderElectionExample(CuratorFramework client, String path, String nodeId) {
this.nodeId = nodeId;
this.leaderSelector = new LeaderSelector(client, path, this);
// 리더 종료 후 자동 재선거 참여
this.leaderSelector.autoRequeue();
}
public void start() {
leaderSelector.start();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
// 리더로 선출됨 - 이 메서드가 반환되면 리더십 상실
System.out.println("👑 [" + nodeId + "] 리더로 선출됨!");
try {
while (true) {
// 리더 작업 수행
System.out.println("[" + nodeId + "] 🔧 리더 작업 수행 중...");
Thread.sleep(1000);
}
} catch (InterruptedException e) {
System.out.println("[" + nodeId + "] 리더십 종료");
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) throws Exception {
String nodeId = System.getenv().getOrDefault("NODE_ID", "node-1");
CuratorFramework client = CuratorFrameworkFactory.newClient(
"localhost:2181",
new ExponentialBackoffRetry(1000, 3)
);
client.start();
LeaderElectionExample example = new LeaderElectionExample(
client, "/leader/scheduler", nodeId
);
example.start();
System.out.println("[" + nodeId + "] 리더 선출 대기 중...");
Thread.currentThread().join();
}
}
package main
import (
"context"
"fmt"
"os"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
)
func main() {
nodeID := os.Getenv("POD_NAME")
namespace := os.Getenv("POD_NAMESPACE")
// 클러스터 내부 설정
config, _ := rest.InClusterConfig()
clientset, _ := kubernetes.NewForConfig(config)
// Lease 기반 리소스 락
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: "my-controller-leader",
Namespace: namespace,
},
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: nodeID,
},
}
ctx := context.Background()
// Leader Election 실행
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
// 리더가 되었을 때 실행
fmt.Printf("👑 [%s] 리더로 선출됨!\n", nodeID)
runController(ctx)
},
OnStoppedLeading: func() {
// 리더십을 잃었을 때
fmt.Printf("😔 [%s] 리더십 상실\n", nodeID)
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity != nodeID {
fmt.Printf("📢 새 리더 선출: %s\n", identity)
}
},
},
})
}
func runController(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(1 * time.Second):
fmt.Println("🔧 컨트롤러 작업 수행 중...")
}
}
}
import redis
import time
import uuid
import signal
import sys
class RedisLeaderElection:
def __init__(self, redis_client, lock_name, ttl=10):
self.redis = redis_client
self.lock_name = lock_name
self.ttl = ttl
self.node_id = str(uuid.uuid4())[:8]
self.is_leader = False
def try_become_leader(self):
"""리더 되기 시도 (SET NX EX)"""
acquired = self.redis.set(
self.lock_name,
self.node_id,
nx=True, # 키가 없을 때만 설정
ex=self.ttl # TTL 설정
)
self.is_leader = bool(acquired)
return self.is_leader
def renew_leadership(self):
"""리더십 갱신 (TTL 연장)"""
if not self.is_leader:
return False
# Lua 스크립트로 원자적 갱신 (소유권 확인 + TTL 연장)
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('expire', KEYS[1], ARGV[2])
else
return 0
end
"""
result = self.redis.eval(script, 1, self.lock_name, self.node_id, self.ttl)
self.is_leader = bool(result)
return self.is_leader
def resign(self):
"""리더십 사임"""
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.redis.eval(script, 1, self.lock_name, self.node_id)
self.is_leader = False
def get_current_leader(self):
"""현재 리더 확인"""
return self.redis.get(self.lock_name)
def main():
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
election = RedisLeaderElection(redis_client, "leader:scheduler")
def signal_handler(sig, frame):
print(f"[{election.node_id}] 종료 신호 수신")
election.resign()
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print(f"[{election.node_id}] 리더 선출 시작")
while True:
if election.is_leader:
# 리더십 갱신
if election.renew_leadership():
print(f"👑 [{election.node_id}] 리더 작업 수행 중...")
else:
print(f"😔 [{election.node_id}] 리더십 상실")
else:
# 리더 되기 시도
if election.try_become_leader():
print(f"👑 [{election.node_id}] 리더로 선출됨!")
else:
leader = election.get_current_leader()
print(f"[{election.node_id}] 대기 중... (현재 리더: {leader})")
time.sleep(1)
if __name__ == "__main__":
main()
| 방법 | 장애 감지 | 선출 시간 | 의존성 | 사용 사례 |
|---|---|---|---|---|
| Raft 합의 | Heartbeat 타임아웃 | ~1-2초 | 없음 (자체 구현) | etcd, Consul, TiKV |
| ZooKeeper Ephemeral | 세션 타임아웃 | ~5-10초 | ZooKeeper 클러스터 | Kafka, HBase |
| etcd Lease | Lease TTL | ~TTL | etcd 클러스터 | Kubernetes 컨트롤러 |
| Redis SET NX | Key TTL | ~TTL | Redis | 간단한 분산 락 |
| Bully Algorithm | 메시지 타임아웃 | O(n) 메시지 | 없음 | 교육용, 소규모 |
✅ 선택 가이드: 강한 일관성이 필요하면 Raft 기반(etcd, Consul), 기존 인프라 활용이면 ZooKeeper/Redis, Kubernetes 환경이면 Lease API를 사용하세요.
"스케줄러를 HA로 만들려면 리더 선출이 필요해요. etcd의 Lease를 사용해서 리더를 선출하고, 리더만 작업을 스케줄링하면 중복 실행을 방지할 수 있어요. TTL은 10초로 설정하면 장애 감지도 적당합니다."
"리더 선출 flapping이 발생한 원인은 네트워크 지터예요. Lease TTL이 5초인데 간헐적으로 갱신이 늦어져서 리더십을 잃고 다시 선출되는 것을 반복했어요. TTL을 15초로 늘리고 갱신 주기를 5초로 하면 안정될 거예요."
"Split Brain 방지요? 핵심은 과반수(Quorum)입니다. 네트워크 파티션 시 과반수를 확보한 파티션만 리더를 선출할 수 있어요. 두 파티션 모두 과반수 미만이면 리더가 없는 상태가 되지만, 두 리더가 동시에 존재하는 것보다 안전합니다."
❌ Split Brain: 네트워크 파티션 시 두 개의 리더가 존재하면 데이터 불일치가 발생합니다. 반드시 과반수 기반 선출을 사용하고, 펜싱(Fencing)을 구현하세요.
❌ 리더 Flapping: TTL이 너무 짧거나 네트워크가 불안정하면 리더가 자주 바뀝니다. TTL은 네트워크 RTT의 10배 이상, 갱신 주기는 TTL의 1/3 이하로 설정하세요.
❌ 단일 장애점: 리더 선출을 위한 외부 서비스(etcd, ZooKeeper)가 장애나면 전체 시스템이 영향받습니다. 해당 서비스도 HA로 구성하세요.
✅ 올바른 방법: 리더 전환 시 진행 중인 작업을 완료하거나 롤백하는 Graceful Handoff를 구현하세요. 리더십 상실 시 즉시 작업을 중단하고 새 리더에게 인계합니다.