태그 보관물: 파이썬

zmq-how to

how to use zmq

컨텍스트

zmq 라이브러리를 사용할 때, 반드시 컨텍스트 객체를 생성해야 한다.

import zmq
ctx = zmq.Context()

컨텍스트는 소켓과 달리 스레드안전하다. 단일 앱은 하나 이상의 컨텍스트를 만들 수 있다.

소켓만들기

소켓은 컨텍스트로부터 생성한다.

socket = ctx.socket(zmq.REP)

zmq.REP는 메시징패턴을 지정하는 값이다. 이는 [[zmq 메시징패턴]]을 참고.

zeromq 메시징패턴

메시징 패턴은 상호연결된 시스템간의 커뮤니케이션 플로우를 묘사하는 구조적 패턴을 말한며, zmq는 이러한 패턴의 장점을 살릴 수 있는 최적화된 소켓들을 제공한다.

ZMQ의 각 패턴은 네트워크 위상에 대한 제한들을 정의하고 있으며, 이러한 패턴들은 스케일을 고려하여 설계되었다. 기본적으로 정의된 패턴에는 다음 네 종류가 있다.

  1. PAIR
  2. Client/Server
  3. Publish/Subscribe
  4. Push/Pull

PAIR

PAIR 패턴은 전통적인 소켓처럼 동작하는 소켓을 정의하는데, 전통적 소켓 다음과 같은 특징을 가진다.

  • 1:1 통신
  • n:1 통신 (하나의 서버, 여러 대의 클라이언트)
  • 1:n 통신 (멀티캐스트)

그중 PAIR는 독점쌍 패턴을 의미하는데

  • 통신은 양방향이다.
  • 소켓내에는 어떤 특정한 상태가 저장되지 않는다.
  • 1쌍의 연결된 peer만이 있다.
  • 서버가 특정 포트를 듣고, 클라이언트가 여기에 접속한다.

.

다음의 간단한 예제로 PAIR 소켓이 얼마나 간단하게 만들어지는지, 그리고 메시지 길이에 상관없이 온전한 메시지를 받게 된다는 점을 보여준다.


""" ZeroMQ socket server """ import zmq import random import sys import time port = "5556" context = zmq.Context() socket = context.socket(zmq.PAIR) socket.bind("tcp://*:%s" % port) while True: socket.send_string("Sever message to client3") msg = socket.recv_string() print(msg) time.sleep(1)

이에 대한 클라이언트의 코드는

import zmq
import random
import sys
import time

port = "5556"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)

while True:
    msg = socket.recv_string()
    print(msg)
    socket.send_string("client message to server1")
    socket.send_string("client message to server2")
    time.sleep(1)

Client/Server

클라이언트/서버 패턴은 Request/Reply 패턴이라고도 하는데, 클라이언트-서버 모델의 기본이 되는 패턴으, 클라이언트가 서버로 요청을 보내고 그에 대한 응답을 받는 것이다.

물론 PAIR 패턴으로도 이는 구현가능한데, PAIR는 다른 패턴들과는 다음의 차이를 갖는다.

  • zmq.req 소켓은 여러 서버에 접속하는 것이 가능하다.
  • 요청은 띄워지거나 두 개 서버로 분산될 수 있다.

또 PAIR를 이용하면 연결된 피어에 대해서 여러 메시지를 주고 받을 수 있지만,

  • zmq.REQ 소켓은 응답을 받을 때까지 다른 전송을 블럭한다.
  • zmq.REP 소켓은 요청을 받을 때까지 recv 동작을 블럭한다.

이는 다음과 같이 그려진다.

!(.)[https://learning-0mq-with-pyzmq.readthedocs.org/en/latest/_images/reqrep.png]

흥미로운 점은 서버는 .bind()를 써서 특정한 범위 내의 포트에 대해서 들을 수 있고, 클라이언트는 connect를 이용해서 고정 포트를 사용한다는 점이다. 또한 단일 소켓에 대해서 여러 포트를 연결하는 것이 가능하다.

서버-클라이언트 모델을 간단하게 구현해보자. 먼저 서버를 구현한다.

"""req-res-server.py"""

import zmq
import time
import sys

argc = len(sys.argv)
port = sys.argv[1] if argc > 1 else "5566"

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

for _ in range(10):
    # wait for next request
    message = socket.recv_string()
    print("received request: ", message)
    time.sleep(1)
    socket.send_string("World from %s" % port)

서버는 zmq.REP 패턴을 이용하는 소켓을 쓰며 .bind()를 이용한다는 차이가 있을 뿐 PAIR와 거의 같다.

클라이언트는 다음과 같이 쓸 수 있다.

"""req-res-client.py"""
import zmq
import sys

argc = len(sys.argv)

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)

port = sys.argv[1] if argc > 1 else "5566"
sock.connect("tcp://localhost:{}".format(port))
for port in sys.argv[2:]:
    sock.connect("tcp://localhost:{}".format(port))

