ZMQ 예제 – Poller를 사용하여 종료 시점을 동기화하기

하나의 ZMQ 소켓은 여러 포트에 바인드하거나 커넥트할 수 있어서, 1:N의 연결을 쉽게 구성할 수 있습니다. 하지만 어떤 경우에는 이 다중 접속이 두 개 이상의 소켓을 사용하는 경우도 있습니다. 이런 경우 두 개의 소켓을 동시에 듣는 방법이 필요합니다. ZMQ소켓의 recv() 메소드는 블럭킹 함수이기 때문에 2개 이상의 소켓 중 데이터가 들어온 소켓을 처리하기 위해서는 소켓만으로는 처리할 수 없습니다. ZMQ는 이런 상황에 사용할 수 있는 Poller라는 수단을 제공합니다.

Poller를 사용하는 방법은 다음과 같습니다.

  1. 먼저 Poller 객체를 생성합니다. 주의할 것은 Poller는 ZMQ 컨텍스트로부터 생성하는 것이 아니라 zmq.Poller 클래스의 생성자를 사용합니다.
  2. 생성된 Poller에 register() 메소드를 사용해서 소켓을 등록합니다. 이미 등록된 소켓은 modify() 메소드를 사용해서 변경할 수 있습니다.
  3. poll() 메소드를 호출합니다. 이 메소드는 등록된 소켓 중 1개 이상의 소켓에서 지정한 이벤트가 발생하면 (소켓, 이벤트)의 튜플의 리스트를 리턴해줍니다.
  4. 리턴받은 소켓에 따라 각각 처리를 해줍니다.

일전에 작성해본 PUSH-PULL을 통한 분산처리나, PUB-SUB를 통한 메시지 구독 예제에서 일일이 개별 노드를 중지시키지 않고, 어떤 커맨드센터 역할을 하는 노드를 추가하여 (혹은 기존 노드에 기능을 부여하여) 한꺼번에 모든 노드를 중지하는데 사용할 수 있습니다.

여기서는 PUB-SUB-PUSH-PULL을 조합한 예를 만들어보겠습니다. 구성은 다음과 같습니다.

  1. 1개의 벤틸레이터가 있습니다. 벤틸레이터는 PUB 소켓을 통해서 무작위로 생성한 메시지를 내보냅니다. 이 동작을 무한히 반복합니다.
  2. 여러 개의 워커가 있습니다. 워커는 SUB 소켓을 통해 벤틸레이터로부터 메시지를 수신하고, 수신한 메시지를 다시 PUSH 소켓을 통해 싱크로 전달합니다. 역시 이 동작은 무한히 반복됩니다.
  3. 1개의 싱크가 있습니다. 싱크는 워커들로부터 PULL 소켓을 통해 데이터를 수집합니다. 이때 미리 정해둔 수량 만큼의 데이터만 수집하고 종료합니다.

이 구성에서 작업이 종료되는 시점은 싱크가 알고 있게 됩니다. 싱크가 종료된 후에 각각의 워커와 벤틸레이터 프로세스를 강제로 종료하는 대신, 싱크가 종료 시점을 다른 모든 노드에게 알리는 커맨더 센터가 되게 하면 어떨까요?

  1. 싱크는 PUB 소켓을 하나 준비합니다. 작업이 종료되었을 때, 다른 노드들에게 종료하라는 시그널을 이 소켓을 통해 전달할 수 있습니다.
  2. 워커들도 SUB 소켓을 추가로 가지게 됩니다. 싱크가 종료 신호를 퍼블리싱하면 이 메시지를 받아 소켓을 닫고 프로세스를 종료합니다.
  3. 벤틸레이터도 SUB 소켓을 추가로 가집니다. 싱크로부터 종료 신호를 받으면 무한 루프를 탈출하고, 소켓을 닫습니다.

