ZMQ 프록시 사용하기

ZMQ 디바이스를 사용하여 네트워크를 확장하는 예를 살펴본 적이 있었는데, zmq.device()는 이제 deprecated 되었고 대신에 zmq.proxy()를 사용하라는 내용을 공식문서에서 보게 되었다. 단지 device에서 proxy로 이름이 바뀐 것인가 싶었는데, 프록시는 zmq 디바이스보다 좀 더 오픈되고 유연한 형태의 네트워크 중계 장치이며, 사용도 더 간편하기 때문에 소개하고자 한다.

네트워크 규모가 커질 때 복잡도가 지수적으로 증가하는 것을 피하기 위해서 네트워크 가운데 통신채널이 집중되는 노드를 추가하는 것이 전체적인 안정성과 유연성을 높일 수 있고, 이를 위해서 0MQ에서는 ZMQ Device라는 것을 제공한다고 했다. 이는 서버-클라이언트의 커뮤니케이션 방식 (이 방식은 양단의 소켓의 타입에 따라 결정된다)에 따라 큐, 포워더, 스트리머등의 타입으로 작동할 수 있다고 했다.

그런데 이 device라는 것을 살펴보면 프론트엔드와 백엔드가 모두 열려있는 소켓이다. 즉 이것은 네트워크 상에서 어떤 연결을 중계해주는 프록시에 해당한다. 또 XREP 소켓은 여러 개의 REQ 소켓으로부터의 연결을 분배해주는 라우터의 역할을 하며, XREQ 소켓은 여러 요청을 다시 여러 서버로 돌려주는 딜러의 역할을 한다. 따라서 libzmq의 개발자들은 이를 다시 ROUTER, DEALER라는 이름의 타입으로 만들었다.

흥미로운 것은 이전부터 이 device는 앞뒤가 사실 구분이 없는 객체였다는 점이다. 심지어 경우에 따라서는 양단이 짝을 이루는 소켓일 필요도 없다. 이를 테면 백엔드는 여러 서버로부터 번갈아가며 데이터를 받지만 여기로 받은 데이터를 여러 클라이언트에게 똑같이 전송하는 PULL--PUB 과 같은 구조를 만드는 것도 가능하다. 이렇게 보다 느슨한 정의를 가지고 사용하기 편리하게 프록시를 재설계 했다. 프록시는 디바이스와 달리 타입을 선언할 필요가 없다.

# 프록시로 구현한 Queue

ctx = zmq.Context()

def run_proxy(fport=5559, bport=5560):
  fsock, bsock = ctx.socket(zmq.ROUTER), ctx.socket(zmq.DEALER)
  fsock.bind(f"tcp://*:{fport}")
  fsock.bind(f"tcp://*:{bport}")
  # start proxy with 2 sockets
  zmq.proxy(fsock, bsock)
  

proxy() 함수는 두 개의 소켓을 이어주는 프록시 객체를 생성하고 시작하는 함수이다. proxy()가 기존의 device()와 다른 것은 제 3의 소켓을 지원하는 점이다. 캡쳐 혹은 모니터로 이름 붙여진 이 소켓은 프록시의 양끝단으로 들어오는 모든 데이터를 다시 밖으로 내보내는 역할을 한다. 캡쳐 소켓의 타입은 PUB, DEALDER, PUSH, PAIR 가 될 수 있다.

Proxy 객체 사용하기

proxy()를 사용하기 위해서는 직접 두 개의 소켓을 만들고 바인드 한 다음, 이들을 인자로 넘겨주어야 한다. 그런데 실질적으로는 타입과 바인드할 주소만 있다면 소켓을 생성하는 것은 이 정보들을 사용해서 자동으로 처리할 수 있는 것이다. 그래서 proxy() 함수가 아닌 Proxy() 객체를 사용하여 프록시를 구성하는 방법도 있다.

  1. Proxy 객체는 zmq.devices 서브 모듈에 들어 있다. 이 모듈은 import zmq.devices 로 직접 반입해야 사용할 수 있다.
  2. Proxy 객체를 생성할 때에는 in_socket 과 out_socket의 타입만 지정한다.
  3. proxy.bind_in(), proxy.bind_out() 함수를 사용해서 각각의 소켓이 바인드할 주소를 지정해주면, 프록시를 시작할 때 소켓은 자동으로 생성된다.
  4. bind_mon() 메소드는 캡쳐 소켓을 바인드하도록 한다.
  5. start() 메소드를 사용해서 프록시를 시작한다.

위에서 만든 큐를 프록시 객체를 생성하는 방식으로 다시 작성해보자. 코드가 좀 더 간단해지는 것을 볼 수 있다.

import zmq
import zmq.devices

def run_proxy(f_addr, b_addr):
  proxy = zmq.devices.Proxy(zmq.ROUTER, zmq.DEALER)
  proxy.bind_in(f"tcp://*:{f_port}")
  proxy.bind_out(f"tcp://*:{b_port}")
  proxy.start()

코드가 조금 더 간단해진다.


모니터링하기

