ZMQ 멀티파트메시지

멀티파트 메시지는 하나의 메시지 프레임 내부에 여러 개의 독립적인 메시지 프레임이 들어 있는 것을 말한다. 이는 하나의 프레임에서 처리하기 힘든 데이터 조각들을 모아서 처리할 때 유용하다. 예를 들어 바이너리 파일 데이터를 전송하려는 경우에는 보내는 쪽이나 받는 쪽이나 전송하는 데이터가 이진데이터라는 것을 알고 있다 가정하여 바이트 스트림을 전송할 수 있다. 하지만 이렇게 하면 실제 데이터 외부에 있었던 정보, 이를 테면 파일 이름이나 생성한 날짜 같은 메타 정보를 전달하기가 어려워진다. 이런 경우 여러 정보들을 멀티 파트 메시지로 묶어서 하나의 프레임으로 전송하면 필요한 모든 정보를 같이 전달해 줄 수 있다.

멀티 파트 메시지 전송은 비단 ZMQ내에서만 사용되는 개념이 아니다. HTTP 규격에서도 멀티 파트 메시지 개념이 존재하며, 실제로 HTML 폼 전송에서 멀티 파트 전송이 널리 사용된다. 여러 개의 필드를 하나의 폼에서 묶어서 submit 한다거나, 여러 개의 파일을 한 번에 업로드하는 폼 들은 모두 enctype='multpart/formdata' 라는 폼 속성을 가지며, 실제 HTTP 전송 헤더 역시 Content-Type 에서 이 값이 사용된다.

ZMQ 에서도 여러 개의 메시지 혹은 프레임을 하나의 메시지로 묶어서 멀티파트로 전송할 수 있다. pyzmq를 사용하는 경우 이는 소켓 객체의 send_multipart() / recv_multipart()  두 개의 메소드로 간단히 구현된다. 다만 문자열이 아닌 버퍼(바이트 배열)를 주고 받아야 한다는 점에 주의만 하면 그외에는 매우 간단하다. 다음은 세 개의 문자열을 하나의 멀티파트 메시지로 전송하는 REQ-REP 예제이다.

## multimessage-server.py

import zmq

ctx = zmq.Context()

def run_server(port=5555):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  keys = 'abc'
  while True:
    msg = dict(zip(keys, (x.decode() for x in sock.recv_multipart())))
    for k, v in msg.items():
      print(f'{k}: {v}')
    ## 대문자화하고 하나의 문자열로 만들어서 되돌려준다.
    sock.send_string(' '.join(msg[k].upper() for k in keys))

if __name__ == '__main__':
  run_server()

recv_multipart() 에 의해 리턴되는 값이 바이트배열의 시퀀스임을 알고 있다면 위 코드는 충분히 이해되리라 본다. REQ-REP 패턴에서 클라이언트에서 서버로 멀티 파트 메시지를 전송했다 하더라도, 반드시 응답이 멀티파트여야 할 필요도 없다. 다음은 위 서버 코드와 짝을 맞출 클라이언트 코드이다. 세 번의 키보드 입력 후 해당 문자열들을 한 번에 전송한다. 그리고 서버가 보내준 단일 메시지를 출력하는 것을 반복하는 간단한 코드이다.

import zmq

ctx = zmq.Context()

def run_client(port=5555):
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    data = [input().encode() for _ in range(3)]
    sock.send_multipart(data)
    res = sock.recv_string()
    print(res)

if __name__ == '__main__':
  run_client()
    

 

ZMQ – Poller 사용하기

PUB-SUB 패턴이나 PUSH-PULL 패턴을 사용하면 데이터를 발생시키는 1개 (혹은 여러 개의) 노드로부터 데이터를 전달받는 N개의 노드들이 돌아가는 방식의 네트워크를 구성할 수 있음을 지난 글에서 보았다. 이 때 각각의 클라이언트 노드들은 루프를 돌면서 큐에 들어온 메시지를 순서대로 처리하게 된다.

만약 개별 클라이언트가 한 번에 처리해야 할 메시지의 최대 개수가 정해져 있다면 while 무한 루프가 아닌 for 루프를 통해서 유한한 루프를 돌 수 있을 것이다. 그 외에도 메시지의 내용으로부터 동작이나 중지를 결정할 수 있는 경우에도 그 스스로 동작을 멈출 시점을 결정할 수 있을 것이다.

여러 개의 소켓을 듣는 노드

하지만 클라이언트들의 중지를 다른 제 3의 노드가 결정해야 한다면? ‘커맨드 노드’라는 노드가 있고 이 노드가 PUB 패턴으로 모든 클라이언트들에게 중지 명령을 내리는 구조를 만든다고 가정해보자.

그렇다면 클라이언트 입장에서는 한 번에 두 개의 소켓(하나는 SUB 패턴으로 데이터를 지속적으로 수신받고, 다른 하나는 PULL 패턴으로 종료 명령을 수신받는다.) 을 사용하게 된다. 이것은 이전의 패턴들과는 사뭇다르다. 이전 예제들에서 보여왔던 다중 접속은 하나의 노드가 단일 소켓을 사용하면서, 소켓 자체가 여러 포트에 바인딩되거나, 여러 서버에 커넥트되는 것을 보였다. (그리고 ZMQ는 이 구조 내에서 메시지 전달의 교통정리를 훌륭하게 수행해준다.)

소켓으로 데이터를 반복적으로 읽어야 하는 노드의 특성상, 두 개 이상의 소켓을 하나의 노드가 갖는 것이 문제될 일은 없지만 두 노드를 한꺼번에 대기할 수는 없다. 두 개 소켓으로부터 각각 메시지를 수신하기 위해서는 각각 별도의 스레드를 통해서 처리해야 할 것이다.

POLLER

ZMQ에서는 이러한 상황을 좀 더 쉽게 해결할 수 있도록 POLLER라는 것을 제공한다. Poller는 두 개 이상의 소켓을 등록하여두면 소켓들로부터의 입력을 감지하여 (소켓, 이벤트)의 리스트를 리턴해준다. 이를 이용해서 각각의 소켓을 동시에 수신하면서 소켓별로 구분된 메시지를 얻을 수 있다.

def client(data_port=5556, com_port=5558):
  ctx = zmq.Context()

  # 1. SUB 소켓 연결
  sock_sub = ctx.socket(zmq.SUB)
  sock_sub.setsockopts_string(zmq.SUBSCRIBE, '9')
  sock_sub.connect(f'tcp://localhost:{data_port}')

  # 2. PULL 소켓 연결
  sock_pull = ctx.socket(zmq.PULL)
  sock_pull.connect(f'tcp://localhost:{com_port}')
  
  # 3. 폴러 생성 및 설정
  poller = ctx.Poller()
  poller.register(sock_sub, zmq.POLLIN)
  poller.register(sock_pull, zmq.POLLIN)

  # 폴러로부터 이벤트 수신
  should_continue = True
  while should_continue:
    # 4. 각각의 이벤트는 (소켓, 이벤트), (소켓, 이벤트) ... 의 형태로 전달
    # 이를 사전 타입으로 변환
    socks = dict(poller.poll())
    if socks.get(sock_pull, None) == zmq.POLLIN:
      command = sock_pull.recv_string()
      if command == 'EXIT':
         print('This client will be terminated...')
         should_continue = False
    if socks.get(sock_sub, None) == zmq.POLLIN:
      topic, msg = sock_sub.recv_string().split(' ', 1)
      print(f'Processing: {msg}')

  
  

