컨디션을 통한 스레드 동기화 예제

동시성을 다룰 때에는 특정한 자원을 동시에 액세스하지 못하도록 관리하거나 여러 작업들이 시작되는 시점을 맞추는 동기화 수단이 필요할 수 있다. Lock은 특정 코드 영역을 동시에 여러 스레드가 실행하지 못하도록 보호할 때 사용하며, 이벤트는 여러 스레드들이 특정 이벤트가 발생할 때까지 기다리다가 동시에 시작될 수 있도록 한다. 컨디션(Condition)은 락과 이벤트가 결합되어 있는 동기화 수단이다.

컨디션은 락을 내재하고 있는 이벤트라 할 수 있다. 락과 마찬가지로 acquire() ~ release() 구간이 있어 한 번에 하나의 스레드/프로세스가 실행되는 영역을 만들 수 있는데, 그 사이에 wait()를 통해서 이벤트를 기다릴 수 있다. 이때 한 스레드가 락을 잠근 상태에서 wait()를 호출하여 이벤트를 기다리게 되면, 같은 컨디션 객체를 점유하고자 하는 스레드가 다시 락을 얻어서 크리티컬 영역에 진입할 수 있다. 이와 같은 방식으로 여러 스레드가 크리티컬 영역에서 이벤트를 기다리는 상태가 될 때, 누군가가 해당 컨디션 이벤트를 set()하게 되면 대기 중인 모든 스레드가 깨어나게 된다. 하지만 이들은 모두 같은 크리티컬 영역에서 대기 중이었기 때문에 일반 이벤트와 달리 한꺼번에 동시에 시작하지 않고, 한 번에 하나씩 크리티컬 영역의 코드를 실행한다. 깨어난 스레드가 락을 릴리즈하는 시점에 wait()를 끝낸 다른 스레드가 실행되는 식으로 순차적으로 크리티컬 구간을 지나게 된다.

보통의 락과 이벤트로는 이러한 구현을 만들기가 어렵다. 한 스레드가 락을 획득한채로 이벤트를 기다린다면, 다른 스레드들은 락을 얻지 못하기 때문에 이벤트를 대기할 지점까지 진입할 수가 없기 때문이다.

생산자-소비자 패턴을 구현하는 입장에서 컨디션은 흥미로운 동시에 유용한 동기화 수단이다. 생산자와 소비자가 모두 크리티컬 구간을 사용할 수 있기 때문이다. 생산자는 다른 소비자 스레드에 의한 간섭을 배제한 상태에서 데이터를 만들어 낼 수 있고, 소비자 역시 다른 소비자와의 경쟁이나 생산자의 간섭이 없는 상태에서 안전하게 데이터를 획득하고 소비할 수 있다. 특히 두 개 이상의 생산자가 있는 상황이라면 같은 컨디션 아래에서는 두 생산자가 서로를 간섭하지 않고 크리티컬 구간을 점유하면서 데이터를 만들어낼 수 있다.

멀티스레드 환경에서 유용한 데이터 전달 및 동기화 수단인 queue.Queue가 컨디션을 사용하여 동작한다. 데이터를 put하는 동작과 get하는 동작이 모두 하나의 크리티컬 구간을 지나게 되며, 큐가 비게되면 get 하는 스레드를은 모두 이벤트를 기다리도록 대기하게 된다.

참고로 멀티프로세싱 환경에서는 queue.Queue를 사용할 수 없다. 멀티 프로세싱 환경에서 데이터를 주고 받기 위해서는 multiprocessing.Queue를 사용할 수 있다.

생산자 함수 구현하기

먼저 생산자 함수를 만들어보자. 기본적으로 알아둘 것은 컨디션 객체는 Lock처럼 동작한다는 것이다. 따라서 생산자는 내부 버퍼에 값을 생성하고, 이것을 외부에 내보내는 시점에 락을 획득해서 전달한다.

from threading import Thread, Condition, Barrier
import logging
import time
from random import randrange, uniform

# 멀티 스레드 환경에서 로그를 출력할 때,
# 메시지들이 꼬이지 않도록 `logging` 모듈을 사용한다.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(threadName)s %(message)s"
)

bar = Barrier(1 + 6) 
# 모든 스레드의 종료 시점을 통일하기 위해서 
# 배리어를 하나 세워둔다.
# 6개의 소비자와 메인스레드가 배리어를 기다릴 것이다.

cv = Condition()

queue = []  # 생산자-소비자 간의 데이터 교환 수단
            # 스레드 안전하게 접근하도록 할 것이다.


def producer():
    buf = []
    while True:
        # 데이터를 생성하는데 시간이 오래걸리는 것을 상정한다. (2초)
        logging.debug('Generating Data...')
        buf = [randrange(1, 100) for _ in range(1, 7)]
        time.sleep(2)

        with cv:
        # 크리티컬한 구간에 진입한다.
        # 이 시점에 이미 큐를 액세스하는 소비자가 있다면 잠시 대기할 것이다.
        # 큐로 데이터를 옮긴 후 wait() 중인 스레드들을 깨운다.
            logging.debug('Make Data Avaiable')     
            queue[-1:-1] = buf
            logging.debug(f'QUEUE: {queue!r}')
            cv.notify_all()
        
        # 그외 정리작업을 수행하고 다시 데이터 준비에 들어간다.
        time.sleep(1)

