들어가기 전
- 이 글은 이 블로그 글을 참고로 학습한 글입니다.
- 원글은 SWIM 프로토콜을 F#으로 구현한 코드를 제공합니다.
- 이 글에서는 erlang으로 SWIM 프로토콜을 구현하였으며, 원글에서는 구현되지 않았던 suspect - alive 반박, SWIM 프로토콜 메세지 사이의 동시성 문제를 해결하기 위해 Incarnation을 고려한 코드를 추가했습니다.
- 제가 구현한 코드는 이곳에 있습니다.
1. 클러스터란 무엇인가?
사용자 관점에서 클러스터는 '단일 머신'처럼 보이도록 만들어준다. 여러 서버가 상호 연결된 클러스터는 새로운 서버가 추가되거나 제거되기도 하는데, 클러스터는 이런 개념들을 '단일 머신'으로 추상화해준다. 이 덕분에 클라이언트는 클러스터에서 발생할 수 있는 여러 복잡한 시나리오들을 고려하지 않을 수 있다.
그러나 클러스터가 이런 추상화를 제공해주기 위해서는 클러스터가 구현해야하는 프로토콜과 책임이 존재한다. 아래에서는 클러스터가 일반적으로 구현해야하는 것들을 의미한다.
2. 클러스터 0계층
아래 항목들은 위 추상화를 보장하기 위해 클러스터가 고려하고 구현해야 할 가장 기본적인 부분이다.
2.1 어떻게 클러스터에 조인할까?
클러스터에 조인하려는 새로운 서버 노드가 있다면, 이미 클러스터의 멤버 노드들과 통신하는 방법을 알아야 한다. 그래야 클러스터에 'Join' 요청을 보낼 수 있다. 어디서 그런 정보를 찾을 수 있을까?
- 서버 노드를 기동할 때, 컨택 포인트 리스트를 Configuration Parameter로 제공. (yaml 등..)
- Etch, Zookeeper 같은 써드파티로 Node Registry를 구축. 서버 노드가 기동할 때, Node Registry에 질의.
- 호스트 환경에서는 k8s DNS, mDNS 서비스등을 이용할 수 있음.
2.2 클러스터에 속한 다른 노드를 어떻게 알 수 있을까?
이 문제를 다루는 주요한 주제는 멤버쉽 프로토콜이다. 동적 클러스터에서는 Active discovered 노드를 추적하고, 노드가 가입/탈퇴하면 이를 업데이트하고 가십(전파)하는 방식으로 수행된다. 여기서 많은 결정은 배포 시나리오에 따라 달라진다. 예를 들어 동일 IDC 내에서 클러스터를 구성하는 서비스는 모바일 디바이스 메시와는 다른 특성을 가져야만 한다.
- 동일 IDC 센터에서 동작하는 일반적인 백엔드 서비스 클러스터
- SWIM 프로토콜 적합. 모든 노드가 클러스터 전체 정보를 알고 있고, 직접 서로의 상태를 체크.
- 규모가 큰 클러스터 (수천개 이상이거나, 여러 IDC 센터에서 동작)
- HaParView 프로토콜 적합. 노드는 클러스터의 일부 상태만 알고 있으므로 확장에 유리.
2.3 한 노드에서 다른 노드로 메세지를 보내려면?
일반적으로 이 부분 역시 멤버십 프로토콜의 책임이다.
- IDC 기반 시스템 시나리오 : 모든 노드가 다른 모든 노드에 연결될 수 있다는 보수적인 가정을 할 수 있음.
- 다른 시나리오 : 네트워크 특성상, 일부 노드가 다른 노드에 연결되지 못할 수도 있음.
일반적으로 IDC 기반 시스템 시나리오에서 동일한 Private Subnet에서는 서로 접근할 수 있다고 보는 편이다. 보안적인 문제도 없을 것이고, 같은 IDC이니 Latency 문제 또한 없을 것이기 때문이다. 반면 다른 시나리오에서는 보안(Firewall)이나 NAT 등의 문제로 노드끼리 연결되지 못할 수도 있을 것이다.
모든 노드끼리 연결된 단순한 경우에는 노드 간 메세지 전송을 위해 TCP 프로토콜만 사용하면 되지만, 그렇지 않은 경우에는 노드 간 메세지 전송을 단순히 할 수가 없다. 따라서 멤버쉽 프로토콜은 보다 복잡한 방식으로 메세지 전송을 할 수 밖에 없다.
2.4 죽은 노드를 어떻게 감지할 수 있을까?
이것은 Failure Detection(실패 감지)로 알려져 있다. 가장 많이 알려진 방법은 timeout 이내에 ping ↔ ack를 주고 받거나, 모든 Connection이 주어진 시간 간격 이내에 Heartbeat 메세지를 보내는 것이다.
- Heartbeat은 주로 TCP(L4)에 직접 구현되기도 하고, 때로는 TCP 위(L7)에 Heartbeta을 전송할 수도 있다. 한 가지 문제점은 TCP는 OS에 관리하기 때문에 응답성이 뛰어나지만, Application Layer는 그렇지 않을 수 있다. Application 계층에서는 Deadlock이 발생한다거나 할 수 있기 때문이다.
- 멤버쉽 프로토콜은 자체 Heartbeat 알고리즘을 사용하기도 한다.
- 누락된 Heartbeat은 반드시 그 노드가 죽은 것을 의미하지는 않는다. 예를 들어 Heartbeat을 받는 수신 받는 서버의 메세지큐가 가득차서, 상대방이 Heartbeat을 정상적으로 전송했으나 Accept 하지 못하고 있을 수 있기 때문이다.
최근에 만들어진 프로토콜(SWIM 프로토콜의 확장된 버전인 Phi accural failure dector, LifeGuard)은 위에서 이야기한 Heartbeat을 받는 서버쪽에서 다른 요청을 처리하느라 과부하에 걸린 시나리오를 고려하기도 한다.
3. 더 고려해야 할 부분
위에서는 추상화 된 클러스터를 위한 기본적인 내용들을 고려해보았다. 그러나 위 고려사항만으로는 충분히 동작할 수 없다. 좀 더 복잡한 아래의 내용도 반드시 고려해야만 한다.
3.1 주기적인 네트워크 파티셔닝을 어떻게 감지하고 대응할 수 있을까?
흔히 split-brain 시나리오로 알려진 문제다. 이 시나리오는 다음 이유 때문에 발생할 수 있다.
응답하지 않는 노드와 죽은 노드를 구분할 수 없다.
이런 split-brain 시나리오에서는 클러스터가 두 개 이상으로 분할된다. 이 때 각 노드는 스스로만 살아있다고 믿고 클러스터에 대한 데이터 불일치, 데이터 손상을 유발할 수 있다.
예를 들어 여러 노드로 구성된 클러스터가 있을 때, 일부 노드가 네트워크 파티셔닝으로 다른 노드와 통신할 수 없게 되면, 자신을 제외한 모든 노드는 '죽은 노드'라고 판단할 수 있다. 그러면 각 노드는 스스로가 노드가 1개뿐인 클러스터가 되어 독립적인 형태로 동작한다. 그리고 이 노드들이 가지고 있는 데이터나 상태는 서로 다를 수 있을 것이고, 이 때문에 클러스터 간 노드들의 데이터 불일치 문제가 발생한다.
3.2 서로 다른 노드가 클러스터의 상태를 어떻게 추론하고 결정할 수 있을까?
이 경우는 주로 분산 Database와 같이 데이터 관리를 담당하는 시스템에서 발생한다. 예를 들면 etcd가 있다. 각 노드는 들어오는 요청을 처리해야하는데, 네트워크 파티셔닝등으로 클러스터가 분리되었을 때 각 노드가 내리는 결정이 '서로 상충되는 결정'이 될 수 있다. 이 문제에 대한 일반적인 접근 방식을 살펴보자.
- Conflict 방지 : 일반적으로 노드가 결정을 내리기 전에 시스템 상태에 대한 합의를 도출해야한다는 전제를 이용해 충돌을 방지함. 이를 위해 노드들 사이에 리더를 설정하고 유지해야하며, Node Quorum을 이용해 동기화 해야한다. (Raft, ZAB, Paxos)
- Conflict 발생을 인정하고, Conflict 해결 : 긴 latency나 주기적으로 연결할 수 없는 서버가 많은 환경에서 고려하는 절충안이다. 모든 노드의 합의 없이도 각 노드가 독립적으로 최종 상태를 결정할 수 있게 해준다. 그리고 각 노드가 결정한 결정 사항이 최종적으로는 동일한 결론이 될 수 있도록 충분한 메타 데이터를 보강해준다. (CRDTs)
두번째 방식은 긴 latency가 있는 경우, 모든 Node의 Consensus를 이루는데 네트워크 문제로 실패할 가능성이 매우 크기 때문에 위와 같은 방식이 고려되기도 한다.
3.3 클러스터 내부의 특정 리소스로 요청을 라우팅하는 방법은 무엇일까?
클러스터에 있는 노드들이 각 리소스들을 가지고 있는 상황을 가정해보자. (일반적으로 그렇다)
이 때, 클러스터가 커진다는 것은 클러스터가 보관하는 상태, 리소스 등이 비례해서 커지는 것을 의미한다. 그렇다면 데이터가 너무 많아지기 때문에 단일 노드만이 해당 정보를 가지기에는 무리가 있다. 또한 여러가지 약점(SPOF 등)이 존재할 수 있다. 이런 약점을 극복하기 위해서 여러 노드가 복제본을 가지고 있도록 할 수 있고, 리소스를 분할해서 각 노드에 보관하는 방법을 사용한다.
그런데 이 상황에서 클라이언트가 특정 리소스가 필요해서 어떤 노드에게 요청을 보냈다면, 이 정보를 어떻게 찾아서 반환할 수 있을까?
- Naive한 방법
- 해당 리소스의 복제본이 클러스터 내에 R개 있고, 클러스터 전체 노드 수가 N개 일 때 (N/R) + 1번 요청을 보냈을 때, 리소스에 도착할 수 있어야 한다.
- 불필요한 쿼리가 많이 발생하기 때문이 비효율적이다.
- 중앙 저장소 이용
- 모든 엔티티의 정보를 중앙 저장소에 저장한다.
- 중앙 저장소에서 해당 리소스를 가진 노드를 찾은 후에, 노드에 요청을 보낸다.
- 중앙 저장소가 SPOF가 될 수 있다.
- 큰 클러스터는 많은 리소스를 가지고, 리소스 ID 단건 - 노드 ID로 맵핑하기 때문에 클러스터 확장성에 문제를 가져온다.
- 중앙 저장소 + 파티셔닝
- 데이터를 파티션 단위로 묶고, 중앙 저장소에서는 (Partition ID - Node ID) 형태로 저장한다.
- 특정 노드는 특정 파티션의 데이터를 가지고 있다.
- 중앙 저장소에서 해당 리소스를 가진 노드를 찾은 후에, 노드에 요청을 보낸다.
- 클러스터가 커져도, 데이터가 파티션 단위로 압축되기 때문에 확장 가능한 설계다.
- 분산 해시 테이블 (Distirubted Hash Table)
- 특정 Key를 특정 해시 함수로 Hashing하여 어떤 노드가 어떤 키를 담당할지 결정한다.
- 각 노드는 필요한 데이터의 해시값을 구하면, 어떤 노드가 그 값을 담당하는지 알 수 있다. (Consistent Hash 같은 것들)
- 중앙 저장소가 필요없어서 구조 자체는 단순해짐.
- 노드가 추가/제거될 때 전체 Hash Ring을 재배열하는 비용이 커짐.
4. SWIM 프로토콜
SWIM 프로토콜은 멤버십 프로토콜의 일종이다.
4.1 SWIM 프로토콜 이론
SWIM 프로토콜을 구현하기 전에 먼저 고려해야 할 시나리오들은 다음과 같다.
- 새로운 노드가 클러스터에 참여하려고 한다. 어떻게 참여를 허가하고, 클러스터 멤버들에게 새로운 노드의 참여 정보를 알릴 수 있을까?
- 멤버가 클러스터에서 Graceful하게 떠나려고 한다. 어떻게 이 정보를 전파할 수 있을까?
- 어떤 멤버가 갑자기 종료되거나 연결할 수 없다. 어떻게 다른 멤버들에게 이 사실을 전파할 수 있을까?
1~2번은 매우 간단하다. 그러나 3번 시나리오 때문에 구현이 복잡해진다. 같은 IDC 센터에 있는 노드라도, 특정 노드끼리 연결된 구간이 실패할 수 있으며, 다른 노드들은 해당 노드에 접근할 수 있다. 이런 경우가 있기 때문데 어떤 멤버가 한번 응답하지 않더라도 바로 클러스터에서 제외하는 것은 바람직하지 않다. 만약 한번 응답하지 않는다고 노드를 클러스터에서 제외한다면, 그 클러스터는 매우 불안정한 클러스터가 될 것이다.
이런 우려사항은 다음을 고려해보면 해결할 수 있다.
어떤 노드는 다른 임의의 노드에게 HeartBeat을 보낼 수 있다. 예를 들어 A -> C로 Ping 메세지를 보내고, Timeout이 발생하기 전에 C -> A로 ACK 메세지가 올 것을 예상한다. 이 경우, A는 C가 여전히 정상적으로 동작한다고 판단할 수 있다.
A -> C로 Ping 요청을 보냈는데, C -> A로 ACK가 Timeout이 발생했다. 한번 응답을 못했다고 해서 응답못한 노드를 클러스터에서 매번 제외하면 불안정한 클러스터가 될 것이다. 따라서 아직까지는 C를 '죽은 노드'로 판단하지 않을 것이다. 대신에 모든 클러스터 멤버 (C 포함)에게 'Suspect C'라는 메세지를 전송한다.
각 노드들은 의심스러운 노드 C가 특정 시간 내에 확인되길 바라며 타이머 Alive(C)를 셋팅한다. 만약 타이머 Alive(C)가 Timeout 되기 전에 정보를 받으면 '의심스러운 노드 리스트'에서 C를 제거할 것이다.
이제 C가 살아있는지 확인이 필요하다. 이 때, A는 '의심스럽지 않는 노드'들 중에서 하나를 뽑아, 그 노드에게 C가 정상인지 대신 확인해달라는 메세지인 PING-REQ(C)를 전송한다. 이 때, E가 C로 Ping을 보낼 것이고 C가 E에게 ACK하면, C는 살아있고 A - C 네트워크 구간에 문제가 있었을 뿐이라는 것을 알게 된다.
반면 E가 C에게 ACK를 받지 못하면, A / E가 더블체크한 것이기 때문에 모든 클러스터 멤버들에게 'C'는 죽은 노드라고 알릴 수 있다.
그러나 DEAD(C)라는 메세지가 'C'가 실제로 죽은 상태라는 것을 의미하지는 않는다. 의심 노드 C가 제 때 응답하지 못한 이유는 '실제로 죽은 것' 외에도 다음 코너 케이스들이 존재할 수 있다.
- 다른 요청을 처리하느라 너무 바빠 HeartBeat에 너무 늦게 응답함.
- JVM Stop the world로 인해 응답 속도가 느려짐.
- Split Brain으로 인한 현상
- 이미 A,E와 B,C,D로 클러스터가 나눠졌을 수도 있음. A는 C가 죽었다고 판단, C는 A가 죽었다고 판단.
- 즉, C는 모르는 노드 A에게 PING 요청이 왔기 때문에 응답하지 않았을 수도 있음.
궁극적으로는 100% 신뢰할 수 있는 Failure Detection은 존재하지 않는다. 개선할 수 있는 방법은 Fales Alert을 줄이고, 죽은 노드를 감지하는데 걸리는 시간을 줄이는 것이다. 그러나 이 둘은 일반적으로는 Trade-Off 관계를 가진다.
위에서 언급한 시나리오 중 일부는 Lifeguard 프로토콜을 이용해 일부 보완할 수 있다.
5. SWIM 프로토콜 - 구현
조금 쉬운 구현을 위하 몇 가지 시나리오를 단순화한다. 이것은 SWIM 프로토콜을 더 쉽게 이해하는데 도움을 줄 수 있다.
- PING - ACK 메세지에 Sequential 번호 추가
- Sequential 번호는 죽은 노드가 부활하는 것을 방지하기 위해 필요하다.
- A 노드 -> B노드로 PING1을 보냄.
- B노드는 ACK1을 응답했으나, A 노드에 아직 도달하지 않음.
- A 노드 -> B노드로 PING2를 보냄.
- B노드는 죽어서 ACK2를 보낼 수 없다.
- 뒤늦게 ACK1이 A 노드에게 전달된다. 이 때, 죽은 노드 B는 살아있다고 판단된다.
- Sequential 번호는 죽은 노드가 부활하는 것을 방지하기 위해 필요하다.
- PING을 보낼 때 마다, MemberShip State를 함께 가십(전파)한다.
최적화 된 방법은 아니지만, 클러스터 규모가 작을 때는 잘 동작할 것이다. 좀 더 최적화 한다면, 현재 노드가 알고 있는 Membership State를 해싱해서 PING에 포함시켜서 보낸다. 그리고 ACK를 하는 노드는 자신이 알고 있는 Membership State의 해시와 다른 경우, ACK에 Membership State를 포함해서 보내는 방식이 될 수 있다.
F#으로 작성된 원본 코드는 이곳에서 확인할 수 있다.
6. SWIM 프로토콜 구현 전제조건
실제 노드 간 통신은 TCP 프로토콜을 통해서 이루어질 것이다. 그러나 실제 통신을 고려한다면, SWIM 프로토콜에서 TCP Handshake, 데이터의 직렬화/역직렬화 등을 구현해야한다. 이런 이유 때문에 SWIM 프로토콜을 구현하는 것보다 주변의 중요하지 않은 것들을 구현하는데 많은 시간이 걸릴 것이다.
이런 구현의 복잡성을 제거하고, 프로토콜 자체에만 집중하기 위해서 Actor 모델을 기반으로 프로세스 통신을 모사하고 SWIM 프로토콜을 구현할 것이다. 원문에서는 F#의 Akka.NET/Akkling이 사용되는데, 이 글에서는 erlang으로 포팅하여 구현하고자 한다. 구현 코드의 전체는 이곳에서 확인할 수 있다.
6.1 Actor 초기화
가장 먼저 Actor를 초기화하는 코드를 작성한다.
- start_link()를 통해 Actor Process를 생성한다.
- 생성된 Actor Process는 init()를 호출해서 자기 자신을 초기화한다.
start_link(ClusterNodes, MyAlias) ->
ClusterNodesWithPid = [whereis(ClusterNodeAlias) || ClusterNodeAlias <- ClusterNodes],
gen_server:start_link(?MODULE, [ClusterNodesWithPid, MyAlias], []).
- init() 함수 내에는 Actor Process가 초기화 하는데 필요한 로직들을 추가한다.
- 자신이 클러스터의 첫번째 멤버가 아니라면, 자기 자신에게 Joining 메세지를 보낸다. 그러면 초기화가 완료된 이후, 자기 자신에게 도착한 Joining 메세지를 확인한 후 다음 작업을 하게 될 것이다.
- 자신이 클러스터의 첫번째 멤버라면, 스스로가 클러스터를 구성하면 된다. 따라서 어떤 곳에도 Joining 메세지를 보낼 필요가 없다.
init([ClusterNodes, MyAlias]) ->
io:format("[~p] swim_node ~p try to intialized ~n", [self(), self()]),
erlang:register(MyAlias, self()),
{ActiveCluster, MyState} = case ClusterNodes of
% 클러스터의 첫번째 노드일 때,
[] ->
Actives = #{self() => #node_state{state=alive}},
{Actives, ready};
ClusterNodes ->
JoiningMsg = joining_message(ClusterNodes),
timer:apply_after(0, gen_server, cast, [self(), JoiningMsg]),
{#{}, joining}
end,
case MyState of
ready ->
io:format("[~p] swim_node ~p has been ready ~n", [self(), self()]),
NextRoundMsg = next_round_message(self()),
send_message(NextRoundMsg, self());
_ -> ok
end,
State = #?MODULE{my_self=self(), my_state=MyState, actives=ActiveCluster},
{ok, State}.
6.2 클러스터 조인 과정
초기화 과정에서 Actor 프로세스는 자기 자신에게 Joining 메세지를 보냈었다. 초기화가 완료되면 Actor 프로세스는 아래 코드를 실행한다.
- 만약 알고 있는 클러스터 노드가 없다면, Join 메세지를 보낼 곳이 없다. 명백한 오류이기 때문에 error(failed_to_join)으로 프로세스를 종류한다.
- 가장 먼저 Node에게 JoinMsg를 비동기적으로 전송한다.
- 그러나 Node에게 보낸 JoinMsg가 여러가지 문제로 인해 처리되지 않을 수 있다. (Node가 너무 바쁘다든지, 네트워크 파티셔닝이 있다든지). 따라서 Node에게 보낸 JoinMsg에 응답이 오지 않을 때를 대비해서, 자기 자신에게 다시 한번 다른 클러스터에세 JoinMsg를 보내라는 JoiningMsg를 비동기적으로 예약할 수 있다.
handle_cast({joining, KnownClusterNodes}, State) ->
io:format("[~p] node ~p try to join the clusters... ~n", [self(), self()]),
MaybeTimer =
case KnownClusterNodes of
[] ->
error(failed_to_join);
[Node | Rest] ->
JoinMsg = join_message(self()),
send_message(JoinMsg, Node),
% If Join request fail, node should send request to other known node.c
% Otherwise, it cannot join the cluster forever.
JoiningMsg = joining_message(Rest),
{ok, Timer} = timer:send_after(?DEFAULT_INIT_JOIN_TIMEOUT, JoiningMsg),
Timer
end,
NewState = State#?MODULE{try_join_timer=MaybeTimer},
{noreply, NewState};
만약 클러스터 멤버가 새롭게 가입하려고 하는 노드로부터 JoinMsg를 성공적으로 받았다면, 아래 코드가 실행된다.
- 새롭게 클러스터에 들어오려는 노드를 자신의 State에 보관하고 있던 Active 노드에 넣어서 NewActives를 생성한다.
- 자기 자신을 제외하고, 자신이 알고 있는 모든 Active 노드에게 JoinedMsg를 보낸다.
- JoinedMsg에는 자신이 알고 있는 Actives가 포함된다. 이 Actives는 Gossip을 위한 것이다. 이를 통해 모든 노드들에게 상태가 동기화되는 형식이다.
handle_cast({join, Peer}, State) ->
io:format("[~p] swim_node ~p receive join request from ~p ~n", [self(), self(), Peer]),
#?MODULE{actives=Actives} = State,
NewlyJoinedNodeState = #node_state{state=alive},
NewActives = maps:put(Peer, NewlyJoinedNodeState, Actives),
JoinedMsg = joined_message(NewActives),
lists:foreach(
fun
(Pid) when Pid =/= self() -> send_message(JoinedMsg, Pid);
(_Pid) -> ok
end, maps:keys(NewActives)),
NewState = State#?MODULE{actives=NewActives},
{noreply, NewState};
Join을 허락한 노드로부터 JoinedMsg를 다른 노드들은 받는다. 이 때, 클러스터에 가입을 요청한 노드도 자신이 가입되었다는 'JoinedMsg'를 받는다. 따라서 여기서는 노드별로 수행하는 조건의 분리가 필요하다.
- JoinedMsg는 이전 노드가 알고 있던 가장 최신의 상태가 GossipedActives에 포함되어 전파된다. 따라서 해당 Gossip을 현재 내가 알고 있는 최신 상태와 비교하여 Merge 한다.
- 만약 조인 요청을 보낸 노드라면,
- Joining 요청을 보내기 위한 Timer의 예약을 취소하고 자기 자신의 State도 업데이트한다.
- 클러스터 멤버로 동작하기 위해 NEXT_ROUND라는 메세지를 보낸다.
handle_cast({joined, GossipedActives}, #?MODULE{try_join_timer=PreviousTimer}=State) ->
io:format("[~p] swim_node ~p receive joined gossip~n", [self(), self()]),
NewState0 = merge_gossip_into_actives_(State, GossipedActives),
NewState = case is_joining_node(NewState0) of
true ->
timer:cancel(PreviousTimer),
process_task_and_update_after_joined(NewState0);
false -> NewState0
end,
{noreply, NewState};
아래는 Gossip된 최신 정보와 현재 내가 알고 있는 최신 정보를 Merge하는 상세 코드다.
- Gossip과 Local State에 모두 있던 정보라면 각 정보에 대한 incarnation 값을 비교한다. 그리고 Gossip된 정보의 incarnation이 큰 경우에만 Gossip 정보를 반영한다.
- 그렇지 않은 경우라면 기존 정보를 유지한다.
이 코드가 필요한 것은 비동기적인 SWIM 프로토콜에서 발생할 수 있는 Concurrency 문제를 발생하기 위함이다. 예를 들어 아래 경우가 발생할 수 있다.
- 10초전 노드 A Alive 메세지가 전송되었다.
- 5초전 노드 A Dead 메세지가 전송되었고, 모든 노드가 그 메세지를 받고 A가 죽었다고 마킹했다.
- 10초전에 보낸 Alive 메세지가 네트워크 문제로 인해 이제 다른 노드들에게 도착했다. 이때 죽은 노드 A는 다시 alive라고 마킹된다.
분산 환경에서는 네트워크 문제로 인해 늦게 발송된 메세지가 빨리 도착할 수도 있다. 이런 것들이 동시성 이슈를 야기하고, 이런 죽은 노드의 부활 같은 동시성 문제를 막기 위해 최신 정보를 나타내는 Incarnation을 도입하고, Incarnation을 비교해서 가장 최신의 정보일 때만 업데이트 한다.
merge_gossip_into_actives_(State, GossipedActives) ->
#?MODULE{actives=Actives} = State,
GossipedKeys = maps:keys(GossipedActives),
MergedActives =
lists:foldl(
fun(Key, Acc) ->
MaybeKnownValue = maps:get(Key, Actives, undefined),
GossipedValue = maps:get(Key, GossipedActives),
case {MaybeKnownValue, GossipedValue} of
{undefined, _} ->
maps:put(Key, GossipedValue, Acc);
{#node_state{incarnation=KnownIncarnation}, #node_state{incarnation=GossipedIncarnation}} when GossipedIncarnation >= KnownIncarnation ->
maps:put(Key, GossipedValue, Acc);
_ ->
Acc
end
end, Actives, GossipedKeys),
State#?MODULE{actives=MergedActives, try_join_timer=undefined}.
6.3 Join 잘되는지 확인하기
방금 위에서 작성한 코드를 통해 Join이 잘되는지 확인해보자. 확인하기 위해 아래 코드를 작성했다.
- 노드 A는 클러스터의 첫번째 멤버가 된다.
- 1초를 쉰 후에, 노드 B는 멤버가 A로 있는 클러스터에 가입 요청을 보낸다.
swim_node:start_link([], 'A'),
timer:sleep(1000),
swim_node:start_link(['A'], 'B'),
timer:sleep(1000),
위 코드의 실행 결과 생성되는 로그를 살펴보면 아래와 같다.
- <0.91.0>은 노드 A의 PID다
- <0.93.0>은 노드 B의 PID다
- 5~8번 라인을 보면 노드 B가 클러스터에 가입하려고 하고, 노드 A가 요청을 받은 후 Joined Message를 가십했다는 것을 확인했다.
[<0.91.0>] swim_node <0.91.0> try to intialized
[<0.91.0>] swim_node <0.91.0> has been ready
[<0.91.0>] got received {next_round, <0.91.0>}
[<0.93.0>] swim_node <0.93.0> try to intialized
[<0.93.0>] node <0.93.0> try to join the clusters...
[<0.91.0>] swim_node <0.91.0> receive join request from <0.93.0>
[<0.93.0>] swim_node <0.93.0> receive joined gossip
[<0.93.0>] swim_node <0.93.0> has been ready
이를 통해서 멤버가 SWIM 프로토콜을 통해 멤버에 가입할 수 있다는 것을 확인했다.
6.4 NEXT ROUND + PING + PING_ACK 구현
SWIM 클러스터의 노드들은 각 라운드마다 자신이 알고 있는 살아있는 노드들 중 하나에게 Ping을 보내서 여전히 살아있는지 확인한다. 즉, 노드들은 살아있는 동안 각 NEXT ROUND를 무한히 반복하면서 다른 멤버의 상태를 확인하고 가십하면서 클러스터 전체의 상태를 맞춰가는 형태로 동작한다.
- send_next_round_msg()를 호출해 자기 자신에게 next_round 메세지를 보낼 것을 예약한다. 만약 Actor 프로세스가 살아있다면, 다음에 이 메세지를 받은 후 다시 한번 이 handle_cast({next_round}, ...)라는 함수를 실행한다.
- Ping 메세지를 받을 대상을 먼저 필터링 한다. 자기 자신을 필터링 하고, 자신이 알고 있는 State 중 alive인 노드에게만 메세지를 보내도록 필터링 한다.
- 위 목록에서 하나의 노드를 선택한 후, 해당 노드에게 Ping Msg를 전송한다. 그리고 자기 자신에게는 Ping timeout Msg를 전송한다.
- Ping Msg는 비동기적으로 발송되는 메세지다. 따라서 상대방에게 어떤 응답이 오는지 알 수 없기 때문에 자기 자신에게 Ping timeout Msg를 전송한다.
- 만약 Ping timeout 메세지가 자기 자신에게 올 때까지 Ping ACK 메세지를 받지 못했다는 것은 Ping 메세지를 받은 노드가 응답하지 못하는 상태에 있다는 것을 증명하기 때문이다.
handle_cast({next_round, Pid}, #?MODULE{suspects=Suspects, actives=Actives}=State0) ->
io:format("[~p] got received {next_round, ~p}~n", [self(), Pid]),
send_next_round_msg(),
PeersToReceiveMsgMap = filter_useless_nodes_from_actives(Actives),
State =
case pick_one_randomly(PeersToReceiveMsgMap) of
undefined ->
State0;
MaybeSuspectPid ->
io:format("[~p] ~p send ping msg to ~p~n", [self(), self(), MaybeSuspectPid]),
PingMsg = ping_message(self(), Actives),
send_message(PingMsg, MaybeSuspectPid),
PingTimeoutMsg = ping_timeout_message(MaybeSuspectPid),
{ok, Timer} = send_message_after(?DEFAULT_PING_TIMEOUT, PingTimeoutMsg, self()),
SkipList = [self(), MaybeSuspectPid],
WaitPingAck = #wait_ping_ack{skip_list=SkipList, timers=[Timer], state=first},
SuspectValue = {wait_ping_ack, WaitPingAck},
UpdatedSuspects = maps:put(MaybeSuspectPid, SuspectValue, Suspects),
State0#?MODULE{suspects=UpdatedSuspects}
end,
{noreply, State};
Ping 메세지를 받은 노드는 아래 코드 블록을 실행한다.
- Ping 메세지를 준 노드에게 Ping Ack 메세지를 응답해야한다.
- Ping 메세지에는 이전 노드가 가지고 있던 최신 상태인 GossipedActives가 있다. 자신이 알고 있는 최신 상태와 가쉽된 정보의 Incarnation을 비교해서 최신 상태로 업데이트한다. 즉, 여기서도 최신 정보가 가쉽된다.
handle_cast({ping, From, GossipedActives}, #?MODULE{actives=Actives}=State0) ->
io:format("[~p] ~p got ping message from ~p~n", [self(), self(), From]),
PingAckMsg = ping_ack_message(self(), Actives),
send_message(PingAckMsg, From),
State = merge_gossip_into_actives_(State0, GossipedActives),
{noreply, State};
6.5 PING_TIMEOUT + PING_REQ 구현
성공적인 경우라면 제시간에 PING 메세지에 대한 PING_ACK 메세지가 온다. 그러나 늘 그렇듯이, PING_ACK 메세지가 오기 전 PING_TIMEOUT 메세지가 도착할 수 있다. 이때, 이론 단계에서 정의한 것처럼 현재 노드는 다른 노드에게 해당 노드가 살아있는지 체크해달라는 PING_REQ 메세지를 보낸다.
참고 : 논문에 따르면 PING_REQ를 보낼 때 최소 3개의 노드에 요청을 보내면, 실패 감지에 더 효과적이라고 한다.
아래 코드로 먼저 ping_timeout 메세지를 처리한다.
- ping_timeout 메세지를 받았으면, 먼저 자신의 State에서 Suspects를 가져온다. Suspects에는 각 Pid별로 현재 노드가 Suspect와 관련해 예약한 타이머가 존재한다.
- wait_ping_ack 상태
- 이 상태는 다른 노드가 살아있는지 체크하는 상태라는 것을 의미한다. 여기에는 ping, ping_req 메세지를 보낸 상태가 모두 포함된다.
- {_Timers, first}의 패턴 매치에서는 해당 노드가 Ping 메세지를 보낸 후, ping_timeout을 받았다는 것을 의미한다.
- 이 때 알고 있는 살아있는 노드가 있다면 send_ping_req_message_max_n_times()를 호출해 ping_req 메세지를 여러 노드들에 전송한다.
- 마찬가지로 자기 자신에게 ping_timeout 메세지를 예약한다. 이것은 3개 발송된 ping_req 메세지 전체에 대한 타이머가 된다.
- {_Timers, ping_req}의 패턴 매치에서는 해당 노드가 ping_req 메세지를 보낸 후, ping_timeout을 받았다는 것을 의미한다.
- ping, ping_req가 모두 실패했기 때문에 자신이 알고 있는 모든 노드들에게 '그 노드'가 Suspected된 상태라는 Suspected 메세지를 전송한다.
- wait_ping_req_ack 상태
- 이것은 ping_req 메세지를 받은 노드가 ping 메세지를 다른 노드에 보낸 후에 응답을 기다리다가 ping timeout을 맞은 것을 의미한다.
- 이 때, ping_req 메세지를 받은 노드는 Suspected 메세지를 전송해야하는 책임을 ping_req 메세지를 보낸 노드에게 위임한다. 그렇지 않으면 코드가 더 복잡해지기 때문이다.
handle_cast({ping_timeout, Suspect}, #?MODULE{suspects=Suspects0, actives=Actives}=State0) ->
io:format("[~p] node ~p received ping_timeout message. suspect is ~p~n", [self(), self(), Suspect]),
State =
case maps:get(Suspect, Suspects0, undefined) of
{wait_ping_ack, WaitPingTimerRecord} ->
#wait_ping_ack{skip_list=SkipList, timers=Timers, state=WaitPingState} = WaitPingTimerRecord,
case {Timers, WaitPingState} of
% In case of first NEXT round is failed, and in trying to ping_req.
{_Timers, ping_req} ->
gossip_suspect_node(State0, Suspect);
% In case of first NEXT round try
{_Timers, first} ->
OtherCandidates = maps:filter(
fun(Pid, _) ->
not lists:member(Pid, SkipList)
end, Actives),
case OtherCandidates of
[] ->
% There is no node to delegate ping_req at all.
gossip_suspect_node(State0, Suspect);
OtherCandidates ->
send_ping_req_message_max_n_times(?DEFAULT_PING_REQ_TRY, OtherCandidates, Actives, Suspect),
PingTimeoutMsg = ping_timeout_message(Suspect),
{ok, Timer} = timer:apply_after(?DEFAULT_PING_TIMEOUT, swim_node, send_message, [PingTimeoutMsg, self()]),
WaitPingAck = #wait_ping_ack{timers=[Timer], skip_list=SkipList, state=ping_req},
SuspectValue = {wait_ping_ack, WaitPingAck},
UpdatedSuspects = maps:put(Suspect, SuspectValue, Suspects0),
State0#?MODULE{suspects=UpdatedSuspects}
end
end;
{wait_ping_req_ack, _WaitingPingReqAck} ->
% delegate to determine it's suspected.
% If node which is responsible of ping_req don't response ping_ack to requester node until timer bomb,
% The request node's timer will be send timeout message to request node.
% Then, request node will broadcast suspected message.
State0;
undefined ->
% erlang has concurrency scenario
% 1. suspect timer is reserved
% 2. suspect decline by sending alive msg.
% 3. Node started to handle alive msg.
% 4. However, the timer has been expired, so ping_timeout message is already reach to node's postbox.
% 5. Node cancel all timers successfully. however, ping_timeout message still in node's postbox.
% To prevent for situation above to kill the current node process, we added 'undefined' clauses.
State0;
_ ->
State0
end,
{noreply, State};
이제 ping_req 메세지를 받은 노드는 아래 코드를 실행한다.
- Ping 메세지를 타켓 노드에게 보낸다.
- 스스로에게 Ping timeout 메세지를 예약한다.
- 가쉽된 최신 정보를 Incarnation을 고려하여 Merge해서 자신의 State에 업데이트한다.
handle_cast({ping_req, Requester, Suspect, GossipedActives}, State0) ->
io:format("[~p] ~p got ping_req message from ~p~n", [self(), self(), Requester]),
#?MODULE{actives=Actives, suspects=Suspects}=State0,
PingMsg = {ping, self(), Actives},
send_message(PingMsg, Suspect),
PingTimeoutMsg = ping_timeout_message(Suspect),
{ok, Timer} = send_message_after(?DEFAULT_PING_TIMEOUT, PingTimeoutMsg, self()),
WaitPingReqAck = #wait_ping_req_ack{requester=Requester, timers=[Timer]},
SuspectValue = {wait_ping_req_ack, WaitPingReqAck},
UpdatedSuspects = maps:put(Suspect, SuspectValue, Suspects),
State1 = #?MODULE{suspects=UpdatedSuspects},
State = merge_gossip_into_actives_(State1, GossipedActives),
{noreply, State};
6.6 Suspected - Alive 반박 - WAIT_CONFIRM Timeout
위 코드에서 ping_req 메세지 조차 응답하지 않은 경우, 해당 노드가 죽었는지 의심된다는 'Suspected' 메세지를 클러스터 내에 브로드 캐스팅한다. 여기서는 Suspected 메세지를 받은 노드들이 어떻게 대응하는지, 그리고 Alive 반박에 대해서 어떻게 응답하는지에 대한 코드를 구현한다.
Suspected 메세지를 받은 노드인데, 그것이 만약 자기 자신인 경우에 Alive Message를 보내서 반박할 수 있다. 즉, 자신의 Suspected + Dead 상태를 Alive로 바꿀 수 있다. 그 내용은 아래에 구현되어있다.
- 먼저 가쉽된 정보를 자신에게 업데이트한다.
- 가쉽된 정보, 자신이 알고 있는 정보에서 Incarnation을 각각 찾아서 최대값을 기준으로 1을 더해서 새로운 Incarnation을 만들어낸다.
- 이렇게 하지 않으면 다른 노드들이 알고 있는 Incarnation보다 더 작은 Incarnation으로 Alive 메세지를 보낼 수도 있다. 그렇게 되면, 해당 노드들은 '오래된 정보'라고 판단하고 Alive 메세지를 무시한다. 이 케이스를 해결하기 위해 아래와 같이 작성한다.
handle_cast({suspected, Suspect, GossipedIncarnation, GossipedActives}, State0) ->
...
case Self =:= Suspect of
% Case1. me is in suspected.
true ->
State1 = merge_gossip_into_actives_(State0, GossipedActives),
#?MODULE{actives=Actives1} = State1,
GossipedIncarnation = get_incarnation_by_pid(GossipedActives, Self),
LocalIncarnation = get_incarnation_by_pid(Actives0, Self),
NewIncarnation = max(GossipedIncarnation, LocalIncarnation) + 1,
NewNodeState = #node_state{incarnation=NewIncarnation, state=alive},
Actives = maps:put(Suspect, NewNodeState, Actives1),
ActivePeerListExceptSelf = lists:delete(Self, maps:keys(Actives)),
AliveMsg = alive_message(Self, NewIncarnation, Actives),
send_messages(AliveMsg, ActivePeerListExceptSelf),
State1;
...
반면 Suspected된 노드가 자신이 아니라면, 가쉽된 정보를 반영하면 된다. 그러나 논문에서는 Suspect 메세지를 받았을 때 바로 Dead 처리하는 것이 아니라 일정 시간 조금 더 기다릴 것을 권장한다. 만약 특정 기간 내에 Alive 메세지가 도착하지 않으면 Suspect에서 Dead 상태로 바꾸라는 것이다.
그렇게 하지 않으면, 노드가 너무 빨리 클러스터에서 빠져나갔다가 다시 Alive 되는 경우가 있을 수 있다. 이런 일들이 많이 반복되게 되면 SWIM 클러스터 자체가 굉장히 불안정해진다. 이런 케이스를 방지하기 위해서 약간의 유예시간을 준다. 이점을 고려해서 아래에 코드를 구현했다.
- 자기 자신에게 WAIT_CONFIRM 이라는 메세지를 보낼 것을 예약한다. 이 메세지가 도착하기 전에 ALIVE 메세지가 도착하지 않으면 Suspect된 노드는 이윽고 DEAD가 될 것이다.
- 그 외에도 다른 가쉽된 정보를 받아서 최신 정보로 업데이트한다.
이 때, GossipedIncarnation을 직접 받아서 수작업으로 처리하고 있는 것을 볼 수 있다. 사실 GossipedActives에 해당 정보가 모두 포함되어있는데, 명시적으로 특정 노드가 표현되는 것을 알려주기 위해 Suspect + GossipedIncarnation을 도입했다.
handle_cast({suspected, Suspect, GossipedIncarnation, GossipedActives}, State0) ->
...
case Self =:= Suspect of
% Case1. me is in suspected.
true ->
...
false ->
TimeoutMsg = {wait_confirm_timeout, Suspect},
{ok, Timer} = send_message_after(?DEFAULT_WAIT_CONFIRM_TIMEOUT, TimeoutMsg, Self),
NewState0 = merge_gossip_into_actives_(Actives0, GossipedActives),
MaybeNewNodeStateOfSuspect =
apply_diff_with_considering_incarnation(Suspect, suspected, Actives0, GossipedIncarnation),
NewState1 = put_it_into_actives_get_state(Suspect, MaybeNewNodeStateOfSuspect, NewState0),
SuspectValue = {wait_confirm, Timer},
UpdatedSuspects = maps:put(Suspect, SuspectValue, Suspects0),
NewState1#?MODULE{suspects=UpdatedSuspects}
end,
{noreply, State};
자신이 죽었다고 의심받은 노드는 Alive 메세지를 브로드캐스팅 한다고 했다. 각 노드는 Alive 메세지를 받으면 아래 코드를 실행한다.
- 만약 해당 노드가 wait_confirm 상태이고, 전달된 메세지의 Incarnation이 더 큰 경우에 Suspected 된 노드를 Alive로 바꾼다.
- 그렇지 않으면 메세지를 무시한다.
handle_cast({alive, Suspect, GossipedIncarnation, GossipedActive}, State0) ->
#?MODULE{actives=Actives0, suspects=Suspects0}=State0,
State =
case maps:get(Suspect, Suspects0, undefined) of
{wait_confirm, WaitConfirmRecord} ->
LocalIncarnation = get_incarnation_by_pid(Actives0, Suspect),
case LocalIncarnation < GossipedIncarnation of
true ->
cancel_timers(get_timers(WaitConfirmRecord)),
UpdatedSuspects = maps:remove(Suspect, Suspects0),
State1 = merge_gossip_into_actives_(State0, GossipedActive),
State1#?MODULE{suspects=UpdatedSuspects};
false ->
merge_gossip_into_actives_(State0, GossipedActive)
end;
_ ->
merge_gossip_into_actives_(State0, GossipedActive)
end,
{noreply, State};
특정 노드를 WAIT_CONFIRM 상태로 바라보고 있는 와중에, ALIVE 메세지보다 WAIT_CONFIRM_TIMEOUT 메세지가 먼저 도착한 경우에는 해당 노드는 죽었다고 판단한다.
- 특정 노드의 상태를 업데이트 하기 때문에 현재 알고 있는 노드의 Incarnation에 1을 더해서 새로운 Incarnation을 만든다.
- 만든 Incarnation 정보를 동봉해 자신이 알고 있는 모든 노드들에게 Left Msg를 전송한다. Left Msg는 해당 노드가 죽었다는 것을 의미하는 메세지이다.
handle_cast({wait_confirm_timeout, Suspect}, State0) ->
State = leave_suspect_node(State0, Suspect),
{noreply, State};
...
leave_suspect_node(#?MODULE{actives=Actives0, suspects=Suspects0}=State, Suspect) ->
Suspects = maps:remove(Suspect, Suspects0),
NewIncarnation = get_incarnation_by_pid(Actives0, Suspect) + 1,
NewNodeState = #node_state{state=dead, incarnation=NewIncarnation},
Actives = maps:put(Suspect, NewNodeState, Actives0),
ShouldReceiveMsgPeerMaps0 = maps:remove(self(), Actives),
ShouldReceiveMsgPeerMaps1 = maps:remove(Suspect, ShouldReceiveMsgPeerMaps0),
PeersToReceive = maps:keys(ShouldReceiveMsgPeerMaps1),
LeftMsg = left_node_message(Suspect, NewIncarnation, Actives),
send_messages(LeftMsg, PeersToReceive),
State#?MODULE{actives=Actives, suspects=Suspects}.
6.7 Left 구현
이미 스스로가 WAIT_CONFIRM_TIMEOUT 메세지를 받아서 해당 노드를 죽었다고 판단했을 수도 있지만, 다른 노드로부터 특정 노드가 죽었다는 Left 메세지를 받을 수도 있다. 이때, Left 메세지를 받은 노드는 아래 코드 블록을 실행한다.
코드 자체는 길지만 Gossiped된 Incarnation 정보를 확인해서, 최신 정보라고 판단되면 Actives에서 해당 노드를 제거하고 자신의 State에 저장하는 방식이다.
handle_cast({left, LeftNode, GossipedIncarnation, GossipedActives}, State0) ->
#?MODULE{actives=Actives0, suspects=Suspects0}=State0,
Suspects =
case maps:get(LeftNode, Suspects0) of
undefined -> Suspects0;
{_, Record} ->
cancel_timers(get_timers(Record)),
maps:remove(LeftNode, Suspects0)
end,
Actives1 =
case maps:is_key(LeftNode, Actives0) of
true ->
MaybeNewNodeStateOfLeftNode =
apply_diff_with_considering_incarnation(LeftNode, dead, Actives0, GossipedIncarnation),
StateWithNewLeftNode = put_it_into_actives_get_state(LeftNode, MaybeNewNodeStateOfLeftNode, State0),
#?MODULE{actives=MergedActives} = StateWithNewLeftNode,
MergedActives;
false -> Actives0
end,
GossipedActivesExceptLeftNode = remove_node_state(LeftNode, GossipedActives),
State1 = State0#?MODULE{actives=Actives1},
State2 = merge_gossip_into_actives_(State1, GossipedActivesExceptLeftNode),
State = State2#?MODULE{suspects=Suspects},
{noreply, State};
7. 구현 코드 실행
분산된 Actor가 비동기적으로 메세지를 주고 받으면서 상호작용하기 때문에 많은 경우를 테스트 해보기는 어려워 세 가지 경우만 테스트해보았다. 아래에서는 실행하는 방법과 그 로그 메세지를 확인해보고자 한다.
7.1 일반적인 경우
아래 코드는 노드 2개로 이루어진 SWIM 클러스터가 성공적으로 구성되고, 모든 노드가 PING 메세지에 정상적으로 응답해서 클러스터가 잘 유지되는 경우를 모사한다.
main() ->
% normal case
swim_node:start_link([], 'A'),
timer:sleep(1000),
swim_node:start_link(['A'], 'B'),
timer:sleep(1000).
아래 코드로 실행할 수 있고, 거기서 나온 로그를 볼 수 있다. 로그에는 현재 어떤 메세지를 각 노드들이 주고 받는지를 잘 볼 수 있다.
1> swim_node:main().
% 노드 A 초기화
[<0.91.0>] swim_node <0.91.0> try to intialized
[<0.91.0>] swim_node <0.91.0> has been ready
[<0.91.0>] got received {next_round, <0.91.0>}
% 노드B 초기화
[<0.93.0>] swim_node <0.93.0> try to intialized
[<0.93.0>] node <0.93.0> try to join the clusters...
% 노드 A Join 메세지 수신
[<0.91.0>] swim_node <0.91.0> receive join request from <0.93.0>
% 노드 B Joined 메세지 수신
[<0.93.0>] swim_node <0.93.0> receive joined gossip
[<0.93.0>] swim_node <0.93.0> has been ready
% 노드 A NEXT round 메세지 수신 -> ping 메세지 전송 to 노드 B
[<0.91.0>] got received {next_round, <0.91.0>}
[<0.91.0>] <0.91.0> send ping msg to <0.93.0>
% 노드 B는 ping 메세지를 수신 후, ping ack 메세지 전송 to 노드 A
[<0.93.0>] <0.93.0> got ping message from <0.91.0>
[<0.91.0>] <0.91.0> got ping_ack message from <0.93.0>
% 노드 B Next round 메세지 수신 -> 노드 A에게 ping 메세지 전송
[<0.93.0>] got received {next_round, <0.93.0>}
[<0.93.0>] <0.93.0> send ping msg to <0.91.0>
% 노드 A ping 메세지 수신 후, ping_ack 메세지 응답.
[<0.91.0>] <0.91.0> got ping message from <0.93.0>
...
그리고 각 노드가 가지고 있는 자신의 State를 아래 코드로 살펴볼 수 있다. 아래 코드에서 확인할 수 있듯이 노드 A, B는 모두 동일한 Actives 상태를 자신의 로컬 State에 저장하고 있다.
3> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{},undefined}
4> sys:get_state(<0.93.0>).
{swim_node,ready,<0.93.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{},undefined}
7.2 노드 B가 죽어서 클러스터 멤버에서 제외되는 경우.
아래 코드를 이용해서 특정 멤버가 클러스터를 떠나는 시나리오를 모사했다. 여기서는 노드 B가 클러스터에서 떠나는 것을 의미한다.
swim_node:start_link([], 'A'),
timer:sleep(1000),
swim_node:start_link(['A'], 'B'),
timer:sleep(1000),
exit(whereis('B'), exit).
코드를 실행했을 때 발생하는 로그와 중간 중간 Actor의 상태를 확인해서 정상 동작하는지 살펴보았다.
2> swim_node:main().
% 노드 A 초기화 완료
[<0.91.0>] swim_node <0.91.0> try to intialized
[<0.91.0>] swim_node <0.91.0> has been ready
[<0.91.0>] got received {next_round, <0.91.0>}
% 노드 B 초기화 완료
[<0.93.0>] swim_node <0.93.0> try to intialized
[<0.93.0>] node <0.93.0> try to join the clusters...
[<0.91.0>] swim_node <0.91.0> receive join request from <0.93.0>
% 노드 B 클러스터 가입 완료
[<0.93.0>] swim_node <0.93.0> receive joined gossip
[<0.93.0>] swim_node <0.93.0> has been ready
% 가입 완료되자마자 노드 B는 죽음. exit(whereis('B'), exit).
% 노드 A 상태 확인 -> 노드 A,B 모두 alive 상태. incarnation은 모두 0.
% 노드 A는 아직 노드 B의 상태를 체크하기 전이다.
3> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{},undefined}
% 노드 A는 노드 B에게 Ping을 날린다.
[<0.91.0>] got received {next_round, <0.91.0>}
[<0.91.0>] <0.91.0> send ping msg to <0.93.0>
% 노드 A의 상태를 확인해보면 아래와 같다.
% 현재 ping_ack를 기다리는 상태 (wait_ping_ack)이고, 타이머가 존재한다.
4> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{<0.93.0> =>
{wait_ping_ack,{wait_ping_ack,[<0.91.0>,<0.93.0>],
[{once,#Ref<0.4044132323.2616721418.254816>}],
first}}},
undefined}
% 노드 B ping 메세지에 대한 ping_timeout 메세지를 수신했다.
[<0.91.0>] node <0.91.0> received ping_timeout message. suspect is <0.93.0>
[<0.91.0>] node <0.91.0> received ping_timeout message. suspect is <0.93.0>
...
% 노드 A의 State를 확인해보면, 노드 B의 Incarnation이 1 증가하고 Suspected 상태로 바뀌었다.
% 그리고 노드 B는 wait_confirm 상태가 되었다.
5> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,1,suspected}},
#{<0.93.0> =>
{wait_confirm,<0.93.0>,
{once,#Ref<0.4044132323.2616721418.254837>}}},
undefined}
% 노드 B로부터 wait_confirm_timeout 메세지를 받았다.
% 이제 노드 A는 노드 B를 Dead라고 Mark해서 자신의 State에 저장하고 다른 노드에 브로드캐스팅한다.
% 그러나 현재 클러스터에는 노드A만 있기 때문에 전파할 노드가 없다.
[<0.91.0>] node <0.91.0> received wait_confirm_timeout message. suspect is <0.93.0>
% 노드 A의 State를 확인해보면 노드 B의 Incarnation이 2로 증가해있고, Dead 상태로 된 것을 볼 수 있다.
6> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,2,dead}},
#{},undefined}
7.3 노드 B가 Suspected되나 Alive 반박해서 살아나는 코드
각 노드는 자신이 죽은 것으로 의심된다는 메세지를 받았을 때, Alive 메세지를 브로드캐스팅 해서 자신의 상태를 Alive로 바꿀 수 있다고 했다. 아래 코드로 이 시나리오를 테스트 하려고 한다.
아래에서 사용된 hibernate_while() 코드는 노드 B가 30000ms(30초)동안 어떠한 메세지에도 응답하지 않도록 의도된 것이다.
main() ->
% scenario
% 1. 'A' suspect 'B'.
% 2. 'B' decline 'A' by sending Alive Message.
% 3. 'B' still is member of cluster.
process_flag(trap_exit, true),
swim_node:start_link([], 'A'),
timer:sleep(1000),
swim_node:start_link(['A'], 'B'),
timer:sleep(1000),
swim_node:hibernate_while(30000, whereis('B')).
코드가 실행된 결과와 해석은 아래에 작성했다.
% 노드 A 초기화 완료
[<0.91.0>] swim_node <0.91.0> try to intialized
[<0.91.0>] swim_node <0.91.0> has been ready
[<0.91.0>] got received {next_round, <0.91.0>}
% 노드 B 초기화 완료
[<0.93.0>] swim_node <0.93.0> try to intialized
[<0.93.0>] node <0.93.0> try to join the clusters...
% 노드 B 클러스터에 조인 완료
[<0.91.0>] swim_node <0.91.0> receive join request from <0.93.0>
[<0.93.0>] swim_node <0.93.0> receive joined gossip
[<0.93.0>] swim_node <0.93.0> has been ready
% 노드 B 하이버네이트 시작 -> 40초간 아무 메세지에도 응답하지 않음.
[<0.93.0>] swim_node <0.93.0> received hibernate message. it will be hibernated during 35000s.
ok
% 노드 A 상태 확인
% 노드 A+B 모두 alive + incarnation 0
3> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{},undefined}
% 노드 A는 Ping 메세지를 보낸 상태.
% 노드 A의 상태는 wait_ping_ack로 되어있음.
[<0.91.0>] got received {next_round, <0.91.0>}
[<0.91.0>] <0.91.0> send ping msg to <0.93.0>
4> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,0,alive}},
#{<0.93.0> =>
{wait_ping_ack,{wait_ping_ack,[<0.91.0>,<0.93.0>],
[{once,#Ref<0.4272253564.1277952005.189707>}],
first}}},
undefined}
...
% 노드 A는 ping_timeout 메세지를 받음.
% 노드 A는 노드 B를 Suspected 상태로 변경하고 incarnation 1로 수정
% 노드 B에 대해 wait_confirm으로 타이머 설정.
[<0.91.0>] node <0.91.0> received ping_timeout message. suspect is <0.93.0>
[<0.91.0>] node <0.91.0> received ping_timeout message. suspect is <0.93.0>
[<0.91.0>] got received {next_round, <0.91.0>}
6> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,1,suspected}},
#{<0.93.0> =>
{wait_confirm,<0.93.0>,
{once,#Ref<0.4272253564.1277952005.189755>}}},
undefined}
% 노드 B는 Ping 메세지를 받고 Ping ACK 메세지를 전송함.
[<0.93.0>] <0.93.0> got ping message from <0.91.0>
[<0.93.0>] got received {next_round, <0.93.0>}
% 노드 A는 노드 B로부터 Ping Ack 메세지를 확인함.
[<0.91.0>] <0.91.0> got ping_ack message from <0.93.0>
[<0.93.0>] <0.93.0> send ping msg to <0.91.0>
% 노드 A는 노드 B로부터 ALIVE 메세지를 확인함
% 이때, 노드 A는 노드 B의 상태를 Alive로 업데이트 하고 incarnation을 2로 올림.
[<0.91.0>] node <0.91.0> received alive message. suspect is <0.93.0>
5> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,2,alive}},
#{},undefined}
% 뒤늦게 노드 A에게 wait_confirm_timeout 메세지가 들어옴.
% 그러나 이 메세지의 incarnation은 1이기 때문에 무시됨.
[<0.91.0>] node <0.91.0> received ping_timeout message. suspect is <0.93.0>
[<0.91.0>] node <0.91.0> received wait_confirm_timeout message. suspect is <0.93.0>
6> sys:get_state(<0.91.0>).
{swim_node,ready,<0.91.0>,
#{<0.91.0> => {node_state,0,alive},
<0.93.0> => {node_state,2,alive}},
#{},undefined}
...
정리
SWIM 프로토콜은 분산 시스템에서 사용할 수 있는 멤버십 프로토콜 중 하나이다. 논문 초기에는 각 노드가 전체 클러스터의 상태를 가지고 있고, 이것을 가쉽하는 형태가 제안되었다고 한다. 각 라운드마다 전송되는 메세지에 Gossip 정보가 Piggyback 되는 형식으로 클러스터 멤버들 사이에 상태가 전파된다. 따라서 멤버들은 비동기적으로 상태를 업데이트하기 때문에 약한 결합으로 구성된다. 따라서 클러스터는 결과적으로 동기화되는 Eventually Consitency 형태로 동작한다.
그것에 대한 예시는 아래를 보면 여실히 드러난다.
또한 전체 State를 각 노드가 가지고 있는 것은 클러스터가 충분히 작을 때는 문제가 없지만, 클러스터 사이즈가 커지면 커질수록 노드 간에 Piggyback 하는 메세지의 크기가 커진다. 따라서 전체 State를 전달하는 Full Membership 방식의 SWIM 프로토콜에서는 클러스터 사이즈에 따른 확장은 제한된다.
이런 이유 때문에 SWIM 프로토콜은 확장이 제한되는 경우도 있다. 그러나 충분히 큰 클러스터에서도 SWIM 프로토콜을 사용할 수 있도록 최적화가 들어가기도 한다
- 전체 State를 보내지 않고, 여러 이벤트는 Accumulator 했다가 한번에 Flush 하는 방법.
- Partial View를 가져서 업데이트 해야되는 상태 자체를 줄이는 방식
또한 SWIM 프로토콜만으로 구성된 분산 클러스터의 한계도 여전히 존재한다.
- 죽은 노드가 정말로 죽은 노드인지는 보장할 수 없다.
- Split Brain이 발생할 경우, 각 클러스터별로 서로 다른 State를 가진다. State의 Conflict은 멤버십 프로토콜에서 해결할 수 없다.
그럼에도 불구하고 SWIM 클러스터를 멤버십 프로토콜을 사용하면 그 나름대로의 이점도 분명히 존재한다. 멤버쉽 프로토콜을 사용하지 않는다면, 클러스터의 멤버를 알려줄 써드파티 컴포넌트가 필요할 것이다. 대표적인 예시로 Zookeeper 같은 것들이 있을 것이다. 이런 의존성을 사용하지 않기 때문에 관리 포인트가 줄어들어 운영에도 이점을 가지게 될 것이다.
실제로 SWIM 프로토콜은 Serf(HashiCorp), Consul, Nomad, Vault 내부에서도 사용되고 Akka Cluster 혹은 커스텀 분산 시스템에서 사용된다고 한다.
Future More
이 글에서는 분산 시스템에서 가장 기초적인 멤버십 프로토콜인 SWIM 프로토콜을 학습했다. 이후에 공부해볼만한 것은 Partial View를 가지는 HyParView 프로토콜, Plum Tree, Gossip 프로토콜, 그리고 Split Brain을 해결하는 Vector Clock 혹은 CRDT 같은 것이 존재할 것이다.
참고사항
멤버쉽 프로토콜
클러스터를 구성하는 노드들이 서로 인식하고, 노드가 추가/제거 될 때 이를 전파하는 프로토콜을 의미한다. 멤버쉽 프로토콜은 클러스터의 확장성(Scalability), 안정성(Fault Tolerance), 성능(Performance)에 영향을 미친다. 여러가지 기준에 의해서 멤버쉽 프로토콜은 분류될 수 있다.
- static cluster : 미리 정해진 구성 목록이 있음.
- dynamic cluster : 노드가 실시간으로 추가/제거되며 이를 자동으로 감지하여 반영
또 다른 기준으로는 클러스터 상태 정보를 어떻게 가지고 있느냐도 기준이 될 수 있다.
- Full Membership : 각 노드가 전체 클러스터의 상태를 알고 있고, 모든 노드가 서로 직접적인 연결을 유지함. (SWIM 프로토콜)
- Partial Membership : 각 노드가 클러스터의 일부만 알고 있음. 클러스터 크기가 매우 커질 경우, 모든 노드가 클러스터 전체 정보를 유지하는 것은 어렵기 때문(확장성 관점에서 좋음). 대표적인 예로 HyparView 프로토콜.
'Distributed System' 카테고리의 다른 글
Distributed Membership Protocol : HyParView 프로토콜 (0) | 2025.01.31 |
---|---|
분산 컴퓨팅 10. 벡터 시계와 스냅샷 찍기 (0) | 2024.03.17 |
분산 컴퓨팅 3. 시간 동기화 문제와 논리적 시계 (램포트 시계) (0) | 2024.03.17 |
분산 컴퓨팅 2. 중재자와 2단계 커밋 프로토콜 (0) | 2024.03.10 |
분산 컴퓨팅 1. 분산 컴퓨팅이란 무엇인가? (0) | 2024.03.10 |