poller는 복수 개의 소켓을 번갈아 수신하기 보다는 특정한 주기로 각 소켓의 메시지 큐를 검사해서 신규 메시지들을 소켓별로 하나씩 가져와서 이벤트의 리스트로 리턴해준다. 폴러에 등록되는 소켓의 타입은 서로 같은 것들이어도 무방하다. 지난 글에서 살펴본 PUSH-PULL 패턴을 사용한 분산처리는 다음과 같이 개선해볼 수 있다.

  1. Ventilator는 PUSH 패턴으로 복수의 Worker에게 데이터를 순차적으로 뿌린다.
  2. Worker는 PULL 소켓으로 데이터를 수신하여 처리한 후, 다시 PUSH 소켓으로 Sink에게 그 결과를 전달한다.
  3. Worker는 또한 SUB 소켓으로 중지명령을 기다리고 있다. 따라서 Worker에는 PULL, SUB 소켓이 Poller로 묶여있게 된다.
  4. Sink는 PULL 소켓으로 여러 Worker로부터 결과를 수신받아 취합한다.
  5. Sink는 모든 결과가 수신되면 PUB 소켓을 통해 모든 Worker들에게 kill 시그널을 전송한다.

이처럼 단순 소켓이 아니라 소켓을 결합해주는 Poller를 사용하면 복잡한 네트워크 그래프를 간단한 방법으로 구축할 수 있다. Poller외에도 ZMQ에서는 양 끝단에 소켓을 부착하여 메시지를 포워딩해주는 device라는 개념이 있는데, 다음 번에는 이 부분에 대해서 한 번 알아보는 기회를 갖도록 하자.

ZMQ – 그외 메시징패턴 구현하기

Publisher 패턴 – 단방향 브로드캐스팅

ZMQ를 이용한 소켓 네트워크 프로그래밍에서 두 번째로 소개되는 메시징패턴은 바로 Publisher vs Subscriber이다. 일전에 소개한 Request vs Response 패턴은 전통적인 메시지 교환의 방식으로 서버와 클라이언트가 서로 한 번씩 메시지를 주고 받는 식으로 통신하는 방식이었다. 또다른 메시징 패턴인

Publisher vs Subscriber는 다음과 같은 특징을 갖는 메시징 패턴이다.

  1. 서버는 퍼블리셔(Publisher)가 되어 일련의 메시지들을 ZMQ 소켓을 통해 발송한다.
  2. 클라이언트는 구독자(Subscriber)가 되어 메시지들을 구독한다.
  3. 메시지는 1 -> N 으로 복수의 구독자에게 전송된다.
  4. 소켓에 구독자가 없는 경우 메시지는 소실된다.
  5. PUB 소켓을 통해서 서버가 메시지를 받을 수는 없다. 반대로, `SUB` 소켓을 통해서 클라이언트가 메시지를 전송할 수도 없다.

즉, 이 패턴은 일종의 단방향 브로드캐스팅이다. 실제 가이드 페이지에서 제공하는 코드를 분석해보자. 서버는 랜덤한 값을 사용해서 가짜 우편번호와 날씨 데이터를 생성해서 퍼블리싱한다.

from random import randrange
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.PUB) #1
sock.bind('tcp://*:5556')
n = 0 #2
while True:
  zip_codde = randrange(10_000, 100_000)
  temp = randrange(-20, 45)
  humidity = randrange(10, 101)
  sock.send(f'{zip_code} {temp} {humidity} {n}'.encode())
  n += 1
  #3
  1. zmq.PUB 패턴의 커뮤니케이션용으로 소켓을 생성한다.
  2. n은 도대체 메시지가 몇개나 생성되는지 보려고하는 것이다.
  3. 주의깊게 코드를 보다보면 뭔가 허전한 것이 없나? 바로 time.sleep(1) 같은 시간 때우기 호출이 없다. 즉 이 루프는 엄청 빡센 무한루프를 돈다.

클라이언트

클라이언트는 메시징타입을 zmq.REQ 가 아니라 zmq.SUB를 쓴다. 그런데 서버는 엄청나게 많은 양의 메시지를 쏟아낼 것이기 때문에 이를 적절히 필터링할 필요가 있다. 물론 메시지를 받은 다음에 메시지 내용을 보고 필터링해도 되긴하는데, ZMQ의 소켓은 여러 옵션을 통해서 필터를 설정할 수 있다. 이 클라이언트는 소켓에 필터를 설정하고, 필터에 해당하는 우편번호의 날씨 데이터만을 수집한 후 기온 필드를 추출하여 50개 데이터에 대해서 평균치를 출력한다.

## wu-client.py
import sys
import zmq

zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001' #1

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter) #2

c, temp = 50, []
for _ in range(c):
  msg = sock.recv().decode()
  print(msg)
  _, t, *_ = msg.split()
  temp.append(t)
print(f'Average Temperature: {sum(temp)/c}')
  1. 서버가 쏟아내는[^1] 데이터는 다섯자리 숫자의 가상 우편번호이다. 클라이언트는 정해진 하나의 우편번호로 시작하는 데이터만을 받아들일 예정이므로 필터링할 조건을 생성한다. 여러 개의 클라이언트가 붙을 수 있음을 상정하여 명령줄에서 옵션으로 변경할 수 있게 한다.
  2. socket.setsockopt()는 소켓에 여러 옵션을 추가할 수 있다. zmq.SUBSCRIBE 는 구독 필터를 세팅한다. _string 이라는 접미사가 붙은 것은 필터의 내용이 문자열이기 때문이다.

setsockopt()의 보다 자세한 내용은 API 문서를 확인하자.

내용이 워낙 많아서 여기서 일일이 다 소개할 수는 없을 것 같다. 어쨌든 한 번 실행해보자. 우선 서버를 먼저 실행한 다음, 여러 터미털 화면[^2]에서 클라이언트를 실행한다. 우편번호 옵션은 서로 달라도 같아도 상관없다. 오른쪽 스크린샷은 터미널화면을 분할하여 동시에 클라이언트들을 실행한 화면이다. 실제 8개의 클라이언트를 돌리고 있다. 이 때 괄호안에 표시되는 값이 서버가 생성한 메시지의 일련번호인데 우리 서버가 굉장히 열일하고 있다는 것을 볼 수 있다.

분산처리