for i in range(10):
    print("sending", i)
    sock.send_string("hello {}".format(i))
    msg = sock.recv_string()
    print(msg)

클라이언트는 소켓에 하나 이상의 연결을 만들 수 있고, 이 경우 각각의 연결을 번갈아가면서 쓰게 된다.

다음과 같이 구동해본다. (각각의 명령은 각각의 콘솔창에서 실행한다.)

python req-res-server.py 5544
python req-res-server.py 5566
python req-res-server.py 5588
python req-res-client.py 5544 5566 5588

포트 5544, 5566, 5588에 물린 서버를 각각 A, B, C 라 하면 다음과 같은 시나리오로 동작한다.

  1. 서버 A가 시작된다. 시작된 서버는 요청을 기다린다.
  2. 서버 B, C 가 각각 시작된다.
  3. 클라이언트가 시작되고 A, B, C 로의 연결을 소켓에 만든다.
  4. 서버 A에 요청을 보낸후 응답을 기다린다.
  5. 서버 A에게 응답을 받으면 이를 출력하고 다시 서버 B에 요청을 보낸 후 응답을 기다린다.
  6. 서버 B에게 응답을 받으면 이를 출력하고 다시 서버 C에 요청을 보낸 후 응답을 기다린다.
  7. 서버 C에게 응답을 받으면 이를 출력하고 다시 서버 A에 요청을 보낸 후 응답을 기다린다.
  8. 5~7의 과정을 반복한다.

클라이언트쪽에 출력되는 결과는 다음과 같다.

sending:  0                            
World from 5544                        
sending:  1                            
World from 5566                        
sending:  2                            
World from 5544                        
sending:  3                            
World from 5566                        
sending:  4                            
World from 5544                        
sending:  5                            
World from 5566                        
sending:  6                            
World from 5544                        
sending:  7                            
World from 5566                        
sending:  8                            
World from 5544                        
sending:  9                            
World from 5566                        

이 중간에 다른 클라이언트를 만들어 넣었을 때에도 각각의 클라이언트는 동일 서버에 접속 시 접속한 순서대로 응답을 받게 된다.

이때 중요한 것은 클라이언트는 요청을 보낸 후 응답을 .recv* 함수들을 이용해서 받아야 한다. 메시지를 받지 않고 다른 요청을 전송하면 에러가 발생한다.

Publish/Subscribe

Publish/Subscribe 패턴은 조금 더 흥미롭다. publisher는 메시지가 어느 수신자에게 갈지 결정하지 않는다. 메시지들은 수신자가 누군지 얼마나 되는지도 알 필요없이 발송된다.

두번째 시나리오는 보다 더 잘 알려져 있는데, 구독자가 퍼블리셔로부터 메시지를 받는 것이다.

  1. 하나의 구독자가 하나의 퍼블리셔로부터 메시지를 받는다.
  2. 하나의 구독자가 복수의 퍼블리셔로부터 메시지를 받는다.
  3. 복수의 구독자가 하나의 퍼블리셔로부터 메시지를 받는다.
  4. 복수의 구독자가 복수의 퍼블리셔로부터 메시지를 받는다.

이를 구현해보는 간단한 예를 만들어보자. 퍼블리셔(서버)는 토픽과 매번 바뀌는 랜덤값을 계속해서 내보낸다. 그리고 구독자(클라이언트)는 구동하는 시점에 지정한 서버에 접속해서 데이터를 받아오게 된다.

서버는 다음과 같다.

"""pub-server.py"""

import zmq
import random
import sys
import time

ports = sys.argv[1:]
if not ports:
    ports = ["5566"]

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:{}".format(ports[0]))

while True:
    topic = random.randrange(9999, 10005)
    msgData = random.randrange(1, 215) - 80
    msg = "{} {} from {}".format(topic, msgData, ports[0])
    print(msg)
    socket.send_string(msg)
    time.sleep(1)

메시지 전송은 퍼블리셔로부터 구독자로의 단방향이다. 따라서 서버는 랜덤값을 1초마다 한 번씩 계속해서 내보낸다.

클라이언트의 경우는 아래와 같다.

"""sub-client.py"""

import zmq
import sys

ports = sys.argv[1:]
if not ports:
    ports = ["5566"]

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)

print("Collecting updates from servers....")

for port in ports:
    sock.connect("tcp://localhost:{}".format(port))

topicFilter = "10001"
sock.setsockopt_string(zmq.SUBSCRIBE, topicFilter)

total_value = 0
for update_nbr in range(10):
    string = sock.recv_string()
    topic, msgData, _, server = string.split()
    total_value += int(msgData)
    print(topic, msgData, "from", server, "->", total_value)

print("Average message data value for topic {} was {}".format(topicFilter, total_value/10))

이 예제에서 흥미로운 것은 소켓단에서 특정 문자열을 필터링하는 옵션을 줄 수 있다는 것이다. (이를 위해 socket.setsockopt_string() 메소드를 썼다.)

