erlang 공부 : more on multiprocessing

    참고

     


    상태를 가지지 않는 프로세스

    fridge1() ->
      receive
        {From, {store, Food}} ->
          From ! {self(), ok},
          fridge1();
        {From, {take, Food}} ->
          From ! {self(), not_found},
          fridge1()
      end.

    위는 냉장고로 동작하기 위해 만든 함수다. 이 함수는 두 가지 기능을 지원한다.

    • store
    • take

    함수는 메세지를 기다리고, 받은 메시지에 store, take 같은 atom이 있으면 그에 상응하는 행동을 한다. 

    16> P3 = spawn(kitchen2,fridge1, []).
    <0.304.0>
    
    17> P3 ! {self(), {store, hello}}.
    {<0.297.0>,{store,hello}}
    
    18> flush().
    Shell got {<0.304.0>,ok}
    
    19> P3 ! {self(), {take, hello}}. 
    {<0.297.0>,{take,hello}}
    
    20> flush().
    Shell got {<0.304.0>,not_found}

    spawn(M,F,A)를 이용해 냉장고 erlang 프로세스를 생성하고, 그 프로세스에게 메세지를 보내서 결과를 전달받았다. 그런데 문제점이 있다. 프로세스 자체가 냉장고에 Store, Take된 Food에 대한 정보를 전혀 가지지 못하고 있기 때문에 항상 "ok", "not_found"만 리턴하는 것이다. 

    이를 위해 상태를 가지는 프로세스를 생성해야한다. 

     


    상태를 가지는 프로세스

    fridge2(FoodList) ->
      receive
        {From, {store, Food}} ->
          From ! {self(), ok},
          fridge2([Food | FoodList]);
        {From, {take, Food}} ->
          case lists:member(Food, FoodList) of
            true ->
              From ! {self(), ok},
              fridge2(lists:delete(Food, FoodList));
            false ->
              From ! {self(), not_found}
        end
      end.
    

    상태를 가지도록 하려면 다음 코드를 작성하면 된다.

    • FoodList라는 List를 가지고 있고, 이를 Accumulation으로 활용한다.

    Tail Recursion을 할 때 Accumulation을 파라메터로 넘겨주면서, 프로세스가 상태를 유지하면서 메세지를 기다리도록 하는 것이다.

    27> P4 ! {self(), {store, "Hello"}}.
    {<0.325.0>,{store,"Hello"}}
    
    31> flush().                        
    Shell got {<0.327.0>,ok}
    ok
    
    32> P4 ! {self(), {take, "Hello"}}. 
    {<0.325.0>,{take,"Hello"}}
    
    33> flush().                       
    Shell got {<0.327.0>,ok}
    ok

    프로세스를 만든 후에 메세지를 보내보면, 기대한 것처럼 프로세스가 동작하는 것을 알 수 있다. 그러나 여기서 문제점은 프로세스에게 메세지를 보낼 때, 메세지를 보내는 프로토콜을 클라이언트가 명확하게 알아야 한다는 것이다. 이것은 클라이언트가 꽤 깊숙히 프로토콜에 의존한다고 이해해 볼 수 있을 것이다.


    메세지를 보내는 방법의 캡슐화

    {self(), {store, "Hello"}}.

    위에서 냉장고 프로세스에게 메세지를 보낼 때, 다음 프로토콜을 준수해야한다. 냉장고 프로세스에게 메세지를 보내는 모든 프로세스가 이 프로토콜을 정확히 숙지해야하는데, 쉽지 않다. 따라서 이런 프로토콜을 메서드로 캡슐화 할 수 있고, 이를 통해 클라이언트는 필요한 부분만 알도록 처리할 수 있다.

    -spec store(integer(), any()) -> any().
    store(Pid, Food) ->
      Pid ! {self(), {store, Food}},
      receive
        {Pid, Message} -> Message
      end.
    
    -spec take(integer(), any()) -> any().
    take(Pid, Food) ->
      Pid ! {self(), {take, Food}},
      receive
        {Pid, Message} -> Message
      end.

    store, take 프로토콜을 보내는 것을 store(), take() 메서드로 캡슐화해서 이 문제를 해결할 수 있다.

     


    프로세스 생성의 캡슐화

    Pid = spawn(kitchen, fridge, [[]]).

    현재 프로세스를 생성할 때 다음 코드를 사용하고 있다. 이 코드는 프로세스를 생성하고자 하는 클라이언트가 사용하는 코드다. 메세지 프로토콜과 마찬가지로 클라이언트가 프로세스를 생성할 때 어떤 함수와 인자를 전달해야하는지가 명시되어있다.

    이런 부분은 좋지 않다고 판단할 수 있는데, 왜냐하면 클라이언트가 프로세스를 생성하는 구체적인 과정을 알아야 하기 때문이다. 이 역시 캡슐화가 가능하다.

    -spec start() -> integer().
    start() ->
      spawn(?MODULE, fridge2, [[]]).

    다음과 같이 프로세스를 생성해주는 함수를 제공해주고, 클라이언트는 이 함수를 사용하기만 하면 된다. 이렇게 클라이언트으로부터 프로세스 생성을 캡슐화 할 수 있다. 

     


    메세지 Timeout

    -module(timeout_case).
    
    %% 냉장고에서 물건 찾기
    take(Pid, Food) ->
      Pid ! {self(), {take, Food}},
      receive
        {Pid, Msg} -> Msg
      end.
    
    %% 냉장고 프로세스
    fridge(FoodList) ->
    
      receive
        {From, {store, Food}} ->
          From ! {self(), ok},
          fridge([Food|FoodList]);
        {From, {take, Food}} ->
          case lists:member(Food, FoodList) of
            true ->
              From ! {self(), {ok, Food}},
              fridge(lists:delete(Food, FoodList));
            false ->
              From ! {self(), not_found},
              fridge(FoodList)
          end;
        terminate ->
          ok
      end.

    위 코드가 있을 때를 가정해보자. 위 코드에 대한 간단한 설명은 다음과 같다. 

    • 냉장고 프로세스를 생성해서, 냉장고에 음식을 저장 / 꺼낼 수 있음.
    • 냉장고에서 음식을 꺼낼 때, 통신 프로토콜을 take() 메서드로 추상화한다. 
    1> timeout_case:take(pid(0,250,1), hello).

    얼랭 쉘에서 위 모듈의 코드를 실행해보면 어떻게 동작할까? 위 코드는 Pid 형식으로 변수를 하나 생성하고, 그 Pid로 요청을 보내는 함수다. 이 코드가 실행되면 무한히 기다린다. take() 메서드는 receive 절에서 메세지를 수신할 때까지 계속 기다리는데 <0,250,1>이라는 프로세스는 애초에 존재하지 않기 때문에 요청에 대한 응답이 오지 않기 때문이다. 

    take(Pid, Food) ->
      Pid ! {self(), {take, Food}},
      receive
        {Pid, Msg} -> Msg
      after 5000 ->
        timeout_here
      end.

    receive 절에서 특정 시간만 기다릴 수 있도록 erlang에서는 after 키워드를 제공해준다. 위 코드는 다음과 같이 동작한다. 

    • receive 절에서 5000ms만큼 기다린 후, 응답이 없으면 after 절을 실행한다. 
    15> timeout_case:take(pid(0,250,100), hello).
    timeout_here

    코드를 수정하고 다시 한번 존재하지 않는 프로세스(<0.250.100>)으로 요청을 전송해본다. 이제 take() 메서드를 실행하는 프로세스는 5000ms까지 기다려보고 응답이 오지 않으면 after 절을 실행한다. 따라서 timeout_here이 반환된다. 

     


    선택적 수신

    receive + Guard + after를 이용하면 프로세스가 메세지를 선택적으로 수신하는 것도 가능해진다. 어떻게 할 수 있을까? 

    self() ! {15, high}, self() ! {7, low}, self() ! {1, low}, self() ! {17, high}.

    먼저 전송되는 메세지를 정의하면 위와 같다. {Priority, Msg} 형태로 메세지를 전송한다. 여기서 Priority 값이 10보다 큰 녀석을 우선적으로 수신해서 응답하고 싶다고 해보자. 

    important() ->
      receive
        {Priority, Message} when Priority > 10 ->
          [Message | important()]
      after 0 ->
        normal()
      end.
    
    normal() ->
      receive
        {_, Message} ->
          [Message|normal()]
      after 0 ->
        []
      end.

    코드는 위처럼 작성할 수 있다.  

    1. 먼저 self() ! {...}를 통해서 현재 프로세스의 메세지함에 메세지가 저장된다.
    2. important()를 호출하면 메세지를 읽어오기 시작하면서 receive 절이 호출된다. 이 때 when Priority > 10에 대해서만 동작하기 때문에 처음 2개의 메세지가 호출된다.
    3. 남은 2개의 메세지는 가드절에서 걸려서 호출되지 않기 때문에 after 0 절에서 normal()의 결과를 호출해서 리턴된다. 

    결론적으로 important()를 호출했을 때의 결과는 다음과 같이 구성될 것이다

    [[[Message|important()] | normal()] | normal()]

    재귀적으로 호출되는 것이기 때문에 이해하기 어려울 수도 있지만 하나씩 코드를 따라가면 된다. 

    18> self() ! {15, high}, self() ! {7, low}, self() ! {1, low}, self() ! {17, high}.
    {17,high}
    19> selective_receive:important().
    [high,high,low,low]

    실행 결과는 위와 같으며, 선택적으로 필요한 메세지만 먼저 수신하도록 처리했다.

    이렇게 선택적 수신을 했을 때 문제점이 존재한다. 프로세스가 받은 메세지지만 읽지 않은 메세지가 쌓이면 쌓일수록 새로운 메세지를 읽어오는데 느려진다는 것이다.

    만약 기존 프로세스에 쌓여있는 메세지가 366개인데, 불필요한 메세지라서 읽지 않은 경우라고 가정해보자. 그리고 필요한 메세지가 367번째로 전달되었을 때, 이걸 읽기 위해서 erlang 프로세스는 366개의 불필요한 메세지를 한번 점검한 후에야 367번째 메세지를 수신할 수 있게 된다. 

    이처럼 불필요한 메세지가 쌓이는 것은 erlang 성능 문제를 야기시킨다. 

    댓글

    Designed by JB FACTORY