이번에 소개할 패턴은 PULL vs PUSH 패턴이다. PUSH 패턴은 1:N 연결에서 다수의 반대쪽 종단점에 대해서 순차적으로 메시지를 분산 발송한다. PULL은 반대로 N:1 의 연결에서 여러 클라이언트가 보내는 메시지를 순차적으로 받아들이는 것을 말한다. 즉 여러 클라이언트와 커뮤니케이션하면서 모든 클라이언트에게 동일한 메시지를 전송하는 Publisher  패턴과는 달리, 모든 클라이언트는 각각 고유한 메시지를 받을 수 있게 된다.

이 메시지 패턴은 여러 노드로 메시지를 전송한 후 다시 메시지를 취합할 수 있는 효과를 가지기 때문에 분산처리에 사용하기 좋다. 실제로 가이드 문서에서는 간단한 분산처리 기법을 선보이고 있다.

왼쪽에서 보는 도식에서 Ventilator는 작업할 내용을 생성해서 각 Worker에게 분배하는 역할을 한다. (PUSH 패턴을 이용해서 네트워크에 접속한다.) 개별 Worker들은 특정한 계산이나 처리를 수행하는 노드로 PULL 패턴 소켓으로 Ventilator와 연결되어 순차적으로 입력값을 받는다.

Worker들은 다시 반대쪽에는 PUSH 패턴의 소켓을 갖고 있으며, 이들은 개별적으로 처리된 메시지를 이곳으로 보낸다. 이 PUSH 소켓의 다른 종단점은 sink의 PULL 패턴 소켓이다. Sink는 처리완료된 결과를 취합하여 소비하는 곳이다.

이는 Ventilator -> Worker -> Sink의 파이프라인인데, Worker의 개수가 1개 혹은 그 이상이 될 수 있는 셈이다. 눈여겨볼 지점은 이 그래프에서 Ventilator와 Sink는 각각 서버에 해당하며, 각 Worker가 클라이언트에 해당한다. REQ\REP 패턴에서는 흔히 서버가 REP 패턴을, 클라이언트가 REQ패턴을 가졌지만, 파이프라이닝에서는 소켓의 타입이 해당 노드가 서버인지 클라이언트인지는 중요하지 않다. 다만 다음 라인으로 넘겨주는 위치에서는 PUSH를, 이전 라인으로부터 받는 위치에서는 PULL을 사용한다.

이 그래프에 나와있는 각 노드들을 간단하게 구현하겠다. Worker는 적당히 시간이 오래 걸리는 작업을 수행해야 하니,  time.sleep()으로 대충 시간을 끌도록 하겠다.

 

Ventilator 구현

Ventilator는 Worker가 소모해야 하는 데이터를 생성하는 역할을 한다. 이 값은 100~500 사이의 값이며, 이 값을 받은 worker는 해당 값의 밀리세컨드만큼 지연후에 Sink로 메시지를 보낸다. Ventilator가 사용하는 포트는 5007번이며, Sink가 사용하는 포트는 5008이라고 가정한다.

import random, time
import zmq

ctx = zmq.Context()
dc = 1000 #1

#2
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557') #3

sink = ctx.socket(zmq.PUSH) #4
sink.connect('tcp://localhost:5558') #5

def run():
  _ = input('Please press enter when the workers are ready...')
  print('Sending tasks to workers')
  
  sink.send(f'{dc}'.encode())  #6
  random.seed()
  
  #7
  total_msec = 0
  for _ in range(dc):
    workload = random.randrange(100, 500)
    total_msec += workload
    sender.send(f'{workload}'.encode())
  print(f'total expected cost: {total_msec}ms')
  time.sleep(1)

run()
  1. dc 값은 처리해야 할 데이터의 개수를 말한다.
  2. 값을 worker에게 뿌리기 위한 소켓을 생성해서 sender 라는 이름을 붙였다. 타입은 PUSH이다.
  3. Ventilator는 Worker에게는 서버의 역할을 수행한다. sender는 5007 포트에 바인딩한다.
  4. Sink에게 전체 수집해야 할 데이터의 개수를 알려주기 위한 소켓을 생성했다.  역시 PUSH 타입이다.
  5. Ventilator — Sink 간 연결에서 Ventilator는 클라이언트이다. 따라서 여기선 5008번 포트로 connect() 한다.
  6. 작업을 시작하기 전에 Sink에게 총 데이터 개수를 보낸다.
  7. Worker에게 메시지를 전송하기 시작한다. 메시지의 내용은 대기해야 할 지연시간이다.

Worker로 전송되는 모든 지연시간은 누적합산된 후, 메시지 발송이 끝나면 예상 비용으로 출력된다.

Sink 구현

Sink를 먼저 구현해보자.  Sink는 5008번 포트를 들으면서, 최초 1회는 수집해야 할 데이터의 개수를 전달받는다. 이후 들어오는 데이터를 소비하면 되는데, 여기서는 매번 ‘.’을 찍어주는 것으로 했다. 시각적으로 구분을 위해서 10개 단위마다 ‘:’를 출력하여 보이게 한다.

import sys, time
import zmq

#1
ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

#2
s = int(receiver.recv().decode())
tstart = time.time()

#3
for task_no in range(s):
  s = receiver.recv()
  if task_no % 10 is 0:
    sys.stdout.write(':')
  else:
    sys.stdout.write('.')
  sys.stdout.flush()

#4
tend = time.time()
print(f'Total elapsed time: {(tend-tstart) * 1000}ms')   
  1. 소켓을 생성하고 5558 포트에 바인딩한다.
  2. 첫 입력은 Ventilator가 보낸 데이터 개수이다.
  3. 시작 시간을 설정하고 루프를 시작한다.
  4. 루프에서는 입력으로 들어오는 메시지를 확인하고 메시지가 들어올 때마다 ‘.’을 찍는다. 단 10번에 한 번은 ‘:’을 출력한다.
  5. 출력에는 print() 문이 아닌 sys.stdout을 사용했다. 가이드에서는 특별히 다른 이유는 알려주지 않았다.

 Worker구현

Worker는 단순하다. 5557 포트로부터 들어온 값을 정수로 변환하여, 그만큼 대기하고, 다시 Sink로 메시지를 보내는 일을 계속 반복한다.

import sys, time
import zmq

ctx = zmq.Context()
receiver = ctx.socket(zmq.PULL)
receivier.connect('tcp://localhost:5557')

sender = ctx.socket(zmq.PUSH)
receiver.connect('tcp://localhost:5558')

while True:
  s = receiver.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s.decode()) * 0.001)
  sender.send(b'')

실행하기

다음과 같은 순서대로 실행한다.

  1. 먼저 Sink를 실행한다.
  2. 다음은 Ventilator를 실행한다.
  3. 적당한 수의 Worker를 실행시킨다.
  4. Ventilator를 실행하는 창에서 enter 키를 누른다.

각 Worker 창에서 느긋하게 …. 이 출력되기 시작하고 Sink는 유독 바쁘게 출력하기 시작한다. 여기서 주목해야 할 부분은 Ventilator는 몇 개의 Worker가 작업을 나눠서 하는지에 대해서는 알지 못하며 알 필요도 없다는 것이다. PUSH / PULL 패턴에서는 1:1, 1:N, N:1, N:N을 모두 상정할 수 있고, 이러한 위상의 구분은 코드상에서 구분되는 것이 아니라 어떤 노드가 어떻게 연결되는것인지에 따라 구분된다 할 수 있다.