서버를 실행하면 서버는 각자 임의의 랜덤값을 출력하기 시작하고, 클라이언트를 이후 실행하면 그 시점부터 수집되는 데이터 중 “10001”로 시작하는 데이터만 취한다.

구독자의 경우 소켓은 비동기로 움직이므로 한 서버가 중간에 종료되더라도 개의치 않고 나머지 서버로부터 정보를 수집하며, 새로운 서버가 시작되면 그 서버에 대해서도 정보를 취합할 수 있다.

정리하자면,

  • 퍼블리셔는 연결된 구독자를 갖지 않는다. 다만 메시지를 드롭할 뿐이다.
  • TCP를 사용하는 경우, 구독자의 동작이 매우 느리다면 메시지는 퍼블리셔에게서 큐에 쌓인다.
  • 필터링은 구독자측에서 일어난다.

Push/Pull

Push/Pull은 파이프라인 패턴이라고도 한다. push 소켓은 pull 클라이언트 들에게 메시지를 보낸다. 이는 producer/consumer 패턴과 유사하지만, 컨슈머에 의해 계산된 결과가 위로 올라가지 않고 다른 pull/consumer 소켓으로 보내진다. 즉 데이터는 파이프라인을 따라 흐르고 각각의 단계에서는 최소 하나의 노드와 이어진다. 특정 파이프라인 단계가 복수의 노드에 연결되었을 때 데이터는 모든 연결된 노드 사이에서 로드 밸런싱된다.

.

producer 파일을 구현해보자.

import zmq
import time

def producer():
    context = zmq.Context()
    zmq_socket = context.socket(zmq.PUSH)
    zmq_socket.bind("tcp://127.0.0.1:5577")
    for num in range(200):
        work_message = {'num': num }
        zmq_socket.send_json(work_message)
        time.sleep(0.1)

producer()

producer는 연속 숫자를 만들면서 자신에게 연결된 클라이언트(컨슈머)에게 데이터를 보낸다. 연속적으로 생성되는 숫자는 연결된 컨슈머들에게 순차적으로 보내진다(로드밸런싱). 이 점이 push/pull 패턴과 publish/subscribe 패턴의 차이점이다.

컨슈머는 파이프라인의 중간 노드에 해당하므로 pull로부터 뽑아낸 데이터를 필터링하고, 이를 다시 결과 수집 큐에 넣는다.

import zmq

def result_collector():
    context = zmq.Context()
    recv = context.socket(zmq.PULL)
    recv.bind('tcp://127.0.0.1:5588')
    data = {}
    for x in range(20):
        result = recv.recv_json()
        if result['consumer'] in data:
            data[result['consumer']] += 1
        else:
            data[result['consumer']] = 1
        if x == 19:
            print(data)

result_collector()

결과 수집 프로그램은 지정된 포트에 바인딩된 소켓을 열어서 복수개의 컨슈머로부터 테이터를 순차적으로 받아들인다. (이 과정도 로드밸런싱되는 것 같다.) 그렇게하여 각 컨슈머로부터 받은 데이터를 컨슈머별로 개수를 집계하고 특정 개수만큼의 데이터를 처리하면 결과를 출력하고 끝낸다.

import zmq

def result_collector():
    context = zmq.Context()
    recv = context.socket(zmq.PULL)
    recv.bind('tcp://127.0.0.1:5588')
    data = {}
    for x in range(20):
        result = recv.recv_json()
        if result['consumer'] in data:
            data[result['consumer']] += 1
        else:
            data[result['consumer']] = 1
        if x == 19:
            print(data)

result_collector()

싱크홀노드가 중간에 끝나는 경우에도 중간 노드들은 큐에 데이터가 남아 있으므로 계속 기다린다. 만약 결과 수집 프로그램이 재실행되면, 큐에 남은 데이터들이 다시 순차적으로 넘어간다.

오일러 프로젝트 30 번

오일러 프로젝트 30 번

각 자리의 숫자를 4제곱해서 더했을 때 자기 자신이 되는 수는 놀랍게도 단 세 개밖에 없습니다.

1634 = 1**4 + 6**4 + 3**4 + 4**4
8208 = 8**4 + 2**4 + 0**4 + 8**4
9474 = 9**4 + 4**4 + 7**4 + 4**4
(1 = 1**4의 경우는 엄밀히 말해 합이 아니므로 제외합니다)

위의 세 숫자를 모두 더하면 1634 + 8208 + 9474 = 19316 입니다.

그렇다면, 각 자리 숫자를 5제곱해서 더했을 때 자기 자신이 되는 수들의 합은 얼마입니까?

http://euler.synap.co.kr/prob_detail.php?id=30

9**5가 59049이므로, 7자리 수 부터는 이 조건을 만족하는 수가 나올 수 없다. 따라서 999999까지만 검사하면 된다.

def transform(num, n):
    return sum((int(x) ** n for x in str(num)))

def e030():
    result = sum((x for x in range(2, 999999+1) if x == transform(x, 5)))
    print(result)
%time e030()
# 443839
# Wall time: 7.08 s