ZMQ의 메시지 패턴

ZMQ의 기본 개념을 다루면서 ZMQ에서는 몇 가지 코어 메시징 패턴이 있다고 언급했다. 이 중에서 REQ-REP 패턴에 대해서는 에코 서버를 구현하면서 소개하였다. 이 글에서는 ZMQ 메시징 패턴에 대해 각각의 예시에 대해서 언급해보겠다.

REQ-REP 패턴

REQ-REP 패턴은 전형적인 서버-클라이언트 통신에 사용될 수 있는 패턴이다. 클라이언트 -> 서버로 요청이 전송되고 다시 서버 -> 클라이언트로 응답이 전송된다. 이미 이전글에서 서버 1개가 여러 개의 포트를 리스닝하는 동시에 각 포트에 대해서도 다중 접속이 가능한 구현에 대해서 소개한 바 있다. (사실 이를 구현하는 것은 거의 공짜로 가능했다.) 여기서는 이전의 예제를 살짝 비틀어서 여러 개의 서버에 대해서 하나의 클라이언트가 접속했을 때 어떤식으로 메시지가 처리되는지 살펴보자.

서버

서버는 실행시 포트와 서버 이름을 받아서 실행한다. (이 둘은 콤마로 구분된 한 덩어리의 문자열이어야 한다.) 서버는 지정된 포트에 바인드되어 클라이언트로부터 들어온 메시지에 서버의 이름을 덧붙여서 회신한다.

import sys, zmq

ctx = zmq.Context()

def run_server(port, name):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  while True:
    data = sock.recv()
    msg = data.decode()
    print(f'{name} Recieved: {msg}')
    rep = f'{name}->{msg}'
    sock.send(rep.encode())
    if msg == 'bye':
      sock.close()

if __name__ == '__main__':
  if len(sys.argv) > 1:
    port, name = sys.argv[1].split(',', 1)
    run_server(port, name)

클라이언트

이전의 리턴되는 메시지를 약간 변형했다는 것 외에는 별다른 특이한 점이 없는 에코 서버 코드였다. 다음은 클라이언트이다. 클라이언트는 명령줄에서 접속할 포트를 1개 이상 받게 되고, 주어진 모든 포트에 접속한다. 이후 루프를 돌면서 입력받은 메시지를 서버로 전송하고 그 결과를 회신받아 출력한다.

import zmq, sys

ctx = zmq.Context()

def run_client(*ports):
  sock = ctx.socket(zmq.REQ)
  for port in ports:
    sock.connect(f'tcp://127.0.0.1:{port}')
  while True:
    msg = input()
    sock.send(msg.encode())
    rep = sock.recv().decode()
    print(rep)

if __name__ == '__main__':
  if len(sys.argv) > 1:
    ports = sys.argv[1:]
    run_client(*ports)

여러개의 터미널을 띄워놓고 세 개의 터미널에서는 7777,A, 7778,B, 7779,C 의 옵션으로 각각 서버를 돌린 다음, 클라인터는 7777 7778 7779를 인자로주고 실행한다. 그런 다음 클라인트 창에서 입력을 한 줄씩 실행한다. 그러면 그 결과로 각각의 서버가 사이좋게 하나씩 메시지를 받아서 클라이언트에게 돌려주는 광경을 목격할 수 있다. 즉 클라이언트 입장에서도 여러 개의 동일한 동작을 하는 서버의 소켓에 제각각 접속해두면 자체적으로 로드밸런싱을 수행하여 각각의 서버에 대해서 요청과 응답을 수행하게 된다. 즉, REQ-REP 패턴에서 각각의 소켓은 자신이 서버인지 클라이언트인지는 중요하지 않으며, 주고 받는 대상 노드가 1개 이상인 경우에는 순서대로 번갈아가며 통신하게 된다.

