하나의 ZMQ 소켓은 여러 포트에 바인드하거나 커넥트할 수 있어서, 1:N의 연결을 쉽게 구성할 수 있습니다. 하지만 어떤 경우에는 이 다중 접속이 두 개 이상의 소켓을 사용하는 경우도 있습니다. 이런 경우 두 개의 소켓을 동시에 듣는 방법이 필요합니다. ZMQ소켓의 recv() 메소드는 블럭킹 함수이기 때문에 2개 이상의 소켓 중 데이터가 들어온 소켓을 처리하기 위해서는 소켓만으로는 처리할 수 없습니다. ZMQ는 이런 상황에 사용할 수 있는 Poller라는 수단을 제공합니다.
Poller를 사용하는 방법은 다음과 같습니다.
- 먼저 Poller 객체를 생성합니다. 주의할 것은 Poller는 ZMQ 컨텍스트로부터 생성하는 것이 아니라
zmq.Poller
클래스의 생성자를 사용합니다. - 생성된 Poller에
register()
메소드를 사용해서 소켓을 등록합니다. 이미 등록된 소켓은modify()
메소드를 사용해서 변경할 수 있습니다. poll()
메소드를 호출합니다. 이 메소드는 등록된 소켓 중 1개 이상의 소켓에서 지정한 이벤트가 발생하면 (소켓, 이벤트)의 튜플의 리스트를 리턴해줍니다.- 리턴받은 소켓에 따라 각각 처리를 해줍니다.
일전에 작성해본 PUSH-PULL을 통한 분산처리나, PUB-SUB를 통한 메시지 구독 예제에서 일일이 개별 노드를 중지시키지 않고, 어떤 커맨드센터 역할을 하는 노드를 추가하여 (혹은 기존 노드에 기능을 부여하여) 한꺼번에 모든 노드를 중지하는데 사용할 수 있습니다.
여기서는 PUB-SUB-PUSH-PULL을 조합한 예를 만들어보겠습니다. 구성은 다음과 같습니다.
- 1개의 벤틸레이터가 있습니다. 벤틸레이터는 PUB 소켓을 통해서 무작위로 생성한 메시지를 내보냅니다. 이 동작을 무한히 반복합니다.
- 여러 개의 워커가 있습니다. 워커는 SUB 소켓을 통해 벤틸레이터로부터 메시지를 수신하고, 수신한 메시지를 다시 PUSH 소켓을 통해 싱크로 전달합니다. 역시 이 동작은 무한히 반복됩니다.
- 1개의 싱크가 있습니다. 싱크는 워커들로부터 PULL 소켓을 통해 데이터를 수집합니다. 이때 미리 정해둔 수량 만큼의 데이터만 수집하고 종료합니다.
이 구성에서 작업이 종료되는 시점은 싱크가 알고 있게 됩니다. 싱크가 종료된 후에 각각의 워커와 벤틸레이터 프로세스를 강제로 종료하는 대신, 싱크가 종료 시점을 다른 모든 노드에게 알리는 커맨더 센터가 되게 하면 어떨까요?
- 싱크는 PUB 소켓을 하나 준비합니다. 작업이 종료되었을 때, 다른 노드들에게 종료하라는 시그널을 이 소켓을 통해 전달할 수 있습니다.
- 워커들도 SUB 소켓을 추가로 가지게 됩니다. 싱크가 종료 신호를 퍼블리싱하면 이 메시지를 받아 소켓을 닫고 프로세스를 종료합니다.
- 벤틸레이터도 SUB 소켓을 추가로 가집니다. 싱크로부터 종료 신호를 받으면 무한 루프를 탈출하고, 소켓을 닫습니다.
또 여기서는 각 노드의 시작도 좀 간단한 방법으로 처리하려고 합니다. 터미널창을 여러 개 사용하는 대신에 멀티프로세스를 통해서 각각의 노드들을 개별적으로 실행하고자 합니다. 파일을 분산하여 작성하는가, 하나의 파일에 모든 기능을 작성할 것인가는 상황에 따라 판단하면 되겠습니다만, 여기서는 하나의 파일에 작성하겠습니다. 먼저 공통적으로 사용하게 될 모듈 반입이나, 포트번호, 컨텍스트를 준비하는 코드를 작성합니다.
참고로 이번 예제에서 모든 노드의 동작은 zmq.asyncio를 사용하여 비동기 I/O 기반으로 동작하도록 작성했습니다.
from multiprocessing import Process
import random
import sys
import asyncio
import zmq
import zmq.asyncio
ctx = zmq.asyncio.Context()
PORT_CMD = 5559
PORT_DATA = 5555
PORT_SNK = 5556
이번에는 싱크에 해당하는 코드를 작성해보겠습니다. 싱크는 두 개의 소켓을 사용하는데, 하나는 각 워커로부터 데이터를 전달받기 위한 PULL 소켓이며, 다른 하나는 데이터 수집이 완료되어었을 때 모든 워커 및 벤틸레이터에게 종료 신호를 보내기 위한 PUB 소켓입니다. 워커들의 PUSH 소켓은 상대적으로 동적이기 때문에 PULL 소켓은 bind로 설정하고, PUB 역시 bind로 설정합니다.
async def run_sink():
# 소켓 생성 및 준비
sock = ctx.socket(zmq.PULL)
sock_cmd = ctx.socket(zmq.PUB)
sock.bind(f'tcp://*:{PORT_SNK}')
sock_cmd.bind(f'tcp://*:{PORT_CMD}')
# 500개의 데이터를 수집하고 출력합니다.
amount = 500
i = 0
for _ in range(amount):
data = await sock.recv_string()
i += 1
sys.stdout.write(f'SINK RECV: {data} - {i}/{amount}\n')
sys.stdout.flush()
# 닫기. 닫기전 종료 시그널을 퍼블리싱합니다.
sock_cmd.send_string('bye')
sys.stdout.write(f'SINK CLOSING...\n')
sys.stdout.flush()
sock.close()
sock_cmd.close()
multiprocessing 모듈을 사용해서 프로세스를 스폰할 것이기 때문에 비동기 코루틴이 아닌 시작용 일반함수를 추가로 작성해줍니다.
def proc_sink():
asyncio.run(run_sink())
이번에는 워커를 작성해보겠습니다. 특이하게 3개의 소켓을 사용하며, 이 중 수신을 위한 소켓 2개를 Poller
에 묶게 됩니다.
async def run_worker(zf='10001'):
sock_in = ctx.socket(zmq.SUB)
sock_in.setsockopt_string(zmq.SUBSCRIBE, zf)
sock_in.connect(f'tcp://localhost:{PORT_DATA}')
sock_out = ctx.socket(zmq.PUSH)
sock_out.connect(f'tcp://localhost:{PORT_SNK}')
sock_cmd = ctx.socket(zmq.SUB)
# SUB 소켓은 반드시 필터설정을 해야 함.
# 필터링 없이 모든 메시지를 수신하려면 빈 문자열을 등록함
sock_cmd.setsockopt_string(zmq.SUBSCRIBE, '')
sock_cmd.connect(f'tcp://localhost:{PORT_CMD}')
# 폴러 생성 및 소켓 등록
poller = zmq.asyncio.Poller()
poller.register(sock_in, zmq.POLLIN)
poller.register(sock_cmd, zmq.POLLIN)
while True:
events = dict(await poller.poll())
if sock_in in events and\
events.get(sock_in, None) == zmq.POLLIN:
data = await sock_in.recv_string()
sys.stdout.write(
f'\nWORKER {zf} RECEIVED'
f'- {data.rsplit(" ", 1)[0]}\n')
sys.stdout.flush()
await sock_out.send_string(data)
elif sock_cmd in events and\
events.get(sock_cmd, None) == zmq.POLLIN:
break
sys.stdout.write(f'CLOSING WORKER@ {zf}\n')
sys.stdout.flush()
sock_in.close()
sock_cmd.close()
sock_out.close()
워커는 필터링을 위한 코드값을 인자로 받아서 해당 코드로 시작하는 메시지를 처리합니다. 수신된 메시지의 정보를 표준 출력으로 출력한 후, 다시 싱크로 전달합니다. 이 때 데이터와 종료 시그널을 동시에 기다려야 하므로 Poller가 필요합니다.
다음은 벤틸레이터를 작성해봅니다. PUB 소켓을 사용하여 무작위로 생성된 값으로 쓰여진 메시지를 전송합니다. 동시에 종료 시그널을 기다려야 합니다. 벤틸레이터가 워커랑 다른 점은 워커는 두 개의 수신 소켓을 기다리는데, 벤틸레이터는 수신소켓과 송신소켓을 멀티플렉싱해야 한다는 점입니다.
폴러에 소켓을 등록할 때 사용하는 zmq.POLLIN
메시지는 소켓에 새로운 데이터가 들어오는 이벤트를 캐치하게 됩니다. 반대로 zmq.POLLOUT
을 사용하면 소켓으로부터 데이터가 나가는 이벤트를 캐치할 수 있습니다. 따라서 송신 작업 코드를 실행한 후, 폴링을 수행하면 기본적으로 해당 메시지가 송신되었을 때 폴링이 끝납니다. 이 과정 사이에서 수신 소켓에 들어온 정보가 있었다면, 폴링의 결과는 두 소켓의 데이터를 모두 포함할 것입니다.
메시지를 송신하였다면 송신된 메시지를 출력해줍니다. 이 때, 개행을 입력하지 않고 \r
을 사용하면 메시지가 제자리에서 바뀌는 것 처럼 보이게 됩니다.
async def run_vent():
sock = ctx.socket(zmq.PUB)
sock.bind(f'tcp://*:{PORT_DATA}')
sock_cmd = ctx.socket(zmq.SUB)
sock_cmd.setsockopt_string(zmq.SUBSCRIBE, '')
sock_cmd.connect(f'tcp://localhost:{PORT_CMD}')
poller = zmq.asyncio.Poller()
poller.register(sock, zmq.POLLOUT)
poller.register(sock_cmd, zmq.POLLIN)
message_no = 1
while True:
z = f'{random.randrange(1, 1000):04d}'
t = f'{random.randrange(-40, 90):-02d}'
m = f'{message_no:05d}'
message_no = (message_no + 1) % 10_0000
msg = f'{z} {t} {m}'
sock.send_string(msg)
events = dict(await poller.poll())
if sock_cmd in events and \
events[sock_cmd] == zmq.POLLIN:
break
elif sock in events and events[sock] == zmq.POLLOUT:
sys.stdout.write(f'\r{msg}')
sys.stdout.flush()
await asyncio.sleep(0.08)
sys.stdout.write(f'\nVENT CLOSING...\n')
sys.stdout.flush()
sock.close()
sock_cmd.close()
워커와 벤틸레이터 프로세스를 시작하기 위한 함수를 추가로 작성합니다.
def proc_worker(name):
asyncio.run(run_worker(name))
def proc_vent():
asyncio.run(run_vent())
마지막으로 메인 프로세스에서 실행할 코드입니다. multiprocessing 모듈은 threading 모듈과 거의 동일한 API를 제공하기 때문에 사용법 자체는 어렵지 않습니다. 싱크, 여러 개의 워커, 벤틸레이터 순으로 프로세스를 시작하며 주 프로세스는 벤틸레이터 프로세스에 조인하여 벤틸레이터가 종료한 후에 종료될 수 있게 합니다.
def main():
Process(target=proc_sink).start()
for _ in range(10):
z = f'{random.randrange(1, 1000):04d}'
print(f"WORKER STARTS AT {z}")
Process(target=proc_worker, args=(z,)).start()
p = Process(target=proc_vent)
p.start()
p.join()
if __name__ == '__main__':
main()