Barrier를 사용한 스레드/프로세스 동기화 예제

Barrier는 스레드를 사용하는 동시성 프로그래밍에서 특정 작업의 시작 시점을 동기화하는 수단으로, 여러 스레드가 특정한 시점까지 서로를 기다리다가 동시에 실행 흐름을 재개할 때 사용할 수 있다. 이는 보통 서버-클라이언트의 역할을 하는 각각의 스레드가 서로의 준비 과정을 기다리다 동시에 시작하도록 할 때 유용하게 사용될 수 있다.

Barrier를 사용한 스레드/프로세스 동기화 예제 더보기

스레드를 이용한 데몬 만들기 – Python

이 블로그를 통해서 파이썬에서의 병렬처리에 대해서는 명시적으로 threading.Thread 대신에 concurrent.futures 에서 제공하는 API를 사용할 것을 여러 차례 권장해 왔다. 여기서 주목할 것은 바로 “병렬처리”라는 조건이다. 즉 concurrent.futures의 API는 일련의 데이터에 대해서 동일한 처리를 하려할 때, 이 “동일한 처리”를 여러 스레드 혹은 프로세스로 나눠서 동시에 진행하는 상황에 어울리는 기능이다. 하지만 실제 상황에서는 동시에 서로 다른 작업이 진행되어야 하는 경우가 존재한다.주로 메인 스레드와 백그라운드 스레드 (혹은 작업 스레드)에서 하는 일이 서로 다른 경우에 이러한 패턴이 필요할 수 있다.

이번 글에서는 파이썬의 threading 모듈을 사용해서 멀티스레드 프로그램을 어떻게 쉽게 작성하는지에 대해서 예제와 같이 살펴보도록 하겠다.

먼저 어떤 파일을 하나 열어서 한줄씩 읽는 프로그램을 생각해보자. 보통 파일을 열고 file.readline()을 사용했을 때, 파일의 마지막 라인을 읽은 후라면 None이 리턴되며 더 이상 내용을 읽지 않는다. 하지만 우리 컴퓨터에는 로그 파일과 같이 지속적으로 뭔가 내용이 계속 늘어나는 파일들이 있다. 따라서 파일을 열어 놓고 특정 시간 주기마다 파일로부터 한줄씩 읽어서 그 내용을 처리하는 프로그램을 작성한다고 생각해보자.

import time

def process_line(l):
  print(l)

with open('message.log') as f:
  while True:
    line = f.readline()
    if not line:
      time.sleep(0.5)
      continue
    process_line(line)

위 코드는 이렇게 동작한다.

  1. 파일 하나를 읽기 모드로 열었다.
  2. 한 줄씩 읽어서 process_line() 함수로 보내어 처리한다.
  3. 만약 마지막 줄을 읽은 후 더 읽을 내용이 없다면 0.5초 후에 재시도한다.

이 프로그램은 분명 무한루프를 돌게되지만, CPU 사용량이 치솟거나 하지는 않는다. 0.5초의 대기시간은 CPU 입장에서 봤을 때에는 무척이나 길고 지루한 시간이다. (쿨타임을 0.1초로 잡아도 마찬가지다.)

다음과 같이 빈 파일을 만든 후, 파이썬 코드를 백그라운드에서 실행해놓자. 이 상태에서 파일에 메시지를 추가하면 곧 추가된 라인들이 출력되는 것을 알 수 있다. (백그라운드 실행은 윈도 명령줄에서는 지원되지 않는다. 윈도에서 따라해본다면, 터미널 창을 2개 열어서 한쪽은 파이썬을 실행하고 다른 한쪽은 echo 명령으로 파일에 라인을 추가해보면 된다.)

$ touch message.log
$ python sample.py&
$ echo "hello" >> message.log

자 이 프로그램의 처리흐름을 다시 살펴보도록하자.  먼저 프로그램은 기본적으로 지정된 파일을 열어서 한 줄씩 읽고 그 내용을, 내용 처리를 담당하는 별도의 함수 (여기서는 데이터를 다루는 역할을 하고 있으니 ‘핸들러’라고 하자)로 넘겨준다. 핸들러 함수의 동작이 끝나면 다시 그 다음 줄을 읽는 식으로 루프를 도는데, 더 이상 읽어들일 내용이 없으면 지정된 시간(0.5초)만큼 기다린 후에 재시도 한다.

