파이썬으로 구현하는 스트림리더

파이썬에서 파일의 내용을 읽어와서 처리할 때 가장 기본적인 방법은 opne() 함수를 이용해서 파일 객체를 만들고, read() 메소드를 이용해서 파일의 전체 내용을 한 번에 읽어오는 것이다. 그런데 많은 경우에 실제로 다루는 파일은 텍스트포맷인 경우가 많다. 텍스트 포맷을 다룰 때에는 다음과 같은 몇 가지 전략이 존재한다.

  • read()를 이용해서 파일의 전체 내용을 읽어와 하나의 문자열로 사용한다.
  • readlines()를 이용하면 파일의 전체 내용을 읽어와서 라인 단위로 잘라 문자열의 리스트로 만들 수 있다.
  • readline()을 이용해서 파일의 내용을 한 줄씩 읽어와서 처리한다.

실제로는 텍스트 파일을 한줄 단위로 읽어와서 파싱해서 사용하는 경우가 많기 때문에 위 방법 중에서는 readline()이 가장 많이 쓰이며, 아예 텍스트 파일을 연 파일 객체는 문자열의 제너레이터처럼 동작하기 때문에 for ... in 구문으로 한줄씩 데이터를 읽어서 사용한다. 이 방식의 가장 좋은 점은 파일을 한 줄 단위로만 읽어와서 처리하기 때문에 불필요한 메모리 낭비를 하지 않는다는 것이다. 심지어 수 기가짜리 텍스트 파일이 있더라도, 한 줄씩 처리하는 경우에는 메모리에 한줄 만큼의 분량을 읽어와서 처리하기 때문에 프로그램이 메모리 부족으로 죽을 일이 없다.

그런데 어떤 경우에는 텍스트로 구성된 대용량 파일이 있고, 이 파일 내의 데이터가 컴마나 스페이스 같은 문자만 구분되어 있고 개행문자가 포함되지 않았다면 어떻게 처리하면 좋을까? 극단적인 예이기는 하지만 몇 기가 짜리 텍스트 파일이 있고, 여기에는 컴마로 구분된 숫자값들이 있다고 하자. 이를 읽어서 split()으로 나눠서 써야겠지만, split()이라는 동작 자체가 일단 잘라낼 문자열을 메모리에 올린 다음, 한꺼번에 잘라서 리스트로 만드는 방식으로 처리되기 때문에 실제 문자열의 크기의 두 배 이상의 메모리를 요구하게 된다.

이 문제를 해결하기 위해서 조금 느긋하게 파일의 내용을 읽어들이는 방법을 구현해보자. 기본적인 아이디어는 다음과 같다.

  1. 파일의 내용을 일부 읽어올 버퍼를 준비한다.
  2. 미리 정해둔 사이즈만큼 파일의 내용을 읽어와서 버퍼에 담는다.
  3. 버퍼의 처음부터 구분자가 있는 곳까지를 탐색한 후, 해당 영역을 잘라내어 사용한다.
  4. 구분자를 버린다.
  5. 3의 과정을 반복한다. 만약 구분자가 발견되지 않으면, 파일로부터 다시 정해진 사이즈만큼의 데이터를 읽어와서 버퍼에 덧붙인다.
  6. 파일에서 더 이상 읽을 내용이 없다면 버퍼에 남아있는 데이터가 마지막 조각이 된다.
  7. 파일을 닫는다.

이처럼 파일로부터 특정한 구분자까지의 데이터를 읽어서 순차적으로 잘라내어 리턴해주는 제너레이터를 만들 수 있을 것이다.

def read_strem(filename, sep=","): 
    ## filename : 읽을 파일
    ## sep : 구분자
    buffer = bytearray()
    chunk_size = 1024 # 한 번에 1024바이트씩 읽어온다. 
    token = sep.encode()

    with open(filename, 'rb') as f: ## 파일을 이진 파일로 읽어온다. 
      while True:
        data = f.read(chunk_size)
        if not data: ## 파일에서 더이상 읽을 내용이 없으면 루프 종료
          break
        buffer.extend(data)
        ## 버퍼내에서 구분자까지 자르는 작업을 반복
        while True:
          i = buffer.find(token)
          if i < 0 :  ## 구분자가 발견되지 않으면 한 번 더 읽어온다.
            break
          found = buffer[:i].decode()
          yield found
          ## 구분자앞까지 자른 내용을 내놓은 후에는 버퍼의 앞쪽을 정리한다.
          buffer[:i+len(token)] = []
       ## 파일에서 읽는 내용을 모두 처리했다. 버퍼에 남은 내용이 있으면 내놓는다.
       if buffer:
         yield buffer.decode()

만약 아주 커다란 텍스트파일로부터 컴마로 나누어진 데이터를 읽어와서 처리하는 동작을 수행한다고 해보자. 심지어 어떤 경우에는 데이터 전체가 필요한 것도 아니고 앞 부분에서 1000개 정도만 필요할 수도 있다. 위에서 만든 함수는 제너레이터 함수이기 때문에 for 문에서 사용할 수 있고, 만약 앞 1000개까지만 사용하려면 다음과 같이 쓸 수 있다.

