Python : asyncio 비동기 프로그래밍 공부

    들어가기 전

    이 글은 파이썬 비동기 라이브러리 asyncio 3장을 공부하며 작성한 글입니다. 

     


    3.1 퀵 스타트

    파이썬 asyncio의 주요 컨트리뷰터 중 하나인 유리 셀리바노프는 다음 이야기를 남겼다. 

    Asyncio는 일곱 가지 기능만 알면 충분하다. (일반적 용도로 가정할 때) 

    asyncio는 많은 기능을 제공하지만 이 기능은 두 가지 사람을 대상으로 각각 만들어졌다고 볼 수 있다. 하나는 asyncio 프레임워크를 개발하는 개발자 관점, 또 다른 하나는 asyncio를 사용하는 개발자 관점이다. 일반적인 용도는 asyncio를 사용하는 최종 사용자 관점을 의미한다. 따라서 7가지 기능 정도만 정확하게 알면 asyncio를 사용할 수 있다.

     

    Asyncio를 사용한 Hello World

    import asyncio
    import time
    
    
    async def main():
        print(f'{time.ctime()} Hello!')
        await asyncio.sleep(1)
        print(f'{time.ctime()} GoodBye!')
    
    asyncio.run(main())  # ----> (1)
    
    >>>>
    Fri Feb 10 17:54:11 2023 Hello!
    Fri Feb 10 17:54:12 2023 GoodBye!
    1. asyncio 모듈에서 제공하는 run() 함수로 async def 함수를 실행한다. 그리고 async def 함수에서 다른 코루틴을 실행한다. asyncio.sleep()이 그 예가 된다. 

    대부분의 asyncio 기반 코드는 run() 함수를 사용한다. asyncio.run() 함수 내에는 여러 절차들이 개발되어있는데, 그 부분이 추상화되어 run() 함수만으로 비동기 함수를 실행한다. 아래에서는 run() 함수의 대략적인 구현 부분을 살펴볼 수 있다. 

     

    Asyncio.run()의 대략적인 구현

    위에서는 간단히 asyncio.run() 메서드를 이용해서 비동기 함수를 시작했었다. run() 메서드는 추상화 되어있다고 했는데 저수준으로 살펴보면 어떻게 될까? 

    import asyncio
    import time
    
    async def main():
        print(f'{time.ctime()} Hello!')
        await asyncio.sleep(1)
        print(f'{time.ctime()} GoodBye!')
    
    # asyncio.run(main())
    # run() 메서드의 내부 
    loop = asyncio.get_event_loop() # 1
    task = loop.create_task(main()) # 2
    loop.run_until_complete(task) # 3
    pending = asyncio.all_tasks(loop=loop) # 4
    for task in pending: # 5
        task.cancel()
    group = asyncio.gather(*pending, return_exceptions=True) # 6
    loop.run_until_complete(group) # 7
    loop.close() # 8

    run() 메서드는 위와 같이 복잡하게 구현되어있다. 함수 단위로 어떤 일을 하는지 살펴본다.

    asyncio.get_event_loop()

    코루틴을 실행하기 위해 필요한 이벤트 루프를 가져온다. 동일 스레드에서 호출하면 어디에서든 동일한 이벤트 루프가 반환된다. async def 함수 내에서 이벤트 루프를 가져와야 한다면 asyncio.get_running_loop()를 호출해야한다. 메인 쓰레드에서는 현재 이벤트 루프를 시작하기 전이기 때문에 running loop가 아닌 상태다. 반면 코루틴 내부에서는 이벤트 루프가 비동기로 함수를 처리하고 있기 때문에 이벤트 루프는 running 상태가 된다. 

     

    task = loop.create_task(main())

    create_task()를 호출하기 전까지 코루틴은 시작되지 않는다. create_task()를 호출해서 코루틴을 루프에서 실행될 수 있도록 스케쥴링한다. 반환받은 task 객체를 통해서 다음 작업을 할 수 있다. 

    • task 객체의 상태 확인 (끝났는지 등)
    • task 객체 실행 결과 반환
    • task 객체의 취소(task.cancel())

     

    loop.run_until_completed(task)

    이 메서드에 전달된 task가 끝날 때까지 이벤트 루프를 실행한다. 이 메서드가 호출되면 이 메서드를 호출한 쓰레드는 메서드가 완료될 때까지 이 코드에서 Blocking 된다. 이 때 이 Task만 이벤트 루프에서 실행되는 것이 아니라 이벤트 루프에 예약된 다른 Task들도 함께 수행된다. 바꿔 이야기하면 매개변수로 전달된 Task의 수행이 끝날 때까지 쓰레드를 이벤트루프가 점유하는 것으로 이해할 수 있다. 

    또한 이 Task만 끝날 때까지 기다린다. 이 의미는 Task A, Task B가 이벤트 루프에 예약되었고 Task A를 run_until_completed() 한다면 TaskA가 끝날 때까지만 기다린다. 이것은 Task B가 끝나지 않았을 가능성도 암시한다. 

     

    pending = asyncio.all_tasks(loop = loop)

    매개변수로 전달된 이벤트 루프에 실행 예약된 모든 Task들을 반환한다. 이 Task들은 실행완료되지 않았기 때문에 Pending 상태로 이해할 수 있다. run_until_complete()가 끝나면 이벤트 루프에 실행 예약되었으나 끝나지 않은 모든 Task(Pending 된 Task)를 가져와서 task.cancel()을 통해서 취소한다.

    정리하면 이벤트 루프에서 아직 실행 중인 Task들을 모두 가져와서 실행을 취소한다.

     

    group = asyncio.gather(*pending) + loop.run_until_complete()

    gather() 메서드를 이용해서 아직 실행 중인 모든 Task들을 취합한다. task.cancel()은 실행 중인 task를 취소하는 요청을 보내는 것이다. task.cancel()로 취소 요청을 보내면 이벤트 루프가 이 요청을 받아서 실제로 task를 취소해줘야한다. 그렇지만 task.cancel()을 하는 순간에는 이벤트 루프는 동작하고 있지 않았다. 이벤트 루프는 메인 쓰레드를 점유해서 동작을 하는데 그러지 않았기 때문이다. 

    따라서 asyncio.gather()을 하게 되면 *pending에 포함된 Task들이 그대로 반환될 것이다. 취소 요청은 보냈지만 아직 이벤트 루프가 그것을 인지하지 못했기 때문에 이녀석들은 그대로 실행 상태다. 그렇다면 이 블록의 의미는 취소 요청을 보냈지만 이벤트 루프가 동작하지 못해 아직 취소 요청이 받아들여지지 않은 Task들로 보면 될 것이다.

    그리고 for문을 돌면 loop.run_until_complete()를 반복한다. 이 말은 각 Task에 들어간 취소 요청을 모두 받아들인다. 다음 예시로 풀어보자.

    1. Task1은 취소 요청이 들어갔다.
    2. loop.run_until_complete(task1)을 했더니 Task1이 아직 끝나지 않았다. 이벤트 루프는 Task1을 실행한다.
    3. Task1을 실행하려고 봤더니 취소 요청이 들어왔다. 이벤트 루프는 취소 요청을 수락하고 Task1을 실행 취소한다.
    4. 이벤트 루프에서 Task1이 끝날 때까지 대기하라고 했는데 끝났으니 loop.run_untill_complete(Task1)에서 Blocking 된 제어의 흐름이 다시 메인 메서드로 돌아간다. 

    Task1 → Task2 → Task3 순으로 차례차례 Task의 실행을 취소해준다.

     

    loop.close()

    이 메서드는 보통 최종 동작이다. 이 함수는 정지된 루프(실행 중인 Task가 없는 루프)에서 호출해야한다. 이 함수가 호출되면 루프의 모든 대기열을 비우고 Executor를 종료시킨다. 정지(Stop)된 루프는 다시 실행할 수 있지만 닫혀진(closed) 루프는 다시 실행할 수 없다. asyncio.run() 내부에서는 새로운 이벤트 루프를 처음에 호출하고 run()이 끝날 때 이벤트 루프를 닫는다.

    위에서는 asyncio.run()를 메서드 단위로 좀 더 자세히 살펴봤다. asyncio.run() 메서드의 실행 순서를 간략히 정리하면 다음과 같다.

    1. 이벤트 루프를 얻는다.
    2. 이벤트 루프에 코루틴의 실행을 예약하고, 예약된 코루틴이 끝날 때 까지 Blocking 된다.
    3. 코루틴이 끝나면 Blocking에서 벗어나서 현재 이벤트 루프에서 실행중인 모든 Task를 취소한다.
    4. Task 취소가 완료되면 이벤트 루프를 닫는다. 

     

     

    asyncio.run_in_executor()

    멀티 태스킹을 적절히 구성하기 위해서는 I/O 위주 함수들을 적당히 어우러지게 사용해야한다. 이는 await 키워드를 사용해서 이벤트 루프 ↔ 코루틴 사이의 Context 전환을 제어해야한다는 의미다. 그렇지만 대부분의 파이썬 함수에서는 이 작업을 수행하지 않는다. 따라서 직접 Context 전환이 되도록 개발자가 직접 코드를 작성해야한다. 결국 많은 함수에서 async def(비동기 함수)를 도입할 때까지는 Blocking 되는 라이브러리를 사용할 수 밖에 없다. 

    asyncio에서는 다행히도 동기 함수(def로 선언된)들이 비동기적으로 실행될 수 있도록 run_in_executor()라는 메서드를 제공한다. 간략히 정리하면 다음 동작을 한다. 

    • run_in_executor()는 동기 함수를 실행해 줄 다른 쓰레드를 하나 만들어 이벤트 루프 내의 executor에 제공한다.
    • 이벤트 루프는 executor에 있는 다른 쓰레드의 동기 함수, Task에 있는 코루틴 객체를 번갈아가면서 실행시킨다.
    • run_in_executor()는 Future를 반환한다.  
    import time
    import asyncio
    
    
    async def main(wait_time):
        for i in range(wait_time):
            print(f'coroutine wait {i} sec. \n')
            await asyncio.sleep(1)
    
        print(f'coroutine completed.')
    
    def blocking(wait_time): # --> (1)
        for i in range(wait_time):
            print(f'blocking wait {i} sec \n') 
            time.sleep(1)
    
        print(f'blocking func completed')
    
    
    loop = asyncio.get_event_loop()
    task = loop.create_task(main(3))
    loop.run_in_executor(None, blocking, 2) # -------> (2)
    loop.run_until_complete(task)
    pending = asyncio.all_tasks(loop=loop) # --------> (3)
    
    for task in pending:
        task.cancel()
    
    group = asyncio.gather(*pending, return_exceptions=True)
    loop.run_until_complete(group)
    loop.close()

    위 코드를 예시로 고려해보자.

    def blocking()

    이 코드에서 time.sleep() 메서드가 호출된다. time.sleep() 메서드는 전통적인 블로킹 함수다. 만약 메인 쓰레드에서 이 함수를 호출하면 메인 쓰레드는 이 코드 라인에서 블록킹되고 이벤트 루프는 실행되지 않을 것이다. 이 말은 이 메서드를 코루틴으로 만들어서는 절대 안된다는 의미다. 

    만약 이 메서드가 코루틴으로 만들어진다면, 이벤트 루프가 이 Task를 실행했을 때 이 Task가 원하는 시간만큼 이곳에서 블록킹 될 것이다. 블록킹 되는 시간동안 이벤트 루프의 다른 Task는 실행될 수 없게 된다. 따라서 이처럼 블록킹되는 메서드들은 코루틴으로 만들어지지 않아야 한다.  

    예를 들어 이벤트 루프에 Task가 10개가 이미 존재하고, time.sleep(10)을 하는 Task가 코루틴으로 만들어져서 이벤트 루프에서 새롭게 Task 100으로 추가되었을 때를 가정해보자. 이벤트 루프가 Task 100을 실행하는 즉시 이벤트 루프는 10초동안 Task 100에서 의미없이 블록킹되게 된다. 

    이처럼 일반적인 블록킹 함수는 코루틴 / Task로 만들어져서 이벤트 루프에 예약되면 절대로 안된다. 대신에 asyncio는 run_in_executor() 메서드를 제공해서 이 문제를 해결해준다. 이 메서드에 블록킹 함수가 전달되면 쓰레드를 하나 생성해서 이벤트 루프와 비동기적으로 실행될 수 있도록 한다.

     

    loop.run_in_executor()

    이벤트 루프는 기본적으로 싱글 쓰레드로 동작한다. 그렇지만 때로는 별도의 쓰레드 / 프로세스에서 다른 작업이 병렬적으로 수행될 수 있어야 한다. run_in_executor()는 별도의 쓰레드 / 프로세스에서 작업할 수 있도록 도와준다. 

    run_in_executor()에 Blocking 함수를 전달하고 executor()에서 실행되도록 요청한다. run_in_exeuctor()는 메인 스레드를 블록킹하지 않는다. 이벤트 루프가 가지고 있는 executor에서 동기 함수가 비동기적으로 실행될 수 있도록 작업을 스케쥴링 하는 역할을 한다. 

    run_in_executor()는 Future 객체를 반환한다. Future 객체는 awaitable 하기 때문에 run_in_executor()가 코루틴 내에서 호출되었다면 await 키워드로 블록킹 함수의 실행이 완료될 때까지 기다릴 수도 있다. 

     

    loop.all_tasks()

    이곳에서는 어떠한 Task도 반환되지 않는다. loop.run_until_completed(task)에서 task가 끝날 때까지 Blocking 되었었다. 이벤트 루프에는 Task 1개, Future 1개가 예약되어있었는데 Task 1개가 끝났기 때문에 이벤트 루프에는 Future만 있다. (심지어 Future도 Task보다 1초 전에 끝났다). 

    loop.all_tasks()는 이벤트 루프에 존재하는 Task 객체들만 반환한다. Future는 대상이 아니다. 따라서 반환되는 Task는 존재하지 않는다. 

     

    실행 결과

    실행 결과를 살펴보면 다음과 같다.

    blocking wait 0 sec 
    coroutine wait 0 sec. 
    blocking wait 1 sec 
    coroutine wait 1 sec. 
    blocking func completed
    coroutine wait 2 sec. 
    coroutine completed.

    blocking wait / coroutine wait 메세지가 번갈아가면서 출력되는 것을 확인했다. 즉, run_in_exeuctor() 메서드를 이용해서 동기 함수를 이벤트 루프내에서 비동기적으로 실행되는 것을 확인했다. 

     


    3.2 Asyncio의 탑

    asyncio 프레임워크는 여러 계층으로 구별할 수 있다. asyncio 프레임워크 설계자, 혹은 사용자 입장에서도 이 계층을 나눠서 인식하면 좋다.

    단계 구성 구현
    계층 9 네트워크 : 스트림 StreamReader / StreamWriter / 
    asyncio.open_connection() /
    asyncio.server_start()
    계층 8 네트워크 : TCP & UDP protocol
    계층 7 네트워크 : 트랜스포트 BaseTransport
    계층 6 도구 asyncio.Queue
    계층 5 별개의 스레드와 프로세스 run_in_executor() /asyncio.subprocess
    계층 4 Task asyncio.Task / asyncio.create_task()
    계층 3 Future asyncio.Future
    계층 2 이벤트 루프 asyncio.run() / BaseEventLoop
    계층 1 코루틴 async def / async with / async for / await

    각 계층을 먼저 하나씩 살펴보면 다음 역할을 한다. 그리고 사용자 입장에서 살펴봐야 할 계층도 함께 살펴보고자 한다.

    계층1 : 코루틴

    가장 기초적인 단계로 일반적인 코루틴을 의미한다. 

    계층 2 : 이벤트 루프

    코루틴 그 자체만으로는 그다지 유용하지는 않다. 코루틴을 실행시켜 줄 이벤트 루프 없이는 무의미하다. 코루틴은 이벤트 루프를 통해서 비동기적으로 실행될 수 있다. 

    계층 3, 4 : Future + Task

    Task는 Future의 하위 클래스(상속 클래스)다. 따라서 Task, Future는 동일한 계층으로 이해할 수 있다.

    • Future 인스턴스는 이벤트 루프에서 실행중인 Task다. 알림을 통해 결과를 반환한다.
    • Task 인스턴스는 이벤트 루프에서 실행중인 코루틴을 나타낸다.

    즉, Future는 루프 기반을 의미한다. Task는 루프 기반이면서 코루틴 기반임을 의미한다. 사용자 입장에서는 Task를 더 많이 사용하게 될 것이다. 

    계층 5 : 별개의 쓰레드 / 프로세스

    asyncio는 이벤트 루프를 실행하는 스레드 / 프로세스와 별개로 다른 스레드 / 프로세스를 생성해서 비동기적으로 작업을 할 수 있는 기능을 제공한다. 

    계층 6 : 도구

    asyncio.Queue 같은 비동기 기반의 도구들이 존재한다. asyncio의 Queue는 thread-safe한 Que와 매우 흡사하다. thread-safe한 Que와 유일한 차이점은 asyncio.Que를 사용할 때는 get(), put()에 대해 await 키워드를 사용해야한다는 것이다. 

    또한 Queue.Queue의 get()은 메인 스레드를 Blocking한다. 따라서 코루틴 내에서는 Queue.Queue를 사용하면 안된다.

    계층 7~9 : 네트워크 I/O

    사용자 관점에서 가장 유용한 계층은 Streams API다. protocols API는 Streams API보다 더 세부적이다. 대신에 Streams API가 사용하기에는 좀 더 간편하다. 

     

    위에서 각 계층에 무엇이 있는지를 대략적으로 살펴봤다. 그렇다면 사용자 입장에서는 어떤 계층을 주로 살펴봐야 할까?

    계층 1 : 코루틴

    • async def 함수를 작성하는 방법을 숙지해야한다.
    • 코루틴을 호출하고 실행하기 위한 await 사용 방법을 이해해야한다.

    계층 2 : 이벤트 루프

    • 이벤트 루프를 시작하고 종료하며, 상호작용하는 방법을 이해해야한다.

    계층 5 : 다른 쓰레드 / 프로세스

    • 비동기 어플리케이션에서 블로킹 코드를 사용할 때 Executor를 이용하면 된다.
    • 하지만 대부분의 서드파티 라이브러리는 asyncio와 호환되지 않는다. 

    계층 6 : 도구

    • 긴 시간동안 실행되는 코루틴에 데이터를 전달해야한다면 asyncio.Queue를 이용하는 것이 적합한 방법이다. 스레드에 데이터를 배분할 때 queue.Queue를 사용하는 것과 동일한 방법이다. 
    • Asyncio의 Queue는 표준 라이브러리으 queue 모듈과 동일한 API를 제공하지만 get()과 같은 블로킹 메서드가 아닌 코루틴을 사용해야한다. 

    계층 9 네트워크

    • Streams API는 소켓 통신을 처리하는 가장 간단한 동작이다. 네트워크 어플리케이션의 ProtoType을 작성하기에 적합하다. 만약 더 상세한 제어가 필요한 경우 Protocols API로 전환하면 된다. 
    • 물론 asyncio와 호환되는 소켓 통신용 서드파티 라이브러리를 사용한다면 asyncio의 네트워크 계층을 직접 사용할 필요는 없다. aiohttp가 바로 그 예다. 

     

     


    3.3 코루틴

    코루틴 객체와 비동기 함수는 서로 다른 의미를 가지고 있다. 이 챕터에서는 그 부분에 대해서 자세히 알아보고, 코루틴이 동작하는 방식을 저수준에서 간략히 알아보고자 한다.

     

    3.3.1 새로운 async def 키워드

    아래 코드를 살펴보면서 하나씩 개념을 다 잡아가고자 한다. 

    import time
    import asyncio
    import inspect
    
    async def func():
        return 123
    
    c = func()
    
    print(type(func))
    print(f'func is coroutine? {inspect.iscoroutine(func)}')
    print(f'func is coroutine function {inspect.iscoroutinefunction(func)}')
    print(f'func() is coroutine? {inspect.iscoroutine(c)}')
    
    
    >>>>
    <class 'function'>
    func is coroutine? False
    func is coroutine function True
    func() is coroutine? True

    위 코드와 출력 결과를 살펴보면 다음과 같은 사실을 알 수 있다.

    • async def로 정의된 함수의 타입은 'function'이다. 즉, async def 자체는 일반 함수와 동일하게 'function'으로 분류된다.
    • async def로 정의된 func 자체는 코루틴이 아니다. 대신 코루틴 함수를 의미한다.
    • func()를 호출하면 코루틴 객체가 반환된다. 

    정리하면 다음과 같다.

    • 비동기 함수 = 코루틴 함수이다.
    • 비동기 함수를 호출하면 코루틴 객체가 반환된다. 

     

    3.3.2 코루틴 객체는 무엇인가?

    코루틴 객체는 완료되지 않은 채 일시 정지했던 함수를 재개할 수 있는 기능을 가진 객체를 의미한다. 예를 들어 코루틴 함수가 절반정도만 실행된 후에 일시 정지되고, 이후에 일시정지 된 부분부터 다시 재시작해서 코루틴 함수를 전체 실행하는 형태의 동작이 가능하도록 한다. 그렇다면 코루틴을 실행하고 종료하는 조작방법은 어떻게 될까?

    async def func():
        print(f'coroutine started.')
        return 123
    
    c = func()
    
    try :
        c.send(None)
    except StopIteration as e:
        print(f'The answer was : {e.value}')

    coro.send(None)

    • 코루틴은 제너레이터와 비슷한 형태로 동작한다. 제너레이터는 send() 메서드를 통해서 제너레이터를 실행할 수 있다. 마찬가지로 코루틴도 send() 메서드를 호출해서 코루틴 객체를 실행시킬 수 있다. 
    • 이벤트 루프도 Task가 가진 코루틴을 시작할 때 동일하게 send(None) 메서드를 호출해서 코루틴을 실행시켜준다. 
    • coro.send()는 await 키워드가 있는 곳까지 코루틴 함수의 코드를 실행시켜준다. 

    StopIteration

    제너레이터가 모두 실행되면 StopIteration 예외가 발생하며 값이 반환된다. 코루틴도 동일하게 동작하는데, 실행이 완료되면 StopIteration 예외가 발생한다. 그리고 예외에는 value 값을 통해서 코루틴의 return 값을 전달 받을 수 있다. 

     

    send() 메서드와 StopIteration이 코루틴 실행의 시작과 끝이다. 함수 하나를 시작할 때 고려해야 할 부분이 많은 것처럼 보이지만, 이벤트 루프가 내부적으로 이런 처리를 다 해주기 때문에 사용자는 일반 함수를 사용하는 것처럼 실행하고 값을 반환받으면 된다. 

     

    3.3.3 await 키워드

    await 키워드는 항상 매개변수를 하나 필요로 한다. 매개변수에는 반드시 awaitable한 객체가 와야한다. awaitable한 객체는 다음과 같다.

    • 코루틴 객체
    • __await__() 메서드를 구현한 모든 클래스의 객체  (Task, Future 등)

    await 키워드를 사용하는 방법은 다음과 같다. 

    async def func():
        print(f'coroutine started.')
        for i in range(5):
            await asyncio.sleep(1)
            print(f'sleep {i} sec')
        print(f'coroutine started.')
        return 123
    
    async def main():
        ret = await func()
        return ret

    func() 호출 → 코루틴 객체 생성

    • func()를 호출하면 실행 결과로 코루틴이 반환된다. 코루틴은 awaitable한 객체이기 때문에 await 할 수 있다.
    • await 한다는 것은 코루틴의 실행이 끝날 때까지 해당 블락에서 기다린다는 것이다.
    • func()가 완료되면 ret에는 123이 저장된다. 

     

    3.3.4 코루틴 예외 주입

    코루틴을 취소하는 방법은 코루틴에 예외를 주입하는 것이다. 실행중인 Task를 취소할 때 task.cancel()을 사용한다. task.cancle()은 내부적으로 코루틴에 asyncio.CancelledError 예외를 발생시켜서 Task를 취소한다. 

    ...
    coro = main()
    coro.send(None)
    coro.throw(Exception, 'blah')
    
    >>>>
        async def main():
    Exception: blah

    coro.throw()

    • throw() 메서드를 이용하면 발생시킬 예외와 예외 메세지를 함께 전달할 수 있다. 
    • coro.send() 메서드가 호출되었으므로 현재 코루틴의 Context는 첫번째 await 코드 라인에 대기하고 있을 것이다. 이 때 coro.throw() 메서드를 호출하면 첫번째 await 코드 라인에서 Exception이 발생되게 될 것이다. 

    위에서 코루틴이 어떤 지점에서 어떻게 예외를 발생시키는지를 알 수 있었다. 아래에서는 실제 이벤트 루프에서 Task를 어떻게 취소하는지를 살펴보려고 한다. 

     

    3.3.5 Task 취소 내부 간단 구현

    이번 절에서는 coro.throw()를 이용해서 Task.cancel()를 간단히 구현해보고자 한다.

    async def main():
        try:
            while True:
                print(1)
                await asyncio.sleep(0)
        except asyncio.CancelledError as e:
            print('I was cancelled')
        
    
    
    coro = main()
    coro.send(None)
    coro.send(None)
    coro.throw(asyncio.CancelledError)
    
    >>>>
    
        coro.throw(asyncio.CancelledError)
    StopIteration

    except asyncio.CancelledError

    • task.cancel()이 호출되면 코루틴 내부로 asyncio.CancelledError가 주입된다. 이것은 이벤트 루프가 내부적으로 coro.send() / coro.throw(asyncio.CancelledError)를 적절하게 호출해주면서 실행된다. 

    coro.throw()

    • 코루틴에게 asyncio.CancelledError를 주입해준다. 그러면 코루틴은 await 하고 있는 지점에서 에러를 발생시킨다. 에러는 except 절에 의해서 잡히게 되고 'I was cancelled'를 출력한다. 
    • 코루틴은 실행해야 할 모든 부분을 실행했기 때문에 StopIteration 에러가 발생하며 리턴되게 된다.

     

     

    3.3.6 코루틴 → 이벤트 루프로 변환

    앞서서 코루틴의 저수준 메서드 send() / throw()를 이용해서 직접 코루틴을 실행하고 종료하는 작업들을 했었다. asyncio는 코루틴의 실행과 종료를 이벤트 루프를 통해서 해준다. 앞서서 코루틴으로 작성했던 코드를 이벤트 루프로 옮기면 다음과 같이 된다.

    async def main():
        for _ in range(10):
            await asyncio.sleep(1)
        return 'DONE'
    
    coro = main()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(coro)

    위의 코드에서는 다음과 같이 동작한다.

    • 코루틴 객체를 생성한다.
    • 이벤트 루프를 얻는다.
    • 이벤트 루프의 run_until_complete를 호출하며 코루틴 객체를 전달해준다. 코루틴 객체가 전달되면 Task 객체로 한번 감싸서 이벤트 루프에서 실행되도록 스케쥴링한다. 

     


    3.4 이벤트 루프

    파이썬 3.7에서 get_running_loop() 메서드가 추가되었기 때문에 개발자가 직접 이벤트 루프를 가져와서 사용해야 하는 경우는 많지 않다. 그렇지만 파이썬 3.7 이전에 작성된 코드들이 존재하기 때문에 이 부분에 대해서도 짚고 넘어가야 한다.

    • asyncio.get_event_loop()
      • 현재 스레드가 가지고 있는 이벤트 루프를 얻는다. 
      • 스레드가 이벤트 루프를 가지고 있지 않다면 생성하고 스레드에게 셋팅한 후에 이벤트 루프를 반환한다.
      • 같은 스레드 내의 어디에서도 호출 가능하다. 이 때 반환되는 이벤트 루프는 항상 동일한 객체다.
    • asyncio.get_running_loop() 
      • 현재 동작중인 이벤트 루프를 반환한다. 
      • 코루틴, Task 내부에서만 실행 가능하다. 

    그렇다면 get_running_loop() 메서드가 등장하게 되면서 코드는 얼마나 더 편리하게 바뀌었을까?

    async def for_coro():
        print(1)
    
    # get_running_loop() 등장 전
    async def main():
        loop = asyncio.get_event_loop()
        for _ in range(10):
            loop.create_task(for_coro())
        
    # get_running_loop() 등장 후
    async def main_after_get_running():
        # loop = asyncio.get_event_loop() 
        for _ in range(10):
            asyncio.create_task(for_coro())
    • 3.7 이전에 get_running_loop()가 등장하기 전에는 직접 쓰레드에서 사용중인 이벤트 루프를 가져와서 이벤트 루프를 통해 Task를 각각 생성했어야 했다.
    • 3.7 이후에 get_running_loop()가 등장했다. get_running_loop()를 통해서 현재 동작하고 있는 이벤트 루프를 얻을 수 있게 되어서 개발자가 굳이 이벤트 루프를 가져오지 않아도 된다. 개발자는 이제부터 asyncio.create_task() 메서드를 이용해서 Task를 직접 생성하면 된다. 

     

     


    3.5 Task와 Future

    asyncio를 이용해서 무언가를 실행한다면 대부분 Task 인스턴스를 이용해서 실행하게 될 것이다. 왜냐하면 주로 create_task() 메서드를 이용해서 실행할 것이기 때문이다. 그렇지만 asyncio에서는 Future 인스턴스도 사용되기 때문에 각 인스턴스가 하는 일에 대해서 이해를 해야한다. 

    아래와 같이 각 객체의 내용을 쉽게 정리해 볼 수 있다.

    • Future : 이벤트 루프와 주로 상호작용한다. 작업이 완료되었는지, 아닌지를 정의한다. 
    • Task : 코루틴을 실행하는 객체다. 

    Future 인스턴스는 다음과 같은 기능도 가능하다고 한다.

    • 'result' 값을 설정할 수 있다. (set_result 로 write. result로 read)
    • .cancel()로 작업을 취소할 수 있다. 
    • Future가 완료되었을 때 실행할 Callback 함수를 가진다. 

    아래 코드에서 Future 실행을 통해 좀 더 자세히 알아보자.

    import asyncio
    
    
    async def main(f):
        await asyncio.sleep(1)
        f.set_result("hello")
    
    future = asyncio.Future()
    print(future.done())
    
    loop = asyncio.get_event_loop()
    coro = main(future)
    task = loop.create_task(coro)
    loop.run_until_complete(task)
    
    print(future.done(), future.result())
    
    >>>
    
    False
    True hello
    1. Future를 생성한다. 이 Future는 이벤트 루프에 실행이 예약되지도 않을 것이고 특정 코루틴을 실행하지도 않을 것이다. 왜냐하면 아무런 인자없이 만들어진 Future 이기 때문이다.
    2. future.done()으로 Future가 종료된 상태인지 확인한다. Future는 실행되지 않았기 때문에 아직 완료된 상태가 아니다.
    3. 루프에 Task를 생성해서 코루틴 실행을 예약한다. 
    4. loop.run_until_complete()를 이용해서 생성한 Task가 완료될 때까지 기다린다. 

    위와 같이 실행하면 Future는 set_result() 메서드가 호출되는 시점에 상태가 Done으로 바뀌게 된다. 그리고 result() 메서드를 이용해서 결과도 가져올 수 있게 된다. 다시 한번 정리하면 Future는 실행 중인지 완료되었는지, 완료되었다면 결과를 받을 수 있는 인스턴스 정도로 생각하면 된다. 그리고 완료되는 것은 set_result() 메서드가 호출되는 시점이다.

     

    3.5.1 Task ? Future ? 결정하자. 

    파이썬 3.7 이후(get_running_loop()가 추가)로 asyncio.create_task()를 이용해서 코루틴의 실행을 이벤트 루프에 예약한다. 3.7 버전 이전에는 코루틴을 실행하는 방법이 두 가지가 존재했다. 아래의 두 가지 메서드를 살펴볼 수 있다.

    • loop.create_task() : 최종 사용자를 위해 설계됨.
    • asyncio.ensure_future() : asyncio 프레임워크 개발자를 위해 설계됨.

    loop.create_task()는 코루틴만 전달 받아서, 코루틴을 실행하는 Task 객체를 이벤트 루프에 예약하는 역할을 한다. 그렇지만 asyncio.ensure_future()는 두 가지 기능이 있는데 이것 때문에 초기 asyncio 사용자에게 혼란을 가중시켰었다. asyncio.ensure_future()는 어떤 동작을 할까?

    • 코루틴 객체를 전달받으면 Task를 생성해서 이벤트 루프에 실행 예약한다.
    • Task, Future 객체를 전달받으면 아무런 일도 하지 않고 전달받은 객체를 그대로 반환한다. 

    개발자가 Task, Future 객체를 전달받으면 Task, Future 객체의 실행을 이벤트 루프에 예약하는 작업등을 하지 않고 객체를 그대로 반환한다. 이런 부분 때문에 많은 오해가 있었다. 그렇다면 왜 이런 메서드가 필요했던 것일까? 이런 메서드는 asyncio 라이브러리 내부에서 Future, Task 클래스에만 있는 메서드를 반드시 쓰고 싶을 때를 위해서 추가된 메서드다. 

    async def main():
        ...
        
    coro = main()
    coro.done()

    예를 들어 coro.done()을 하면 attribute가 없어서 실행이 안된다. 왜냐하면 코루틴에는 done()이 없고, Future에만 존재하기 때문이다. asyncio 라이브러리 개발자들은 내부적으로 Future 객체인 경우에만 작성할 수 있는 코드가 있을텐데, 그런 경우를 항상 만족시키기 위해서 ensure_future()라는 메서드를 이용한다. 코루틴 객체가 전달되면 Task 객체가 전달되기 때문에 Task 객체에만 있는 작업을 할 수 있게 되는 것이다. 

    asyncio.gather(*aws, loop=None, ...)

    실제로 asyncio.gather()와 같은 녀석들을 보면 내부적으로 ensure_future()를 사용하는 것을 볼 수 있을 것이다. asyncio.gather()는 인수로 awaitable 객체를 받는데, awaitable한 객체를 Argument로 전달받는 녀석들은 내부적으로 대부분 ensure_future()를 사용할 것이다. 

    귀도 반 로섬(파이썬의 창시자)가 작성한 글 중에 ensure_future()에 대해서 다음 내용이 존재한다.

    ensure_future()의 목적은 코루틴 혹은 Future 일 수 있는 인스턴스에 대해 Future에만 정의된 메서드를 호출하고 싶은 경우에 대응하는 것이다. 이미 Future인 경우 아무런 일도 일어나지 않고, 코루틴인 경우 Task로 감싼다. 
    코루틴임이 확실한 인스턴스에 대해 스케쥴링하고 싶은 경우라면 create_task()가 적합한 API이다. ensure_future()를 호출해야 하는 유일한 경우는 코루틴 혹은 Future 일 수 있는 인스턴스를 받아서 그 인스턴스에 Future로만 할 수 있는 동작을 수행해야하는 경우 뿐이다. 

    따라서 일반적으로 사용하는 개발자들은 create_task()를 메서드를 이용해서 코루틴의 실행을 이벤트 루프에 예약하면 된다.  아래 코드에서 위에서 설명한 내용도 함께 살펴볼 수 있다. 

    import asyncio
    
    async def my_coro():
        print('1')
    
    
    coro = my_coro()
    loop = asyncio.get_event_loop()
    
    task1 = loop.create_task(coro)
    assert isinstance(task1, asyncio.Task)
    
    task2 = asyncio.ensure_future(coro)
    assert isinstance(task2, asyncio.Task)
    
    task3 = asyncio.ensure_future(task1)
    assert task1 is task3

    task1

    • task1은 이벤트 루프를 통해서 생성한 Task다. 이 때 코루틴이 전달되기 때문에 Task가 생성되고, Task는 이벤트 루프에 실행 예약된다. 
    • 따라서 Task1의 타입은 asyncio.Task가 성립한다.

    task2

    • task2는 ensure_future()를 통해서 생성한 Task다. 전달된 객체가 코루틴이기 때문에 Task가 반환되고, 반환된 Task는 이벤트 루프에 실행 예약된다.
    • 따라서 Task2의 타입은 asyncio.Task가 성립한다.

    Task3

    • ensure_future()에 Task1이 전달되는, 이것은 이미 Task 객체다. ensure_future()는 코루틴이 전달되는 경우에만 새로운 Task 객체를 생성해서 반환한다. Task 객체가 전달되면, 전달받은 Task 객체를 그대로 반환한다.
    • 이 때 전달받은 Task 객체를 이벤트 루프에 실행 예약도 하지 않고 그대로 반환한다. 따라서 Task1 == Task3이 성립한다.

     

    정리

    • 일반적으로 asyncio를 사용하는 개발자 입장이라면 Task를 적극적으로 사용하도록 한다.
    • Task를 사용할 때는 create_task() 메서드를 이용하면 된다. ensure_future() 역시 Task 실행을 예약해주기는 하지만, 주로 프레임워크 개발자들이 사용하는 메서드이며 단순히 Task를 생성하는 것보다 좀 더 숨겨진 기능이 존재한다. 
    • Future는 비동기 처리를 지원하지 않는 동기 함수들을 이벤트 루프에서 비동기로 실행할 때 사용한다. (run_in_executor)

     

     

     

     

     

    참고 

    https://soooprmx.com/python-%ED%91%9C%EC%A4%80-%ED%95%A8%EC%88%98%EB%A5%BC-asyncio%EC%97%90%EC%84%9C-%EB%B9%84%EB%8F%99%EA%B8%B0%EB%A1%9C-%ED%98%B8%EC%B6%9C%ED%95%98%EB%8A%94-%EB%B0%A9%EB%B2%95/

     

    댓글

    Designed by JB FACTORY