asyncio

https://docs.python.org/3/library/asyncio-task.html

Task와 코루틴

코루틴은 특정한 규약을 따르는 제너레이터이다. 문서상으로 모든 코루틴은 @asyncio.coroutine 데코레이터를 붙이지만, 이것이 반드시 필수적인 것은 아니다.

코루틴에서는 전통적인 yield 대신에 yield from 구문을 사용한다.

코루틴이라는 단어는 제너레이터와 마찬가지로 두 가지 다른 컨셉으로 사용된다.

  • 코루틴을 정의하는 함수. 이 경우 구분을 위해 이것을 코루틴 함수라 따로 부를 수 있다.
  • 코루틴 함수를 호출하여 생성된 객체. 이 객체는 계산 및 IO 작업을 표현한다. 또한 반드시 실행을 완료해야 한다. 코루틴 함수와 구분하여 코루틴 객체라 부른다.


코루틴은 다음과 같은 일을 한다.

  • result = yield from futurefuture가 완료될 때까지 코루틴은 대기한다. 작업이 완료되면 코루틴은 이 결과를 리턴하거나 예외를 낸다. (만약 future가 취소되면 CancelError를 낸다)
  • result = yield from coroutine – 다른 코루틴이 결과를 반환하기를 기다린다.
  • return expression – 자신을 기다리는 코루틴에게 결과를 리턴해준다.
  • raise Exception – 예외를 던진다.

코루틴을 호출하는 것으로 바로 코드가 실행되지는 않는다. 이는 일종의 제너레이터이고 코루틴함수로부터 리턴되는 코루틴 객체는 제너레이터 객체로, 이터레이션을 하기 전까지는 아무 일도 하지 않는다. (이 일을 해주는 구문이 yield from 이다)

코루틴은 이벤트 루프가 있을 때만 실행된다.

코루틴 데코레이터

@asyncio.coroutine

만약 코루틴이 파괴되기 전에 리턴(yield)하지 않으면 이는 에러 메시지를 로깅한다.

예제: hello world

import asyncio

@asyncio.coroutine
def hello_world():
    print("hello, world!")

loop = asyncio.get_event_loop()
loop.run_until_complete(hello_world))
loop.close()

예제: 오늘 날짜를 출력하는 코루틴

import asyncio
import datetime

@asyncio.coroutine
def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        yield from asyncio.sleep(1)

loop = asyncio.get_event_loop()
loop.run_until_complete(display_date(loop))
loop.close()

예제: 연쇄 코루틴

import asyncio

@asyncio.coroutine
def compute(x, y):
    print("Compute %s + %s ..." %(x, y))
    yield from asyncio.sleep(1.0)
    return x + y

@asyncio.coroutine
def print_sum(x, y):
    result = yeild from compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute()print_sum()에 체이닝되어 있다. 위 코드는 아래의 타임라인으로 실행된다.


이벤트루프 태스크 print_sum() compute() | 런루프시작 | -----------> 중단됨 | | | | -----------> yield from... | | | 코루틴 중지 | | | -------------> | yield from sleep(1) | | <-----------------------------| | <------------ | ==== 1초 경과 ==== |(sleep()종료)->| | | ----------------------------->| return 1+2 | | | <-------------- (raise StopIteration) | | | print(...) | |<-------------| (raise StopIteration) | | task 완료 | <-------------| 런루프 종료

태스크 객체는 BaseEventLoop.run_until_complete() 함수가 호출될 때 생성된다. 물론 위 다이어그램은 매우 간략하게 표현된 것이며 내부적인 흐름 전체를 표현하지 않는다. sleep() 코루틴은 내부 future 객체를 만드는 데 1초후에 task를 깨우도록 하는 BaseEventLoop.call_later() 함수를 사용한다.

Future

asyncio.Future 클래스의 인스턴스이다. 이는 concurrent.futures.Future와 거의 유사하다. 단,

  • result()exception()은 타임아웃값을 받지 않으며 future가 없으면 즉시 예외가 던져진다.
  • add_done_callback() 함수를 통해 등록된 콜백은 항상 call_soon_threadsafe() 함수를 통해서 호출된다.
  • 이 클래스는 concurrent.futures 패키지의 wait(), as_completed() 와는 호환되지 않는다.

예제: Future with run_until_complete()

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.async(slow_operation(future))
loop.run_until_complete(future)
print(future.result())
loop.close()
  • .set_result() : 결과값을 세팅하고 future 객체가 완료되었다고 표시한다.
  • .add_done_callback(): 콜백함수를 추가할 수 있다. 콜백함수는 future 자신을 인자로 받게 된다. 만약 여러 인자를 받는 함수를 사용해야 한다면, functools를 사용할 것
fut.add_done_callback(functools.partial(print, "Future:", flush=True))
--> print("Future:", fut, flush=True) 가 호출된다.

예제: Future와 run_forever()

import asyncio