이 때 핸들러 함수의 실행에 필요한 시간이 매우 짧다면 문제가 되지 않겠지만 항상 그러하지는 못할 것이다. 하지만 텍스트 파일의 내용이 특정한 URL이고 핸들러가 해당 URL의 데이터를 다운로드 받는다면 어떨까? 위 예제의 구조에서는 핸들러는 블럭킹 함수로 동작하므로 현재 데이터를 처리하는 동안에는 추가된 데이터가 계속 남아있으리라고는 보장할 수 없다. 왜냐하면 해당 파일은 계속 내용이 추가되기 보다는 별도의 에디터에서 추가되었던 내용이 삭제처리 될 수도 있기 때문이다. 다음 코드와 그 실행 시나리오를 생각해보자.

from urllib.request import rulopen
import time

def handler(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(urlopen(url))

with open('inputs.txt') as f:
  while True:
    line = f.readline()
    if not line:
       time.sleep(0.1)
       continue
    handler(*(line.split()[:2]))

위에서 소개한 코드와 완전히 동일한데, 한가지 차이는 핸들러가 URL요청과 디스크IO를 수행하는 제법 시간이 오래걸릴 수 있는 작업이라는 점에서 차이가 있다. inputs.txt의 파일의 내용이 실시간으로 편집되고 있고, 네트워크 사정이 그리 좋지 못하다면 다음의 시나리오가 가능해질 수 있다.

  1. 파일에 데이터 A, B, C가 추가됨
  2. 프로그램은 데이터 A를 처리함 (그리고 처리되는 동안 블럭됨)
  3. 그 사이에 외부 프로그램에 의해서 데이터 B, C가 제거됨
  4. A를 처리하고 난 후 파일을 읽으려 시도하면 더 이상 남은 데이터가 없거나 세로운 데이터 D, E 등을 처리하게 됨

이 시나리오에서는 핸들러가 블럭킹으로 동작하는 동안 이후에 추가된 데이터를 확보하지 못한다는 문제가 있다. 이를 해결하기 위해서는 핸들러가 논블럭킹(non-blocking)으로 동작해야 할 필요가 있다. 즉 핸들러 함수를 메인 스레드가 아닌 별개의 스레드에서 실행하게하면 핸들러를 호출하는 구문은 즉시 다름 라인으로 진행이 가능해진다.

threading 모듈

threading 모듈은 파이썬에서 스레드를 사용할 수 있게 해주는 API를 담고 있다. 스레드를 위한 클래스인 Thread와 여러 가지 동기화 도구인 Lock, RLock, Condition, Semaphore 등이 지원된다. 스레드를 사용하는 방법에 대해서 오랜된 문서나 책에서는 Thread 클래스를 오버라이드하여 사용하는 방법을 소개하고 있는데, 스레드를 통한 분기가 함수의 실행단위로 쓰여진다면 굳이 번거롭게 오버라이드를 할 필요는 없다.

Thread 객체를 생성할 때 target= 이라는 인자가 있다. 이 인자로 백그라운드 스레드에서 실행할 함수를 넘겨주면 해당 함수가 별도의 스레드에서 실행될 수 있다. 위의 ‘파일 처리’ 코드를 스레드를 사용하는 방식으로 수정해보면 다음과 같이 작성될 수 있다. 시나리오를 잠시 수정해보자면, 파일에는 한 줄에 공백으로 구분되는 URL과 파일명이 들어온다. 우리의 프로그램은 각 라인을 읽어서 주어진 URL의 데이터를 주어진 파일 이름으로 저장하는 동작을 처리할 것이다.

from threading import Thread
from urllib.request import urlopen
import time

## 1
def hanlder(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(res.read())

## 2
with open('inputs.txt') as f:
  f.seek(2) ## 3
  while True:
    line = f.readline()
    if not line: ## 4
      time.sleep(0.1)
      continue
    data = line.split()[:2] 
    t = Thread(target=handler, args=data) ## 5
    t.start() ## 6

그다지 복잡한 코드는 아니다. 코드는 다음과 같이 설명된다.

  1. handler 함수는 URL과 파일이름을 인자로 받아서 주어진 URL의 콘텐츠를 다운로드 받아 해당 파일 이름으로 저장한다.
  2. 파일의 내용을 조사하는 루프를 시작한다.
  3. 현재 시점까지의 파일의 내용은 무시해야 할 것이므로 seek(2)를 호출해서 파일 스트림의 끝으로 커서를 옮긴다.
  4. 새로운 라인이 없으면 0.1초를 대기한다.
  5. 새로운 스레드에서 핸들러 함수를 실행하도록 설정한다.
  6. 만들어진 스레드를 실행한다.

과도한 스레드 생성을 피하는 법

위 코드는 다시 새로운 문제를 야기한다. 스레드를 만든 후 시작하는 t.start() 부분은 논블럭킹이기 때문에 호출하게 되면 새로운 스레드에서 네트워크 요청을 생성하는 일련의 작업을 처리하는데, 이 작업의 완료여부와 별개로 스레드가 시작되면 t.start()는 그 즉시 리턴된다. 따라서 inputs.txt 파일에 새로운 입력이 수십~수백개가 쏟아져 들어갔다면 이 프로그램은 그만큼 많은 스레드를 생성하면서 전체적으로 매우 느려지게 될 것이다.

이 상황에서 어쩌면 누군가는 세마포어같은 동기화 도구를 생각할지도 모르겠다. 혹은 스레드 풀을 구현하여 한 번에 동작하는 스레드의 수를 제한할 수도 있을 것이다. 하지만 이때 가장 중요한 것은 ‘입력으로 들어온 데이터를 나중에 처리하더라도 잃지는 말자’는 것이다. 따라서 가장 간단히 처리하는 방법은 큐를 사용하는 것이다.

  1. 파일로부터 데이터를 입력받는 무한루프는 계속 파일을 체크하여, 추가로 입력된 데이터가 있으면 큐에 넣는다.
  2. 큐를 watching하는 무한루프는 큐에 들어온 데이터를 순서대로 꺼내어 하나씩 처리한다.

여기서 중요한 점은 두 개의 무한루프가 독립적으로 돌아간다는 것이다. 이를 위해서는 둘 중의 하나의 무한루프는 별도의 스레드에서 돌릴 필요가 있다. 그리고 두 스레드 사이의 데이터 교환은 동기화된 상태여야 한다. (한쪽에서 큐에서 데이터를 빼는 동시에 데이터를 집어넣어서는 곤란하다.) 스레드 사이의 동기화된 데이터 교환은 queue 모듈 내의 Queue를 사용하면 된다.

또한 별도 스레드에서의 동작이 무한루프가 되어야 하므로 해당 스레드는 ‘데몬’으로서 동작해야 한다. 데몬은 그 수명이 메인 스레드에 의존적인 스레드로, 그 자신이 무한루프를 돌고있다 하더라도 메인스레드가 종료되면 (즉 프로세스가 종료되면) 자동으로 종료되면서 리소스를 반납한다. 스레드를 데몬으로 만들고 싶다면 Thread 를 생성할 때 daemon=True 옵션을 지정하면 된다.

위 구조에서 어떤 것이 메인 스레드가 되고 어떤 것이 데몬이 될지는 본인의 판단에 달렸다. 여기서는 데이터 핸들링을 메인스레드에서 하고 데이터 마이닝을 데몬에서 수행하도록 구조를 만들어 보도록 하자.

아래는 최종 코드이며, 이 프로그램은 2개의 스레드를 써서 파일 내용을 감시하고, 추가 입력된 데이터를 처리한다.

from threading import Thread
from queue import Queue
from urllib.request import urlopen
from time import sleep

queue = Queue()

def daemon(filename):
  with open(filename) as f:
    f.seek(2)
    while True:
      line = f.readline()
      if not line:
        sleep(0.5)
        continue
      data = line.split()[:2]
      queue.put(data)

def mainLoop():
   while True:
     url, filename = queue.get()
     with urlopen(url) as res:
       with open(filenamem, 'wb') as f:
         f.write(res.read())

if __name__ == '__main__':
   t = Thread(target=daemon, args=('inputs.txt',), daemon=True)
   t.start()
   mainLoop()

정리

프로그램 내에서 두 개 이상의 무한루프가 독립적으로 동작하게 하려면 각각의 루프를 서로 다른 스레드에서 돌려야 한다. 이 때 프로세스 종료 시 작업 스레드들을 안전하게 종료하기 위해서는 이들 스레드는 생성 시 데몬으로 생성한다. 또한 보통 데몬을 사용하는 경우에는 한쪽 스레드에서 수집한 데이터를 다른 스레드(들)에게 넘겨주어야 할텐데, 이 때는 동기화된 큐를 쓰는 것이 권장되며 이 방법이 가장 간편하기도 하다.

더보기

threading 모듈을 통해서 멀티스레드를 사용하는 대표적인 예는 소켓통신의 다중접속 구현일 것이다. 소켓으로부터 접속을 받아들이는 코드는 보통 다음과 같은 모양이다.

sock = ... # 소켓 초기화 및 설정
sock.listen(host, port)
while True:
  conn, client = sock.accept() # 1
  ## 2
  recivedData = conn.recv(1024)
  ## ... 데이터를 처리하고 응답을 보내기

위 코드 중에서 sock.accept()에 의해서 리턴되는 값 중 첫번째 인자는 연결을 표현하는 소켓객체이다.  이 단일 루프 구조는 소켓이 연결을 기다리고, 연결을 받아들이면 해당 연결로부터 송수신을 처리하고 다시 루프의 시작으로 돌아가서 새로운 연결을 기다린다. 즉 한 번에 하나의 연결만 허용하는 구조가 된다. 하지만 이것은 소켓의 구조적인 한계라기보다는 그냥 코드가 선형으로 루프를 돌기 때문이다. 다음와 같이 처리한다면 어떨까?

while True:
  conn, client = sock.accept()
  t = Thread(target=handler, args=(conn, client), daemon=True)
  t.start()

위 코드에서는 서버 소켓이 연결을 받을 때마다 접속 소켓과 접속 정보를 핸들러에게 넘기고, 핸들러는 새로운 스레드에서 돌아가게 된다. 그리고 그 즉시 메인 루프는 다시 처음으로 돌아가서 새로운 연결을 기다릴 수 있다. 이 말은 방금 전에 접속한 소켓의 통신을 별도 스레드에서 처리하도록 하고 (소켓 통신을 주고/받기의 반복으로 돌아가는 일종의 무한 루프(최소한 끝이 정해지지 않은)라 볼 수 있으므로) 개별 접속 소켓의 처리를 데몬으로 만들어주는 것이다.

파이썬의 스레드 사용법

멀티스레드를 사용한 병렬처리는 concurrent.futuresThreadPoolExecutor를 사용하면 스레드 풀 관리에서부터, 처리 결과들의 동기화에 이르기까지의 여러 작업을 간단한 API를 통해서 처리할 수 있다. 특히 이 API의 경우 멀티스레드와 멀티프로세스에서 동일한 형태로 디자인되어 있고, 실제 사용시에는 어떤 구현을 선택할 것인지에 따라 Pool관리자의 클래스만 변경하면 되기 때문에 편리하게 사용할 수 있다.

하지만 이 API는 특정한 시점에 데이터를 나누어 처리하고 그 결과를 취합하는 분산 처리 구조에 적합하게 디자인 되어있는데, 우리가 스레드를 사용하는 케이스는 이러한 분산 처리 이외에도 여러 가지가 있을 수 있다. 예를 들어 소켓 서버에서 동시 접속을 처리하는 등, 디스패치 시점과 종료 시점이 각각 다른 스레드 관리 등의 경우에 스레드를 사용해야 한다. 이 글에서는 스레드 기반의 동시성 프로그래밍 라이브러리인 threading을 통한 스레드 사용과 해당 라이브러리에서 사용하는 동기화 및 스레드 간 통신 방법에 대해서 살펴보도록 하겠다.

Thread

threading.Thread는 스레드를 객체화한 클래스이다.  몇 가지 속성에 대해 알아보자.

  • Thread(group=, target=, args=, kwargs=, *, daemon=None) : 새로운 스레드를 생성한다. 각각의 파라미터의 의미는 다음과 같다.
    • group= : 아직 구현되지 않은 기능이며, 추후 사용을 위해 정의만 되어 있다.
    • target= : 워커스레드에서 디스패치할 함수
    • args, kwargs : 각각 튜플, 사전으로 target 함수에 넘겨질 인자값들
    • daemon : 해당 스레드가 데몬[^데몬]으로 실행될지의 여부.
  • start() : 스레드의 실행을 시작한다. 내부적으로 이 메소드가 호출되면 자신의 run() 메소드를 호출한다.
  • run() : 스레드가 실제로 수행할 작업. 기본적으로 target 속성의 함수를 별도 스레드에서 호출한다. 많은 예제가 실제 Thread 클래스를 서브 클래싱하면서 이 메소드를 오버라이드하는데, 클래스의 디자인 상, 서브클래싱할 이유가 거의 없는 것으로 보인다.
  • join() : 해당 스레드가 끝날 때까지 기다린다. 만약 데드락을 유발할 상황에서 조인하거나, 시작하지 않은 스레드에 대해 조인하는 것은 런타임 에러를 발생시킨다.
  • name : 스레드의 이름 속성
  • daemon : 데몬 속성. start()가 호출되기 전에 변경되어야 한다.
  • threading.locals() : 스레드가 다른 스레드와 공유하지 않고 자신의 스코프 내에서만 사용할 변수를 여기에 정의할 수 있다.

간단한 몇 가지 예를 들어보자.  먼저 간단한 분산처리의 예이다. 몇 개 URL을 주고 그에 대해서 동시에 접속하여 그 콘텐츠를 파일에 저장하도록 한다. 개별작업은 스레드로 처리된다. http 접근 자체는 CPU 집약적인 작업이 아니기 때문에 실제로 이 동작은 ‘동시에 진행되는 것으로’보이며, 실제로도 응답이 빠르고 용량이 작은 데이터를 먼저 얻어와서 저장할 수 있다.

from threading import Thread
from urllib.request import urlopen

urls = ('https://..', ...)

def download_contents(url, filename):
  '''주어진 URL을 다운로드 받고, 그 내용을 파일로 기록한다.'''
  res = urlopen(url)
  if res.code != 200:
    return
  with open(filename, 'wb') as f:
    f.write(res.read())

def main():
  threads = []
  for url in urls:
    filename = url.rsplit('/', 1)[1]
    t = Thread(target=download_contents, args=(url, filename))
    threads.append(t)
  ## 각각의 스레드를 시작하고, 조인합니다.
  for t in threads:
    t.start()
  for t in threads:
    t.join()


if __name__ == '__main__':
  main()

스레드를 먼저 만든 후에 이를 리스트에 추가해놓고, 다시 리스트를 순회하면서 각 스레드를 시작하고 다시 리스트를 순회하면서 join()한다. join을 하지 않으면 스레드를 시작해놓은 후의 메인 스레드는 곧 종료점에 도착해서 끝나게 되고, 이는 프로세스의 종료를 의미하므로 개별 워커 스레드의 작업이 처리되는 시간을 확보할 수 없다.

이처리는 단적인 병렬처리의 예이므로, 이전에 포스팅했던 concurrent.futures를 이용하는 것이 훨씬 깔끔하고 간편하게 처리될 수 있다. Future의 리스트로 맵핑한 후에 리스트 자체를 wait 하면 된다.

def main():
  with ThreadPoolExecutor() as pool:
    fs = [poo.submit(download_content, url, url.rsplit('/', 1)[1]) for url in urls]
    pool.wait(fs)

실제로 스레드API를 직접 사용해야 하는 부분은 이제 데몬을 작성하거나, 실제로 메인 스레드와 별개의 작업을 처리하는 워커 스레드를 만들 때라고 할 수 있다. 가장 흔히 접할 수 있는 예는 소켓 프로그래밍에서이다.  소켓 프로그래밍에서 서버를 작성할 때 소켓을 초기화하고 사용하는데, 실제로 접속이 들어오는 시점에는 별도의 소켓 객체가 만들어진다. 이를 해당 루프에서 처리하는 것이 아니라 별도의 접속 처리함수를 이용해서 스레드로 분기시킨다.

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 7777))
while True:
  sock.listen(1)
  conn, addr = sock.accept()  ## conn은 접속이 체결된 소켓 객체이다.
  t = Thread(target=manage_connection, args=(conn, addr))
  t.start()