만약, 우리가 작성하고 있는 서버 X가 있고, 이는 REP 패턴의 소켓을 가지고 있으며, 내부 네트워크의 클라이언트들과 통신하고 있다고 하자.  REQ 소켓과 REP 소켓을 한 몸에 지니고 있는 ‘무언가’를 만들어서 두 소켓을 연결하는 간단한 코드를 작성한 후, 이 디바이스의 REP 소켓을 외부 IP로 노출하게 되면, 해당 디바이스는 내부 네트워크와 외부 네트워크를 연결하는 단자, 일종의 라우터로 기능하게 된다.

ZMQ는 노드와 노드간의 연결이 어떤식으로 기능하는가를 메시징 패턴으로 정의한다. 그리고 우리는 만들고자하는 네트워크의 특정한 노드가 어떤식으로 동작할지를 결정해서 간단한 디바이스를 하나 추가하는 것으로 손쉽게 네트워크를 확장할 수 있음을 볼 수 있다. (이와 관련된 실제 예를 다른 예제에서 다시 살펴보도록 할 것이다.)

PUB-SUB 패턴

PUB-SUB패턴은 Pulisher와 Subscriber를 정의하고, 두 노드간의 통신을 정의한다. 이 통신은 단방향으로 데이터가 흐르며, publisher가 생성하는 메시지는 해당 퍼블리셔를 구독하는 subscriber들에게 전송된다. 가장 간단한 브로드캐스팅 방법이라 보면 되겠다.

다음은 지역별 온도와 습도를 발행하는 서버이다. 말이 그렇다는 것이고 실제로는 임의의 우편번호, 온도, 습도 값을 랜덤으로 생성해서 무차별적으로 발행해버린다. (원문에는 메시지 번호를 넣지 않는데, 이 서버가 실제로 얼마나 많은 메시지를 만들어 내는지 체험해보는 것도 나쁘지 않을 것 같아서 넣었다.)

퍼블리셔

import zmq
from random import randrange

ctx = zmq.Context()

def run_server(port):
  sock = ctx.socket(zmq.PUB)
  sock.bind(f'tcp://*:{port}')
  i = 0
  while True:
    zipcode = randrange(1, 100000)
    temp = randrange(-80, 135)
    hum = rangdrange(10, 60)
    msg = f'{zipcode:05d} {temp} {hum} {i}'
    sock.send(msg.encode())
    i += 1

run_server(5556)

다음은 subscriber에 해당하는 클라이언트 코드이다. 클라이언트는 필터링할 우편번호를 필요로 하며, 해당 우편번호로 시작되는 값만을 받아들인다.

구독자(Subscriber)

import sys, zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

sock.connect('tcp://127.0.0.1:5556')
zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001'
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

total_temp = 0
for update_no in range(50):
  msg = sock.recv().decode()
  print(msg)
  z, t, h, n = msg.split()
  total_temp += int(t)

print(f'Average temp for {z}: {total_temp/50:0.3f}')

이 때 주목해야 할 부분은 setsockopt_string()을 사용한 부분이다. 여기에서 소켓의 옵션을 설정할 수 있다. zmq.SUBSCRIBE는 메시지 구독 필터를 설정하는 부분이다. 구독 필터를 설정하지 않으면 아무런 메시지도 구독하지 않는다.

클라이언트를 먼저 서 너개 실행한 후에 서버를 실행하자. 서버가 시작되면 클라이언트들은 무서운 속도로 50개의 메시지를 채울 것이다. 또한 클라이언트들이 실행을 종료한 후에 다시 동일하게 실행시켜보자. 서버는 클라이언트가 있든 없든 계속해서 메시지를 발행해나간다. 수신자가 없는 메시지는 큐에 들어있다가, 다른 메시지들이 계속 밀고 들어오게되면 어느 때에가서는 자동으로 버려진다.

