콘텐츠로 건너뛰기
Home » 파이썬의 새로운 병렬처리 API – Concurrent.futures

파이썬의 새로운 병렬처리 API – Concurrent.futures

컴퓨터 프로그램이 어떤 일을 처리해 나가는 실행 흐름은 기본적으로 선형이다. 서브루틴을 실행하는 것은 실질적으로 현재의 실행흐름이 해당 루틴의 단계까지 내려갔다가 다시 복귀하는 것이며, 하나의 CPU는 마치 고속도로에서 차선을 바꿔타듯 코드를 진행하며 작업을 처리해나간다. 하지만 두 개의 차선에서 동시에 다른 작업이 진행되어야 하는 상황이 언젠가는 필요할 수 있을 것이다.

이전에는 이것을 ‘동시성’이라 불렀고, 파이썬에서는 기본적으로 threading.ThreadMultiprocessing.Process 를 이용해서 다중 스레드 및 다중 프로세스를 통해서 동시성 작업을 처리했다. 이들 라이브러리 이전에 _thread 라는 저수준 API가 존재했었지만, 지금은 잊어도 좋을 것 같다.

파이썬 3.2에서는 ‘동시성’에 관한 개념을 발전시킨 고수준 API를 제공하여 더 나은 형태의 코드를 작성할 수 있게끔해주는데, 그것이 바로 오늘 소개할 concurrent.futures 라이브러리이다.

이 새로운 API는 기존의 멀티스레드 관련 API를 완전히 대체하지는 않는다. concurrent 모듈은 여전히 내부적으로 _thread와 같은 기존 API에 의존하고 있는 것도 사실이며, 스레드나 프로세스를 저수준에서 세세하게 제어하는 수단을 모두 제공하지도 않는다.

다만 주목할 것은 자바스크립트의 Promise 개념과 비슷한 Future라는 개념을 도입했고, 결과적으로 분산처리 및 그외 동시성에 관련된 코드를 좀 더 편하게 작성할 수 있게 해준다.

기존 스레드 모듈과의 차이점

앞서 말했듯이 concurrent.futures는 기본적으로 threading.Thread 보다는 고수준의 API를 구성한다고 했다. 하지만 실질적으로는 개념적인 차이가 있다. 기존의 스레드 모델은 일련의 코드를 다른 스레드에서 실행하는 것에 중점을 둔다. 단일 스레드에서는 메인루틴이 서브 루틴을 호출하는 경우, 이 서브루틴의 작업이 완료되기 전에는 메인루틴이 더 이상 진행하지 않는다. 이때 서브 루틴의 코드가 별도의 스레드에서 작동한다면, 메인 루틴은 서브 루틴이 처리하는 작업 때문에 ‘진행에 방해를 받지 않고’ 하던 일을 계속할 수 있다.

스레드만 예를 들었을 때, threading.Thread는 스레드를 생성하고 실행하는 것에 주안점을 두며, 그외 락이나 세마포어 같은 동기화 수단을 통해 스레드 실행의 시작과 중지를 제어하는데 포커스를 둔다. 이 때, 이 모델에서는 스레드로 갈라져 나온 실행흐름을 완전히 새로운 노선으로 보고 있다. 즉 어떤 데이터 A에 대한 처리를 별도의 워크 스레드에서 처리하고 그 결과로 B가 만들어진다면 이 B를 원래의 스레드 혹은 메인 스레드로 전달할 방법에 대해서는 지금 당장 고려하지 않는다.

대신에 기존 스레드 모듈은 스레드간의 ‘동기화’를 전통적인 ‘동기화 수단’이라 불리는 것에 의존한다. 즉 C의 스레드 API를 객체지향 적으로 감싸 고도화 하는 것을 목적으로 한 것이다. 그런데, 전통적인 동기화 수단(락, 세마포어, 컨디션)은 데이터를 안전하게 보관하고 전달하기 위해서, 각각의 스레드가 싸우지 않도록 교통정이를 하는 것에 초점을 두고 있다. 그러다보니 파이썬에서 두 스레드 간의 데이터 전달과 같이 언뜻 봐도 가장 기본적이고 또 중요한 작업은 정작 스레드 모듈이 아닌 queue 모듈을 사용해야 한다.

새로운 모듈인 concurrent.futures 는 Future라는 개념을 도입했고, 이것은 다른 스레드에서 작동한 코드가 만드는 결과물을 안전하고 또 쉽게 전달받을 수 있는 기능을 포함한다. 그리고 근본적으로 하나의 스레드에서는 해당 스레드에서의 흐름만 고려할 수 있는 방향으로 API의 디자인이 진화하고 있다.

