(Python) asyncio를 통한 비동기 코루틴 완전 정복

asyncio에 의한 단일 스레드 병렬 작업

지난 글에서 concurrent.futures 모듈을 통해서 멀티스레딩/멀티프로세싱에 대한 고수준 API를 파이썬에서 사용할 수 있게 됨을 확인했다. 이 새로운 API는 멀티 스레드 디스패치를 사용하기 쉽게 만들 뿐 아니라, 직접 스레드를 제어하는 것이 아닌 Future 객체를 사용함으로써 Promise 개념을 도입한 것과 유사한 결과를 보여준다고 했다.

이 기능이 도입된 것이 파이썬 3.2였고, 뒤이은 업데이트인 3.4 버전에서는 단일 스레드 기반의 비동기 처리를 할 수 있는 asyncio가 도입되었다. asyncio는 파이썬의 코루틴을 사용하여 I/O 등의 레이턴시가 큰 작업에 대해 non-blocking으로 동작하는 코드를 작성할 수 있게 한다.

이는 CPU에 부하가 집중되는 다른 작업들과는 달리 I/O 작업은 CPU와 개별적으로 동작이 가능하다는 점에 착안하여, 메인 스레드는 I/O 대기시간동안 이를 기다리지 않고 다른 작업을 처리하며, 비동기 작업이 완료되면 런루프에 의해서 이후 작업을 처리할 수 있게 한다. 단일 스레드에서 메인 코드의 흐름과 별개로 이러한 작업을 독립적으로 처리해줄 수 있는 구조가 파이썬에는 이미 코루틴이라는 이름으로 갖춰져 있었고, 미완료 작업을 액세스할 수 있는 방법이 Future에 의해 준비되었으므로, 이 둘을 결합하여 단일 스레드 기반 non-blocking API를 출시할 수 있었다.

병렬작업1이라고 하여 마치 GIL 제약을 우회했다거나, 멀티스레드에서 하던 걸 단일 스레드로 할 수 있게 됐다는 둥의 마법은 아니고 I/O 작업과 CPU 중심 작업을 병렬로 처리한다는 것이고 이는 NodeJS가 밀고 있는 non-blocking 비동기 처리에 더 근접하는 개념이다.

코루틴 문법

먼저 들어가기에 앞서서 분명히해야할 점이 있는데, 이 글에서 말하는 코루틴은 제너레이터 문법을 이용해서 메인 루틴과 흐름을 분리하여 동작하는 기존의 코루틴과는 완전히 구분될 필요가 있다. 물론 동작하는 방식 자체로는 ‘코루틴’이라는 표현은 정확하지만, 일반적으로 사용하던 제너레이터 코루틴과는 다른 관점에서 봐야하며, asyncio.coroutine()에 의해 생성되는 비동기 코루틴을 의미한다고 봐야한다.

async, await는 이 비동기 코루틴을 위해 새로 추가된 문법이다. 이는 asyncio의 API를 위한 문법이며, 제너레이터를 이용한 기존의 코루틴은 이 문법을 적용하지 않는다. async 는 코루틴으로 정의하려는 함수의 def 앞에 붙이며, await는 코루틴 내에서 다른 코루틴을 호출하고 그 결과를 받을 때 사용하며, 그 의미는 async가 붙어서 정의된 함수는 비동기로 호출되는 코루틴이라는 뜻이며, await는 말 그대로 다른 비동기 코루틴을 호출하되, 해당 작업이 완료될 때까지 기다린다는 뜻으로 해석하면 된다.

import asyncio

async def greet(msg):
    await asyncio.sleep(1)
    print(msg)

파이썬 3.4에서는 이 문법이 추가되지 않았고, 다음과 같이 사용한다.

# Python 3.4

import asyncio

@asyncio.coroutine
def greet(msg):
    yield from asyncio.sleep(1)
    prit(msg)

코루틴 실행하기

