ZMQ의 메시지 패턴

ZMQ의 기본 개념을 다루면서 ZMQ에서는 몇 가지 코어 메시징 패턴이 있다고 언급했다. 이 중에서 REQ-REP 패턴에 대해서는 에코 서버를 구현하면서 소개하였다. 이 글에서는 ZMQ 메시징 패턴에 대해 각각의 예시에 대해서 언급해보겠다.

REQ-REP 패턴

REQ-REP 패턴은 전형적인 서버-클라이언트 통신에 사용될 수 있는 패턴이다. 웹서버나 에코서버등의 전형적인 서버-클라이언트 구성에서 이 패턴을 사용한다. 주로 서버는 zmq.REP을 소켓을 생성할 때 인자로 전달하며, 클라이언트는 zmq.REQ를 사용해서 소켓을 생성한다. 이전 글에서 하나의 서버가 하나의 소켓을 생성하고, 이 소켓을 여러 tcp 포트에 동시에 연결하는 것을 예로 보인적이 있는데, 클라이언트도 마찬가지로 하나의 REQ 소켓을 사용해서 여러 서버에 동시에 접속할 수 있다. 이 예를 구현해보도록 하자.

서버

서버를 실행할 때 포트번호와 서버 이름을 각각 인자로 주고 실행하도록 했다. 이 이름은 응답 메시지에 덧붙여져서 클라이언트로 돌아오게 된다.

import sys
import zmq

ctx = zmq.asyncio.Context()

def run_server(port, name):
    print("STARTING SERVER")
    sock = ctx.socket(zmq.REP)
    sock.bind(f'tcp://*:{port}')
    print("READY")
    while True:
        message = sock.recv_string()
        print(f'RECEIVED: {message}')
        print(f'SENDING:  {name},{message}')
        sock.send_string(','.join((name, message)))

if __name__ == '__main__':
    run_server(port, name)

하나의 클라이언트가 여러 서버에 접속하는 것은 Socket.connect()를 반복해서 여러번 호출하는 것으로 간단히 구현된다. 이렇게 다중 접속된 상태에서 소켓은 메시지를 보낼 때마다 연결된 서버에 번갈아가면서 메시지를 보내는 식으로 동작한다. 단, REQ-REP 패턴의 특성상 메시지를 보낸 서버로부터 응답을 받은 후에 새로운 서버에게 메시지를 보낼 수 있다.

import sys
import zmq

ctx = zmq.asyncio.Context()

def run_client(*ports):
    sock = ctx.socket(zmq.REQ)
    for port in ports:
        sock.connect(f'tcp://localhost:{port}')
    while True:
        line = input('>> ')
        print(f'SENDING: {line}')
        sock.send_string(line)
        rep = sock.recv_string()
        name, message = rep.split(',', 1)
        print(f'RECEIVED: {message} FROM {name}')
        if line == 'bye':
            break
    sock.close()

if __name__ == '__main__':
    run_client(*sys.argv[1:])

여러개의 터미널을 띄워놓고 클라이언트 하나와 서버들을 실행하자. 클라이언트가 실행되는 창에서 메시지를 입력하면 각각의 서버가 사이좋게 하나씩 메시지를 받아서 클라이언트에게 돌려주는 광경을 목격할 수 있다. 결국, REQ-REP 패턴에서 각각의 소켓은 자신이 서버인지 클라이언트인지는 중요하지 않으며, 주고 받는 대상 노드가 1개 이상인 경우에는 순서대로 번갈아가며 통신하게 된다.

우리가 작성하고 있는 서버 X가 REP 패턴의 소켓을 가지고 있으며, 내부 네트워크의 클라이언트들과 통신하고 있는 상황을 가정해보자.  그 중 하나의 클라이언트가 REQ 소켓과 REP 소켓을 한 몸에 지니고 있으며 두 소켓을 연결 시켜놓은 상태이고, 다시 이 클라이언트의 REP 소켓이 외부로 노출된다면, 이는 외부로부터 들어오는 요청들을 서버에 전달하고 그 응답을 다시 외부에 있는 클라이언트들에게 돌려줄 수 있는 게이트웨이의 역할을 충분히 수행할 수 있게 된다.