## 파일이름이 'verybigfile.txt'라 할 때,
g = read_stream('verybigfile.txt')
for (i, w) in enumerate(g):
    if not i < 1000:
       break
    print(w)

약간 다른 접근방법도 있다. 파이썬보다는 Objective-C나 Swift에 어울릴 것 같은 방법인데, “필요할 때마다 꺼내 쓰는” 제너레이터가 아니라 튀어나오는 데이터를 처리할 핸들러를 넘겨주고 함수 내에서 다 처리해버리는 방법이다. 핸들러의 디자인은 대략 다음과 같은 식으로 처리하면 되겠다.

  • 처리할 문자열 데이터와, 몇 번째 데이터인지를 나타내는 정수 값을 인자로 받고
  • 문자열을 처리한 후, 더 처리할 것인지 그렇지 않은지를 리턴한다. None이나 False로 평가되는 값을 리턴하면 계속하고, 그렇지 않으면 더 이상 실행하지 않도록 할 것이다.

예를 들어 다음과 같은 식으로 핸들러를 만들 수 있는데,

def process_word(word, idx):
  if idx < 10:
     print(word)
     return
  return True

위 핸들러는 10번째까지는 출력하고, 그 이후로는 더 이상 실행하지 않겠다는 의미가 된다. 이런 핸들러를 받아서 처리해주는 형태로 위 제너레이터 함수를 다시 쓰면 다음과 같은 모양이 될 것이다.

def split_file(filename, handler, sep=","):
  buffer = bytearray()
  chunk_size = 1024
  token = sep.encode()
  idx = 0
  with open(filename, 'rb') as f:
    while True:
      data = f.read(chunk_size)
      if not data:
        break
      buffer.extend(data)
      while True:
        i = buffer.find(token)
        if i < 0:
          break
        found = buffer[:i].decode()
        r = handler(found, idx)
        if r:
          return
        buffer[:i+len(token)], idx = [], idx+1
    if buffer:
      handler(buffer.decode(), idx)

## 10개 데이터만 출력하고 끝낸다.
split_file('varybidfile.txt', process_word)

이 알고리듬은 제법 간단하면서도 대용량 데이터 파일을 안전하게 처리할 수 있도록 해주는 제법 괜찮은  방법이다. 간단하기 때문에 Swift 등의 다른 언어로도 충분히 컨버팅할 수 있다.

 

 

ZMQ – 그외 메시징패턴 구현하기

Publisher 패턴 – 단방향 브로드캐스팅

ZMQ를 이용한 소켓 네트워크 프로그래밍에서 두 번째로 소개되는 메시징패턴은 바로 Publisher vs Subscriber이다. 일전에 소개한 Request vs Response 패턴은 전통적인 메시지 교환의 방식으로 서버와 클라이언트가 서로 한 번씩 메시지를 주고 받는 식으로 통신하는 방식이었다. 또다른 메시징 패턴인

Publisher vs Subscriber는 다음과 같은 특징을 갖는 메시징 패턴이다.

  1. 서버는 퍼블리셔(Publisher)가 되어 일련의 메시지들을 ZMQ 소켓을 통해 발송한다.
  2. 클라이언트는 구독자(Subscriber)가 되어 메시지들을 구독한다.
  3. 메시지는 1 -> N 으로 복수의 구독자에게 전송된다.
  4. 소켓에 구독자가 없는 경우 메시지는 소실된다.
  5. PUB 소켓을 통해서 서버가 메시지를 받을 수는 없다. 반대로, `SUB` 소켓을 통해서 클라이언트가 메시지를 전송할 수도 없다.

즉, 이 패턴은 일종의 단방향 브로드캐스팅이다. 실제 가이드 페이지에서 제공하는 코드를 분석해보자. 서버는 랜덤한 값을 사용해서 가짜 우편번호와 날씨 데이터를 생성해서 퍼블리싱한다.

from random import randrange
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB) #1
sock.bind('tcp://*:5556')
n = 0 #2
while True:
  zip_codde = randrange(10_000, 100_000)
  temp = randrange(-20, 45)
  humidity = randrange(10, 101)
  sock.send(f'{zip_code} {temp} {humidity} {n}'.encode())
  n += 1
  #3
  1. zmq.PUB 패턴의 커뮤니케이션용으로 소켓을 생성한다.
  2. n은 도대체 메시지가 몇개나 생성되는지 보려고하는 것이다.
  3. 주의깊게 코드를 보다보면 뭔가 허전한 것이 없나? 바로 time.sleep(1) 같은 시간 때우기 호출이 없다. 즉 이 루프는 엄청 빡센 무한루프를 돈다.

클라이언트