...

이렇게하면 접속이 들어온 직후, 개별 접속을 처리할 manage_connection 함수가 해당 접속 소켓을 가지고 별도 스레드에서 시작된다. 메인 스레드는 접속이 별개의 스레드에서 처리되는동안 다시 while 문으로 돌아가서 새로운 접속을 listen 할 수 있다. 말 그대로 동시 접속을 허용하는 서버가 만들어진다. (이 포스트의 다음 페이지에 해당 소스를 올려 두겠다.)

스레드별 로컬 네임스페이스

많은 오래된 예제들에서는 Thread 클래스를 서브클래싱해서 별도의 클래스를 만들어서 시작하는 경우를 많이 소개하는데, 이렇게 서브클래싱해야 하는 경우는 스레드별로 별도의 자원을 관리해야 하는 경우외에는 거의 없다고 볼 수 있다. 스레드는 하나의 프로세스 하위의 별개의 실행 흐름이며, 그외의 메모리 공간은 모두 공유하게 된다. 따라서 이름이 같으면서, 각 스레드별로 분리된 변수를 액세스하기 위해서 아예 해당 변수를 스레드 객체의 인스턴스 속성을 만들기 위해 이런 식으로 처리하는 케이스들은 있을 수 있다. (하지만 내가 봐 왔던 거의 대부분의 스레드 관련 예제는 이런 케이스도 아니고, 그냥 습관적으로 서브 클래싱부터 하고 보는 경우들이 대부분도 아니고 전부였다.)

