파이썬의 새로운 병렬처리 API – Concurrent.futures

어떤 처리량이 많은 작업을 작은 단위로 쪼개거나, 현재 진행되는 흐름과 독립적으로 병렬적인 처리를 하기 위해서 멀티스레드나 멀티프로세스를 사용하는 경우가 (지금까지는 드물지만) 종종 있다.

이전에는 Threading.ThreadMultiprocessing.Process 를 이용해서 각각의 스레드나 별도 프로세를 제어하는 방식을 사용했다. 파이썬 3.2에서 이러한 비동기 실행을 위한 API를 보다 고수준으로 만들고 사용하기 쉽도록 개선한 concurrent.futures 모듈이 도입되었다.

이 새로운 API가 기존의 스레드나 멀티프로세싱 모듈을 완전히 대체하지는 않는다. 이 모듈은 내부적으로 _thread와 같은 기존 API에 의존하고 있고, 스레드나 프로세스 제어에 대한 전반적인 방법을 제공하지도 않는다. 다만 자바스크립트 진영의 Promise 개념에 자극받아 탄생한 것으로 별도의 스레드/프로세스에서 처리한 결과를 받는 동기화 부분을 간단하게 만들어준다는 장점이 있다.

기존 스레드/멀티프로세스와 차이점

기존 threading.Threadmultiprocessing.Process는 스레드 및 프로세스를 객체화하여 특정 함수를 별도의 스레드 혹은 프로세스에서 처리할 수 있게 한다. concurrent.futures는 내부적으로 이 모듈들을 사용하여 특정 작업을 병렬로 실행하는 기능을 제공하는데, 단순히 기존 API를 고수준화한 것 이상의 컨셉을 담고 있다.

스레드만 예를 들었을 때, threading.Thread는 스레드를 생성하고 실행하는 것에 주안점을 두며, 그외 락이나 세마포어 같은 동기화 수단을 통해 스레드 실행의 시작과 중지를 제어하는데 포커스를 둔다. 이는 전통적인 스레드 제어 모델의 디자인과 크게 다르지 않다. 그런데 이 모델에서는 스레드로 갈라져 나온 실행흐름을 완전히 새로운 노선으로 보고 있다. 즉 어떤 데이터 A에 대한 처리를 별도의 워크 스레드에서 처리하고 그 결과로 B가 만들어진다면 이 B를 원래의 스레드 혹은 메인 스레드로 전달할 방법에 대해서는 고려하지 않고 있다는 말이다.

처리된 결과 혹은 입력을 스레드 간에 전달하기 위해서는 별도의 스레드 안전한 교통 수단이 필요하다. 파이썬에서는 보통 queue.Queue를 써서 스레드 사이에 데이터를 전달하거나 (multiprocessing 모듈에서는 이와 유사한 multiprocessing.Queue를 지원한다) 소켓이나 파이프 등을 사용해서 데이터를 전달해야 한다. concurrent.futuresFuture라는 개념을 도입하여 스레드 및 프로세스 사이의 데이터 교환을 동기화하는 강력한 기능을 제공한다. 또한 스레드와 프로세스를 사용하는데 있어서 실행기(Executor) 클래스만 변경하면 다른 코드를 전혀 변경하지 않아도 되는 수준으로 API의 구성을 동일하게 맞춰놓은 것도 주목할만한 점이다.

future

concurrent.futures의 다중 스레드/프로세스 모델에서는 데이터의 처리 흐름을 분산하는 것에 주안점을 두고 Future 클래스를 도입했다. 이 Future 클래스를 통해서 분산 처리된 결과를 손쉽게 원래의 스레드로 돌려줄 수 있게 되었다.

Future란 별도의 실행흐름에서 처리되는 코드를 캡슐화한 것이다. 현재 스레드는 작업을 스케줄링한 후 즉시 Future 객체를 얻게 되며, 이는 작업이 다른 어딘가에서 처리중이며 ‘미래의 적절한 시점에 그 결과가 resolve되어 나오게 된다’는 약속을 의미한다. 자바스크립트에서는 비동기 호출의 결과로 Promise를 얻게 되는데 이와 유사한 디자인으로 구현된 것이 바로 Future이다.

