asyncio : 단일 스레드 기반의 Nonblocking 비동기 코루틴 완전 정복

asyncio에 의한 단일 스레드 병렬 작업

지난번 concurrent.futures를 소개한 글에서 파이썬 3에서부터 멀티스레딩/멀티프로세싱에 대해 새로 도입된 고수준 API에 대해 살펴봤다. 이 새로운 API는 함수 호출을 병렬로 처리하는 동작을 사용하기 쉽게 만들 뿐 아니라, 직접 스레드를 제어하는 것이 아닌 Future 객체를 사용함으로써 자바스크립트의 Promise 개념을 도입한 것으로 평가할 수 있다고 보았다.

새로운 병렬처리 API와 더불어 Future 클래스가 도입된 것이 파이썬 3.2였다. Future 개념의 도입은 스레드를 관리하고, 다른 스레드에서 돌아가는 작업에 대해서 리턴을 동기화하는 등의 작업들이 매우 골치아팠던 것을 그 자체를 객체로 래핑하면서 매우 우아하게 처리할 수 있었다. 이는 결국 비선형적인 제어 흐름과 관계된 코드를 작성하는 것이 더 이상 너저분한 작업이 아닐 수 있다는 가능성을 보였다.

다중 스레드 및 다중 프로세스에 대해서 Future를 적용하는 것이 성공적이었다면, 이는 단일 스레드에 대해서도 비동기 non-blocking 코드를 작성하는데에 동일한 Future 개념을 도입할 수 있지 않을까하는 것으로 아이디어가 옮겨갔다.

비동기 I/O와 코루틴

실제로 NodeJS는 이와 같은 단일 스레드 비동기 I/O를 사용하여 불필요한 대기 시간을 줄이는 것으로 더 빠른 성능을 내고 있다. 파이썬에서도 이와 같은 개선을 할 수 있지 않을까? 예를 들어 DB 쿼리 요청이나 네트워크 요청, 파일 I/O 등의 작업은 CPU를 거의 사용하지 않지만, 해당 작업에 대한 호출이 리턴할 때까지 스레드흐름이 멈춘 상태가 된다. 이러한 대기시간 동안에 해당 작업 외에 다른 작업으로의 전환을 단일 스레드에서 처리할 수 있는 기반 기술들은 이미 파이썬 내에 구비되어 있었는데, 바로 코루틴이다.

파이썬의 제너레이터는 제너레이터 함수를 호출하여 생성되는 객체로 하나 이상의 값을 반복적으로 리턴할 수 있다. 일반적인 함수는 값을 리턴한 후에 그 내부 스코프의 모든 데이터가 파괴되지만 제너레이터는 “실행을 멈춘 상태”로 기다리다 다시 호출되면 “그자리에서 이어서” 실행된다.

CPU에 부하가 집중되는 다른 작업들과는 달리 I/O 작업은 CPU와 개별적으로 동작이 가능하다는 점에 착안하여, I/O 처리를 기다리는 역할을 코루틴에게 위임하고,그 시간동안 다른 코루틴을 이용해서 별도의 작업을 또 비동기로 처리할 수 있다.

이러한 작업을 독립적으로 처리해줄 수 있는 구조가 파이썬에는 이미 코루틴이라는 이름으로 갖춰져 있었고, 미완료 작업을 액세스할 수 있는 방법이 Future에 의해 준비되었으므로, 이 둘을 결합하여 단일 스레드 기반 non-blocking API를 출시할 수 있었다.

병렬작업1이라고 하여 마치 GIL 제약을 우회했다거나, 멀티스레드에서 하던 걸 단일 스레드로 할 수 있게 됐다는 둥의 마법은 아니고 I/O 작업과 CPU 중심 작업을 병렬로 처리한다는 것이고 결국 이는 NodeJS가 밀고 있는 non-blocking 비동기 처리에 더 근접하는 개념이다.

비동기 IO 코루틴을 위한 asyncio

먼저 들어가기에 앞서서 분명히해야할 점이 있는데, 이 글에서 말하는 코루틴은 제너레이터 문법을 이용해서 메인 루틴과 흐름을 분리하여 동작하는 기존의 코루틴과는 완전히 구분될 필요가 있다. 물론 동작하는 방식 자체로는 ‘코루틴’이라는 표현은 정확하지만, 일반적으로 사용하던 제너레이터 코루틴과는 다른 관점에서 봐야하며, asyncio.coroutine()에 의해 생성되는 비동기 IO처리를 위한 코루틴을 의미한다고 봐야한다.

