Wireframe

ZMQ 파이프 만들기

ZMQ 프록시등에서 캡쳐된 데이터를 확인하려고 할 때와 같이 간단하게 1:1로 데이터를 주고 받을 수 있도록 두 개의 PAIR 소켓을 사용하여 파이프를 만들어 활용할 수 있다. 다른 소켓의 구성과 무관하게 스레드간 대화는 inproc 프로토콜을 사용해서 짝지은 소켓을 사용하여 만들 수 있다.

import time
import zmq

def get_pipe(ctx: zmq.Context) -> tuple[zmq.Socket, zmq.Socket]:
  n = time.monotonic()
  sock1 = ctx.socket(zmq.PAIR)
  sock1.bind(f'inproc://#{n}')
  sock2 = ctx.socket(zmq.PAIR)
  sock2.connect(f'inproc://#{n}')
  return sock1, sock2

이런 식으로 만들어놓은 파이프는 프록시나 monitored_queue에서 모니터링 소켓으로 활용할 수 있다.

import zmq
from zmq.devices import monitored_queue
from threading import Thread

def server(ctx: zmq.Context):
  sock: zmq.Socket = ctx.socket(zmq.REP)
  sock.connect("tcp://localhost:5555")
  while True:
    mes = sock.recv()
    res = mes.decode().upper()[::-1].encode()
    sock.send(res)

def client(ctx: zmq.Context):
  sock: zmq.Socket = ctx.socket(zmq.REQ)
  sock.connect("tcp://localhost:5556")
  for _ in range(10):
    sock.send(b'hello')
    res = sock.recv()
    print(res)
  

def monitor(sock: zmq.Socket):
  while True:
    print(sock.recv())


def main():
  ctx = zmq.Context.instance()
  pipe = get_pipe(ctx)
  front = ctx.socket(zmq.ROUTER)
  front.bind("tcp://*:5556")
  back = ctx.socket(zmq.DEALER)
  back.bind("tcp://*:5555")

  Thread(target=server, args=(ctx,), daemon=True).start()
  Thread(target=client, args=(ctx,), daemon=True).start()
  Thread(target=mon, args=(pipe[1],), daemon=True).start()
  monitored_queue(front, end, pipe[0])

if __name__ == "__main__":
  main()
  

Exit mobile version