또 ZMQ를 이용한 분산처리의 장점은 각각의 worker 노드가 스레드일 수도 있고, (위 예제에서와 같이) 프로세스 일수도 있는데 각 노드간의 교신이 TCP 포트를 사용하게 되므로 사실상 위치에 제약을 받지 않는다. 따라서 다른 여러 대의 컴퓨터를 각각 Worker로 만들어서 접속하거나, 혹은 여러 대의 컴퓨터에서도 각각 여러 개의 프로세스를 만들어서, 수백/수천개의 Worker를 작업에 손쉽게 투입할 수 있다.

멀티 프로세스로 분산작업을 하려는 경우에는 엄두조차 내기 힘든 스케일을 지원할 수 있다.

정리

이상으로 기본적인 ZMQ 메시지 패턴에 따른 처리 방법을 살펴보았다. ZMQ 메시지 패턴은 여기서 소개한 것 외에도 몇 가지가 더 있으며, 각 소켓은 상황에 따라 바인드하거나 커넥트할 수 있다. ZMQ는 단순히 소켓을 추상화하여 사용하기 쉽게 만들어주는 것 외에도 다양한 상황에서 분산 메시지 기술을 적용할 수 있도록 해주는 놀라운 도구이며, 활용법을 익혀 나감에 따라서 상상했던 것 이상으로 폭넓게 적용할 수 있음을 알 게 될 것이다.

ZMQ의 메시지 패턴

ZMQ의 기본 개념을 다루면서 ZMQ에서는 몇 가지 코어 메시징 패턴이 있다고 언급했다. 이 중에서 REQ-REP 패턴에 대해서는 에코 서버를 구현하면서 소개하였다. 이 글에서는 ZMQ 메시징 패턴에 대해 각각의 예시에 대해서 언급해보겠다.

REQ-REP 패턴

REQ-REP 패턴은 전형적인 서버-클라이언트 통신에 사용될 수 있는 패턴이다. 클라이언트 -> 서버로 요청이 전송되고 다시 서버 -> 클라이언트로 응답이 전송된다. 이미 이전글에서 서버 1개가 여러 개의 포트를 리스닝하는 동시에 각 포트에 대해서도 다중 접속이 가능한 구현에 대해서 소개한 바 있다. (사실 이를 구현하는 것은 거의 공짜로 가능했다.) 여기서는 이전의 예제를 살짝 비틀어서 여러 개의 서버에 대해서 하나의 클라이언트가 접속했을 때 어떤식으로 메시지가 처리되는지 살펴보자.

서버

서버는 실행시 포트와 서버 이름을 받아서 실행한다. (이 둘은 콤마로 구분된 한 덩어리의 문자열이어야 한다.) 서버는 지정된 포트에 바인드되어 클라이언트로부터 들어온 메시지에 서버의 이름을 덧붙여서 회신한다.

import sys, zmq

ctx = zmq.Context()

def run_server(port, name):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  while True:
    data = sock.recv()
    msg = data.decode()
    print(f'{name} Recieved: {msg}')
    rep = f'{name}->{msg}'
    sock.send(rep.encode())
    if msg == 'bye':
      sock.close()

if __name__ == '__main__':
  if len(sys.argv) > 1:
    port, name = sys.argv[1].split(',', 1)
    run_server(port, name)

클라이언트

이전의 리턴되는 메시지를 약간 변형했다는 것 외에는 별다른 특이한 점이 없는 에코 서버 코드였다. 다음은 클라이언트이다. 클라이언트는 명령줄에서 접속할 포트를 1개 이상 받게 되고, 주어진 모든 포트에 접속한다. 이후 루프를 돌면서 입력받은 메시지를 서버로 전송하고 그 결과를 회신받아 출력한다.

import zmq, sys

ctx = zmq.Context()

def run_client(*ports):
  sock = ctx.socket(zmq.REQ)
  for port in ports:
    sock.connect(f'tcp://127.0.0.1:{port}')
  while True:
    msg = input()
    sock.send(msg.encode())
    rep = sock.recv().decode()
    print(rep)

if __name__ == '__main__':
  if len(sys.argv) > 1:
    ports = sys.argv[1:]
    run_client(*ports)

여러개의 터미널을 띄워놓고 세 개의 터미널에서는 7777,A, 7778,B, 7779,C 의 옵션으로 각각 서버를 돌린 다음, 클라인터는 7777 7778 7779를 인자로주고 실행한다. 그런 다음 클라인트 창에서 입력을 한 줄씩 실행한다. 그러면 그 결과로 각각의 서버가 사이좋게 하나씩 메시지를 받아서 클라이언트에게 돌려주는 광경을 목격할 수 있다. 즉 클라이언트 입장에서도 여러 개의 동일한 동작을 하는 서버의 소켓에 제각각 접속해두면 자체적으로 로드밸런싱을 수행하여 각각의 서버에 대해서 요청과 응답을 수행하게 된다. 즉, REQ-REP 패턴에서 각각의 소켓은 자신이 서버인지 클라이언트인지는 중요하지 않으며, 주고 받는 대상 노드가 1개 이상인 경우에는 순서대로 번갈아가며 통신하게 된다.

만약, 우리가 작성하고 있는 서버 X가 있고, 이는 REP 패턴의 소켓을 가지고 있으며, 내부 네트워크의 클라이언트들과 통신하고 있다고 하자.  REQ 소켓과 REP 소켓을 한 몸에 지니고 있는 ‘무언가’를 만들어서 두 소켓을 연결하는 간단한 코드를 작성한 후, 이 디바이스의 REP 소켓을 외부 IP로 노출하게 되면, 해당 디바이스는 내부 네트워크와 외부 네트워크를 연결하는 단자, 일종의 라우터로 기능하게 된다.

ZMQ는 노드와 노드간의 연결이 어떤식으로 기능하는가를 메시징 패턴으로 정의한다. 그리고 우리는 만들고자하는 네트워크의 특정한 노드가 어떤식으로 동작할지를 결정해서 간단한 디바이스를 하나 추가하는 것으로 손쉽게 네트워크를 확장할 수 있음을 볼 수 있다. (이와 관련된 실제 예를 다른 예제에서 다시 살펴보도록 할 것이다.)

PUB-SUB 패턴

PUB-SUB패턴은 Pulisher와 Subscriber를 정의하고, 두 노드간의 통신을 정의한다. 이 통신은 단방향으로 데이터가 흐르며, publisher가 생성하는 메시지는 해당 퍼블리셔를 구독하는 subscriber들에게 전송된다. 가장 간단한 브로드캐스팅 방법이라 보면 되겠다.

