Wireframe

컨디션을 통한 스레드 동기화 예제

동시성을 다룰 때 여러 스레드가 하나의 자원에 순차적으로 접근하게 하거나, 반대로 특정한 시점에 동시에 작동하도록 하는 등의 상황에 제대로 대응할 수 있도록 락이나 이벤트와 같은 동기화 수단을 사용한다. 컨디션은 컨디션 락이라고도 하는데, 간단히 말하자면 이벤트와 락을 적절히 결합한 것이다. 락이나 R락을 사용하는 경우, 락을 획득한 구간의 코드는 항상 하나의 스레드만 진행할 수 있다. 락을 사용하는 중간에 다른 스레드로 사용 권한을 넘기려 한다면 현재 획득한 락을 해제하여 크리티컬 구간을 끝내야 한다. 그런데, 경우에 따라서는 atomic한 자원을 사용하려는 구간에서 해당 자원이 준비되지 않아 구간 내에서 일시적으로 실행을 중단하고 대기해야 하는 상황이 될 수 있다. 이런 경우에 컨디션 락을 사용한다.

컨디션은 락을 획득한 구간내에서 락을 일시적으로 반환하고 기다리는 역할을 가능하게 한다. 컨디션 락을 기다리는 스레드는 락을 해제하면서 블럭된다. 그러면 같은 컨디션락을 획득하려는 다른 스레드에게 제어권이 넘어가는데, 이러한 스레드 중에 해당 자원을 생성하는 역할을 담당하는 스레드가 있다면 이 생산자 스레드는 자원을 생성한 후, 이벤트와 비슷하게 대기 중인 다른 스레드를 깨워주게 된다. 생산자 스레드가 락을 반환하고 나면 중단됐다가 깨어난 스레드는 락을 다시 획득해서 해당 자원을 독점적으로 사용할 수 있게 된다.

중요한 것은 컨디션락은 락의 한 종류이기 때문에 대기중인 상태에서 깨어난 스레드는 그 즉시 실행을 재개하는 것이 아니라, 해당 락을 선점한 다른 스레드가 (이 스레드가 자신을 깨워주었을 것이다.) 락을 반환한 후 다시 그 락 객체를 획득해야 실행을 재개할 수 있게 된다는 것이다.

컨디션은 락과 이벤트가 결합된 성격이지만, 보통의 락과 이벤트를 결합해서는 이러한 기능을 만들기가 어렵다. 크리티컬 구간을 빠져나오지 않으면 다른 스레드가 해당 락을 획득하여 자원의 사용권을 가져갈 수 없기 때문에, 크리티컬 구간을 다시 진입하는 형태로 흐름을 만들어야 하기 때문이다.

컨디션 락은 생산자-소비자 패턴을 구현하려는 상황에서 특히 유용하다. 생산자는 다른 소비자 및 생산자 스레드에 의한 간섭을 배제한 상태에서 데이터를 만들어 낼 수 있고, 소비자 역시 다른 소비자와의 경쟁이나 생산자의 간섭이 없는 상태에서 안전하게 데이터를 획득하고 소비할 수 있다. 그리고 이러한 과정에서 전체적인 코드 모양은 일반적인 락을 사용하는 것과 비슷하게 작성된다.

파이썬의 표준 라이브러리에서 멀티스레드 환경에서 유용한 데이터 전달 및 동기화 수단인 queue.Queue가 컨디션을 사용하여 동작한다. 데이터를 put하는 동작과 get하는 동작이 모두 하나의 크리티컬 구간을 지나게 되며, 큐가 비어있는 경우 get 하려는 동작은 큐에 값이 들어오기 까지 기다리게 된다.

간단한 생산자-소비자 패턴 구현하기

간단한 생산자-소비자 패턴에 컨디션 락을 적용해보자. 컨디션 락을 사용하면 소비자 스레드가 필요한 값을 체크하면서 기다리는 과정을 포함할 수 있다. 생산자는 1초 주기로 한 번에 여러 개의 값을 생성하고, 생산자들은 가능한 빨리 리스트에 값이 있으면 가져가려고 한다.

컨디션을 사용하는 코드는 락을 사용하는 것과 크게 다르지 않다. 소비자는 값이 없는 동안 반복해서 condition.wait() 를 호출해서 값을 확인한다. 이렇게 하는 이유는 중단 상태에서 깨어난 소비자 스레드가 값을 확인했을 때, 자신보다 먼저 깨어나서 락을 선점한 다른 소비자에 의해서 가용 자원이 다 소모되고 없는 경우가 있을 수 있기 때문이다. 거의 모든 소비자에서 이러한 패턴을 사용하여 컨디션을 사용한다고 생각하면 된다.

생산자의 경우는 락과 거의 완전히 같은데, 데이터의 생성을 마친 후에 condition.notify() / notify_all() 을 호출해줘야 하는 것만 다르다.

from threading import Thread, Condition
import time
import logging
import random

logging.basicConfig(level=logging.DEBUG)
data = []

def consumer(cd: Condition, name: int):
  while True:
    with cd:
      # 값을 체크
      while not data:
        cd.wait()
      x = data.pop(0)
    logging.debug(f"Consumer {name} gets value {x}")


def producer(cd: Condition):
  while True:
    time.sleep(1)
    with cd:
      for _ in range(10):
        data.append(random.randint(1, 99))
      cd.notify_all()


def main():
  cd = Condition()
  Thread(target=producer, args=(cd,), daemon=True).start()
  time.sleep(1)
  cs = [Thread(target=consumer, args=(cd, i + 1)) for i in range(7)]
  for c in cs:
    c.start()
  time.sleep(5)
   
if __name__ == "__main__":
  main()

동기화 큐 작성하기

컨디션락을 내장하는 큐를 만들어서 표준 라이브러리의 동기화 큐를 흉내내 볼 수도 있다.put() / get() 메소드가 각각 생산자, 소비자에 대응하는 메소드라 생각하면 된다.

from typing import Any
from threading import Condition

class Queue:
  def __init__(self):
    self.cv: Condition = Condition()
    self.data: [Any] = []

  def put(self, x: Any):
    with self.cv:
      self.data.append(x)
      self.cv.notify_all()
  
  def get(self) -> Any:
    with self.cv:
      while not self.data:
        self.cv.wait()
      return self.data.pop(0)

Exit mobile version