가이드 문서에서 중요하게 지적하는 한가지는 이 패턴에서 메시지를 가져오는 시간을 정확하게 알기 어렵다는 것이다. (심지어 클라이언트가 먼저 시작하더라도!) 왜냐하면 클라이언트가 먼저 실행되고 있던 중이라하더라도 접속이 체결되는데에는 밀리세컨드 레벨의 지연이 발생하고 이 시점에도 계속 메시지가 나갈 수 있기 때문이다. (이러한 문제는 나중에 컨트롤러 소켓을 별도로 운영하거나 하는 방식으로 해결할 수 있다.)

PUSH-PULL 패턴

이 패턴은 pipeline이라 불리는 패턴으로 한쪽에서 다른쪽으로 메시지를 푸시하는 방식이다. 이 방식이 REQ-REP와 다른 점은 한쪽에서 다른쪽으로 일방적으로만 통신이 이루어진다는 점이다.(이를 서버와 클라이언트로 나누기도 어려운데, 푸시하는 쪽이 클라이언트가 될 수도, 서버가 될 수도 있기 때문이다.) 또 PUB-SUB 패턴과도 다른데, PUB-SUB 패턴에서는 여러 개의 구독자가 하나의 퍼블리셔에 연결되면 모두 동일한 메시지를 받게 되지만 (물론 구독 필터에 따라서 다른 값을 받을 수는 있지만, 모든 구독자가 같은 메시지 풀에 들어간다는 점은 같다.) PUSH-PULL 방식에서는 마치 REQ-REP에서 1:N으로 연결된 소켓처럼 연결된 PULL 소켓쪽으로 번갈아가면서 메시지가 하나씩 전달된다는 것이다.

가이드 문서에서는 이를 이용해서 분산처리를 수행하는 아키텍쳐를 어떻게 쉽게 만들 수 있는가를 설명하고 있다. 프로세스를 여러개 만들어서 분산처리를 하는 경우, 개별 워커가 계산해낸 결과 데이터를 다시 하나로 수집하는 데에는 파이프와 같은 장치를 사용하는데, 이것이 익숙하지 않은 사람에게는 조금 어렵고, 워커 수가 많아지는 경우에는 구현 난이도가 높아진다.

PUSH-PULL 패턴은 데이터의 흐름이 일방적인 라인에 있는 경우, 이를 구현하기에 알맞는 패턴이다. 여기서는 각 단계별로 작성해야하는 프로그램이 나눠진다.

  1. 벤틸레이터 – 처리해야 할 데이터를 계속해서 생성해내는 프로세스이다.
  2. 워커 – 생성된 데이터를 받고 처리한 후 그 결과를 만드는 처리 프로세스이다.
  3. 싱크 – 워커에 의해 생성된 결과 값들이 모이는 곳이다.

이 때 워커가 여러 개가 되면 훌륭한 분산처리 시스템이 된다. 문제는 파이썬에서도 멀티 스레드나 멀티 프로세스를 통한 분산처리 작업은 이미 가능하다는 것이다. 그렇다면 ZMQ를 사용해서 이러한 분산처리 시스템을 구축하는 것이 큰 도움이 될까?

당연히 그렇다. 간단한 멀티 프로세스를 통한 분산처리 코드는 파이썬에서도 작성하기가 간단하다! 하지만 ZMQ를 만들 때 워커는 ZMQ 네트워크 상의 노드로 기능하는 프로세스이며, 각 노드는 IPC외에도 TCP를 통해서 교신할 수 있다. 이 말은 한 대가 아닌 수십~수백대의 네트워크 상의 컴퓨터를 이 분산처리 그리드 내에 투입하는 것이 가능하며, 각각의 컴퓨터가 연산 수행 능력에 여유가 있다면, 1대당 여러 개의 워커 프로세스를 띄울 수 있다는 것이다. 분산 처리에 투입되는 워커가 고작해야 4~8개 정도 될 수 있는 단일 머신 처리에서 수백~수천개로 확장된 분산처리를 “거의 공짜로” 만들어낼 수 있다.