다음은 지역별 온도와 습도를 발행하는 서버이다. 말이 그렇다는 것이고 실제로는 임의의 우편번호, 온도, 습도 값을 랜덤으로 생성해서 무차별적으로 발행해버린다. (원문에는 메시지 번호를 넣지 않는데, 이 서버가 실제로 얼마나 많은 메시지를 만들어 내는지 체험해보는 것도 나쁘지 않을 것 같아서 넣었다.)

퍼블리셔

import zmq
from random import randrange

ctx = zmq.Context()

def run_server(port):
  sock = ctx.socket(zmq.PUB)
  sock.bind(f'tcp://*:{port}')
  i = 0
  while True:
    zipcode = randrange(1, 100000)
    temp = randrange(-80, 135)
    hum = rangdrange(10, 60)
    msg = f'{zipcode:05d} {temp} {hum} {i}'
    sock.send(msg.encode())
    i += 1

run_server(5556)

다음은 subscriber에 해당하는 클라이언트 코드이다. 클라이언트는 필터링할 우편번호를 필요로 하며, 해당 우편번호로 시작되는 값만을 받아들인다.

구독자(Subscriber)

import sys, zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

sock.connect('tcp://127.0.0.1:5556')
zip_filter = sys.argv[1] if len(sys.argv) > 1 else '10001'
sock.setsockopt_string(zmq.SUBSCRIBE, zip_filter)

total_temp = 0
for update_no in range(50):
  msg = sock.recv().decode()
  print(msg)
  z, t, h, n = msg.split()
  total_temp += int(t)

print(f'Average temp for {z}: {total_temp/50:0.3f}')

이 때 주목해야 할 부분은 setsockopt_string()을 사용한 부분이다. 여기에서 소켓의 옵션을 설정할 수 있다. zmq.SUBSCRIBE는 메시지 구독 필터를 설정하는 부분이다. 구독 필터를 설정하지 않으면 아무런 메시지도 구독하지 않는다.

클라이언트를 먼저 서 너개 실행한 후에 서버를 실행하자. 서버가 시작되면 클라이언트들은 무서운 속도로 50개의 메시지를 채울 것이다. 또한 클라이언트들이 실행을 종료한 후에 다시 동일하게 실행시켜보자. 서버는 클라이언트가 있든 없든 계속해서 메시지를 발행해나간다. 수신자가 없는 메시지는 큐에 들어있다가, 다른 메시지들이 계속 밀고 들어오게되면 어느 때에가서는 자동으로 버려진다.

가이드 문서에서 중요하게 지적하는 한가지는 이 패턴에서 메시지를 가져오는 시간을 정확하게 알기 어렵다는 것이다. (심지어 클라이언트가 먼저 시작하더라도!) 왜냐하면 클라이언트가 먼저 실행되고 있던 중이라하더라도 접속이 체결되는데에는 밀리세컨드 레벨의 지연이 발생하고 이 시점에도 계속 메시지가 나갈 수 있기 때문이다. (이러한 문제는 나중에 컨트롤러 소켓을 별도로 운영하거나 하는 방식으로 해결할 수 있다.)

PUSH-PULL 패턴

이 패턴은 pipeline이라 불리는 패턴으로 한쪽에서 다른쪽으로 메시지를 푸시하는 방식이다. 이 방식이 REQ-REP와 다른 점은 한쪽에서 다른쪽으로 일방적으로만 통신이 이루어진다는 점이다.(이를 서버와 클라이언트로 나누기도 어려운데, 푸시하는 쪽이 클라이언트가 될 수도, 서버가 될 수도 있기 때문이다.) 또 PUB-SUB 패턴과도 다른데, PUB-SUB 패턴에서는 여러 개의 구독자가 하나의 퍼블리셔에 연결되면 모두 동일한 메시지를 받게 되지만 (물론 구독 필터에 따라서 다른 값을 받을 수는 있지만, 모든 구독자가 같은 메시지 풀에 들어간다는 점은 같다.) PUSH-PULL 방식에서는 마치 REQ-REP에서 1:N으로 연결된 소켓처럼 연결된 PULL 소켓쪽으로 번갈아가면서 메시지가 하나씩 전달된다는 것이다.

가이드 문서에서는 이를 이용해서 분산처리를 수행하는 아키텍쳐를 어떻게 쉽게 만들 수 있는가를 설명하고 있다. 프로세스를 여러개 만들어서 분산처리를 하는 경우, 개별 워커가 계산해낸 결과 데이터를 다시 하나로 수집하는 데에는 파이프와 같은 장치를 사용하는데, 이것이 익숙하지 않은 사람에게는 조금 어렵고, 워커 수가 많아지는 경우에는 구현 난이도가 높아진다.

PUSH-PULL 패턴은 데이터의 흐름이 일방적인 라인에 있는 경우, 이를 구현하기에 알맞는 패턴이다. 여기서는 각 단계별로 작성해야하는 프로그램이 나눠진다.

  1. 벤틸레이터 – 처리해야 할 데이터를 계속해서 생성해내는 프로세스이다.
  2. 워커 – 생성된 데이터를 받고 처리한 후 그 결과를 만드는 처리 프로세스이다.
  3. 싱크 – 워커에 의해 생성된 결과 값들이 모이는 곳이다.

이 때 워커가 여러 개가 되면 훌륭한 분산처리 시스템이 된다. 문제는 파이썬에서도 멀티 스레드나 멀티 프로세스를 통한 분산처리 작업은 이미 가능하다는 것이다. 그렇다면 ZMQ를 사용해서 이러한 분산처리 시스템을 구축하는 것이 큰 도움이 될까?

당연히 그렇다. 간단한 멀티 프로세스를 통한 분산처리 코드는 파이썬에서도 작성하기가 간단하다! 하지만 ZMQ를 만들 때 워커는 ZMQ 네트워크 상의 노드로 기능하는 프로세스이며, 각 노드는 IPC외에도 TCP를 통해서 교신할 수 있다. 이 말은 한 대가 아닌 수십~수백대의 네트워크 상의 컴퓨터를 이 분산처리 그리드 내에 투입하는 것이 가능하며, 각각의 컴퓨터가 연산 수행 능력에 여유가 있다면, 1대당 여러 개의 워커 프로세스를 띄울 수 있다는 것이다. 분산 처리에 투입되는 워커가 고작해야 4~8개 정도 될 수 있는 단일 머신 처리에서 수백~수천개로 확장된 분산처리를 “거의 공짜로” 만들어낼 수 있다.

작업의 처리 순서는 다음과 같이 디자인한다.

  1. 맨 먼저 싱크 프로세스가 한 개 실행된다. (편의상 5558 포트를 사용해서 PULL 소켓으로 값을 받는다고 하자)
  2. 벤틸레이터 서버가 구동된다. 벤틸레이터 서버는 구동 후 싱크에 연결하여 (PUSH를 사용) 작업이 시작되는 시그널을 보낸다. 이후 시작을 준비하는 상태로 input() 함수를 하나 실행해서 기다린다.
  3. 각 워커들을 구동한다. 각 워커는 2개의 소켓을 가지고 있는데, 하나는 PULL 소켓으로 이는 벤틸레이터에 connect로 연결되며, 다른 하나는 PUSH로 sink에 connect된다.
  4. 작업이 준비되면 벤틸레이터는 5557포트 PUSH 소켓으로 데이터를 쏟아낸다. 각 데이터는 순차적으로 워커 프로세스로 전달된다.
  5. 각 워커 프로세스는 5558 포트의 PUSH 소켓으로 데이터를 푸시한다. 그 종착역은 sink이다.
  6. sink는 5558번 포트로 들어온 데이터들을 취합한다.