async, await는 이 비동기 코루틴을 위해 새로 추가된 문법(Python 3.5)이다. 이는 asyncio의 API를 위한 문법이며, 제너레이터를 이용한 기존의 코루틴은 이 문법을 적용하지 않는다. async 는 코루틴으로 정의하려는 함수의 def 앞에 붙이며, await는 코루틴 내에서 다른 코루틴을 호출하고 그 결과를 받을 때 사용하며, 그 의미는 async가 붙어서 정의된 함수는 비동기로 호출되는 코루틴이라는 뜻이며, await는 말 그대로 다른 비동기 코루틴을 호출하되, 해당 작업이 완료될 때까지 기다린다는 뜻으로 해석하면 된다.

간단한 예제

비동기 코루틴은 기본적으로 def 앞에 async를 붙여서 사용한다. 그리고 내부에서 다른 비동기 작업을 호출하게되면 await를 붙여야 한다. 물론 이 때 await 뒤에 호출되는 함수 역시 비동기 코루틴 함수여야 한다.

아래 코드는 1초동안 대기한 후 메시지를 출력한다.

import asyncio

async def greet(msg):
    await asyncio.sleep(1)
    print(msg)

파이썬 3.4에서는 이 문법이 추가되지 않았고, 다음과 같이 사용한다.

# Python 3.4

import asyncio

@asyncio.coroutine
def greet(msg):
    yield from asyncio.sleep(1)
    prit(msg)

사실 이 작은 코드 조각은 time.sleep을 이용해서 1초 지연한 후 메시지를 출력하는 거랑 아무런 차이가 없을 수 있다. 하지만 time.sleep()은 현재 스레드를 1초 동안 정지시키지만, 비동기 코루틴은 스레드를 중지시키지 않는다. 따라서 만약 다른 비동기 코루틴이 어디선가 실행중이라면 greet()이 1초동안 멈춘 사이에 다른 코루틴이 실행될 수 있다.

코루틴 실행하기

async 지시어를 붙여서 정의한 함수는 정확히 말하면 코루틴을 생성해주는 함수가 된다. 즉 위의 greet("hello")를 실행하면 1초후에 메시지가 출력되는것이 아니라 실제로는 아무일도 일어나지 않는다.

이를 실제로 실행하기 위해서는 asyncio.ensure_future() 를 이용한다. 2 이 함수는 코루틴 객체3 를 인자로 받아서 Task 객체4를 리턴해주는데 이 Task 객체는 Future의 서브 클래스이며, concurrent.futures에 정의된 Future 클래스와 거의 동일한 API를 제공한다.5

asyncio.ensure_future()는 병렬처리 모듈인 concurrent.futuresExecutor.submit()과 동일한 역할을 한다고 할 수 있다.

런루프

이쯤에서 이 기술이 어떤식으로 사용될 수 있는지에 대한 시나리오를 하나 생각해보자. 비동기 처리를 포함하는 코루틴 A가 호출되어 await를 통해서 I/O 작업을 요청하고 띵가띵가 놀게 생겼다. 그런데 다른 한 쪽에 또 다른 비동기 코루틴 B가 대기하고 있었다면, 이번에는 B가 정해진 코드들을 처리해 나간다. 그러는 와중에 B 내부에서도 await 구문이 나왔고 이번에는 B도 놀게 생겼다.

이런 식으로 여러 개의 코루틴이 각각 비동기 처리를 요청해놓고 기다리고 있다가, 어느 IO 작업 하나가 완료되었다고 하자. 그러면 그 IO 작업을 요청했던 코루틴이 다시 이어서 실행을 계속해야 한다. IO 작업이 처리될 수 있는 시간은 제각각이므로 먼저 개시된 작업이 먼저 끝난다는 보장도 없다. 그러면 어떻게 원래의 코루틴이 작업을 이어서 수행해나갈 수 있을까?

아니, 애초에 모둔 코루틴이 띵가띵가 노는 시점이 오면, 스레드는 어떻게 멈춰있어야 할까? 여기서 런루프가 등장해야 한다. 런루프는 일종의 무한루프인데, 특정한 이벤트나 콜이 발생하면 런루프에 해당 작업이 등록된다. 그리고 루프의 말미에 처리해야 할 함수들을 차례로 호출해 주는 것이다.