파이썬은 뼛속까지 객체 지향 언어가 맞기는 하지만, 많은 언어의 디자인이 “클래스를 만들어 쓰는 것보다는 더 나은 어떤 방법이 있다”는 것을 암시한다. 여튼, 스레드별로 독립된 로컬 변수를 만들 기위해서는 서브클래싱 대신에 threading.local을 쓴다.

my_data = thrading.local()
my_data.x = 1  ## my_data는 해당 스레드의 고유한 로컬 변수 네임스페이스가 된다. 

동기화 수단들

앞서 스레드는 단일 프로세스 내의 독립된 실행 흐름이며, 메모리 영역은 프로세스 내 모든 스레드가 동시에 사용한다고 하였다. 이는 필연적으로 자원경쟁이라는 상황을 만들게 된다. 즉 실행중인 프로세스 내의 모든 스레드는 기술적으로는 특정한 함수들이 동시에 실행되고 있는 상황이라 볼 수 있다. 각각의 실행 시점에서 메모리는 공유되기 때문에 같은 이름을 여러 스레드가 동시에 액세스하려는 경우, 문제가 발생할 수 있다. 예를 들어 하나의 스트림에 두 개의 스레드가 동시에 값을 쓰려고 한다거나, 어떤 변수에 대해서 한 스레드는 그것을 변경하려 하고, 다른 스레드는 같은 시점에 그 값을 읽으려고 한다면 정상적으로 동작하지 못할 수 있다는 말이다.

