zmq poller

멀티프로세스와 프로세스간 통신

멀티프로세스에서 프로세스간 통신을 파이프로 하는 경우도 있지만, zmq를 이용하면 3개 이상의 멀티스레드가 쉽고 자유롭게 메시지를 주고 받는 것이 가능하다. 하나의 프로그램 내에서 분산처리를 위해서는 멀티프로세싱과 ZMQ를 이용하여 프로세스간 통신을 가능하게 할 수 있다.

간단한 예를 만들어보자. 서브 프로세스에서 n 개의 서버와 1개의 클라이언트를 만들고 로드밸런싱 형태로 각각의 서버가 클라이언트와 번갈아가며 통신하는 코드이다.

import zmq
import time
import random
from multiprocessing import Process

def server(port="5566"):
    """ request/reply server process """
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REP)
    print("Running server on port:", port)
    sock.bind('tcp://*:{}'.format(port))
    srv_id = random.randrange(100, 999)

    for req_no in range(10):
        msg = sock.recv_string()
        print('#{}'.format(srv_id), "received: ", msg)
        sock.send_string('world from server #{} on {}'.format(srv_id, port))

def client(ports):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.REQ)
    cli_id = random.randrange(10000,99999)
    for port in ports:
        sock.connect('tcp://localhost:{}'.format(port))

    for req in range(20):
        print('sending request: {} by client #{}'.format(req, cli_id))
        sock.send_string("hello {} from {}".format(req, cli_id))
        res = sock.recv_string()
        print("#{}".format(cli_id), "receive:", res)
        time.sleep(1)

def main():
    ports = range(5500, 5508, 2)
    for port in ports:
        Process(target=server, args=(port,)).start()
    for _ in (1, 2):
        Process(target=client, args=(ports,)).start()

if __name__ == '__main__':
    main()

이렇게해서 하나의 프로그램에서 서버와 클라이언트를 같이 돌릴 수 있게 되었지만 여전히 문제는 존재한다. 바로 하나의 프로세스가 하나의 소켓만 가지게 된다는 점이다. 하지만 실제 프로그램에서는 하나의 프로세스가 여러 개의 소켓에 접속하는 경우도 많다. 이런 문제를 해결하기 위해서는 데이터를 소켓들로부터 폴링하는 것이 필요하다. ZMQ는 이런 매커니즘을 제공한다.

zmq poller

다음과 같은 시나리오를 생각해보자. pub 서버가 데이터를 멀티캐스트하는데 각각의 subscriber들은 이 데이터를 받아서 누적계산을 한다. 그리고 또 다른 push 서버가 있다. push 서버는 간헐적으로 continue 혹은 exit 명령을 전송한다. 그러면 명령을 받은 클라이언트를 그 명령에 따라서 종료하거나 작업을 계속한다.

먼저 클라이언트의 작업 지속 여부를 결정해주는 서버를 만들어보자.

import zmq
import time
import random
from multiprocessing import Process

def srv_push(port='5555'):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUSH)
    sock.bind('tcp://*:{}'.format(port))
    print("Running command server on", port)

    for _ in range(100):
        r = random.randint(1, 100)
        if r % 6 == 0:
            sock.send_string('EXIT')
        else:
            sock.send_string('CONT')

        time.sleep(1)

명령 서버는 1초마다 랜덤으로 계속 혹은 종료하라는 메시지를 내보낸다. (어느 클라이언트가 받게될지는 모른다.) 다음은 데이터를 생성하는 퍼블리셔다. 퍼블리셔 서버는 딱히 특이한 부분은 없다.

def srv_pub(port="4444"):
    ctx = zmq.Context()
    sock = ctx.socket(zmq.PUB)
    sock.bind('tcp://*:%s' % port)

    pub_id = random.randrange(0, 9999)
    print("Running publish server on", port)

    for _ in range(1000):
        topic = random.randrange(2, 10)
        data = 'server#{}'.format(pub_id)
        msg = "{} {}".format(topic, data)
        sock.send_string(msg)
        time.sleep(0.2)

다음은 클라이언트이다. 클라이언트는 두 개의 소켓을 가지고 있으면서 각각 데이터서버와 명령서버에 접속한다. 두 서버의 메시지 발송주기는 다르다. 그리고 하나의 서버를 듣고 있으면 다른 서버의 메시지를 들을 수 없으므로, 문제가 생긴다. 이 때 poller를 이용한다.

def client(topicf, data_port="4444", cmd_port="5555"):
    ctx = zmq.Context()

    cmd_sock = ctx.socket(zmq.PULL)
    cmd_sock.connect('tcp://localhost:{}'.format(cmd_port))

    data_sock = ctx.socket(zmq.SUB)
    data_sock.connect('tcp://localhost:{}'.format(data_port))
    data_sock.setsockopt_string(zmq.SUBSCRIBE, '' if len(str(topicf)) == 0 else str(topicf)[0])

    poller = zmq.Poller()
    poller.register(cmd_sock, zmq.POLLIN)
    poller.register(data_sock, zmq.POLLIN)



    cli_id = random.randrange(100, 999)
    print('start client', cli_id)

    while 1:
        socks = dict(poller.poll())
        if cmd_sock in socks and socks[cmd_sock] == zmq.POLLIN:
            cmd = cmd_sock.recv_string()
            if cmd == "EXIT":
                print("client #{} received EXIT command.".format(cli_id))
                break
        if data_sock in socks and socks[data_sock] == zmq.POLLIN:
            msg = data_sock.recv_string()
            topic, data = msg.split()
            print("Processing:", topic, data, "#{}".format(cli_id))

폴러를 사용하는 방법은

  1. zmq.Poller 클래스의 인스턴스를 만들고
  2. 폴러의 register를 이용해서 소켓들을 등록한다.
  3. 폴러는 만들어진 소켓 중에서 메시지가 들어온 소켓들을 내놓는다.
  4. 만약 명령 서버와 연결된 소켓에 메시지가 들어있으면 명령이 EXIT인지 검사. EXIT라면 클라이언트를 종료한다.
  5. 데이터소켓에 메시지가 있으면 이를 처리한다.

최종적으로 각각의 프로그램을 실행한다.

if __name__ == '__main__':
    Process(target=srv_push).start()
    Process(target=srv_pub).start()
    Process(target=client, args=('9',)).start()
    Process(target=client, args=('',)).start()

최종 코드는 아래와 같다.