Wireframe

DEALER, ROUTER로 REQ, REP 구현하기

이전 글에서 REQ, REP가 메시지를 주고 받을 때 그 속에서 벌어지는 일들에 대해서 살펴본 바 있다. 그리고 이를 통해 REQ > ROUTER > DEALER > REP 로 이어지는 확장 패턴이 어떻게 맞물려 돌아가는지 확인했다. 그렇다면 과연 ROUTER, DEALER 소켓은 중계기로서의 의미만 있는 것일까? 그렇지 않다. ROUTER/DEALER 소켓은 그 자체로도 유용하게 사용될 수 있는데, 이들 소켓은 REQ/REP 소켓처럼 수신-발신 사이크의 제약을 받지 않는다. 따라서 이들 소켓을 사용하면 완전히 자유로운 비동기 패턴을 구축할 수 있다. 그러한 확장 패턴을 탐구해보는 기초로서, 이 글에서는 기본적인 클라이언트-워커 패턴을 REQ-REP 가 아닌 다른 소켓을 사용하여 구현하는 방법에 대해서 살펴보도록 하겠다.

DEALER – REP 패턴

먼저 DEALER로 REQ를 대체하는 것이다. REQ는 메시지 앞에 빈 프레임을 끼워 보내고, 받은 메시지에서는 첫 프레임이 빈 프레임인지 확인한다. 이 과정을 DEALER 소켓으로 대체할 때에는 보내기 직전과 받은 직후의 처리를 해주면 REQ 소켓을 시뮬레이트 할 수 있다.

import zmq
from threading import Thread

def client_dealer():
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.DEALER)
    sock.connect("inproc://test")
    for _ in range(10):
        sock.send_multipart([b'', 'hello'.encode()])
        empty, reply = sock.recv_multipart()
        if empty != b'':
            # 첫 프레임이 빈 프레임이 아니면 메시지 무시
            continue
        print(reply.decode())

def worker_rep():
    ctx = zmq.Context.instance()
    sock = ctx.socket(zmq.REP)
    sock.bind("inproc://test")
    while True:
        message = sock.recv_string()
        sock.send_string("world")

if __name__ == "__main__":
    Thread(target=worker_rep, daemon=True).start()
    client_dealer()

REQ-ROUTER 패턴

이번에는 REQ-REP 패턴에서 REP 를 ROUTER로 변경해보겠다. REP 소켓은 REQ 소켓이 삽입했을 빈 프레임을 찾는다. 그리고 그 빈프레임의 다음 내용이 원래 메시지가 된다. REP 소켓이 메시지에서 빈 프레임을 찾지 못했다면 전체 메시지가 무시된다. 또한 ROUTER 소켓은 받은 메시지에서 피어의 식별값을 끼워넣고 있으므로, 이 부분에 대해서도 처리가 필요하다. 따라서 기본적으로 ROUTER 소켓을 써야 한다면 [send | recv]_multipart() 메소드를 이용해야 함을 기억하자.

def worker_router():
    ctx = zmq.Context.instance()
    sock = ctx.socket(ROUTER)
    sock.bind("inproc://test")
    while True:
        try:
            addr, *fs = sock.recv_multipart()
            i = fs.index(b'')
            message = fs[i + 1].decode()
            print(message)
            fs[i + 1] = "world".encode()
            sock.send_multipart([addr, *fs])
        except ValueError:
            # b'' 프레임을 찾지 못하면 ValueError 예외가 발생하므로, 무시하고 진행
            continue

이렇게 REQ, REP를 각각 DEALER와 ROUTER로 치환할 수 있는 방법을 살펴보았다. 당연히 위에서 작성한 두 개의 소켓끼리 연결하여도 잘 작동할 것이다. 이렇듯 zmq에서 소켓은 활용하기에 따라서 다양한 패턴을 만들 수 있지만, 실제로 가능하지 않은 조합들이 있기 마련이다. 이런 조합들을 꼭 외워둘 필요는 없는데, 왜 그런지를 알면 써도 될지 안될지를 알 수 있기 때문에 한 번 읽어 볼 필요는 있을 것이다.

불가능한 패턴

ZMQ에는 다양한 소켓이 있고, 소켓을 조합할 수 있는 방법은 매우 다양하다. 그렇지만 모든 소켓들이 짝이 될 수는 없다. 예를 들어 REP소켓은 대화를 시작하지 못하기 때문에 REP-REP 연결은 논리적으로도 올바르지 않으며, 같은 이유로 REQ-REQ 소켓도 어울리지 않는다. 다음의 조합은 어느 쪽이 서버/클라이언트인지에 상관없이 zmq 내에서 사용할 수 없거나 권장되지 않는 패턴이다.

의외의 패턴

우리는 두 peer를 구분할 때, bind / connect로 구분하는데, 이 역할이 반드시 작업 흐름에서의 서버-클라이언트에 해당하지 않을 수 있다는 것을 배웠다. 하지만 REQ-REP 패턴에서는 어느쪽이 bind 를 하고 어느쪽이 connect를 하든 상관없이 REQ 소켓을 쓰는 쪽이 클라이언트가 되리라는 것은 예측할 수 있다.

프록시와 N개의 워커가 연결되어 있는 지점을 하나 생각해보자. 프록시의 backend 가 ROUTER 소켓으로 되어 있고, 여기에 여러 개의 워커들이 REQ 소켓을 사용하여 연결되는 것이다. 워커라면 서버에 대응하는데, REQ가 되는 것이 맞을까? 하는 생각이 들 수 있지만, 실제로 사용할 수 있고 사용되기도 하는 로드밸런싱 패턴이다.

REQ는 먼저 메시지를 보내고, 이후부터는 받고 > 보내고를 반복하게 된다. 따라서 워커는 REQ 소켓으로 브로커에게 연결될 때 미리 약속된 메시지를 보내고, 이후부터는 일감을 받고 결과를 보내는 식으로 작동하면 된다.

  1. 브로커는 ROUTER-ROUTER가 연결될 프록시라고 생각할 수 있다. 또 내부적으로 연결된 워커의 숫자를 관리하게 된다.
  2. 워커가 준비됐단 메시지를 보내면 (= 워커가 연결되면) 워커의 식별자를 내부 큐에 관리한다.
  3. 준비된 워커가 없을 때 클라이언트가 요청을 보내면, 또 다른 내부 큐에 보관한다.
  4. 워커가 응답을 보내면, 처음 요청한 클라이언트에게 응답을 재전송해준다. 그리고 이 상태에서는 유휴 워커가 1개 발생한 셈이다. 요청작업 큐에 남은 작업이 있다면, 작업을 꺼내어 할당해준다. 만약 남은 작업이 없다면 가용 워커 큐에 해당 워커의 식별자 정보를 저장해둔다.

이러한 아이디어에 대한 구현은 좀 귀찮은 관계로 다음으로 미루도록 하겠다. 다만, ROUTER와 DEALER의 특성, 그리고 REQ, REP의 특성을 잘 이해한다면 zmq를 사용하는 어떤 메시징 패턴을 직접 간단하게 비동기화하거나, 스위치나 큐를 직접 구현하여 속도나 신뢰성을 높이거나 더 큰 스케일로 확장하는 것에 좀 더 유연하게 대응할 수 있게 해줄 것이라고 생각한다.

Exit mobile version