Python : asyncio의 시작과 종료

    들어가기 전

    이 글은 파이썬 비동기 라이브러리 Asyncio를 공부하며 작성한 글입니다.

     


    3.10 asyncio의 시작

    asyncio의 시작은 간단하게 할 수 있다. 일반적인 방법은 다음과 같다

    1. async def로 main() 함수를 선언한다.
    2. asyncio.run()으로 main() 함수를 실행한다. 

    앞서 공부했던 내용에서 asyncio.run()이 어떻게 동작하는지를 자세히 살펴봤었다. asyncio.run()은 다음 작업을 개발자 대신에 해준다. 

    1. 이벤트 루프를 하나 만들고, 전달받은 코루틴의 실행을 예약한다.
    2. run_until_complete()를 이용해서 예약한 코루틴이 끝날 때 까지 기다린다.
    3. 코루틴의 실행이 끝나면, 아직 보류 중인 모든 Task 객체를 수집한다.
    4. 보류중인 모든 Task에 task.cancel()을 실행한다. 즉, 이벤트 루프를 통해 Task의 Cancel이 예약되는 것이다. 
    5. Task를 모두 수집한다. 
    6. 모든 그룹 Task에 대해서 run_until_complete()를 이용해서 각 Task가 완전히 취소될 때 까지 기다린다. 종료되기까지 대기한다는 의미는 대기 중인 Task에 발생한 CancelledError 예외가 처리되기까지 대기한다는 의미다. 

    기본적으로 asyncio.run()을 이용하면, 어플리케이션이 우아하게 끝날 수 있도록 도와준다. 그렇지만 이것만으로는 충분하지 않다. 처음 asyncio를 이용해서 어플리케이션을 개발하다보면 대부분 종료 시 'Task was destoryed but it is pending'과 같은 오류 메세지가 발생하여 이를 처리하는데 많은 시간을 소모한다. 그렇다면 이것은 왜 발생하는 것일까?

     

    3.10.1 asyncio 끝내기

    asyncio.run()을 통해서 끝내는 처리를 했다고 하지만 여전히 에러가 발생할 수 있다. 이것은 어플리케이션에서 앞서 설명한 단계 중 하나 이상을 처리하지 않았기 때문이다. 예를 들어서 아래 코드를 고려해보자.

    async def f(delay):
        await asyncio.sleep(delay)
    
    loop = asyncio.get_event_loop()
    t1 = loop.create_task(f(1))
    t2 = loop.create_task(f(2))
    loop.run_until_complete(t1)
    loop.close()
    
    >>>
    Task was destroyed but it is pending!
    task: <Task pending name='Task-2' coro=<f() running at C:\dev\study_asyncio\chapter3_7_6.py:7> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000020E32D87520>()]>>

    아래 코드에서 'Task was destoryed but it is pending' 이라는 에러 메세지가 발생한 것을 확인할 수 있다. 이것은 무엇 때문에 발생하는 것일까?

    • t1은 1초 후에 종료된다.
    • t2는 2초 후에 종료된다.
    • 이벤트 루프는 t1이 종료된 후 바로 종료된다. 이 시점에 t2는 1초 만큼 더 실행해야 된다.

    위 오류는 위와 같이 이벤트 루프를 종료했을 때, 이벤트 루프 내부에 아직 완료되지 않은 Task가 존재할하기 때문에 발생한다. 따라서 asyncio를 이용한 어플리케이션을 종료하기 전에는 관습적으로 다음 작업을 통해서 이벤트 루프를 종료할 것을 권장한다. 대부분 asyncio.run()이 해주지만, 복잡한 상황이 발생했을 때 처리하려면 이런 작업을 이해하고 있어야 한다.

    1. 이벤트 루프에서 종료되지 않은 Task를 모아서 cancle()을 호출한다.
    2. 모든 Task가 실제로 종료될 때 까지 기다린다.
    3. 이벤트 루프를 닫는다. 
    import asyncio
    import aiohttp
    
    
    async def f(delay):
        await asyncio.sleep(delay)
    
    loop = asyncio.get_event_loop()
    t1 = loop.create_task(f(1))
    t2 = loop.create_task(f(2))
    loop.run_until_complete(t1)
    
    # 종료 로직
    tasks = asyncio.all_tasks(loop=loop)
    for t in tasks:
        t.cancel()
    futures = asyncio.gather(*tasks, return_exceptions=True)
    loop.run_until_complete(futures)
    #  종료 로직
    
    loop.close()

     

     

    3.10.2 asyncio 제대로 끝내기

    아래 코드를 대상으로 종료에 대해서 더 자세히 살펴보고자 한다.

    import asyncio
    from asyncio import StreamReader, StreamWriter
    
    async def echo(reader: StreamReader, writer: StreamWriter):
        print('New connection')
        try:
            while data := await reader.readline():
                writer.write(data.upper())
                await writer.drain()
            print('Leaving Connection.')
        except asyncio.CancelledError:
            print('Connection dropped!')
    
    async def main(host='127.0.0.1', port='8888'):
        server = await asyncio.start_server(echo, host, port)
        async with server:
            await server.serve_forever()
    
    
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Bye!')

    위 코드를 간략히 설명하면 다음과 같다.

    1. 서버 프로그램에서 echo() 코루틴 함수를 사용하여 각각의 네트워크 연결에 대해 코루틴을 생성한다. 네트워크 동작을 처리하기 위해 asyncio의 StreamsAPI를 사용했다.
    2. 네트워크 연결을 유지하기 위해 데이터를 기다리는 무한 루프를 사용한다. 
    3. 모든 글자는 대문자로 전환하여 송신 측에 되돌려준다.
    4. 이 Task가 취소되면 메세지를 출력한다.

    위 서버를 실행한 다음에 telnet 명령어를 이용하면 서버와 실시간 통신을 할 수 있게 된다.

     

    클라이언트 종료 → 서버 종료 하는 경우

    $ telnet 127.0.0.1 8888
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    
    hi!
    HI!
    stop shouting
    STOP SHOUTING
    ^]
    telnet> q/
    Connection closed.

    클라이언트를 먼저 종료 시키면 서버에서 사용하는 코루틴도 없어진다. 즉, 서버의 이벤트 루프에는 온전히 main() 이라는 코루틴 Task만 실행 예약이 되어있다. 

    New connection
    Leaving Connection.
    Bye!

    이 때 서버를 종료 시키면 (Ctrl + C) 다음과 같은 문구가 나오면서 종료되게 된다. 

     

    클라이언트 유지 → 서버 종료 하는 경우

    그렇다면 클라이언트가 유지되면서 서버가 종료하면 어떻게 될까?

    New connection
    Connection dropped!
    Bye!

    클라이언트가 유지되고 있는 상황이라면 echo() 코루틴 내부에서 동작 중인 상황이 된다. 이 때, Ctrl + C를 하면 CancelledError에 대한 예외 처리가 시작되었다는 것을 알 수 있다. 정리하면 Ctrl + C는 이벤트 루프 내에 있는 예외 처리기가 동작하도록 해서 CancelledError가 발생하도록 한다.

     

    프로그램 종료 시, 메세지 보내기

    네트워크 연결이 끊기면, 네트워크 연결이 끊겼다는 이벤트를 모니터링 시스템에 발송해야하는 상황을 가정해보자. 그 상황을 가정해보면 아래와 같이 코드를 작성해 볼 수 있다.

    async def echo(reader: StreamReader, writer: StreamWriter):
        print('New connection')
        try:
            while data := await reader.readline():
                writer.write(data.upper())
                await writer.drain()
            print('Leaving Connection.')
        except asyncio.CancelledError:
            msg = 'Connection Closed'
            print(msg)
            asyncio.create_task(send_event(msg)) ## 모니터링 시스템에 메세지 보내기 
            
            print('Connection dropped!')
    
    async def main(host='127.0.0.1', port='8888'):
        server = await asyncio.start_server(echo, host, port)
        async with server:
            await server.serve_forever()
    
    
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Bye!')

    send_event() 메서드를 이용해서 모니터링 시스템으로 메세지를 보낼 수 있다고 해보자. 하지만 이 코드에서는 한 가지 심각한 버그가 있다. 클라이언트 연결이 유지된 상황에서 서버를 종료하면 다음 에러가 발생하는 것을 볼 수 있다.

    New connection
    Connection Closed
    Connection dropped!
    Bye!
    Task was destroyed but it is pending!
    task: <Task pending name='Task-6' coro=<send_event() done, defined at C:\dev\study_asyncio\chapter3_7_8.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0000023B256D89A0>()]>>

    'Task was destoryed but it is pending!' 이라는 메세지가 보인다. 이것은 이벤트 루프가 종료되었는데, 이벤트 루프에 실행이 pending 된 Task가 있는 경우에 발생하는 에러 메세지다. 어떻게 이런 일이 가능할까? 아래를 고려해보자.

    1. Ctrl + C를 입력한다. 
    2. 이벤트 루프에 예약된 예외 처리기는 현재 이벤트 루프에 예약된 모든 Task를 수집한 후, 모든 Task에게 cancel()을 호출한다.
    3. cancel()이 호출되면 해당 Task에는 CancelledError가 발생한다.
    4. except 절에서 CancelledError를 잡는다. 여기서 create_task(send_event())를 이용해서 이벤트 루프에 새로운 코루틴의 실행을 예약한다.
    5. 2번에서 수집한 모든 Task들(send_event 코루틴은 제외)의 취소가 완료될 때까지 run_until_complete()를 실행한다.
    6. run_until_complete() 실행이 종료되면 이벤트 루프가 Close된다. 그렇지만 이 때, send_event() 코루틴이 여전히 이벤트 루프에 pending 상태로 기다리고 있다.

    위와 같은 상황이기 때문에 except 절에서 모니터링 시스템에 메세지를 보내는 코루틴이 정상적으로 종료가 되지 않은 것이다. 

     

    정리 

    • Ctrl + C를 입력하면, 이벤트 루프의 예외 처리기가 이벤트 루프에 있는 모든 Task를 Gather 한 후, cancel()을 호출한다.
    • cancel()은 CancelledError를 raise 한다.
    • except CancelledError 절에서 예외 처리와 관련된 새로운 Task(Future)를 생성하면 안된다. 꼭 생성할 수 밖에 없다면, 동일 함수의 범위 내에서 새로운 Task나 Future에 대해서도 await 해야한다.

    가장 마지막에 적은 내용은 아래와 같이 await 키워드를 이용해서 해당 Task가 끝날 때까지 기다려줘야 한다는 것이다. 

    except asyncio.CancelledError:
        msg = 'Connection Closed'
        print(msg)
        await asyncio.create_task(send_event(msg))

     


    3.10.3. gather()내의 return_exceptions=True

    asyncio.run()에서 종료할 때 gather() 메서드를 이용하는데, 이 때 옵션으로 return_exceptions=True를 사용했다. return_exceptions의 기본값을 False다. 대부분의 경우 기본값인 False를 사용하는 것은 적절하지 않은데, 아래에서는 무엇때문에 기본값을 그대로 사용하는 것이 문제가 되는지 정리하고자 한다. 먼저 상호작용하는 함수들의 동작을 한번 고려해보자.

    1. run_until_complete()는 Future 객체를 받아서 동작한다. 종료 절차를 작성할 때는 gather()로 모은 Future를 인자로 넣어준다.
    2. run_until_complete() Future를 받아서 동작할 때, Future에서 예외가 발생하면 예외는 run_until_complete() 밖으로 던져진다.
    3. run_until_complete()에 여러 Future가 전달되는 경우도 있다. 전달된 Future들 중 하나의 Future라도 내부에서 발생한 예외가 처리되지 못한다면, 전달된 모든 Future에서 예외가 발생한 것이 된다. 여기에는 CancelledError도 포함된다.
    4. 만약 여러 Future들 중 일부의 Task만 CancelledError가 처리되고 나머지는 처리되지 않았다면, CancelledError가 처리되지 않은 Task 때문에 모든 Task가 종료되기 전에 이벤트 루프가 중지된다. 
    5. 종료 절차에서 이런 문제가 발생되면 안된다. 따라서 그룹 Task 중 일부 몇 몇 Task에서 예외가 발생하더라도, 모든 하위 Task가 모두 종료한 뒤에 run_until_complete()가 반환되어야 한다.
    6. 그래서 gather(*, return_exceptions=True)가 존재한다. 이 설정을 통해서 Group Future가 하위 Task의 예외를 반환값으로 처리하도록 하고, 이 덕분에 run_until_complete()가 중지되지 않도록 한다. 

    간단히 정리하면 gather(, return_exceptions=True)는 gather에 전달된 Future들에게서 에러가 발생하더라도, 에러 자체를 처리해서 반환값으로 받도록 한다. 그 덕분에 run_until_complete() 실행 시에 일부 Task의 예외 발생으로 인해 이벤트 루프가 정지되지 않도록 한다.

    아래 코드를 삺펴보자.

    async def f(delay):
        await asyncio.sleep(1/delay)
        return delay
    
    loop = asyncio.get_event_loop()
    for delay in range(5):
        loop.create_task(f(delay))
    pendings = asyncio.all_tasks(loop=loop)
    loop.run_until_complete(tasks[1])
    
    for task in pendings:
        task.cancel()
    futures = asyncio.gather(*pendings, return_exceptions=True)
    results = loop.run_until_complete(futures)
    print(f'{results = }')
    loop.close()

    이 코드에서는 한 가지 에러 유발 장치가 존재한다.

    • 1/0은 ZeroDivisionError가 발생한다.

    f(delay) 코루틴이 동작할 때, delay = 0 이 들어오게 되면 이벤트 루프에서 해당 Task가 실행될 때 ZeroDivisionError가 발생하게 될 것이다. 위 코드에서는 return_exceptions=True를 했기 때문에 ZeroDivisionError가 발생하더라도 이벤트 루프가 정상적으로 종료될 수 있도록 돕느다. 

    results = [1, 4, 
        ZeroDivisionError('division by zero'),
                3, 2]

    실행 결과는 위와 같다.

     

    정리

    • run_until_complete()에 전달된 Future 객체들 중 하나라도 Exception이 발생한다면, run_until_complete는 Exception을 던지고 이벤트루프는 종료된다.
    • gahter(, return_exceptions=True)로 Future를 모으면, Future에서 Exception이 발생했다면 Exception을 처리하고 Exception 자체를 반환 값으로 받도록 한다.
    • gather(, return_exceptions = True)와 run_until_complete()는 상호 작용을 통해서 asyncio가 우아하게 끝날 수 있도록 협력한다. 

     

     


    3.10.4 시그널을 이용한 종료

    이전에는 KeyboardInterrupt (Ctrl - C)를 이용해서 어플리케이션의 실행을 중지했다. KeyboardInterrupt를 발생시켜 loop.run_until_complete()의 호출 블로킹을 효과적으로 중지하고 이후의 종료 절차가 실행되도록 한다. 

    KeyboardInterrupt는 리눅스 기반에서는 SIGINT 시그널을 주는 것과 동일하다. 그런데 일반적으로 네트워크 어플리케이션을 종료할 때는 SIGTERM을 주는 것이 일반적이다. 가장 먼저 SIGINT를 주었을 때, 어플리케이션을 효율적으로 종료하는 코드를 고려해보자.

     

    SIGINT로만 어플리케이션 종료하기

    import asyncio
    
    
    # Signal 주는 것
    
    async def main():
        while True:
            print(f'your application is running')
            await asyncio.sleep(1)
    
    if __name__ == '__main__':
    
        loop = asyncio.get_event_loop()
        task = loop.create_task(main())
        try:
            loop.run_until_complete(task)
        except KeyboardInterrupt:
            print('Got Signal: SIGINT, shutting down')
    
        tasks = asyncio.all_tasks(loop=loop)
        for t in tasks:
            t.cancel()
        futures = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(futures)
        loop.close()

    위의 코드는 다음과 같이 동작한다. 

    • main() 함수에서 무한 루프를 통해서 프로그램이 실행된다. (코루틴이 계속 실행된다). 이 때  메인 쓰레드는 loop.run_until_complete()에 블로킹 된다. 
    • KeyboardInterrupt가 들어오게 되면, loop.run_until_complete() 절에서 바로 빠져나오게 된다. 그리고 except 절에서 에러가 잡혀진다. 
      • 이벤트 루프에는 KeyboardInterrupt에 대한 기본적인 예외 처리기가 존재할 것이다. 이벤트 루프가 동작 도중 KeyboardInterrupt가 발생하면, 이 예외 처리기가 이것을 받아서 KeyboardInterrupt 에러를 던지는 작업을 할 것이다.
      • 따라서 loop.run_until_complete() 밖으로 KeyboardInterrupt 에러가 던져지면서, loop.run_until_complete() 블로킹에서 벗어나게 된다.
    • 이후 메인 쓰레드는 실행 흐름을 따라서 실행된다. 이 때, Task가 취소되고 정상적으로 이벤트 루프를 종료하는 동작을 한다. 

    위 코드를 실행하고 Ctrl - C를 이용해서 종료하면 다음 실행 결과가 나온다. 

    $ python chapter_3_10_2.py
    your application is running
    your application is running
    your application is running
    Got Signal: SIGINT, shutting down

    우선 KeyboardInterrupt(SIGINT)를 이용해서 프로그램을 종료하는 방법은 굉장히 간단하게 처리할 수 있다. 그런데 SIGINT 뿐만 아니라 다른 시그널을 받고도 asyncio 이벤트 루프를 종료해야한다면 어떻게 해야할까?

     

    SIGTERM, SIGINT 명령어를 받고 종료하기 

    어플리케이션이 시그널을 받고 종료를 해야한다면, 실제로는 main() 코루틴 내에서 네트워크 리소스의 Grace Termination이 이루어져야 한다. 어플리케이션이 시그널을 받고 종료하려면 몇 가지 고려사항이 필요한데, 최소한의 고려사항은 다음과 같다. 

    • main() 코루틴에서 네트워크 리소스의 자원 종료가 진행되어야 한다. CancelledError를 처리하는 예외 처리기가 추가되어야 하고, 그 예외 처리기 내의 정리 코드에서는 수 초 간의 정리 작업을 수행해야한다. (네트워크 피어 간 통신을 처리한 후에 모든 소켓 연결을 종료 시키는 행위등)
    • 시그널을 여러번 보내더라도 어플리케이션이 이상한 동작을 해서는 안된다. 예를 들면 종료 절차를 여러번 반복하거나 혹은 동시에 실행하는 것이다. 즉, 첫번째 종료 시그널을 받아 정리 작업을 시작하면, 그 이후의 시그널은 프로그램 종료 시까지 무시되어야함. 
    import asyncio
    from signal import SIGTERM, SIGINT
    
    
    async def main():
        try:
            while True:
                print(f'your application is running')
                await asyncio.sleep(1)
        except asyncio.CancelledError: ## 1
            for i in range(3):
                print('<Your application is shutting down>')
                await asyncio.sleep(1)
    
    
    def handler(sig): ## 2
        # loop = asyncio.get_running_loop()
        loop.stop()
        print(f'Got Signal: {sig!s}, shutting down')
        loop.remove_signal_handler(sig) ## 3-1
        loop.add_signal_handler(SIGINT, lambda: None) ## 3-2
    
    
    if __name__ == '__main__':
    
        loop = asyncio.get_event_loop()
    
        for sig in (SIGINT, SIGTERM):
            loop.add_signal_handler(sig, handler, sig) ## 4
    
        task = loop.create_task(main())
        loop.run_forever()  ## 5
    
        tasks = asyncio.all_tasks(loop=loop)
        for t in tasks:
            t.cancel()
        futures = asyncio.gather(*tasks, return_exceptions=True)
        loop.run_until_complete(futures)
        loop.close()

    위 코드를 살펴보자.

    1. except asyncio.cancelledError

    main() 코루틴에서 정리 작업을 수행한다. main() 코루틴과 관련된 Task를 cancel() 메서드를 실행해서 취소 시그널을 전달하면 이곳으로 넘어온다. 그러면 이곳에서 3초 간 기다렸다가 코루틴이 종료된다. 

    정확하게 알아둬야 할 점은 except 절에서 하는 것은 3초 간 기다리는 것 말고는 아무것도 없다는 것이다. task.cancel()을 통해 cancel 에러가 발생하면, Task는 이것을 Catch해서 3초 간 기다리고 try ~ except 절을 벗어난다. try ~ except 절을 벗어나게 되면 코루틴의 마지막이기 때문에 StopIteration이 발생하고, 코루틴이 종료되는 것이다. 

    2. handler()

    이곳은 이벤트 루프의 예외 처리기에서 사용할 callback 함수를 정의하는 곳이다. 

    handler() 메서드의 주요한 목적은 현재 동작중인 이벤트 루프를 중지하는 것이다. SIGINT, SIGTERM 신호가 오게 되면 이 콜백으로 넘어오게 될 것이다. 콜백으로 넘어왔을 때, 이벤트 루프는 동작하면서 예약된 Task를 실행하고 있을 것이다. 이 때 이벤트 루프를 정지시키면서, 이벤트 루프에 예약된 Task가 실행되지 않도록 한다. SIGINT, SIGTERM은 Application을 종료하라는 의미이기 때문에 정상적인 동작으로 이해할 수 있다. 

    3-1. loop.remove_signal_handler()

    종료 절차 중에 SIGINT, SIGTERM 시그널이 또 들어와도 시그널 처리기는 실행되지 않아야 한다. 만약 시그널 처리기가 다시 실행되게 된다면, loop.stop()이 호출될 것이다. 그런데 이 상황은 새로운 문제를 발생시킨다.

    첫번째 시그널에 의해서 시그널 처리기가 이벤트 루프의 실행을 중단 시키고 loop.run_forever() 실행이 중단된다. 그리고 loop.run_until_complete()를 통해서 모든 Task가 취소되는 작업을 기다리고 있을 것이다. 즉, 이벤트 루프가 동작해서 모든 Task가 취소될 때까지 기다리는 작업을 할 것이다. 

    하지만 이 때, 두번째 시그널이 들어오고 시그널 처리기가 호출된다면, 이벤트 루프가 중지된다. 이벤트 루프가 중지된다는 것은 Task를 취소하는 작업이 중지된다는 의미다. 의도한대로 동작하지 않고 이상동작을 할 수 있기 때문에 시그널 처리기 콜백이 호출되는 시그널 처리기를 이벤트 루프에서 제거해야한다.

    3-2 loop.add_signal_handler(SIGINT, lambda: None)

    이 코드도 반드시 필요하다. 단순히 SIGINT 시그널 처리기를 삭제하게 된다면, 이벤트 루프는 기본적으로 내장된 KeyboardInterrupt 시그널 처리기를 사용할 것이다. 이 시그널 처리기는 KeyboardInterrupt를 raise 할텐데, 바깥 쪽에서 try ~ catch로 잡아주지 않기 때문에 프로그램 자체가 Exception에 의해서 바로 종료될 것이다. 따라서 이 코드 역시 추가 되어야 한다. 

     

    4. loop.add_signal_handler()

    이벤트 루프에 시그널 처리기를 추가한다. 기본적으로 KeyboardInterrupt(SIGINT)에 대한 시그널 처리기는 추가되어있다. 아마 기본적인 SIGINT 시그널 처리기는 KeyboardInterrupt 예외를 raise 하는 형태일 것이다. 하지만 그런 형태로 동작하지 않기 위해서 시그널 처리기를 추가할 수 있다.

    추가된 시그널 처리기는 해당 시그널이 발생했을 때, 해당 콜백 함수를 호출해준다. 

     

    5. loop.run_forever()

    보통 run_forever()의 실행은 이벤트 루프가 중지될 때까지 계속된다. 이번 예제에서 이벤트 루프는 SIGINT나 SIGTERM을 받았을 때 handler() 콜백이 호출되고, 이 안에서 loop.stop()으로 정지된다. loop.stop()으로 루프가 정지되면, run_forever()에서 블로킹 된 코드 흐름은 뒤로 흐르게 될 것이다. 

     

    위 코드의 실행 결과는 다음과 같다.

    $ python shell_signal02.py
    <Your application is running>
    <Your application is running>
    <Your application is running>
    <Your application is running>
    <Your application is running>
    ^CGot signal: Signals.SIGINT, shuttding down.
    <Your application is shutting down...>
    ^C<Your application is shutting down...>
    ^C<Your application is shutting down...>

    종료되고 있는 단계에서 여러번 CTRL + C를 눌렀지만, main() 코루틴이 완료될 때까지 아무 일도 일어나지 않았다. 이것은 첫번째 시그널이 주어졌을 때, 시그널 처리기가 이벤트 루프에서 삭제 되었기 때문에 다음과 같이 동작했던 것이다. 

     

    동작의 정리

    • 이벤트 루프는 기본적으로 시그널 처리기를 가지고 있다. 이 시그널 처리기는 SIGINT 신호를 받으면 trigger되어서 KeyboardInterrupt를 raise 하는 형태로 동작한다. 
    • 이벤트 루프에 시그널 처리기를 콜백과 함께 등록할 수 있다. 이 때 각종 Signal에 대해서 동작하도록 만들 수 있다.
    • 이벤트 루프에 시그널 처리기를 등록할 경우, 기존에 존재하던 시그널 처리기는 동작하지 않는다.
    • 이벤트 루프가 실행되고 있을 때, 특정 시그널이 들어오면 시그널 처리기가 호출된다. 시그널 처리기는 본인의 작업을 완료하고, loop.run_forever() 같이 동기 함수의 호출에서 블로킹이 풀린다.

     

    asyncio.run()에서 시그널 처리기 사용하기

    앞선 예제에서는 이벤트 루프의 동작 흐름을 직접 컨트롤 했었다. 실제로는 asyncio.run()을 이용해서 대부분 비동기 어플리케이션을 시작하기 때문에 asyncio.run()에서 어떻게 이용할지를 한번 생각해봐야 한다. 

    import asyncio
    from signal import SIGTERM, SIGINT
    
    
    async def main():
        
        loop = asyncio.get_running_loop() ## 1
        for sig in (SIGINT, SIGTERM):
            loop.add_signal_handler(sig, handler, sig)
    
        try:
            while True:
                print(f'your application is running')
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            for i in range(3):
                print('<Your application is shutting down>')
                await asyncio.sleep(1)
    
    
    def handler(sig): ## 2
        loop = asyncio.get_running_loop()
        pending = asyncio.all_tasks(loop=loop)
        for t in pending: 
            t.cancel()
        print(f'Got Signal: {sig!s}, shutting down')
        loop.remove_signal_handler(sig)
        loop.add_signal_handler(SIGINT, lambda: None)
    
    
    if __name__ == '__main__':
        asyncio.run(main())

    이벤트 루프의 제어를 직접하지 않기 때문에 앞선 예제와는 조금 다른 형태로 코드를 작성해야한다. 

    1. loop.add_signal_handler()

    asyncio.run()으로 시작하기 때문에 이벤트 루프를 직접 얻을 방법이 없다. 따라서 main() 코루틴 내에서 시그널 처리기를 이벤트 루프에 추가하는 작업을 한다. 

     

    2. def handler()

    만약 handler()에서 이벤트 루프를 직접 중지한다면, main() Task가 정상적으로 중지될 것이다. 따라서 시그널 처리기 내부에서 이벤트 루프를 중지할 수 없다. 따라서 Task에 취소 요청을 해야한다.

    Task에 취소 요청이 들어가면 task.cancel()을 통해서 Task는 자동적으로 종료될 것이다. Task가 자동적으로 종료되면 asyncio.run()의 run_until_complete(main()) 부분이 정상적으로 리턴되면서 asyncio.run() 내부의 정리 작업이 진행된다. 

     

     


    3.10.5 종료 중 Executor 기다리기

    main() 코루틴 내에서 Executor에서 새로운 Future가 실행될 수 있다. 이 때 main() 코루틴이 끝나는 시간보다 Executor()의 실행 시간이 더 긴 경우도 있다. 파이썬 3.9 이상에서는 asyncio.run()을 이용했을 때, main() 코루틴이 먼저 끝나더라도 Executor에서 실행되는 Future들이 정상적으로 종료될 때까지 기다린다.

    import time
    import asyncio
    
    async def main():
        loop = asyncio.get_running_loop()
        loop.run_in_executor(None, blocking)
        print(f'{time.ctime()} Hello!')
        await asyncio.sleep(1)
        print(f'{time.ctime()} Good Bye!')
    
    
    def blocking():
        time.sleep(1.5)
        print(f'{time.ctime()} Hello from a Thread!')
    
    asyncio.run(main())
    >>>
    Sun Feb 19 17:47:59 2023 Hello!
    Sun Feb 19 17:48:00 2023 Good Bye!
    Sun Feb 19 17:48:00 2023 Hello from a Thread!

    예를 들어 파이썬 3.9 이상에서 위 코드를 실행시켜보면 정상적으로 종료되는 것을 알 수 있다. 하지만 파이썬 3.9보다 아래 버전에서는 많은 에러가 발생한다. 왜 이런 에러가 발생하고, 3.8 이하 버전에서는 어떻게 대처해야하는지를 고려해본다. 

     

    3.8 이하에서는 왜 에러가 발생할까?

    loop.run_in_executor()에서 반환되는 객체는 Future 객체다. 그리고 asyncio.run()에서 main() 코루틴이 끝나면 asyncio.all_task()를 이용해서 모든 Task를 가져온 후에 cancel()을 호출한다. 그런데 Executor에서 실행되는 녀석들은 Future 객체이기 때문에 all_task()를 호출했을 때 포함되지 않는다. 그래서 Future 객체는 실행 취소가 이루어지지 않는다.

    따라서 이벤트 루프 내에 실행되어야 할 Future 객체가 있는데 loop.close()가 호출되기 때문에 에러가 발생하게 된다. 그렇지만 3.9 버전부터는 이 부분을 고려하도록 asyncio.run()이 개선되었기 때문에 잘 종료될 수 있다. 하지만 3.8 이하 버전에서는 이 부분이 잘 처리되도록 개발자가 직접 코드를 작성해야한다. 

     

    3.8 이하에서 Executor 종료 기다리기 1

    첫번째 방법으로는 main() 코루틴 내에서 Future 객체가 종료될 때까지 기다리는 방법이다. Future 객체는 awaitable 객체이기 때문에 await 키워드를 이용해서 종료될 때까지 기다릴 수 있다.

    import time
    import asyncio
    
    async def main():
        loop = asyncio.get_running_loop()
        future = loop.run_in_executor(None, blocking)
        try:
            print(f'{time.ctime()} Hello!')
            await asyncio.sleep(1)
            print(f'{time.ctime()} Good Bye!')
        finally:
            await future  ## 종료될 때까지 대기
            
    
    def blocking():
        time.sleep(1.5)
        print(f'{time.ctime()} Hello from a Thread!')
    
    asyncio.run(main())

    다음과 같이 코루틴 내부의 finally 절에서 future를 await 하는 방법이 존재한다. 

    이렇게 코드를 작성하면 코드는 동작하지만 한 가지 문제점이 존재한다. Exeuctor 작업을 생성하는 영역마다 try ~ finally를 사용해야한다. 비즈니스 로직 외에 수평 관심사가 들어가게 된다. 

     

    3.8 이하에서 Executor 종료 기다리기 2

    두번째 방법은 all_tasks()에서 Task만 모아서 취소한다는 점을 이용하는 것이다. run_in_executor()에서 반환되는 Future 객체를 Task 객체로 한번 감싸게 되면 all_tasks()에서 Task로 전달되기 때문에 이것을 노려서 처리해 볼 수 있다.

    import time
    import asyncio
    
    async def main():
        loop = asyncio.get_running_loop()
        
        future = loop.run_in_executor(None, blocking)
        asyncio.create_task(make_coro(future)) ##1 
        
        print(f'{time.ctime()} Hello!')
        await asyncio.sleep(1)
        print(f'{time.ctime()} Good Bye!')
    
            
    async def make_coro(future): ## 2
        try:
            return await future
        except asyncio.CancelledError:
            return await future
        
        
    def blocking():
        time.sleep(1.5)
        print(f'{time.ctime()} Hello from a Thread!')
    
    asyncio.run(main())

    위 코드를 살펴보고 이해해보면 다음과 같다.

    1. asyncio.create_task(make_coro(future))

    run_in_executor()가 호출되었을 때 반환된 값은 Future다. 이것을 비동기 함수로 전달하면, 코루틴이 된다. 즉, Future를 코루틴으로 한번 감싸서 사용할 수 있다. 이렇게 하면 all_tasks()를 통해 얻는 Task 목록에 포함되어 cancel() 메서드가 호출될 수 있다.

    2. async def make_coro()

    이 함수에서는 전달된 Future 객체가 종료될 때 까지 await 하는 작업만 기다린다. 한 가지 더 살펴보면 좋은 것은 except CancelledError 일 때도 await Future를 한다는 점이다. 이것은 asyncio.run()의 동작 주기를 살펴보면 당연한 형태다. 

    asyncio.run()은 메인 코루틴이 종료되면, all_tasks()를 통해서 Pending 상태의 Task를 얻어서 Task의 실행을 취소한다. 그런데 이 때, make_coro()로 감싼 코루틴 역시 cancel() 되는데, cancel()을 받아서 Task가 단순히 취소된다면 future 객체가 완료되는 것을 기다리지 못한다. 따라서 CancelledError가 발생했을 때도 이것을 Catch해서 Future가 완료될 때까지 await 해야한다. 

     

    위 방법을 이용해서 처리해 볼 수 있다. 그런데 이 방법 역시 run_in_executor()를 할 때 마다 make_coro()로 한번씩 감싸야 하기 때문에 많은 Future가 추가될 경우 번거로워 질 수 있다. 

     

    3.8 이하에서 Executor 종료 기다리기 3

    마지막 방법은 asyncio.run()을 사용하지 않고 직접 이벤트 루프를 제어하는 방법이 있다.

    import time
    import asyncio
    from concurrent.futures import ThreadPoolExecutor as Executor
    
    
    async def main():
        loop = asyncio.get_running_loop()
        print(f'{time.ctime()} Hello!')
        await asyncio.sleep(1)
        print(f'{time.ctime()} Good Bye!')
    
    def blocking():
        time.sleep(1.5)
        print(f'{time.ctime()} Hello from a Thread!')
    
    
    loop = asyncio.get_event_loop()
    executor = Executor()
    loop.set_default_executor(executor)
    
    task = loop.create_task(main())
    future = loop.run_in_executor(None, blocking)
    loop.run_until_complete(task)
    
    pending = asyncio.all_tasks(loop=loop)
    for t in pending: t.cancel()
    
    loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    executor.shutdown(wait=True)
    loop.close()

    위 코드에서는 이벤트 루프에 전달한 Executor를 직접 생성하는 작업을 한다. 그리고 마지막으로 이벤트 루프를 close 하기 전에 executor.shutdown()을 호출하는데, 이 때 wait=true로 주어서 Executor에서 실행 중인 Future가 완료될 때까지 기다리는 작업을 한다. 

     

    정리

    • 파이썬 3.9에서는 Future 객체가 늦게 끝나는 것을 고려하지 않고 asyncio.run()을 이용하면 깔끔하게 Task와 Future의 종료를 처리할 수 있다.
    • 파이썬 3.8 이하 버전부터는 executor에서 실행되는 Future 객체가 main() 코루틴보다 늦게 끝날 경우 이것을 처리하는 코드를 개발자가 직접 작성해야한다. 

    댓글

    Designed by JB FACTORY