async 지시어를 붙여서 정의한 함수는 정확히 말하면 코루틴을 생성해주는 함수가 된다. 즉 위의 greet("hello")를 실행하면 1초후에 메시지가 출력되는것이 아니라 실제로는 아무일도 일어나지 않는다. 이를 실제로 실행하기 위해서는 asyncio.ensure_future() 를 이용한다. 2 이 함수는 코루틴 객체3 를 인자로 받아서 Task 객체4를 리턴해주는데 이 Task 객체는 Future의 서브 클래스이며, concurrent.futures에 정의된 Future 클래스와 거의 동일한 API를 제공한다.5

ensure_future 함수가 실행되면 코루틴 객체가 스케줄링되고 이 함수는 즉시 리턴한다. 따라서 비동기 작업의 결과를 받아서 처리하고 싶다면 메인 스레드가 코드 끝까지 실행돼서 끝나는 것을 방지하기 위해 런루프를 돌려야 한다. 결국 비동기 처리를 쓰는 코드는 다음과 같은 식으로 구성한다.

  1. 비동기로 처리될 루틴을 코루틴으로 정의한다.
  2. 런루프를 생성하고
  3. 런루프에 스케줄링한 다음
  4. 런루프를 돌려 코루틴이 끝나기를 기다린다.

비동기 코루틴을 호출하는 방법

가장 간단한 폼은 다음과 같은 모양이다.

import asyncio