future

concurrent.futures의 다중 스레드/프로세스 모델에서는 데이터의 처리 흐름을 분산하는 것에 주안점을 두고 Future 클래스를 도입했다. 이 Future 클래스를 통해서 분산 처리된 결과를 손쉽게 원래의 스레드로 돌려줄 수 있게 되었다.

Future란 별도의 실행흐름에서 처리되는 코드를 캡슐화한 것이다. 현재 스레드는 작업을 스케줄링한 후 즉시 Future 객체를 얻게 되며, 이는 작업이 다른 어딘가에서 처리중이며 ‘미래의 적절한 시점에 그 결과가 resolve되어 나오게 된다’는 약속을 의미한다. 자바스크립트에서는 비동기 호출의 결과로 Promise를 얻게 되는데 이와 유사한 디자인으로 구현된 것이 바로 Future이다.

Future의 사용 방법은 다음과 같다.

  1. (나중에 소개할) 모종의 방법으로 특정한 함수를 별도의 실행흐름에서 시작하고, 해당 작업의 완료를 기다리면서 블럭되는 대신 Future 객체를 얻는다.
  2. 1의 시점 직후에는 Future 내의 작업은 아직 실행 중이며 그 결과는 생성되지 않았을 것이다.
  3. Future 객체 내의 완료 여부와 상관 없이, 우리는 Future에 콜백을 추가하거나, 혹은 작업을 취소할 수 있다. 그리고 여전히 현재 스레드에서 다른 작업을 처리할 수도 있다.
  4. 현재 스레드에서 작업의 결과가 필요한 시점이 오면 Future.result()를 통해서 결과를 요청한다. 이 시점에서 Future의 결과가 생성되어 있다면 그 결과는 즉시 반환된다. 만약 Future 내부의 코드가 아직 처리중이라면 result() 메소드는 그 시점부터 결과의 생성을 기다리도록 현재 스레드를 중지시킬 수 있다.

그렇다면 함수와 매개 변수들을 가지고 이를 어떻게 Future로 만들 수 있을까? 이 작업을 처리해주는 것이 실행기(Executer)이다.

Executor

 concurrent.futures에서의 병렬작업은 별도의 스레드나 프로세스에서 동작할 수 있으며, 사용자는 직접 이를 생성할 필요가 없다. 스레드/프로세스를 생성하고 작업을 스케줄링하는 일은 실행기라 불리는 클래스가 담당하여 처리한다. 실행기들은 한정된 개수의 스레드/프로세스 풀을 가지고 있으며, 스케줄링된 작업은 가용 스레드/프로세스가 있을 때 시작될 수 있다.

실행기는 스레드를 사용하느냐, 프로세스를 사용하느냐에 따라서 ThreadPoolExecutor 혹은 ProcessPoolExecutor 중 하나를 사용하면 되며, 이 두 클래스 모두 동일한 Future 와 관련되며, API들이 모두 똑같이 생겼다. 즉 완전히 동일한 코드에서 클래스만 선택해주면 멀티스레드 환경인지 멀티 프로세스 환경인지를 쉽게 변경할 수 있다.

생성

(ThreadPoolExecutor를 기준으로 예를 들겠음) 실행기를 생성할 때 인자로는 max_workers= 가 있다. 풀에서 관리되는 스레드의 최대 갯수를 지정한다. 프로세스 풀의 경우 기본값으로 시스템의 CPU 코어 수가 사용되며, 스레드 풀의 경우에는 프로세스 개수의 5배가 기본적으로 지정된다.

작업 실행

비동기로 작업을 실행하는 방법에는 실행기의 두 가지 메소드가 사용된다. 먼저 단일 작업을 Future로 생성하는 .submit()이 있고 여러 개의 작업을 동시에 시작하는 .map()이 있다.

.submit() – 개별작업실행

submit()은 개별 함수 호출을 별도의 흐름에서 시작하고 Future를 즉시 리턴하는 함수이다. 첫번째 인자로 함수 객체를 받으며, 이후 받는 모든 인자를 함수 객체에 전달한다.

executor.submit(work, *args, **kwds)
 \--> 별도 스레드에서 work(*args, **kwds)를 실행
  --> Future를 리턴

.map() – 동시성 map 연산

