Home » ZMQ 소켓 – Dealer, Router 이해하기

ZMQ 소켓 – Dealer, Router 이해하기

이전 글에서 프록시를 소개하면서 단순한 REQ – REP 패턴의 네트워크 중간에 중간 매개로 프록시를 넣어 REQ – ROUTER – DEALER – REP 형태로 패턴을 확장하는 예를 살펴보았다. 이 때, ROUTER – DEALER 사이에서 오가는 데이터를 보면 REQ-REP 사이에서 오가는 데이터와 다르게 추가된 프레임이 보인다는 것을 볼 수 있었다.

이번 글에서는 이런 현상은 왜 일어나며, ROUTER가 무슨 짓을 벌이는지에 대해 소개하고자 한다. 적어도 이 개념을 이해한다면 ZMQ를 통해서 훨씬 더 유연하고 창의적인(?) 패턴을 만드는 것도 가능하리라고 본다.

먼저 간단한 REQ-REP 패턴에서 주고 받는 데이터는 어떤지 살펴보자. recv_multipart() 를 통해서 메시지 속에 어떤 프레임들이 있는지를 볼 것이다.

# req_rep.py
from threading import Thread

import zmq


def client(ctx: zmq.Context):
    sock = ctx.socket(zmq.REQ)
    sock.connect("inproc://test")
    for _ in range(10):
        sock.send_string("hello")
        reply = sock.recv_multipart()
        print(f"{reply=} -> {'-'.join(r.decode() for r in reply)}")


