리스트와 같은 일련의 데이터를 하나의 함수로 반복 처리해야 할 때, 이를 여러 프로세스에서 분산하여 동시에 처리하게 하면 전체적인 작업 시간을 단축시킬 수 있다. 서브 프로세스를 사용하는 방법으로는 multiprocessing.Process
를 사용하는 방법이 있지만, 프로세스별로 분산하여 처리한 데이터를 다시 수집하여 취합하는 과정이 좀 번거롭다. 또한 데이터 매우 많을 때 너무 많은 프로세스가 생기지 않도록 프로세스 풀을 관리하는 로직을 직접 구현해야하는 점이 번거롭기도 하다. concurrent.futures
는 이러한 분산처리에 특화된 작업을 쉽게 구현할 수 있도록 도와주는 라이브러리로 사용방법이 쉽고, asyncio와 비슷한 Future라는 개념을 사용하고 있다.
함수의 병렬 맵핑 방법
concurrent.future
에서 제시하는 병렬 처리 방법 중 가장 간단한 것은 프로세스 풀의 map()
메소드를 사용하는 것이다. 이는 일반적인 map()
함수와 비슷한데, 각각의 맵핑이 별도의 프로세스에 할당되어 최대 워커 개수만큼 동시에 진행된다는 차이가 있다. 하나의 워커 프로세스에서 처리가 끝나 해당 워커가 유휴상태가 되면 자동으로 다음 work load를 할당받아 데이터를 처리해나간다. 모든 작업이 완료되면 처리 결과가 모여 반복가능 객체로 리턴된다.
간단한 예제를 통해서 사용하는 법을 살펴보자. 함수 f()
는 일련의 값들을 받아서 각 값과 그 순번을 곱한 값의 합계를 리턴한다. 그 과정에서 시간이 오래 걸리는 작업처럼 보이기 위해서 각 값만큼 멈춰있도록 한다.
from concurrent.futures import ProcessPoolExecutor as Pool
from random import random
from time import sleep
def f(*xs):
s = 0
for i, x in enumerate(xs):
s += (i + 1) * x
time.sleep(x)
return s
def main():
pool = Pool(max_workers=4)
data = [(0.2 + random() * 0.2 for _ in range(4)) for _ in range(20)]
result = pool.map(f, data)
for res in result:
print(res)
__if__ == "__main__":
main()
프로세스 풀은 최대 4개의 워커로 구성되도록 만들고 나면 나머지는 그냥 map() 함수를 쓰는 것과 사용법은 동일하다.1물론, 멀티 프로세스로 처리되므로 최상위 레벨에서는 함수 외에 어떤 변수를 정의해서는 안된다. 문법적으로는 아주 약간의 변화만 가지고 간단하게 분산 처리의 효과를 얻을 수 있다.
Future를 통한 분산처리
최종 결과만 필요한 경우에는 앞서 소개한 방법으로도 충분하겠다. 하지만 일정한 시간 제약 이내에 처리된 결과만 취하려하거나, 먼저 처리된 결과부터 빨리 다음 작업으로 처리하고 싶은 등, 작업의 처리 흐름을 좀 더 세밀하게 다루고 싶다면 Future
객체를 만들고 wait()
나 as_complete()
함수를 사용한다.
pool.submit()
: 작업할 함수와 그 인자를 넘겨서 Future 객체를 만든다. 프로세스/스레드 풀에 전달된 작업은 자동으로 스케줄링되고, 각 작업의 완료 여부 및 처리 결과는 각각의 Future 객체의 메소드들을 통해 얻을 수 있다.pool.map()
과 달리pool.submit()
은 거의 즉시 리턴하므로 결과를 얻기 위해서는 이렇게 생성한 Future 객체의 모음(collection)에 대해서 결과를 기다리는wait()
함수를 사용한다. 이 함수는 asyncio의wait()
와 유사하다. 기본적으로 모든 작업이 완료될 때까지 기다리지만, 선택적 인자를 설정하여 타임아웃을 설정하거나, 가장 빠른 1개 작업이 완료될 때 까지만 기다리게 할 수 있다. 모든 작업은 (done, pending) 의 튜플 형태로 리턴되며, 여기에는 완료된 작업과 아직 처리가 덜 끝난 작업들이 각각 set 로 모여있게 된다.as_complete()
함수는 for 문에 사용할 수 있는데, 제출된 작업들 중에서 완료된 순서대로 for 문을 통해 처리할 수 있다. 작업 전체가 끝날 때까지 계속 기다리지 않고 먼저 끝난 작업들의 결과를 최대한 빨리 얻어서 순서대로 처리하고 싶을 때 사용한다.
Future 객체 만들기
프로세스/스레드 풀을 만들고 작업할 함수와 그 함수에 전달할 인자를 submit()하면 Future 객체가 즉시 리턴되고, 해당 작업은 풀 내의 프로세스에 할당되어 시작된다. 이제 메인 프로세스에서는 다른 작업을 수행한 후 작업의 결과가 필요해지는 시점에 Future 객체의 result()
메소드를 호출한다. 만약 이미 작업이 완료되었다면 결과가 즉시 리턴될 것이고, 그렇지 않다면 작업이 완료될 때까지 result()
메소드가 실행 흐름을 block할 것이다.
fut = pool.submit(f, (1,2,3,4))
print('started...') #=> 즉시 출력됨
print(fut.result()) #=> 10초 후 출력됨
이 패턴을 분산처리에 적용하려면 각각의 work load에 이를 적용해서 Future의 리스트나 세트를 만든다. 그리고 원하는 결과 사용 패턴에 맞게 적절하게 응답을 기다리는 방법을 사용하면 된다.
다음 코드는 concurrent.futures.wait()
함수를 사용하여 주어진 제한 시간 내에 완료된 작업의 결과만 보는 방법을 소개한 것이다.
def main2():
# 제한시간 3초 이내에 끝난 결과들
pool = Pool(max_workers=4)
data = {(random() * 0.2 + 0.2 for _ in range(4))
for _ in range(20)}
futs = { pool.submit(f, d) for d in data }
done, pending = wait(futs, timeout=3)
for d in done:
print(d.result())
또 한편으로는 다음과 같이 먼저 끝나는 작업부터 결과를 사용하는 방법도 소개한다.
def main3():
pool = Pool(max_workers=4)
data = [(random() * 0.2 + 0.2 for _ in range(4)) for _ in range(20)]
futs = {pool.submit(f, d) for d in data}
for f in as_completed(futs):
print(f.result())