ZMQ는 통신의 각 주체를 서버-클라이언 관계로 정의하기 보다는 커다란 네트워크 그래프에서 각 노드간의 연결이 어떤식으로 기능하는가를 메시징 패턴으로 정의한다. 그리고 우리는 만들고자하는 네트워크의 특정한 노드가 어떤식으로 동작할지를 결정해서 간단한 디바이스를 하나 추가하는 것으로 손쉽게 네트워크를 확장할 수 있음을 볼 수 있다.

PUB-SUB 패턴

PUB-SUB패턴은 Pulisher와 Subscriber를 정의하고 이 노드들 사이에서 Publisher가 발송하는 메시지를 모든 Subscriber가 수신한다. 간단히 말해 멀티 캐스트 통신을 구축하는 패턴이다.

다음은 지역별로 랜덤하게 온도와 습도를 발행하는 서버를 구현한 예이다. (원문에는 메시지 번호를 넣지 않는데, 이 서버가 실제로 얼마나 많은 메시지를 만들어 내는지 체험해보는 것도 나쁘지 않을 것 같아서 넣었다.) 코드를 보면 알겠지만 딜레이를 주지 않고 계속해서 메시지를 만들어서 큐를 통해서 발송하기만 한다. 클라이언트 입장에서는 데이터가 스트림처럼 쏟아져 들어오는 것 처럼 보일 것이다.

import sys
import random
import zmq

ctx = zmq.Context()

def run_server(port=5556):
    sock = ctx.socket(zmq.PUB)
    sock.bind(f'tcp://*:{port}')
    i = 0
    while True:
        zipcode = random.randrange(1, 10_0000)
        temp = random.randrange(-80, 135)
        hum = random.randrange(10, 60)
        msg = f'{zipcode:05d} {temp:-3d} {hum:-2d} {i:06d}'
        # 메시지는 스트링이 아니라 바이트로 보낸다.
        # 정확하게는 zmq.Frame으로 전송된다.
        sock.send_string(msg)
        i = (i + 1) % 100_0000

if __name__ == '__main__':
    port = sys.argv[1:3] if len(sys.argv) > 1 else 5556
    run_server(port)

다음은 이 메시지를 수신해서 처리할 클라이언트 코드이다. Subscriber는 모든 메시지를 수신하지 않고, 각각 지정한 zipcode값이 일치하는 데이터만 수신한다고 가정하겠다. SUB 타입 소켓은 zmq.SUBSCRIBE 라는 소켓 옵션을 통해서 데이터의 앞부분을 비교, 일치하는 메시지만 수신할 수 있다. setsockopt_string() 메소드를 사용하면 필터를 문자열로 입력하여 비교가 가능하다.

import sys
import zmq

ctx = zmq.Context()

def run_client(port, zfilter):
    sock = ctx.socket(zmq.SUB)
    sock.connect(f'tcp://localhost:{port}')
    # 소켓이 받아들일 필터를 정한다.
    # 필터와 앞부분이 매치되는 메시지만 수신하게 된다.
    sock.setsockopt_string(zmq.SUBSCRIBE, zfilter)

    total_temp = 0

    #데이터를 50개만 수신한다.
    for update_no in range(50):
        msg = sock.recv_string()
        print(f'RECEIVED: {msg}')
        z, t, h, n = msg.split()
        total_temp += int(t)

    print(f"AVERAGE TEMPERATURE FOR {z}: {total_temp/50:+.2f}")
    sock.close()


if __name__ == '__main__':
    port, zip_filter = sys.argv[1:3] if len(sys.argv) > 1 else (5556, '10001')
    run_client(port, zip_filter)

만약 SUB 소켓에 필터를 설정하는 과정을 생략하면, 해당 소켓은 아무런 메시지도 수신하지 않을 것이다. 모든 메시지를 수신하고 싶다면 빈메시지를 필터로 설정해야 한다.

