Python 표준 함수를 asyncio에서 비동기로 호출하는 방법

파이썬 3.4에서 asyncio 가 추가되어 I/O 바운드된 작업을 단일 스레드에서 비동기로 처리할 수 있는 방법이 생겼다. 하지만 대부분의 파이썬 내장 라이브러리 함수들은 코루틴이 아닌 일반 함수들이며, 이들은 모두 블럭킹 방식으로 동작한다. 즉 asyncio 의 비동기는 실질적으로는 I/O 액세스처럼 CPU가 관여할 필요가 없는 일들에 대해서 “병렬적으로 기다리는” 식으로 동시다발적인 처리의 전체 수행 시간을 줄이는 식으로 동작하는데, 그 중에 이런 블럭킹 함수로 처리되는 과정이 끼어 있다면 수행 시간 단축이 어렵게 된다.

런루프

이 블로그의 다른 글에서도 몇 번 이야기했듯이, 파이썬에는 GIL(전역 인터프리터 락)이라는 기재가 있어서 멀티 스레드로 병렬 작업을 처리하더라도 실질적으로 여러 스레드가 동시에 진행되지 못하고 CPU는 한 번에 하나의 스레드만 처리한다.

## 흔히 생각하는 멀티스레드

( -- 은 작업을 수행하는 시간,  ..은 스레드가 중단되는 시간을 의미)

메인스레드 ------------------------------> 진행
스레드 1     생성 -----------------------> 진행
스레드 2           생성 -----------------> 진행

## 실제 파이썬의 멀티 스레드

메인스레드 ------....---...---......-----> 진행 
스레드 1    생성 ----.........---........> 진행
스레드 2            생성---......---.....> 진행

위 도식에서 --- 으로 표현된 부분은 스레드가 일을 하는 시간이고, ... 으로 표현된 부분은 다른 스레드가 일하는 동안 해당 스레드가 멈춰있는 시간이다. 즉 아래/위 두 케이스에서 같은 시간 동안 멀티스레드가 실행되었다고 가정하면 처리된 일의 총 량은 - 의 개수와 같다고 볼 수 있으며, 파이썬의 멀티스레드는 단일 스레드에서의 작업량과 다를 바가 없는 일을 처리한다. (오히려 컨텍스트 스위칭에 들어가는 비용이 있기 때문에 더 느리다.)

이 관점에서 생각해볼 것이, asyncio 는 본래 단일 스레드에서 I/O 작업의 대기 시간이 CPU 사용시간에 포함되지 않도록 여러 코루틴을 옮겨가며 실행하는 것이라는 점이다. 그리고 위의 멀티 스레드처리에서 ... 에 해당하는 시간이 그냥 다른 스레드를 위해서 해당 스레드가 쉬는 것이 아니라 해당 스레드가 I/O 작업을 기다리는 시간으로 만들면 어떠냐는 것이다.

즉 멀티스레드로 분기해서 실행되는 블럭킹 함수 콜 자체를 I/O 작업으로 보고, 이를 기다리는동안 중지하는 비동기 코루틴이 있다면 되지 않을까? 이를 위해서 스레드 작업의 상태에 따라서 런루프에 시그널을 보내고, 해당 함수 콜 자체를 비동기 코루틴으로 감싸는 처리를 해줘야 하는데, 직접 구현하기에는 좀 난이도가 있는 작업이다.

하지만 파이썬은 왠만하면 다 있다고 했던가… 이런 기능을 이미 런루프 클래스에서 제공하고 있다. run_in_executor() 함수가 이 용도로 사용된다.

coroutine AbstractEventLoop.run_in_executor(executor, func, *args)

func 에 대해서 지정한 executor에서 실행되도록 조정한다. executor는 Executor 클래스의 인스턴스이며, None을 사용하는 경우 디폴트 executor가 사용된다. 인자들을 넘겨줄 수 있지만, 키워드 인자는 지원하지 않기 때문에 func가 키워드 인자를 요구한다면 functools.partial을 사용하는 것이 좋다. 이 메소드는 코루틴이다.

> https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.AbstractEventLoop.run_in_executor

예제

그렇다면 이게 실제로 효과가 있는지 확인해보자. 기본 라이브러리에서 네트워크 액세스를 처리하는 urllib.request.urlopen 함수를 생각해보자. 이 함수는 비동기 코루틴이 아닌 일반 함수이며, HTTP 요청을 보낸 후 응답을 받을 때까지 블럭되는 함수이다.  이 함수를 호출해서 콘텐츠를 받아오는 함수를 하나 작성해보자.

from urllib.request import urlopen
import time

async def get_url_data(url:str) -> (str, str, str):
  '''특정 URL에 요청을 보내어 HTML 문서를 문자열로 받는다.
  url, 응답텍스트, 포맷팅된 소요시간을 리턴한다.'''
  print(f'Request for: {url}')
  s = time.time()
  res = urlopen(str)
  data = res.read().decode()
  return url, data, f'{time.time() -  s: .3f}'

몇 개의 URL 집합에 대해서 이 함수를 테스트해본다.

import asyncio

urls = ('http:// ..... ') # 예닐곱 개 정도의 URL을 준비한다.