def worker(ctx: zmq.Context):
    sock = ctx.socket(zmq.REP)
    sock.bind("inproc://test")
    while True:
        task = sock.recv_multipart()
        print(f"{task=}")
        sock.send_multipart(["world".encode())


def main():
    ctx = zmq.Context()
    Thread(target=client, args=(ctx,)).start()
    worker(ctx)


if __name__ == "__main__":
    main()

이 예제를 통해 실행했을 때에는 그냥 [b"hello"] 같이 인코딩된 바이트배열 데이터가 오가는 것을 확인할 수 있다. 이번에는 프록시를 사용해서 연결하고 프록시 내부에서 오가는 데이터를 모니터링 해보도록 하자. 프록시에 연결되는 소켓이 3개가 되므로 클라이언트와 워커의 코드를 약간 수정하고 모니터링 결과를 출력하는 노드를 하나 추가할 것이다.

# req_rep_proxy.py

from threading import Thread
import zmq


def client(ctx: zmq.Context):
    sock = ctx.socket(zmq.REQ)
    sock.connect("inproc://broker.front")
    for _ in range(10):
        sock.send_string("hello")
        reply = sock.recv_multipart()
        print(f"{reply=} -> {'-'.join(r.decode() for r in reply)}")


def worker(ctx: zmq.Context):
    sock = ctx.socket(zmq.REP)
    sock.connect("inproc://broker.back")
    while True:
        task = sock.recv_multipart()
        print(f"{task=}")
        sock.send_multipart(["world".encode())

def monitor(ctx: zmq.Context):
    sock = ctx.socket(zmq.SUB)
    sock.setsocketopt(zmqw.SUBSCRIBE, b'')
    sock.connect("inproc://broker.mon")
    while True:
      data = sock.recv_multipart()
      print(data)


def main():
    ctx = zmq.Context()
    Thread(target=client, args=(ctx,)).start()
    Thread(target=worker, args=(ctx,)).start()
    Therad(target=monitor, args=(ctx,)).start()
    front = ctx.socket(zmq.ROUTER)
    front.bind("inproc://broker.front")
    back = ctx.socket(zmq.DEALER)
    back.bind("inproc://broker.back")
    mon = ctx.socket(zmq.PUB)
    mon.bind("inproc://broker.mon")
    zmq.proxy(frontend=front, backend=back, capture=mon)

if __name__ == "__main__":
    main()

REQ-REP 때와는 다르게, 프록시 내부의 ROUTER-DEALER 에서는 [b'\x00\x80\x00\x00)', b'', b'hello'] 같은 본적도 없던 데이터가 앞에 붙어있는 것을 볼 수 있다. 이런 데이터는 왜 생기는 것이며, 프록시를 사용했을 때에는 어떻게 이런 데이터가 생기는 것과 무관하게 기존의 클라이언트나 워커가 정상적으로 작동할 수 있었을까? 이를 이해하기 위해서는 REQ-REP 사이에서는 무슨 일이 있는지부터 살펴봐야 할 것이다.

REQ-REP 가 하는 것

ZMQ의 메시지는 사실 기본적으로 멀티파트 메시지이다. 파이썬 기준으로 이 멀티파트 메시지는 바이트배열의 리스트이다. 여기서 각각의 바이트배열을 ZMQ에서는 프레임이라고 하고, 전체 리스트를 봉투(envelope)라는 이름을 사용하여 표현한다. 데이터 자체를 쪼개서 멀티파트 메시지를 보낼 수 있다는 것은 알겠는데, 멀티파트 메시지를 왜 굳이 봉투라는 표현을 쓰는지… 이것은 사실 소켓들이 내부에서 어떤 일을 하는지 알아보면 이해가 가는 표현이다.

먼저 REQ 소켓에게 “hello” 라는 문자열을 보내려고 하면 내부에서는 어쨌든 이 문자열을 인코딩하여 바이트배열로 변환한다. 이렇게 변환된 한 조각의 메시지가 프레임이다. 그러면 REQ 소켓은 이 프레임을 리스트에 넣는데 (프레임이 하나인 멀티파트 메시지가 생성되는 셈), 보내기 직전에 이 리스트의 맨 앞에, 빈 프레임 을 하나 삽입해서 보낸다. 앞서 봤던 b'' 이 바로 이 빈 프레임이다.

이 “빈 프레임”은 REP 소켓에게는 이른바 ‘봉투의 끝’ 같은 의미이다. 즉 멀티파트 메시지에서 빈 프레임을 기준으로 여기까지는 메시지를 담고 있는 봉투이고, 그 다음이 원래 전달하려는 메시지인 셈이다.

REQ소켓의 짝인 REP 소켓이 메시지를 받으면, REQ의 이 특이한 행동때문에, 받은 메시지는 무조건 멀티파트 메시지여야 한다. REP 소켓은 받은 메시지에 대해 봉투의 끝을 찾고, 다시 메시지만 쏙 빼서 애플리케이션으로 전달해준다.

REQ, REP 두 소켓이 속에서 이런 일을 하기 때문에 REQ-REP 패턴에서는 전달되는 과정 중간의 데이터를 엿보지 않는 이상 이런 동작을 알 수가 없었던 것이다.

REP 소켓이 답변을 보내는 과정

더 흥미로운 것은 답변을 보내는 과정이다. REP 소켓은 처음에 자신이 받은 ‘봉투’를 버리지 않고 저장해두고 있다. 애플리케이션이 답장을 보내라고 전달하면, REP 소켓은 자신이 받았던 봉투에 답장 프레임을 넣는다. 즉, 빈프레임까지의 리스트는 보관되고 있다가, 빈 프레임 다음의 프레임을 버리고, 여기에 답장 프레임을 붙여서 보내는 것이다.

그래서 다시 REQ 소켓으로 되돌아오는 데이터는 자신이 처음 보냈던 모양과 똑같은 형식을 가지고 있을 것이다. REQ 소켓은 이 봉투를 검사하여 맨 앞에 빈 프레임이 붙어있는지 확인하고, 두 번째 프레임의 내용을 꺼내어 애플리케이션으로 올려보낸다.

여기서 한가지 기억할 점은 REQ-REP 연결에서 이 빈 프레임이 매우 중요하다는 점이다. REQ 프레임은 첫 프레임이 빈 프레임이 아니면, 메시지 전체가 올바른 형식이 아니라 판단하고 무시해버린다. 비슷하게 REP 프레임도 빈 프레임이 포함되어 있지 않다면 메시지 전체를 버린다. (봉투와 편지를 구분할 수 없다고 보는 것과 비슷하다)

또 한가지 간과해서 안되는 점은 REQ-REP 패턴은 두 peer 가 서로 번갈아가면서 메시지를 보내고 받아야 한다는 점이다. REQ 소켓은 항상 대화의 시작을 담당하며, REP는 매 요청에 대한 답장을 보낸다. 그러면서 이 둘은 반드시 각자가 번갈아가면서 메시지를 보내고, 수신하는 것을 한 번씩만 한다. 이 점은 ROUTER, DEALER와의 큰 차이점이기 때문에 기억해두는 것이 좋겠다.

정리

  1. REQ 소켓은 메시지를 보낼 때 빈 프레임을 맨 앞에 끼워넣는다.
  2. REP 소켓은 메시지를 받으면 빈 프레임까지 벗겨내고, 그 다음 프레임을 읽는다
  3. REP 소켓은 답장 메시지를 보낼 때, 벗겨냈던 프레임(들)을 앞에 다시 붙여준다.
  4. REQ 소켓은 메시지를 받으면 빈 프레임을 체크하고, 두 번째 프레임을 읽는다.
  5. 1~4의 과정은 반드시 순서대로 일어나며, 반복된다.

ROUTER가 하는 일

브로커에 사용된 ROUTER와 DEALER는 종종 짝이 된다. 서버-클라이언트를 ROUTER-DEALER로 정의하는 패턴이 가능하고 실제로도 이렇게 쓸 수 있다. 그러면 이 들은 어떤 식으로 작동할까.

먼저 DEALER에 대해서는 간단히 언급만 하고 넘어가면 될 듯 하다. 이 소켓은 애플리케이션이 넘겨주는 데이터나, 소켓을 통해서 읽어들인 메시지에 대해서 아무런 내부적 조작을 하지 않고 매우 투명한 동작을 한다. 따라서 이에 대해서는 특별히 더 언급할 것이 없다.

ROUTER는 peer와 연결되면 해당 peer를 다른 peer들과 구분할 수 있는 고유의 정수값을 부여해 놓는다. ZMQ에서는 이것을 ‘식별자’라고 한다. 식별자는 랜덤하게 만들어지는 (혹은 해당 peer의 소켓 옵션에 의해 결정될 수도 있다.) 정수값이며, 하나의 라우터 소켓 내에서는 peer들을 구별하는 값이다.

ROUTER에게 이러한 식별자가 필요한 이유는 기본적으로 ROUTER가 비동기 소켓이기 때문이다. REP 소켓은 여러 REQ 소켓과 한꺼번에 연결이 가능하지만, 연결된 상대방들은 줄지어서 메시지를 주고 받는다. 또한 수신 > 발신의 사이클에서는 한 번에 하나의 클라이언트와 대화가 가능한데, 그것은 REP 소켓은 메시지를 수신한 후, 그 메시지의 발신자에게만 회신할 수 있는 기회가 주어진다는 것이며, 이는 REP 소켓은 ‘답장’을 보내는 시점에 이 답장을 받는 peer가 결정되어 있다는 것이다. 하지만 ROUTER 소켓은 비동기 소켓이고, 별도의 수발신 사이클이 존재하지 않는다. 극단적으로 100개의 클라이언트로부터 10개씩의 메시지를 받은 후, 1000개의 회신을 순서에 상관없이 보낼 수 있어야 한다. 따라서 각각의 메시지에 대해 ‘누가 받을 것인지’에 대한 정보를 함께 보관하고 있어야 한다.

그래서 ROUTER 소켓은 메시지를 수신한 후에 이 메시지를 보낸 peer를 식별할 수 있는 값을 가장 앞 프레임에 추가한다. 그리고 ROUTER 소켓을 통해서 메시지를 내보낼 때에도 첫번째에는 peer에 대한 식별자값이 있는 프레임이 들어가야 한다.

다음 예제는 DEALER 소켓을 이용한 클라이언트인데, 10개의 메시지를 보낸다. 이 때 REQ 소켓이 매 메시지를 보낸 후 답장을 받아서 출력하는 것과 달리, 10개의 메시지를 모두 보낸 후에 10개의 답장을 받아서 출력하도록 했다. (REP 소켓은 그 작동 구조상 이 클라이언트와 연결될 수 없다.)

import zmq

def client_dl(ctx: zmq.Context):
    sock = ctx.socket(zmq.DEALER)
    sock.connect("inproc://test")
    for i in range(10):
      sock.send_string(f"[{i:02}]-hello")
    for i in range(10):
      reply = sock.recv_string()
      print(reply)

위 클라이언트에 대응할 수 있는 가장 적절한 워커는 ROUTER 소켓을 사용하는 것이다. (1:1로만 접속한다고 가정한다면 워커 역시 DEALER 소켓으로도 작성할 수 있다.) ROUTER 소켓은 상대 피어를 식별하는 정보를 생성해내므로, 항상 recv_multipart()/ send_multipart() 를 사용해서 메시지를 받고 보내야 한다.

아래에서는 한 번에 10개씩 메시지를 받은 후 다시 10개씩 답장을 보내는 형식으로 작동하도록 했다. 이 워커는 1개 이상의 위 DEALER 워커와 연결되어 작동할 수 있다.

def worker_rt(ctx: zmq.Context):
    sock = ctx.socket(zmq.ROUTER)
    sock.bind("inproc://test")
    while True:
        queue = []
        for i in range(10):
            data= sock.recv_multipart()
            queue.append(data)
        for (addr, message) in queue:
            x = message.decode()[:4]
            reply = f"{x}-world".encode()
            sock.send_multipart([addr, reply])

이렇게 ROUTER가 비동기방식으로 여러 개의 peer와 동시에 교신할 수 있기 때문에, 각각의 메시지를 전달해줄 목표를 알아야할 필요가 있다. 따라서 ROUTER 소켓에 메시지를 보내려면, 애플리케이션 역시 어떤 peer에게 데이터를 보내줄 것인가 하는 것을 알아야한다. 그리고 그 식별자는 소켓이 읽어서 넘겨준 데이터에 남아있다.

지금까지 ZMQ의 확장 메시지 패턴에서 사용되는 ROUTER, DEALER 소켓이 어떤 방식으로 동작하는지와 이를 바탕으로 어떻게 사용해야하는지 또, 그에 대응하는 REQ, REP 소켓은 어떻게 작동하는지를 살펴보고, 기본적인 큐 형태의 프록시를 수동으로 만드는 방법에 대해서도 소개해보았다. 다음 시간에는 조금은 특이한 형태의 연결들, 이를 테면 REQ-ROUTER나 REQ-DEALER 등등의 특이할 것 같지만 이론적으로 가능은 해 보이는 연결 패턴들에 대해서 소개하고, 활용할 수 있는 방법에 대해서 살펴보도록 하겠다.

0 Comments

No Comment.