클라이언트는 메시징타입을 zmq.REQ 가 아니라 zmq.SUB를 쓴다. 그런데 서버는 엄청나게 많은 양의 메시지를 쏟아낼 것이기 때문에 이를 적절히 필터링할 필요가 있다. 물론 메시지를 받은 다음에 메시지 내용을 보고 필터링해도 되긴하는데, ZMQ의 소켓은 여러 옵션을 통해서 필터를 설정할 수 있다. 이 클라이언트는 소켓에 필터를 설정하고, 필터에 해당하는 우편번호의 날씨 데이터만을 수집한 후 기온 필드를 추출하여 50개 데이터에 대해서 평균치를 출력한다.

## wu-client.py
import sys
import zmq

zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001' #1

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter) #2

c, temp = 50, []
for _ in range(c):
  msg = sock.recv().decode()
  print(msg)
  _, t, *_ = msg.split()
  temp.append(t)
print(f'Average Temperature: {sum(temp)/c}')
  1. 서버가 쏟아내는[^1] 데이터는 다섯자리 숫자의 가상 우편번호이다. 클라이언트는 정해진 하나의 우편번호로 시작하는 데이터만을 받아들일 예정이므로 필터링할 조건을 생성한다. 여러 개의 클라이언트가 붙을 수 있음을 상정하여 명령줄에서 옵션으로 변경할 수 있게 한다.
  2. socket.setsockopt()는 소켓에 여러 옵션을 추가할 수 있다. zmq.SUBSCRIBE 는 구독 필터를 세팅한다. _string 이라는 접미사가 붙은 것은 필터의 내용이 문자열이기 때문이다.

setsockopt()의 보다 자세한 내용은 API 문서를 확인하자.

내용이 워낙 많아서 여기서 일일이 다 소개할 수는 없을 것 같다. 어쨌든 한 번 실행해보자. 우선 서버를 먼저 실행한 다음, 여러 터미털 화면[^2]에서 클라이언트를 실행한다. 우편번호 옵션은 서로 달라도 같아도 상관없다. 오른쪽 스크린샷은 터미널화면을 분할하여 동시에 클라이언트들을 실행한 화면이다. 실제 8개의 클라이언트를 돌리고 있다. 이 때 괄호안에 표시되는 값이 서버가 생성한 메시지의 일련번호인데 우리 서버가 굉장히 열일하고 있다는 것을 볼 수 있다.

분산처리

이번에 소개할 패턴은 PULL vs PUSH 패턴이다. PUSH 패턴은 1:N 연결에서 다수의 반대쪽 종단점에 대해서 순차적으로 메시지를 분산 발송한다. PULL은 반대로 N:1 의 연결에서 여러 클라이언트가 보내는 메시지를 순차적으로 받아들이는 것을 말한다. 즉 여러 클라이언트와 커뮤니케이션하면서 모든 클라이언트에게 동일한 메시지를 전송하는 Publisher  패턴과는 달리, 모든 클라이언트는 각각 고유한 메시지를 받을 수 있게 된다.

이 메시지 패턴은 여러 노드로 메시지를 전송한 후 다시 메시지를 취합할 수 있는 효과를 가지기 때문에 분산처리에 사용하기 좋다. 실제로 가이드 문서에서는 간단한 분산처리 기법을 선보이고 있다.

왼쪽에서 보는 도식에서 Ventilator는 작업할 내용을 생성해서 각 Worker에게 분배하는 역할을 한다. (PUSH 패턴을 이용해서 네트워크에 접속한다.) 개별 Worker들은 특정한 계산이나 처리를 수행하는 노드로 PULL 패턴 소켓으로 Ventilator와 연결되어 순차적으로 입력값을 받는다.

Worker들은 다시 반대쪽에는 PUSH 패턴의 소켓을 갖고 있으며, 이들은 개별적으로 처리된 메시지를 이곳으로 보낸다. 이 PUSH 소켓의 다른 종단점은 sink의 PULL 패턴 소켓이다. Sink는 처리완료된 결과를 취합하여 소비하는 곳이다.

이는 Ventilator -> Worker -> Sink의 파이프라인인데, Worker의 개수가 1개 혹은 그 이상이 될 수 있는 셈이다. 눈여겨볼 지점은 이 그래프에서 Ventilator와 Sink는 각각 서버에 해당하며, 각 Worker가 클라이언트에 해당한다. REQ\REP 패턴에서는 흔히 서버가 REP 패턴을, 클라이언트가 REQ패턴을 가졌지만, 파이프라이닝에서는 소켓의 타입이 해당 노드가 서버인지 클라이언트인지는 중요하지 않다. 다만 다음 라인으로 넘겨주는 위치에서는 PUSH를, 이전 라인으로부터 받는 위치에서는 PULL을 사용한다.

이 그래프에 나와있는 각 노드들을 간단하게 구현하겠다. Worker는 적당히 시간이 오래 걸리는 작업을 수행해야 하니,  time.sleep()으로 대충 시간을 끌도록 하겠다.

 

Ventilator 구현

Ventilator는 Worker가 소모해야 하는 데이터를 생성하는 역할을 한다. 이 값은 100~500 사이의 값이며, 이 값을 받은 worker는 해당 값의 밀리세컨드만큼 지연후에 Sink로 메시지를 보낸다. Ventilator가 사용하는 포트는 5007번이며, Sink가 사용하는 포트는 5008이라고 가정한다.