이렇게 특정한 자원을 서로 점유하려는 문제를 해결하기 위해서 스레드는 ‘동기화’라는 기법을 적용한다. 동기화란, 두 개 이상의 스레드가 특정 자원 액세스 시점에 한정하여 마치 하나의 스레드 내의 두 개 구문처럼 순차적으로 실행되도록하는 것을 의미한다. (그리고 그로부터 파생할 수 있는 여러 기법을 말한다.)

  • 락 / R락: 말 그대로 자물쇠에 해당한다. 특정 작업을 사용하려할 때 스레드들은 자물쇠에 접근한다. 최초로 자물쇠를 획득한 스레드는 자물쇠를 잠그고 실행하며, 같은 시점에 잠겨있는 자물쇠를 사용하려는 스레드는 모두 자신의 차례에서 자물쇠가 열릴 때까지 기다려야 한다.
  • 컨디션 : 조건부 락(conditional lock)이라 할 수 있다. 락과는 반대로 동작하는데, 조건부 락을 건 스레드는 일단 중지한다. 다른 스레드에서 해당 조건값의 상태(프로그래머가 원하는 상태)에 따라서 notify()notify_all()을 호출하여 조건부락을 해제해주면 해당 스레드가 실행되는 형식이다.
  • 세마포어 : 일반 락이 최대 하나의 스레드만 소유하는 것을 허용하는데 비해, 세마포어는 동시에 사용가능한 수가 제한된 어떤 리소스에 대한 선착순 락이다.
  • 이벤트 : 가장 기본적인 스레드 동기화 기법으로 한 스레드는 특정한 이벤트를 기다리고, 다른 스레드는 해당 이벤트에 대한 시그널을 보낸다. 시그널이 보내지면 중단되었던 스레드가 진행하는 방식이다.
  • 타이머 : 특정 딜레이 후에 시작되는 스레드
  • 베리어 : 특정한 카운트 값만큼 스레드들을 막아두었다가 동시에 시작하는 기법. 베리어에 지정된 개수만큼의 스레드가 wait()를 호출하면 베리어가 해제되고 이를 기다리던 모든 스레드가 동시에 시작한다.