Future의 사용 방법은 다음과 같다.

  1. (나중에 소개할) 모종의 방법으로 특정한 함수를 별도의 실행흐름에서 시작하고, 해당 작업의 완료를 기다리면서 블럭되는 대신 Future 객체를 얻는다.
  2. 1의 시점 직후에는 Future 내의 작업은 아직 실행 중이며 그 결과는 생성되지 않았을 것이다.
  3. Future 객체 내의 완료 여부와 상관 없이, 우리는 Future에 콜백을 추가하거나, 혹은 작업을 취소할 수 있다. 그리고 여전히 현재 스레드에서 다른 작업을 처리할 수도 있다.
  4. 현재 스레드에서 작업의 결과가 필요한 시점이 오면 Future.result()를 통해서 결과를 요청한다. 이 시점에서 Future의 결과가 생성되어 있다면 그 결과는 즉시 반환된다. 만약 Future 내부의 코드가 아직 처리중이라면 result() 메소드는 그 시점부터 결과의 생성을 기다리도록 현재 스레드를 중지시킬 수 있다.

그렇다면 함수와 매개 변수들을 가지고 이를 어떻게 Future로 만들 수 있을까? 이 작업을 처리해주는 것이 실행기(Executer)이다.

Executor

 concurrent.futures에서의 병렬작업은 별도의 스레드나 프로세스에서 동작할 수 있으며, 사용자는 직접 이를 생성할 필요가 없다. 스레드/프로세스를 생성하고 작업을 스케줄링하는 일은 실행기라 불리는 클래스가 담당하여 처리한다. 실행기들은 한정된 개수의 스레드/프로세스 풀을 가지고 있으며, 스케줄링된 작업은 가용 스레드/프로세스가 있을 때 시작될 수 있다.

실행기는 스레드를 사용하느냐, 프로세스를 사용하느냐에 따라서 ThreadPoolExecutor 혹은 ProcessPoolExecutor 중 하나를 사용하면 되며, 이 두 클래스 모두 동일한 Future 와 관련되며, API들이 모두 똑같이 생겼다. 즉 완전히 동일한 코드에서 클래스만 선택해주면 멀티스레드 환경인지 멀티 프로세스 환경인지를 쉽게 변경할 수 있다.

생성

(ThreadPoolExecutor를 기준으로 예를 들겠음) 실행기를 생성할 때 인자로는 max_workers= 가 있다. 풀에서 관리되는 스레드의 최대 갯수를 지정한다. 프로세스 풀의 경우 기본값으로 시스템의 CPU 코어 수가 사용되며, 스레드 풀의 경우에는 프로세스 개수의 5배가 기본적으로 지정된다.

작업 실행

비동기로 작업을 실행하는 방법에는 실행기의 두 가지 메소드가 사용된다. 먼저 단일 작업을 Future로 생성하는 .submit()이 있고 여러 개의 작업을 동시에 시작하는 .map()이 있다.

.submit() – 개별작업실행

submit()은 개별 함수 호출을 별도의 흐름에서 시작하고 Future를 즉시 리턴하는 함수이다. 첫번째 인자로 함수 객체를 받으며, 이후 받는 모든 인자를 함수 객체에 전달한다.

executor.submit(work, *args, **kwds)
 \--> 별도 스레드에서 work(*args, **kwds)를 실행
  --> Future를 리턴

.map() – 동시성 map 연산

concurrent.futures.Executor.map()은 어떤 함수를 반복 가능 객체의 각 원소에 대해 적용하는 기본 함수 map() 함수와 비슷하다. 하지만 기본함수 map()과 달리 이 메소드는 느긋하게 동작하지 않으며, 전달된 작업들을 가능한 빨리 (그리고 가능하다면 동시에) 시작된다. 모든 작업의 처리가 완료되면 변환된 결과가 리스트로 리턴된다. 즉 모든 요소값에 대해서 내부적으로 Future 객체를 생성하고 모든 future에 대해서 result()가 반환될 때 이 결과를 취합하여 전달하는 것이다.

이 메소드의 첫 인자는 map() 함수와 같이 인자를 1개 받는 함수이다. 만약 추가적인 인자를 더 사용해야 한다면 functools.partial을 쓰거나 익명 함수를 사용해서 간접적으로 호출하도록 한다.

