ZMQ + Asyncio 적용하기

파이썬에서 ZMQ를 사용할 때, asyncio를 사용할 수 있게 되었다. asyncio에 적용한다고 해서 크게 달라지는 것은 없고 소켓의 사용방법은 대동소이하다. (실제 IO 시점에 작업 전환이 일어날 수 있게 await를 붙이는 것 정도의 차이만 있다. 대략의 사용법을 정리해보면 다음과 같다.

ZMQ + Asyncio 적용하기 더보기

예제 – ZMQ + Asyncio 로 PUSH-PULL 구성

PUSH-PULL 구조를 사용한 분산처리를 구현한 예제를 asyncio 버전으로 재작성해보았다. 벤틸레이터가 보내는 값에 대해 각각의 워커는 그 값에 해당하는 시간만큼 지연시킨 후 싱크에게 짝/홀수 여부값을 전송한다.

예제 – ZMQ + Asyncio 로 PUSH-PULL 구성 더보기

ZMQ 예제 – Poller를 사용하여 종료 시점을 동기화하기

하나의 ZMQ 소켓은 여러 포트에 바인드하거나 커넥트할 수 있어서, 1:N의 연결을 쉽게 구성할 수 있습니다. 하지만 어떤 경우에는 이 다중 접속이 두 개 이상의 소켓을 사용하는 경우도 있습니다. 이런 경우 두 개의 소켓을 동시에 듣는 방법이 필요합니다. ZMQ소켓의 recv() 메소드는 블럭킹 함수이기 때문에 2개 이상의 소켓 중 데이터가 들어온 소켓을 처리하기 위해서는 소켓만으로는 처리할 수 없습니다. ZMQ는 이런 상황에 사용할 수 있는 Poller라는 수단을 제공합니다.

ZMQ 예제 – Poller를 사용하여 종료 시점을 동기화하기 더보기

ZMQ 멀티파트메시지

멀티파트 메시지는 하나의 메시지 프레임 내부에 여러 개의 독립적인 메시지 프레임이 들어 있는 것을 말한다. 이는 하나의 프레임에서 처리하기 힘든 데이터 조각들을 모아서 처리할 때 유용하다. 예를 들어 바이너리 파일 데이터를 전송하려는 경우에는 보내는 쪽이나 받는 쪽이나 전송하는 데이터가 이진데이터라는 것을 알고 있다 가정하여 바이트 스트림을 전송할 수 있다. 하지만 이렇게 하면 실제 데이터 외부에 있었던 정보, 이를 테면 파일 이름이나 생성한 날짜 같은 메타 정보를 전달하기가 어려워진다. 이런 경우 여러 정보들을 멀티 파트 메시지로 묶어서 하나의 프레임으로 전송하면 필요한 모든 정보를 같이 전달해 줄 수 있다.

멀티 파트 메시지 전송은 비단 ZMQ내에서만 사용되는 개념이 아니다. HTTP 규격에서도 멀티 파트 메시지 개념이 존재하며, 실제로 HTML 폼 전송에서 멀티 파트 전송이 널리 사용된다. 여러 개의 필드를 하나의 폼에서 묶어서 submit 한다거나, 여러 개의 파일을 한 번에 업로드하는 폼 들은 모두 enctype='multpart/formdata' 라는 폼 속성을 가지며, 실제 HTTP 전송 헤더 역시 Content-Type 에서 이 값이 사용된다.

ZMQ 에서도 여러 개의 메시지 혹은 프레임을 하나의 메시지로 묶어서 멀티파트로 전송할 수 있다. pyzmq를 사용하는 경우 이는 소켓 객체의 send_multipart() / recv_multipart()  두 개의 메소드로 간단히 구현된다. 다만 문자열이 아닌 버퍼(바이트 배열)를 주고 받아야 한다는 점에 주의만 하면 그외에는 매우 간단하다. 다음은 세 개의 문자열을 하나의 멀티파트 메시지로 전송하는 REQ-REP 예제이다.

## multimessage-server.py

import zmq

ctx = zmq.Context()

def run_server(port=5555):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  keys = 'abc'
  while True:
    msg = dict(zip(keys, (x.decode() for x in sock.recv_multipart())))
    for k, v in msg.items():
      print(f'{k}: {v}')
    ## 대문자화하고 하나의 문자열로 만들어서 되돌려준다.
    sock.send_string(' '.join(msg[k].upper() for k in keys))

if __name__ == '__main__':
  run_server()

recv_multipart() 에 의해 리턴되는 값이 바이트배열의 시퀀스임을 알고 있다면 위 코드는 충분히 이해되리라 본다. REQ-REP 패턴에서 클라이언트에서 서버로 멀티 파트 메시지를 전송했다 하더라도, 반드시 응답이 멀티파트여야 할 필요도 없다. 다음은 위 서버 코드와 짝을 맞출 클라이언트 코드이다. 세 번의 키보드 입력 후 해당 문자열들을 한 번에 전송한다. 그리고 서버가 보내준 단일 메시지를 출력하는 것을 반복하는 간단한 코드이다.

import zmq

ctx = zmq.Context()

def run_client(port=5555):
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    data = [input().encode() for _ in range(3)]
    sock.send_multipart(data)
    res = sock.recv_string()
    print(res)

if __name__ == '__main__':
  run_client()