락은 자물쇠처럼 이를 선점한 스레드가 락을 획득하면, 자물쇠가 잠긴다. 이후에 접근하는 스레드들은 락이 열릴 때까지 그 앞에서 멈춰기다렸다가, 락을 선점한 스레드가 락을 풀어주면 차례로 획득 > 해제를 반복하면서 순차적으로 처리되는 모양을 만들게 된다. 가장 명료한 비유는 칸이 하나 밖에 없는 화장실을 생각하면 된다.

락을 사용하기 위해서는 스레드들의 외부, 대충 메인스레드 쯤에서 락 인스턴스를 생성한다. 그리고 특정한 작업에 들어가는 경우에 스레드는 해당 락 객체에 acquire() 메소드를 호출한다. 잠겨있지 않은 락의 메소드는, 상태를 변경하면서 즉시 리턴한다. 하지만 만약 락이 잠겨있는 상태라면 이 메소드는 계속 해당 메소드를 블럭킹할 것이다. 따라서 가장 먼저 접근한 하나의 스레드가 볼일을 다 보고 release()를 호출해주고 나가야 다음 스레드가 acquire()를 리턴받고 이후 볼일(?)을 보게 되는 구조이다.

## main thread
counter_lock = threading.Lock()
counter = 0
...

## worker thread A
counter_lock .acquire()
counter += 1
lock.release()

