asyncio의 동기화수단들

asyncio는 단일 스레드에서 비동기 코루틴을 사용하여 동시성 처리를 한다. 따라서 asyncio의 세계에서는 적어도 멀티 스레드에서 발생할 수 있는 자원 선점문제가 없을 것이라 생각할 수 있다. 전적으로 틀린 것은 아니다. 스레드가 1개밖에 없기 때문에 메모리 내의 특정한 객체를 동시에 액세스하는 일은 없을 것이다. 그러나 그외의 IO와 관련된 자원은 여전히 선점 문제가 발생할 수 있다. 이러한 문제를 피하기 위해서 asyncio는 threading과 유사한 동기화 수단들을 제공하고 있으며, 이들의 사용 방법 또한 거의 유사하다. asyncio에서 제공하는 동기화 수단에는 다음과 같은 것들이 있다.

  • 락(Lock)
  • 이벤트(Event)
  • 컨디션(Condition)
  • 세마포어(Semaphore)
  • 바운디드세마포어(BoundedSemaphore)

멀티스레드와의 차이점

threading 모듈에서 제공하는 동기화 수단들과 asyncio가 제공하는 것은 약간의 차이가 있다. 일단 RLock이 제공되지 않는다. 또한 BoundedSemaphore가 추가되었다. 그리고 락을 획득하거나 이벤트를 기다릴 때, timeout= 옵션이 제공되지 않는다. 또 한가지 중요한 점은 이 모든 동기화 수단들이 런루프 위에서 동작한다는 것이다. 런루프가 돌아가고 있지 않다면 잠겨있는 락을 릴리즈하거나 이벤트에 시그널을 보내도 다른 코루틴들이 제어권을 가져갈 스케줄을 잡을 수 없으므로 반응하지 않게 된다.

사용방법

락, 세마포어, 컨디션은 모두 획득-해제 매커니즘을 통해 동작하며, asyncio에서는 async with 문을 통해서 사용하는 비동기식 컨텍스트 매니저 프로토콜을 따르고 있다. 알아둬야 할 것은 이들에 대한 획득 액션은 대기 가능한 코루틴이지만, 릴리즈는 즉시 리턴하는 일반 메소드 함수라는 점이다.

asyncio.run()은 자체적으로 이벤트 루프를 관리한다. 따라서 전역 레벨에서 동기화수단이 생성되었다면 명시적으로 이벤트 루프를 만들었을 것인데, 이렇게 되면 asyncio.run()에서 만들어진 이벤트 루프와 별개의 루프에서 돌게 되므로 동작하지 않게 된다.

Lock을 통한 크리티컬 구간 설정

아래는 asyncio.Lock을 사용하여 크리티컬한 구간에서 어떤 자원을 독점적으로 사용하는 예이다. 스레드의 락과 완전히 동일하지만, 락을 잠글 수 있을 때까지 대기할 수 있어야 하므로 async with를 사용한다.

import asyncio
from random import uniform

loop = asyncio.get_event_loop()
lock = asyncio.Lock()
res = {'count': 0}


async def worker(name):
    for i in range(4):
        print(f'{name} > {i}')
        await asyncio.sleep(uniform(0.1, 0.5))
        print(f'{name} < {i}')
    
    # Lock을 사용한 크리티컬 구간
    async with lock:
        print(f'{name} entered critical area')
        for _ in range(3):
            r = res['count']
            await asyncio.sleep(0.4)
            # 0.4초 동안 다른 코루틴이 스레드를 가져갈 수 있지만
            # 같은 락을 사용할 수는 없음
            print(f'{name} is still in critical area')
            res['count'] += 1
            print(f"{name} : {r} -> {res['count']}")
            await asyncio.sleep(0.4)
    print(f'{name} leave critial area')

    for i in range(10):
        print(f'{name} > {i}')
    await asyncio.sleep(0.4)
    print(f'{name} < {i}')


async def main():
    fs = [worker(f'worker{i}') for i in range(4)]
    await asyncio.wait(fs)


if __name__ == '__main__':
    loop.run_until_complete(main())

위 코드가 실행된 예는 왼쪽의 그림과 같다. 각각의 워커는 약간의 지연을 사이에 두고 메시지를 출력한다. 이 때 중간 부분은 크리티컬한 영역이며, 따라서 진입할 때 Lock을 얻어서 들어가야 한다.

