파이썬의 스레드 사용법

멀티스레드를 사용한 병렬처리는 concurrent.futuresThreadPoolExecutor를 사용하면 스레드 풀 관리에서부터, 처리 결과들의 동기화에 이르기까지의 여러 작업을 간단한 API를 통해서 처리할 수 있다. 특히 이 API의 경우 멀티스레드와 멀티프로세스에서 동일한 형태로 디자인되어 있고, 실제 사용시에는 어떤 구현을 선택할 것인지에 따라 Pool관리자의 클래스만 변경하면 되기 때문에 편리하게 사용할 수 있다.

하지만 이 API는 특정한 시점에 데이터를 나누어 처리하고 그 결과를 취합하는 분산 처리 구조에 적합하게 디자인 되어있는데, 우리가 스레드를 사용하는 케이스는 이러한 분산 처리 이외에도 여러 가지가 있을 수 있다. 예를 들어 소켓 서버에서 동시 접속을 처리하는 등, 디스패치 시점과 종료 시점이 각각 다른 스레드 관리 등의 경우에 스레드를 사용해야 한다. 이 글에서는 스레드 기반의 동시성 프로그래밍 라이브러리인 threading을 통한 스레드 사용과 해당 라이브러리에서 사용하는 동기화 및 스레드 간 통신 방법에 대해서 살펴보도록 하겠다.

Thread

threading.Thread는 스레드를 객체화한 클래스이다.  몇 가지 속성에 대해 알아보자.

  • Thread(group=, target=, args=, kwargs=, *, daemon=None) : 새로운 스레드를 생성한다. 각각의 파라미터의 의미는 다음과 같다.
    • group= : 아직 구현되지 않은 기능이며, 추후 사용을 위해 정의만 되어 있다.
    • target= : 워커스레드에서 디스패치할 함수
    • args, kwargs : 각각 튜플, 사전으로 target 함수에 넘겨질 인자값들
    • daemon : 해당 스레드가 데몬[^데몬]으로 실행될지의 여부.
  • start() : 스레드의 실행을 시작한다. 내부적으로 이 메소드가 호출되면 자신의 run() 메소드를 호출한다.
  • run() : 스레드가 실제로 수행할 작업. 기본적으로 target 속성의 함수를 별도 스레드에서 호출한다. 많은 예제가 실제 Thread 클래스를 서브 클래싱하면서 이 메소드를 오버라이드하는데, 클래스의 디자인 상, 서브클래싱할 이유가 거의 없는 것으로 보인다.
  • join() : 해당 스레드가 끝날 때까지 기다린다. 만약 데드락을 유발할 상황에서 조인하거나, 시작하지 않은 스레드에 대해 조인하는 것은 런타임 에러를 발생시킨다.
  • name : 스레드의 이름 속성
  • daemon : 데몬 속성. start()가 호출되기 전에 변경되어야 한다.
  • threading.locals() : 스레드가 다른 스레드와 공유하지 않고 자신의 스코프 내에서만 사용할 변수를 여기에 정의할 수 있다.

간단한 몇 가지 예를 들어보자.  먼저 간단한 분산처리의 예이다. 몇 개 URL을 주고 그에 대해서 동시에 접속하여 그 콘텐츠를 파일에 저장하도록 한다. 개별작업은 스레드로 처리된다. http 접근 자체는 CPU 집약적인 작업이 아니기 때문에 실제로 이 동작은 ‘동시에 진행되는 것으로’보이며, 실제로도 응답이 빠르고 용량이 작은 데이터를 먼저 얻어와서 저장할 수 있다.

from threading import Thread
from urllib.request import urlopen

urls = ('https://..', ...)

def download_contents(url, filename):
  '''주어진 URL을 다운로드 받고, 그 내용을 파일로 기록한다.'''
  res = urlopen(url)
  if res.code != 200:
    return
  with open(filename, 'wb') as f:
    f.write(res.read())

def main():
  threads = []
  for url in urls:
    filename = url.rsplit('/', 1)[1]
    t = Thread(target=download_contents, args=(url, filename))
    threads.append(t)
  ## 각각의 스레드를 시작하고, 조인합니다.
  for t in threads:
    t.start()
  for t in threads:
    t.join()


if __name__ == '__main__':
  main()

