Python 병렬처리 예제

concurrent.futures 를 사용한 병렬처리

멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.

  1. 병렬처리를 위해 작업을 스케줄링하는 부분은 concurrent.futures.ProcessPoolExecutor 클래스의 인스턴스가 담당한다. 사용자는 .submit() 메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map() 메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
  2. .map() 메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.

보통은 .map 을 이용하면 되는데, Future클래스를 이용하는 방법도 있다. Executor의 .submit() 메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.

이를 wait 함수나 as_completed 함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.

결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__ 모듈인지 체크하는 로직이 있어야 한다.

예제

오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.


def is_prime(n): if n < 2: return False if n is 2 or n is 3: return True if n % 2 is 0 or n % 3 is 0: return False if n < 9: return True k, l = 5, n ** 0.5 while k <= l: if n % k is 0 or n % (k+2) is 0: return False k += 6 return True print(sum((x for x in range(2, 2000000) if is_prime(x))))

이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.

먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s

중간에 폼나게 범위와 결과를 출력하도록 했다.

이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.

import concurrent.futures
from functools import partial

def main():
    r = 100000 # 업무를 나누는 단위
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.

최종 코드는 다음과 같다.

import concurrent.futures
from functools import partial

def is_prime(n):
    if n < 2:
        return False
    if n is 2 or n is 3:
        return True
    if n % 2 is 0 or n % 3 is 0:
        return False
    if n < 9:
        return True
    k, l = 5, n ** 0.5
    while k <= l:
        if n % k is 0 or n % (k+2) is 0:
            return False
        k += 6
    return True

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s



def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

exe.map() 메소드를 쓰지 않고 Futures의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.

def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
        done, _ = concurrent.futures.wait(fs)
        result = sum((f.result() for f in done))
        print(result)

exe.submit()을 이용해서 Future 객체를 받고 큐에 넣은다음, wait() 함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.

코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…

$ time pypy e010.py
142913828922

real    0m0.765s
user    0m0.000s
sys     0m0.015s

pypy의 무식한 성능이 깡패라는 것.

NSOperation Tutorial in Swift

NSOperation in Swift

http://www.raywenderlich.com/76341/use-nsoperation-nsoperationqueue-swift

버튼을 탭하거나 텍스트 편집을 시작할 때 iOS/Mac앱이 반응을 멈추는 당혹스러운 경험을 해본적이 있을 것이다. Mac 앱이라면 마우스포인터(흔히 말하는 커서)가 형형색색의 비치볼로 변하는 것으로 지금 UI 반응을 할 수 없다는 것을 알려주는데, iOS앱에서는 이런 기제가 없으므로 사용자는 항상 UI에 반응할 수 있다고 기대하게 된다. 반응하지 않는 앱은 문제가 있거나 느리다고 느껴지고 리뷰에서 좋은 평가를 받기 힘들다.

앱이 항상 반응하도록 하는 것은 말처럼 쉽지 않다. 앱이 한가지 이상의 일을 동시에(사용자 터치에 반응하면서 다른 작업을 하는)해야 한다면 순식간에 여러가지 것들이 꼬이기 쉽다. 메인 런루프에서는 많은 작업을 처리할 시간이 없고 이는 오롯이 UI 반응에 집중해야 한다.

이제 불쌍한 개발자는 메인스레드에서 병렬작업으로 이행해야 한다. 병렬작업은 동시에 여러 개의 작업 스트림이 진행된다는 의미이며, 이를 통해 메인스레드는 항상 사용자의 터치에 반응하게 된다.

iOS에서 이런 작업을 수행하는 방법 중 하나는 NSOperation과 NSOperationQueue를 사용하는 것이다. 먼저 병렬작업을 사용하지 않은 앱을 만들어보자. 이 앱은 매우 버벅이고 느릴 것이다. 그리고 이 앱에 병렬작업을 추가하면 보다 반응이 좋은 UI를 제공하게 될 것이다. NSOperation Tutorial in Swift 더보기

[Objective C] 다중처리 큐와 오퍼레이션

다음은 큐와 오퍼레이션을 사용할 때 염두에 두어야 하는 사항들이다.

1

오퍼레이션은 기본적으로 이를 시작한 스레드에서 돌아간다. 만약 오퍼레이션이 비동기적으로 작업하기를 원한다면 오퍼레이션 큐를 사용하거나 NSOperation의 서브 클래스를 만들어서 별도의 스레드에서 시작하도록 해야 한다.

2

하나의 오퍼레이션은 다른 오퍼레이션이 작업을 완료하고나서 그 작업을 시작한다. 흔한 실수는 두 개의 오퍼레이션이 서로에 대해 의존하도록 만드는 것이다. 이렇게 되면 두 개의 오퍼레이션은 서로를 계속 기다리기만 하고 아무런 작업을 하지 못한다. 결국 메모리를 소진하고 앱이 죽을 수도 있다.

3

오퍼레이션은 취소될 수 있다. 따라서 NSOperation을 서브클래싱할 때는 isCanceled 변수의 값을 확인해서 작업을 시작하기 전에 취소 여부를 확인해야 한다. 예를 들어 인터넷 연결을 20초 동안 기다리는 작업이 있다고 하면, 작업을 시작할 때 이를 확인해서 취소한 작업이 시작되지 않도록 한다. 만약 긴 시간이 걸리는 작업의 경우에는 반복적으로 isCanceled의 값을 확인해서 중간에 중지할 수 있도록 하는게 좋다.

4

오퍼레이션 객체는 isFinished, isReady, isExecuting 등의 값에 대해 KVO가 적용되어야 한다. 이들 값을 변경할 때는 willChangeValueForKey: , didChangeValueForKey: 를 명시해준다.

5

NSOperation을 서브클래싱할 때는 main 메소드에 대해 별도의 오토릴리즈 풀을 만들어야 한다. (이는 단순히 @autorelease{…} 블럭 안에 코드를 쓰면 된다!) main 스레드는 프로그램의 메인 함수인 main과 유사하다. 실질적으로 오퍼레이션에 start 메시지를 보내면 오퍼레이션은 자신의 main 메소드를 호출한다.

6

항상 생성한 오퍼레이션 객체에 대해서 참조를 유지해야 한다. 한 번 큐에 들어간 오퍼레이션을 얻어오는 것은 불가능하다. 따라서 큐에 집어 넣기 이전에 참조를 만들어서 가지고 있어야 한다.

앱이 시작되는 지점은 메인 스레드이다. UI의 갱신은 항상 메인 스레드에서 일어나므로, 시간이 많이 걸리는 작업을 메인 스레드에서 하게되면 그 작업 동안은 UI가 반응이 없거나 매우 느린 것으로 표시된다. 이 경우에는 별도의 스레드를 만들어 그 작업을 진행해야 한다.(메인 스레드가 계속 UI와 반응할 수 있도록)