단일 스레드이기 때문에 res에 접근해서 변경하는 것이 기술적으로는 문제가 되지 않을 것이다. 하지만 이 코드는 루프를 3회 돌면서 카운터를 1씩 올리는 동안, 다른 코루틴에 의해서 간섭받지 않고 카운터의 값이 연속적으로 변하는 것을 보고 싶은 것이다.

그림에서는 구분하기 쉽게 워커 별로 다른색으로 메시지를 출력하도록 한 것이다. > 3 과 같이 크리티컬 하지 않은 영역의 메시지들은 계속해서 뒤섞여 표시된다. 하지만 하나의 워커가 락을 얻어서 크리티컬한 구간에 진입하게 되면 다른 워커는 크리티컬 구간에 진입하지 못하는 것을 볼 수 있다. 물론 그 와중에도 IO 작업을 대기하는 사이에 락과 무관한 영역의 처리는 병렬적으로 일어나는 것을 볼 수 있다.

참고로 여기서는 asyncio.run()을 사용하지 않았기 때문에 asyncio.get_event_loop()를 통해 획득한 런루프를 기준으로 모든 코드가 동작한다.

명령줄에서 컬러로 출력하고 싶다면 이 글을 참고하자.

세마포어 및 이벤트

세마포어는 락과 비슷한데, 2개 이상 동시에 사용할 수 있는 리소스에 적용된다. 바운디드 세마포어는 세마포어의 변형으로 동시에 N개 이상의 액세스가 발생하려 할 때 예외를 일으킨다. asyncio는 단일 스레드를 상정하기 때문에 asyncio.Semaphore는 스레드 안전하지 않다. 특정 리소스에 접근하려할 때. acquire()를 호출하며 대기하고, 사용을 완료하면 release()를 호출한다. 이 때 핸들 수가 0이되면 대기하게 된다.

명시적으로 acquire(), release()를 호출하는 구간은 async with sem: 으로 대체할 수 있다.

이벤트는 특정 시점에서 재시작 이벤트를 기다리는 일시정지 구간을 만든다. 주로 여러 작업의 시작 시점을 동기화할 때 사용한다. asyncio.Eventasync with 문과 함께 쓰이지 않고 await event.wait()를 통해서 대기하다가, 외부에서 event.set()을 호출하면 일제히 시작된다.

다음 예는 이벤트와 세마포어를 혼합하여 사용하는 예이다. 생산자는 이벤트를 clear() 하고 set()하는 사이의 구간에 데이터를 생성한다. 소비자들은 이벤트를 대기하면서 데이터 생산이 완료될 때까지 기다린다.

이후 카운트가 3인 세마포어를 가지고 데이터를 소비하기 때문에 동시에 데이터를 소비할 수 있는 소비자는 3개로 제한된다.

import asyncio
from random import uniform, randrange
from color import Color
from typing import List

queue: List[int] = []


async def producer(ev: asyncio.Event, sema: asyncio.Semaphore,
                   name='producer', color='red'):
    log = lambda ms: print(Color.colored(ms, color))
    delay = uniform(0.1, 1.6)
    while True:
        log('prepare...')
        await asyncio.sleep(delay)
        ev.clear()
        queue[-1:-1] = [randrange(1, 100) for _ in range(3)]
        await asyncio.sleep(delay * 3)
        log(f'start >>> {queue}')
        ev.set()
        await asyncio.sleep(delay * 3)


async def worker(ev: asyncio.Event, sema: asyncio.Semaphore,
                 name='worker', color='yello'):
    log = lambda ms: print(Color.colored(ms, color))
    delay = uniform(0.5, 1.6)
    data: List[int] = []
    for _ in range(10):
        for i in range(3):
            log(f'{name} > {i}')
            await asyncio.sleep(delay)
            log(f'{name} < {i}')

        log('waiting')
        await ev.wait()
        log(f'{name} entering sema')
        async with sema:
            if not queue:
                log('Not available')
                continue
            log('Retriving data...')
            for _ in range(3):
                log(f'{name}:: GGGGGG')
                await asyncio.sleep(delay)
            r = queue.pop()
            data.append(r)
            await asyncio.sleep(delay)
            log(f'Data: {data}')


async def main():
    colors = 'yellow green blue magenta cyan white'.split()
    sema = asyncio.Semaphore(3)
    ev = asyncio.Event()
    srv = asyncio.create_task(producer(ev, sema))
    fs = [worker(ev, sema, name=f'WORKER{i+1:02d}', color=colors[i])
          for i in range(6)]
    await asyncio.wait(fs)
    try:
        await asyncio.wait_for(srv, 1)
    except asyncio.TimeoutError:
        pass
    finally:
        print("END")