프록시 객체는 모니터라는 제3의 소켓을 가지고 있다. 소켓을 할당하거나 바인드 주소를 제공하면, 프록시는 앞뒤로 들어오는 모든 데이터를 이 소켓을 통해서 내보낸다. 따라서 이 소켓을 듣는 것으로 어떤 데이터가 오가는지 살펴볼 수 있다.

참고로 DEALER, ROUTER 로 연결된 큐 프록시에서는 해당 소켓의 특성 상 각 데이터 사이에 구분자로 빈 프레임이 추가된다.

import zmq
imoprt zmq.devices
from threading import Thread

def run_proxy(fport=5559, bport=5560):
  proxy = zmq.devices.Proxy(zmq.ROUTER, zmq.DEALER, zmq.PUSH)
  proxy.bind_in(f"tcp://*:{fport}")
  proxy.bind_out(f"tcp://*:{bport}")
  proxy.bind_mon(f"tcp://*:7777}")

모니터 소켓은 zmq.PUB 타입이 기본적으로 설정되어 있으며, bind_mon() 을 호출하여 주소를 지정해주면, 프록시 시작 시점에 소켓이 생성돼서 열린다. 서버와 클라이언트가 프록시에 각각 connect()로 연결되어 통신하는 사이에 해당 포트를 PULL 소켓으로 들어보면 프록시의 프론트로 들어온 데이터와 나가는 데이터를 모두 확인할 수 있다.


Steerable Proxy

Proxy의 서브 클래스로 ProxySteerable이 있는데, 이는 제 4의 소켓을 지원하는 프록시이다. 모니터링 소켓 외에 컨트롤 소켓을 정의할 수 있다. (모니터 소켓이 zmq.PUB를 디폴트로 설정하는데 비해서, 컨트롤러는 None이 기본 값이다.) 이 소켓은 SUB, PULL 과 같이 데이터를 받을 수 있는 소켓타입을 지원한다. 여기에 인코딩된 명령 문자열을 전달하면 그에 따라 프록시가 작동하게 된다.

  • PAUSE : 일시중지
  • RESUME : 일시 중지 상태에서 재가동
  • TERMINATE : 작동 종료

프록시가 PAUSE 상태가 되면 연결된 다른 소켓들은 mute 상태가 된다. 이 때의 동작은 해당 소켓의 타입에 따라 달라진다. REQ, REP 의 경우에는 보내기 동작이 블럭될 것이며, PUB 소켓의 경우에는 이후 발송된 메시지는 모두 버려진다.

def run_proxy(port1=5556, port2=5557):
    proxy = zmq.devices.ProxySteerable(zmq.ROUTER, zmq.DEALER, zmq.PUSH, zmq.PULL)
    proxy.bind_in(f"tcp://*:{port1}")
    proxy.bind_out(f"tcp://*:{port2}")
    proxy.bind_mon("tcp://*:7777")
    proxy.bind_ctrl("tcp://*:7778")
    proxy.start()

위 예는 PUSH 소켓으로 캡쳐된 데이터들을 내보내면서 PULL 소켓으로 컨트롤 데이터를 수신할 수 있는 프록시의 예이다.

Device는 양단의 소켓 타입에 따라서 디바이스의 타입이 결정된다. ROUTERDEALER가 연결된 장치는 REQREP 연결을 중계하는 큐(zmq.QUEUE)가 되며, XPUBXSUB로 연결된 장치는 PUBSUB를 중계하는 포워더(zmq.FORWADER)가 되는 식이다. 하지만 프록시는 이러한 타입의 구애를 받지 않는다. 예를 들어 PUSH 소켓을 통해 데이터를 전달하는 서버로부터 PUB 소켓을 사용해서 같은 메시지를 여러 클라이언트로 전송하는 방식의 장치를 만드는 것도 가능하다. in-socket과 out-socket의 구분 역시 인자의 순서로 구분될 뿐, 앞쪽으로 데이터가 들어와도 무방하고 뒤쪽으로 데이터가 들어와도 무방하다. 큐를 만들 때에도 ROUTER > DEALER 순으로 소켓을 설정하지 않고 DEALER > ROUTER 순으로 소켓을 설정해도 상관없다는 이야기다.

다음 예제는 PUBPULL을 연결하는 프록시이다. 또한 모니터 소켓을 PUB가 아닌 PUSH로 설정해주고 있다. port2에 연결된 노드에서 데이터를 푸시하면 해당 데이터가 PUB 소켓에 연결된 복수의 노드들에게 똑같이 전달된다. 일반적인 PUSHPULL 연결에서 하나의 PUSH 소켓에 여러 개의 PULL 소켓이 연결된 경우, 각각의 클라이언트 노드들에게 메시지가 번갈아 전달된다면, 이 장치를 가운데 낀 네트워크의 경우에는 모든 클라이언트가 동일한 메시지를 받게 된다.

def proxy(port1=5559, port2=5560):
    proxy = zmq.devices.Proxy(zmq.PUB, zmq.PULL, zmq.PUSH)
    proxy.bind_in(f"tcp://*:{port1}")
    proxy.bind_out(f"tcp://*:{port2}")
    proxy.bind_mon("tcp://*:7777")
    proxy.start()