import random, time
import zmq

ctx = zmq.Context()
dc = 1000 #1

#2
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557') #3

sink = ctx.socket(zmq.PUSH) #4
sink.connect('tcp://localhost:5558') #5

def run():
  _ = input('Please press enter when the workers are ready...')
  print('Sending tasks to workers')
  
  sink.send(f'{dc}'.encode())  #6
  random.seed()
  
  #7
  total_msec = 0
  for _ in range(dc):
    workload = random.randrange(100, 500)
    total_msec += workload
    sender.send(f'{workload}'.encode())
  print(f'total expected cost: {total_msec}ms')
  time.sleep(1)

run()
  1. dc 값은 처리해야 할 데이터의 개수를 말한다.
  2. 값을 worker에게 뿌리기 위한 소켓을 생성해서 sender 라는 이름을 붙였다. 타입은 PUSH이다.
  3. Ventilator는 Worker에게는 서버의 역할을 수행한다. sender는 5007 포트에 바인딩한다.
  4. Sink에게 전체 수집해야 할 데이터의 개수를 알려주기 위한 소켓을 생성했다.  역시 PUSH 타입이다.
  5. Ventilator — Sink 간 연결에서 Ventilator는 클라이언트이다. 따라서 여기선 5008번 포트로 connect() 한다.
  6. 작업을 시작하기 전에 Sink에게 총 데이터 개수를 보낸다.
  7. Worker에게 메시지를 전송하기 시작한다. 메시지의 내용은 대기해야 할 지연시간이다.

Worker로 전송되는 모든 지연시간은 누적합산된 후, 메시지 발송이 끝나면 예상 비용으로 출력된다.

Sink 구현

Sink를 먼저 구현해보자.  Sink는 5008번 포트를 들으면서, 최초 1회는 수집해야 할 데이터의 개수를 전달받는다. 이후 들어오는 데이터를 소비하면 되는데, 여기서는 매번 ‘.’을 찍어주는 것으로 했다. 시각적으로 구분을 위해서 10개 단위마다 ‘:’를 출력하여 보이게 한다.

import sys, time
import zmq

#1
ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

#2
s = int(receiver.recv().decode())
tstart = time.time()

#3
for task_no in range(s):
  s = receiver.recv()
  if task_no % 10 is 0:
    sys.stdout.write(':')
  else:
    sys.stdout.write('.')
  sys.stdout.flush()

#4
tend = time.time()
print(f'Total elapsed time: {(tend-tstart) * 1000}ms')   
  1. 소켓을 생성하고 5558 포트에 바인딩한다.
  2. 첫 입력은 Ventilator가 보낸 데이터 개수이다.
  3. 시작 시간을 설정하고 루프를 시작한다.
  4. 루프에서는 입력으로 들어오는 메시지를 확인하고 메시지가 들어올 때마다 ‘.’을 찍는다. 단 10번에 한 번은 ‘:’을 출력한다.
  5. 출력에는 print() 문이 아닌 sys.stdout을 사용했다. 가이드에서는 특별히 다른 이유는 알려주지 않았다.

 Worker구현

Worker는 단순하다. 5557 포트로부터 들어온 값을 정수로 변환하여, 그만큼 대기하고, 다시 Sink로 메시지를 보내는 일을 계속 반복한다.

import sys, time
import zmq

ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receivier.connect('tcp://localhost:5557')

sender = ctx.socket(zmq.PUSH)
receiver.connect('tcp://localhost:5558')

while True:
  s = receiver.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s.decode()) * 0.001)
  sender.send(b'')

실행하기

다음과 같은 순서대로 실행한다.

  1. 먼저 Sink를 실행한다.
  2. 다음은 Ventilator를 실행한다.
  3. 적당한 수의 Worker를 실행시킨다.
  4. Ventilator를 실행하는 창에서 enter 키를 누른다.

각 Worker 창에서 느긋하게 …. 이 출력되기 시작하고 Sink는 유독 바쁘게 출력하기 시작한다. 여기서 주목해야 할 부분은 Ventilator는 몇 개의 Worker가 작업을 나눠서 하는지에 대해서는 알지 못하며 알 필요도 없다는 것이다. PUSH / PULL 패턴에서는 1:1, 1:N, N:1, N:N을 모두 상정할 수 있고, 이러한 위상의 구분은 코드상에서 구분되는 것이 아니라 어떤 노드가 어떻게 연결되는것인지에 따라 구분된다 할 수 있다.

또 ZMQ를 이용한 분산처리의 장점은 각각의 worker 노드가 스레드일 수도 있고, (위 예제에서와 같이) 프로세스 일수도 있는데 각 노드간의 교신이 TCP 포트를 사용하게 되므로 사실상 위치에 제약을 받지 않는다. 따라서 다른 여러 대의 컴퓨터를 각각 Worker로 만들어서 접속하거나, 혹은 여러 대의 컴퓨터에서도 각각 여러 개의 프로세스를 만들어서, 수백/수천개의 Worker를 작업에 손쉽게 투입할 수 있다.

