파이썬 : asyncio를 이용한 비동기 프로그래밍의 이해

    파이썬 : asyncio를 이용한 비동기 프로그래밍의 이해

    이번 포스팅에서는 파이썬에서 비동기 프로그래밍을 위해 주로 사용하는 asyncio에 대한 부분을 다뤄보고자 한다. 파이썬 2에서는 적절한 비동기 프로그래밍이 지원되지 않아 gevent 같은 라이브러리를 이용해 비동기 프로그래밍을 했던 것으로 알고 있다. 그렇지만 파이썬 3에서는 제네레이터를 기반으로 한 코루틴이 도입되면서 비동기 프로그래밍을 할 수 있게 되었다.

    파이썬은 asyncio 라이브러리를 이용해서 보다 손쉽게 코루틴을 이용해 비동기 프로그래밍을 할 수 있도록 한다. 이번 포스팅에서는 주요하게 사용하는 메서드와 동작 방식이 어떻게 되는지를 살펴보고자 한다. 다른 블로그 글을 공부하고 내부 코드를 이해하며 작성한 글이라 틀린 부분이 존재할 수 있다. 따라서 처음 이 글을 보시는 분이라면 '소프트 랜딩' 개념으로 생각하고 보면 좋을 것 같다. 

     


    asyncio.run()

    asyncio를 이용한 비동기 프로그래밍을 하려면 가장 먼저 asyncio.run() 같은 메서드를 이용해서 비동기 프로그래밍의 진입 지점을 잡아야 한다. 진입 지점을 잡는다는 말은 main 함수처럼 돌아갈 main 코루틴을 시작한다는 의미로 이해할 수 있다. 아래의 asyncio.run() 코드를 살펴보면 이벤트 루프를 획득한 후, loop.run_until_complete()를 통해 이벤트 루프에서 main 코루틴을 시작한다. 그리고 코루틴이 완료될 때까지 해당 코드에서 blocking 되는 것을 볼 수 있다. 

    // asyncio.run()
    def run(main, *, debug=None):
        ...
        loop = events.new_event_loop() // 이벤트 루프 획득
        try:
            ...
            // 이벤트 루프에 코루틴(main) 시작. 완료될 때 까지 block
            return loop.run_until_complete(main) 
        finally:
            try:
                _cancel_all_tasks(loop) // 끝나면 모든 task 취소.
                loop.run_until_complete(loop.shutdown_asyncgens())
                loop.run_until_complete(loop.shutdown_default_executor())
            finally:
                events.set_event_loop(None)
                loop.close() // 루프 닫음.

    loop.run_until_complete()가 끝나면 finally 절로 넘어간다. finally 절에서는 현재 루프에 있는 모든 task를 취소하고 이벤트 루프를 닫는 것을 알 수 있다. 그렇다면 loop.run_until_complete()는 무슨 역할을 하는 것일까?

    // loop.run_until_complete
    def run_until_complete(self, future):
    
        ...
        // task를 생성함. 
        // 내부적으로 task.create_task()를 사용함 → task 실행이 이벤트 루프에 예약됨. 
        future = tasks.ensure_future(future, loop=self)
        
        try:
            // 이벤트 루프를 무한히 시작함 (예외가 발생할 때 까지)
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            ...

    loop.run_until_complete()에는 파라메터로 future가 전달된다. 이 때 future는 asyncio.run()에 전달되었던 코루틴이다. 즉, main 코루틴으로 이해할 수 있다. 여기서 전달된 코루틴을 tasks.ensure_future()에 전달해준다. tasks.ensure_future()에서는 내부적으로 tasks.create_task()를 호출해서 task를 생성하고 실행을 이벤트 루프에 예약한다. 즉, 최초의 Task가 생성되는 것이다.

    이후에 self.run_forever() 메서드를 실행하고, 이곳에서 blocking된다. 이 때 self는 이벤트 루프 자기 자신을 의미하고, run_forever() 라는 의미에 맞게 코드 내부에서는 무한 루프를 돌면서 이벤트 루프를 실행한다. 무한히 돌던 이벤트 루프는 동작을 수행하던 도중 Catch 되지 못한 Exception을 만나게 되면 종료하게 된다.

    정리하면 asyncio.run()하는 동작은 아래 정도로 정리할 수 있다. 

    1. 이벤트 루프를 셋팅하고 무한히 동작하도록 한다.
    2. 전달된 코루틴을 수행하는 최초의 Task를 생성하고 실행 예약한다.
    3. 이벤트 루프가 예외를 받으면 이벤트 루프가 종료된다. 이 때, 이벤트 루프에 예약된 Task가 남아있더라도 종료된다. 즉, 최초의 Task가 종료되면 나머지 예약된 Task들도 함께 종료된다는 것을 의미한다. 

    그림으로 살펴보면 위와 같이 정리할 수 있다. 

     


    asyncio.create_task()

    asyncio.create_task() 객체에 코루틴을 넘겨주면 Task 객체는 생성된다. 이 때 Task 객체는 내부적으로 _coro라는 필드를 가지고 있는데, 이 필드에 코루틴을 저장해둔다. Task 객체는 생성되면서 이벤트 루프에 자기 자신의 실행을 예약하는데, 이 때  __step__() 메서드를 호출해 줄 것을 요청한다. 쉽게 요약해서 create_task()는 코루틴을 수행하는 Task를 생성하고 이 코루틴의 실행을 예약한다. 언젠가 자신의 순번이 되면 이 Task는 실행되게 될 것이다. 

    자세한 내용은 아래에 작성된 코드 블럭 세 개를 살펴보면 된다.

    def create_task(self, coro, *, name=None):
        """Schedule a coroutine object.
        Return a task object.
        """
        ...
            task = tasks.Task(coro, loop=self, name=name) // Task 생성
        ...
        return task
    • create_task() 메서드에서 tasks.Task()를 이용해서 Task가 생성된다.
    • Task가 생성될 때, 코루틴이 전달된다. 
    Class Task() : ...
    
    def __init__(self, coro, *, loop=None, name=None):
        
        ...
        self._coro = coro
        ...
    
        self._loop.call_soon(self.__step, context=self._context)
        _register_task(self)
        
    
    def __step(self, exc=None):
    
    	# Call either coro.throw(exc) or coro.send(None).
        try:
            if exc is None:
                # We use the `send` method directly, because coroutines
                # don't have `__iter__` and `__next__` methods.
                result = coro.send(None)
            else:
                result = coro.throw(exc)
        ...
    • Task()가 생성되면 전달받은 코루틴을 Task 객체의 _coro에 저장한다.
    • Task()가 생성되면서 이벤트 루프의 call_soon() 메서드를 호출한다. 이 때, self.__step() 메서드를 넘겨준다.
    • self.__step() 메서드는 try 이하 절에서 볼 수 있듯이 코루틴을 실행해주는 메서드다. 
    Class EventLoop(): ...
    
    def call_soon(self, callback, *args, context=None):
    	...
        handle = self._call_soon(callback, args, context)
        ...
        
    def _call_soon(self, callback, args, context):
        ...
        self._ready.append(handle)
        ...
    • 이벤트 루프의 call_soon() 메서드가 호출되면, call_sonn() 메서드는 내부적으로 _call_soon() 메서드를 호출한다.
    • _call_soon() 메서드는 이벤트 루프의 _ready(deque 자료구조임)에 handle을 추가해준다. handle은 callback 함수와 args, context를 가진 녀석들이다. 결론은 코루틴을 실행할 때 사용할 변수와 문맥들을 기록한 녀석들인데, 이 녀석들의 실행을 이벤트 루프에 예약하는 것으로 이해할 수 있다.

     

    asyncio.create_task()의 동작은 아래와 같이 정리할 수 있다.

    1. 전달받은 코루틴으로 Task 객체를 생성한다. Task 객체에는 _coro 필드에 코루틴이 저장된다. 
    2. Task가 생성될 때, loop.call_soon() 메서드에 __step__() 메서드를 인자로 넘겨준다. 이벤트 루프에는 이 Task가 __step__() 메서드를 실행하도록 예약된다.
    3. __step__() 메서드는 coro.send()를 이용해서 Task 객체가 가지고 있는 코루틴을 자신의 차례가 올 때 마다 실행한다. coro.send()로 넘어가게 되면 await로 Future 객체를 전달받은 곳부터 다시 시작된다. 

    그림으로 정리하면 다음과 같다. 


    await 키워드

    await 키워드(yield from과 동일)는 코루틴 내에서 또 다른 코루틴을 실행시키고, 코루틴의 결과를 받기 위해서 기다린다. 또 다른 코루틴으로의 일종의 진입지점을 의미한다고 할 수 있다. 

    코루틴에서 await를 이용해서 코루틴 체이닝을 할 수 있다. await로 계속 또 다른 코루틴을 호출하다보면 함수 선언 부분의 끝까지가서 StopIteration이 발생할 수 있다. 그 외에도 비동기 IO 작업, 혹은 asyncio.sleep() 메서드를 만날 수도 있다. asyncio.sleep() 같은 메서드는 내부적으로 awaitable 객체(Future)를 만들어서 call_later() 메서드를 이용해서 몇 초 후에 다시 재호출해줄 것을 요청한다. 

     

    쉽게 이야기 하면 위와 같은 그림으로 이해해 볼 수 있을 것 같다. 

    await로 코루틴을 호출하면 함수 호출이 계속 발생하며 Stack에 계속 진입 지점과 값들이 Context처럼 쌓이게 될 것이다. asyncio.sleep() 메서드에 도착하게 되면 이 메서드는 Future 객체를 생성하고 await Future를 한다. await Future를 하게 되면 Future 클래스의 __await__() 메서드를 호출하게 된다. 

    __await__() 메서드는 만약 Future 자신이 Done 되지 않았다면, yield self 키워드를 통해서 자기 자신을 반환하는 작업을 한다. 그런데 최초에 코루틴이 시작된 것을 생각해보면 __step()__ 메서드의 coro.send()를 통해서 실행된 것을 기억할 수 있다. 즉 await Future를 하게 되면 Context를 포함한 Future 객체가 task.__step__() 메서드 안에 있는 result = coro.send()로 반환되는 것을 이해할 수 있다. 

    // Future 클래스
    
    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.
    
    __iter__ = __await__  # make compatible with 'yield from'.

    이 때 task 클래스는 result에서 blocking인지, 완료되었는지 등을 확인한다. 만약 완료되었다면 future 객체의 add_done_callback() 메서드에 task.__wakup__() 메서드를 넘겨주면서 이벤트 루프에 예약하는 것을 막아준다. 만약 Future 객체가 완료되지 않았다면 task 객체는 _loop.call_soon() 메서드에 self._step()를 실행할 함수로 넘겨주면서 자기 자신의 실행을 계속 예약한다. 즉, 호출된 부분이 context와 함께 event Loop에 Task 형식으로 실행 예약이 되는 것이다. 

        def __step(self, exc=None):
            # Call either coro.throw(exc) or coro.send(None).
            try:
                if exc is None:
                    result = coro.send(None) // 코루틴 실행함 → 알고 있던 문맥부터
                else:
                    result = coro.throw(exc)
            except StopIteration as exc:
                    ...
                    super().set_result(exc.value) → 코루틴 끝이면, 반환값 리턴
            ...
            else:
                blocking = getattr(result, '_asyncio_future_blocking', None)
                if blocking is not None:
                    # Yielded Future must come from Future.__iter__().
                    if futures._get_loop(result) is not self._loop:
                        ...
                        // 코루틴 수행 중이면, 이벤트 루프에 자기 자신(Task) 예약
                        self._loop.call_soon(
                            self.__step, new_exc, context=self._context)
                    elif blocking:
                        ...
                        // 코루틴 완료되었으면 Future 객체에 콜백 예약
                            result._asyncio_future_blocking = False
                            result.add_done_callback(
                                self.__wakeup, context=self._context)
                            self._fut_waiter = result
                            if self._must_cancel:
                                if self._fut_waiter.cancel(
                                        msg=self._cancel_message):
                                    self._must_cancel = False
                    ...

    이벤트 루프는 실행해야 될 Task를 Deque 형식으로 가지고 있다. Deque에서 필요한 Task를 꺼내서 실행하는 작업을 계속하게 되는데, 그러면 task의 __step__() 메서드가 호출된다. __step__() 메서드는 coro.send(None) 메서드를 호출하는데, 그러면 마지막 await 지점인 await Future(asyncio.sleep 내부)로 돌아가게 될 것이다. await Future는 다시 한번 __await__를 호출하고, Future는 완료되었다면 정상적인 result를 반환할 것이고 그렇지 않다면 Future를 반환할 것이다. 

    만약 result가 반환되면 add_done_callback()을 이용해서 task의 __step__()을 마지막으로 호출하게 된다. 그렇다면 await Future(asyncio.sleep 내부) 절로 다시 돌아가게 되는데 실행이 완료되었기 때문에 StopIteration이 발생하고 반환하게 될 것이다. 그리고 코루틴 체인이 실행되었던 것처럼 다시 연쇄적으로 StopIteration이 발생하면서 코루틴 체인은 종결되게 될 것이다. 

    그림으로 살펴보면 다음과 같다.

    1. 최초로 생성된 Task가 이벤트 루프에 의해서 실행된다. 이벤트 루프는 등록된 callback을 실행하는데, 이 때 Task의 __step()이 호출된다. __step()은 coro.send() 메서드를 이용해서 Task가 가지고 있는 코루틴을 실행한다.
    2. 최초로 생성된 Task는 코루틴을 시작하면 await 키워드를 이용해서 코루틴 체인을 형성한다. 일반 함수를 호출하는 것과 동일하다고 볼 수 있고, 코루틴 체인을 형성하면서 함수 호출 stack이 깊어진다.
    3. asyncio.sleep() 같은 메서드를 만나게 되면 특별한 행동을 한다. asyncio.sleep()은 Future 객체를 생성하고, await Future한다.
    4. Future 객체의 __await__가 호출 되는데, 이 때 Future가 완료되지 않았다면 자기 자신을 yield 한다. Future 객체는 반환되어 최초로 coro.send()가 호출되었던 지점으로 돌아간다.
    5. 이후 coro.send() 결과에 따라서 현재 Task를 다시 한번 이벤트 루프에 예약하거나 하는 작업을 한다. 그리고 이벤트 루프의 큐에 있는 Task를 꺼내와서 다음 작업을 또 한다. 

     


    asyncio.create_task()를 이용한 비동기 테스트 작성

    위의 내용을 바탕으로 비동기 테스트를 작성해보자. asyncio와 코루틴을 적절히 잘 사용하면 다음과 같은 테스트 코드를 작성할 수 있다.

    1. 소켓을 생성한다. (await로 기다림)
    2. http_request()를 통해서 http 요청을 여러번 보낸다. → await로 기다리지 않음.
    3. 소켓을 닫는다. (await로 기다림)

    이 테스트의 요지는 http 요청을 비동기로 여러 개 보낸 상태에서 응답을 기다리지 않고 소켓을 닫는 상황을 만들 수 있다는 것을 의미한다. 왜 그렇게 동작하는 것일까? 아래에서 좀 더 자세히 알아보려고 한다. 

    import asyncio
    import time
    
    async def create_sock():
        await asyncio.sleep(1)
        print(f'socket is created')
        return 1
    
    async def http_request(num):
        print(f'http request call : {num}')
        await asyncio.sleep(2)
        print(f'http request call : {num} is done')
        return 1
    
    async def close_sock():
        await asyncio.sleep(1)
        print(f'socked is closed')
        return 0
    
    async def main():
    
        sock = await create_sock()
        tasks = [asyncio.create_task(http_request(num))
                 for num in range(1,10)]
        await close_sock()
    
    
    asyncio.run(main())
    time.sleep(10)
    
    >>> 출력
    socket is created
    http request call : 1
    http request call : 2
    http request call : 3
    http request call : 4
    http request call : 5
    http request call : 6
    http request call : 7
    http request call : 8
    http request call : 9
    socked is closed

    asyncio.run(main())

    main() 코루틴을 실행하는 최초의 Task 객체를 생성해서 이벤트 루프에 실행을 예약한다.

     

    sock = await create_sock()

    여기서는 await로 create_sock() 코루틴을 실행한다. 이 때, create_sock() 메서드 내에서는 asyncio.sleep()을 통해서 Future 객체가 최초의 Task 객체에게 전달된다. 그리고 이 Future 객체가 완료될 때 까지 이벤트 루프에 Task의 실행을 계속 예약한다. 따라서 create_sock()이 완료될 때까지 최초의 Task가 블록킹 되는 것을 의미한다. 

     

    tasks = [asyncio.create_task() ...]

    이 실행 부분에서는 새로운 Task 객체를 생성하고 Task의 실행을 이벤트 루프에 예약하는 것을 의미한다. 그렇지만 지금 바로 여기서 생성된 Task가 실행되지는 않는다. 왜냐하면 현재 쓰레드를 점유하는 것은 최초 생성된 Task이기 때문이다. 만약 최초 생성된 Task가 asyncio.sleep() 같은 메서드들을 만나게 되면 Future 객체를 반환하면서 쓰레드 점유를 반환한다. 이 때는, 따로 생성된 Task들이 실행될 것이다.

     

    await close_sock()

    위에서 10개의 Task는 생성되어 이벤트 루프에 실행 예약이 되어있다. 그렇지만 쓰레드 점유는 최초로 실행된 Task가 가지고 있기 때문에 위에서 생성된 10개의 Task를 기다리지 않고 바로 await close_sock()으로 넘어온다.await close_sock()에서는 asyncio.sleep(1)을 만나게 되어 쓰레드 점유를 반환하게 된다. 

    이 때 이벤트 루프에는 최초 생성된 Task 말고도 10개의 Task가 실행 예약이 되어있다. 따라서 각 Task의 'http request call : {num}'가 출력된다. 그렇지만 이 녀석들 역시 asyncio.sleep(2)을 하기 때문에 곧 바로 쓰레드 점유를 반환하게 된다. 최초 생성된 Task는 sleep(1)만 하면 되고, 그렇지 않은 쓰레드는 sleep(2)을 해야하기 때문에 최초 생성된 Task는 다시 쓰레드를 점유하고 다음 함수를 실행해나간다.

    StopIteration 발생

    await close_sock()을 하게 되면 main() 코루틴을 모두 실행이 다 되어 StopIteration이 반환된다. 최초 Task의 StopIteration이 반환되면 이벤트 루프는 닫히게 된다. 이벤트 루프가 닫히게 되기 때문에 이벤트 루프에 예약되어 있던 완료되지 못한 10개의 Task 객체 역시 삭제된다. 

     


    함께 보면 좋을만한 것

    1. https://it-eldorado.tistory.com/159 
    2. https://docs.python.org/ko/3/library/asyncio.html
    3. https://docs.python.org/ko/3/library/asyncio-task.html

    댓글

    Designed by JB FACTORY