작업 큐를 구현해보자 – Python

어떤 함수와 인자값을 전달해서 백그라운드에서 처리되도록 하려면 스레드를 사용해서 작업할 수 있다. 스레드를 사용하면 동시에 여러 개의 작업을 진행시키는 것도 가능하다. 문제는 데이터가 단시간에 다량으로 들어오는 경우인데, 여러 스레드가 동시에 돌아가는 경우 실질적으로는 모두 동시에 돌아가는 것이 아니라 CPU가 “매우 빠른 속도로” 각 스레드 사이의 작업을 전환해가면서 처리하게 된다. 따라서 스레드가 너무 많아지게 되면 스레드 사이의 스위칭에 많은 비용이 들어갈 뿐만 아니라 프로세스 자체가 불안정하게 된다.

한정된 스레드 개수만 사용하여 더 많은 데이터를 다루기 위해서는 스레드 풀이라고도 불리는 작업 큐를 사용하는 방법이 있다.  기본적인 아이디어는 데몬 구현을 설명한 포스트에서 썼던 방법과 비슷하다. 이전 글에서는 큐 하나를 준비해서 데이터를 큐에 쌓아두고, 작업 스레드에서 해당 큐에서 순차적으로 하나씩 데이터를 추출하여 순서대로 처리할 수 있었다.

작업 큐는 이 아이디어를 확장하여, 1) 큐에는 데이터가 아닌 실행하게될 작업(여기서는 스레드로 만들어진)이 쌓이게 된다. 2) 한 번에 하나씩 처리하는 것이 아니라 임의의 동시 작업 개수까지는 동시에 실행하도록 한다.

이러한 아이디어는 큐와 세마포어를 조합하여 구현할 수 있다.  작업 큐의 실행 스레드에서 세마포어를 생성하고, 큐에 들어있는 스레드를 시작하기 전에 세마포어를 요청한다. 이후 가용한 스레드의 개수를 다 사용하고 나면, 새로 추출된 작업은 dispatch되지 못하고 기다리게 된다. 이전에 시작된 스레드 중에서 가장 먼저 끝난 스레드에서 세마포어를 해제하면, 큐의 맨 앞의 작업이 시작되는 방식이다.

Thread 오버라이딩

여기서 중요한 부분은 큐로부터 시작된 스레드 작업이 실행을 마무리할 때, 세마포어를 해제해야 한다는 것이다. 즉 주어진 작업 함수를 데코레이팅하는 식으로 세마포어 해제코드를 삽입하고 데코레이팅된 함수를 가지고 스레드를 만들어야 한다. 이를 위해서 다음과 같이 Thread의 서브 클래스를 작성한다. (아니면, 함수를 만들어도 된다.)

import threading
import queue

class _Operation(threading.Thread):
  def __init__(self, sem, *args, **kwds):
    self.sem = sem
    self.method = kwds.pop('target') 
    super().__init__(target=self.wrappedTarget, args=args, kwargs=kwds, daemon=True)
  
  def wrappedTarget(self, *args, **kwds):
    self.method()
    if isinstance(self.sem, threading.Semaphore):
      self.sem.release()

Thread 객체는 생성될 때 target= 인자에 의해 다른 스레드에서 실행될 함수를 받게 되고, start()를 호출하면 새로운 스레드를 만들고 여기서 이 target 속성의 함수를 실행한다. 이 구조를 거꾸로 이용해서 이 target을 다른 이름의 속성으로 바인딩하고, self.method()를 호출한 후 해당 함수의 실행이 종료되면 세마포어를 릴리즈하는 동작을 수행하는 메소드 wrappedTarget을 추가한다. 그리고 수퍼클래스의 초기화메소드를 호출할 때, 주어진 method가 아닌 wrappedTarget을 target 인자로 넘겨버리는 것이다.

(이 구조를 그대로 이용하면 스레드 작업 종료 후 콜백을 호출하도록 만들 수도 있을 것이다.)

작업 큐 만들기

이번에는 작업 큐를 작성해 볼 차례이다. 작업 큐는 동시 실행 가능한 작업의 개수를 인자로 받아서 생성될 수 있으며, 내부에 세마포어 하나와 큐 객체 하나를 가지게 된다. 큐는 세마포어를 통해서 실행을 제어하게 되므로 메인스레드에서 동작하지 않고 별도의 스레드에서 데몬처럼 동작한다. 즉,

  • 별도의 제어 스레드를 만들고 큐의 작업 루프를 여기서 돌아가게 한다.
  • 메인 스레드에서는 만들어진 큐에 작업을 추가할 수 있다.
  • 큐의 작업 루프에서는 순서대로 가용한 만큼의 개별 작업 스레드를 시작한다.

사실 가장 복잡한 부분은 작업 클래스 만드는 부분이었으므로, 이후는 간단하다.

class OperationQueue:
  def __init__(self, numberOfConcurrentTask=1):
    self.queue = queue.Queue()
    self.sem = threading.Semaphore(numberOfConcurrentTask)

  ## 함수와 인자를 받아서 큐에 추가한다.
  def add(self, method, *args, **kwds):
    task = _Operation(self.sem, method, *args, **kwds)
    self.queue.put(task)

  ## 작업 루프
  def mainloop(self):
    while True:
      t = self.queue.get()
      self.sem.acquire()
      t.start()

  ## 루프를 돌리는 명령
  def start(self, run_async=False):
    t = threading.Thread(target=self.mainloop, daemon=True)
    t.start()
    if not run_async: # 옵션값에 따라 큐의 실행을 블럭킹으로 한다.
      t.join()

기본적인 구현은 위와 같다.  Queue는 기본적으로 큐가 비어있는 동안 블럭되었다가, 다른 스레드에서 큐에 데이터를 밀어넣으면 이를 리턴할 수 있다. 큐에는 Thread 인스턴스가 들어있는데, 이를 시작하기 전에 세마포어를 획득하는 식으로 최대 동시 작업 허용 수를 초과하지 못하도록 한다.

작업 큐 사용 방법

상황에 따라서는 위 코드를 조금 수정해서 작업 객체의 목록을 초기화시에 전달 받은 후에, 작업 큐의 모든 작업이 종료되면 작업 루프를 자동으로 종료하도록 만들 수도 있을 것이다. 이렇게 만들어진 작업 큐는 다음과 같이 사용할 수 있다.

def foo(n):
  for i in range(n):
    print(i)
    time.sleep(0.25)

q = OperationQueue(3) # 동시에 최대 3개의 작업이 돌아갈 수 있는 작업 큐 생성
q.start(True) # 큐를 넌블럭 모드로 시작한다. 

## 이미 시작된 큐에 작업을 추가해서 자동으로 시작되는지 확인한다.
for _ in range(100):
  q.add(foo, random.randrange(2, 40))

## 100개의 작업이 수행되겠지만,
## 동시에 최대 3개씩 실행되고, 한 개 작업은 최대 10초간 돌아갈 것이기 때문에
## 위 작업은 40초 이내에 완료될 것이다.
time.sleep(40)