멀티 프로세스로 분산작업을 하려는 경우에는 엄두조차 내기 힘든 스케일을 지원할 수 있다.

정리

이상으로 기본적인 ZMQ 메시지 패턴에 따른 처리 방법을 살펴보았다. ZMQ 메시지 패턴은 여기서 소개한 것 외에도 몇 가지가 더 있으며, 각 소켓은 상황에 따라 바인드하거나 커넥트할 수 있다. ZMQ는 단순히 소켓을 추상화하여 사용하기 쉽게 만들어주는 것 외에도 다양한 상황에서 분산 메시지 기술을 적용할 수 있도록 해주는 놀라운 도구이며, 활용법을 익혀 나감에 따라서 상상했던 것 이상으로 폭넓게 적용할 수 있음을 알 게 될 것이다.

ZMQ의 메시지 패턴

ZMQ의 기본 개념을 다루면서 ZMQ에서는 몇 가지 코어 메시징 패턴이 있다고 언급했다. 이 중에서 REQ-REP 패턴에 대해서는 에코 서버를 구현하면서 소개하였다. 이 글에서는 ZMQ 메시징 패턴에 대해 각각의 예시에 대해서 언급해보겠다.

REQ-REP 패턴

REQ-REP 패턴은 전형적인 서버-클라이언트 통신에 사용될 수 있는 패턴이다. 클라이언트 -> 서버로 요청이 전송되고 다시 서버 -> 클라이언트로 응답이 전송된다. 이미 이전글에서 서버 1개가 여러 개의 포트를 리스닝하는 동시에 각 포트에 대해서도 다중 접속이 가능한 구현에 대해서 소개한 바 있다. (사실 이를 구현하는 것은 거의 공짜로 가능했다.) 여기서는 이전의 예제를 살짝 비틀어서 여러 개의 서버에 대해서 하나의 클라이언트가 접속했을 때 어떤식으로 메시지가 처리되는지 살펴보자.

서버

서버는 실행시 포트와 서버 이름을 받아서 실행한다. (이 둘은 콤마로 구분된 한 덩어리의 문자열이어야 한다.) 서버는 지정된 포트에 바인드되어 클라이언트로부터 들어온 메시지에 서버의 이름을 덧붙여서 회신한다.

import sys, zmq

ctx = zmq.Context()

def run_server(port, name):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  while True:
    data = sock.recv()
    msg = data.decode()
    print(f'{name} Recieved: {msg}')
    rep = f'{name}->{msg}'
    sock.send(rep.encode())
    if msg == 'bye':
      sock.close()

if __name__ == '__main__':
  if len(sys.argv) > 1:
    port, name = sys.argv[1].split(',', 1)
    run_server(port, name)

클라이언트

이전의 리턴되는 메시지를 약간 변형했다는 것 외에는 별다른 특이한 점이 없는 에코 서버 코드였다. 다음은 클라이언트이다. 클라이언트는 명령줄에서 접속할 포트를 1개 이상 받게 되고, 주어진 모든 포트에 접속한다. 이후 루프를 돌면서 입력받은 메시지를 서버로 전송하고 그 결과를 회신받아 출력한다.

import zmq, sys

ctx = zmq.Context()

def run_client(*ports):
  sock = ctx.socket(zmq.REQ)
  for port in ports:
    sock.connect(f'tcp://127.0.0.1:{port}')
  while True:
    msg = input()
    sock.send(msg.encode())
    rep = sock.recv().decode()
    print(rep)

if __name__ == '__main__':
  if len(sys.argv) > 1:
    ports = sys.argv[1:]
    run_client(*ports)

여러개의 터미널을 띄워놓고 세 개의 터미널에서는 7777,A, 7778,B, 7779,C 의 옵션으로 각각 서버를 돌린 다음, 클라인터는 7777 7778 7779를 인자로주고 실행한다. 그런 다음 클라인트 창에서 입력을 한 줄씩 실행한다. 그러면 그 결과로 각각의 서버가 사이좋게 하나씩 메시지를 받아서 클라이언트에게 돌려주는 광경을 목격할 수 있다. 즉 클라이언트 입장에서도 여러 개의 동일한 동작을 하는 서버의 소켓에 제각각 접속해두면 자체적으로 로드밸런싱을 수행하여 각각의 서버에 대해서 요청과 응답을 수행하게 된다. 즉, REQ-REP 패턴에서 각각의 소켓은 자신이 서버인지 클라이언트인지는 중요하지 않으며, 주고 받는 대상 노드가 1개 이상인 경우에는 순서대로 번갈아가며 통신하게 된다.

만약, 우리가 작성하고 있는 서버 X가 있고, 이는 REP 패턴의 소켓을 가지고 있으며, 내부 네트워크의 클라이언트들과 통신하고 있다고 하자.  REQ 소켓과 REP 소켓을 한 몸에 지니고 있는 ‘무언가’를 만들어서 두 소켓을 연결하는 간단한 코드를 작성한 후, 이 디바이스의 REP 소켓을 외부 IP로 노출하게 되면, 해당 디바이스는 내부 네트워크와 외부 네트워크를 연결하는 단자, 일종의 라우터로 기능하게 된다.