이 코드들을 실행시켜보면 서버는 클라이언트가 있든 없든 계속해서 메시지를 발행해나간다는 것을 알 수 있다. socket.socket 과는 달리 ZMQ 소켓은 상대방의 연결 여부를 중요하게 생각하지 않는다. 내부적으로 ZMQ 소켓은 트랜스포트와 분리된 메시지 큐이기 때문에, 큐의 크기가 허용하는데까지 메시지를 쌓아두었다가 클라이언트가 접속되면 네트워크를 통해 메시지를 전송한다.

공식 가이드문에서도 지적하고 있지만, PUB-SUB 패턴에서는 특정한 메시지를 언제 받아오는지를 알 수 있기 어렵다. 이는 서버보다 클라이언트가 먼저 시작되더라도 마찬가지 상황이다. 나중에 설명하겠지만, 이런 문제는 모든 클라이언트 커넥션이 체결된 후에 실제 메시지 전송을 시작하게 함으로써 해결이 가능하다.

PUSH-PULL 패턴

이 패턴은 pipeline이라 불리는 패턴으로 한쪽에서 다른쪽으로 메시지를 푸시하는 방식이다. 이 패턴이 REQ-REP와 다른 점은 요청-응답으로 한쌍의 양방향 통신으로 사이클이 구성되지 않고, PUSH 소켓으로부터 PULL 소켓으로 단방향 통신이 이루어진다는 점이다. 또 PUB-SUB 패턴하고도 차이를 보이는데, PUB-SUB 패턴에서는 연결된 모든 SUB 소켓이 동일한 메시지를 수신받는데 비해(그래서 SUB소켓은 필터를 필요로 한다.) PUSH-PULL에서는 PUSH소켓에서 연결된 PULL 소켓들에게 순차적으로 메시지를 각각 전송하게 된다.

가이드 문서에서는 이를 이용해서 분산처리를 수행하는 아키텍쳐를 어떻게 쉽게 만들 수 있는가를 설명하고 있다. 프로세스를 여러개 만들어서 분산처리를 하는 경우, 개별 워커가 계산해낸 결과 데이터를 다시 하나로 수집하는 데에는 파이프와 같은 장치를 사용하는데, 이것이 익숙하지 않은 사람에게는 조금 어렵고, 또 워커 수가 많아지는 경우에는 구현 난이도가 높아진다.

ZMQ는 이러한 상황에서 구현 난이도를 엄청나게 낮춰준다. 먼저 다른 ZMQ 소켓이 그러하듯 일반적인 네트워크 소켓을 사용하여 데이터를 전달하는 것과 크게 다르지 않은 구현방식을 취하면서, 내부의 트랜스포트에 대해서는 생각하지 않아도 된다. 즉, 분산처리를 수행하는 각각의 워커는 동일 머신 내의 다른 프로세스일수도 있고, 네트워크 상의 다른 컴퓨터일수도 있다. (ZMQ 소켓의 내부 트랜스포트는 바인드/커넥트시 사용하는 프로토콜에 따라 결정된다. tcp, ipc, inproc, vmci, pgm/epgm 등을 사용할 수 있다.)

PUSH-PULL 패턴으로 데이터를 분산처리하는 구조를 구성하는데에는 크게 다음 세가지로 역할에 따라 노드를 만들고 연결하면 된다.

  1. 벤틸레이터 – 처리해야 할 데이터를 계속해서 생성해내는 프로세스이다.
  2. 워커 – 생성된 데이터를 받고 처리한 후 그 결과를 만드는 처리 프로세스이다.
  3. 싱크 – 워커에 의해 생성된 결과 값들이 모이는 곳이다.

이 때 워커가 여러 개가 되면 훌륭한 분산처리 시스템이 된다. 문제는 파이썬에서도 이미 멀티 스레드나 멀티 프로세스를 통한 분산처리 작업은 이미 가능하다는 것이다. 그렇다면 ZMQ를 사용해서 이러한 분산처리 시스템을 구축하는 것이 뭐 그리 큰 도움이 될까?

당연히 그렇다. ZMQ는 하부 트랜스포트의 타입에 상관없이, 항상 소켓이라는 단일 API를 제공하여 코드를 작성할 수 있게 된다. 이 말은 분산처리가 한 대의 로컬 머신이 아닌 네트워크 상에 흩어져 있는 노드를 통해서 처리하는 것도 아주 간단한 코드로 할 수 있다는 뜻이며, 단일 머신이든 네트워크를 통한 분산처리든 프로그램의 구조가 동일해진다는 강점을 갖는다.

