Barrier를 사용한 스레드/프로세스 동기화 예제

Barrier는 스레드를 사용하는 동시성 프로그래밍에서 특정 작업의 시작 시점을 동기화하는 수단으로, 여러 스레드가 특정한 시점까지 서로를 기다리다가 동시에 실행 흐름을 재개할 때 사용할 수 있다. 이는 보통 서버-클라이언트의 역할을 하는 각각의 스레드가 서로의 준비 과정을 기다리다 동시에 시작하도록 할 때 유용하게 사용될 수 있다.

Barrier를 사용한 스레드/프로세스 동기화 예제 더보기

Python 병렬처리 예제

concurrent.futures 를 사용한 병렬처리

멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.

  1. 병렬처리를 위해 작업을 스케줄링하는 부분은 concurrent.futures.ProcessPoolExecutor 클래스의 인스턴스가 담당한다. 사용자는 .submit() 메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map() 메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
  2. .map() 메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.

보통은 .map 을 이용하면 되는데, Future클래스를 이용하는 방법도 있다. Executor의 .submit() 메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.

이를 wait 함수나 as_completed 함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.

결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__ 모듈인지 체크하는 로직이 있어야 한다.

예제

오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.


def is_prime(n): if n < 2: return False if n is 2 or n is 3: return True if n % 2 is 0 or n % 3 is 0: return False if n < 9: return True k, l = 5, n ** 0.5 while k <= l: if n % k is 0 or n % (k+2) is 0: return False k += 6 return True print(sum((x for x in range(2, 2000000) if is_prime(x))))

이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.

먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s

중간에 폼나게 범위와 결과를 출력하도록 했다.

이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.

import concurrent.futures
from functools import partial

def main():
    r = 100000 # 업무를 나누는 단위
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.

최종 코드는 다음과 같다.

import concurrent.futures
from functools import partial

def is_prime(n):
    if n < 2:
        return False
    if n is 2 or n is 3:
        return True
    if n % 2 is 0 or n % 3 is 0:
        return False
    if n < 9:
        return True
    k, l = 5, n ** 0.5
    while k <= l:
        if n % k is 0 or n % (k+2) is 0:
            return False
        k += 6
    return True

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s



def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

exe.map() 메소드를 쓰지 않고 Futures의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.

def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
        done, _ = concurrent.futures.wait(fs)
        result = sum((f.result() for f in done))
        print(result)

exe.submit()을 이용해서 Future 객체를 받고 큐에 넣은다음, wait() 함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.

코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…

$ time pypy e010.py
142913828922

real    0m0.765s
user    0m0.000s
sys     0m0.015s

pypy의 무식한 성능이 깡패라는 것.

파이썬의 새로운 병렬처리 API – Concurrent.futures

어떤 처리량이 많은 작업을 작은 단위로 쪼개거나, 현재 진행되는 흐름과 독립적으로 병렬적인 처리를 하기 위해서 멀티스레드나 멀티프로세스를 사용하는 경우가 (지금까지는 드물지만) 종종 있다.

이전에는 Threading.ThreadMultiprocessing.Process 를 이용해서 각각의 스레드나 별도 프로세를 제어하는 방식을 사용했다. 파이썬 3.2에서 이러한 비동기 실행을 위한 API를 보다 고수준으로 만들고 사용하기 쉽도록 개선한 concurrent.futures 모듈이 도입되었다.

concurrent.futures 모듈

https://docs.python.org/3/library/concurrent.futures.html#module-concurrent.futures

이 모듈은 멀티프로세싱 및 멀티스레딩을 위한 API를 제공한다. 이전의 스레드, 프로세스 관련  API들이 C 기반의 코드를 래핑하는 수준으로 개발되어 있어서 사용하기도 복잡하고, 스레드나 락 객체를 직접 제어해야 하던 부분들과 여러 스레드들을 다시 동기화하는 작업들이 어려웠던 점 등 여러 모로 까다로운 부분이 있었다.

concurrent.futures 모듈은 이러한 점들을 개선하면서 사용하기 쉽고 스레드와 프로세스를 사용하는 API를 통일하고, 특히 비동기 코루틴과 거의 유사한 형태의 API를 제공하여, 현대적인 자바스크립트의 비동기 Task 프로토콜인 Promise와 유사한 Future라는 클래스를 도입하여 보다 깔끔하게 병렬처리 코드를 작성할 수 있게 해준다. 파이썬의 새로운 병렬처리 API – Concurrent.futures 더보기

[iOS/OSX] 특정 작업을 병렬로 처리하기

