ZMQ 디바이스 사용하기

일반적인 소켓 연결의 경우, 주로 서버는 bind()를 통해서 포트에 연결하고 클라이언트는 connect()를 사용해서 포트에 연결한다. ZMQ에서는 이 방식이 절대적인 규칙이 아니다. 간단한 소켓 통신의 예에서 양 끝단 중 상대적으로 안정적인 쪽이 서버인 경우가 많기 때문에 bind() 하는 것이며 클라이언트는 서버보다는 동적이기 때문에 connect() 하는 경우가 많을 뿐이다. 간단한 피어 통신의 예제에서는 사실 양 끝단이 모두 ‘고정’되어 있고, ZMQ에서는 연결의 순서에 구애받지 않으므로 클라이언트가 bind()를 하고 서버가 connect()를 해도 문제 없다.

유념해야하는 원칙 한가지는 안정적인 쪽이 bind()를, 그렇지 않은 쪽이 connect()를 해야 한다는 점이다.

.

하나의 서버에 여러 개의 클라이언트가 붙어서 통신하는 경우를 생각해보자. 이 때 클라이언트는 서버보다 가변적이다. 따라서 서버가 바인드하는 쪽이되고 각각의 클라이언트가 해당 포트에 connect를 사용하여 접속하는 것이 적절한 선택이 될 수 있다. 하지만 만약 서버가 여러 개로 가변적인 상황이 된다면 어떨까? 즉 서버도 네트워크 그래프 상에서 가변적인 요소가 되었으므로 connect()를 해야 하는 상황이다. 이런 상황에서는 특정 클라이언트들을 특정한 서버로 할당하는 작업이 수행되어야 하고 서버의 수가 바뀔 때마다 코드를 수정해야하는 번거로움과 복잡함이 동반된다.

그렇다면 커뮤니케이션에서의 역할이 아닌 ‘유동성’에 초점을 맞추고 접근해보자. 서버와 클라이언트를 중계해줄 수 있는 중간 역할을 담당하는 어떤 고정된 포인트가 있어서 서버와 클라이언트가 모두 이 지점으로 접속한다면 가변적인 클라이언트와 가변적인 서버의 상황에 모두 적절하게 대응할 수 있을 것이다.

ZMQ 디바이스는 이런 상황에 사용하여 전체적인 네트워크의 구성을 단순화하는 좋은 수단이 된다. ZMQ 디바이스는 크게 세 종류가 기본적으로 지원된다.

  1. QUEUE : REQ-REP 패턴으로 연결되는 노드들를 중계하는 장치
  2. FORWARDER : PUB-SUB 패턴으로 연결되는 노드들를 중계하는 장치
  3. STREAMER : PUSH-PULL 패턴으로 연결되는 노드들을 중계하는 장치

그러한 경우에, 서버와 클라이언트를 모두 ZMQ 장치에 접속한다. 이 장치는 서로 다른 두 개의 포트를 가지고 있고, 한 포트에서 다른 포트로 메시지를 포워딩 해준다. 따라서 양단이 모두 가변적인 경우에 ZMQ 장치가 고정적인 포인트 역할을 하게 된다. 이러한 얼개는 아래 그림에 잘 나타나 있다. ZMQ 디바이스는 양쪽에 소켓을 두 개 가지고 있어서 한 쪽은 서버에 다른 한 쪽은 클라이언트에 연결된다.

.

ZMQ 디바이스는 zmq.device()라는 함수를 통해서 실행되며 실행되면 양쪽을 중계하는 서버처럼 작동하게 된다. 이 함수에는 중계 장치의 타입과 두 개의 ZMQ 소켓 객체가 전달되면 된다. QUEUE를 사용하는 예제를 한 번 살펴보자.

ZMQ Queue Device

큐는 REQREP 관계로 이루어지는 노드들 사이에 위치하면서 일종의 로드 밸런서처럼 작동한다. 디바이스는 그 스스로가 로드밸런서인 동시에 네트워크 그래프를 단순하게 만들어주는 구심점이 된다. 다만 주의할 것은 다른 ZMQ 장치들은 일반적인 PUSH, PULL 타입 소켓이나, PUB, SUB 소켓을 사용하지만, 큐의 경우에는 XREQ, XREP 소켓을 사용해야 한다는 점이다. 두 개의 소켓을 네트워크 포트에 바인드 한 후 장치를 시작한다. 그런 다음 REP 소켓들과 REQ 소켓들이 여기에 접속한다.

.

REP 소켓이 접속해야 하는 포트는 XREQ 소켓의 포트여야 하고, REQ 소켓이 접속하는 포트는 XREP 소켓의 포트여야 한다는 점만 주의하면 되겠다.

참고로 원문의 예제는 각각의 파일이 쪼개져 있었는데, 여기서는 하나의 파일에 합쳐서 작동하는 예를 만들어볼 것이다. 때문에 멀티 프로세스로 작동할 것이며, 이 환경에서 동시에 출력되는 로그끼리 얽히지 않게 하기 위해서 별도의 로그 모듈을 작성하여 사용할 것이다.

''' logger.py
    통합 로깅 모듈

    run_logger : 로깅 서버 실행
    get_logger : 로깅 클라이언트를 리턴하는 제너레이터 함수
'''

import zmq
import logging


def get_logger(ctx: zmq:Context, port=7779):
  sock = ctx.socket(zmq.PUSH)
  sock.connect(f"tcp://localhost:{port}")
  def wrapped():
    while True:
      mes = yield
      sock.send_string(mes)
  result = wrapped()
  next(result)
  return result


