zmq device

ZeroMQ 디바이스

ZMQ를 통해서 하나의 포트는 어떤 타입의 소켓이라도 바인드할 수 있다는 것을 이전 글을 통해 알았다. push/pull 패턴에서 특정 포트에 대해서 zmq.PUSH, zmq.PULL 타입의 소켓을 바인드했었다.

통신의 양단에서 상대적으로 변하지 않는 쪽(서버)는 bind() 메소드를 이용해서 소켓을 연결하고 변하는 쪽(클라이언트)은 connect()를 이용해서 소켓에 연결한다.

.

하지만 어떤 경우에는 양쪽이 모두 가변적인 속성을 가지는 경우가 있을 수 있고, 양단에 잘 알려진 포트를 할당하는 것은 그리 좋은 생각이 아닐 수 있다.

이러한 케이스에 대해서 ZMQ는 포워딩 장치를 제공하는데, 이를 연결에 이용할 수 있다.

그러한 경우에, 서버와 클라이언트를 모두 ZMQ 장치에 접속한다. 이 장치는 서로 다른 두 개의 포트를 가지고 있고, 한 포트에서 다른 포트로 메시지를 포워딩 해준다. 따라서 양단이 모두 가변적인 경우에 ZMQ 장치가 고정적인 포인트 역할을 하게 된다.

.

이러한 장치는 메시지 패턴 타입에 따라서 다음 세 가지로 나뉠 수 있다.

  • queue
  • forwarder
  • streamer

Queue

큐 장치는 서버/클라이언트 사이에 위치하면서 클라이언트의 요청을 서버로 전달하고, 다시 서버의 응답을 클라이언트로 전달한다. ZMQ 장치는 디바이스 타입으로 zmq.QUEUE를 지정하고 두 개의 소켓을 지정한다.

.

장치를 생성해보자.

import zmq

def main():
    try:
        context = zmq.Context()

        frontend = context.socket(zmq.XREP)
        frontend.bind('tcp://*:5559')

        backend = context.socket(zmq.XREQ)
        backend.bind('tcp://*:5560')

        zmq.device(zmq.QUEUE, frontend, backend)

    except Exception as e:
        print(e)
        print('bridging down zmq device')

    finally:
        pass
        frontend.close()
        backend.close()
        context.term()

if __name__ == "__main__":
    main()

큐에 사용되는 소켓은 XREP, XREQ 패턴의 소켓을 사용한다. 또한 장치는 생성되면서 자동으로 무한루프를 돌기 때문에 리턴하지 않는다. 따라서 시그널을 보내서 종료시키는 경우에 소켓을 닫고 컨텍스트를 해제하도록 한다.

여기에 붙는 클라이언트, 서버는 기존의 예제와 거의 다르지 않다.

import zmq
import time
import sys
import random

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect('tcp://localhost:5560')

server_id = random.randrange(1, 10005)

while 1:
    mes = socket.recv_string()
    print('received:', mes)
    time.sleep(1)
    socket.send_string('world from server {}'.format(server_id))

클라이언트는 아래와 같다.

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect('tcp://localhost:5559')

for r in range(1, 100):
    print('sending request:', r)
    socket.send_string('hello from {}'.format(r))
    mes = socket.recv_string()
    print('response: ', mes)

다음과 같이 실행한다.

  1. python q-device.py
  2. python q-server.py
  3. python q-server.py
  4. python q-client.py
  5. python q-client.py

이렇게하면 2개의 서버와 2개의 클라이언트가 서로 접속하며 큐 장치는 로드밸런싱을 수행한다. 클라이언트1이 요청을 보내면 큐가 서버 하나로 요청을 전달하고, 클라이언트 2가 요청을 보내면 큐는 서버 1로부터 돌아온 응답을 클라이언트1로 돌려보낸 후, 클라이언트2의 요청을 이번에는 서버2로 전송한다.

