스레드를 이용한 데몬 만들기 – Python

이 블로그를 통해서 파이썬에서의 병렬처리에 대해서는 명시적으로 threading.Thread 대신에 concurrent.futures 에서 제공하는 API를 사용할 것을 여러 차례 권장해 왔다. 여기서 주목할 것은 바로 “병렬처리”라는 조건이다. 즉 concurrent.futures의 API는 일련의 데이터에 대해서 동일한 처리를 하려할 때, 이 “동일한 처리”를 여러 스레드 혹은 프로세스로 나눠서 동시에 진행하는 상황에 어울리는 기능이다. 하지만 실제 상황에서는 동시에 서로 다른 작업이 진행되어야 하는 경우가 존재한다.주로 메인 스레드와 백그라운드 스레드 (혹은 작업 스레드)에서 하는 일이 서로 다른 경우에 이러한 패턴이 필요할 수 있다.

이번 글에서는 파이썬의 threading 모듈을 사용해서 멀티스레드 프로그램을 어떻게 쉽게 작성하는지에 대해서 예제와 같이 살펴보도록 하겠다.

먼저 어떤 파일을 하나 열어서 한줄씩 읽는 프로그램을 생각해보자. 보통 파일을 열고 file.readline()을 사용했을 때, 파일의 마지막 라인을 읽은 후라면 None이 리턴되며 더 이상 내용을 읽지 않는다. 하지만 우리 컴퓨터에는 로그 파일과 같이 지속적으로 뭔가 내용이 계속 늘어나는 파일들이 있다. 따라서 파일을 열어 놓고 특정 시간 주기마다 파일로부터 한줄씩 읽어서 그 내용을 처리하는 프로그램을 작성한다고 생각해보자.

import time

def process_line(l):
  print(l)

with open('message.log') as f:
  while True:
    line = f.readline()
    if not line:
      time.sleep(0.5)
      continue
    process_line(line)

위 코드는 이렇게 동작한다.

  1. 파일 하나를 읽기 모드로 열었다.
  2. 한 줄씩 읽어서 process_line() 함수로 보내어 처리한다.
  3. 만약 마지막 줄을 읽은 후 더 읽을 내용이 없다면 0.5초 후에 재시도한다.

이 프로그램은 분명 무한루프를 돌게되지만, CPU 사용량이 치솟거나 하지는 않는다. 0.5초의 대기시간은 CPU 입장에서 봤을 때에는 무척이나 길고 지루한 시간이다. (쿨타임을 0.1초로 잡아도 마찬가지다.)

다음과 같이 빈 파일을 만든 후, 파이썬 코드를 백그라운드에서 실행해놓자. 이 상태에서 파일에 메시지를 추가하면 곧 추가된 라인들이 출력되는 것을 알 수 있다. (백그라운드 실행은 윈도 명령줄에서는 지원되지 않는다. 윈도에서 따라해본다면, 터미널 창을 2개 열어서 한쪽은 파이썬을 실행하고 다른 한쪽은 echo 명령으로 파일에 라인을 추가해보면 된다.)

$ touch message.log
$ python sample.py&
$ echo "hello" >> message.log

자 이 프로그램의 처리흐름을 다시 살펴보도록하자.  먼저 프로그램은 기본적으로 지정된 파일을 열어서 한 줄씩 읽고 그 내용을, 내용 처리를 담당하는 별도의 함수 (여기서는 데이터를 다루는 역할을 하고 있으니 ‘핸들러’라고 하자)로 넘겨준다. 핸들러 함수의 동작이 끝나면 다시 그 다음 줄을 읽는 식으로 루프를 도는데, 더 이상 읽어들일 내용이 없으면 지정된 시간(0.5초)만큼 기다린 후에 재시도 한다.

이 때 핸들러 함수의 실행에 필요한 시간이 매우 짧다면 문제가 되지 않겠지만 항상 그러하지는 못할 것이다. 하지만 텍스트 파일의 내용이 특정한 URL이고 핸들러가 해당 URL의 데이터를 다운로드 받는다면 어떨까? 위 예제의 구조에서는 핸들러는 블럭킹 함수로 동작하므로 현재 데이터를 처리하는 동안에는 추가된 데이터가 계속 남아있으리라고는 보장할 수 없다. 왜냐하면 해당 파일은 계속 내용이 추가되기 보다는 별도의 에디터에서 추가되었던 내용이 삭제처리 될 수도 있기 때문이다. 다음 코드와 그 실행 시나리오를 생각해보자.

from urllib.request import rulopen
import time