async def lazy_greet(msg, delay=1):
    await asyncio.sleep(delay)
    print(msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(lazy_greet("hello", 3))
loop.close()

  1. lazy_greet 코루틴은 몇 초의 딜레이를 가진 후에, 입력받은 문구를 출력한다.
  2. 런루프를 생성하고
  3. run_until_complete()를 이용해서 이를 실행한다. 이 메소드는 코루틴이나 Future 객체를 받아서 스케줄링한다. 코루틴이 전달된 경우에는 이를 내부에서 Future로 래핑하여 처리한다.

사실 이 코드만으로는 time.sleep을 사용한 지연 출력이랑 뭐가 다르냐고 할 수 있다. run_until_complete는 코루틴을 받으면 이를 Future 객체로 감싸서 실행한다.

런루프는 스레드내에서 동작하며 콜백이나 코루틴을 같은 스레드에서 돌린다. 따라서 스레드 내에서 어떤 한 작업이 실행중인 경우에, 다른 작업이나 메인 스레드 작업은 여전히 멈춰있다. 대신에 taskawait 구문을 만나면 현재 코루틴은 그 결과를 받도록 대기하며, 그 상태에서 런루프는 다른 task를 실행하는 방식으로 동작한다.

복수 코루틴을 동시에 비동기로 호출하는 방법

여러개의 코루틴을 한꺼번에 스케줄링하면 위에서 언급한바와 같이 순서대로 실행되면서 내부에서 await가 등장하면 다른 코루틴으로 제어권이 넘어간다. 다음의 예제는 여러 코루틴을 한꺼번에 스케줄링하고 as_completed를 이용해서 하나씩 그 결과를 얻어 처리하는 가장 기본적인 패턴이다.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()


async def main():
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    for f in asyncio.as_completed(fts):
        x = await f
        print(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

  1. lazy_greet()는 지연 시간 후에 메시지를 출력한 후에 해당 메시지를 대문자로 변환해서 리턴하도록 수정됐다.
  2. 맨먼저 런루프에 대해 main() 이 스케줄링되고 즉시 실행된다.
  3. main 내에서는 fts를 생성하면서 다섯개의 코루틴이 스케줄링 된다. 하지만 현재 스레드에서 제어권은 아직 main에 있으므로 여기서 블럭되지 않고 즉시 넘어간다.
  4. for 문에서 첫번째 await 가 등장한다. 이제 main 코루틴의 실행은 여기서 잠시 멈추고 fts 내의 첫번째 코루틴이 실행된다.
  5. lazy_greet('hello')에서 먼저 메시지를 출력한다. 그런다음 await를 만났으니 여기서 일시정지하고 스케줄링된 다음 코루틴으로 넘어간다.
  6. lazy_greet('world')가 시작된다. 역시 첫 메시지를 출력하고 await를 만난다. 이런 식으로 다섯 개 코루틴이 모두 await 에서 기다린다.
  7. 처음 최소 1초간은 모든 코루틴이 await 에서 멈춰있다.
  8. fts 내 다섯 코루틴 중에서 지연시간이 가장 짧은 코루틴의 asyncio.sleep()이 끝난다. 그럼 해당 코루틴은 await 다음부터 진행한 후 리턴한다.
  9. 하나의 코루틴이 리턴하면, 이벤트 루프는 그 다음번 기다리고 있는 코루틴으로 제어권을 넘겨준다. 이 코루틴은 main 일수도 있고 다른 lazy_greet()일 수 있다.
  10. 이런식으로 런루프로부터 먼저 끝난 코루틴이 발생할 때마다 x = await f가 평가 완료되고 그 결과가 하나씩 출력된다.

특정 종료 조건까지 대기하는 방법

concurrent.futuresExecutor.wait() 에 대응하는 것은 asyncio.wait() 이다. 여기에는 Futures의 시퀀스와 타임아웃 혹은 종료 조건이 넘겨지고, (done, pending)의 집합 짝이 리턴된다. 중요한 것은 wait 역시 코루틴이며, 따라서 await를 통해서 결과를 받아야 한다는 점이다. 6

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

위의 예제를 특정 타임아웃 기간 동안만 처리하도록 asyncio.wait()를 사용하는 패턴으로 변경해보자.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()


async def main():
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    (done, pending) = await asyncio.wait(fts, timeout=2)
    if pending:
        print("there is {} tasks not completed".format(len(pending)))
        for f in pending:
            f.cancel()
    for f in done:
        x = await f
        print(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

이 코드의 실행은 대동소이하다. 항상 코루틴이 스케줄링되고 await 되는 부분에서 다음 스케줄로 넘어가면 된다.

  1. main()이 맨먼저 스케줄링되어 실행을 시작한다.
  2. main내에서 다섯개의 다른 코루틴이 스케줄링되어 대기하고 있다가
  3. await asyncio.wait를 만난다. 이제 앞의 다섯개 코루틴이 한 번씩 그 내부의 await를 만날때까지만 실행된다.
  4. asyncio.wait는 아직 끝나지 않았고 계속 기다리며, 시간이 흐르면서 타임아웃이 완료되면 아마 어떤 한 코루틴내에서 실행중이던 흐름이 강제로 끊기고 wait()가 리턴한다.
  5. wait()의 결과는 task의 상태에 따라서 두 세트로 나뉘어 리턴되었다.
  6. 종료되지 않은 건들에 대해서 각각을 취소하고
  7. 종료된 건들의 결과를 받아온다. 사실 이 시점에서 f들은 모두 종료되었으므로 await가 아니라 f.result()로 동기식으로 받아와도 된다.

단일 코루틴의 실행을 기다리는 상황이라면 wait_for()를 쓰면 된다.

결과만 취합하기

as_completed를 통해서 완료된 코루틴의 결과들을 개별적으로 꺼내는 것 말고 한꺼번에 결과를 취합받는 방법도 있다. 이는 asyncio.gather()를 이용하는데, 이 함수는

  1. 코루틴 함수이며
  2. Future의 시퀀스를 인자로 받는게 아니라, 개별 인자들로 받는다.

이번에는 예제를 조금 더 바꿔봤다. 각각의 코루틴은 개별적으로 동작하므로 이번에는 각 초를 카운트하는 코루틴도 추가로 돌려보도록 하자.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()

async def time_log():
    i = 0
    print("time log starts.")
    while True:
        await asyncio.sleep(1)
        i += 1
        print('...%02d sec.' % (i,))


async def main():
    t = asyncio.ensure_future(time_log())
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    result = await asyncio.gather(*fts)
    t.cancel()
    print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

  1. asyncio.gather()를 통해서 모든 코루틴의 결과를 수집한다. 이 결과는 [Future] 타입이 아니라 임의의 리스트 타입이다.

그외의 유틸리티 함수들

.shield()wait_for()와 비슷한데, 타임아웃이 없고 해당 코루틴이 외부에서 cancel 되는 것을 방지한다. 또 코루틴이 종료될 때 호출될 콜백을 지정할 수 있는데 add_done_callback()은 해당 Future의 콜백을 추가한다. 이 때 콜백은 코루틴이 아닌 일반함수이며, Future 객체를 전달받는다. 따라서 코루틴의 결과값을 쓰고 싶다면 ft.result() 를 호출한다. 이 호출은 이미 완료된 작업에 대해 호출하는 것이기 때문에 안전한 것으로 평가될 수 있다. 또 콜백은 같은 스레드에서 호출 될 수도 있고, 그렇지 않을 수도 있다. 최상위 네임스페이스에 선언된 변수는 global로 내부에서 다시 선언해야 참조 가능하며 nonlocal 로는 찾을 수 없는 경우가 생긴다.

요약

asyncio의 코루틴은 실질적으로 CPU의 스레드 처리 입장에서 보자면 여러 코루틴이 await를 만날 때까지 돌려가면서 스레드를 사용하는 셈이다. 이 때 각 코루틴이 기다리는 동작이 시스템 차원에서 CPU와 무관한 I/O 작업이라면 이는 별도로 처리되었다가 완료되면 이벤트 루프에 등록되는 식으로 처리되기 때문에 메인 스레드입장에서는 non-blocking한 프로그램을 만들 수 있다는 장점이 있다.

asyncioFuture는 역시 “아직 완료되지 않은 작업”의 의미로 Promise API를 제공하는데, 멀티스레드/멀티프로세스 작업의 그것과는 오묘하게 다른 뉘앙스는 존재한다. Future의 외부 관점에서 볼 때 concurrent.futures의 그것은 내부적으로 일이 돌아가는 중 상태이지만, asyncioFuture는 그냥 await가 선언된 지점에서 멈춰있는 작업인 셈이다.

하지만 실제로 asyncio를 활용하는데는 여전히 몇가지 한계가 남아있다. 실제로 이 글에서 보여준 예제들은 asyncio.sleep에 의존하는데, 이 코루틴은 완전하게 비동기로 구현되어 있기 때문이다. 커스텀하게 작성된 함수들이나 asyncio가 아닌 파이썬 표준 라이브러리들은 여전히 blocking하게 작성되어 있기 때문에, 실제 쓸만한 연습용 코드를 작성해봤자 (일부러 중간중간에 asyncio.sleep()을 넣지 않는 이상) 순차적으로 실행될 뿐이기 때문이다. 물론 asyncio 내에서는 비동기 소켓 통신 등의 기능을 지원하고 있지만 활용하기에는 난관이 많은 편이다. 다음 글에서는 blocking 함수들을 포함한 기존의 코드를 어떻게 asyncio와 통합해서 사용할 수 있는지, 그 가능성에 대해서 살펴보도록 하겠다.


  1. 실제 파이썬 문서에서도 concurrecy같은 표현을 쓰기 때문에 헷갈리는 사람들이 많다. 
  2. 파이썬 3.4 버전에서는 asyncio.async() 이다. 
  3. 코루틴 생성함수를 실행해서 리턴된 객체 
  4. 문서 참고. 런루프에 의해 스케줄링된 Future를 의미한다. 
  5. API가 같다는 것이지, 이 둘은 호환되는 클래스가 아니다. 
  6. https://docs.python.org/3/library/asyncio-task.html#asyncio.wait