asyncio의 동기화수단들

asyncio는 단일 스레드에서 비동기 코루틴을 사용하여 동시성 처리를 한다. 따라서 asyncio의 세계에서는 적어도 멀티 스레드에서 발생할 수 있는 자원 선점문제가 없을 것이라 생각할 수 있다. 전적으로 틀린 것은 아니다. 스레드가 1개밖에 없기 때문에 메모리 내의 특정한 객체를 동시에 액세스하는 일은 없을 것이다. 그러나 그외의 IO와 관련된 자원은 여전히 선점 문제가 발생할 수 있다. 이러한 문제를 피하기 위해서 asyncio는 threading과 유사한 동기화 수단들을 제공하고 있으며, 이들의 사용 방법 또한 거의 유사하다. asyncio에서 제공하는 동기화 수단에는 다음과 같은 것들이 있다.

  • 락(Lock)
  • 이벤트(Event)
  • 컨디션(Condition)
  • 세마포어(Semaphore)
  • 바운디드세마포어(BoundedSemaphore)
asyncio의 동기화수단들 더보기

컨디션을 통한 스레드 동기화 예제

동시성을 다룰 때에는 특정한 자원을 동시에 액세스하지 못하도록 관리하거나 여러 작업들이 시작되는 시점을 맞추는 동기화 수단이 필요할 수 있다. Lock은 특정 코드 영역을 동시에 여러 스레드가 실행하지 못하도록 보호할 때 사용하며, 이벤트는 여러 스레드들이 특정 이벤트가 발생할 때까지 기다리다가 동시에 시작될 수 있도록 한다. 컨디션(Condition)은 락과 이벤트가 결합되어 있는 동기화 수단이다.

컨디션은 락을 내재하고 있는 이벤트라 할 수 있다. 락과 마찬가지로 acquire() ~ release() 구간이 있어 한 번에 하나의 스레드/프로세스가 실행되는 영역을 만들 수 있는데, 그 사이에 wait()를 통해서 이벤트를 기다릴 수 있다. 이때 한 스레드가 락을 잠근 상태에서 wait()를 호출하여 이벤트를 기다리게 되면, 같은 컨디션 객체를 점유하고자 하는 스레드가 다시 락을 얻어서 크리티컬 영역에 진입할 수 있다. 이와 같은 방식으로 여러 스레드가 크리티컬 영역에서 이벤트를 기다리는 상태가 될 때, 누군가가 해당 컨디션 이벤트를 set()하게 되면 대기 중인 모든 스레드가 깨어나게 된다. 하지만 이들은 모두 같은 크리티컬 영역에서 대기 중이었기 때문에 일반 이벤트와 달리 한꺼번에 동시에 시작하지 않고, 한 번에 하나씩 크리티컬 영역의 코드를 실행한다. 깨어난 스레드가 락을 릴리즈하는 시점에 wait()를 끝낸 다른 스레드가 실행되는 식으로 순차적으로 크리티컬 구간을 지나게 된다.

컨디션을 통한 스레드 동기화 예제 더보기

concurrent.futures를 이용한 병렬처리 예제 – Python

concurrent.futures 를 사용한 병렬처리

멀티스레드로 처리하는 부분은 그냥 동시에 돌린다 뿐이지, 전체적인 수행시간을 줄이는 부분은 아니라서, 여기서는 프로세스 풀을 이용하는 방법을 설명한다.

  1. 병렬처리를 위해 작업을 스케줄링하는 부분은 concurrent.futures.ProcessPoolExecutor 클래스의 인스턴스가 담당한다. 사용자는 .submit() 메소드를 이용해서 특정한 동작을 스케줄링하도록 요청하거나, .map() 메소드를 이용해서 입력데이터와 동작함수를 짝지어서 바로 스케줄링할 수 있다.
  2. .map() 메소드는 이터레이터를 리턴하는데, 이는 각 개별 작업이 동시에 실행된 후, 먼저 종료된 작업부터 내놓는 리턴값을 내놓게 된다.

보통은 .map 을 이용하면 되는데, Future클래스를 이용하는 방법도 있다. Executor의 .submit() 메소드를 이용하면, 여러 인자를 전달하여 하나의 Future 객체를 받는다. 이 객체는 큐에 들어가지만 아직 실행을 시작하지는 않는다.