웹서버나 GUI 앱들을 생각해보면 된다. 스마트폰이나 데스크톱에서 아무 GUI앱을 하나 실행해보자. 이 앱이 실행된 직후에 하는 일은 아무것도 안하고 기다리는 것이다. 그러다가 사용자의 마우스나 키보드 입력을 받으면 그에 따라 ‘반응’한다. 이 때 아무것도 안하고 기다리는 것을 구현해주는 기술이 바로 런루프이며, 특정한 이벤트가 발생하면 런루프에 해당 이벤트를 처리할 핸들러 함수가 등록되었다가 처리되는 것이다.

따라서 ensure_future 함수가 실행되려면 코루틴을 걸어둘 런루프가 필요하며, 비동기 작업을 처리하기 전에는 런루프를 돌려야 한다. 런루프 역시 asyncio 모듈에서 지원하며, get_event_loop() 함수를 통해서 얻고, run_until_*() 함수들을 통해서 돌릴 수 있다.

비동기 코루틴을 호출하는 방법

비동기 처리를 쓰는 코드는 다음과 같은 식으로 구성한다.

  1. 비동기로 처리될 루틴을 코루틴으로 정의한다.
  2. 런루프를 생성하고
  3. 런루프에 스케줄링한 다음
  4. 런루프를 돌려 코루틴이 끝나기를 기다린다.

가장 간단한 폼은 다음과 같은 모양이다.

import asyncio

async def lazy_greet(msg, delay=1):
    await asyncio.sleep(delay)
    print(msg)

loop = asyncio.get_event_loop()
loop.run_until_complete(lazy_greet("hello", 3))
loop.close()

위 코드는 아래와 같이 동작한다.

  1. lazy_greet 코루틴은 몇 초의 딜레이를 가진 후에, 입력받은 문구를 출력하도록 구성되었다.
  2. get_event_loop() 함수를 써서 런루프를 얻고
  3. run_until_complete()를 이용해서 이를 돌리면서 코루틴을 넘겨준다. 이 메소드는 코루틴이나 Future 객체를 받아서 스케줄링한다. 코루틴이 전달된 경우에는 이를 내부에서 Future로 래핑하여 처리한다. 어쨌든 이 함수는 넘겨받은 코루틴이 실행을 끝내면 리턴한다.

사실 여기까지만 해도 이걸 왜 써야함? 하고 감이 오지 않을 것이다.

여러 개의 비동기 작업을 스케줄링해보자

여러개의 코루틴을 한꺼번에 스케줄링하면 위에서 언급한바와 같이 순서대로 실행되면서 내부에서 await가 등장하면 다른 코루틴으로 제어권이 넘어간다. 다음의 예제는 여러 코루틴을 한꺼번에 스케줄링하고 as_completed를 이용해서 하나씩 그 결과를 얻어 처리하는 가장 기본적인 패턴이다.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()


