erlang 공부 : Building an Application With OTP

    참고

     


    1. Building an Application With OTP

    얼랭 Application은 관련된 코드와 프로세스로 이루어진 하나의 그룹이다. OTP Application은 프로세스에 OTP behaviour를 사용해서 VM이 프로세스를 설정, 해체하는 방법을 알려주는 형태다. 예를 들어 OTP Behaviour를 사용하는 경우, init(), terminate() 등이 구현되어야 한다. 

    이번 장에서는 OTP Application의 일부를 작성할 것이고, 가장 먼저 프로세스 풀을 구현하는 방법에 대해서 이야기한다. 프로세스 풀은 어플리케이션에서 실행되는 리소스를 일반적인 방식으로 관리 / 제한하는 방법이다. 

     

     


    2. A Pool of Processes

    프로세스 풀을 이용한 리소스를 잘 관리할 수 있다. 프로세스 풀은 어떻게 동작해야할까?

    • 동시에 동작할 수 있는 최대 프로세스를 제한해야 함. 
    • 가용한 프로세스가 없는 경우, 작업을 큐에 넣고 대기함. 가용한 프로세스가 생기면 작업을 실행하는 프로세스를 배정함. 

    이렇게 동작하는 프로세스 풀을 사용해야하는 이유는 뭘까? 어떤 이점이 있기 때문에 사용해야 하는 것일까?

    • 동시에 동작할 수 있는 최대 프로세스를 제한할 수 있음. → 서버 관리 가능. 
    • 어플리케이션에서 열릴 수 있는 파일 수를 제한할 수 있음. 
    • 특정 프로세스에 선택적으로 더 많은 리소스를 할당할 수 있음. (우선순위 관점) 
    • 가끔 어플리케이션에 과부하가 걸렸을 때, 요청으로 온 작업을 큐에 넣어서 안정적으로 어플리케이션이 동작하도록 함. 

    위와 같은 장점이 있기 때문에 얼랭 어플리케이션에 프로세스 풀을 도입하는 것은 좋은 방법이 될 수 있다. 

     

    2.1 Process Pool 요구사항 → 구현해야 할 함수

    • 어플리케이션의 시작 / 종료
    • 특정 프로세스 Pool 시작 및 종료
    • Pool 에서 작업 시작.
    • Pool 이 꽉 차면 시작할 수 없음을 통지.
    • Pool 에 가용 프로세스가 생기면 작업을 실행. 아닌 경우 작업이 큐에 대기함. 
    • Pool에서 가능한 빨리 작업을 비동기적으로 실행함. 여유가 없는 경우 작업이 큐에 대기함. 

    다음 기능을 구현한 프로세스 풀을 작성해야한다. 이런 형태로 동작하는 프로세스 풀은 어플리케이션의 가용성에 좋은 영향을 줄 수 있기 때문이다. 

     


     3. Onion Layer 이론

    Supervisor과 관리하는 자식 프로세스들이 있다. 자식 프로세스가 죽었을 때, 상태 관점에서는 다음 세 가지로 나누어 볼 수 있다. 

    • Static State : 설정 파일이나 Supervisor로 부터 쉽게 상태를 복구할 수 있다. 
    • 재계산 할 수 있는 Dynamic State : init() 함수등을 이용해 재계산해서 복구할 수 있다. 
    • 재계산 할 수 없는 Dynamic state : 재계산할 수 없는 상태. 예를 들면 사용자 입력 같은 것들이 있다. 

    Statie State나 재계산 가능한 Dynamic State는 자식이 죽을 때 마다 새롭게 자식을 시작하면서 실패했던 자식의 상태를 복구할 수 있다. init() 함수라든지 다른 프로세스로부터 전달받은 값을 이용해 다시 재계산할 수 있다. 문제는 재계산 할 수 없는 Dynamic State를 다룰 때다. 

    재계산 불가능한 Dynamic State는 원칙적으로 복구할 수 있는 방법이 없다. 이 때문에 가장 중요한 데이터로 생각할 수 있고, 이 데이터가 훼손되지 않도록 보호해야한다. 어플리케이션에서 실패가 허용되지 않는 부분을 오류 커널이라고 하고 주로 Try ~ Catch 문으로 감싼다. 즉, 재계산 불가능한 Dynamic State는 오류 커널 내부에 존재하도록 해서 가능한한 잃어버리지 않도록 해야한다. 

    Onion은 Core와 껍질로 이루어진다. 중요한 데이터(재계산 불가능한 값)는 양파의 Core에 저장하고, 중요하지 않은 데이터(재계산, 복구 가능한 데이터)는 껍질에다가 저장하도록 한다. 이렇게 동작하도록 Supervision Tree는 다음과 같이 구성해야 한다.

    • 서로 같은 종류의 작업은 같은 Supervision Tree에 속해야 함. 관련 없는 작업은 서로 다른 Supervision Tree에 속해야 함. 
    • 같은 종류의 작업이라도 덜 중요한 작업은 하위 Supervision Tree에 속하도록 구성할 수 있음. 

    Supervision Tree를 구성할 때는 다음과 같이 구성해야한다. 아래 그림도 이해를 돕기 위해 살펴보자.

    위 그림에 대한 설명은 다음과 같다.

    1. 주문 처리 / 로그 쓰기는 서로 다른 작업이다 → 서로 다른 Supervision Tree
    2. 사용자 요청 데이터가 더 중요하고, DB에 데이터 읽어오는 것은 덜 중요하다 (재계산 가능하기 때문) → Supervision Tree에서 SubTree로 구성

     

     


    4. A Pool's Tree

    이런 프로세스 풀은 어떻게 구성해야할까? Top → Btm / Btm → Top 방식 모두가 가능하다. 여기선 Top → Btm 방식으로 설계해본다. 

     

    4.1 gen_server + supervisor 사용

    프로세스 풀마다 하나의 gen_server와 supervisor를 사용한다. 각각의 역할은 다음과 같다. 

    • gen_server → PoolServer로 명명
      • 프로세스 풀의 프로세스 수를 측정 
      • 프로세스 풀에 대기중인 작업수 유지
      • 새로운 자식 프로세스 추가 요청
    • supervisor → Worker Supervisor로 명명
      • 프로세스 풀의 각 작업자를 관리.

    Pool Server는 Worker Supervisor에게 직접 새로운 자식 프로세스를 추가해줄 것을 요청한다. 이를 위해서 Pool Server는 Worker Supervisor가 누구인지 알아야 한다. 또한 새로운 자식 프로세스가 추가되는 것은 동적인 작업이며, 또한 같은 종류의 작업을 하도록 구성해야하기 때문에 Supervisor는 simple_one_for_one 재시작 전략을 가져야 한다. 

    위의 방식을 참고해서 구성한 프로세스 풀 디자인은 다음과 같다. 그런데 이 때, 한 가지 문제점이 있다. Pool1, Pool2를 관리하는 Pool Supervisor가 싱글톤이라는 점이다. 이것은 내결함성에 있어서 심각한 문제를 초래한다. 

    예를 들어 프로세스 풀2의 Worker들이 단시간에 너무 자주 실패해서 Worker Supervisor가 죽는 경우를 고려해보자. Process Pool2의 Worker Supervisor의 Fail은 Pool Supervisor의 Fail도 초래하고, Process Pool1까지 종료되도록 만든다. 

    이를 개선하기 위해 Pool Supervisor를 관리하는 Supervisor of Pool Supervisor를 추가하면 된다. 이 경우 특정 풀의 실패에 따른 내결함성이 약간 개선된다. Onion Layer 관점에서 살펴보면 다음과 같다.

    • 각 Worker는 독립적임.
    • Pool Server는 Worker로부터 격리됨. (주요한 데이터를 가지고 있음) 
    • 모든 Process Pool은 독립적으로 동작함. 

     


    5. Implementing the Supervisors

    지금부터 프로세스 풀 구성을 위한 Component들을 하나씩 생성하고자 한다. 앞서 이야기 한 것처럼 Top → Bottom 방식으로 구성하게 될 것이다. 

    • sup_of_pool_sup : Pool Supervisor 생성 및 관리
    • pool_supervisor : Pool Server 및 Worker Supervisor 생성
    • worker_supervisor : 요청이 오면 Worker 생성함. 
    • Pool Server 
      • Pool Supervisor에게 Worker Supervisor 생성 요청함. 
      • Worker Supervisor에게 Worker 생성 요청함. 

    각 컴포넌트의 역할을 살펴보면 다음과 같다. 

     

    5.1 sup_of_pool_sup (Pool Supervisor 관리)

    Pool Supervisor(Process Pool 자체를 관리하는)들을 관리하는 Supervisor다. 

    start_link() ->
      supervisor:start_link({local, ppool}, ?MODULE, []).
    
    stop(P) ->
      case whereis(ppool) of
          P when is_pid(P) -> exit(P, kill);
          _                -> ok
      end.
    
    % Each Pool Supervisor
    start_pool(Name, Limit, MFA) ->
      ChildSpec =
        #{
          id => Name,
          start => {pool_sup, start_link, [Name, Limit, MFA]},
          restart => permanent,
          shutdown => 10500,
          type => supervisor,
          modules => [pool_sup]
        },
      supervisor:start_child(ppool, ChildSpec).
    
    stop_pool(Name) ->
      supervisor:terminate_child(ppool, Name),
      supervisor:delete_child(ppool, Name).
    
    
    init([]) ->
      SupFlags =
        #{strategy => one_for_one,
          intensity => 6,
          period => 3600
        },
      ChildSpecs = [],
      {ok, {SupFlags, ChildSpecs}}.
    1. start_link()로 supervisor를 실행할 때 {local, ppool}로 실행한다. 이것은 erlang VM 머신 1대에서만 ppool의 이름으로 슈퍼바이저의 PID를 저장하겠다는 의미다.
    2. stop()에서는 'kill'을 이유로 Supervisor를 종료한다. OTP 프레임워크가 모든 슈퍼바이저에 대해 잘 정의된 종료 절차를 제공하지만, 현재 상태에서는 이를 사용할 수 없기 때문이다.
    3. start_pool()을 통해 각 프로세스 풀을 관리할 Supervisor를 시작할 수 있다. 프로세스 풀을 관리할 Supervisor는 start_link()에서 Name, Limit, MFA를 받는 것을 구현해야한다. 
    4. init()에서 Supervisor의 스펙을 설정한다. 이 때, one_for_one으로 설정해야한다. 왜냐하면 각 프로세스는 독립적으로 동작할 것이기 때문에 서로 Dependencies를 가지지 않기 때문이다. 

     

     

    5.2 pool_sup : 프로세스 풀 하나를 관리함. 

    • 프로세스 풀 하나를 관리하는 슈퍼바이저다. 
    • 이 녀석은 Worker Supervisor와 Pool Server를 관리한다. 
    -module(pool_sup).
    -behaviour(supervisor).
    
    %% API
    -export([start_link/3, init/1]).
    
    start_link(Name, Limit, MFA) ->
      supervisor:start_link(?MODULE, {Name, Limit, MFA}).
    
    init({Name, Limit, MFA}) ->
      SupervisorSpec =
          #{
            strategy => one_for_all,
            intensity => 1,
            period => 3600
          },
      ChildSpecs =
        [
          %pool_server
          #{
            id => serv,
            start => {pool_server, start_link, [Name, Limit, self(), MFA]},
            restart => permanent,
            shutdown => 5000,
            type => worker,
            modules => [pool_server]
          }
        ],
      {ok, {SupervisorSpec, ChildSpecs}}.
    1. sup_of_pool_sup이 pool_sup의 start_link()를 호출할 때 Name, Limit, MFA를 전달해준다. 따라서 이 매개변수를 받아서 처리할 수 있도록 start_link()를 구현한다.
    2. SupervisorSpec의 전략을 one_for_all로 선택한다. 이 슈퍼바이저가 관리하는 프로세스는 PoolServer와 WorkerSupervisor인데, 하나만 존재해서는 아무 의미가 없다. 따라서 하나의 자식 프로세스가 죽은 경우 모든 자식 프로세스를 재시작한다. 
    3. ChildSpec에서는 PoolServer만 넣는다. PoolServer가 PoolSupervisor에게 WorkerSupervisor 생성 요청해서 WorkerSupervisor가 생성되도록 한다. 
      1. 이를 위해서 PoolServer를 실행할 때 PoolSupervisor의 PID를 self()를 통해서 넘겨준다. 

    교재에서는 ChildSpec에 PoolServer만 넣어서, PoolSupervisor 생성 시 PoolServer만 생성도록 했다. WorkerSupervisor도 같이 뜨면 좋을텐데 왜 이렇게 했을까 고민해보면, 아마 PoolServer, PoolSupervisor라는 모듈 자체를 일반화해서 사용하고 싶었던 것 같다. (재사용성을 목적으로) 

    그렇기 때문에 시작할 때도 [Name, Limit, MFA]를 Argument로 줘서 동적으로 원하는 Worker Process를 띄울 수 있도록 하는 것 같다. 

     

     

    5.3 Pool Worker Supervisor : 프로세스 풀의 워커를 관리함. 

    • 프로세스 풀의 Worker 프로세스를 관리하는 슈퍼바이저다. 
    • PoolServer에게 프로세스 요청을 받으면 자식 Process를 생성한다. 
    -module(pool_worker_sup).
    -behaviour(supervisor).
    %% API
    -export([init/1, start_link/1]).
    
    start_link(MFA = {_, _, _}) ->
      supervisor:start_link(?MODULE, MFA).
    
    init({M, F, A}) ->
      SupFlags =
        #{strategy => simple_one_for_one,
          intensity => 5,
          period => 3600
        },
      ChildSpecs =
        [
          #{
            id => pool_worker,
            start => {M, F, A},
            restart => temporary,
            shutdown => 5000,
            type => worker,
            modules => [pool_server]
          }
        ],
      {ok, {SupFlags, ChildSpecs}}.
    1. 워커 프로세스의 MFA를 동적으로 전달받아서 실행한다. 이것은 WorkerSupervisor가 일반화된 목적으로 사용되는 것을 의미함.
    2. 슈퍼바이저의 전략을 simple_one_for_one 전략을 사용하며, 이는 Worker 프로세스가 동일한 모듈을 이용해서 동작하는 것을 의미함. 그리고 각각이 격리되어 동작함. 

     

     

    5.4 Pool Server : 외부 요청을 받아서 Proxy함. 

    • 프로세스 풀에 핵심적인 역할을 하는 서버다.
    • 요청을 받으면 WorkerSupervisor에게 프로세스를 생성하도록 요청한다. 
    -module(pool_server).
    
    start(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit)->
      gen_server:start({local, Name}, ?MODULE, {Limit, Sup, MFA}, []).
    
    start_link(Name, Limit, Sup, MFA) when is_atom(Name), is_integer(Limit)->
      gen_server:start_link({local, Name}, ?MODULE, {Limit, Sup, MFA}, []).
    
    
    init({Limit, Sup, MFA}) ->
      self() ! {start_worker_supervisor, {Sup, MFA}},
      {ok, #state{limit=Limit, sup=Sup}}.
    1. 외부에서 Name, Limit, Sup, MFA를 인자로 받은 후, gen_server를 시작한다. 이 메서드는 pool_supervisor가 호출한다. 그리고 Sup에는 Pool Supervisor가 전달된다. 
    2. init() 메서드에서는 자기 자신에게 start_worker_supervisor 메세지를 전송하고, 값을 반환한다. Pool Supervisor init() 과정에서 pool_server의 init()가 호출된다. 이 말은 Pool_server의 init()가 완료되어야 Pool Supervisor의 초기화가 완료된다는 의미다. 만약 pool_Server의 init() 과정에서 supervisor:start_child(Sup, MFA)를 호출하게 되면, Sup에 해당되는 Pool Supervisor가 초기화 되지 않았기 때문에 호출에 응답할 수 없게 된다. 따라서 자기 자신에게 메세지를 보내서 비동기로 처리요청한다. 
    run(Name, Args) ->
      gen_server:call(Name, {run, Args}).
    
    sync_queue(Name, Args) ->
      gen_server:call(Name, {sync, Args}, infinity).
    
    async_queue(Name, Args) ->
      gen_server:cast(Name, {async, Args}).
    
    stop(Name) ->
      gen_server:call(Name, stop).
    1. run()은 동기방식으로 요청하고, 가용 가능한 프로세스가 없는 경우 Task Queue에 넣지 않고 noalloc 응답을 하도록 한다.
    2. sync_queue()는 가용 가능한 프로세스가 없는 경우 Task Queue에 넣고 실행될 때까지 기다린다.
    3. async_queue()는 가용 가능한 프로세스가 없는 경우 Task Queue에 넣고, 다음 작업을 진행한다. 그리고 caller는 추후에 메세지로 결과를 통보받는다. 
    handle_call({run, Args}, _From, S = #state{limit=N, sup=Sup, refs=Refs}) when N > 0 ->
      {ok, Pid} = supervisor:start_child(Sup, Args),
      Ref = erlang:monitor(process, Pid),
      {reply, {ok, Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref, Refs)}};
    handle_call({run, _Args}, _From, S = #state{limit=N}) when N =< 0 ->
      {reply, noalloc, S};
    handle_call({sync, Args}, _From, S = #state{limit=N, sup=Sup, refs=Refs}) when N > 0 ->
      {ok, Pid} = supervisor:start_child(Sup, Args),
      Ref = erlang:monitor(process, Pid),
      {reply, {ok, Pid}, S#state{limit=N-1, refs=gb_sets:add(Ref, Refs)}};
    handle_call({sync, Args}, From, S = #state{limit=N, queue = Queue}) when N =< 0 ->
      {noreply, S#state{queue = queue:in({From, Args}, Queue)}};
    handle_call(stop, _From, State) ->
      {stop, normal, ok, State};
    handle_call(_Msg, _From, State) ->
      {noreply, State}.
    • Limit를 이용해서 현재 가용 가능한 프로세스를 관리한다. 만약 Limit가 0보다 작은 경우는 가용 가능한 프로세스가 없기 때문에 queue:in() 메서드를 이용해 Task에 작업을 넣어둔다. 
    • 이 때, Task에 들어가는 작업은 From, Args 형태로 표현된다. From은 sync()를 호출해서 결과를 기다리고 있는 caller 프로세스이며, Args는 특정 작업을 시작하는데 필요한 파라메터들이다. 
    handle_cast({async, Args}, S = #state{limit = N, sup = Sup, refs = Refs}) when N > 0 ->
      {ok, Pid} = supervisor:start_child(Sup, Args),
      Ref = erlang:monitor(process, Pid),
      {noreply, S#state{limit = N-1, refs = gb_sets:add(Ref, Refs)}};
    handle_cast({async, Args}, S = #state{limit = N, queue = Q}) when N =< 0 ->
      {noreply, S#state{queue = queue:in(Args, Q)}};
    handle_cast(_Msg, S) ->
      {noreply, S}.
    

    비동기 처리 요청도 동일한 형식으로 처리한다. 

    • 프로세스가 넘었으면 바로 처리함.
    • 프로세스가 없으면 작업 큐에 넣어둠. 
    handle_info({start_worker_supervisor, {Sup, MFA}}, S)->
      {ok, Pid} = supervisor:start_child(Sup, ?SPEC(MFA)),
      {noreply, S#state{sup = Pid}};
    handle_info({'DOWN', Ref, process, _Pid, _}, S = #state{refs=Refs}) ->
      case gb_sets:is_element(Ref, Refs) of
        true -> handle_down_worker(Ref, S);
        false -> S
      end;
    handle_info(Msg, S) ->
      io:format("Unknown Msg : ~p.~n", [Msg]),
      {noreply, S}.
      
      
    -define(SPEC(MFA),
      {worker_sup,
        {pool_worker_sup, start_link, [MFA]},
        temporary,
        10000,
        supervisor,
        [pool_worker_sup]}).
    • start_worker_supervisor로 요청이 오는 경우, 슈퍼바이저에게 워커 슈퍼바이저 시작을 요청한다. 이 때 SPEC(MFA)라는 매크로를 이용한다.
    • 만약 특정 프로세스가 다운되었다는 메세지를 받으면, 자신이 관리하고 있는 자식 프로세스 중 하나인지 확인한 후 handle_down_worker()를 호출한다.
    handle_down_worker(Ref, S = #state{limit=L, sup=Sup, refs=Refs, queue=Q}) ->
      case queue:out(Q) of
        % sync case
        {{value, {From, Args}}, Q} ->
          {ok, Pid} = supervisor:start_child(Sup, Args),
          NewRef = erlang:monitor(process, Pid),
          NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
          gen_server:reply(From, {ok, Pid}),
          {noreply, S#state{refs=NewRefs}};
        {{value, Args}, Q} ->
          {ok, Pid} = supervisor:start_child(Sup, Args),
          NewRef = erlang:monitor(process, Pid),
          NewRefs = gb_sets:insert(NewRef, gb_sets:delete(Ref, Refs)),
          {noreply, S#state{refs=NewRefs}};
        {empty, _} ->
          {noreply, S#state{limit = L+1, refs = gb_sets:delete(Ref, Refs)}}
      end.

    handle_down_worker()는 worker가 죽었을 때, pool_server가 관리하고 있는 부분을 업데이트 하고 다음 작업을 배정할 수 있도록 하는 메서드다.

    1. Pool Server가 자식 프로세스 시작을 요청받았을 때, 모니터를 생성해서 gb_set에 넣어둔다. 
    2. Pool Server는 각 자식 프로세스의 모니터를 가지고 있기 때문에 자식 프로세스가 종료되면 DOWN 메세지를 전송 받는다. 
    3. Pool Server가 DOWN 메세지를 받은 경우, 관리하고 있던 자식 프로세스가 종료된 것이기 때문에 gb_set에 관리되고 있는 모니터를 제거하고, Limit을 수정하고, Queue에 Task이 있는 경우 프로세스를 배정한다.

    위 작업을 수행하는 것이 handle_down_worker() 메서드다.

    code_change(_OldVsn, State, _Extra) ->
      {ok, State}.
    
    terminate(_Reason, _State) ->
      ok.

     나머지 gen_server의 interface를 구현해준다. 

     

    5.5 Worker 프로세스 생성

     

     

     

     

     

     

    댓글

    Designed by JB FACTORY