ZMQ 파이프 만들기
ZMQ 프록시등에서 캡쳐된 데이터를 확인하려고 할 때와 같이 간단하게 1:1로 데이터를 주고 받을 수 있도록 두 개의 PAIR 소켓을 사용하여 파이프를 만들어 활용할 수 있다. 다른 소켓의 구성과 무관하게 스레드간 대화는 inproc
프로토콜을 사용해서 짝지은 소켓을 사용하여 만들 수 있다.
import time
import zmq
def get_pipe(ctx: zmq.Context) -> tuple[zmq.Socket, zmq.Socket]:
n = time.monotonic()
sock1 = ctx.socket(zmq.PAIR)
sock1.bind(f'inproc://#{n}')
sock2 = ctx.socket(zmq.PAIR)
sock2.connect(f'inproc://#{n}')
return sock1, sock2
이런 식으로 만들어놓은 파이프는 프록시나 monitored_queue에서 모니터링 소켓으로 활용할 수 있다.
import zmq
from zmq.devices import monitored_queue
from threading import Thread
def server(ctx: zmq.Context):
sock: zmq.Socket = ctx.socket(zmq.REP)
sock.connect("tcp://localhost:5555")
while True:
mes = sock.recv()
res = mes.decode().upper()[::-1].encode()
sock.send(res)
def client(ctx: zmq.Context):
sock: zmq.Socket = ctx.socket(zmq.REQ)
sock.connect("tcp://localhost:5556")
for _ in range(10):
sock.send(b'hello')
res = sock.recv()
print(res)
def monitor(sock: zmq.Socket):
while True:
print(sock.recv())
def main():
ctx = zmq.Context.instance()
pipe = get_pipe(ctx)
front = ctx.socket(zmq.ROUTER)
front.bind("tcp://*:5556")
back = ctx.socket(zmq.DEALER)
back.bind("tcp://*:5555")
Thread(target=server, args=(ctx,), daemon=True).start()
Thread(target=client, args=(ctx,), daemon=True).start()
Thread(target=mon, args=(pipe[1],), daemon=True).start()
monitored_queue(front, end, pipe[0])
if __name__ == "__main__":
main()