PUSH-PULL 패턴을 통한 분산처리를 수행하는 과정을 묘사해보면 다음과 같겠다.

  1. 맨 먼저 싱크가 구동되어 대기한다. 대기상태의 싱크는 PULL 소켓을 바인딩한다.
  2. 벤틸레이터 서버를 구동한 후 대기한다. 벤틸레이터 서버는 PUSH 소켓을 바인딩한다.
  3. 워커를 구동한다. 워커는 PUSH, PULL 소켓을 각각 하나씩 가지고 있다. PULL 소켓은 벤틸레이터에, PUSH 소켓은 싱크에 연결한다.
  4. 워커가 더 필요하다면 원하는 수 만큼 3의 과정을 반복한다.
  5. 모든 워커가 준비되었으면, 벤틸레이터가 데이터 전송을 시작한다. 전송되는 데이터는 각각의 워커에게 돌아가며 순차적으로 전달되고, 다시 각각의 워커는 싱크에게 처리된 데이터를 전달한다.
  6. 처리가 끝나면 각각의 노드 프로세스를 종료한다.

싱크

싱크는 각 워커로부터 데이터를 수신받는다. 미리 정해진 개수만큼 데이터를 수신받으면 총 걸린 시간을 출력하고 끝내는 동작을 할 것이다. 데이터의 개수는 프로세스가 구동되고 PULL소켓으로 받는 첫 메시지를 통해 알게된다. (이 값은 나중에 벤틸레이터가 알려줄 것이다.) 데이터를 받을 때마다 진행상황을 시각적으로 표현하기 위해서 . 혹은 : 을 출력하게끔 하였다.

싱크는 지정된 횟수만큼 데이터를 수신하고나면 자동으로 종료한다.

import sys
import time
import zmq

ctx = zmq.Context()

def run_server(port=5556):
    sock = ctx.socket(zmq.PULL)
    sock.bind(f'tcp://*:{port}')

    print(f"STARTING SERVER AT {port}")

    sig_start = sock.recv()
    v = int.from_bytes(sig_start, 'big')
    print(f"GATHER {v} TASKS.")

    TIME_START = time.time()
    for task_no in range(v):
        data = sock.recv()
        sys.stdout.write(':' if task_no % 10 == 0 else '.')
        sys.stdout.flush()
    TIME_END = time.time()
    print(f'\nELAPSED TIME: {TIME_END - TIME_START:.2f}')

if __name__ == '__main__':
    run_server()

벤틸레이터

다음은 벤틸레이터이다. 벤틸레이터는 PUSH 소켓으로 일정한 개수의 데이터를 보낸다. 이후 사용자 입력을 기다렸다가 종료한다. 하지만 실제로는 순식간에 데이터를 생성해서 보낸다음, 워커들이 한창 돌아가는 중일때에도 자기는 할일 다했다고 종료하겠다고 메시지를 출력하는 것을 확인할 수 있다.

import sys
import time
import random
import zmq

'''VENTILATOR : 5557'''

ctx = zmq.Context()

def run_server(port=5557):
    sock = ctx.socket(zmq.PUSH)
    sock.bind(f'tcp://*:{port}')

    cmd = ctx.socket(zmq.PUSH)
    cmd.connect(f'tcp://localhost:5556')

    # TELL SINK THE NUMBER OF TASK
    VOLUME = 100_000

    cmd.send(VOLUME.to_bytes(8, 'big'))
    print(f"START VENT SERVER")
    input("> IF YOU'RE READY, PRESS ENTER TO START")


    total_task = 0

    for _ in range(VOLUME):
        workload = random.randrange(1, 100)
        total_task += workload
        sock.send_string(f'{workload}')
    print(f'TOTAL EXPECTED COST: {total_task}')
    input("PRESS ENTER TO QUIT")
    ctx.destroy()

if __name__ == '__main__':
    run_server()