이를 wait 함수나 as_completed 함수를 이용해서 한 번에 돌릴 수 있다. (asyncio와 동일하다) wait의 경우에는 특정한 타임아웃까지 기다렸다가 완료된 Future와 그렇지 않은 Future의 세트를 리턴하며, as_completed는 이터레이터를 리턴하므로 for 문에서 유용하게 쓰일 수 있다.

결정적으로 멀티프로세스를 이용하는 경우, 해당 스크립트 파일이 매번 하위 프로세스로 반입된다. 따라서 이 경우에는 반드시 __main__ 모듈인지 체크하는 로직이 있어야 한다.

예제

오일러 프로젝트의 10번 문제는 2백만 이하의 모든 소수의 합을 구하는 문제이다. 체를 이용하지 않고 모든 경우를 검사하여 합을 구하는 코드를 보자.


def is_prime(n): if n < 2: return False if n is 2 or n is 3: return True if n % 2 is 0 or n % 3 is 0: return False if n < 9: return True k, l = 5, n ** 0.5 while k <= l: if n % k is 0 or n % (k+2) is 0: return False k += 6 return True print(sum((x for x in range(2, 2000000) if is_prime(x))))

이 코드는 약 18초 가량이 소요된다. 소요 시간을 줄이기 위해서 다중프로세스 환경에서 돌아가도록 코드를 손 보겠다.

먼저 특정한 구간으로 나눠서 각각의 프로세스가 답을 계산하고 그 값을 리턴하도록한 다음, 리턴 받은 값을 합산하면 된다. 먼저 특정한 하나의 구간에 대해 소수의 합을 구하는 함수를 보자.

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s

중간에 폼나게 범위와 결과를 출력하도록 했다.

이제 작업을 쪼개어 전달하는 메인 함수를 만들 차례이다.

import concurrent.futures
from functools import partial

def main():
    r = 100000 # 업무를 나누는 단위
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

여기서 중요한 부분은 if __name__ == "__main__": 부분인데, 자식 프로세스에서 실행되는 워커는 작업에 필요한 함수 정보를 얻기 위해서 본 파일을 import 하게 된다. 따라서 __main__ 모듈과 그렇지 않은 모듈의 행동양식이 구분되어야 한다. 그리고 반드시 __main__ 모듈은 있어야 하기 때문에 REPL 환경에서는 멀티프로세스 코드를 실행할 수 없다.

최종 코드는 다음과 같다.

import concurrent.futures
from functools import partial

def is_prime(n):
    if n < 2:
        return False
    if n is 2 or n is 3:
        return True
    if n % 2 is 0 or n % 3 is 0:
        return False
    if n < 9:
        return True
    k, l = 5, n ** 0.5
    while k <= l:
        if n % k is 0 or n % (k+2) is 0:
            return False
        k += 6
    return True

def process(n, r=10000):
    print("processing: {} ..< {}".format(n, n+r), end="... ")
    s = sum((x for x in range(n, n+r) if is_prime(x) if x <= 2000000))
    print(s)
    return s



def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        result = 0
        for i in exe.map(partial(process, r=r), range(0, 2000000, r)):
            result += i
            print(result)
        print(result)

if __name__ == "__main__":
    main()

exe.map() 메소드를 쓰지 않고 Futures의 기능을 이용하는 형태로 코드를 조금 고쳐보았다.

def main():
    r = 50000
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as exe:
        fs = {exe.submit(process, n, r) for n in range(0, 2000000, r)}
        done, _ = concurrent.futures.wait(fs)
        result = sum((f.result() for f in done))
        print(result)

exe.submit()을 이용해서 Future 객체를 받고 큐에 넣은다음, wait() 함수를 통해서 완료, 미완료 작업을 받아, 완료된 것 내에서 결과값을 꺼내어 합산한다.

코어 수가 많으면 많을 수록 (나는 듀얼코어라서 max_worker를 2로 했는데) 시간을 단축한다. 분할하여 동시 처리한 경우 전체 소요 시간은 약 10초 내외였다. 문제는…

$ time pypy e010.py
142913828922

real    0m0.765s
user    0m0.000s
sys     0m0.015s

pypy의 무식한 성능이 깡패라는 것.