클라이언트를 하나만 돌려보면 서버1, 서버2로 로드밸런싱되는 결과를 관측할 수 있다.

큐를 사용하지 않으면 2개의 서버를 같은 포트에서 돌릴 수 없다.

Forwarder

Forwarder 장치는 publish/scriber 패턴의 중계장치이다. 복수의 서버가 각각 다른 포트를 사용한다면 클라이언트는 복수개의 소켓을 지정해서 여러 포트로부터 메시지를 받을 수 있다. 하지만 포워더를 사용하면 서버와 클라이언트가 모두 단일 소켓을 이용하고 이를 다시 단일 소켓을 쓰는 클라이언트들에게 분배할 수 있게 된다.

장치

import zmq

def main():
    try:
        context = zmq.Context()

        frontend = context.socket(zmq.SUB)
        frontend.bind('tcp://*:5559')
        frontend.setsockopt_string(zmq.SUBSCRIBE, '')

        backend = context.socket(zmq.PUB)
        backend.bind('tcp://*:5560')
        zmq.device(zmq.FORWARDER, frontend, backend)

    except Exception as e:
        print(e)
        print('bridging down zmq device')
    finally:
        frontend.close()
        backend.close()
        context.term()

if __name__ == '__main__':
    main()

포워더 역시 큐와 동일한 형태로 만들어지는데, 메시지 패턴은 퍼블리셔/스크라이버와 동일한 것을 사용한다. (큐에서만 특별한 패턴이 사용된다.)

퍼블리셔, 스크라이버는 각각 기존의 코드와 동일하다. (다만, 서버가 .bind 대신 .connect를 쓴다는 점만 다르다.)

"""f-server.py"""
import zmq
import random
import sys
import time

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.connect('tcp://localhost:5559')

pub_id = random.randrange(1, 10005)

while 1:
    topic = random.randrange(1, 10)
    mes = 'server #{}'.format(topic)
    print(topic, mes)
    socket.send_string('{} {} {}'.format(topic, mes, pub_id))
    time.sleep(1)
"""f-client.py"""
import zmq
import sys

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
print('collecting updates...')
sock.connect('tcp://localhost:5560')
sock.setsockopt_string(zmq.SUBSCRIBE, '')

topicfilter = '9'

sock.setsockopt_string(zmq.SUBSCRIBE, topicfilter)

for update in range(10):
    string = sock.recv_string()
    print(string)

이상의 코드를 동시에 복수의 서버와 복수의 클라이언트를 돌리면 모든 서버는 동일한 포트를 사용하고, 모든 클라이언트가 또 같은 포트를 사용하기만 하면 각각의 클라이언트는 똑같이 모든 서버의 데이터를 수신받게 된다.

.

streamer

스트리머 장치는 병렬화된 메시징 파이프라인을 위해 사용하는 장치이다. 복수의 feeder가 보내는 메시지는 순차적으로 복수의 worker로 전달된다.

.

먼저 디바이스 코드

import zmq

def main():
    try:
        context = zmq.Context()
        f = context.socket(zmq.PULL)
        f.bind('tcp://*:5577')

        b = context.socket(zmq.PUSH)
        b.bind('tcp://*:5588')

        zmq.device(zmq.STREAMER, f, b)

    except Exception as e:
        print(e)
    finally:
        f.close()
        b.close()
        context.term()

if __name__ == '__main__':
    main()

메시지를 보내는 프로듀서

import zmq
import time

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5577")
    for num in range(200):
        work_message = {'num': num }
        zmq_socket.send_json(work_message)
        time.sleep(0.1)

producer()

아래는 컨슈머…

import random
import zmq

def consumer():
    cid = random.randrange(100, 999)
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PULL)
    sock.connect('tcp://localhost:5588')

    for _ in range(100):
        msg = sock.recv_string()
        num, _, pid = msg.split()
        print('{} from {}'.format(num, pid))


consumer()