Python 병렬처리 예제

concurrent.futures 를 사용한 병렬처리

멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.

  1. 병렬처리를 위해 작업을 스케줄링하는 부분은 concurrent.futures.ProcessPoolExecutor 클래스의 인스턴스가 담당한다. 사용자는 .submit() 메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map() 메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
  2. .map() 메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.

보통은 .map 을 이용하면 되는데, Future클래스를 이용하는 방법도 있다. Executor의 .submit() 메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.

이를 wait 함수나 as_completed 함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.

결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__ 모듈인지 체크하는 로직이 있어야 한다.

예제

오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.


def is_prime(n): if n < 2: return False if n is 2 or n is 3: return True if n % 2 is 0 or n % 3 is 0: return False if n < 9: return True k, l = 5, n ** 0.5 while k <= l: if n % k is 0 or n % (k+2) is 0: return False k += 6 return True print(sum((x for x in range(2, 2000000) if is_prime(x))))

이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.

먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s

중간에 폼나게 범위와 결과를 출력하도록 했다.

이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.

import concurrent.futures
from functools import partial

def main():
    r = 100000 # 업무를 나누는 단위
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.

최종 코드는 다음과 같다.

import concurrent.futures
from functools import partial

def is_prime(n):
    if n < 2:
        return False
    if n is 2 or n is 3:
        return True
    if n % 2 is 0 or n % 3 is 0:
        return False
    if n < 9:
        return True
    k, l = 5, n ** 0.5
    while k <= l:
        if n % k is 0 or n % (k+2) is 0:
            return False
        k += 6
    return True

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s



def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

exe.map() 메소드를 쓰지 않고 Futures의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.

def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
        done, _ = concurrent.futures.wait(fs)
        result = sum((f.result() for f in done))
        print(result)

exe.submit()을 이용해서 Future 객체를 받고 큐에 넣은다음, wait() 함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.

코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…

$ time pypy e010.py
142913828922

real    0m0.765s
user    0m0.000s
sys     0m0.015s

pypy의 무식한 성능이 깡패라는 것.