ZMQ – 그외 메시징패턴 구현하기

Publisher 패턴 – 단방향 브로드캐스팅

ZMQ를 이용한 소켓 네트워크 프로그래밍에서 두 번째로 소개되는 메시징패턴은 바로 Publisher vs Subscriber이다. 일전에 소개한 Request vs Response 패턴은 전통적인 메시지 교환의 방식으로 서버와 클라이언트가 서로 한 번씩 메시지를 주고 받는 식으로 통신하는 방식이었다. 또다른 메시징 패턴인

Publisher vs Subscriber는 다음과 같은 특징을 갖는 메시징 패턴이다.

  1. 서버는 퍼블리셔(Publisher)가 되어 일련의 메시지들을 ZMQ 소켓을 통해 발송한다.
  2. 클라이언트는 구독자(Subscriber)가 되어 메시지들을 구독한다.
  3. 메시지는 1 -> N 으로 복수의 구독자에게 전송된다.
  4. 소켓에 구독자가 없는 경우 메시지는 소실된다.
  5. PUB 소켓을 통해서 서버가 메시지를 받을 수는 없다. 반대로, `SUB` 소켓을 통해서 클라이언트가 메시지를 전송할 수도 없다.

즉, 이 패턴은 일종의 단방향 브로드캐스팅이다. 실제 가이드 페이지에서 제공하는 코드를 분석해보자. 서버는 랜덤한 값을 사용해서 가짜 우편번호와 날씨 데이터를 생성해서 퍼블리싱한다.

from random import randrange
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB) #1
sock.bind('tcp://*:5556')
n = 0 #2
while True:
  zip_codde = randrange(10_000, 100_000)
  temp = randrange(-20, 45)
  humidity = randrange(10, 101)
  sock.send(f'{zip_code} {temp} {humidity} {n}'.encode())
  n += 1
  #3
  1. zmq.PUB 패턴의 커뮤니케이션용으로 소켓을 생성한다.
  2. n은 도대체 메시지가 몇개나 생성되는지 보려고하는 것이다.
  3. 주의깊게 코드를 보다보면 뭔가 허전한 것이 없나? 바로 time.sleep(1) 같은 시간 때우기 호출이 없다. 즉 이 루프는 엄청 빡센 무한루프를 돈다.

클라이언트

클라이언트는 메시징타입을 zmq.REQ 가 아니라 zmq.SUB를 쓴다. 그런데 서버는 엄청나게 많은 양의 메시지를 쏟아낼 것이기 때문에 이를 적절히 필터링할 필요가 있다. 물론 메시지를 받은 다음에 메시지 내용을 보고 필터링해도 되긴하는데, ZMQ의 소켓은 여러 옵션을 통해서 필터를 설정할 수 있다. 이 클라이언트는 소켓에 필터를 설정하고, 필터에 해당하는 우편번호의 날씨 데이터만을 수집한 후 기온 필드를 추출하여 50개 데이터에 대해서 평균치를 출력한다.

## wu-client.py
import sys
import zmq

zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001' #1

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter) #2

c, temp = 50, []
for _ in range(c):
  msg = sock.recv().decode()
  print(msg)
  _, t, *_ = msg.split()
  temp.append(t)
print(f'Average Temperature: {sum(temp)/c}')
  1. 서버가 쏟아내는[^1] 데이터는 다섯자리 숫자의 가상 우편번호이다. 클라이언트는 정해진 하나의 우편번호로 시작하는 데이터만을 받아들일 예정이므로 필터링할 조건을 생성한다. 여러 개의 클라이언트가 붙을 수 있음을 상정하여 명령줄에서 옵션으로 변경할 수 있게 한다.
  2. socket.setsockopt()는 소켓에 여러 옵션을 추가할 수 있다. zmq.SUBSCRIBE 는 구독 필터를 세팅한다. _string 이라는 접미사가 붙은 것은 필터의 내용이 문자열이기 때문이다.

setsockopt()의 보다 자세한 내용은 API 문서를 확인하자.

내용이 워낙 많아서 여기서 일일이 다 소개할 수는 없을 것 같다. 어쨌든 한 번 실행해보자. 우선 서버를 먼저 실행한 다음, 여러 터미털 화면[^2]에서 클라이언트를 실행한다. 우편번호 옵션은 서로 달라도 같아도 상관없다. 오른쪽 스크린샷은 터미널화면을 분할하여 동시에 클라이언트들을 실행한 화면이다. 실제 8개의 클라이언트를 돌리고 있다. 이 때 괄호안에 표시되는 값이 서버가 생성한 메시지의 일련번호인데 우리 서버가 굉장히 열일하고 있다는 것을 볼 수 있다.