@asyncio.coroutine
def slow_operation(future):
    yield from asyncio.sleep(1)
    future.set_result('Future is done!')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.async(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

이 예제는 future 객체를 콜백함수를 코루틴과 연결하기 위해 사용했다. slow_operation() 코루틴은 종료할 때 futrue를 done 상태로 만들게 되고, future는 이 때 콜백을 호출한다. 콜백에서는 이벤트 루프가 중지되면서 모든 작업이 종료된다.

Task

Task는 Future의 서브클래스로, 코루틴을 Future로 래핑할 수 있다. 이는 런루프에서 코루틴의 스케줄링을 제어한다. 이벤트 루프는 한 번에 하나의 태스크만을 실행할 수 있어서 여러 태스크를 동시에 돌리려면 다른 스레드에서 만들어진 런루프가 필요하다. 하지만 태스크가 future의 실행을 기다리기 위해 중지되면 런루프는 다른 태스크를 병렬적으로 실행할 수 있다.

Task를 취소하면(cancle()) 예외가 바로 떨어지는 Future와는 달리 예외를 내부의 코루틴으로 던진다.

예제: 느린 팩토리얼

아래 예제는 3개의 Task를 만들고 이를 병렬로 돌린다. 각 태스크는 내부적으로 지연을 포함하는 루프를 도는데, 초기값에 따라 슬립의 간격이 다르다.

  • 첫번째 Task가 시작된다.
  • 첫번째 Task가 yield from asyncio.sleep()을 호출하면 대기상태에 빠진다.
  • 이 때 두 번째 Task가 시작되고, 같은 식으로 세 번째 태스크가 시작된다.
  • 세번째 태스크의 슬립텀이 가장 짧으므로 슬립텀이 먼저 끝난 작업이 우선 재개된다. (따라서 실행순서는 뒤섞이게 된다)
  • 최종적으로는 Task의 총 시간이 적은 순으로 끝난다. (여기서는 지연의 총량)
import asyncio

@asyncio.coroutine
def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)... *%s" % (name,number, i))
        yield from asyncio.sleep((100.0/number)**2 / 160.0)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
task = [
    asyncio.async(factorial("A", 4)),
    asyncio.async(factorial("B", 6)),
    asyncio.async(factorial("C", 9))]
loop.run_until_complete(asyncio.wait(task))
loop.close()

실행결과는 다음과 같다.

Task A: Compute factorial(4)... *2
Task B: Compute factorial(6)... *2
Task C: Compute factorial(9)... *2
Task C: Compute factorial(9)... *3
Task C: Compute factorial(9)... *4
Task B: Compute factorial(6)... *3
Task C: Compute factorial(9)... *5
Task C: Compute factorial(9)... *6
Task B: Compute factorial(6)... *4
Task C: Compute factorial(9)... *7
Task A: Compute factorial(4)... *3
Task C: Compute factorial(9)... *8
Task B: Compute factorial(6)... *5
Task C: Compute factorial(9)... *9
Task C: factorial(9) = 362880
Task B: Compute factorial(6)... *6
Task A: Compute factorial(4)... *4
Task B: factorial(6) = 720
Task A: factorial(4) = 24
  • asyncio.as_completed(fs): future 객체들을 받아서 돌린 후 먼저 끝나는 순서대로 이터레이션한다.
for f in as_completed(fs):
    return  = yield from f # --> invoke f.result()
  • asyncio.async(): 코루틴이나 future 객체를 Task로 생성한다.
  • asyncio.wait(fs): task 객체들이나 future 객체들을 런루프에 넣고 모두 완료될 때까지 기다린다. 이 함수도 코루틴이다.
  • asyncio.wait_for(): 한 개 Future나 코루틴을 돌리고 이를 기다린다. 코루틴이 인자로 들어오면 Task 객체로 포장된다. 이 함수는 타임아웃값을 받을 수 있고, 타임아웃이 지나면 해당 task를 캔슬한다. 이 함수 역시 코루틴이다.
  • asyncio.gather(fs): 여러 개의 작업을 받아서 돌리고 모두 종료되면 그 결과를 모은 리스트를 리턴한다.

결론

asyncio는 Future라는 개념을 도입하여 이벤트루프에 이를 삽입하고, 자체적으로 스케줄링하는 기능을 자체적으로 수행할 수 있게 한다. 이는 고수준 API를 통해서 쉽고 효과적으로 병렬작업을 진행하는 것으로 특히 특정 시점에서 흐름을 분기하여 메인 스레드가 블럭킹되지 않고 IO 작업을 수행할 수 있도록 한다. GUI앱에서 네트워크 액세스나 저장소 액세스할 때, 전체적인 체감성능의 향상을 가져올 수 있다.

정리

  • 병렬작업으로 진행하려는 작업은 항상 코루틴으로 만든다. (@asyncio.coroutine)
  • asyncio.async를 사용해서 코루틴을 Task로 만들 수 있다. (선택) 이 경우 .add_done_callback() 메소드를 통해서 작업 완료 후 콜백 함수를 지정할 수 있다.
  • asyncio.wait, asyncio.wait_for를 통해서 병렬작업을 한꺼번에 돌리고 런루프를 기다릴 수 있다.
  • CLI 환경에서는 loop.run_until_complte() 함수를 통해서 비동기 작업의 종료를 기다릴 수 있다.