작업의 처리 순서는 다음과 같이 디자인한다.

  1. 맨 먼저 싱크 프로세스가 한 개 실행된다. (편의상 5558 포트를 사용해서 PULL 소켓으로 값을 받는다고 하자)
  2. 벤틸레이터 서버가 구동된다. 벤틸레이터 서버는 구동 후 싱크에 연결하여 (PUSH를 사용) 작업이 시작되는 시그널을 보낸다. 이후 시작을 준비하는 상태로 input() 함수를 하나 실행해서 기다린다.
  3. 각 워커들을 구동한다. 각 워커는 2개의 소켓을 가지고 있는데, 하나는 PULL 소켓으로 이는 벤틸레이터에 connect로 연결되며, 다른 하나는 PUSH로 sink에 connect된다.
  4. 작업이 준비되면 벤틸레이터는 5557포트 PUSH 소켓으로 데이터를 쏟아낸다. 각 데이터는 순차적으로 워커 프로세스로 전달된다.
  5. 각 워커 프로세스는 5558 포트의 PUSH 소켓으로 데이터를 푸시한다. 그 종착역은 sink이다.
  6. sink는 5558번 포트로 들어온 데이터들을 취합한다.

싱크

먼저 sink의 코드를 보자. sink는 각 워커가 응답을 보내올 때마다 ‘.‘을 찍을 것인데, 10개 마다 :을 찍어서 진행상황을 보다 눈에 쉽게 들어오도록 할 것이다. 최초로 실행된 후에는 무엇보다 벤틸레이터로부터 한 번의 통신을 전달받아 준비상태를 완료할 것이다.  준비상태가 완료되면 이 때부터 미리 정해진 개수의 응답을 푸시 받을 때까지 시간을 재고, 총 시간을 출력할 것이다.

import sys, time, zmq

ctx = zmq.Context()

receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

start_signal = receiver.recv()  #1
v = int(start_signal.decode())

start_time = time.time()
for task_no in range(v):
  s = receiver.recv() #2
  sys.stdout(':' if task_no % 10 is 0 else '.') #3
  sys.stdout.flush()
end_time = time.time()
print(f'Total time: {:%d}msec')

벤틸레이터

벤틸레이터는 구동된 후에 소켓을 5557포트에 바인딩한다. sink와 직접 통신을 위해서 5558번 포트로는 connect로 연결한다. 워커가 실행되는 동안 input() 함수로 기다렸다가, 최종적으로 입력이 들어오면 sink에게 지금부터 시간을 재라고 신호를 하나 보낸 후 100개의 랜덤한 숫자값을 보낸다. 각각의 워커는 무슨 복잡한 일을 하는 것이 아니라, 주어진 시간만큼 sleep 했다가 다음 신호를 처리받을 것이다.

import zmq, randome, time

ctx = zmq.Context
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557')

sink = ctx.socket(zmp.PUSH)
sink.connect('tcp://localhost:5558') #1

_ = input('Press Enter when the all workers are ready...')
print('sending tasks to workers')

v = 100
sink.send(f'{v}'.encode()) #2

total_msec = 0
for t_no in range(v):
  workload = random.rangrange(1, 100)
  total_msec += workload
  sender.send(f'{workload}'.encode())
print(f'total expected cost: {total_msec}ms') #3
time.sleep(1) #4
  1. 포트에 bind 할 것인지 connect 할 것인지는 연결의 성격에 따라 달라진다. 여기는 두 개의 PUSH 소켓을 하나는 bind하고 하나는 connect 하는 식으로 연결했다. bind된 쪽에 연결할 때에는 connect를 해야 한다.
  2. 준비가 끝나면 sink에게 타이머를 시작하라고 시그널을 전송한다.
  3. 이후 모든 워커에게 지연시간값을 나눠준다. 그 총량은 모두 합산된후에 출력한다.
  4. 큐에 남아있는 메시지가 있을지 모르니, 타이머를 1초 가량 준다. (이 부분은 별도의 컨트롤러 포트를 통해서 제어할 수 있다.)