“동시에 진행되는 작업”을 처리하기 위해서는 iOS 및 OSX 환경에서는 크게 두 가지 방법을 (흔히) 사용한다. GCD (dispatch queue)와 Operation Queue가 그것이다. 오퍼레이션 큐는 GCD의 Objective-C 버전이라 할 만큼 비슷한데 (사실 좀 다르기는 다르다) 어쨌거나 이 두 가지 방법은 스레드의 생성과 관리를 시스템이 알아서 처리해주는 레벨로 가지고 내려가기 때문에 실제로 프로그래머가 신경써야 할 부분을 “동시에 진행되는 작업을 처리”하는 부분에만 집중하면 되도록 해준다.

예를 들면 네트워크를 통해 데이터를 로드해야 하는 경우나 그 반대로 네트워크를 통해 데이터를 저장해야 하는 경우에 응답이 느리다면 (이는 디스크 같은 영구 저장소를 액세스할 때도 일어날 수 있다. 아주 미묘한 수준이기는 하나 이런 작업은 앱에 blocking을 가져오고 UI에 대한 반응을 느리게 만든다) 이 작업의 처리를 기다리는 동안 앱은 사용자의 터치에 반응하지 못하고 계속 대기하게 될 것이다. 따라서 사용자 경험의 품질이 매우 나빠질 수 있다. 이런 경우에는 “동시작업 처리”를 하도록 해주는 것이 좋다. 동시작업 처리를 사용하면 멀티 코어 프로세서를 효율적으로 사용할 수 있고, 시스템을 보다 “바삐” 움직이게 할 수 있기 때문이다.

(*여기서 주목해야 할 부분은 “동시작업 처리”를 “멀티 스레드”로 기재하지 않은 것이다. GCD에서 동시작업을 처리하는 것은 “디스패치 큐”를 분리하여 동시에 2개 이상의 작업을 진행시키는 것인데, 놀라운 점은 GCD를 사용한 동시작업은 해당 작업에서 다시 스레드를 생성하지 않는 이상, 모두 메인 스레드에서 돌아간다. 따라서 멀티스레드가 아닌 경우가 있을 수 있다.)

NSOperationQueue를 통한 동시작업

먼저 NSOperationQueue를 사용하는 경우를 살펴보도록 하자.

만약 Block 객체를 사용하는 코딩 문법에 조금 익숙하다면 (특히 이는 애니메이션과 관련한 새로운 메소드에서 자주 등장한다. 우리가 익힌 바 있는 UIDocument 관련 글에서도 본 적이 있을 것이다.) 상당히 쉽게 익숙해질 수 있다. 즉 NSOperation은 코드 블럭과 같이 “일련의 작업을 지시하는 코드”를 객체로 만들어 이를 별도의 큐에서 실행하도록 하는 방식이다. 이 때 스레드의 생성과 관리는 큐가 알아서 하게 되므로 여전히 스레드 관리에 대한 크나큰 부담을 덜 수 있게 되는 것이다.

작업 객체 생성

NSOperation은 우리가 작업해야 하는 코드를 담는 객체인데, 이를 활용하는 방법에는 다음 세 가지가 있다.

  • NSInvocationOperation
  • NSBlockOperation
  • subclassing NSOperation
먼저 NSInvocation은 특정 객체의 메소드를 작업 객체로 만들어버리는 방법이다. 즉, 다른 스레드에서 동시 처리를 해야할 메소드를 가진 객체가 있다면, 그 객체의 메소드를 동시 처리 작업으로 만들 수 있다.
NSInvocationOperation *theOp = [[NSInvocationOperation alloc] 
                                      initWithTarget:self 
                                            selector:@selector(doMyTask:) 
                                              object:withData];

NSBlockOperation 객체는 코드 블럭을 사용해서 작업 객체를 만들 수 있다.

NSBlockOperation *theBlockOp = [NSBlockOperation blockOperationWithBlock:^{
    NSLog(@"Block has started");
}];

// 아래와 같이 블럭을 계속 추가해 나갈 수 있음
[theBlockOp addBlock:^{
    // do something...
}];

혹은 NSOperation 객체를 새로 생성할 수도 있다. (이에 대한 자세한 내용은 다른 글에서 다뤄볼까 한다.)

작업 객체의 실행

작업 객체는 물론 그대로도 실행이 가능하다. 하지만 특별히 멀티 스레드로 동작하도록 작업 객체를 커스터마이징 하지 않은 경우라면 이런 작업들은 메인 스레드에서 돌아가는 함수와 동일하다. (즉, 동시작업으로 처리되지 않는다.) 별도의 스레드에서 동시 작업으로 처리되도록 하려면 NSOperationQueue 객체를 생성하여, 이 곳에 앞서 말한 방법으로 생성된 작업객체를 추가해주면 된다.

NSOperationQueue *aQueue = [[NSOperationQueue alloc] init];
[aQueue addOperation:anOp];

작업이 추가되면 큐 객체는 자동으로 스레드를 만들고 먼저 큐에 추가된 순서대로 작업 객체에 start 메시지를 보내어 각각의 작업을 시작하게 된다.