all_result = list(executor.map(work, data, timeout=2))
 # -> [11, 22, 33, 44, 55, 66]

추가 인자로 timeout= 을 전달하여 지정 시간(초 단위)이내까지만 실행하도록 할 수 있다. 이는 .map()이 호출된 이후에 지정한 시간이 지난 후에도 모든 future가 결과를 resolve하지 못했다면 concurrent.futures.TimeoutError 예외를 일으키게 된다.

이 메소드는 여러 면에서 모듈함수 .as_completed()와 비교된다.

.shutdown() – 리소스 정리

.shutdown() 메소드는 executor에게 종료 시그널을 보내어 더이상 사용하지 않게 됨을 알리게 된다. 이 메소드가 호출된 실행기는 아직 시작하지 않은 모든 future들을 시작하지 않고 취소하며, 이미 진행중인 작업들이 종료될 때까지 대기한다.shutdown이 호출된 executor에게는 추가적으로 map, submit을 호출할 수 없다.

wait= 파라미터값을 전달할 수 있는데, 이것은 이미 시작해버린 작업들을 지금 기다릴 것인지 프로그램 종료 시점에 대기할 것인지를 결정하는 플래그이다. 기본적으로 with 구문 내에서 실행기를 사용하면 블럭을 나가기 전에 .shutdown(wait=True)를 호출한 것 같은 효과를 낸다.


Future 상세

실행기의 .submit() 메소드를 사용하거나, 모듈 함수인 concurrent.futures.wait(), concurrent.futures.as_completed()를 사용하면 Future 객체를 얻게 된다.

Future 객체의 메소드 몇 가지를 소개한다.

  • cancel() : 작업 취소를 시도한다. 만약 현재 실행중이고 취소가 불가능할 경우 False를 리턴한다. 작업이 취소되었다면 True가 리턴된다.
  • canceled() : 취소가 완료된 작업이면 True를 리턴한다.
  • running(): 실행 중인 경우 True를 리턴한다.
  • done(): 작업이 완료되어고 정상적으로 종료되었다면 True를 리턴한다.
  • result(timeout=): 결과를 대기한 후 리턴한다. timeout= 인자가 주어지면 해당 초까지 기다린다.
  • add_done_callback(): 완료 콜백을 추가할 수 있다. 콜백은 Future 를 인자로 받는 형태여야 하며, future가 완료되거나 취소될 때 호출된다. (취소 여부를 알기 위해서는 콜백에 전달된 future에 대해서 .cancelled()를 호출해본다.) 콜백을 추가하는 시점에 이미 작업이 완료/취소되었다면 콜백이 즉시 호출된다.

메소드들이 거의 모두 asyncio.Future와 비슷하게 생겼다는 점을 알 수 있다. 실제로 asyncio.Futureconcurrent.futures.Future 클래스의 디자인을 따와서 만들었다. 물론 이름이나 메소드들이 닮아있다 뿐이지 이 두 클래스의 내부 구조는 다르므로 혼합해서 사용하는 것은 불가능하다.

모듈 함수

concurrent.futures  모듈은 두 가지 함수를 제공한다. 이들은 공통적으로 List[concurrent.futures.Future] 타입을 인자로 받는다. 즉 한 개 이상의 Future를 받아서 완료를 기다리거나, 먼저 완료되는 것 부터 하나씩 처리하게 하는 것이다.

concurrent.futures.wait()

먼저 wait() 함수는 (asyncio.wait()와 비슷한데) 특정한 조건까지 future의 시퀀스를 대기한 다음 완료된 것들, 계류중인 것들로 구성된 튜플을 리턴한다.

리턴 시점에 대한 조건은 return_when= 인자가 결정한다.

  • FIRST_COMPLETD : 가장 빨리 끝난 하나의 작업이 있을 때 리턴
  • FIRST_EXCEPTION : 예외가 발생하면 리턴, 예외가 발생하지 않으면 모두 완료될 때 리턴
  • ALL_COMPLETED : 기본값으로 모든 작업이 완료될 때 리턴.

추가적으로 timeout= 인자를 통해서 주어진 시간 후에 강제로 리턴할 수 있다.