## worker thread B
counter_lock .acquire()
print(counter)
lock.release()
  1. 락은 어느 시점이든 미리 생성해 놓기만하면 된다.
  2. 스레드A는 변수 counter를 변경하려한다. 다른 어떤 스레드가 counter를 동시에 변경하려할지 모르기 때문에 lock을 걸고 이를 처리한 후 락을 풀어준다. 모든 워커스레드가 같은 코드를 공유한다고 하면 모든 스레드는 counter 값을 액세스하는 시점에 락을 경유하기 때문에 문제가 생기지 않을 것이다.
  3. 스레드B는 반대로 그냥 counter 값을 읽기만 한다. 하지만 역시 읽는 중간에 값이 변경되면서 깨진 값을 참조할 가능성이 있기 때문에 안전한 참조를 위해서는 락을 걸어준다.

RLock

락은 기본적으로 한 스레드에서 잠금과 해제 사이의 구간을 처리할 때, 같은 락을 열고 들어와야 하는 다른 스레드의 간섭을 배제하는 용도로 쓴다고 했다. 그런데 이 과정이 특별한 언어적인 장치가 아니라 API 콜에 의존하고 있다는 문제가 있다. 무슨 말인고 하니 어떤 함수의 도입부에서 락을 걸고 리턴하기 직전에 락을 해제하는데, 이 함수가 재귀호출을 하는 경우가 있을 수 있다는 말이다.

이 경우에 첫번째 재귀 호출에서 앞서 걸린 락 때문에 스레드가 블럭되고, 영영 해당 락을 해제하지 못하는 문제가 생긴다. RLock은 이 문제를 해결하기 위한 Lock의 변종이다. RLock은 이미 잠긴 상태에서도 자신을 잠그려는 스레드가 이미 자신을 잠근 스레드라면 잠김 수를 1올리면서 즉시 리턴하는 락이다.

RLock의 해제는 잠김 수를 1 내리는 것으로 잠근 횟수만큼 다시 해제하여 잠김수를 0으로 만들어야지 다른 스레드가 이를 잠글 수 있게 된다.

컨텍스트 매니저

락외에 여러가지 스레드 동기화 장치들은 컨텍스트 매니저 프로토콜을 따른다. 따라서 acquire ~ release를 쓰기 보다는 with 문을 통해서 간단히 처리할 수 있다.  위 예에서의 카운터를 1올리는 코드는 아래와 같이 예쁘게 쓸 수 있다.

with counterLock:
  counter += 1

컨텍스트 매니저의 강점이, 단순히 매번 release()해야하는 귀찮음을 덜어줄 뿐 아니라, 락을 잠근 상태에서 발생하는 예외에 대해서도 대비한다. 예외가 발생하여 흐름이 깨지면, with 문을 강제로 나가야 하는데, with 문은 이 때에도 구문을 나갈 때 처리하기로 약속한 동작이 실행되는 것을 보장한다.

세마포어

세마포어는 락과 유사한 개념으로, 칸이 1개 이상인 화장실을 의미한다. 즉 세마포어는 내부적으로 사용량에 대한 카운터를 가지고 있고, 스레드가 접근할 때마다 이 값을 1씩 내리게 된다. 만약 이 값이 0이되면 스레드의 접근은 차단되고, 이미 접근한 스레드가 릴리즈하는 것을 기다린다. 이는 마치 동시에 사용할 수 있는 핸들의 수가 정해진 리소스에 대한 자원 경쟁을 정리하는 장치라 볼 수 있다.

pool_sema = threading.Semaphore(3)  ## 카운터가 3인 세마포어 생성
...

## 아래 코드는 여러 스레드에 대해서 최대 동시 3개의 주소에 대해
## URL의 콘텐츠를 다운로드하도록 하는 코드이다.
with pool_sema:  ## <-- 4개 이상 접근시 4번째부터는 여기서 기다린다.
  res = urlopen(url)
  with open(filename, 'wb') as f:
    f.write(res.read())

역시 코드상으로는 락과 아무런 차이가 없다. 실제 어떻게 동작하는가..라는 것을 실증으로 보이기는 제법 어려우니, 앞서 언급한 화장실의 비유를 사용하자. 각각의 스레드는 3칸짜리 화장실로 가서 각 칸에 들어가서 다운로드 작업을 진행한다. 칸이 가득차면 다른 스레드들은 발을 동동 굴리며 기다려야 하고, (대체로) 먼저 온 순서대로 먼저 빠져나가는 칸에 들어가게 된다.

컨디션