async def main():
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    for f in asyncio.as_completed(fts):
        x = await f
        print(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
  1. lazy_greet()는 지연 시간 후에 메시지를 출력한 후에 해당 메시지를 대문자로 변환해서 리턴하도록 수정됐다.
  2. 맨먼저 런루프에 대해 main() 이 스케줄링되고 즉시 실행된다.
  3. main 내에서는 fts를 생성하면서 다섯개의 코루틴이 스케줄링 된다. 하지만 현재 스레드에서 제어권은 아직 main에 있으므로 여기서 블럭되지 않고 즉시 넘어간다.
  4. for 문에서 첫번째 await 가 등장한다. 이제 main 코루틴의 실행은 여기서 잠시 멈추고 fts 내의 첫번째 코루틴이 실행된다.
  5. lazy_greet('hello')에서 먼저 메시지를 출력한다. 그런다음 await를 만났으니 여기서 일시정지하고 스케줄링된 다음 코루틴으로 넘어간다.
  6. lazy_greet('world')가 시작된다. 역시 첫 메시지를 출력하고 await를 만난다. 이런 식으로 다섯 개 코루틴이 모두 await 에서 기다린다.
  7. 처음 최소 1초간은 모든 코루틴이 await 에서 멈춰있다.
  8. fts 내 다섯 코루틴 중에서 지연시간이 가장 짧은 코루틴의 asyncio.sleep()이 끝난다. 그럼 해당 코루틴은 await 다음부터 진행한 후 리턴한다.
  9. 하나의 코루틴이 리턴하면, 이벤트 루프는 그 다음번 기다리고 있는 코루틴으로 제어권을 넘겨준다. 이 코루틴은 main 일수도 있고 다른 lazy_greet()일 수 있다.
  10. 이런식으로 런루프로부터 먼저 끝난 코루틴이 발생할 때마다 x = await f가 평가 완료되고 그 결과가 하나씩 출력된다.

특정 종료 조건까지 대기하는 방법

concurrent.futuresExecutor.wait() 에 대응하는 것은 asyncio.wait() 이다. 여기에는 Futures의 시퀀스와 타임아웃 혹은 종료 조건이 넘겨지고, (done, pending)의 집합 짝이 리턴된다. 중요한 것은 wait 역시 코루틴이며, 따라서 await를 통해서 결과를 받아야 한다는 점이다. 6

coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

위의 예제를 특정 타임아웃 기간 동안만 처리하도록 asyncio.wait()를 사용하는 패턴으로 변경해보자.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()


async def main():
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    (done, pending) = await asyncio.wait(fts, timeout=2)
    if pending:
        print("there is {} tasks not completed".format(len(pending)))
        for f in pending:
            f.cancel()
    for f in done:
        x = await f
        print(x)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

이 코드의 실행은 대동소이하다. 항상 코루틴이 스케줄링되고 await 되는 부분에서 다음 스케줄로 넘어가면 된다.

  1. main()이 맨먼저 스케줄링되어 실행을 시작한다.
  2. main내에서 다섯개의 다른 코루틴이 스케줄링되어 대기하고 있다가
  3. await asyncio.wait를 만난다. 이제 앞의 다섯개 코루틴이 한 번씩 그 내부의 await를 만날때까지만 실행된다.
  4. asyncio.wait는 아직 끝나지 않았고 계속 기다리며, 시간이 흐르면서 타임아웃이 완료되면 아마 어떤 한 코루틴내에서 실행중이던 흐름이 강제로 끊기고 wait()가 리턴한다.
  5. wait()의 결과는 task의 상태에 따라서 두 세트로 나뉘어 리턴되었다.
  6. 종료되지 않은 건들에 대해서 각각을 취소하고
  7. 종료된 건들의 결과를 받아온다. 사실 이 시점에서 f들은 모두 종료되었으므로 await가 아니라 f.result()로 동기식으로 받아와도 된다.

단일 코루틴의 실행을 기다리는 상황이라면 wait_for()를 쓰면 된다.

결과만 취합하기

as_completed를 통해서 완료된 코루틴의 결과들을 개별적으로 꺼내는 것 말고 한꺼번에 결과를 취합받는 방법도 있다. 이는 asyncio.gather()를 이용하는데, 이 함수는 Executor.map() 함수와 비슷하다. 차이가 있다면,

  1. 코루틴 함수이며
  2. Future의 시퀀스를 인자로 받는게 아니라, 개별 인자들로 받는다.

이번에는 예제를 조금 더 바꿔봤다. 각각의 코루틴은 개별적으로 동작하므로 이번에는 각 초를 카운트하는 코루틴도 추가로 돌려보도록 하자.

import asyncio
import random


async def lazy_greet(msg, delay=1):
    print(msg, "will be displayed in", delay, "seconds")
    await asyncio.sleep(delay)
    return msg.upper()

async def time_log():
    i = 0
    print("time log starts.")
    while True:
        await asyncio.sleep(1)
        i += 1
        print('...%02d sec.' % (i,))


async def main():
    t = asyncio.ensure_future(time_log())
    messages = ['hello', 'world', 'apple', 'banana', 'cherry']
    fts = [asyncio.ensure_future(lazy_greet(m, random.randrange(1, 5)))
           for m in messages]
    result = await asyncio.gather(*fts)
    t.cancel()
    print(result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
  1. asyncio.gather()를 통해서 모든 코루틴의 결과를 수집한다. 이 결과는 [Future] 타입이 아니라 임의의 리스트 타입이다.

Task / Future

asyncio 모듈의 Future는 아직 완료되지 않고 실행중일 코루틴 작업을 감싸는 클래스이며, concurrent.future의 그것과 같은 API를 제공하고 있다.

  • result() 를 이용해서 결과를 얻거나
  • done(), cancelled() 를 이용해 완료/취소여부를 확인할 수 있으며, cancel() 메소드로 취소할 수 있다.
  • add_done_callback()을 이용해서 완료 콜백함수를 삽입할 수 있다.

Task 클래스는 Future의 서브 클래스로, 클래스 메소드를 통해서 주어진 이벤트 루프에 걸려있는 모든 작업들 및 현재 실행중인 작업을 알아낼 수 있는 기능을 제공한다. (하지만 병렬처리 모듈의 Future라 헷갈려서 그냥 한 번 더 래핑한게 아니냐는 생각도….)

코루틴의 완료 콜백

Taskadd_done_callback()메소드를 통해서 완료 콜백을 줄 수 있다. (생각해보면 그리 중요하지는 않은 것이, 대부분의 코루틴은 await 뒤에서 호출되기 때문에 자신을 실행한 코루틴에게 결과를 돌려줄 수 있기 때문이다.) 멀티 스레딩 코드에서는 스레드의 작업이 끝나서 리턴하는 시점을 동기화할 수 없기 때문에 콜백이 중요하지만, 코루틴의 경우에는 동작이 완료되면 이벤트 루프에 의해서 실행 흐름이 복구되기 때문에 약간 묘하게 덜 중요한 느낌이다.

비동기 코루틴의 완료 콜백으로 전달되는 함수는 결과값을 받는 함수가 아닌 task/future 객체를 인자로 받는 일반 함수이다.

정리

asyncio의 코루틴은 실질적으로 CPU의 스레드 처리 입장에서 보자면 여러 코루틴이 await를 만날 때까지 돌려가면서 스레드를 사용하는 셈이다. 이 때 각 코루틴이 기다리는 동작이 시스템 차원에서 CPU와 무관한 I/O 작업이라는 가정하에 별도로 진행되다가 작업이 완료되면 이벤트 루프에 등록되는 식으로 처리되기 때문에 메인 스레드입장에서는 non-blocking한 프로그램을 만들 수 있다는 장점이 있다.

asyncioFuture는 역시 “아직 완료되지 않은 작업”의 의미로 Promise API를 제공하는데, 멀티스레드/멀티프로세스 작업의 그것과는 오묘하게 다른 뉘앙스는 존재한다. Future의 외부 관점에서 볼 때 concurrent.futures의 그것은 내부적으로 일이 돌아가는 중 상태이지만, asyncioFuture는 그냥 await가 선언된 지점에서 멈춰있는 작업인 셈이다.

하지만 실제로 asyncio를 활용하는데는 여전히 몇가지 한계가 남아있다. 실제로 이 글에서 보여준 예제들은 asyncio.sleep에 의존하는데, 이 코루틴은 완전하게 비동기로 구현되어 있기 때문이다. 커스텀하게 작성된 함수들이나 asyncio가 아닌 파이썬 표준 라이브러리들은 여전히 blocking하게 작성되어 있기 때문에, 실제 쓸만한 연습용 코드를 작성해봤자 (일부러 중간중간에 asyncio.sleep()을 넣지 않는 이상) 순차적으로 실행될 뿐이기 때문이다. 물론 asyncio 내에서는 비동기 소켓 통신 등의 기능을 지원하고 있지만 활용하기에는 난관이 많은 편이다. 다음 글에서는 blocking 함수들을 포함한 기존의 코드를 어떻게 asyncio와 통합해서 사용할 수 있는지, 그 가능성에 대해서 살펴보도록 하겠다.


  1. 실제 파이썬 문서에서도 concurrecy같은 표현을 쓰기 때문에 헷갈리는 사람들이 많다. 
  2. 파이썬 3.4 버전에서는 asyncio.async() 이다. 
  3. 코루틴 생성함수를 실행해서 리턴된 객체 
  4. 문서 참고. 런루프에 의해 스케줄링된 Future를 의미한다. 
  5. concurrent.futures에서도 언급했지만 API가 같다는 것이지, 이 둘은 호환되는 클래스가 아니다. 
  6. https://docs.python.org/3/library/asyncio-task.html#asyncio.wait 
  • 강태평

    엄청나게 도움이 됐어요 감사합니다

  • Hyeungshik Jung

    이해에 큰 도움이 되었습니다. 감사합니다 😀

  • Pingback: python async URL요청 – ItRockIt()

  • 방준호(Junho Bang)

    이해에 큰 도움이 되었습니다. 정리 감사합니다.

  • Luke Lee

    좋은글 감사합니다.