싱크

먼저 sink의 코드를 보자. sink는 각 워커가 응답을 보내올 때마다 ‘.‘을 찍을 것인데, 10개 마다 :을 찍어서 진행상황을 보다 눈에 쉽게 들어오도록 할 것이다. 최초로 실행된 후에는 무엇보다 벤틸레이터로부터 한 번의 통신을 전달받아 준비상태를 완료할 것이다.  준비상태가 완료되면 이 때부터 미리 정해진 개수의 응답을 푸시 받을 때까지 시간을 재고, 총 시간을 출력할 것이다.

import sys, time, zmq

ctx = zmq.Context()

receiver = ctx.socket(zmq.PULL)
receiver.bind('tcp://*:5558')

start_signal = receiver.recv()  #1
v = int(start_signal.decode())

start_time = time.time()
for task_no in range(v):
  s = receiver.recv() #2
  sys.stdout(':' if task_no % 10 is 0 else '.') #3
  sys.stdout.flush()
end_time = time.time()
print(f'Total time: {:%d}msec')

벤틸레이터

벤틸레이터는 구동된 후에 소켓을 5557포트에 바인딩한다. sink와 직접 통신을 위해서 5558번 포트로는 connect로 연결한다. 워커가 실행되는 동안 input() 함수로 기다렸다가, 최종적으로 입력이 들어오면 sink에게 지금부터 시간을 재라고 신호를 하나 보낸 후 100개의 랜덤한 숫자값을 보낸다. 각각의 워커는 무슨 복잡한 일을 하는 것이 아니라, 주어진 시간만큼 sleep 했다가 다음 신호를 처리받을 것이다.

import zmq, randome, time

ctx = zmq.Context
sender = ctx.socket(zmq.PUSH)
sender.bind('tcp://*:5557')

sink = ctx.socket(zmp.PUSH)
sink.connect('tcp://localhost:5558') #1

_ = input('Press Enter when the all workers are ready...')
print('sending tasks to workers')

v = 100
sink.send(f'{v}'.encode()) #2

total_msec = 0
for t_no in range(v):
  workload = random.rangrange(1, 100)
  total_msec += workload
  sender.send(f'{workload}'.encode())
print(f'total expected cost: {total_msec}ms') #3
time.sleep(1) #4
  1. 포트에 bind 할 것인지 connect 할 것인지는 연결의 성격에 따라 달라진다. 여기는 두 개의 PUSH 소켓을 하나는 bind하고 하나는 connect 하는 식으로 연결했다. bind된 쪽에 연결할 때에는 connect를 해야 한다.
  2. 준비가 끝나면 sink에게 타이머를 시작하라고 시그널을 전송한다.
  3. 이후 모든 워커에게 지연시간값을 나눠준다. 그 총량은 모두 합산된후에 출력한다.
  4. 큐에 남아있는 메시지가 있을지 모르니, 타이머를 1초 가량 준다. (이 부분은 별도의 컨트롤러 포트를 통해서 제어할 수 있다.)

워커

워커는 벤틸레이터로부터 값을 받아서 다시 싱크쪽으로 값을 밀어넣는다. 따라서 한쪽은 PULL 소켓 다른 한쪽은 PUSH 소켓이 있어야 한다. 그 사이에 받은 값을 정수로 환산하고 그 밀리세컨드 시간만큼 지연한 후 싱크로 시그널을 보낸다.

import zmq, sys, time

ctx = zmq.Context()

left = ctx.socket(zmq.PULL)
left.connect('tcp://localhost:5557')
right = ctx.socket(zmq.PUSH)
right.connect('tcp://localhost:5558')

while True:
  s = left.recv()
  sys.stdout.write('.')
  sys.stdout.flush()
  time.sleep(int(s) * 0.001)
  right.send(b'')

이제 모든 구성원에 대한 준비가 끝났다. 순서대로 각 프로세스를 실행한 후에 구동해보자. 아래 캡쳐는 이 결과를 보여주는 것이다. 총 7개의 워커를 사용했으며 수천개의 값을 벤틸레이팅했다.  상대적으로 천천히 .을 찍어 나가는 화면이 각각의 worker이고 맨 마지막에 빠르게 결과를 찍어나가는 창은 sink에 해당한다. 최종적으로 sink에 출력되는 총 소요 시간은 벤틸레이터가 합산한 전체 코스트 / 워커의 개수보다 아주 약간 많은 시간이 소요되는 것을 확인할 수 있다.

만약 같은 네트워크 내에 worker를 구동할 머신이 있다면 네트워크를 통한 분산처리를 테스트해보는 것도 어렵지 않을 것이다. (sink와 벤틸레이터를 구동할 컴퓨터의 IP만 알면 된다!)

PUSH-PULL 패턴에서도 소켓이 N:1 혹은 1:N으로 접속할 수 있음을 보고 있다. 즉 ZMQ 소켓에서는 소켓이 서버냐 클라이언트냐에 관계없이 1:N으로 접속하느냐 혹은 N:1로 접속하느냐에 따라서 바깥으로 전송될 메시지는 반대편 노드에 대해서 순차적으로 분산되어 하나씩 전송되며, 반대로 수신측에서도 N개의 노드로부터 데이터를 수신하는 경우 공평하게 하나의 소켓씩 메시지를 수신해서 처리한다.

몇 가지 다른 문제

여기서 “많은 워커”들은 사실상 종료하는 조건 없이 계속해서 실행되면서 데이터를 기다리게 된다. 모든 처리가 끝났을 때, 어떻게 수많은 워커들을 일시에 종료하게 할 수 있을까? 간단하게 생각해보면 sink는 모든 작업이 완료되는 시점을 알고 있으므로, 최종적으로 모든 작업이 완료되었을 때 일종의 PUB 소켓을 통해서 종료 시그널을 배포하는 것이다. 그리고 다른 모든 워커들은 시작할 때 해당 PUB 소켓에 대한 SUB 소켓을 연결한다.

문제는 워커가 일종의 무한루프를 돌면서 입력을 처리하고 있기 때문에, 종료 시그널을 받기 시작할 시점을 만들기가 애매하다는 점이다. 물론 ZMQ 컨텍스트는 스레드 안전하기 때문에, 실제 워커의 계산 작업을 백그라운드 스레드에서 무한 루프를 돌게하고, 메인 스레드는 종료 시그널을 청취하도록 하는 방법이 있다.