컨디션은 많이 쓰이는 동기화 도구인데, 많이 쓰인다는 것은 그만큼 유용하다는 의미이다. 락이나 세마포어는 간단히 화장실에 비유했는데, 컨디션은 그 내부에 락을 가지고 있는 조건절이라고 해야 하나? 굳이 비유를 하자면, 1회용 변기를 생산할 수 있는 설비를 갖춘 화장실이다.

차라리 보다 실제적인 예를 들어보자. 어떤 리스트가 하나 있다. 스레드 A는 뭔가를 하다가 쓸만한 데이터를 발견하면 리스트에 그 데이터를 추가한다. 다른 스레드 B는 그 리스트를 들여다보다가 스레드 A가 추가해놓은 데이터가 있으면 낼름 가져가다 사용한다.

스레드 B의 동작은 일종의 왓치독 같은 거라서 흔히 time.sleep()while 문을 조합해서 만들 수 있다. 물론 반응이 빠릿하려면 슬립 주기가 그만큼 짧아야 하고, 또 쓸데없이 리소스를 낭비하는 셈이된다.

그런데 바로 이 패턴이 컨디션이 쓰이기 딱 좋은 부분이다. 컨디션은 락과 같이 ‘선점’을 따지는 도구가 아니다. 다만 특정 조건이 되면 풀리는 락이라고 보면 된다. 하지만 그 특정 조건이라는 것을 락 스스로가 판단하도록 디자인하는 것은 극히 어려우니, 또 다른 제 3의 스레드가 그 판단을 대신해준다.

cv = threaing.Condition() ## RLock이 하나 자동으로 생성된다.
aList = [...]
## 데이터를 갖다 쓰는 스레드B
with cv:
  while not aList:
    cv.wait() ## 1
  p = aList.pop(0)
  ... # process p


## 데이터를 공급하는 스레드A
with cv:
  while True:
    ... # 데이터를 수집
    if data:
      aList.append(data)
      cv.notify()  ## 2

위 코드는 간단한 큐를 사용한 스레드간 데이터 교환을 보여준다.

  1. 데이터를 갖다 쓰는 스레드 B는 aList의 원소가 없으면 cv.wait()를 호출하여 컨디션(여기서는 리스트에 새 값이 있음)을 만족할 때까지 블럭한다.
  2. 데이터를 발굴하는(?) 스레드 A는 새로운 쓸만한 데이터를 찾으면 aList에 추가한 후 notify()를 호출해서, 해당 조건을 기다리는 다른 스레드에게 조건이 “변경되었음”을 알려준다.
  3. 스레드 B는 조건이 변경되었으니 다시 체크해야 한다. 따라서 not aList를 다시 판단한 후 while 루프를 탈출하고 이후처리를 계속하게 된다.

참고로 위 코드는 컨디션의 기본적인 사용원리를 설명하기 위한 예이며, 실제로는 wait_for()를 사용하면 된다.

이벤트/타이머/배리어

이벤트는 가장 기본적인 스레드간 동기화 수단으로 특정한 이벤트를 생성하고 이를 기다리는 스레드가 있을 때, 다른 스레드가 해당 이벤트에 대해서 .set()을 호출하여 시그널을 보내면 기다리던 스레드의 wait()이 리턴되면서 스레드가 재개되는 매커니즘이다.

배리어는 일종의 묻지마 버스로, 카운트를 정해놓은 이벤트이다. 외부에서 set()을 보낼 필요 없이, wait()가 호출된 개수가 카운트에 도달하면 모든 스레드가 동시에 출발한다.

타이머는 주어진 딜레이 후에 시작되는 Thread의 서브 클래스로, start()한 이후 주어진 딜레이타임 내에 cancel()이 호출되지 않으면 타깃 함수를 실행하게 된다.

정리

이상으로 스레드 사용에 필요한 여러 클래스와 API들을 살펴보았다. 흔히들 파이썬의 스레드는 Global Interpreter Lock이라는 제약때문에 써봐야 아무런 효과가 없는 것으로 오해하기 쉬운데, 일련의 데이터를 병렬적으로 분산처리하여 취합하는 패턴이 아닌 non-block의 형태로 워커 스레드들을 돌리는 패턴은 서버나 데몬 혹은 GUI를 사용하는 앱에서는 꼭 필요한 부분이니 어떻게 사용하는지 정도는 알아둘 필요가 있겠다. 또한 Thread 클래스를 포함한 여러 기본제공 API들은 그 자체로도 충분한 사용성을 보여주고 있으니, 괜히 불필요한 서브클래싱없이 잘 활용할 수 있도록 연습해 두도록 하자.