실제로 사용한 소켓은 PUSH 소켓 두 개이다. sock은 워커들에게 데이터를 뿌리기 위한 것이고, cmd는 싱크에게 데이터의 개수를 알려주기 위한 용도로 사용된다. 그래서 sock은 바인드하고 cmd는 싱크의 포트로 연결하는 것을 알 수 있다.

워커

워커는 벤틸레이터와 싱크 사이에서 데이터를 입력, 가공, 출력하는 일종의 변환장치 같은 것이다. 대신에 PUSH-PULL 구조에서는 이런 변환장치들이 병렬적으로 동작할 수 있다는 것이 차이점이랄까. 구조는 지극히 단순하니 따로 설명하지 않아도 될 것 같다.

import zmq
import sys
import time

ctx = zmq.Context()

def run_worker(*ports):
    port_pull, port_push, *_ = ports

    sock_pull = ctx.socket(zmq.PULL)
    sock_pull.connect(f'tcp://localhost:{port_pull}')
    sock_push = ctx.socket(zmq.PUSH)
    sock_push.connect(f'tcp://localhost:{port_push}')

    while True:
        data = sock_pull.recv_string()
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(int(data) * 0.001)
        sock_push.send(b'')

if __name__ == '__main__':
    run_worker(5557, 5556)

테스트

이제 모든 구성원에 대한 준비가 끝났다. 순서대로 각 프로세스를 실행한 후에 구동해보자. 아래 캡쳐는 이 결과를 보여주는 것이다. 총 7개의 워커를 사용했으며 수천개의 값을 벤틸레이팅했다.  상대적으로 천천히 .을 찍어 나가는 화면이 각각의 worker이고 맨 마지막에 빠르게 결과를 찍어나가는 창은 sink에 해당한다. 최종적으로 sink에 출력되는 총 소요 시간은 벤틸레이터가 합산한 전체 코스트 / 워커의 개수의 값과 거의 비슷함을 알 수 있다.

지금까지의 예제들에서 눈여겨 봐야 할 것은, ZMQ에서 bind/connect의 차이가 서버/클라이언트의 관계와는 별 상관없다는 것을 알 수 있을 것이다. 또한 1:N 접속이 서버:클라이언트의 관계도 아니라는 것도 확인했다. ZMQ에서 bind/connect를 선택하는 기준은 bind 해야하는 소켓은 네트워크 그래프 상에서 비교적 정적인 것이며, connect 하는 소켓은 상대적으로 동적인 것으로 보면 되겠다.

워커의 종료 문제 – 여러 소켓을 동시에 기다리기

마지막 PUSH-PULL 예제에서 우리는 벤틸레이터가 2개의 PUSH 소켓을 사용하면서 워커는 물론 싱크와도 상호작용하도록 할 수 있다는 점을 살펴보았다. 실제로 벤틸레이터와 싱크는 워커와 무관하게 실행하고 종료할 수 있었다. 그런데 워커도 작업이 끝나는 시점에 자동으로 종료시킬 수 있을까?

만약 싱크가 추가적인 PUB 소켓을 가지고 있고, 모든 워커가 이를 구독하는 SUB 소켓을 가지고 있다면, 싱크가 작업 종료시에 종료 시그널을 브로드캐스트해주면 되는 것 아닐까?

그런데 이때 문제가 있다. 워커는 sock_pull 소켓을 무한루프를 돌면서 듣고 있는 상황이이기 때문에, 루프의 어느 시점에서 PULL 소켓을 기다리는 동안에는 SUB 소켓을 들을 수 없게 된다. 즉 소켓이 여러 포트에 바인딩/커넥트 되는 것은 가능하지만, 소켓 자체를 2개 동시에 들을 수 없다는 한계가 있다.

실제로는 멀티스레드에서 각각의 소켓을 듣도록 하는 방법도 있다. 하지만 zmq를 쓰는 것으로 멀티스레드를 직접 사용하지 않아도 하는 편리를 얻기 때문에 굳이 이렇게까지 할 필요가 있을까

이러한 문제를 해결하기 위해서 ZMQ에는 POOLER라는 기능을 제공한다. 이를 활용하면 여러 개의 소켓을 동시에 듣는 효과를 내는 것이 가능하다. 다음 시간에는 풀러에 대해서 알아보도록 하자.