콘텐츠로 건너뛰기
Home » ZMQ 프록시 사용하기

ZMQ 프록시 사용하기

ZMQ 디바이스를 사용하여 네트워크를 확장하는 예를 살펴본 적이 있었는데, 사실 ‘디바이스’라는 개념은 zmq 내에서도 너무 오래되었고, 이제는 프록시 라는 개념으로 대체됐다. 디바이스와 프록시의 차이를 알아보고 프록시는 어떤 형태로 사용하는지 살펴보자.

어떤 요청을 처리할 수 있는 서버와 클라이언트가 1개씩 있는 상황을 생각해보자. 이 두 지점간의 연결은 1개면 충분하다. 소켓의 연결이라는 부분에서도 어느쪽이 bind 하고 어느쪽이 connect 하는지도 zeromq에서는 사실 중요하지 않다. 하지만 우리는 통상 클라이언트보다는 서버(워커)가 bind 하고, 여기에 클라이언트가 connect 하는 구조를 당연하게 생각한다. 왜냐하면 실제로는 1 서버 – 1 클라이언트보다는 1 서버 – N 클라이언트의 네트워크 구조가 더 일반적이기 때문이다.

네트워크에서 소켓은 “비교적” 고정적인 위치나 역할을 담당하고 있으면 bind 하고 동적이면 connect 한다고 생각하면 된다. 1-서버 네트워크에서는 이렇게 하면 되는데, 만약 2-서버가 된다면 어떨까? 모든 클라이언트 소켓은 다시 서버2의 소켓에도 connect를 해야 한다. 이것은 클라이언트 코드의 수정이 필요함을 의미하고, 모든 클라이언트가 작동을 중지해야 함을 의미한다.

워커의 수가 늘어날 수 있는 네트워크에서는 워커 소켓 역시 bind 하기에는 어울리지 않는다. 결국 모든 클라이언트들이 connect 할 수 있고, 모든 서버들이 connect 할 수 있는 네트워크 상의 하나의 혹은 매우 적은 수의 고정적인 지점이 필요하고, zmq는 여기에 프록시 (혹은 디바이스)를 갖다 놓는 형태로 메시지 패턴을 디자인할 것을 권장하고 있다.

이 프록시는, 정확히 말해 프록시가 디바이스로 통용되던 시절에는 서버와 클라이언트가 각각 어떤 타입의 소켓을 쓰고 있는지에 따라서 프록시 자체의 타입을 결정했었다. (과거형이다.) 예를 들어 Queue는 REQ-REP 연결 사이를 중매하는 역할을 담당했고, PUB-SUB 사이에는 FORWARDER, PUSH-PULL 사이에는 STREAMER가 있는 식이었다. 하지만 이러한 고정적인 관점의 디바이스는 이제 ZMQ에서 폐기되고, 이 부분이 프록시로 대체되었다. 디바이스와 프록시의 차이는 다음과 같다.

  1. 디바이스가 그 타입으로 앞뒤 혹은 in/out 소켓을 결정했지만, 프록시는 type_in, type_out 으로 양쪽에 사용될 소켓을 ZMQ 소켓타입으로 각각 지정할 수 있다.
  2. XREQ, XREP 소켓은 그 기능에 의해 ROUTER, DEALER로 명명되었으며, 이제 모든 타입의 ZMQ 소켓과 연결될 수 있다.

물론 앞/뒤에 소켓이 있어서 각각 bind 하여 클라이언트/워커들이 connect 할 수 있는 엔드포인트를 만들고 그 그 통로 역할을 한다는 점에서는 프록시가 디바이스와 크게 다르지는 않지만, 이제 각각의 소켓을 직접 선택할 수 있기 때문에 이를 통해서 훨씬 더 자유롭고 유연한 메시지 패턴을 구성할 수 있게 되었다.

프록시 사용 방법

프록시를 사용하는 방법은 크게 두 가지이다. 하나는 zmq.proxy() 함수를 사용하여 두 개의 소켓을 연결하는 프록시를 바로 시작하는 방법, 다른 하나는 Proxy 타입의 인스턴스를 생성하고, 이 객체의 메소드들을 사용해서 소켓을 설정하고 시작하는 방법이다.

예시를 보기에 앞서 간단한 클라이언트, 워커를 작성해보겠다. 편의상 TCP 포트가 아닌 inproc으로 스레드간 통신을 이용하도록 하겠다.

# nodes.py

import time
import random
import zmq

def client_req(ctx: zmq.Context, url=None):
    cid = random.randint(1, 100)
    url = "inproc://proxy.front" if url is None else url
    sock = ctx.socket(zmq.REQ)
    sock.connect(url)
    for _ in range(10):
        message = str(random.randrange(99999)).encode()
        print(f"[{cid:02d}]Sending: {message}")
        sock.send(message)
        reply = sock.recv()
        print(f"[{cid:02d}]Received: {reply}")


def worker_rep(ctx: zmq.Context, url=None):
    wid = random.randint(1, 10)
    url = "inproc://proxy.back" if url is None else url
    sock = ctx.socket(zmq.REP)
    sock.connect(url)
    while True:
        message = sock.recv()
        print(f"[{wid}]received {len(message)}bytes")
        time.sleep(0.5 + random.random() / 2)
        print(f"[{wid}]sending {len(message)}bytes")
        sock.send(message)