ZMQ는 노드와 노드간의 연결이 어떤식으로 기능하는가를 메시징 패턴으로 정의한다. 그리고 우리는 만들고자하는 네트워크의 특정한 노드가 어떤식으로 동작할지를 결정해서 간단한 디바이스를 하나 추가하는 것으로 손쉽게 네트워크를 확장할 수 있음을 볼 수 있다. (이와 관련된 실제 예를 다른 예제에서 다시 살펴보도록 할 것이다.)

PUB-SUB 패턴

PUB-SUB패턴은 Pulisher와 Subscriber를 정의하고, 두 노드간의 통신을 정의한다. 이 통신은 단방향으로 데이터가 흐르며, publisher가 생성하는 메시지는 해당 퍼블리셔를 구독하는 subscriber들에게 전송된다. 가장 간단한 브로드캐스팅 방법이라 보면 되겠다.

다음은 지역별 온도와 습도를 발행하는 서버이다. 말이 그렇다는 것이고 실제로는 임의의 우편번호, 온도, 습도 값을 랜덤으로 생성해서 무차별적으로 발행해버린다. (원문에는 메시지 번호를 넣지 않는데, 이 서버가 실제로 얼마나 많은 메시지를 만들어 내는지 체험해보는 것도 나쁘지 않을 것 같아서 넣었다.)

퍼블리셔

import zmq
from random import randrange

ctx = zmq.Context()

def run_server(port):
  sock = ctx.socket(zmq.PUB)
  sock.bind(f'tcp://*:{port}')
  i = 0
  while True:
    zipcode = randrange(1, 100000)
    temp = randrange(-80, 135)
    hum = rangdrange(10, 60)
    msg = f'{zipcode:05d} {temp} {hum} {i}'
    sock.send(msg.encode())
    i += 1

run_server(5556)

다음은 subscriber에 해당하는 클라이언트 코드이다. 클라이언트는 필터링할 우편번호를 필요로 하며, 해당 우편번호로 시작되는 값만을 받아들인다.

구독자(Subscriber)

import sys, zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

sock.connect('tcp://127.0.0.1:5556')
zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001'
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

total_temp = 0
for update_no in range(50):
  msg = sock.recv().decode()
  print(msg)
  z, t, h, n = msg.split()
  total_temp += int(t)

print(f'Average temp for {z}: {total_temp/50:0.3f}')

이 때 주목해야 할 부분은 setsockopt_string()을 사용한 부분이다. 여기에서 소켓의 옵션을 설정할 수 있다. zmq.SUBSCRIBE는 메시지 구독 필터를 설정하는 부분이다. 구독 필터를 설정하지 않으면 아무런 메시지도 구독하지 않는다.

클라이언트를 먼저 서 너개 실행한 후에 서버를 실행하자. 서버가 시작되면 클라이언트들은 무서운 속도로 50개의 메시지를 채울 것이다. 또한 클라이언트들이 실행을 종료한 후에 다시 동일하게 실행시켜보자. 서버는 클라이언트가 있든 없든 계속해서 메시지를 발행해나간다. 수신자가 없는 메시지는 큐에 들어있다가, 다른 메시지들이 계속 밀고 들어오게되면 어느 때에가서는 자동으로 버려진다.

가이드 문서에서 중요하게 지적하는 한가지는 이 패턴에서 메시지를 가져오는 시간을 정확하게 알기 어렵다는 것이다. (심지어 클라이언트가 먼저 시작하더라도!) 왜냐하면 클라이언트가 먼저 실행되고 있던 중이라하더라도 접속이 체결되는데에는 밀리세컨드 레벨의 지연이 발생하고 이 시점에도 계속 메시지가 나갈 수 있기 때문이다. (이러한 문제는 나중에 컨트롤러 소켓을 별도로 운영하거나 하는 방식으로 해결할 수 있다.)

PUSH-PULL 패턴

이 패턴은 pipeline이라 불리는 패턴으로 한쪽에서 다른쪽으로 메시지를 푸시하는 방식이다. 이 방식이 REQ-REP와 다른 점은 한쪽에서 다른쪽으로 일방적으로만 통신이 이루어진다는 점이다.(이를 서버와 클라이언트로 나누기도 어려운데, 푸시하는 쪽이 클라이언트가 될 수도, 서버가 될 수도 있기 때문이다.) 또 PUB-SUB 패턴과도 다른데, PUB-SUB 패턴에서는 여러 개의 구독자가 하나의 퍼블리셔에 연결되면 모두 동일한 메시지를 받게 되지만 (물론 구독 필터에 따라서 다른 값을 받을 수는 있지만, 모든 구독자가 같은 메시지 풀에 들어간다는 점은 같다.) PUSH-PULL 방식에서는 마치 REQ-REP에서 1:N으로 연결된 소켓처럼 연결된 PULL 소켓쪽으로 번갈아가면서 메시지가 하나씩 전달된다는 것이다.