as_completed()

as_completed()함수는 이터레이터를 리턴하는데, 주로 for ... in 구문에 사용된다. 즉 주어진 future들 중에서 먼저 끝나는 것부터 차례로 순회하여 반복문을 돌리는 것이다. executor.map() 은 주어진 연속열의 순서대로 결과를 리스트로 만들지만 .as_completed()에서는 결과의 순서가 반드시 주어진 것이 아니며, 실제로 이터레이터는 결과가 아닌 각각의 future 객체를 리턴한다.

as_completed()의 사용 예제

(파이썬 공식 문서 내 예제를 조금 수정했다.) url의 리스트로부터 해당 페이지를 읽어서 바이트 수를 리턴하는 함수를 하나 정의하고 이를 여러 URL에 대해서 비동기로 호출하는 예이다. urlopen() 함수의 실행 시간의 대부분은 IO 대기이기 때문에 스레드로 동작해도 순차적으로 실행했을 때 보다 빠르게 실행될 것이다. 또한, 먼저 끝나는 순서대로 완료되기 때문에 출력되는 결과가 입력 URL 순이 아니라는 점도 확인할 수 있다.

from concurrent.futures import ThreadPoolExecutor as PE
from urllib.request import urlopen
import concurrent.futures

URLS = [
    'http://www.foxnews.com',
    'http://www.cnn.com',
    'http://europe.wsj.com',
    'http://www.bbc.co.uk']


def load_url(url):
    conn = urlopen(url, timeout=1)
    return conn.read()


def main():
    with PE() as exe:
        fs = {exe.submit(load_url, url): url for url in URLS}
        for f in concurrent.futures.as_completed(fs):
            try:
                url = fs[f]
                data = f.result()
                print(f'[{url}] has {len(data)} characters.')
            except Exception:
                print(f'Fail to open : {url}')


if __name__ == '__main__':
    main()

ProcessPoolExecutor의 사용예

이번에는 앞에서 언급한 프로세스 풀을 통한 CPU 분산처리 방법이다.  map 메소드를 이용했기 때문에 모든 프로세스가 실행을 완료할 때까지 기다리게 된다. 같은 방식을 스레드 풀로 방식을 바꿔보면 확실히 성능이 차이가 날 것이다.



PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n < 2:
        return False
    if n in (2, 3):
        return True
    for k in range(3, int(n**0.5 + 1.5), 2):
        if n % k == 0:
            return False
    return True


def main():
    with PE() as exe:
        for num, prime in zip(PRIMES, exe.map(is_prime, PRIMES)):
            print(f'{num} is prime: {prime}')


if __name__ == '__main__':
    main()

그외 몇 가지 사용예

이상의 예들은 동일한 함수를 여러 스레스/프로세스에서 동시에 분산처리하는 형태의 문제이다. 단순히 작업 자체를 별도의 스레드에서 처리해서 나중에 그 결과를 얻는 경우는 다음과 같은 식으로 쓸 수 있다.

def main():
    exe = EP()
    num = 115797848077099
    # 먼저 URL에 접속하여 페이지 데이터를 받는 것을 별도 스레드에서 처리
    f = exe.submit(load_url, 'http://some.where.com')
    
    # 현재 스레드는 소수 검사 로직을 처리
    prime = is_prime(num)
    print(f'{num} is prime: {prime}')

    # 이제 페이지 데이터가 필요한 순간
    data = f.result()
    print(f'The page is {len(data)} bytes.')
    exe.shutdown(True) # 작업을 종료합니다.

이처럼 concurrent.futures는 별도의 스레드에서 어떤 처리를 수행하고 그 결과를 받아오는 코드를 추가적인 다른 장치없이 처리할 수 있게 해준다. 그렇다고 하더라도 기존의 threading.Thread를 완전히 대체하는 것은 아니다. 스레드가 1회성이 아니라 데몬처럼 반복적인 작업을 처리해야 하거나, 특정한 자원을 제한적으로 사용해야 하는 등의 상황처럼 기존의 스레드 관리 모델이 적용되어야 하는 곳에서는 기존의 스레드 관련 라이브러리를 그대로 사용하면 된다.