async def test_urls(co, urls):
  s = time.time()
  fs = {co(url) for url in urls}
  for f in asyncio.as_completed(fs):
    url, body, t = await f
    print(f'Resonse from: {url}, {len(body)}Bytes - {f}sec')
  print(f'{time.time() - s:0.3f}sec')

loop = asyncio.get_event_loop()
loop.run_until_complete(test_urls(get_url_data, urls))

이 코드는 비록 코루틴으로 각 URL을 테스트하는 코드를 작성하였지만, urlopen 함수 자체가 블럭킹 함수이므로 동시에 실행되는 것이 아니라 하나씩 실행된다. print() 하는 지점까지는 번갈아가며 실행되지만 HTTP 통신을 하는 동안은 한 번에 하나씩만 실행되고, 실제 출력되는 결과도 요청을 보낸 순서대로 출력된다.

다음은 스레드를 사용해서 실행하는 코드이다.

async def get_url_data2(url):
  print(f'Request for: {url}')
  loop = asyncio.get_event_loop()
  s = time.time()
  res = await loop.run_in_executor(None, urlopen, url)
  data = res.read().decode()
  return url, data, f'{time.time() -  s: .3f}'

loop = asyncio.get_event_loop()
loop.run_until_complete(test_urls(get_url_data2, urls))

일반 함수호출인 아닌 런루프+스레드 조합의 코루틴으로 감싼 호출을 사용했다. 실제 완료 시간이 체감상으로 확 줄어드는 것을 볼 수 있다.

결과 비교

다음은 위 코드를 순차적으로 실행하여 테스트하고, 걸린 시간을 비교하는 결과이다.

----------------------------------------
call by run_in_executor:
----------------------------------------
requesting: https://www.naver.com
requesting: https://www.daum.net
requesting: https://www.yahoo.com
requesting: http://fa.bianp.net/
requesting: https://jakevdp.github.io
requesting: http://arogozhnikov.github.io
----------------------------------------
response:   https://www.naver.com, 155791, 0.199
response:   https://www.daum.net, 204124, 0.379
response:   http://arogozhnikov.github.io, 24867, 0.346
response:   https://jakevdp.github.io, 60277, 0.492
response:   https://www.yahoo.com, 509304, 1.987
response:   http://fa.bianp.net/, 51394, 2.093
total time: 2.210sec
----------------------------------------

----------------------------------------
call normal function:
----------------------------------------
requesting: https://www.naver.com
requesting: https://www.daum.net
requesting: https://www.yahoo.com
requesting: http://fa.bianp.net/
requesting: https://jakevdp.github.io
requesting: http://arogozhnikov.github.io
----------------------------------------
response:   https://www.naver.com, 152331, 0.098
response:   https://www.daum.net, 204133, 0.105
response:   https://www.yahoo.com, 509307, 1.440
response:   http://fa.bianp.net/, 51394, 2.027
response:   https://jakevdp.github.io, 60277, 1.729
response:   http://arogozhnikov.github.io, 24867, 1.163
total time: 6.743sec
----------------------------------------
  1. 비동기 코루틴으로 바꿔서 실행한 경우에 총 수행 시간은 일반 함수 호출을 사용한 것보다 약 절반 이하로 실행된다.
  2. 그렇다고 개별 HTTP 요청에 소요된 시간이 (당연히) 더 짧아지지는 않는다.
  3. 전체 수행 시간은 가장 오래걸린 연결 시간보다 약간 큰 값이다.
  4. 결과가 출력되는 순서는 요청을 시작한 순서와 다르며, 이는 응답시간이 빠른 순으로 표시된다. 결국 6개의 연결이 동시에 수행되었다가, 먼저 완료된 것 순으로 결과가 출력되었음을 알 수 있다.
  5. 일반 함수 호출모드에서 테스트한 결과에서 총 시간은 개별 연결 시간의 합과 비슷하다.
  6. 개별 연결에 걸린 시간에 상관없이 출력 순서는 요청 순서와 일치한다. 즉 HTTP 통신 자체가 동시에 이루어지지 못하고 순서대로 하나씩 실행된 셈이다.

정리

대부분의 가이드 문서가 asyncio.sleep 만 사용하는 식으로 예제를 만들어내다 보니, 기존의 표준 라이브러리 함수들을 논블럭 모드로 사용하는 방법에 대해서는 따로 소개하지 않는 경우가 많다. 이처럼 런루프의 run_in_executor() 를 사용하면 기존의 표준 함수들도 병렬적으로 동작하는 코루틴으로 완전하게 전환할 수 있다. 어차피 I/O 대기 시간을 병렬적으로 쉬어버리는 식으로 스케줄링 되기 때문에 GIL의 영향을 우회해서 훌륭하게 전체 수행 시간을 단축할 수 있으며, 기존의 함수를 그대로 쓸 수 있다는 막강한 장점이 있다.

다만, 이는 스레드를 별도로 생성해서 동작하기 때문에 메모리 자원을 보다 많이 사용하게 되는 문제가 있고, 또한 I/O 바운드되는 작업에만 적용할 수 있다. 예를 들어 CPU를 많이 사용하는 계산이 필요한 소수 검사 등의 연산은 당연히 스레드를 동시에 돌리지 못한다. 물론 이 경우에도 ProcessPoolExecutor를 사용해서 다중 프로세스로 처리하는 방법도 있을 것이다.