concurrent.futures 를 사용한 병렬처리
멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.
- 병렬처리를 위해 작업을 스케줄링하는 부분은
concurrent.futures.ProcessPoolExecutor
클래스의 인스턴스가 담당한다. 사용자는 .submit()
메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map()
메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
.map()
메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.
보통은 .map
을 이용하면 되는데, Future
클래스를 이용하는 방법도 있다. Executor의 .submit()
메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.
이를 wait
함수나 as_completed
함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed
는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.
결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__
모듈인지 체크하는 로직이 있어야 한다.
예제
오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.
def is_prime(n):
if n < 2:
return False
if n is 2 or n is 3:
return True
if n % 2 is 0 or n % 3 is 0:
return False
if n < 9:
return True
k, l = 5, n ** 0.5
while k <= l:
if n % k is 0 or n % (k+2) is 0:
return False
k += 6
return True
print(sum((x for x in range(2, 2000000) if is_prime(x))))
이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.
먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.
def process(n, r=10000):
print("processing: {} ..< {}".format(n, n+r), end="... ")
s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
print(s)
return s
중간에 폼나게 범위와 결과를 출력하도록 했다.
이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.
import concurrent.futures
from functools import partial
def main():
r = 100000 # 업무를 나누는 단위
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
result = 0
for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
result += i
print(result)
print(result)
if __name__ == "__main__":
main()
여기서 중요한 부분은 if __name__ == "__main__":
부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__
모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__
모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.
최종 코드는 다음과 같다.
import concurrent.futures
from functools import partial
def is_prime(n):
if n < 2:
return False
if n is 2 or n is 3:
return True
if n % 2 is 0 or n % 3 is 0:
return False
if n < 9:
return True
k, l = 5, n ** 0.5
while k <= l:
if n % k is 0 or n % (k+2) is 0:
return False
k += 6
return True
def process(n, r=10000):
print("processing: {} ..< {}".format(n, n+r), end="... ")
s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
print(s)
return s
def main():
r = 50000
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
result = 0
for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
result += i
print(result)
print(result)
if __name__ == "__main__":
main()
exe.map()
메소드를 쓰지 않고 Futures
의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.
def main():
r = 50000
with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
done, _ = concurrent.futures.wait(fs)
result = sum((f.result() for f in done))
print(result)
exe.submit()
을 이용해서 Future 객체를 받고 큐에 넣은다음, wait()
함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.
코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…
$ time pypy e010.py
142913828922
real 0m0.765s
user 0m0.000s
sys 0m0.015s
pypy의 무식한 성능이 깡패라는 것.