Home » PUB-SUB 패턴에서 멀티파트 데이터를 사용하기

PUB-SUB 패턴에서 멀티파트 데이터를 사용하기

PUB-SUB 소켓은 데이터를 발송하는 복수의 Publisher와 이를 수신하는 역시 복수의 Subscriber들이 연결되어 데이터를 분산하는 ZMQ 패턴에 사용된다. 이들은 REQ-REP 패턴과 다르게 PUB쪽에서 SUB쪽으로 단방향으로 데이터가 전송되며, PUSH-PULL과 다르게 하나의 메시지를 모든 Subscriber가 수신한다는 특징이 있다. 이 때, SUB쪽 소켓은 자신에게 맞는 데이터만을 수신하도록 필터를 설정하는데, 보통 PUB-SUB 패턴의 예제에서는 이 필터를 문자열이나 바이트로 설정하여 일치하는 경우만 수신하도록 한다.

ZMQ공식문서에서도 이런 식의 코드를 볼 수 있다. setsockopt_string()을 사용해서 전달한 문자열을 인코딩한 값이 메시지의 첫부분에 해당할 때에만 메시지가 수신된다.

sock = ctx.socket(zmq.SUB)
sock.setsockopt_string(zmq.SUBSCRIBE, '12345')
sock.connect(f"tcp://localhost:{port}")

문자열이 아닌 데이터를 필터링하는 조건

그런데 어떤 경우에는 필터링하는 조건 값이 데이터의 앞부분이 아닐 수 있다. 앞선 예제에서는 전체 데이터가 문자열이었고 이 데이터에서 앞 다섯자리로 판단하고 있었지만, 조건이 되는 값이 데이터의 중간에 끼어 있거나 혹은 사용될 데이터와 완전히 별개로 구성될 가능성도 있다. 이런 경우에는 기본 예제에서 제공하는 방식으로는 제대로 필터링할 수 없다. 이런 경우에는 본 데이터와 채널값을 멀티 파트 데이터로 전송하는 것이 가장 쉬운 방법이라 하겠다. 다음 예는 생성한 데이터와 별개로 채널 값을 실어서 멀티파트로 데이터를 전송하는 것을 보여준다.

# publisher
# ...
# ... create PUB socket and bind ...
chnl = str(random.randint(100, 999)).encode()
data = {'x': random.random(), 'y': random.random() }
payload = json.dumps(data).encode()
sock.send_multipart([chnl, payload])

이렇게 멀티파트로 전송된 데이터는, 프레임이라는 단위로 구분되어 직렬화되고, 다시 받는 쪽 소켓에서 recv_multipart()로 수신하게 되면, 각 프레임의 데이터가 다시 리스트로 조합되도록 작동한다. 그런데 이 과정에서 오가는 데이터는 직렬화된 바이트 스트림이며, 이 때 전체 데이터의 앞 부분에는 멀티 파트 데이터 중에서 가장 앞에 오는 데이터가 들어있다. 따라서 SUB 소켓에서는 데이터 본체(?)와 무관하게 채널값으로 필터링하는 것이 가능하다.

#  in subscriber
sock = context.socket(zmq.SUB)

# 데이터는 멀티파트로 전송될 것이지만, 첫번째 프레임의 데이터에 대해서
# "333" 채널에 대해서만 수신하도록 한다.
sock.setsockopt(zmq.SUBSCRIBE, b'333')
sock.connect(f"tcp://localhost:{port}")
for _ in range(100):
  # 멀티파트 데이터를 분리하여 본래 페이로드값을 획득한다.
  _, payload = sock.recv_multipart()
  data = json.load(io.BytesIO(payload))
  print(data) 

참고로, 바이트스트림으로 직렬화한 JSON 데이터를 문자열로 바꾼 후 파싱해도 되지만, BytesIO를 사용하면 임의의 바이트 배열을 마치 이진 파일처럼 사용하여 복구할 수 있다. JSON이나 그외 데이터를 다른 방식을 통해서 교환하거나 할 때에도 BytesIO를 사용해서 마치 파일에서 읽고 쓰는 것처럼 사용할 수 있으니 참고하자.

보너스

서버-클라이언트가 모두 파이썬이라면 굳이 JSON으로 직렬화하지 않고 pickle을 사용하는 방법이 있다. BytesIO는 추상화된 바이트 스트림이므로, 직렬화 방법에 상관없이 동일하게 사용하고 있음을 주목하자.

# pickle 을 사용해서 데이터 전달
# in pub
sock.send_multipart([chl, pickle.dumps(data)])
# in sub
_, payload = sock.recv_multipart()
data = pickle.load(ByteIO(payload))

다음 gist는 pickle을 사용해서 데이터를 직렬화하는 이 글에서 언급한 예제의 작동 가능한 온전한 버전을 소개한다.

https://gist.github.com/2fed4e0d48d98c634a7f5d628311b1b3

댓글 남기기