가이드 문서에서는 이를 이용해서 분산처리를 수행하는 아키텍쳐를 어떻게 쉽게 만들 수 있는가를 설명하고 있다. 프로세스를 여러개 만들어서 분산처리를 하는 경우, 개별 워커가 계산해낸 결과 데이터를 다시 하나로 수집하는 데에는 파이프와 같은 장치를 사용하는데, 이것이 익숙하지 않은 사람에게는 조금 어렵고, 워커 수가 많아지는 경우에는 구현 난이도가 높아진다.

PUSH-PULL 패턴은 데이터의 흐름이 일방적인 라인에 있는 경우, 이를 구현하기에 알맞는 패턴이다. 여기서는 각 단계별로 작성해야하는 프로그램이 나눠진다.

  1. 벤틸레이터 – 처리해야 할 데이터를 계속해서 생성해내는 프로세스이다.
  2. 워커 – 생성된 데이터를 받고 처리한 후 그 결과를 만드는 처리 프로세스이다.
  3. 싱크 – 워커에 의해 생성된 결과 값들이 모이는 곳이다.

이 때 워커가 여러 개가 되면 훌륭한 분산처리 시스템이 된다. 문제는 파이썬에서도 멀티 스레드나 멀티 프로세스를 통한 분산처리 작업은 이미 가능하다는 것이다. 그렇다면 ZMQ를 사용해서 이러한 분산처리 시스템을 구축하는 것이 큰 도움이 될까?

당연히 그렇다. 간단한 멀티 프로세스를 통한 분산처리 코드는 파이썬에서도 작성하기가 간단하다! 하지만 ZMQ를 만들 때 워커는 ZMQ 네트워크 상의 노드로 기능하는 프로세스이며, 각 노드는 IPC외에도 TCP를 통해서 교신할 수 있다. 이 말은 한 대가 아닌 수십~수백대의 네트워크 상의 컴퓨터를 이 분산처리 그리드 내에 투입하는 것이 가능하며, 각각의 컴퓨터가 연산 수행 능력에 여유가 있다면, 1대당 여러 개의 워커 프로세스를 띄울 수 있다는 것이다. 분산 처리에 투입되는 워커가 고작해야 4~8개 정도 될 수 있는 단일 머신 처리에서 수백~수천개로 확장된 분산처리를 “거의 공짜로” 만들어낼 수 있다.

작업의 처리 순서는 다음과 같이 디자인한다.

  1. 맨 먼저 싱크 프로세스가 한 개 실행된다. (편의상 5558 포트를 사용해서 PULL 소켓으로 값을 받는다고 하자)
  2. 벤틸레이터 서버가 구동된다. 벤틸레이터 서버는 구동 후 싱크에 연결하여 (PUSH를 사용) 작업이 시작되는 시그널을 보낸다. 이후 시작을 준비하는 상태로 input() 함수를 하나 실행해서 기다린다.
  3. 각 워커들을 구동한다. 각 워커는 2개의 소켓을 가지고 있는데, 하나는 PULL 소켓으로 이는 벤틸레이터에 connect로 연결되며, 다른 하나는 PUSH로 sink에 connect된다.
  4. 작업이 준비되면 벤틸레이터는 5557포트 PUSH 소켓으로 데이터를 쏟아낸다. 각 데이터는 순차적으로 워커 프로세스로 전달된다.
  5. 각 워커 프로세스는 5558 포트의 PUSH 소켓으로 데이터를 푸시한다. 그 종착역은 sink이다.
  6. sink는 5558번 포트로 들어온 데이터들을 취합한다.

싱크

먼저 sink의 코드를 보자. sink는 각 워커가 응답을 보내올 때마다 ‘.‘을 찍을 것인데, 10개 마다 :을 찍어서 진행상황을 보다 눈에 쉽게 들어오도록 할 것이다. 최초로 실행된 후에는 무엇보다 벤틸레이터로부터 한 번의 통신을 전달받아 준비상태를 완료할 것이다.  준비상태가 완료되면 이 때부터 미리 정해진 개수의 응답을 푸시 받을 때까지 시간을 재고, 총 시간을 출력할 것이다.

import sys, time, zmq

ctx = zmq.Context()

receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

start_signal = receiver.recv()  #1
v = int(start_signal.decode())

start_time = time.time()
for task_no in range(v):
  s = receiver.recv() #2
  sys.stdout(':' if task_no % 10 is 0 else '.') #3
  sys.stdout.flush()
end_time = time.time()
print(f'Total time: {:%d}msec')

벤틸레이터

벤틸레이터는 구동된 후에 소켓을 5557포트에 바인딩한다. sink와 직접 통신을 위해서 5558번 포트로는 connect로 연결한다. 워커가 실행되는 동안 input() 함수로 기다렸다가, 최종적으로 입력이 들어오면 sink에게 지금부터 시간을 재라고 신호를 하나 보낸 후 100개의 랜덤한 숫자값을 보낸다. 각각의 워커는 무슨 복잡한 일을 하는 것이 아니라, 주어진 시간만큼 sleep 했다가 다음 신호를 처리받을 것이다.