분산처리

이번에 소개할 패턴은 PULL vs PUSH 패턴이다. PUSH 패턴은 1:N 연결에서 다수의 반대쪽 종단점에 대해서 순차적으로 메시지를 분산 발송한다. PULL은 반대로 N:1 의 연결에서 여러 클라이언트가 보내는 메시지를 순차적으로 받아들이는 것을 말한다. 즉 여러 클라이언트와 커뮤니케이션하면서 모든 클라이언트에게 동일한 메시지를 전송하는 Publisher  패턴과는 달리, 모든 클라이언트는 각각 고유한 메시지를 받을 수 있게 된다.

이 메시지 패턴은 여러 노드로 메시지를 전송한 후 다시 메시지를 취합할 수 있는 효과를 가지기 때문에 분산처리에 사용하기 좋다. 실제로 가이드 문서에서는 간단한 분산처리 기법을 선보이고 있다.

왼쪽에서 보는 도식에서 Ventilator는 작업할 내용을 생성해서 각 Worker에게 분배하는 역할을 한다. (PUSH 패턴을 이용해서 네트워크에 접속한다.) 개별 Worker들은 특정한 계산이나 처리를 수행하는 노드로 PULL 패턴 소켓으로 Ventilator와 연결되어 순차적으로 입력값을 받는다.

Worker들은 다시 반대쪽에는 PUSH 패턴의 소켓을 갖고 있으며, 이들은 개별적으로 처리된 메시지를 이곳으로 보낸다. 이 PUSH 소켓의 다른 종단점은 sink의 PULL 패턴 소켓이다. Sink는 처리완료된 결과를 취합하여 소비하는 곳이다.

이는 Ventilator -> Worker -> Sink의 파이프라인인데, Worker의 개수가 1개 혹은 그 이상이 될 수 있는 셈이다. 눈여겨볼 지점은 이 그래프에서 Ventilator와 Sink는 각각 서버에 해당하며, 각 Worker가 클라이언트에 해당한다. REQ\REP 패턴에서는 흔히 서버가 REP 패턴을, 클라이언트가 REQ패턴을 가졌지만, 파이프라이닝에서는 소켓의 타입이 해당 노드가 서버인지 클라이언트인지는 중요하지 않다. 다만 다음 라인으로 넘겨주는 위치에서는 PUSH를, 이전 라인으로부터 받는 위치에서는 PULL을 사용한다.

이 그래프에 나와있는 각 노드들을 간단하게 구현하겠다. Worker는 적당히 시간이 오래 걸리는 작업을 수행해야 하니,  time.sleep()으로 대충 시간을 끌도록 하겠다.

 

Ventilator 구현

Ventilator는 Worker가 소모해야 하는 데이터를 생성하는 역할을 한다. 이 값은 100~500 사이의 값이며, 이 값을 받은 worker는 해당 값의 밀리세컨드만큼 지연후에 Sink로 메시지를 보낸다. Ventilator가 사용하는 포트는 5007번이며, Sink가 사용하는 포트는 5008이라고 가정한다.

import random, time
import zmq

ctx = zmq.Context()
dc = 1000 #1

#2
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557') #3

sink = ctx.socket(zmq.PUSH) #4
sink.connect('tcp://localhost:5558') #5

def run():
  _ = input('Please press enter when the workers are ready...')
  print('Sending tasks to workers')
  
  sink.send(f'{dc}'.encode())  #6
  random.seed()
  
  #7
  total_msec = 0
  for _ in range(dc):
    workload = random.randrange(100, 500)
    total_msec += workload
    sender.send(f'{workload}'.encode())
  print(f'total expected cost: {total_msec}ms')
  time.sleep(1)

run()
  1. dc 값은 처리해야 할 데이터의 개수를 말한다.
  2. 값을 worker에게 뿌리기 위한 소켓을 생성해서 sender 라는 이름을 붙였다. 타입은 PUSH이다.
  3. Ventilator는 Worker에게는 서버의 역할을 수행한다. sender는 5007 포트에 바인딩한다.
  4. Sink에게 전체 수집해야 할 데이터의 개수를 알려주기 위한 소켓을 생성했다.  역시 PUSH 타입이다.
  5. Ventilator — Sink 간 연결에서 Ventilator는 클라이언트이다. 따라서 여기선 5008번 포트로 connect() 한다.
  6. 작업을 시작하기 전에 Sink에게 총 데이터 개수를 보낸다.
  7. Worker에게 메시지를 전송하기 시작한다. 메시지의 내용은 대기해야 할 지연시간이다.