프록시를 작동하려면 zmq.proxy() 함수에 두 소켓을 전달하면 된다. 인자는 sock_in, sock_out 이런 식으로 구분되어 있지만 각각의 소켓이 자신의 접속 포인트에 바인드되어 있기 때문에 어떤 순서로 전달하든 상관없이 작동할 것이다.

def run_proxy():
    ctx = zmq.Context()
    front = ctx.socket(zmq.ROUTER)
    front.bind('inproc://proxy.front')
    back = ctx.socket(zmq.DEALER)
    back.bind('inproc://proxy.back')

    # 클라이언트 및 워커 실행
    for _ in range(10):
        Thread(target=client_req, args=(ctx,)).start()
    for _ in range(3):
        Thread(target=worker_rep, args=(ctx,)).start()
    zmq.proxy(front, back)

이렇게 함으로써 N:N 구성의 네트워크를 손쉽게 안정적으로 구성할 수 있게 되었다. 서로 개수가 맞지 않는 서버와 클라이언트이지만, 각자의 요청이 안전하게 되돌아가는 것을 볼 수 있다. 이 구조는 이전 디바이스에서의 큐(Queue) 타입과 동일하다. 포워더나 스트리머에 대해서는 각각 <<<SUB -> PUB>> , <<<PULL --> PUSH>>> 의 형태로 소켓을 만들어주면 된다.

모니터링하기

proxy() 함수는 선택적으로 키워드인자인 capture= 를 제공하는데, 여기에 세 번째 소켓을 전달할 수 있다. 이 세번째 소켓에는 REQ, REP를 제외한 아무 타입의 소켓을 넘겨준다. 이 소켓은 프록시 내에서 앞/뒤로 전달되는 데이터를 모니터링하는데 사용한다. 참고로 해당 기능은 현재 버전의 pyzmq (22.3.0) 의 문서에서는 사라졌는데, 키워드 인자 형태로 전달하면 작동하는 것이 확인된다.

Proxy 클래스 사용하기

proxy() 함수 외에도 Proxy 클래스의 인스턴스를 만들어서 프록시를 작동시키는 방법도 있다. 두 개의 소켓을 생성하고 바인드만 하면 프록시가 준비된다.

  1. Proxy 객체는 zmq.devices 서브 모듈에 들어 있다. 이 모듈은 import zmq.devices 로 직접 반입해야 사용할 수 있다.
  2. Proxy 객체를 생성할 때에는 in_socket 과 out_socket의 “타입”을 넘겨준다. 모니터링 소켓의 타입은 mon_type= 키워드 인자로 전달하면 된다. 전달하지 않은 경우에는 PUB 소켓을 쓰도록 되어 있다.
  3. proxy.bind_in(), proxy.bind_out() 를 사용해서 소켓을 바인드한다. bind_mon() 를 호출하여 모니터링 소켓도 준비할 수 있다.
  4. start() 메소드를 사용해서 프록시를 시작한다.

앞서 nodes.py 모듈을 활용한 예이다. 참고로 이 방법을 사용할 때에는 프록시 내부에서는 자동으로 컨텍스트를 생성하거나 획득하기 때문에, 자동으로 생성된 인스턴스를 얻기 위해서 Context.instance() 를 사용한 점에 유의하자.

from threading import Thread

import zmq
from zmq.devices import Proxy, ProxySteerable

from nodes import burl, client_req, cmd, curl, furl, monitor, murl, workder_rep


def sample_1():
    ctx = zmq.Context.instance()
    proxy = Proxy(zmq.ROUTER, zmq.DEALER)
    proxy.bind_in(furl)
    proxy.bind_out(burl)
    proxy.bind_mon(murl)

    for _ in range(7):
        Thread(target=client_req, args=(ctx,)).start()
    for _ in range(2):
        Thread(target=workder_rep, args=(ctx,)).start()
    Thread(target=monitor, args=(ctx,)).start()
    proxy.start()


if __name__ == "__main__":
    sample_1()

Steerable Proxy

프록시는 IN/OUT 과 모니터의 세 개의 소켓이 결합된 장치라고 했다. 여기에 네 번째 소켓을 추가한 조정 가능한 프록시가 있다. 이 프록시는 PULL 같은 메시지를 받을 수 있는 소켓을 사용하여 외부로부터 작동흐름을 제어하는 명령을 받아 처리할 수 있다. 이 지원하는 명령은 총 3가지이며, 명령은 아래 문자열을 전송하는 것으로 작동한다.

  • PAUSE" : 일시중지
  • "RESUME" : 일시 중지 상태에서 재가동
  • "TERMINATE" : 작동 종료

프록시가 PAUSE 상태가 되면 연결된 다른 소켓들은 mute 상태가 된다. 이 때의 동작은 해당 소켓의 타입에 따라 달라진다. REQ, REP 의 경우에는 보내기 동작이 블럭될 것이며, PUB 소켓의 경우에는 이후 발송된 메시지는 모두 버려진다.

조정 가능한 프록시를 사용하는 방법 역시 두 가지로 zmq.proxy_steerable() 을 사용하거나, zmq.devices.ProxySteerable 클래스를 사용하는 방법이 있다. 네 번째 소켓을 위해서, proxy_steerable() 함수는 control= 이라는 인자를 사용하며, 프록시 클래스는 bind_ctrl() 메소드를 제공해준다.