def run_logger(port=7779):
  logging.basicConfig(level=logging.DEBUG)
  ctx = zmq.Context()
  sock = ctx.socket(zmq.PULL)
  sock.bind(f"tcp://*:{port}")
  while True:
    message = sock.recv_string()
    logging.debug(message)

ZMQ 디바이스 생성

QUEUE 디바이스를 생성해보자. 클라이언트들은 REQ 소켓을 사용할 것이며, 서버들은 REP 소켓을 사용할 것이다. 따라서 우리의 디바이스는 XREQ, XREP 소켓을 사용하여 구성하게 된다. 코드 자체는 무척 간단하다. 두 개의 소켓을 각각 미리 정해둔 포트에 바인딩하고 zmq.device()를 호출해서 중계 장치를 기동한다.

import zmq
from logger import get_logger, run_logger
# ...

def dev_main():
  try:
    ctx = zmq.Context()
    log = get_logger(ctx)
    # socket facing clients
    fsock = zmq.socket(zmq.XREP)
    fsock.bind("tcp://*:5559")
    # socket facing servers
    bsock = zmq.socket(zmq.XREQ)
    bsock.bind("tcp://*:5560")
    zmq.device(zmq.QUEUE, fsock, bsock)
  except Exception as e:
    log.send("{e} - Bringing down the ZMQ Device...")
  finally:
    fsock.close()
    bsock.close()
    ctx.term()

서버 만들기

QUEUE 패턴의 서버는 REP 소켓을 가진 간단한 서버의 형식을 그대로 따른다. XREQ 소켓이 바인딩된 5560 포트로 접속하게 한다.

import time
import random


def srv_main(srv_id=0):
  port = 5560
  srv_id = random.randrange(1, 100)
  ctx = zmq.Context()
  log = get_logger(ctx)
  sock = ctx.socket(zmq.REP)
  sock.connect(f"tcp://localhost:{port}")
  while True:
    mes = sock.recv_string()
    log.send(f"Server {srv_id} received request: {mes}")
    time.sleep(1)
    sock.send_string(f"world from server {srv_id}")

클라이언트 만들기

클라이언트 역시 간단한 REQ 클라이언트의 코드를 그대로 따른다. 로그를 출력하는 것을 print() 함수가 아닌 로깅 서버를 사용하는 것만 다르다.

def clnt_main(clnt_id=0):
  port = 5559
  ctx = zmq.Context()
  log = get_logger(ctx)
  sock = ctx.socket(zmq.REQ)
  sock.connect(f"tcp://localhost:{port}")
  for req in range(10):
    log.send(f"Client{clnt_id} - Sending request {req + 1}...")
    sock.send_string(f"Hello from {clnt_id}")
    reply = sock.recv_string()
    log.send(f"Client {clnt_id} - Received reply [{reply}]")

Put Together

로깅서버, 큐 장치와 서버, 클라이언트 프로세스들을 멀티 프로세스를 통해 구동한다. 클라이언트 프로세스들의 작업 종료시점까지 기다린 후, 서버 및 그외 프로세스들을 kill하면 모든 과정이 종료된다. 참고로 각각의 프로세스는 모두 거의 동시에 시작 요청을 받으며, 경우에 따라서는 클라이언트들이 서버보다 먼저 시작될 수 있기 때문에 서버 시작 후 약간의 딜레이를 주는 것이 좋다.

def main():
  # setup zmq device and logger
  p_log = Process(target=run_logger)
  p_log.start()
  p_dev = Process(target=dev_main)
  p_dev.start()
  # create and run server processes
  srvs = [Process(target=srv_main, args=(i + 1,)) for i in range(2)]
  for s in srvs:
    s.start()
  time.sleep(2)
  # create and run client processes
  clnts = [Process(target=clnt_main, args=(i + 1,)) for i in range(5)]
  for c in clnts:
    c.start()
  for c in clnts:
    c.join()
  # All jobs are done
  for s in srvs:
    s.kill()
  p_dev.kill()
  p_log.kill()


if __name__ == "__main__":
  main()

이제 파일을 실행하면 서버와 클라이언트들이 제각각 작동하면서 메시지들을 출력할 것이다.

Forwarder / Streamer

Forwarder는 앞서 언급한 것과 같이 PUB-SUB 기반의 중계 장치이고, Streamer는 PUSH-PULL 기반의 중계장치이다. Queue가 XREQ, XREP와 같은 별도 타입의 소켓을 사용한 것과 달리, Forwarder, Streamer는 동일한 PUB,SUB,PUSH, PULL 소켓을 그대로 사용한다. 사용하는 방법 자체는 zmq.device() 를 호출하는 것이므로 자세한 코드를 소개하지는 않고 대략적인 구현 부분만 소개하겠다.

# forwarder

def run_forwarder(fport, bport):
  ctx = zmq.Context()
  fsock = ctx.socket(zmq.XPUB)
  bsock = ctx.socket(zmq.XSUB)
  # XSUB는 중계목적의 소켓이므로 setsockopt_string()을 호출할 필요가 없다. 
  fsock.bind(f"tcp://*:{fport}")
  bsock.bind(f"tcp://*:{bport}")
  zmq.device(zmq.FORWARDER, fsock, bsock)

# Streamer

def run_streamer(fport, bport):
  ctx = zmq.Context()
  fsock = ctx.socket(zmq.PUSH)
  bsock = ctx.socket(zmq.PULL)
  fsock.bind(f"tcp://*:{fport}")
  bsock.bind(f"tcp://*:{bport}")
  zmq.device(zmq.STREAMER, fsock, bsock)