워커

워커는 벤틸레이터로부터 값을 받아서 다시 싱크쪽으로 값을 밀어넣는다. 따라서 한쪽은 PULL 소켓 다른 한쪽은 PUSH 소켓이 있어야 한다. 그 사이에 받은 값을 정수로 환산하고 그 밀리세컨드 시간만큼 지연한 후 싱크로 시그널을 보낸다.

import zmq, sys, time

ctx = zmq.Context()

left = ctx.socket(zmq.PULL)
left.connect('tcp://localhost:5557')
right = ctx.socket(zmq.PUSH)
right.connect('tcp://localhost:5558')

while True:
  s = left.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s) * 0.001)
  right.send(b'')

이제 모든 구성원에 대한 준비가 끝났다. 순서대로 각 프로세스를 실행한 후에 구동해보자. 아래 캡쳐는 이 결과를 보여주는 것이다. 총 7개의 워커를 사용했으며 수천개의 값을 벤틸레이팅했다.  상대적으로 천천히 .을 찍어 나가는 화면이 각각의 worker이고 맨 마지막에 빠르게 결과를 찍어나가는 창은 sink에 해당한다. 최종적으로 sink에 출력되는 총 소요 시간은 벤틸레이터가 합산한 전체 코스트 / 워커의 개수보다 아주 약간 많은 시간이 소요되는 것을 확인할 수 있다.

만약 같은 네트워크 내에 worker를 구동할 머신이 있다면 네트워크를 통한 분산처리를 테스트해보는 것도 어렵지 않을 것이다. (sink와 벤틸레이터를 구동할 컴퓨터의 IP만 알면 된다!)

PUSH-PULL 패턴에서도 소켓이 N:1 혹은 1:N으로 접속할 수 있음을 보고 있다. 즉 ZMQ 소켓에서는 소켓이 서버냐 클라이언트냐에 관계없이 1:N으로 접속하느냐 혹은 N:1로 접속하느냐에 따라서 바깥으로 전송될 메시지는 반대편 노드에 대해서 순차적으로 분산되어 하나씩 전송되며, 반대로 수신측에서도 N개의 노드로부터 데이터를 수신하는 경우 공평하게 하나의 소켓씩 메시지를 수신해서 처리한다.

몇 가지 다른 문제

여기서 “많은 워커”들은 사실상 종료하는 조건 없이 계속해서 실행되면서 데이터를 기다리게 된다. 모든 처리가 끝났을 때, 어떻게 수많은 워커들을 일시에 종료하게 할 수 있을까? 간단하게 생각해보면 sink는 모든 작업이 완료되는 시점을 알고 있으므로, 최종적으로 모든 작업이 완료되었을 때 일종의 PUB 소켓을 통해서 종료 시그널을 배포하는 것이다. 그리고 다른 모든 워커들은 시작할 때 해당 PUB 소켓에 대한 SUB 소켓을 연결한다.

문제는 워커가 일종의 무한루프를 돌면서 입력을 처리하고 있기 때문에, 종료 시그널을 받기 시작할 시점을 만들기가 애매하다는 점이다. 물론 ZMQ 컨텍스트는 스레드 안전하기 때문에, 실제 워커의 계산 작업을 백그라운드 스레드에서 무한 루프를 돌게하고, 메인 스레드는 종료 시그널을 청취하도록 하는 방법이 있다.

이것은 이것대로 좋은 해결 방법인데, ZMQ는 이를 위한 별도의 해결책으로 조금 특별한 POLLER라는 디바이스를 제공한다. 다음 시간에는 POLLER를 어떻게 사용하는지에 대해서 살펴보고, 스레드 간의 통신에서는 ZMQ를 어떻게 사용할 수 있는지 살펴보기로 하자.