import zmq, randome, time

ctx = zmq.Context
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557')

sink = ctx.socket(zmp.PUSH)
sink.connect('tcp://localhost:5558') #1

_ = input('Press Enter when the all workers are ready...')
print('sending tasks to workers')

v = 100
sink.send(f'{v}'.encode()) #2

total_msec = 0
for t_no in range(v):
  workload = random.rangrange(1, 100)
  total_msec += workload
  sender.send(f'{workload}'.encode())
print(f'total expected cost: {total_msec}ms') #3
time.sleep(1) #4
  1. 포트에 bind 할 것인지 connect 할 것인지는 연결의 성격에 따라 달라진다. 여기는 두 개의 PUSH 소켓을 하나는 bind하고 하나는 connect 하는 식으로 연결했다. bind된 쪽에 연결할 때에는 connect를 해야 한다.
  2. 준비가 끝나면 sink에게 타이머를 시작하라고 시그널을 전송한다.
  3. 이후 모든 워커에게 지연시간값을 나눠준다. 그 총량은 모두 합산된후에 출력한다.
  4. 큐에 남아있는 메시지가 있을지 모르니, 타이머를 1초 가량 준다. (이 부분은 별도의 컨트롤러 포트를 통해서 제어할 수 있다.)

워커

워커는 벤틸레이터로부터 값을 받아서 다시 싱크쪽으로 값을 밀어넣는다. 따라서 한쪽은 PULL 소켓 다른 한쪽은 PUSH 소켓이 있어야 한다. 그 사이에 받은 값을 정수로 환산하고 그 밀리세컨드 시간만큼 지연한 후 싱크로 시그널을 보낸다.

import zmq, sys, time

ctx = zmq.Context()

left = ctx.socket(zmq.PULL)
left.connect('tcp://localhost:5557')
right = ctx.socket(zmq.PUSH)
right.connect('tcp://localhost:5558')

while True:
  s = left.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s) * 0.001)
  right.send(b'')

이제 모든 구성원에 대한 준비가 끝났다. 순서대로 각 프로세스를 실행한 후에 구동해보자. 아래 캡쳐는 이 결과를 보여주는 것이다. 총 7개의 워커를 사용했으며 수천개의 값을 벤틸레이팅했다.  상대적으로 천천히 .을 찍어 나가는 화면이 각각의 worker이고 맨 마지막에 빠르게 결과를 찍어나가는 창은 sink에 해당한다. 최종적으로 sink에 출력되는 총 소요 시간은 벤틸레이터가 합산한 전체 코스트 / 워커의 개수보다 아주 약간 많은 시간이 소요되는 것을 확인할 수 있다.

만약 같은 네트워크 내에 worker를 구동할 머신이 있다면 네트워크를 통한 분산처리를 테스트해보는 것도 어렵지 않을 것이다. (sink와 벤틸레이터를 구동할 컴퓨터의 IP만 알면 된다!)

PUSH-PULL 패턴에서도 소켓이 N:1 혹은 1:N으로 접속할 수 있음을 보고 있다. 즉 ZMQ 소켓에서는 소켓이 서버냐 클라이언트냐에 관계없이 1:N으로 접속하느냐 혹은 N:1로 접속하느냐에 따라서 바깥으로 전송될 메시지는 반대편 노드에 대해서 순차적으로 분산되어 하나씩 전송되며, 반대로 수신측에서도 N개의 노드로부터 데이터를 수신하는 경우 공평하게 하나의 소켓씩 메시지를 수신해서 처리한다.

몇 가지 다른 문제

여기서 “많은 워커”들은 사실상 종료하는 조건 없이 계속해서 실행되면서 데이터를 기다리게 된다. 모든 처리가 끝났을 때, 어떻게 수많은 워커들을 일시에 종료하게 할 수 있을까? 간단하게 생각해보면 sink는 모든 작업이 완료되는 시점을 알고 있으므로, 최종적으로 모든 작업이 완료되었을 때 일종의 PUB 소켓을 통해서 종료 시그널을 배포하는 것이다. 그리고 다른 모든 워커들은 시작할 때 해당 PUB 소켓에 대한 SUB 소켓을 연결한다.

문제는 워커가 일종의 무한루프를 돌면서 입력을 처리하고 있기 때문에, 종료 시그널을 받기 시작할 시점을 만들기가 애매하다는 점이다. 물론 ZMQ 컨텍스트는 스레드 안전하기 때문에, 실제 워커의 계산 작업을 백그라운드 스레드에서 무한 루프를 돌게하고, 메인 스레드는 종료 시그널을 청취하도록 하는 방법이 있다.

이것은 이것대로 좋은 해결 방법인데, ZMQ는 이를 위한 별도의 해결책으로 조금 특별한 POLLER라는 디바이스를 제공한다. 다음 시간에는 POLLER를 어떻게 사용하는지에 대해서 살펴보고, 스레드 간의 통신에서는 ZMQ를 어떻게 사용할 수 있는지 살펴보기로 하자.