또 여기서는 각 노드의 시작도 좀 간단한 방법으로 처리하려고 합니다. 터미널창을 여러 개 사용하는 대신에 멀티프로세스를 통해서 각각의 노드들을 개별적으로 실행하고자 합니다. 파일을 분산하여 작성하는가, 하나의 파일에 모든 기능을 작성할 것인가는 상황에 따라 판단하면 되겠습니다만, 여기서는 하나의 파일에 작성하겠습니다. 먼저 공통적으로 사용하게 될 모듈 반입이나, 포트번호, 컨텍스트를 준비하는 코드를 작성합니다.

참고로 이번 예제에서 모든 노드의 동작은 zmq.asyncio를 사용하여 비동기 I/O 기반으로 동작하도록 작성했습니다.

from multiprocessing import Process
import random
import sys
import asyncio
import zmq
import zmq.asyncio

ctx = zmq.asyncio.Context()

PORT_CMD = 5559
PORT_DATA = 5555
PORT_SNK = 5556

이번에는 싱크에 해당하는 코드를 작성해보겠습니다. 싱크는 두 개의 소켓을 사용하는데, 하나는 각 워커로부터 데이터를 전달받기 위한 PULL 소켓이며, 다른 하나는 데이터 수집이 완료되어었을 때 모든 워커 및 벤틸레이터에게 종료 신호를 보내기 위한 PUB 소켓입니다. 워커들의 PUSH 소켓은 상대적으로 동적이기 때문에 PULL 소켓은 bind로 설정하고, PUB 역시 bind로 설정합니다.

async def run_sink():
    # 소켓 생성 및 준비
    sock = ctx.socket(zmq.PULL)
    sock_cmd = ctx.socket(zmq.PUB)
    sock.bind(f'tcp://*:{PORT_SNK}')
    sock_cmd.bind(f'tcp://*:{PORT_CMD}')

    # 500개의 데이터를 수집하고 출력합니다.
    amount = 500
    i = 0
    for _ in range(amount):
        data = await sock.recv_string()
        i += 1
        sys.stdout.write(f'SINK RECV: {data} - {i}/{amount}\n')
        sys.stdout.flush()
    

    # 닫기. 닫기전 종료 시그널을 퍼블리싱합니다.
    sock_cmd.send_string('bye')
    sys.stdout.write(f'SINK CLOSING...\n')
    sys.stdout.flush()
    sock.close()
    sock_cmd.close()

multiprocessing 모듈을 사용해서 프로세스를 스폰할 것이기 때문에 비동기 코루틴이 아닌 시작용 일반함수를 추가로 작성해줍니다.

def proc_sink():
    asyncio.run(run_sink())

이번에는 워커를 작성해보겠습니다. 특이하게 3개의 소켓을 사용하며, 이 중 수신을 위한 소켓 2개를 Poller에 묶게 됩니다.

async def run_worker(zf='10001'):
    sock_in = ctx.socket(zmq.SUB)
    sock_in.setsockopt_string(zmq.SUBSCRIBE, zf)
    sock_in.connect(f'tcp://localhost:{PORT_DATA}')
    sock_out = ctx.socket(zmq.PUSH)
    sock_out.connect(f'tcp://localhost:{PORT_SNK}')
    sock_cmd = ctx.socket(zmq.SUB)
    # SUB 소켓은 반드시 필터설정을 해야 함.
    # 필터링 없이 모든 메시지를 수신하려면 빈 문자열을 등록함
    sock_cmd.setsockopt_string(zmq.SUBSCRIBE, '')
    sock_cmd.connect(f'tcp://localhost:{PORT_CMD}')

    # 폴러 생성 및 소켓 등록
    poller = zmq.asyncio.Poller()
    poller.register(sock_in, zmq.POLLIN)
    poller.register(sock_cmd, zmq.POLLIN)

    while True:
        events = dict(await poller.poll())
        if sock_in in events and\
        events.get(sock_in, None) == zmq.POLLIN:
            data = await sock_in.recv_string()
            sys.stdout.write(
                f'\nWORKER {zf} RECEIVED'
                f'- {data.rsplit(" ", 1)[0]}\n')
            sys.stdout.flush()
            await sock_out.send_string(data)
        elif sock_cmd in events and\
          events.get(sock_cmd, None) == zmq.POLLIN:
            break
    sys.stdout.write(f'CLOSING WORKER@ {zf}\n')
    sys.stdout.flush()
    sock_in.close()
    sock_cmd.close()
    sock_out.close()