단순한 예제를 만들 때 유의할 점은, 큐가 스레드를 새로 생성할 때는 약간의 시간이 걸리는 데, 그 사이에 메인 스레드가 종료되어 버린다면 큐에 담긴 작업이 아예 처리되지 못하고 프로그램이 종료될 수도 있다.

이런 예제와 같은 경우에는 큐를 처리하는 동안 큐를 생성했던 현재 스레드를 잠깐 멈추게 하여 큐가 처리된 이후에 그 다음 작업을 실행해주는 방법도 있다.

[aQueue waitUntilAllOperationsAreFinished];

하지만 이렇게 큐의 작업이 처리되는 것을 기다리는 것은 성능에도 좋지 않은 영향을 미치고 (왜냐면 그만큼 메인 스레드가 블럭킹을 당하고 잠기기 때문에) 되려 동시 작업성을 저해하는 결과를 가져오기 때문에 가능하면 쓰지 말 것을 권한다.

큐에서 작업을 시작할 때는 작업 객체에 start 메시지를 보낸다. 이와 같은 방법으로 NSInvocationOperation 객체나 NSBlockOperation 객체에 start 메시지를 보내 해당 작업을 실행시킬 수 있다. 하지만 메인 스레드에서 명시적으로 이런 작업을 실행하는 것은 그냥 코드 블럭을 실행하는 것과 아무런 차이가 없게된다.

Dispatch Queue 사용하기

Dispatch Queue도 큐에 코드 블럭을 밀어넣어 실행하는 것과 유사하게 디스패치 큐에 작업(코드 블럭)을 넣고 이를 동시에 실행시키는 방법이다. 동시 작업으로 진행될 작업은 메인 스레드에서 함께 돌아간다. 이것이 오퍼레이션 큐와의 가장 큰 차이점이라 하겠다. (실은 항상 메인 스레드에서 돌아가는지는 모르겠다. 동시에 처리되는 작업의 개수도 시스템이 코어의 개수나 시스템에 현재 걸려 있는 부하에 따라 자동으로 판별한다.

즉 동시 작업으로 병렬처리되는 일이 종료되었을 때 어떤 일이 이어서 일어나게 만들고자 할 때 (이때는 델리게이션이나 KVO를 써도 되지만) 이 방법을 사용하는 것도 굉장히 쉽고 간단하다. GCD를 이용해서 병렬작업을 처리하는 가장 간단한 방법은 글로벌 큐를 사용하는 것이다. 물론 글로벌 큐를 사용하지 않고 별도의 큐를 생성하여 작업을 처리할 수도 있다. 단 이렇게 생성되는 큐는 serial 큐로, 추가된 순서대로 작업이 수행된다. 대신 글로벌 큐는 들어간 순서대로 작업이 시작되나, 큐에 들어간 작업은 가능한 많은 수가 동시에 실행되므로 먼저 들어간 작업이 먼저 끝난다고는 특정할 수 없다.

큐에 작업을 추가하여 실행하기 위해서는 dispatch_async 함수를 사용한다. 이 함수에 수행할 작업을 코드 블럭으로 넘겨서 수행하도록 할 수 있다. 이 함수를 호출한 직후 프로그램의 흐름은 다음 라인으로 넘어가고, 디스패치 큐는 이와 동시에 넘겨진 작업을 즉시 처리하게 된다. 다음과 같이 글로벌 큐를 적용한 아주 간단한 코드를 사용해서 병렬 작업을 수행할 수 있다.

dispatch_queue_t aQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(aQueue, ^{
        // 병렬적으로 진행할 코드
        });

매우 큰 DB에서 값을 검색해 오거나, 인터넷을 통해 데이터를 다운로드 받아서 처리해야 하거나, 영구저장소에 저장된 파일을 액세스 하는 등 시간이 걸릴 수 있는 일을 처리하는 경우에는 메인스레드가 blocking 될 수 있으므로 이렇게 처리해주면 백그라운드에서 돌아가는 것처럼 처리되고 UI 반응은 멈추지 않고 계속 이루어질 수 있다.

만약 저렇게 큐에서 돌아가는 작업이 끝나거나 혹은 그 중간에 UI를 업데이트 하거나 해야 한다면, UI 갱신을 처리하는 부분은 메인 큐이므로 메인 큐에서 필요한 작업을 처리할 수 있다. 즉 메인 큐 ▶ 글로벌 큐에서 동시작업 ▶ 메인큐에서 작업 하는 식으로 중간에 메인 큐에 끼어들 수도 있다.

dispatch_queue_t aQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_async(aQueue, ^{
        // 병렬적으로 진행할 코드
        dispatch_async(dispatch_get_main_queue(), ^{
            //메인큐에서 UI 업데이트 등을 실행
            });
        });

참고자료 : Concurrency Programming Guide