들어가기 전
이 글은 다음 블로그 글을 보고 공부하며 작성한 글입니다. 이 글에서 다룰 plum tree는 Gossip Protocol이지만, 특정 클러스터 내에서 함께 동작하기 때문에 최종적으로는 Distributed Membership Protocol에 Integrated 되는 형태로 작성될 것입니다. 이 블로그에서 작성한 Distributed Membership Protocol은 다음 글들을 참고해주세요.
HyParView Protocol + Plum Tree 프로토콜이 Integrate된 학습용 코드는 이곳을 참고해주세요. 코드는 erlang으로 작성되어있습니다.
1. 들어가면서
이전 글에서 HyParView 클러스터 멤버십 프로토콜을 이용하여 빠르고 확장 가능한 클러스터를 구축할 수 있다는 것을 공부했다. 수천, 수만 개의 노드로 이루어진 클러스터에서도 이것이 가능하도록 HyParView 클러스터의 각 노드는 전체 클러스터의 작은 부분 집합(부분 클러스터)하고만 활발하게 대화할 수 있는 '부분 보기(Partial View)'라는 개념을 사용했다. 이것은 Active View라고 표현되었으며, Passive View는 커넥션이 맺어져 있지는 않지만 Standby 상태로 연락처를 알고 있는 노드들을 의미했다.
모든 노드가 상호 연결되지 않은 클러스터는 확장하기 쉽다. 왜냐하면 작은 부분 클러스터가 많이 만들어지는 형태, 즉 각 노드가 클러스터에 느슨하게 결합된 상태이기 때문에 확장에 대한 제한이 크지 않다. 그러나 몇 가지 단점이 있는데, 그 중에 하나는 다음 상황을 해결하는 것이 쉽지 않다는 것이다.
한 노드가 클러스터의 전체 노드에게 메세지를 보내고 싶은 상황
이런 문제는 Gossip 프로토콜 중 하나인 PlumTree 프로토콜을 이용해 해결할 수 있다.
2. 문제 상황 설명
다음 질문에 대해서 우리는 답을 할 수 있어야 한다.
한 노드가 클러스터의 전체 노드 중 일부 노드하고만 통신할 수 있을 때, 어떻게 하면 클러스터의 모든 노드에게 메세지를 전송할 수 있을까?
이 문제에 대해서 많이 사용되는 접근 방법은 두 가지가 있다.
- Mesh Flooding
- Spanning Tree
2.1 Mesh Flooding
메시 플러딩은 간단한 개념이다. 자신이 알고 있는 모든 Active Peer에게 동일한 메세지를 Broadcasting하고, 이런 방식이 체이닝 되어서 궁극적으로 메세지가 전달되는 것을 의미한다. 또한 생성된 메세지가 네트워크에서 무한히 반복되지 않도록 TTL을 붙여서 메세지를 날린다.
위 그림에서 검정색 선은 각 노드끼리 연결된 활성 연결을 의미한다. Mesh Flooding은 자신이 메세지를 받으면, 메세지에 TTL을 붙인 후 자신과 연결된 모든 활성 노드에 메세지를 전달한다. 문제는 가장 오른쪽 이미지에서 볼 수 있듯이, 메세지를 발신한 노드(B)에게 메세지를 수신한 노드(A)가 다시 한번 메세지를 전송한다는 것이다.
이런 일들이 모든 노드에게 반복되면 네트워크 전체가 소란스러워진다. 이것은 TTL을 붙이더라도 막을 수 없다. 또한, TTL을 너무 적게 가져갈 경우 클러스터의 모든 노드가 메세지를 받는다는 것을 보장할 수 없기도 한다. 그 외에 메세지에 메세지가 방문했던 노드들을 포함시켜서 보내는 방법을 고려해 볼 수 있지만, 메세지 자체의 크기가 너무 커져서 그 방법 역시 네트워크를 혼잡스럽게 만든다.
다만 구현 관점에서는 난이도가 어렵지 않은 방법이라고 할 수 있다.
2.2 Spanning Tree
또 다른 방법은 클러스터의 전체 노드에 대한 스패닝 트리를 구축하는 것이다. 메세지의 전파(Gossip)는 스패닝 트리의 Edge를 따라 전파될 것이기 때문에 하나의 노드가 똑같은 메세지를 다시 받거나, 다시 보내는 일을 막으며 네트워크의 혼잡을 최소화 할 수 있다. 그러나 스패닝 트리의 어려움은 '스패닝 트리의 복원'에 있다.
예를 들어 한 노드의 연결이 끊어진다면, 스패닝 트리를 다시 수리해야만 한다. 그렇지 않으면 가십 경로가 끊어진 노드들은 메세지를 정상적으로 받지 못할 수 있다. 위 이미지에서는 노드 C의 연결이 끊어진 것을 보여주는데, 만약 A-F의 경로가 추가되지 않았다면 A가 전파하는 메세지는 F / G 노드가 받지 못했을 것이다.
만약 스패닝 트리를 잘 복구할 수 있는 방법을 찾는다면, 우리는 스패닝 트리를 이용해 클러스터의 모든 노드에게 효율적으로 메세지를 보낼 수 있게 될 것이다.
3. Epidemic broadcast trees
Epidemic broadcast tree는 2007년에 발표된 논문을 기본으로 한다. 논문은 이곳에서 확인할 수 있다. 이 글에서 우리는 '노드들끼리 부분적으로 연결된 클러스터' 위에서 Broadcast 할 수 있는 일부 구현을 하려고 한다. Plum Tree 프로토콜이 반드시 HyParView 프로토콜 위에서 동작할 필요는 없지만, HyParView 프로토콜과 함께 사용했을 때 효율적으로 쓸 수 있다.
따라서 이 글에서는 HyParView 프로토콜과 함께 쓴다는 것을 가정하고 작성되기 때문에 HyParView 프로토콜의 기본 사항을 알고 있어야 한다. 구현된 코드는 이곳에서 확인할 수 있으며, HyParView와 관련되어 정리한 곳은 이곳에서 볼 수 있다.
PlumTree 프로토콜에서는 두 가지 타입의 Peer 개념을 사용한다.
- Eager Peer : 현재 노드는 가십 요청을 받을 때 마다, 자신이 알고 있는 Eager Peer에게 바로 메세지를 전파한다.
- Lazy Peer : 현재 노드는 가십 요청을 받은 후, 어느 정도 시간이 흐른 뒤에 자신이 알고 있는 Lazy Peer에게 메세지 ID만을 전파한다. 이것은 Broadcast tree에서 끊어진 Edge를 찾아서 수리하기 위함이다.
한 가지 혼동할 수 있는 부분은 Eager Peer / Lazy Peer 모두 HyParView에서 이야기 하는 Active View에 있는 노드들이다. 즉, Eager Peer / Lazy Peer는 모든 활성 커넥션이 열려있는 상태라는 것이다. 또한 HyParView에서 Active View / Passive View는 기본적으로 그래프 구조이지만, PlumTree 프로토콜에서 이야기하는 Eager Peer는 '트리'를 의미한다.
4. 동작 방법 직관적 정리
위에서 작성한 글만 봤을 때, 실제로 PlumTree 프로토콜이 HyParView 멤버쉽 프로토콜 위에서 어떻게 동작한다는지 와닿지 않을 수도 있다. 그 부분의 갭을 메꾸기 위해서 여기서는 그림으로 동작 방법을 표현해 직관적으로 이해해보고자 한다.
위 그림은 HyParView 멤버쉽으로 이루어진 클러스터를 표현하고, 현재 Spanning Tree는 구축되지 않은 상태다. 그리고 그림에서 각 도형이 표현하는 의미는 아래와 같다.
- 동그라미 : 각 노드를 의미.
- 점선 : HyParView의 Active View를 의미 (여기서 Passive View는 표현하지 않음)
HyParView 프로토콜에서 각 노드는 일부 노드하고만 통신할 수 있는데, 위 이미지에서 A는 B/C/D 노드하고만 통신할 수 있는 것을 알 수 있다.
여기서부터 가쉽을 전파하면서, Spanning 트리가 만들어진다.
A는 자신이 알고 있는 Active View인 B, C, D에게 메세지를 전파한다. 그리고 메세지를 수신한 B, C, D 역시 자신이 알고 있던 Active View에게 메세지를 다시 한번 전파한다. 이렇게 각 노드는 메세지를 받은 후, 메세지를 바로 전송하는 행동을 하는데 이것을 Eager Push라고 한다. 그리고 Eager Push의 대상이 되는 노드들을 Eager Peer라고 한다.
그리고 메세지가 전파된 경로는 실선에서 점선으로 바뀐 것을 알 수 있다. 점선은 Spanning Tree의 경로로 지정되었음을 의미한다.
그렇게 가쉽을 전파하다보면 어느 순간 메세지를 중복으로 수신하는 경우가 있다. #5 이미지의 H / G 노드가 그런 경우다. H/G 노드는 #4 이미지에서 각각 I / E로부터 이미 메세지를 받은 상태다.
방금 도착한 메세지를 이전에 받은 적이 있다면, Spanning Tree 내에 이미 다른 경로가 존재하는 것을 의미한다. 따라서 H/G는 서로에게 Prune 메세지를 전송한다. Prune 메세지는 '나는 당신을 Eager Peer에서 제거합니다. 당신도 당신의 Eager Peer에서 나를 제외해주세요'라는 메세지다. 따라서 #7 이미지를 보면 prune 메세지를 주고 받은 결과로 H-G가 연결된 선은 점선이다. 이것은 H-G는 HyParView 관점에서는 Active View 이지만, PlumTree 프로토콜 관점에서 Spanning Tree 경로는 아님을 의미한다.
처음에는 그래프 형태였지만, 몇 번의 가쉽 전파를 통해 실선으로 표현된 Spanning Tree가 구성되었다. 여기서 각 노드를 연결하는 실선이 끊어지지 않는다면, 가쉽은 실선을 타고 클러스터 내에 전파될 것이다. 각 노드는 중복이 없는 메세지를 수신할 것이고, 또한 불필요한 메세지를 클러스터 내에 전파하지 않을 것이다. 이를 통해 이전에 Mesh Flooding에서 지적한 '네트워크 혼잡'을 최소화할 수 있게 되는 것이다.
그러나 불행하게도 분산 시스템 내에서는 언제든지 각 노드의 연결 구간이 끊어질 수 있다. 노드가 죽었거나, 혹은 각 노드를 있는 네트워크 구간에만 장애가 생길 수도 있다. 바로 #9 이미지처럼 말이다. #10은 L-K 구간이 끊기면서 N/M/L로 구성된 부분 클러스터가 다른 클러스터와 메세지를 전파할 수 있는 경로가 끊어진 것을 의미한다. (여전히 M-J는 HyParView에서 Active View를 유지하고 있다.)
만약 끊어진 가쉽 경로를 이대로 둔다면, A가 전파하는 메세지는 N/M/L 노드에게 영원히 전달되지 않을 것이다.
#11에서 노드 J는 Lazy Peer였던 노드 M에게 IHave(m1)이라는 메세지를 Lazy Push 한다. Lazy Push는 Eager Push를 통해 메세지를 받고 시간이 조금 흐른 후에 실행된다. 그리고 IHave(m1)은 '나는 m1 메세지를 가지고 있어'를 의미하는데, 이 때 m1 메세지의 실제 내용은 포함되지 않고 오로지 메세지 식별자(메세지 ID)만 전송한다.
#12에서 노드 M은 IHave(m1) 메세지를 받은 후에 m1이 자신이 본 적 없는 메세지라는 것을 알았다. 이것은 클러스터 내에 m1이 전파되고 있는데, 나는 아직 받지 못했다는 것을 의미한다. 이때, 다음 두 가지 경우를 상상해 볼 수 있다.
- m1 메세지가 eager push를 통해서 전파되는 중인데, 전파 속도가 느려서 lazy push가 먼저 나에게 도착했다.
- m1 메세지가 내게로 eager push 되는 경로가 끊어졌다.
노드 M은 IHave 메세지를 받더라도 그 메세지를 달라고 바로 요청하지는 않는다. 왜냐하면 경로가 여전히 살아있을 수 있는데, 단지 Lazy Push가 먼저 도착했을 수 있기 때문이다. 그래서 IHave 메세지를 받은 후에 일정 시간 기다린 후에, 그래도 m1 메세지가 도착하지 않는다면 IHave 메세지를 보낸 노드에게 Graft 메세지를 보낸다.
#13에서 노드 M은 노드 J에게 Graft(m1) 메세지를 발송한다. 이것은 '나에게 메세지 m1 전문을 보내줘'라는 것을 의미한다.
#14에서 Graft 발신/수신한 후에 M과 J는 서로를 자신의 Eager Push로 승격시킨다. 그 결과로 #15 이미지에서처럼 M-J는 실선 경로로 표현된다. 즉, IHave / Graft 메세지의 수신을 통해 Spanning Tree의 끊어진 경로를 찾고, 다른 경로를 추가하면서 Spanning Tree를 복구하는 것이다.
이 그림을 통해서 PlumTree 프로토콜 내에서는 아래 메세지가 오고 가는 것을 알게 되었다.
- gossip : Eager Push로 전파되는 메세지
- prune : Tree 내에 이미 다른 경로가 존재할 때, 메세지를 발신한 노드와 Eager Peer 관계 제거 요청
- IHave : Lazy Push에서 사용되는 메세지. 메세지 ID만 보내서 Tree의 끊어진 부분이 있는지 확인
- Graft : Lazy Push에서 사용되는 메세지. IHave를 통해서 받은 메세지가 내게 없을 때, 상대방에게 메세지 전문을 요청하는 메세지
이것을 바탕으로 실제 PlumTree 프로토콜을 구현해보고자 한다.
5. 구현해보기
실제 코드로 위 내용을 옮기기 전에 구현하기 쉽도록 몇 가지 내용을 단순화하고자 한다.
- 각 Node는 erlang Actor 하나로 표현된다. 이를 통해 실제 VM을 띄우지 않고도, 프로토콜을 테스트 해볼 수 있다.
- 각 Node간의 TCP Connection은 표현하지 않는다. 프로토콜만 테스트해보면 되기 때문에 실제 TCP Handling까지 하지 않아도 된다.
이를 바탕으로 코드를 구현해보자.
5.1 노드 추가 / 제거 되었을 때
노드가 추가/제거되는 것을 처리하는 것은 클러스터 멤버쉽 프로토콜의 책임이다. 따라서 노드가 추가/제거 되었을 때, 그 노드를 클러스터에 넣어주는 것은 PlumTree 프로토콜이 신경쓰지 않아도 된다. 다만 멤버쉽 프로토콜에서 노드가 추가/제거 되었을 때, PlumTree 프로토콜에 neighbor_up / neighbor_down이라는 메세지를 보내도록 약속한다.
% From HyParView or other membership protocol.
handle_message({neighbor_up, PeerName}=_Msg, State0) ->
#?MODULE{eager_push_peers=EagerPushPeers0} = State0,
EagerPushPeers = sets:add_element(PeerName, EagerPushPeers0),
State0#?MODULE{eager_push_peers=EagerPushPeers};
% From HyParView or other membership protocol.
handle_message({neighbor_down, PeerName}=_Msg, State0) ->
#?MODULE{eager_push_peers=EagerPushPeers0, lazy_push_peers=LazyPushPeers0, missing=Missing0} = State0,
EagerPushPeers = sets:del_element(PeerName, EagerPushPeers0),
LazyPushPeers = sets:del_element(PeerName, LazyPushPeers0),
Missing = sets:filter(
fun(Announcement) ->
Sender = maps:get(sender, Announcement),
Sender =/= PeerName
end, Missing0),
State0#?MODULE{eager_push_peers=EagerPushPeers, lazy_push_peers=LazyPushPeers, missing=Missing};
neighbor_up은 새로운 노드가 클러스터에 추가되었을 때 받는 메세지다. 이 메세지를 수신한 노드는 새롭게 추가된 노드를 자신의 Eager Peers에 포함시킨다.
neighbor_down은 기존 노드가 클러스터에서 제거되었을 때 받는 메세지다. 이 메세지를 수신한 노드는 다음 두 가지 작업을 한다.
- 자신의 Eager Push Peer 목록에서 그 노드를 제거한다.
- 자신의 Lazy Push Peer 목록에서 그 노드를 제거한다.
- Missing 메세지(메세지 ID만 받고, 본문을 받지 못한 메세지) 목록에서 그 노드가 보낸 Missing 메세지를 모두 제거한다.
그 노드는 이미 클러스터를 떠났기 때문에 EagerPeer / LazyPeer로 관리할 필요도 없고, 그 노드가 보낸 Missing 메세지는 절대로 도착하지 않을 것이기 때문이다.
5.2 BroadCast 구현
만약 클라이언트가 이 노드를 통해 클러스터 전체에 메세지를 전파하고 싶다면, Broadcast 메세지를 전송하면 된다. 혹은 자기 자신이 클러스터 내에 할 말이 있을 때 사용할 수 있다. 즉, 메세지를 보내고 싶은 클러스터 내부/외부 사용자들은 BroadCast 메세지를 만들어서 자기 자신에게 보내기만 하면 된다. BroadCast 메세지는 'Data'라는 Payload를 클러스터 전체에 전파하는 시작점 역할을 한다.
handle_message({broadcast, Data}=_Msg, State0) ->
io:format("[~p] ~p received brodcast message, ~p~n", [self(), my_name(), Data]),
#?MODULE{received_messages=ReceivedMessages0} = State0,
MsgId = uuid:get_v4(),
Gossip = new_gossip(MsgId, Data),
ReceivedMessages = maps:put(MsgId, Gossip, ReceivedMessages0),
MyName = my_name(),
State1 = eager_push(Gossip, MyName, State0),
State2 = lazy_push(Gossip, MyName, State1),
schedule_received_message_expire(Gossip),
State2#?MODULE{received_messages=ReceivedMessages};
Broadcast 메세지를 받은 노드는 다음 작업을 순서대로 처리한다.
- UUID 기반으로 Message ID를 생성하고, 그 Message ID로 Gossip을 하나 생성한다.
- 그리고 자신의 Eager Peer를 통해 메세지를 EagerPush하고, Lazy Peer에게 보낼 Lazy Push를 예약해둔다.
- Eager Push는 바로 자신이 알고 있는 Eager Peer에게 메세지를 전송한다.
- Lazy Push는 얼마 간의 시간이 지난 후에 Lazy Peer에게 메세지 ID만 전송한다.
- 방금 생성한 메세지를 ReceivedMessage라는 Local State에 저장한다.
handle_message({received_message_expired, MessageId}, State0) ->
#?MODULE{received_messages=ReceivedMessages0} = State0,
ReceivedMessages = maps:remove(MessageId, ReceivedMessages0),
State0#?MODULE{received_messages=ReceivedMessages};
Received Message에 받은 메세지를 저장한다. 그러나 저장하기만 해둔다면 메세지는 계속 쌓이기만 할 것이다. 특히, 메세지를 주고 받는 빈도가 활발한 시스템이라면 금새 많은 메세지가 쌓여 메모리에 문제가 있을 수 있다. 따라서 일정 시점이 지나면 메세지를 Expired 시킬 수 있도록 schedule_received_message_expire(...)로 오래된 메세지를 정리하는 작업을 예약한다.
위 코드는 schedule_received_message_expire(...)에 의해서 예약된 작업을 처리한다. Received Message에서 Message ID로 검색해서 찾아지는 메세지를 제거한다.
5.3 Gossip 메세지 구현
최초의 노드가 Broadcast 메세지를 받으면, 자신이 받은 데이터를 Gossip 메세지로 한번 감싼다. 그리고 Gossip 메세지를 다른 Eager Peer들에게 전송한다.
handle_message({gossip, Gossip, Sender}, State0) ->
io:format("[~p] ~p received gossip message, peername ~p.~n", [self(), my_name(), Sender]),
#?MODULE{received_messages=ReceivedMessages0} = State0,
MessageId = get_from_gossip(message_id, Gossip),
UpdatedState =
case maps:get(MessageId, ReceivedMessages0, undefined) of
undefined ->
ReceivedMessages = maps:put(MessageId, Gossip, ReceivedMessages0),
NextRoundGossip = increment_round_gossip(Gossip),
State1 = eager_push(NextRoundGossip, Sender, State0),
State2 = lazy_push(NextRoundGossip, Sender, State1),
State3 = add_eager_and_del_lazy(Sender, State2),
schedule_received_message_expire(Gossip),
State3#?MODULE{received_messages=ReceivedMessages};
_Message ->
...
end,
invalidate_missing_with_given(Gossip, UpdatedState);
위 코드는 Eager Peer가 Gossip 메세지를 받았을 때 처리하는 방법이다.
- 자신의 Local State에 있는 ReceivedMessage 목록에 해당 Message ID를 가진 메세지가 있는지 확인한다.
- 자신이 처음 받아본 메세지라면 Eager Peer, Lazy Peer에게 각각 Push를 추가한다.
- Eager Peer로부터 온 메세지이기 때문에 (본문까지 오는 것은 Eager Peer다) 메세지를 보낸 Peer를 자기 자신의 Eager Peer로 추가한다.
- Received Message에 받은 메세지를 무한히 보관하면 메모리가 터질 수 있기 때문에 message expire 시점을 예약하고, 일정 시간이 지나면 Received Message에서 해당 메세지를 지우도록 한다.
handle_message({gossip, Gossip, Sender}, State0) ->
io:format("[~p] ~p received gossip message, peername ~p.~n", [self(), my_name(), Sender]),
#?MODULE{received_messages=ReceivedMessages0} = State0,
MessageId = get_from_gossip(message_id, Gossip),
UpdatedState =
case maps:get(MessageId, ReceivedMessages0, undefined) of
undefined ->
...
_Message ->
% It means that we have other eager edge.
% So, We should remove this edge.
MyName = my_name(),
ToPid = find_util:get_node_pid(Sender),
io:format("[~p] ~p try to send prune message to ~p~n", [self(), my_name(), Sender]),
PruneMsg = prune_message(MyName),
send_message(ToPid, PruneMsg),
add_lazy_and_del_eager(Sender, State0)
end,
invalidate_missing_with_given(Gossip, UpdatedState);
반면 이전에 받았던 메세지를 받은 경우가 존재할 수도 있다. 이미 다른 경로로 메세지를 받았기 때문에 중복으로 메세지를 받는 것이다. PlumTree 프로토콜의 핵심 목표는 Gossip 경로를 최대한 스패닝 트리 형태로 구성하는 것이기 때문에 중복 경로를 제거해야한다. 앞서 그림에서 이야기 했던 것처럼 이때, 노드는 수신자에게 Prune 메세지를 보내서 서로를 Eager Peer에서 제거하도록 한다.
Missing은 뒤에 자세히 이야기하겠지만, 다른 노드들은 메세지 본문을 수신하였으나 나는 아직 메세지 본문을 수신하지 못한 메세지들을 의미한다. 만약 현재 노드가 이런 메세지가 있다는 소문을 듣고, 이 메세지가 다른 네트워크를 타고 도착하기를 기다리고 있는 경우에는 그 메세지 ID가 missing에 저장되어있다. invalidate_missing_with_given()은 아직 본문을 받지 못한 기다리고 있던 메세지의 본문을 받았을 때, Missing에서 해당 메세지 ID를 제거하는 일을 한다.
5.4 Prune 메세지 구현
handle_message({prune, PeerName}, State0) ->
io:format("[~p] ~p received prune message from ~p~n", [self(), my_name(), PeerName]),
add_lazy_and_del_eager(PeerName, State0);
A가 B에게 Gossip 메세지를 전송했는데, B가 이미 받은 적이 있는 메세지라면 B는 A에게 Eager 메세지를 전송한다. 이것은 '너와 나 말고도 다른 경로가 있으니, 우리 사이의 경로는 끊자'라는 의미다. 따라서 prune 메세지를 받은 노드는 Prune 메세지를 보낸 로드를 Eager Peer에서 제거한 후 Lazy Peer에 넣는 작업을 한다.
5.5 Lazy Push 구현
Lazy Push의 목적은 특정 간격마다 Lazy Peer에게 내가 받은 메세지를 보내서 상대방이 놓친 메세지가 있는지 확인하게 만드는 것이다. 에를 들어 나는 Message ID가 1인 메세지를 가지고 있는데, Lazy Peer에게 Push를 해봤더니 상대방이 Message ID 1이라는 것을 본 적이 없을 수 있다. 이 때는 두 가지 경우를 고려할 수 있다.
- Eager Push 경로가 너무 멀어서 Eager Push보다 먼저 Lazy Push가 먼저 도착했다.
- Eager Push 경로가 끊어졌다. (노드 Down 등으로)
Lazy Push는 Eager Push 경로가 끊어진 상황을 감지하고, 경로가 끊어졌다면 새로운 Eager Peer 경로를 추가해서 Tree Repair를 하기 위해 발송한다.
Lazy Push는 Message ID만 보내는 것이 정상적이다. 그런데 매번 Message ID를 보낸다면 네트워크 트래픽 문제가 발생할 수 있다. 예를 들어, Lazy Push 해야할 메세지가 너무 많다면 한번에 전송해야 할 패킷의 크기가 너무 커지게 된다. 따라서 Plum Tree 논문에서는 Message ID를 보내는 것과 블룸 필터를 보내 최적화 할 것을 권장한다.
따라서 아래 구현에서는 Message ID만 보내는 경우 / 블룸 필터만 보내는 경우로 나눌 수 있다. 나는 이렇게 구현했다.
- MessageID는 상대적으로 긴 간격으로 보낸다.
- 블룸필터는 상태적으로 짧은 간격으로 보낸다.
handle_message({dispatch}, #?MODULE{lazy_queue=LazyQueue}=State) ->
% Schedule lazy push message with Message ID.
LazyRecords =
lists:foldl(
fun(LazyRecord, Acc0) ->
#lazy_record{message_id=MsgId, round=Round, peer_name=PeerName} = LazyRecord,
GossipMsgsForPeer = maps:get(PeerName, Acc0, []),
maps:put(PeerName, [ {MsgId, Round} | GossipMsgsForPeer], Acc0)
end, #{}, LazyQueue),
maps:foreach(
fun(PeerName, Grafts) ->
IHaveMsg = ihave_message(Grafts, PeerName),
ToPid = find_util:get_node_pid(PeerName),
send_message(ToPid, IHaveMsg)
end, LazyRecords),
schedule_dispatch(),
State;
위 코드는 메세지 ID 전체를 Lazy Push로 보내는 것이다. 코드는 이렇게 동작한다.
- Lazy Queue에서 Lazy Record를 불러온 후에, 각 LazyPeer Name으로 보내야 할 MessageID를 List로 Accumulate 한다.
- 이후 maps:foreach(...)에서 각 Lazy Peer에게 IHave()라는 메세지를 보낸다.
- IHave() 메세지에는 Grafts가 포함된다.
- Grafts는 현재 노드가 받은 메세지들의 메세지 ID만 모아둔 리스트이다.
- 정리하면, Lazy Peer에게 '나는 이런 메세지들을 가지고 있어, 너 이거 받은 적 있어? 필요하면 연락해!'라는 메세지를 보내는 것이다.
- schedule_dispatch()를 호출해서 다음 Message ID Lazy Push를 예약한다.
그렇다면 왜 Message ID를 담은 Lazy Push를 계속 보내야하는 것일까? 처음에 메세지 ID를 보내면 내 Lazy Peer들은 대부분 메세지 ID를 알게 될 것이다. 그렇다면 매번 Message ID를 보낼 필요가 있을까? 대답은 'Yes'다. 왜냐하면 아래 코너 케이스가 존재할 수 있기 때문이다.
최초에만 Message ID를 보낸다면, 그 이후에 새롭게 추가된 노드들은 Lazy Push를 통해서 Message ID를 얻을 수 없게 된다.
새롭게 추가된 노드들의 Eager Peer가 열심히 일을 한다면 문제가 없다. 그러나 Eager Peer들에게도 문제가 있고, Lazy Push가 블룸 필터만 보내고 있다면 새로 추가된 노드들은 '내가 어떤 메세지 ID를 누락하고 있는 걸까?'를 전혀 알 수 없게 된다. 따라서 메세지가 누락되었을 때, 상대방에게 메세지 전문을 요청하는 'Graft 메세지'를 보낼 수 없게 된다. 이것은 특정 노드가 Gossip 경로에서 고립될 수 있음을 의미한다.
따라서 주기적으로 메세지 ID를 보내는 Lazy Push는 반드시 필수적이다. 그럼에도 불구하고 여전히 코너케이스는 존재한다. GC 전에 마지막으로 메세지 ID를 Lazy Push 한 후에 새로운 노드가 추가되면 이 노드는 영영 해당 메세지 ID를 받지 못할 수도 있다. 즉, 이것은 PlumTree 프로토콜에서 일부 메세지의 누락이 가끔씩은 발생할 수 있다는 점을 암시한다.
handle_message({dispatch_short}, #?MODULE{lazy_queue=LazyQueue}=State) ->
% Schedule lazy push message with only bloom filter.
BloomFiltersForEachLazyPeers =
lists:foldl(
fun(LazyRecord, Acc) ->
#lazy_record{message_id=MsgId, peer_name=PeerName} = LazyRecord,
BloomFilter0 = maps:get(PeerName, Acc, bloom_filter:new()),
BloomFilter1 = bloom_filter:add(MsgId, BloomFilter0),
maps:put(PeerName, BloomFilter1, Acc)
end, #{}, LazyQueue),
maps:foreach(
fun(PeerName, BloomFilter) ->
IHaveBloomMsg = ihave_bloom_message(BloomFilter, PeerName),
ToPid = find_util:get_node_pid(PeerName),
send_message(ToPid, IHaveBloomMsg)
end, BloomFiltersForEachLazyPeers),
schedule_short_dispatch(),
State;
위는 블룸 필터를 Lazy Push하는 코드다. 위 코드는 다음과 같이 동작한다.
- LazyQueue에서 Lazy Record를 꺼내온 후에 각 Peer들에게 보낼 블룸 필터를 제작한다. 이 때, 블룸필터는 Message ID가 기준이 된다.
- 거짓 양성 확률을 줄이기 위해 나는 해시 함수는 7개, Bit 배열의 크기는 10000으로 설정했다. 자세한 내용은 블룸필터 논문을 참고하면, 거짓양성을 줄이기 위한 배열 크기, 해시 함수 개수를 결정할 수 있다.
- 이후 Lazy Peer들에게 BloomFilter를 담은 IHaveBloom 이라는 메세지를 전송한다.
- 그리고, schedule_short_dispatch()를 호출해 다음 BloomFilter Lazy Push를 예약한다.
BloomFilter의 Bit 배열 크기는 10000이다. 위에서 메세지 ID는 UUID로 설정했다. UUID는 32개의 16진수(4비트)로 이루어지기 때문에 하나의 메세지 ID는 32 * 4 = 128비트로 계산할 수 있다. 즉, 메세지 수가 적을 때는 블룸 필터가 효율적이진 않지만, 메세지 수가 많아질수록 블룸필터가 효율적이게 된다.
% For tree repair.
handle_message({ihave, Grafts, Sender}, State0) ->
% We should wait a while before asking graft message.
% Because lazy push may arrive before eager push.
State1 = lists:foldl(
fun({MsgId, Round}, AccState) ->
if_i_have(AccState, MsgId, Round, Sender)
end, State0, Grafts),
#?MODULE{timers=AlreadyHasTimer} = State0,
#?MODULE{timers=MaybeNewlyGetTimer} = State1,
NewlyReceivedTimer = sets:filter(
fun(Timer, _) ->
not sets:is_element(Timer, AlreadyHasTimer)
end, MaybeNewlyGetTimer),
lists:foreach(
fun(MessageId) ->
schedule_wait_a_while(MessageId)
end, sets:to_list(NewlyReceivedTimer)),
State1;
IHave 메세지에는 상대노드가 받았던 메세지들에 대한 메세지 ID가 리스트로 저장된 Grafts가 함께 온다. 여기서는 Grafts에서 알게 된 메세지들 중에서 내가 받은 적이 없는 것을 확인한다. 즉, 메세지를 보낸 상대방에게 요청할 것이 있는지를 확인한다.
- If_i_have()를 호출해서 내가 이 메세지를 가지고 있지 않은 경우에 해야할 일을 한다. 여기서는 내가 가지고 있지 않은 메세지에 대한 타이머를 생성해서 Accumulate 했다. 타이머는 내가 받아보지 못한 메세지들을 기다리가 위한 용도다. 그리고 이 때, '내가 아직 받지 못한 메세지'를 의미하는 변수 Missing(Set 자료구조)에 받지 못한 메세지 ID들을 추가한다.
- 이번 Grafts 메세지를 통해 새롭게 생성된 타이머가 있는 MessageId들에 대해서만 schedule_wait_a_while(...) 메서드를 호출해서 잠시 대기하도록 한다.
이처럼 잠시 기다리는 이유는 Eager Peer보다 Lazy Peer로부터 먼저 메세지를 전달받는 경우도 분명히 존재할 수 있기 때문이다. 그래서 잠시동안 기다린 후에, 그 때도 메세지를 받지 못했다면 Eager Peer로부터 오는 경로가 끊어졌다고 판단하고 Lazy Peer를 Eager Peer로 승격시키며 Graft(...) 라는 메세지를 보내는 것이다.
handle_message({ihave_bloom, BloomFilter, Sender}, State0) ->
#?MODULE{missing=Missing0, timers=Timers0, cached_graft_msg=CachedGraftMsg0} = State0,
MissingMsgs = lists:foldl(
fun(MissingElement, Acc) ->
{MessageId, _Round, _Sender} = MissingElement,
sets:add_element(MessageId, Acc)
end, sets:new(), Missing0),
NotReservedMsgYet = lists:filter(
fun(MessageId) ->
not lists:member(MessageId, Timers0)
end, MissingMsgs),
{Timers, CachedGraftMsg, ReservedMsgAndSender} = lists:foldl(
fun(MessageId, {TimersAcc, CachedGraftMsgAcc, ReservedMsgAndSenderAcc}) ->
case bloom_filter:member(MessageId, BloomFilter) of
true ->
MsgAndSender = {MessageId, Sender},
case sets:is_element(MsgAndSender, CachedGraftMsgAcc) of
true -> {TimersAcc, CachedGraftMsgAcc, ReservedMsgAndSenderAcc};
false ->
CachedGraftMsgAcc1 = sets:add_element(MsgAndSender, CachedGraftMsgAcc),
TimersAcc1 = sets:add_element(MessageId, TimersAcc),
ReservedMsgAndSenderAcc1 = [MsgAndSender | ReservedMsgAndSenderAcc],
schedule_wait_a_while_with_bloom_filter(MessageId, Sender),
{TimersAcc1, CachedGraftMsgAcc1, ReservedMsgAndSenderAcc1}
end;
false -> {TimersAcc, CachedGraftMsgAcc, ReservedMsgAndSenderAcc}
end
end, {Timers0, CachedGraftMsg0, []}, NotReservedMsgYet),
schedule_cached_graft_message_expire(ReservedMsgAndSender),
State0#?MODULE{timers=Timers, cached_graft_msg=CachedGraftMsg};
Lazy Push로 블룸필터를 받았을 때 호출되는 부분이다. 이것은 다음과 같이 동작한다.
- 먼저 Missing에서 내가 받아보지 못한 메세지들의 메세지 ID만 뽑아서 MissingMsg에 저장한다.
- MissingMsg 중에서 아직 타이머가 예약되지 않은 메세지들만 뽑아서, NotReservedYet에 저장한다.
- 2번에서 구해진 메세지 ID들에 대해 다음 연산을 반복한다.
- BloomFilter에서 메세지 ID가 포함된 것을 확인하면, CachedGraftMsgAcc라는 Set에 {MessageId, Sender} 튜플 형식을 추가한다. 또한, 해당 메세지 ID로 Timer도 예약하고, {MessageId, Sender}가 예약되었다는 의미로 ReserveredMsgandSenderAcc에 Accumulate한다. 이후에, 얼마정도 시간동안 Eager Peer로부터 메세지를 기다린 후에 Graft 메세지를 보낼 수 있도록 Sechedule_wait_a_while_with_bloom_filter(...)를 호출한다.
- 만약 이전에 CachedGraftMsgAcc에 {MessageId, Sender}가 있다면, 이미 본 후에 타이머를 설정한 메세지이기 때문에 무시한다.
- 또한 Bloom Filter에 내가 아직 못받은 메세지가 없으면, 아무것도 하지 않고 다음 Iterator를 돈다.
- 이후 cached_graft_message_expire(...)를 호출해 일정 시간 이후에 방금 예약된 ReserveredMsgAndSender를 Local State에서 제거하도록 작업을 예약해둔다.
% For tree repair.
handle_message({wait_message_arrived, MessageId}, State0) ->
#?MODULE{missing=Missing0, timers=Timers0, received_messages=ReceivedMessage} = State0,
Timers = sets:del_element(MessageId, Timers0),
State =
case maps:is_key(MessageId, ReceivedMessage) of
true -> State0;
false ->
{Missing, Announcement} = poll_announcement_randomly(Missing0, MessageId),
case Announcement of
undefined -> State0;
#{sender := SenderName, round := Round} = Announcement ->
SenderName = maps:get(sender, Announcement),
SenderPid = find_util:get_node_pid(SenderName),
State1 = add_eager_and_del_lazy(SenderName, State0),
MyName = my_name(),
GraftMsg = graft_message(MessageId, Round, MyName),
send_message(SenderPid, GraftMsg),
State1#?MODULE{missing=Missing}
end
end,
State#?MODULE{timers=Timers};
먼저 Message ID를 받은 Lazy Push에서 일정 시간이 지나면 위 함수가 호출되게 된다. 여기서는 다음 작업을 진행한다.
- 먼저 해당 MessageID로 예약된 Timer를 제거한다.
- 만약 내가 받은 메세지 목록에 해당 Message ID가 있다면, Eager Peer로부터 메세지를 받은 것이기 때문에 아무런 작업도 하지 않는다.
- 만약 아직도 내가 메세지를 받지 못했다면 다음 작업을 한다.
- Missing에서 해당 Message ID에 대해 무작위로 Announcement를 하나 뽑아온다. Announcement는 발신자와 Round가 포함되어있다.
- 메세지 발신자(Sender)를 자신의 Eager Peer로 추가한 후에 상대방에게 Graft(...) 메세지를 보낸다. 이것은 상대방에게 '나는 이 메세지 ID를 가지는 메세지가 없으니 좀 보내줘. 그리고 나를 당신의 Eager Peer로 넣어줘!'라는 의미를 가진다.
물론 타이머가 완료될 때까지 Eager Push가 도착하지 않은 경우가 있을 수도 있다. 그럼에도 불구하고 Graft 메세지를 보내면서 새로운 Eager Push가 하나 추가된다. 만약 이전 Eager Peer도 건재하고(그러나 느리고), 새롭게 추가된 Eager Peer도 있다면 '트리'가 아니라 순환 구조를 가진 '그래프'가 되었음을 의미한다. 여기서도 PlumTree 프로토콜은 '경로를 최대한 스패닝 트리로 유지'하는 것이지 '항상 트리 구조'인 것을 의미하지는 않는다.
아무튼 이렇게 그래프 구조가 되었다면, 결국 이미 알고 있는 메세지를 다시 한번 받게 될 것이다. 이 때, Prune(...) 메세지를 보내게 되면서 Eager Peer 경로가 가지치기 되면서, 다시 트리에 가까운 모습을 가지게 될 것이다.
handle_message({wait_message_arrived_with_bloom, MessageId, Sender}, State0) ->
#?MODULE{timers=Timers0, received_messages=ReceivedMessage} = State0,
Timers = sets:del_element(MessageId, Timers0),
State =
case maps:is_key(MessageId, ReceivedMessage) of
true -> State0;
false ->
State1 = add_eager_and_del_lazy(Sender, State0),
MyName = my_name(),
GraftMsg = graft_message(MessageId, ?IGNORE_ROUND, MyName),
send_message(find_util:get_node_pid(Sender), GraftMsg),
State1
end,
State#?MODULE{timers=Timers};
BloomFilter Lazy Push에 대해 일정 시간이 흐른 후에 위 함수가 호출되게 된다. 이 때 동작은 Message ID를 Lazy Push 받은 것과 동일하게 동작한다.
- 타이머를 제거하고.
- 상대방을 나의 Eager Peer로 추가하고.
- 상대방에게 Graft(...) 메세지를 보내서 메세지를 달라고 한다.
% For tree repair.
handle_message({graft, MessageId, _Round, PeerName}, State0) ->
State1 = add_eager_and_del_lazy(PeerName, State0),
#?MODULE{received_messages=ReceivedMessages} = State1,
case maps:get(MessageId, ReceivedMessages, undefined) of
% Considering false-positive caused by bloom filter.
% Or it might already be expired.
undefined -> State1;
Gossip ->
Gossip = maps:get(MessageId, ReceivedMessages),
Myname = find_util:get_node_name_by_pid(self()),
GossipMsg = gossip_message(Gossip, Myname),
ToPid = find_util:get_node_pid(PeerName),
send_message(ToPid, GossipMsg)
end,
State1;
마지막으로 Graft 메세지를 특정 노드가 받게 되면, 이 함수가 호출되게 된다. 이 함수는 다음과 같은 일을 한다.
- 우선 Graft(...) 메세지를 보낸 상대방을 나의 Eager Peer로 추가한다.
- 자신의 ReceivedMessage에서 해당 메세지 ID에 대한 메세지 전문을 가져온다.
- 이때, Bloom Filter에 의한 False Positive가 발생할 수 있으니 해당 경우는 무시하도록 추가한다.
- Gossip 메세지가 찾아지면, Graft(...) 메세지를 보낸 노드에게 찾아진 Gossip 메세지를 전송한다.
이렇게 하면 Lazy Push에 대한 요청 - 응답도 완료되게 된다. Lazy Push는 정리해보면 다음과 같이 동작한다.
- 상대방에게 내가 가진 메세지 ID들만 IHave(...)를 통해 일단 다 보내본다.
- 만약 상대방이 받지 못한 메세지 ID가 있는 경우, Gossip 경로가 끊어졌을 수도 있음을 암시한다.
- 그러나 Eager Push로부터 메세지가 오고 있는 경우도 있으므로 일정 시간 기다려본다.
- 일정 시간 이후에도 Eager Push로부터 메세지가 오지 않았다면, 특정 가십 경로가 끊어졌다고 판단하고 IHave(...) 메세지를 보낸 노드에게 Graft(...) 메세지를 보내서 경로를 수리한다.
정리
이 글에서는 PlumTree 프로토콜을 추상화한 후에, 간략히 구현했다. 구현에서는 일부 최적화를 위해서 블룸 필터까지 도입했다. 그러나 이 글에서는 구현하지 않았지만 PlumTree 프로토콜에서는 Gossip 경로 최적화를 위해서 메세지 전파 Round, Latency등을 확인해서 노드 간의 전파 경로 리밸런싱을 하는 최적화도 존재한다. 만약 이 최적화가 되지 않는다면, 가쉽 전파 경로 내의 특정 노드에게 전파 경로가 집중되어 있을 수도 있고, 특정 노드끼리의 Edge는 Latency가 너무 높아 메세지 전파가 오래 걸릴 수도 있다.
구현해보고 알게된 것은 다음과 같다.
- PlumTree 프로토콜은 가쉽 경로를 최대한 스패닝 트리 형태가 되도록 추구하는 것이다. 그러나 전체적인 형태는 그래프로도 존재한다. 다만 기존 Mesh Flooding 방식에 비해 스패닝 트리에 가까운 것이다.
- PlumTree 프로토콜은 Lazy Push를 통해 최대한 트리를 복구하려 하지만, Gossip 중에 일부 유실되는 메세지가 존재할 수 있다. 따라서 잃어버려도 문제 없거나, 잃어버린다 하더라도 새로운 정보가 주기적으로 빠르게 제공되는 것에 사용하는 것이 좋을 것 같다.
- PlumTree 프로토콜에서 메세지는 Eventually Consistency 형태로 전파된다.
- PlumTree 프로토콜은 Split Brain을 해결해주지는 못한다. 따라서 분산 합의 알고리즘(RAFT), CRDT, Conflict 해소 전략등을 고려해두어야 한다.
여기서는 anti entropy(분산 시스템에서 각 노드가 가지고 있는 데이터의 불일치를 점차 줄여가는 과정)을 Lazy Push가 담당했는데, 그것과는 별개로 merkle Tree 등을 이용해서 비교해 볼 수도 있겠다.
'Distributed System' 카테고리의 다른 글
Distributed Consensus Algorithm : RAFT 합의 알고리즘 (0) | 2025.03.09 |
---|---|
Distributed Membership Protocol : HyParView 프로토콜 (0) | 2025.02.15 |
Distributed Membership Protocol : SWIM 프로토콜 (0) | 2025.01.31 |
분산 컴퓨팅 10. 벡터 시계와 스냅샷 찍기 (0) | 2024.03.17 |
분산 컴퓨팅 3. 시간 동기화 문제와 논리적 시계 (램포트 시계) (0) | 2024.03.17 |