concurrent.futures.Executor.map()은 어떤 함수를 반복 가능 객체의 각 원소에 대해 적용하는 기본 함수 map() 함수와 비슷하다. 하지만 기본함수 map()과 달리 이 메소드는 느긋하게 동작하지 않으며, 전달된 작업들을 가능한 빨리 (그리고 가능하다면 동시에) 시작된다. 모든 작업의 처리가 완료되면 변환된 결과가 리스트로 리턴된다. 즉 모든 요소값에 대해서 내부적으로 Future 객체를 생성하고 모든 future에 대해서 result()가 반환될 때 이 결과를 취합하여 전달하는 것이다.

이 메소드의 첫 인자는 map() 함수와 같이 인자를 1개 받는 함수이다. 만약 추가적인 인자를 더 사용해야 한다면 functools.partial을 쓰거나 익명 함수를 사용해서 간접적으로 호출하도록 한다.

all_result = list(executor.map(work, data, timeout=2))
 # -> [11, 22, 33, 44, 55, 66]

추가 인자로 timeout= 을 전달하여 지정 시간(초 단위)이내까지만 실행하도록 할 수 있다. 이는 .map()이 호출된 이후에 지정한 시간이 지난 후에도 모든 future가 결과를 resolve하지 못했다면 concurrent.futures.TimeoutError 예외를 일으키게 된다.

이 메소드는 여러 면에서 모듈함수 .as_completed()와 비교된다.

.shutdown() – 리소스 정리

.shutdown() 메소드는 executor에게 종료 시그널을 보내어 더이상 사용하지 않게 됨을 알리게 된다. 이 메소드가 호출된 실행기는 아직 시작하지 않은 모든 future들을 시작하지 않고 취소하며, 이미 진행중인 작업들이 종료될 때까지 대기한다.shutdown이 호출된 executor에게는 추가적으로 map, submit을 호출할 수 없다.

wait= 파라미터값을 전달할 수 있는데, 이것은 이미 시작해버린 작업들을 지금 기다릴 것인지 프로그램 종료 시점에 대기할 것인지를 결정하는 플래그이다. 기본적으로 with 구문 내에서 실행기를 사용하면 블럭을 나가기 전에 .shutdown(wait=True)를 호출한 것 같은 효과를 낸다.


Future 상세

실행기의 .submit() 메소드를 사용하거나, 모듈 함수인 concurrent.futures.wait(), concurrent.futures.as_completed()를 사용하면 Future 객체를 얻게 된다.

Future 객체의 메소드 몇 가지를 소개한다.

  • cancel() : 작업 취소를 시도한다. 만약 현재 실행중이고 취소가 불가능할 경우 False를 리턴한다. 작업이 취소되었다면 True가 리턴된다.
  • canceled() : 취소가 완료된 작업이면 True를 리턴한다.
  • running(): 실행 중인 경우 True를 리턴한다.
  • done(): 작업이 완료되어고 정상적으로 종료되었다면 True를 리턴한다.
  • result(timeout=): 결과를 대기한 후 리턴한다. timeout= 인자가 주어지면 해당 초까지 기다린다.
  • add_done_callback(): 완료 콜백을 추가할 수 있다. 콜백은 Future 를 인자로 받는 형태여야 하며, future가 완료되거나 취소될 때 호출된다. (취소 여부를 알기 위해서는 콜백에 전달된 future에 대해서 .cancelled()를 호출해본다.) 콜백을 추가하는 시점에 이미 작업이 완료/취소되었다면 콜백이 즉시 호출된다.

메소드들이 거의 모두 asyncio.Future와 비슷하게 생겼다는 점을 알 수 있다. 실제로 asyncio.Futureconcurrent.futures.Future 클래스의 디자인을 따와서 만들었다. 물론 이름이나 메소드들이 닮아있다 뿐이지 이 두 클래스의 내부 구조는 다르므로 혼합해서 사용하는 것은 불가능하다.

모듈 함수

concurrent.futures  모듈은 두 가지 함수를 제공한다. 이들은 공통적으로 List[concurrent.futures.Future] 타입을 인자로 받는다. 즉 한 개 이상의 Future를 받아서 완료를 기다리거나, 먼저 완료되는 것 부터 하나씩 처리하게 하는 것이다.

concurrent.futures.wait()

먼저 wait() 함수는 (asyncio.wait()와 비슷한데) 특정한 조건까지 future의 시퀀스를 대기한 다음 완료된 것들, 계류중인 것들로 구성된 튜플을 리턴한다.