스레드를 먼저 만든 후에 이를 리스트에 추가해놓고, 다시 리스트를 순회하면서 각 스레드를 시작하고 다시 리스트를 순회하면서 join()한다. join을 하지 않으면 스레드를 시작해놓은 후의 메인 스레드는 곧 종료점에 도착해서 끝나게 되고, 이는 프로세스의 종료를 의미하므로 개별 워커 스레드의 작업이 처리되는 시간을 확보할 수 없다.

이처리는 단적인 병렬처리의 예이므로, 이전에 포스팅했던 concurrent.futures를 이용하는 것이 훨씬 깔끔하고 간편하게 처리될 수 있다. Future의 리스트로 맵핑한 후에 리스트 자체를 wait 하면 된다.

def main():
  with ThreadPoolExecutor() as pool:
    fs = [poo.submit(download_content, url, url.rsplit('/', 1)[1]) for url in urls]
    pool.wait(fs)

실제로 스레드API를 직접 사용해야 하는 부분은 이제 데몬을 작성하거나, 실제로 메인 스레드와 별개의 작업을 처리하는 워커 스레드를 만들 때라고 할 수 있다. 가장 흔히 접할 수 있는 예는 소켓 프로그래밍에서이다.  소켓 프로그래밍에서 서버를 작성할 때 소켓을 초기화하고 사용하는데, 실제로 접속이 들어오는 시점에는 별도의 소켓 객체가 만들어진다. 이를 해당 루프에서 처리하는 것이 아니라 별도의 접속 처리함수를 이용해서 스레드로 분기시킨다.

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', 7777))
while True:
  sock.listen(1)
  conn, addr = sock.accept()  ## conn은 접속이 체결된 소켓 객체이다.
  t = Thread(target=manage_connection, args=(conn, addr))
  t.start()
...

이렇게하면 접속이 들어온 직후, 개별 접속을 처리할 manage_connection 함수가 해당 접속 소켓을 가지고 별도 스레드에서 시작된다. 메인 스레드는 접속이 별개의 스레드에서 처리되는동안 다시 while 문으로 돌아가서 새로운 접속을 listen 할 수 있다. 말 그대로 동시 접속을 허용하는 서버가 만들어진다. (이 포스트의 다음 페이지에 해당 소스를 올려 두겠다.)

스레드별 로컬 네임스페이스

많은 오래된 예제들에서는 Thread 클래스를 서브클래싱해서 별도의 클래스를 만들어서 시작하는 경우를 많이 소개하는데, 이렇게 서브클래싱해야 하는 경우는 스레드별로 별도의 자원을 관리해야 하는 경우외에는 거의 없다고 볼 수 있다. 스레드는 하나의 프로세스 하위의 별개의 실행 흐름이며, 그외의 메모리 공간은 모두 공유하게 된다. 따라서 이름이 같으면서, 각 스레드별로 분리된 변수를 액세스하기 위해서 아예 해당 변수를 스레드 객체의 인스턴스 속성을 만들기 위해 이런 식으로 처리하는 케이스들은 있을 수 있다. (하지만 내가 봐 왔던 거의 대부분의 스레드 관련 예제는 이런 케이스도 아니고, 그냥 습관적으로 서브 클래싱부터 하고 보는 경우들이 대부분도 아니고 전부였다.)

파이썬은 뼛속까지 객체 지향 언어가 맞기는 하지만, 많은 언어의 디자인이 “클래스를 만들어 쓰는 것보다는 더 나은 어떤 방법이 있다”는 것을 암시한다. 여튼, 스레드별로 독립된 로컬 변수를 만들 기위해서는 서브클래싱 대신에 threading.local을 쓴다.

my_data = thrading.local()
my_data.x = 1  ## my_data는 해당 스레드의 고유한 로컬 변수 네임스페이스가 된다. 

동기화 수단들

앞서 스레드는 단일 프로세스 내의 독립된 실행 흐름이며, 메모리 영역은 프로세스 내 모든 스레드가 동시에 사용한다고 하였다. 이는 필연적으로 자원경쟁이라는 상황을 만들게 된다. 즉 실행중인 프로세스 내의 모든 스레드는 기술적으로는 특정한 함수들이 동시에 실행되고 있는 상황이라 볼 수 있다. 각각의 실행 시점에서 메모리는 공유되기 때문에 같은 이름을 여러 스레드가 동시에 액세스하려는 경우, 문제가 발생할 수 있다. 예를 들어 하나의 스트림에 두 개의 스레드가 동시에 값을 쓰려고 한다거나, 어떤 변수에 대해서 한 스레드는 그것을 변경하려 하고, 다른 스레드는 같은 시점에 그 값을 읽으려고 한다면 정상적으로 동작하지 못할 수 있다는 말이다.