워커는 필터링을 위한 코드값을 인자로 받아서 해당 코드로 시작하는 메시지를 처리합니다. 수신된 메시지의 정보를 표준 출력으로 출력한 후, 다시 싱크로 전달합니다. 이 때 데이터와 종료 시그널을 동시에 기다려야 하므로 Poller가 필요합니다.

다음은 벤틸레이터를 작성해봅니다. PUB 소켓을 사용하여 무작위로 생성된 값으로 쓰여진 메시지를 전송합니다. 동시에 종료 시그널을 기다려야 합니다. 벤틸레이터가 워커랑 다른 점은 워커는 두 개의 수신 소켓을 기다리는데, 벤틸레이터는 수신소켓과 송신소켓을 멀티플렉싱해야 한다는 점입니다.

폴러에 소켓을 등록할 때 사용하는 zmq.POLLIN 메시지는 소켓에 새로운 데이터가 들어오는 이벤트를 캐치하게 됩니다. 반대로 zmq.POLLOUT을 사용하면 소켓으로부터 데이터가 나가는 이벤트를 캐치할 수 있습니다. 따라서 송신 작업 코드를 실행한 후, 폴링을 수행하면 기본적으로 해당 메시지가 송신되었을 때 폴링이 끝납니다. 이 과정 사이에서 수신 소켓에 들어온 정보가 있었다면, 폴링의 결과는 두 소켓의 데이터를 모두 포함할 것입니다.

메시지를 송신하였다면 송신된 메시지를 출력해줍니다. 이 때, 개행을 입력하지 않고 \r을 사용하면 메시지가 제자리에서 바뀌는 것 처럼 보이게 됩니다.

async def run_vent():
    sock = ctx.socket(zmq.PUB)
    sock.bind(f'tcp://*:{PORT_DATA}')
    sock_cmd = ctx.socket(zmq.SUB)
    sock_cmd.setsockopt_string(zmq.SUBSCRIBE, '')
    sock_cmd.connect(f'tcp://localhost:{PORT_CMD}')

    poller = zmq.asyncio.Poller()
    poller.register(sock, zmq.POLLOUT)
    poller.register(sock_cmd, zmq.POLLIN)

    message_no = 1
    while True:
        z = f'{random.randrange(1, 1000):04d}'
        t = f'{random.randrange(-40, 90):-02d}'
        m = f'{message_no:05d}'
        message_no = (message_no + 1) % 10_0000
        msg = f'{z} {t} {m}'
        sock.send_string(msg)
        events = dict(await poller.poll())
        if sock_cmd in events and \
          events[sock_cmd] == zmq.POLLIN:
            break
        elif sock in events and events[sock] == zmq.POLLOUT:
            sys.stdout.write(f'\r{msg}')
            sys.stdout.flush()
            await asyncio.sleep(0.08)

    sys.stdout.write(f'\nVENT CLOSING...\n')
    sys.stdout.flush()
    sock.close()
    sock_cmd.close()

워커와 벤틸레이터 프로세스를 시작하기 위한 함수를 추가로 작성합니다.

def proc_worker(name):
    asyncio.run(run_worker(name))

def proc_vent():
    asyncio.run(run_vent())

마지막으로 메인 프로세스에서 실행할 코드입니다. multiprocessing 모듈은 threading 모듈과 거의 동일한 API를 제공하기 때문에 사용법 자체는 어렵지 않습니다. 싱크, 여러 개의 워커, 벤틸레이터 순으로 프로세스를 시작하며 주 프로세스는 벤틸레이터 프로세스에 조인하여 벤틸레이터가 종료한 후에 종료될 수 있게 합니다.

def main():
    Process(target=proc_sink).start()
    for _ in range(10):
        z = f'{random.randrange(1, 1000):04d}'
        print(f"WORKER STARTS AT {z}")
        Process(target=proc_worker, args=(z,)).start()
    p = Process(target=proc_vent)
    p.start()
    p.join()

if __name__ == '__main__':
    main()