이것은 이것대로 좋은 해결 방법인데, ZMQ는 이를 위한 별도의 해결책으로 조금 특별한 POLLER라는 디바이스를 제공한다. 다음 시간에는 POLLER를 어떻게 사용하는지에 대해서 살펴보고, 스레드 간의 통신에서는 ZMQ를 어떻게 사용할 수 있는지 살펴보기로 하자.

ZMQ의 기본 개념들

일전에 간단하게 ZMQ(Zero MQ)에 대한 내용을 간단히 정리해본 바 있는데, 이 때는 소켓에 대한 내용을 살펴보다가 흘러흘러 닿은 부분이라 제대로 설명하지 못하고 공식문서에 나오는 예제를 그대로 옮기는 수준이었다.  ZMQ는 소켓 프로그래밍 API를 대체할 수 있는 정말 괜찮은 라이브러리라는 생각이 들어서 활용할 폭이 넓다고 판단됐다. 다만 용어나 개념에 대한 약간의 선행지식이 필요한 부분이 있다. 오늘은 ZMQ에서 사용되는 기본적인 개념에 대해서 알아보고, ZMQ를 통해서 간단한 에코서버와 클라이언트로 소켓통신을 구현하는 방법에 대해 살펴보도록 하겠다. 그리고 ZMQ를 사용하면 전통적인 소켓 접속을 구현하는 것보다 얼마나 편하며 또 멋지게 돌아가는지도 살펴보도록 하겠다.

ZMQ란?

  • 코드를 모든 언어, 모든 플랫폼 상에서 연결
  • inproc, IPC, TCP, TIPC, 멀티캐스트를 통해 메시지들을 전달
  • pub-sub, push-pull 그리고 router-dealer와 같은 스마트한 패턴
  • I/O 비동기 엔진을 지원하는 소형 라이브러리
  • 대규모의 활동적인 오픈소스 커뮤니티 지원
  • 모든 현대 언어와 플랫폼을 지원
  • 중앙집중, 분산, 소규모 및 대규모의 모든 아키텍처를 구성 가능
  • 완전한 상업적 지원을 동반하는 무료 소프트웨어

위는 ZMQ 홈페이지에서 소개하는 ZMQ의 특징이다. ZMQ는 제로엠큐라 불리는 분산 메시징 플랫폼을 구현하기 위한 라이브러리이다. 흔히 소켓 통신 구현을 간단히 할 수 있는 라이브러리로 많이 소개되는데, ZMQ는 사실 훨씬 더 많은 것을 간편하게 구축할 수 있으며, 더군다나 빠르게 처리되도록 할 수 있다. ZMQ의 컨셉은 여러 가지 측면을 가지고 있지만, 그중에 가장 근간이 되는 키워드라 함은 ‘여러 노드를 편리하게 이어주는 분산형 메시징 플랫폼’이다.  위의 특징 소개글에서도 약간 엿볼 수 있지만, 다음과 같은 특징을 가진다.

  1. ZMQ는 단순히 소켓과 소켓을 연결하는 라이브러리가 아닌, 노드와 노드를 연결하여 메시지를 주고받을 수 있는 플랫폼을 구축하는데 쓰인다. 이 때 노드는 소켓외 스레드, 프로세스, TCP로 연결된 다른 머신이 될 수 있다. 즉 노드가 어디에 있든지에 대해서 상관하지 않도록 메시징의 양 끝단이 추상화되어 있다.
  2. 전달되는 데이터는 모두 ‘메시지’라는 형태로 불린다. 메시지는 그 속에 무엇이 들어있는지를 신경쓰지 않으며, 여러 언어/플랫폼간에 호환이 가능하도록 길이값+데이터의 형태로 묶여 있다.
  3. ZMQ의 Q는 Queue를 의미한다. 이는 노드들이 연결되어 만들어진 플랫폼 내에서의 모든 메시지 전달은 자동으로 큐를 거치게 된다. 소켓의 경우 서버가 소켓을 열기 전에 클라이언트가 접속하는 상황은 에러가 되지만, ZMQ에서 이러한 순서는 중요하지 않다. 수신측이 없거나 유효하지 않은 메시지는 모두 큐잉되며, 나중에 소비되거나 혹은 버려질 수 있다. 반대로 메시지를 받아들이는 부분에서도 큐가 사용된다. 모든 큐에 관한 관리는 라이브러리가 자동으로 관리하며, 큐 관리에 들어가는 비용은 없다. (그래서 Zero MQ이다.)

컨텍스트 (context)

모든 ZMQ 관련 코드는 컨텍스트 객체를 만드는 것으로 시작하고, 모든 ZMQ소켓은 컨텍스트를 통해서 생성된다. 소켓을 생성하고 관리하는 컨테이너이자 관리자이다. 컨텍스트는 스레드 안전하며, 프로세스 내에서 하나만 생성하고 사용하여야 한다.

컨텍스트를 통해서 생성되는 소켓에서 발생하는 입출력은 실제로는 백그라운드 스레드에서 비동기로 일어나며, 이 과정은 전부 컨텍스트에 의해서 처리된다. 컨텍스트로부터 명시적으로 생성되는 소켓은 실제로는 소켓이 아니며, 즉시 생성되지도 않는다. 모든 것은 내부에서 자동으로 관리된다.

소켓

이 글에서 아무런 단서 없이 언급되는 ‘소켓’이라는 명칭은 ZMQ 내에서의 소켓을 말한다. 전통적인 네트워킹에 사용되는 UNIX 소켓은 “UNIX 소켓” 혹은 “전통적인 소켓”이라고 언급할 것이다. 이와 같은 명명 관습은 ZMQ를 다루는 이 글과 그 후속글에서 공통적으로 적용할 예정이다.

전통적인 UNIX의 소켓은 네트워크 포트를 추상화한 것이다. 이에 빈해 ZMQ의 소켓은 메시지를 전달하고 받는 창구의 개념이다. UNIX 소켓은 수신측인 상대편 노드가 열려있는 소켓이거나 혹은 상대방 소켓이 이쪽으로 붙어서 연결되었음을 상정한다. (예를 들어 클라이언트쪽에서 열려있지 않은 서버의 소켓에 접속하려 시도하면 강제로 연결이 끊기면서 에러가 발생한다.) 이에 반해 ZMQ의 소켓은, 컨텍스트 내에서 생성되는 소켓의 재추상화된 버전이다. 소켓은 TCP등의 프로토콜을 사용하는 전통적인 UNIX 소켓일 수도 있으며, 상대방 노드의 종류에 따라 inproc(동일 프로세스 내에서 스레드간 통신을 위한 규격) 혹은 IPC(동일 시스템 내에서 프로세스간 통신을 위한 규격)일 수 있다. 실제 물리적인 소켓은 필요에 따라 생성된다.

실제로 소켓은 UNIX 소켓과 1:1로 대응하지 않는다. 우리가 바라볼 때 소켓의 뒤에는 수신자가 아닌 메시지를 주고 받기 위한 큐가 있을 뿐이다. 기술적으로 ZMQ에서는 하나의 소켓을 여러 개의 포트에 바인딩하거나, 여러 노드에 connect 할 수 있다. (이 경우에 송신할 때나 수신할 때, 여러 포트/노드로 번갈아가며 공평하게 하나씩 전송하거나 전송받는다.)