이렇게 특정한 자원을 서로 점유하려는 문제를 해결하기 위해서 스레드는 ‘동기화’라는 기법을 적용한다. 동기화란, 두 개 이상의 스레드가 특정 자원 액세스 시점에 한정하여 마치 하나의 스레드 내의 두 개 구문처럼 순차적으로 실행되도록하는 것을 의미한다. (그리고 그로부터 파생할 수 있는 여러 기법을 말한다.)

  • 락 / R락: 말 그대로 자물쇠에 해당한다. 특정 작업을 사용하려할 때 스레드들은 자물쇠에 접근한다. 최초로 자물쇠를 획득한 스레드는 자물쇠를 잠그고 실행하며, 같은 시점에 잠겨있는 자물쇠를 사용하려는 스레드는 모두 자신의 차례에서 자물쇠가 열릴 때까지 기다려야 한다.
  • 컨디션 : 조건부 락(conditional lock)이라 할 수 있다. 락과는 반대로 동작하는데, 조건부 락을 건 스레드는 일단 중지한다. 다른 스레드에서 해당 조건값의 상태(프로그래머가 원하는 상태)에 따라서 notify()notify_all()을 호출하여 조건부락을 해제해주면 해당 스레드가 실행되는 형식이다.
  • 세마포어 : 일반 락이 최대 하나의 스레드만 소유하는 것을 허용하는데 비해, 세마포어는 동시에 사용가능한 수가 제한된 어떤 리소스에 대한 선착순 락이다.
  • 이벤트 : 가장 기본적인 스레드 동기화 기법으로 한 스레드는 특정한 이벤트를 기다리고, 다른 스레드는 해당 이벤트에 대한 시그널을 보낸다. 시그널이 보내지면 중단되었던 스레드가 진행하는 방식이다.
  • 타이머 : 특정 딜레이 후에 시작되는 스레드
  • 베리어 : 특정한 카운트 값만큼 스레드들을 막아두었다가 동시에 시작하는 기법. 베리어에 지정된 개수만큼의 스레드가 wait()를 호출하면 베리어가 해제되고 이를 기다리던 모든 스레드가 동시에 시작한다.

락은 자물쇠처럼 이를 선점한 스레드가 락을 획득하면, 자물쇠가 잠긴다. 이후에 접근하는 스레드들은 락이 열릴 때까지 그 앞에서 멈춰기다렸다가, 락을 선점한 스레드가 락을 풀어주면 차례로 획득 > 해제를 반복하면서 순차적으로 처리되는 모양을 만들게 된다. 가장 명료한 비유는 칸이 하나 밖에 없는 화장실을 생각하면 된다.

락을 사용하기 위해서는 스레드들의 외부, 대충 메인스레드 쯤에서 락 인스턴스를 생성한다. 그리고 특정한 작업에 들어가는 경우에 스레드는 해당 락 객체에 acquire() 메소드를 호출한다. 잠겨있지 않은 락의 메소드는, 상태를 변경하면서 즉시 리턴한다. 하지만 만약 락이 잠겨있는 상태라면 이 메소드는 계속 해당 메소드를 블럭킹할 것이다. 따라서 가장 먼저 접근한 하나의 스레드가 볼일을 다 보고 release()를 호출해주고 나가야 다음 스레드가 acquire()를 리턴받고 이후 볼일(?)을 보게 되는 구조이다.

## main thread
counter_lock = threading.Lock()
counter = 0
...

## worker thread A
counter_lock .acquire()
counter += 1
lock.release()

## worker thread B
counter_lock .acquire()
print(counter)
lock.release()
  1. 락은 어느 시점이든 미리 생성해 놓기만하면 된다.
  2. 스레드A는 변수 counter를 변경하려한다. 다른 어떤 스레드가 counter를 동시에 변경하려할지 모르기 때문에 lock을 걸고 이를 처리한 후 락을 풀어준다. 모든 워커스레드가 같은 코드를 공유한다고 하면 모든 스레드는 counter 값을 액세스하는 시점에 락을 경유하기 때문에 문제가 생기지 않을 것이다.
  3. 스레드B는 반대로 그냥 counter 값을 읽기만 한다. 하지만 역시 읽는 중간에 값이 변경되면서 깨진 값을 참조할 가능성이 있기 때문에 안전한 참조를 위해서는 락을 걸어준다.

