concurrent.futures

Concurrent.futures

https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

파이썬3.2에서 추가된 모듈로 병렬작업을 추상화한 고수준의 API를 제공한다. 병렬작업자체는 스레드기반이나 멀티프로세스 기반으로 실행될 수 있는데, 이는 ThreadPoolExcutorProcessPoolExecutor 중 어느것을 사용하느냐에 따라 달라진다. 이 두 클래스는 동일한 API를 제공한다.

Executor

class concurrent.futures.Executor

함수/메소드 호출을 비동기적으로 진행할 수 있게 해주는 추삭 객체이다. 직접 쓰이기보다는 이를 서브클래싱하여 구상클래스를 만들어 사용한다. ThreadPoolExcutorProcessPoolExecutor의 원형이 된다.

  • submit(fn, *args, **kwargs) : 함수 fn에 대해 주어진 인자들을 전달하여 실행할 수 있는 Future 객체를 리턴한다. 해당 함수는 호출 즉시 스케줄링된다.
with ThreadPoolExcutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
  • map(func, *iterables, timeout=None) : 일반함수 map과 동일하나, 각 호출은 병렬적으로 일어난다. 만약 타임아웃값이 지정된 경우, 맵핑 작업이 완료되지 않은 호출이 있으면 TimeoutError가 일어난다.

  • shutdown(wait=True) : executor에게 종료 시그널을 보낸다. 시그널을 받은 이그제큐터는 실행중 및 대기중인 모든 future에 대해 리소스를 정리한다. 셧다운 후에 submit이나 map을 호출하면 런타임에러가 발생한다.

만약 wait 값이 True로 정해지면 진행 및 대기중이던 작업이 종료된 후에 셧다운이 일어나고, 그 때까지 해당 함수는 리턴을 보류하게 된다. 만약 강제 셧다운을 피하고 싶다면 with 구문 내에서 사용한다.

import shutil
with ThreadPoolExcutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ThreadPoolExecutor

스레드기반의 병렬처리 이그제큐터 객체. 만약 한쪽 Future가 다른쪽 Future를 기다리면서 양쪽을 순환하면 데드락을 피할 수 없다.

예제

url의 리스트로부터 해당 페이지를 읽어서 바이트 수를 리턴한다. 각 호출은 비동기적으로 진행되며, 모든 future 객체가 완료된 후에 결과값이 리스트로 리턴된다. (as_completed())

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

def load_url(url, timeout):
    conn = urllib.request.urlopen(url, timeout=timeout)
    return conn.readall()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as exe:
    future_to_url = {exe.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exp:
            print("%r generated an exception: %s" % (url, exp))
        else:
            print("{} page is {} bytes".format(url, len(data)))

ProcessPoolExecutor

이는 스레드가 아닌 서브프로세스 기반으로 함수호출을 비동기적으로 처리한다. (windows에서는 프로세스 생성비용이 매우 크므로 별로 권하고 싶지 않다.) 다만, GIL은 단일 프로세스 내에서만 영향을 주므로, 이를 피하고 싶다면 사용할 수 있다.

__main__ 모듈은 워커 서브프로세스에 대해 반입이 가능해야 한다. 따라서 REPL에서는 이를 사용할 수 없다.

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
#[Finished in 4.9s]

만약 위 작업을 ThreadPoolExecutor로 돌렸다면 약 7.9초가 걸린다. (코어수가 많다면 더 빨리 끝날 거다)

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()


#112272535095293 is prime: True
#112582705942171 is prime: True
#112272535095293 is prime: True
#115280095190773 is prime: True
#115797848077099 is prime: True
#1099726899285419 is prime: False
#[Finished in 7.9s]

Future

호출가능한 함수나 메소드를 비동기적으로 실행하기 위해 캡슐화한 객체이다. Executor.submit() 호출에 의해 인스턴스가 만들어진다. 이 객체는 asyncio의 Future 클래스와 유사한 API를 가지고 있다.

  • cancel() : 작업 취소를 시도한다. 만약 현재 실행중이고 취소가 불가능할 경우 False를 리턴한다. 작업이 취소되었다면 True가 리턴된다.
  • canceled() : 취소가 완료된 작업이면 True를 리턴한다.
  • running(): 실행 중인 경우 True를 리턴한다.
  • done(): 작업이 완료되어고 정상적으로 종료되었다면 True를 리턴한다.
  • result(): 해당 호출의 결과를 리턴한다. 만약 작업이 아직 완료되지 않았다면 최대 타임아웃1시간까지 기다린다음, None을 리턴한다.
  • exception(): 해당 호출이 던진 예외를 반환한다. 역시 작업이 완료되지 않았다면 타임아웃 시간까지 기다린다.
  • add_done_callback(): 콜백함수를 Future 객체에 추가한다. 이 함수는 future 객체하나를 인자로 받는 함수이다. 콜백은 취소되거나 종료된 경우에 모두 호출된다.

함수

  • .wait(fs, timeout=None, return_when=ALL_COMPLETED) : 비동기 작업을 기다린다.
  • .as_completed(fs, timeout=None) : 모든 실행이 끝나면 그 결과를 순회하는 이터레이터를 리턴한다.

  1. 디폴트 값은 None이며, 이 경우 무한히 기다리게 된다.