다음은 간단한 ZMQ에서의 소켓을 이용한 에코서버/클라이언트 구현이다. 컨텍스트가 어떤 소켓을 사용하고 어떤 세팅을 사용할 것인지에 대해서 우리는 큰 고민을 할 필요가 없으며, 몇 가지 핵심적인 코어 메시지 패턴 중에서 하나를 선택하면 된다. 우리가 만들려고 하는 에코 서버/클라이언트는 클라이언트가 서버에 요청을 보내고, 그 요청에 대한 응답을 받는 식으로 서로 한 걸음씩 짝을 맞추어 커뮤니케이션하는 패턴을 갖는다. 이를 ZMQ에서는 REQ-REP 패턴이라고 한다. 이 패턴에 기초해서 소켓을 만들고, 사용한다.

## echo-server.py
## 서버측 코드
import zmq, time

## 컨텍스트를 생성한다.
ctx = zmq.Context()

def run_server():
  ## zmq 소켓을 만들고 포트에 바인딩한다.
  sock = zmq.socket(zmq.REP)
  sock.bind('tcp://*:5555')
  while True:
    ## 소켓을 읽고 그 내용을 출력한 후 다시 되돌려 준다.
    msg = sock.recv()
    print(f'Recieved: {msg.decode()}')
    time.sleep(1)
    sock.send(msg)

run_server()

----
## echo-client.py
## 클라이언트 코드

import zmq, sys

## 동일하게 컨텍스트를 만들고 
ctx = zmq.Context()

def run_client(port=7777):
  ## 컨텍스트로부터 소켓을 만들고 서버에 연결한다.
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    ## 키보드로부터 입력받은 내용을 서버로 전송하고,
    ## 다시 서버의 응답을 출력한다.
    ## bye를 메시지로 보내고 받으면 소켓을 닫는다.
    line = input()
    sock.send(line.encode())
    rep = sock.recv()
    print(f'Reply: {rep.decode()}')
    if rep.decode() == 'bye':
      sock.close()
      break

port = sys.argv[1] if len(sys.argv) > 1 else 7777
run_client(port)

이 코드들은 전통적인 소켓을 이용한 에코서버 구현보다 훨씬 간단하다.

  1. 소켓을 생성하는데에는 어떤 패밀리나 타입 정보도 필요하지 않다. 단지 컨텍스트에게 메시지 패턴 정보만 알려줄 뿐이다.
  2. 바인드하는 액션외에 listen()이나 accept() 같은 작업은 필요하지 않다. 실제로 포트를 듣고, 연결을 수락하는 물리적인 포트에 대한 추상화된 객체가 존재할 것이나, 이 모든것은 컨텍스트 내부에 있으며, 자동으로 관리된다.
  3. 파이썬의 표준 소켓 api와 비교했을 때, sendall()은 존재하지 않는다. 모든 메시지는 마치 전체가 한 덩어리로 보내지는 것처럼 보인다.

실제 실행에서도 몇 가지 차이를 보이는데, 우선 여느 소켓 서버-클라이언트 예제와 달리 이 파일들에 대해서는 클라이언트를 먼저 실행해도 아무런 에러 없이 실행된다. 또한 클라이언트를 2개 이상 실행했을 때 동시 접속이 되는 것처럼 동작한다. 이는 소켓이 상대방 노드가 아니라 메시지 큐에 맞닿아있고, 큐를 통해 들어오는 메시지를 처리하기 때문에 가능한 일이다.

ZMQ의 소켓은 물리적인 포트에 묶이는 UNIX 소켓이 아니라고 했다. 따라서 메시징 타입이 같다면, 하나의 소켓이 (이미 바인딩되지 않은) 다른 포트들에 멀티로 바인딩되는 것도 가능하다.

sock.bind('tcp://*:5555')
sock.bind('tcp://*:7777')

이렇게 동일한 소켓 하나가 두 개 이상의 포트에 묶이도록 호출하는 것이 아무런 문제가 되지 않으며, 클라이언트를 쪼개서 각각 다른 포트들에 접속하도록 했을 때에도 서버 하나로 동작하는게 가능하다.

메시지

ZMQ 소켓을 통해서 주고 받는 데이터는 이진 raw 데이터 스트림이 아니다. ZMQ는 ‘메시지’라 불리는 길이값과 데이터를 결합한 단위의 데이터 타입을 주고 받는다. 따라서 recv()send()에서 별다른 인자 없이 데이터 전체를 하나의 메소드 호출로 주고 받을 수 있게 한다. 또한 이는 문자열을 다루는 매커니즘이 서로 다른 언어 구현간에 발생할 수 있는 문자열 데이터 전달을 처리하는 좋은 돌파구가 된다. 적어도 파이썬을 쓰는 한, 메시지에 대해서는 별다른 처리가 필요하지 않을 것이기 때문에 일단은 넘어가자.

메시지 패턴

컨텍스트가 어떻게 소켓과 각 소켓의 입출력 동작을 마법처럼 관리할 수 있을까? 사실 컨텍스트는 메시지 패턴에 따라서 사용되어야 할 소켓이 어떻게 동작해야 할 것인지를 판단할 수 있다. 메시지 패턴은 두 노드 혹은 여러 노드간에서 메시지를 주고 받는 방향성과 흐름을 정의하는 방법이다. 네트워크 구성 노드들의 연결 방식에 따라서 전체 네트워크의 위상이 달라질 수 있겠지만, ZMQ는 하나의 소켓이 다른 소켓 혹은 소켓들과 어떻게 동작해야 하는지를 몇 개의 패턴으로 구분하고, 그에 맞게 최적화된다. 실질적으로는 매우 많은 패턴들이 있을 수 있겠지만, ZMQ는 몇 개의 기본적인 패턴들을 정의하고 있다.

  1. REQuest – REPly 패턴 : 서버-클라이언트가 각각 한 번씩의 요청과 응답을 주고 받는다.
  2. PUBlisher – SUBscriber 패턴 : 서버가 발행하는 메시지가 각각의 클라이언트로 분산되어 전파된다.
  3. Pipleline : PUSHPULL 패턴의 소켓이 연결되어 단일 방향으로 메시지를 개별전송한다.

이외에도 메시징 네트워크를 확장하기 위한 프록시나 Poller와 같은 디바이스 몇 가지가 정의되어 있다. 분명한 것은 ZMQ는 그렇게 많지 않은 패턴을 제공하고 있음에도 불구하고 매우 다양한 구조를 손쉽게 만들 수 있다는 점이다. 그것은 네트워크 내의 각 노드사이에서 데이터나 신호가 흘러가는 흐름을 디자인하기에 달려 있으며, 우리는 ZMQ를 이용하여 가장 쉽게 할 수 있는 것 (바로 두 노드 간의 통신을 구축하는 것)을 사용하여 얼마든지 다방향 네트워크를 구축할 수 있다는 점이다.