RLock

락은 기본적으로 한 스레드에서 잠금과 해제 사이의 구간을 처리할 때, 같은 락을 열고 들어와야 하는 다른 스레드의 간섭을 배제하는 용도로 쓴다고 했다. 그런데 이 과정이 특별한 언어적인 장치가 아니라 API 콜에 의존하고 있다는 문제가 있다. 무슨 말인고 하니 어떤 함수의 도입부에서 락을 걸고 리턴하기 직전에 락을 해제하는데, 이 함수가 재귀호출을 하는 경우가 있을 수 있다는 말이다.

이 경우에 첫번째 재귀 호출에서 앞서 걸린 락 때문에 스레드가 블럭되고, 영영 해당 락을 해제하지 못하는 문제가 생긴다. RLock은 이 문제를 해결하기 위한 Lock의 변종이다. RLock은 이미 잠긴 상태에서도 자신을 잠그려는 스레드가 이미 자신을 잠근 스레드라면 잠김 수를 1올리면서 즉시 리턴하는 락이다.

RLock의 해제는 잠김 수를 1 내리는 것으로 잠근 횟수만큼 다시 해제하여 잠김수를 0으로 만들어야지 다른 스레드가 이를 잠글 수 있게 된다.

컨텍스트 매니저

락외에 여러가지 스레드 동기화 장치들은 컨텍스트 매니저 프로토콜을 따른다. 따라서 acquire ~ release를 쓰기 보다는 with 문을 통해서 간단히 처리할 수 있다.  위 예에서의 카운터를 1올리는 코드는 아래와 같이 예쁘게 쓸 수 있다.

with counterLock:
  counter += 1

컨텍스트 매니저의 강점이, 단순히 매번 release()해야하는 귀찮음을 덜어줄 뿐 아니라, 락을 잠근 상태에서 발생하는 예외에 대해서도 대비한다. 예외가 발생하여 흐름이 깨지면, with 문을 강제로 나가야 하는데, with 문은 이 때에도 구문을 나갈 때 처리하기로 약속한 동작이 실행되는 것을 보장한다.

세마포어

세마포어는 락과 유사한 개념으로, 칸이 1개 이상인 화장실을 의미한다. 즉 세마포어는 내부적으로 사용량에 대한 카운터를 가지고 있고, 스레드가 접근할 때마다 이 값을 1씩 내리게 된다. 만약 이 값이 0이되면 스레드의 접근은 차단되고, 이미 접근한 스레드가 릴리즈하는 것을 기다린다. 이는 마치 동시에 사용할 수 있는 핸들의 수가 정해진 리소스에 대한 자원 경쟁을 정리하는 장치라 볼 수 있다.

pool_sema = threading.Semaphore(3)  ## 카운터가 3인 세마포어 생성
...

## 아래 코드는 여러 스레드에 대해서 최대 동시 3개의 주소에 대해
## URL의 콘텐츠를 다운로드하도록 하는 코드이다.
with pool_sema:  ## <-- 4개 이상 접근시 4번째부터는 여기서 기다린다.
  res = urlopen(url)
  with open(filename, 'wb') as f:
    f.write(res.read())

역시 코드상으로는 락과 아무런 차이가 없다. 실제 어떻게 동작하는가..라는 것을 실증으로 보이기는 제법 어려우니, 앞서 언급한 화장실의 비유를 사용하자. 각각의 스레드는 3칸짜리 화장실로 가서 각 칸에 들어가서 다운로드 작업을 진행한다. 칸이 가득차면 다른 스레드들은 발을 동동 굴리며 기다려야 하고, (대체로) 먼저 온 순서대로 먼저 빠져나가는 칸에 들어가게 된다.

컨디션

컨디션은 많이 쓰이는 동기화 도구인데, 많이 쓰인다는 것은 그만큼 유용하다는 의미이다. 락이나 세마포어는 간단히 화장실에 비유했는데, 컨디션은 그 내부에 락을 가지고 있는 조건절이라고 해야 하나? 굳이 비유를 하자면, 1회용 변기를 생산할 수 있는 설비를 갖춘 화장실이다.

