ZMQ – Poller 사용하기

PUB-SUB 패턴이나 PUSH-PULL 패턴을 사용하면 데이터를 발생시키는 1개 (혹은 여러 개의) 노드로부터 데이터를 전달받는 N개의 노드들이 돌아가는 방식의 네트워크를 구성할 수 있음을 지난 글에서 보았다. 이 때 각각의 클라이언트 노드들은 루프를 돌면서 큐에 들어온 메시지를 순서대로 처리하게 된다.

만약 개별 클라이언트가 한 번에 처리해야 할 메시지의 최대 개수가 정해져 있다면 while 무한 루프가 아닌 for 루프를 통해서 유한한 루프를 돌 수 있을 것이다. 그 외에도 메시지의 내용으로부터 동작이나 중지를 결정할 수 있는 경우에도 그 스스로 동작을 멈출 시점을 결정할 수 있을 것이다.

여러 개의 소켓을 듣는 노드

하지만 클라이언트들의 중지를 다른 제 3의 노드가 결정해야 한다면? ‘커맨드 노드’라는 노드가 있고 이 노드가 PUB 패턴으로 모든 클라이언트들에게 중지 명령을 내리는 구조를 만든다고 가정해보자.

그렇다면 클라이언트 입장에서는 한 번에 두 개의 소켓(하나는 SUB 패턴으로 데이터를 지속적으로 수신받고, 다른 하나는 PULL 패턴으로 종료 명령을 수신받는다.) 을 사용하게 된다. 이것은 이전의 패턴들과는 사뭇다르다. 이전 예제들에서 보여왔던 다중 접속은 하나의 노드가 단일 소켓을 사용하면서, 소켓 자체가 여러 포트에 바인딩되거나, 여러 서버에 커넥트되는 것을 보였다. (그리고 ZMQ는 이 구조 내에서 메시지 전달의 교통정리를 훌륭하게 수행해준다.)

소켓으로 데이터를 반복적으로 읽어야 하는 노드의 특성상, 두 개 이상의 소켓을 하나의 노드가 갖는 것이 문제될 일은 없지만 두 노드를 한꺼번에 대기할 수는 없다. 두 개 소켓으로부터 각각 메시지를 수신하기 위해서는 각각 별도의 스레드를 통해서 처리해야 할 것이다.

POLLER

ZMQ에서는 이러한 상황을 좀 더 쉽게 해결할 수 있도록 POLLER라는 것을 제공한다. Poller는 두 개 이상의 소켓을 등록하여두면 소켓들로부터의 입력을 감지하여 (소켓, 이벤트)의 리스트를 리턴해준다. 이를 이용해서 각각의 소켓을 동시에 수신하면서 소켓별로 구분된 메시지를 얻을 수 있다.

def client(data_port=5556, com_port=5558):
  ctx = zmq.Context()

  # 1. SUB 소켓 연결
  sock_sub = ctx.socket(zmq.SUB)
  sock_sub.setsockopts_string(zmq.SUBSCRIBE, '9')
  sock_sub.connect(f'tcp://localhost:{data_port}')

  # 2. PULL 소켓 연결
  sock_pull = ctx.socket(zmq.PULL)
  sock_pull.connect(f'tcp://localhost:{com_port}')
  
  # 3. 폴러 생성 및 설정
  poller = ctx.Poller()
  poller.register(sock_sub, zmq.POLLIN)
  poller.register(sock_pull, zmq.POLLIN)

  # 폴러로부터 이벤트 수신
  should_continue = True
  while should_continue:
    # 4. 각각의 이벤트는 (소켓, 이벤트), (소켓, 이벤트) ... 의 형태로 전달
    # 이를 사전 타입으로 변환
    socks = dict(poller.poll())
    if socks.get(sock_pull, None) == zmq.POLLIN:
      command = sock_pull.recv_string()
      if command == 'EXIT':
         print('This client will be terminated...')
         should_continue = False
    if socks.get(sock_sub, None) == zmq.POLLIN:
      topic, msg = sock_sub.recv_string().split(' ', 1)
      print(f'Processing: {msg}')

  
  

poller는 복수 개의 소켓을 번갈아 수신하기 보다는 특정한 주기로 각 소켓의 메시지 큐를 검사해서 신규 메시지들을 소켓별로 하나씩 가져와서 이벤트의 리스트로 리턴해준다. 폴러에 등록되는 소켓의 타입은 서로 같은 것들이어도 무방하다. 지난 글에서 살펴본 PUSH-PULL 패턴을 사용한 분산처리는 다음과 같이 개선해볼 수 있다.

  1. Ventilator는 PUSH 패턴으로 복수의 Worker에게 데이터를 순차적으로 뿌린다.
  2. Worker는 PULL 소켓으로 데이터를 수신하여 처리한 후, 다시 PUSH 소켓으로 Sink에게 그 결과를 전달한다.
  3. Worker는 또한 SUB 소켓으로 중지명령을 기다리고 있다. 따라서 Worker에는 PULL, SUB 소켓이 Poller로 묶여있게 된다.
  4. Sink는 PULL 소켓으로 여러 Worker로부터 결과를 수신받아 취합한다.
  5. Sink는 모든 결과가 수신되면 PUB 소켓을 통해 모든 Worker들에게 kill 시그널을 전송한다.

이처럼 단순 소켓이 아니라 소켓을 결합해주는 Poller를 사용하면 복잡한 네트워크 그래프를 간단한 방법으로 구축할 수 있다. Poller외에도 ZMQ에서는 양 끝단에 소켓을 부착하여 메시지를 포워딩해주는 device라는 개념이 있는데, 다음 번에는 이 부분에 대해서 한 번 알아보는 기회를 갖도록 하자.