Python 병렬처리 예제

concurrent.futures 를 사용한 병렬처리

멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.

  1. 병렬처리를 위해 작업을 스케줄링하는 부분은 concurrent.futures.ProcessPoolExecutor 클래스의 인스턴스가 담당한다. 사용자는 .submit() 메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map() 메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
  2. .map() 메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.

보통은 .map 을 이용하면 되는데, Future클래스를 이용하는 방법도 있다. Executor의 .submit() 메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.

이를 wait 함수나 as_completed 함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.

결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__ 모듈인지 체크하는 로직이 있어야 한다.

예제

오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.


def is_prime(n): if n < 2: return False if n is 2 or n is 3: return True if n % 2 is 0 or n % 3 is 0: return False if n < 9: return True k, l = 5, n ** 0.5 while k <= l: if n % k is 0 or n % (k+2) is 0: return False k += 6 return True print(sum((x for x in range(2, 2000000) if is_prime(x))))

이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.

먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s

중간에 폼나게 범위와 결과를 출력하도록 했다.

이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.

import concurrent.futures
from functools import partial

def main():
    r = 100000 # 업무를 나누는 단위
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.

최종 코드는 다음과 같다.

import concurrent.futures
from functools import partial

def is_prime(n):
    if n < 2:
        return False
    if n is 2 or n is 3:
        return True
    if n % 2 is 0 or n % 3 is 0:
        return False
    if n < 9:
        return True
    k, l = 5, n ** 0.5
    while k <= l:
        if n % k is 0 or n % (k+2) is 0:
            return False
        k += 6
    return True

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s



def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

exe.map() 메소드를 쓰지 않고 Futures의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.

def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
        done, _ = concurrent.futures.wait(fs)
        result = sum((f.result() for f in done))
        print(result)

exe.submit()을 이용해서 Future 객체를 받고 큐에 넣은다음, wait() 함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.

코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…

$ time pypy e010.py
142913828922

real    0m0.765s
user    0m0.000s
sys     0m0.015s

pypy의 무식한 성능이 깡패라는 것.

GCD in Swift

GCD in Swift

Swift에서도 GCD를 여전히 쓸 수 있다. 먼저 dispatch_async 함수는 Objective-C 에서는 아래와 같이 쓴다.

void dispatch_async(dispatch_queue_t queue, dispatch_block_t block);

똑같은 방식으로 swift에서도 아래와 같이 정의된다.

func dispatch_async(queue:dispatch_queue_t!, block: dispatch_block_t!)

물론 swift에서 코드 블럭은 클로져이고, trailing closure 문법을 이용하면 보통은

dispatch_async(dispatch_get_main_queue()){
    println("Currently dispatched asynchronously.")
}

이런 식으로 쓸 수 있다. GCD in Swift 더보기

파이썬의 새로운 병렬처리 API – Concurrent.futures

어떤 처리량이 많은 작업을 작은 단위로 쪼개거나, 현재 진행되는 흐름과 독립적으로 병렬적인 처리를 하기 위해서 멀티스레드나 멀티프로세스를 사용하는 경우가 (지금까지는 드물지만) 종종 있다.

이전에는 Threading.ThreadMultiprocessing.Process 를 이용해서 각각의 스레드나 별도 프로세를 제어하는 방식을 사용했다. 파이썬 3.2에서 이러한 비동기 실행을 위한 API를 보다 고수준으로 만들고 사용하기 쉽도록 개선한 concurrent.futures 모듈이 도입되었다.

concurrent.futures 모듈

https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

이 모듈은 멀티프로세싱 및 멀티스레딩을 위한 API를 제공한다. 이전의 스레드, 프로세스 관련  API들이 C 기반의 코드를 래핑하는 수준으로 개발되어 있어서 사용하기도 복잡하고, 스레드나 락 객체를 직접 제어해야 하던 부분들과 여러 스레드들을 다시 동기화하는 작업들이 어려웠던 점 등 여러 모로 까다로운 부분이 있었다.

concurrent.futures 모듈은 이러한 점들을 개선하면서 사용하기 쉽고 스레드와 프로세스를 사용하는 API를 통일하고, 특히 비동기 코루틴과 거의 유사한 형태의 API를 제공하여, 현대적인 자바스크립트의 비동기 Task 프로토콜인 Promise와 유사한 Future라는 클래스를 도입하여 보다 깔끔하게 병렬처리 코드를 작성할 수 있게 해준다. 파이썬의 새로운 병렬처리 API – Concurrent.futures 더보기

NSOperation Tutorial in Swift

NSOperation in Swift

http://www.raywenderlich.com/76341/use-nsoperation-nsoperationqueue-swift

버튼을 탭하거나 텍스트 편집을 시작할 때 iOS/Mac앱이 반응을 멈추는 당혹스러운 경험을 해본적이 있을 것이다. Mac 앱이라면 마우스포인터(흔히 말하는 커서)가 형형색색의 비치볼로 변하는 것으로 지금 UI 반응을 할 수 없다는 것을 알려주는데, iOS앱에서는 이런 기제가 없으므로 사용자는 항상 UI에 반응할 수 있다고 기대하게 된다. 반응하지 않는 앱은 문제가 있거나 느리다고 느껴지고 리뷰에서 좋은 평가를 받기 힘들다.

앱이 항상 반응하도록 하는 것은 말처럼 쉽지 않다. 앱이 한가지 이상의 일을 동시에(사용자 터치에 반응하면서 다른 작업을 하는)해야 한다면 순식간에 여러가지 것들이 꼬이기 쉽다. 메인 런루프에서는 많은 작업을 처리할 시간이 없고 이는 오롯이 UI 반응에 집중해야 한다.

이제 불쌍한 개발자는 메인스레드에서 병렬작업으로 이행해야 한다. 병렬작업은 동시에 여러 개의 작업 스트림이 진행된다는 의미이며, 이를 통해 메인스레드는 항상 사용자의 터치에 반응하게 된다.

iOS에서 이런 작업을 수행하는 방법 중 하나는 NSOperation과 NSOperationQueue를 사용하는 것이다. 먼저 병렬작업을 사용하지 않은 앱을 만들어보자. 이 앱은 매우 버벅이고 느릴 것이다. 그리고 이 앱에 병렬작업을 추가하면 보다 반응이 좋은 UI를 제공하게 될 것이다. NSOperation Tutorial in Swift 더보기