if __name__ == '__main__':
    asyncio.run(main())

컨디션 사용하기

컨디션의 사용법 역시 스레드에서의 사용법과 크게 차이나지는 않는다. 컨디션은 락과 이벤트의 결합이다. async with cv:구문으로 블럭에 진입하여 독점적으로 사용하는 구간을 잡는다. 이벤트를 대기하기 위해서는 await cv.wait()를 쓴다. 독점 영역에 있더라도 이벤트를 대기 중이라면, 같은 컨디션을 사용하는 다른 코루틴으로 전환할 수 있다. 이러한 코루틴 중에서 notify(), notify_all()을 호출한 후 블럭을 탈출하면 다른 대기중이던 코루틴들이 “한 번에 하나씩” 실행된다.

세마포어에서 release()와 마찬가지로 cv.notify_all()등은 모두 awaitable이 아니다. 이 패턴은 생산자-소비자 구조에서 모든 소비자가 생산자의 전처리를 기다린 후, 생산자가 신호를 줄 때 한 번에 하나의 소비자가 그 자원을 독점적으로 사용하게 된다는 점이 보통의 이벤트와는 다른점이다.

다음은 생산자(Producer)의 코드이다. 이 예에서는 생산자와 모든 소비자가 같은 컨디션 아래로 들어간다. 따라서 생산자든 소비자든 구분없이 모든 작업은 한 번에 하나만 실행될 수 있다. 대신 wait() 중인 작업은 크리티컬 구간 내에 있더라도 다른 작업에게 제어권을 양보할 수 있으므로 결국에는 모든 소비자가 대기하고 생산자가 독점적으로 작업을 처리할 수 있다. 이 패턴은 생산자가 둘 이상인 경우에 좀 더 적합할 것이다.

import asyncio
from random import uniform, randrange
from color import Color
from typing import List

queue = []


async def producer(cv: asyncio.Condition, name='producer', color='red'):
    log = lambda x: print(Color.colored(x, color))
    delay = uniform(0.1, 1.5)
    temp: List[int] = []


    while True:
        
        # 컨디션을 사용하여 크리티컬 구간에 진입
        # 생산자가 크리티컬 구간에 있는 동안
        # 다른 소비자는 자원에 액세스할 수 없다.
        async with cv:
            log('Enter CRITICAL')
            temp = [randrange(1, 100) for _ in range(randrange(1, 5))]
            queue[-1:-1] = temp
            await asyncio.sleep(delay)
            log('Leave CRITICAL')
            cv.notify_all()
        # 크리티컬 구간을 빠져나오기 직전에 이벤트 해제하여
        # 대기중이던 소비자들이 자원을 처리할 수 있게 한다.

        await asyncio.sleep(3)
        # 충분한 시간을 두어 소비자들이 각자의 처리를 끝낼 여유를 준다.
        # 이 여유가 짧다면 다른 소비자가 제어권을 받기 전에
        # 생산자가 다시 크리티컬 구간에 진입해버릴 수 있다.

다음은 소비자의 코드와 전체 테스트 코드이다.

async def worker(cv: asyncio.Condition, name='worker', color='yellow'):
    log = lambda x: print(Color.colored(x, color))
    delay = uniform(0.1, 1.5)
    data: List[int] = []

    for _ in range(5):
        async with cv:
            log('Waiting condition')
            # 크리티컬 구간에 진입한 후 이벤트 대기
            # 이벤트를 기다리는 동안에는 다른 작업이 (예: 생산자)
            # 크리티컬 코드를 실행할 수 있다.
            await cv.wait()

            # 이벤트가 발생한 후, 
            # 이 코드는 여전히 독점 구간에 있으므로
            # 연속적으로 실행하며, sleep 시에도 전환되지 않는다.
            log('Retriving Data...')
            if queue:
                data.append(queue.pop(0))
            await asyncio.sleep(delay)
            log(f'Data: {data}')
            log('Leaving CRITICAL')

        await asyncio.sleep(delay)


async def main():
    colors = 'yellow green blue magenta cyan'.split()
    cv = asyncio.Condition()
    fs = [worker(cv, name=f'WORKER{i+1:02d}', color=colors[i])
          for i in range(5)]
    srv = asyncio.create_task(producer(cv))
    await asyncio.wait(fs)
    await asyncio.wait_for(srv, 10)


if __name__ == '__main__':
    asyncio.run(main())