def handler(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(urlopen(url))

with open('inputs.txt') as f:
  while True:
    line = f.readline()
    if not line:
       time.sleep(0.1)
       continue
    handler(*(line.split()[:2]))

위에서 소개한 코드와 완전히 동일한데, 한가지 차이는 핸들러가 URL요청과 디스크IO를 수행하는 제법 시간이 오래걸릴 수 있는 작업이라는 점에서 차이가 있다. inputs.txt의 파일의 내용이 실시간으로 편집되고 있고, 네트워크 사정이 그리 좋지 못하다면 다음의 시나리오가 가능해질 수 있다.

  1. 파일에 데이터 A, B, C가 추가됨
  2. 프로그램은 데이터 A를 처리함 (그리고 처리되는 동안 블럭됨)
  3. 그 사이에 외부 프로그램에 의해서 데이터 B, C가 제거됨
  4. A를 처리하고 난 후 파일을 읽으려 시도하면 더 이상 남은 데이터가 없거나 세로운 데이터 D, E 등을 처리하게 됨

이 시나리오에서는 핸들러가 블럭킹으로 동작하는 동안 이후에 추가된 데이터를 확보하지 못한다는 문제가 있다. 이를 해결하기 위해서는 핸들러가 논블럭킹(non-blocking)으로 동작해야 할 필요가 있다. 즉 핸들러 함수를 메인 스레드가 아닌 별개의 스레드에서 실행하게하면 핸들러를 호출하는 구문은 즉시 다름 라인으로 진행이 가능해진다.

threading 모듈

threading 모듈은 파이썬에서 스레드를 사용할 수 있게 해주는 API를 담고 있다. 스레드를 위한 클래스인 Thread와 여러 가지 동기화 도구인 Lock, RLock, Condition, Semaphore 등이 지원된다. 스레드를 사용하는 방법에 대해서 오랜된 문서나 책에서는 Thread 클래스를 오버라이드하여 사용하는 방법을 소개하고 있는데, 스레드를 통한 분기가 함수의 실행단위로 쓰여진다면 굳이 번거롭게 오버라이드를 할 필요는 없다.

Thread 객체를 생성할 때 target= 이라는 인자가 있다. 이 인자로 백그라운드 스레드에서 실행할 함수를 넘겨주면 해당 함수가 별도의 스레드에서 실행될 수 있다. 위의 ‘파일 처리’ 코드를 스레드를 사용하는 방식으로 수정해보면 다음과 같이 작성될 수 있다. 시나리오를 잠시 수정해보자면, 파일에는 한 줄에 공백으로 구분되는 URL과 파일명이 들어온다. 우리의 프로그램은 각 라인을 읽어서 주어진 URL의 데이터를 주어진 파일 이름으로 저장하는 동작을 처리할 것이다.

from threading import Thread
from urllib.request import urlopen
import time

## 1
def hanlder(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(res.read())

## 2
with open('inputs.txt') as f:
  f.seek(2) ## 3
  while True:
    line = f.readline()
    if not line: ## 4
      time.sleep(0.1)
      continue
    data = line.split()[:2] 
    t = Thread(target=handler, args=data) ## 5
    t.start() ## 6

그다지 복잡한 코드는 아니다. 코드는 다음과 같이 설명된다.

  1. handler 함수는 URL과 파일이름을 인자로 받아서 주어진 URL의 콘텐츠를 다운로드 받아 해당 파일 이름으로 저장한다.
  2. 파일의 내용을 조사하는 루프를 시작한다.
  3. 현재 시점까지의 파일의 내용은 무시해야 할 것이므로 seek(2)를 호출해서 파일 스트림의 끝으로 커서를 옮긴다.
  4. 새로운 라인이 없으면 0.1초를 대기한다.
  5. 새로운 스레드에서 핸들러 함수를 실행하도록 설정한다.
  6. 만들어진 스레드를 실행한다.

과도한 스레드 생성을 피하는 법

위 코드는 다시 새로운 문제를 야기한다. 스레드를 만든 후 시작하는 t.start() 부분은 논블럭킹이기 때문에 호출하게 되면 새로운 스레드에서 네트워크 요청을 생성하는 일련의 작업을 처리하는데, 이 작업의 완료여부와 별개로 스레드가 시작되면 t.start()는 그 즉시 리턴된다. 따라서 inputs.txt 파일에 새로운 입력이 수십~수백개가 쏟아져 들어갔다면 이 프로그램은 그만큼 많은 스레드를 생성하면서 전체적으로 매우 느려지게 될 것이다.

이 상황에서 어쩌면 누군가는 세마포어같은 동기화 도구를 생각할지도 모르겠다. 혹은 스레드 풀을 구현하여 한 번에 동작하는 스레드의 수를 제한할 수도 있을 것이다. 하지만 이때 가장 중요한 것은 ‘입력으로 들어온 데이터를 나중에 처리하더라도 잃지는 말자’는 것이다. 따라서 가장 간단히 처리하는 방법은 큐를 사용하는 것이다.

  1. 파일로부터 데이터를 입력받는 무한루프는 계속 파일을 체크하여, 추가로 입력된 데이터가 있으면 큐에 넣는다.
  2. 큐를 watching하는 무한루프는 큐에 들어온 데이터를 순서대로 꺼내어 하나씩 처리한다.

여기서 중요한 점은 두 개의 무한루프가 독립적으로 돌아간다는 것이다. 이를 위해서는 둘 중의 하나의 무한루프는 별도의 스레드에서 돌릴 필요가 있다. 그리고 두 스레드 사이의 데이터 교환은 동기화된 상태여야 한다. (한쪽에서 큐에서 데이터를 빼는 동시에 데이터를 집어넣어서는 곤란하다.) 스레드 사이의 동기화된 데이터 교환은 queue 모듈 내의 Queue를 사용하면 된다.

또한 별도 스레드에서의 동작이 무한루프가 되어야 하므로 해당 스레드는 ‘데몬’으로서 동작해야 한다. 데몬은 그 수명이 메인 스레드에 의존적인 스레드로, 그 자신이 무한루프를 돌고있다 하더라도 메인스레드가 종료되면 (즉 프로세스가 종료되면) 자동으로 종료되면서 리소스를 반납한다. 스레드를 데몬으로 만들고 싶다면 Thread 를 생성할 때 daemon=True 옵션을 지정하면 된다.

위 구조에서 어떤 것이 메인 스레드가 되고 어떤 것이 데몬이 될지는 본인의 판단에 달렸다. 여기서는 데이터 핸들링을 메인스레드에서 하고 데이터 마이닝을 데몬에서 수행하도록 구조를 만들어 보도록 하자.

아래는 최종 코드이며, 이 프로그램은 2개의 스레드를 써서 파일 내용을 감시하고, 추가 입력된 데이터를 처리한다.

from threading import Thread
from queue import Queue
from urllib.request import urlopen
from time import sleep

queue = Queue()

def daemon(filename):
  with open(filename) as f:
    f.seek(2)
    while True:
      line = f.readline()
      if not line:
        sleep(0.5)
        continue
      data = line.split()[:2]
      queue.put(data)

def mainLoop():
   while True:
     url, filename = queue.get()
     with urlopen(url) as res:
       with open(filenamem, 'wb') as f:
         f.write(res.read())

if __name__ == '__main__':
   t = Thread(target=daemon, args=('inputs.txt',), daemon=True)
   t.start()
   mainLoop()

정리

프로그램 내에서 두 개 이상의 무한루프가 독립적으로 동작하게 하려면 각각의 루프를 서로 다른 스레드에서 돌려야 한다. 이 때 프로세스 종료 시 작업 스레드들을 안전하게 종료하기 위해서는 이들 스레드는 생성 시 데몬으로 생성한다. 또한 보통 데몬을 사용하는 경우에는 한쪽 스레드에서 수집한 데이터를 다른 스레드(들)에게 넘겨주어야 할텐데, 이 때는 동기화된 큐를 쓰는 것이 권장되며 이 방법이 가장 간편하기도 하다.

더보기

threading 모듈을 통해서 멀티스레드를 사용하는 대표적인 예는 소켓통신의 다중접속 구현일 것이다. 소켓으로부터 접속을 받아들이는 코드는 보통 다음과 같은 모양이다.

sock = ... # 소켓 초기화 및 설정
sock.listen(host, port)
while True:
  conn, client = sock.accept() # 1
  ## 2
  recivedData = conn.recv(1024)
  ## ... 데이터를 처리하고 응답을 보내기

위 코드 중에서 sock.accept()에 의해서 리턴되는 값 중 첫번째 인자는 연결을 표현하는 소켓객체이다.  이 단일 루프 구조는 소켓이 연결을 기다리고, 연결을 받아들이면 해당 연결로부터 송수신을 처리하고 다시 루프의 시작으로 돌아가서 새로운 연결을 기다린다. 즉 한 번에 하나의 연결만 허용하는 구조가 된다. 하지만 이것은 소켓의 구조적인 한계라기보다는 그냥 코드가 선형으로 루프를 돌기 때문이다. 다음와 같이 처리한다면 어떨까?

while True:
  conn, client = sock.accept()
  t = Thread(target=handler, args=(conn, client), daemon=True)
  t.start()

위 코드에서는 서버 소켓이 연결을 받을 때마다 접속 소켓과 접속 정보를 핸들러에게 넘기고, 핸들러는 새로운 스레드에서 돌아가게 된다. 그리고 그 즉시 메인 루프는 다시 처음으로 돌아가서 새로운 연결을 기다릴 수 있다. 이 말은 방금 전에 접속한 소켓의 통신을 별도 스레드에서 처리하도록 하고 (소켓 통신을 주고/받기의 반복으로 돌아가는 일종의 무한 루프(최소한 끝이 정해지지 않은)라 볼 수 있으므로) 개별 접속 소켓의 처리를 데몬으로 만들어주는 것이다.