위 함수에서 보면 생산자(producer)는 크리티컬 하지 않은 구간에서 데이터를 생성하는 작업을 별도로 수행하도록 했다. 이 작업을 처리하는 동안에 데이터가 필요해진 소비자 스레드들이 마치 아기새처럼 입을 벌리고 데이터를 기다리게 될 것인데, 이들을 위해서 크리티컬한 구간을 걸어 잠근 후 데이터를 전역 범위에 있는 큐로 옮기고 시그널을 보낸 후(notify_all()) 블럭을 빠져나오면서 락을 해제한다. 만약 생산자 스레드가 전역 큐에 데이터를 쓰는 시점에 다른 소비자가 큐를 잡고 있다면 그 작업이 끝나기를 기다리기 때문에 전역 큐는 스레드 안전함을 보장 받는다.

컨디션 내의 락이 해제되기 직전에 cv.notify_all()을 호출하여 기다리고 있는 모든 소비자 워커들에게 데이터가 준비되었음을 알린다. 생성된 데이터의 개수가 일정치 않고, 이 시점에 소비자의 개수를 모른다는 점도 주목하자. 이 부분을 적절하게 처리할 수 있도록 소비자 스레드에서 고려해야 할 것이다. 또한 notify() 동작은 생산자 스레드가 cv 락을 획득하고 있는 사이에만 호출할 수 있다. 그리고 notify()를 호출했다 하더라도 아직 with cv: 구간에 남아있는 코드가 있다면 그 코드까지 실행하고 블럭을 탈출해야 소비자들이 실제로 움직일 수 있다.

소비자 스레드

생산자 스레드에서는 with cv: 내부에서 notify_all()을 호출한다는 점을 제외하면 해당 구간이 락과 완전히 동일하게 작동하는 것처럼 보인다. 그러면 소비자 스레드는 어떻게 구성되어야 할까?

  1. with cv: 와 함께 블럭으로 진입하면 크리티컬 구간으로 간주된다. 이 구간 내의 동작에서 다른 스레드의 with cv: 는 동시에 실행될 수 없다. 따라서 특정한 소비자가 이 구간에서 큐의 값을 읽고 얻으려는 동작은 스레드 안전함이 보장된다.
  2. 큐에 읽을 수 있는 데이터가 없다면? 큐에 데이터가 들어올 때까지 대기해야 하므로 cv.wait()를 호출한다. 이때 마법이 벌어진다. wait()를 호출하면 스레드는 with cv: 내에 있으면서 락을 해제하고 동시에 이벤트를 기다리는 상태로 전환한다. 락은 해제되었기 때문에 다른 소비자들이 이 코드에 접근할 수 있다. 물론 다른 소비자들도 큐가 비어있기 때문에 wait() 의 리턴을 기다리게 될 것이다.
  3. 결국 이 락은 생산자 스레드가 얻게 될 것이다. 생산자는 안전하게 큐에 데이터를 밀어 넣고 notify_all()을 호출한 후 락을 해제한다. 다시 락을 얻게되는 소비자 스레드들이 순차적으로 하나씩 큐로부터 데이터를 꺼낼 것이다.
  4. 깨어난 후에도 먼저 깨어난 스레드들이 큐의 데이터를 소진했을 수 있기 때문에 소비자 스레드는 큐를 확인하고 없으면 wait()하는 동작을 반복해야 한다.

이상의 구성을 코드로 옮기면 다음과 같을 것이다.

def consumer():
    delay = uniform(0.1, 1)
    data = []
    for _ in range(10):
        with cv:
            # 크리티컬 구간 시작
            while not queue:
                # 큐에 데이터가 없으니 기다린다.
                # 깨어나서 락을 재점유할 때마다 체크한다.
                logging.debug('wait for data')
                wait()
            # 큐에 데이터가 있음을 알았고, 이 큐는 자신만이 다루른 상태가 되었다.
            # res = queue.pop(0)
        # 데이터를 획득했고 크리티컬 구간을 빠져나왔다.
        logging.debug('consuming data...')
        data.append(queue.pop(0))
        time.sleep(delay)
        logging.debug(f'{data!r}')
    bar.wait() # 일제 종료를 위한 배리어

이제 이들을 조립해보자.

def main():
    prod = Thread(name='producer', target=producer, daemon=True)
    # daemon=True 옵션을 주면 프로세스가 종료될 때 강제 종료된다.
    ts = [Thread(name=f'worker{i+1:02d}', target=consumer)
           for i in range(6)]
    for t in ts:
        t.start()
    bar.wait()
    time.sleep(1)

예제를 실행해보면 소비자들은 큐에 값이 채워질 때까지 나란히 기다린 다음 하나씩 값을 사용하는 것을 볼 수 있다. 생산자 스레드가 2개 이상이 되도록 조정하더라도 잘 돌아가는 것을 알 수 있다.

이렇게 컨디션을 이용하면 하나의 생산자와 여러 소비자 혹은 그 반대로 여러 생산자와 하나의 소비자 심지어 여러 소비자와 여러 생산자 스레드들을 연결할 수 있다. 그리고 이것을 쓰기 쉽게 만들어 놓은 것이 queue.Queue 클래스이다. 이 큐는 put()get()으로 데이터를 넣고 뺄 수 있는데, 두 동작 모두 스레드 안전하며, get() 메소드의 경우 큐가 비었을 때 데이터가 들어올 때까지 블럭하기 때문에 편리하게 스레드들 사이에서 데이터를 주고 받는 통로로 사용하기 용이하다.