리턴 시점에 대한 조건은 return_when= 인자가 결정한다.

  • FIRST_COMPLETD : 가장 빨리 끝난 하나의 작업이 있을 때 리턴
  • FIRST_EXCEPTION : 예외가 발생하면 리턴, 예외가 발생하지 않으면 모두 완료될 때 리턴
  • ALL_COMPLETED : 기본값으로 모든 작업이 완료될 때 리턴.

추가적으로 timeout= 인자를 통해서 주어진 시간 후에 강제로 리턴할 수 있다.

as_completed()

as_completed()함수는 이터레이터를 리턴하는데, 주로 for ... in 구문에 사용된다. 즉 주어진 future들 중에서 먼저 끝나는 것부터 차례로 순회하여 반복문을 돌리는 것이다. executor.map() 은 주어진 연속열의 순서대로 결과를 리스트로 만들지만 .as_completed()에서는 결과의 순서가 반드시 주어진 것이 아니며, 실제로 이터레이터는 결과가 아닌 각각의 future 객체를 리턴한다.

as_completed()의 사용 예제

(파이썬 공식 문서 내 예제를 조금 수정했다.) url의 리스트로부터 해당 페이지를 읽어서 바이트 수를 리턴하는 함수를 하나 정의하고 이를 여러 URL에 대해서 비동기로 호출하는 예이다. urlopen() 함수의 실행 시간의 대부분은 IO 대기이기 때문에 스레드로 동작해도 순차적으로 실행했을 때 보다 빠르게 실행될 것이다. 또한, 먼저 끝나는 순서대로 완료되기 때문에 출력되는 결과가 입력 URL 순이 아니라는 점도 확인할 수 있다.

from concurrent.futures import ThreadPoolExecutor as PE
from urllib.request import urlopen
import concurrent.futures
URLS = [
    'http://www.foxnews.com',
    'http://www.cnn.com',
    'http://europe.wsj.com',
    'http://www.bbc.co.uk']
def load_url(url):
    conn = urlopen(url, timeout=1)
    return conn.read()
def main():
    with PE() as exe:
        fs = {exe.submit(load_url, url): url for url in URLS}
        for f in concurrent.futures.as_completed(fs):
            try:
                url = fs[f]
                data = f.result()
                print(f'[{url}] has {len(data)} characters.')
            except Exception:
                print(f'Fail to open : {url}')
if __name__ == '__main__':
    main()

ProcessPoolExecutor의 사용예

이번에는 앞에서 언급한 프로세스 풀을 통한 CPU 분산처리 방법이다.  map 메소드를 이용했기 때문에 모든 프로세스가 실행을 완료할 때까지 기다리게 된다. 같은 방식을 스레드 풀로 방식을 바꿔보면 확실히 성능이 차이가 날 것이다.


PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]
def is_prime(n):
    if n < 2:
        return False
    if n in (2, 3):
        return True
    for k in range(3, int(n**0.5 + 1.5), 2):
        if n % k == 0:
            return False
    return True
def main():
    with PE() as exe:
        for num, prime in zip(PRIMES, exe.map(is_prime, PRIMES)):
            print(f'{num} is prime: {prime}')
if __name__ == '__main__':
    main()

그외 몇 가지 사용예

이상의 예들은 동일한 함수를 여러 스레스/프로세스에서 동시에 분산처리하는 형태의 문제이다. 단순히 작업 자체를 별도의 스레드에서 처리해서 나중에 그 결과를 얻는 경우는 다음과 같은 식으로 쓸 수 있다.

def main():
    exe = EP()
    num = 115797848077099
    # 먼저 URL에 접속하여 페이지 데이터를 받는 것을 별도 스레드에서 처리
    f = exe.submit(load_url, 'http://some.where.com')
    # 현재 스레드는 소수 검사 로직을 처리
    prime = is_prime(num)
    print(f'{num} is prime: {prime}')
    # 이제 페이지 데이터가 필요한 순간
    data = f.result()
    print(f'The page is {len(data)} bytes.')
    exe.shutdown(True) # 작업을 종료합니다.

이처럼 concurrent.futures는 별도의 스레드에서 어떤 처리를 수행하고 그 결과를 받아오는 코드를 추가적인 다른 장치없이 처리할 수 있게 해준다. 그렇다고 하더라도 기존의 threading.Thread를 완전히 대체하는 것은 아니다. 스레드가 1회성이 아니라 데몬처럼 반복적인 작업을 처리해야 하거나, 특정한 자원을 제한적으로 사용해야 하는 등의 상황처럼 기존의 스레드 관리 모델이 적용되어야 하는 곳에서는 기존의 스레드 관련 라이브러리를 그대로 사용하면 된다.