Worker로 전송되는 모든 지연시간은 누적합산된 후, 메시지 발송이 끝나면 예상 비용으로 출력된다.

Sink 구현

Sink를 먼저 구현해보자.  Sink는 5008번 포트를 들으면서, 최초 1회는 수집해야 할 데이터의 개수를 전달받는다. 이후 들어오는 데이터를 소비하면 되는데, 여기서는 매번 ‘.’을 찍어주는 것으로 했다. 시각적으로 구분을 위해서 10개 단위마다 ‘:’를 출력하여 보이게 한다.

import sys, time
import zmq

#1
ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

#2
s = int(receiver.recv().decode())
tstart = time.time()

#3
for task_no in range(s):
  s = receiver.recv()
  if task_no % 10 is 0:
    sys.stdout.write(':')
  else:
    sys.stdout.write('.')
  sys.stdout.flush()

#4
tend = time.time()
print(f'Total elapsed time: {(tend-tstart) * 1000}ms')   
  1. 소켓을 생성하고 5558 포트에 바인딩한다.
  2. 첫 입력은 Ventilator가 보낸 데이터 개수이다.
  3. 시작 시간을 설정하고 루프를 시작한다.
  4. 루프에서는 입력으로 들어오는 메시지를 확인하고 메시지가 들어올 때마다 ‘.’을 찍는다. 단 10번에 한 번은 ‘:’을 출력한다.
  5. 출력에는 print() 문이 아닌 sys.stdout을 사용했다. 가이드에서는 특별히 다른 이유는 알려주지 않았다.

 Worker구현

Worker는 단순하다. 5557 포트로부터 들어온 값을 정수로 변환하여, 그만큼 대기하고, 다시 Sink로 메시지를 보내는 일을 계속 반복한다.

import sys, time
import zmq

ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receivier.connect('tcp://localhost:5557')

sender = ctx.socket(zmq.PUSH)
receiver.connect('tcp://localhost:5558')

while True:
  s = receiver.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s.decode()) * 0.001)
  sender.send(b'')

실행하기

다음과 같은 순서대로 실행한다.

  1. 먼저 Sink를 실행한다.
  2. 다음은 Ventilator를 실행한다.
  3. 적당한 수의 Worker를 실행시킨다.
  4. Ventilator를 실행하는 창에서 enter 키를 누른다.

각 Worker 창에서 느긋하게 …. 이 출력되기 시작하고 Sink는 유독 바쁘게 출력하기 시작한다. 여기서 주목해야 할 부분은 Ventilator는 몇 개의 Worker가 작업을 나눠서 하는지에 대해서는 알지 못하며 알 필요도 없다는 것이다. PUSH / PULL 패턴에서는 1:1, 1:N, N:1, N:N을 모두 상정할 수 있고, 이러한 위상의 구분은 코드상에서 구분되는 것이 아니라 어떤 노드가 어떻게 연결되는것인지에 따라 구분된다 할 수 있다.

또 ZMQ를 이용한 분산처리의 장점은 각각의 worker 노드가 스레드일 수도 있고, (위 예제에서와 같이) 프로세스 일수도 있는데 각 노드간의 교신이 TCP 포트를 사용하게 되므로 사실상 위치에 제약을 받지 않는다. 따라서 다른 여러 대의 컴퓨터를 각각 Worker로 만들어서 접속하거나, 혹은 여러 대의 컴퓨터에서도 각각 여러 개의 프로세스를 만들어서, 수백/수천개의 Worker를 작업에 손쉽게 투입할 수 있다.

멀티 프로세스로 분산작업을 하려는 경우에는 엄두조차 내기 힘든 스케일을 지원할 수 있다.

정리

이상으로 기본적인 ZMQ 메시지 패턴에 따른 처리 방법을 살펴보았다. ZMQ 메시지 패턴은 여기서 소개한 것 외에도 몇 가지가 더 있으며, 각 소켓은 상황에 따라 바인드하거나 커넥트할 수 있다. ZMQ는 단순히 소켓을 추상화하여 사용하기 쉽게 만들어주는 것 외에도 다양한 상황에서 분산 메시지 기술을 적용할 수 있도록 해주는 놀라운 도구이며, 활용법을 익혀 나감에 따라서 상상했던 것 이상으로 폭넓게 적용할 수 있음을 알 게 될 것이다.