차라리 보다 실제적인 예를 들어보자. 어떤 리스트가 하나 있다. 스레드 A는 뭔가를 하다가 쓸만한 데이터를 발견하면 리스트에 그 데이터를 추가한다. 다른 스레드 B는 그 리스트를 들여다보다가 스레드 A가 추가해놓은 데이터가 있으면 낼름 가져가다 사용한다.

스레드 B의 동작은 일종의 왓치독 같은 거라서 흔히 time.sleep()while 문을 조합해서 만들 수 있다. 물론 반응이 빠릿하려면 슬립 주기가 그만큼 짧아야 하고, 또 쓸데없이 리소스를 낭비하는 셈이된다.

그런데 바로 이 패턴이 컨디션이 쓰이기 딱 좋은 부분이다. 컨디션은 락과 같이 ‘선점’을 따지는 도구가 아니다. 다만 특정 조건이 되면 풀리는 락이라고 보면 된다. 하지만 그 특정 조건이라는 것을 락 스스로가 판단하도록 디자인하는 것은 극히 어려우니, 또 다른 제 3의 스레드가 그 판단을 대신해준다.

cv = threaing.Condition() ## RLock이 하나 자동으로 생성된다.
aList = [...]
## 데이터를 갖다 쓰는 스레드B
with cv:
  while not aList:
    cv.wait() ## 1
  p = aList.pop(0)
  ... # process p


## 데이터를 공급하는 스레드A
with cv:
  while True:
    ... # 데이터를 수집
    if data:
      aList.append(data)
      cv.notify()  ## 2

위 코드는 간단한 큐를 사용한 스레드간 데이터 교환을 보여준다.

  1. 데이터를 갖다 쓰는 스레드 B는 aList의 원소가 없으면 cv.wait()를 호출하여 컨디션(여기서는 리스트에 새 값이 있음)을 만족할 때까지 블럭한다.
  2. 데이터를 발굴하는(?) 스레드 A는 새로운 쓸만한 데이터를 찾으면 aList에 추가한 후 notify()를 호출해서, 해당 조건을 기다리는 다른 스레드에게 조건이 “변경되었음”을 알려준다.
  3. 스레드 B는 조건이 변경되었으니 다시 체크해야 한다. 따라서 not aList를 다시 판단한 후 while 루프를 탈출하고 이후처리를 계속하게 된다.

참고로 위 코드는 컨디션의 기본적인 사용원리를 설명하기 위한 예이며, 실제로는 wait_for()를 사용하면 된다.

이벤트/타이머/배리어

이벤트는 가장 기본적인 스레드간 동기화 수단으로 특정한 이벤트를 생성하고 이를 기다리는 스레드가 있을 때, 다른 스레드가 해당 이벤트에 대해서 .set()을 호출하여 시그널을 보내면 기다리던 스레드의 wait()이 리턴되면서 스레드가 재개되는 매커니즘이다.

배리어는 일종의 묻지마 버스로, 카운트를 정해놓은 이벤트이다. 외부에서 set()을 보낼 필요 없이, wait()가 호출된 개수가 카운트에 도달하면 모든 스레드가 동시에 출발한다.

타이머는 주어진 딜레이 후에 시작되는 Thread의 서브 클래스로, start()한 이후 주어진 딜레이타임 내에 cancel()이 호출되지 않으면 타깃 함수를 실행하게 된다.

정리

이상으로 스레드 사용에 필요한 여러 클래스와 API들을 살펴보았다. 흔히들 파이썬의 스레드는 Global Interpreter Lock이라는 제약때문에 써봐야 아무런 효과가 없는 것으로 오해하기 쉬운데, 일련의 데이터를 병렬적으로 분산처리하여 취합하는 패턴이 아닌 non-block의 형태로 워커 스레드들을 돌리는 패턴은 서버나 데몬 혹은 GUI를 사용하는 앱에서는 꼭 필요한 부분이니 어떻게 사용하는지 정도는 알아둘 필요가 있겠다. 또한 Thread 클래스를 포함한 여러 기본제공 API들은 그 자체로도 충분한 사용성을 보여주고 있으니, 괜히 불필요한 서브클래싱없이 잘 활용할 수 있도록 연습해 두도록 하자.