PUB-SUB 패턴이나 PUSH-PULL 패턴을 사용하면 데이터를 발생시키는 1개 (혹은 여러 개의) 노드로부터 데이터를 전달받는 N개의 노드들이 돌아가는 방식의 네트워크를 구성할 수 있음을 지난 글에서 보았다. 이 때 각각의 클라이언트 노드들은 루프를 돌면서 큐에 들어온 메시지를 순서대로 처리하게 된다.
만약 개별 클라이언트가 한 번에 처리해야 할 메시지의 최대 개수가 정해져 있다면 while 무한 루프가 아닌 for 루프를 통해서 유한한 루프를 돌 수 있을 것이다. 그 외에도 메시지의 내용으로부터 동작이나 중지를 결정할 수 있는 경우에도 그 스스로 동작을 멈출 시점을 결정할 수 있을 것이다.
여러 개의 소켓을 듣는 노드
하지만 클라이언트들의 중지를 다른 제 3의 노드가 결정해야 한다면? ‘커맨드 노드’라는 노드가 있고 이 노드가 PUB 패턴으로 모든 클라이언트들에게 중지 명령을 내리는 구조를 만든다고 가정해보자.
그렇다면 클라이언트 입장에서는 한 번에 두 개의 소켓(하나는 SUB 패턴으로 데이터를 지속적으로 수신받고, 다른 하나는 PULL 패턴으로 종료 명령을 수신받는다.) 을 사용하게 된다. 이것은 이전의 패턴들과는 사뭇다르다. 이전 예제들에서 보여왔던 다중 접속은 하나의 노드가 단일 소켓을 사용하면서, 소켓 자체가 여러 포트에 바인딩되거나, 여러 서버에 커넥트되는 것을 보였다. (그리고 ZMQ는 이 구조 내에서 메시지 전달의 교통정리를 훌륭하게 수행해준다.)
소켓으로 데이터를 반복적으로 읽어야 하는 노드의 특성상, 두 개 이상의 소켓을 하나의 노드가 갖는 것이 문제될 일은 없지만 두 노드를 한꺼번에 대기할 수는 없다. 두 개 소켓으로부터 각각 메시지를 수신하기 위해서는 각각 별도의 스레드를 통해서 처리해야 할 것이다.
POLLER
ZMQ에서는 이러한 상황을 좀 더 쉽게 해결할 수 있도록 POLLER라는 것을 제공한다. Poller는 두 개 이상의 소켓을 등록하여두면 소켓들로부터의 입력을 감지하여 (소켓, 이벤트)의 리스트를 리턴해준다. 이를 이용해서 각각의 소켓을 동시에 수신하면서 소켓별로 구분된 메시지를 얻을 수 있다.
def client(data_port=5556, com_port=5558):
ctx = zmq.Context()
# 1. SUB 소켓 연결
sock_sub = ctx.socket(zmq.SUB)
sock_sub.setsockopts_string(zmq.SUBSCRIBE, '9')
sock_sub.connect(f'tcp://localhost:{data_port}')
# 2. PULL 소켓 연결
sock_pull = ctx.socket(zmq.PULL)
sock_pull.connect(f'tcp://localhost:{com_port}')
# 3. 폴러 생성 및 설정
poller = ctx.Poller()
poller.register(sock_sub, zmq.POLLIN)
poller.register(sock_pull, zmq.POLLIN)
# 폴러로부터 이벤트 수신
should_continue = True
while should_continue:
# 4. 각각의 이벤트는 (소켓, 이벤트), (소켓, 이벤트) ... 의 형태로 전달
# 이를 사전 타입으로 변환
socks = dict(poller.poll())
if socks.get(sock_pull, None) == zmq.POLLIN:
command = sock_pull.recv_string()
if command == 'EXIT':
print('This client will be terminated...')
should_continue = False
if socks.get(sock_sub, None) == zmq.POLLIN:
topic, msg = sock_sub.recv_string().split(' ', 1)
print(f'Processing: {msg}')
poller는 복수 개의 소켓을 번갈아 수신하기 보다는 특정한 주기로 각 소켓의 메시지 큐를 검사해서 신규 메시지들을 소켓별로 하나씩 가져와서 이벤트의 리스트로 리턴해준다. 폴러에 등록되는 소켓의 타입은 서로 같은 것들이어도 무방하다. 지난 글에서 살펴본 PUSH-PULL 패턴을 사용한 분산처리는 다음과 같이 개선해볼 수 있다.
- Ventilator는 PUSH 패턴으로 복수의 Worker에게 데이터를 순차적으로 뿌린다.
- Worker는 PULL 소켓으로 데이터를 수신하여 처리한 후, 다시 PUSH 소켓으로 Sink에게 그 결과를 전달한다.
- Worker는 또한 SUB 소켓으로 중지명령을 기다리고 있다. 따라서 Worker에는 PULL, SUB 소켓이 Poller로 묶여있게 된다.
- Sink는 PULL 소켓으로 여러 Worker로부터 결과를 수신받아 취합한다.
- Sink는 모든 결과가 수신되면 PUB 소켓을 통해 모든 Worker들에게 kill 시그널을 전송한다.
이처럼 단순 소켓이 아니라 소켓을 결합해주는 Poller를 사용하면 복잡한 네트워크 그래프를 간단한 방법으로 구축할 수 있다. Poller외에도 ZMQ에서는 양 끝단에 소켓을 부착하여 메시지를 포워딩해주는 device라는 개념이 있는데, 다음 번에는 이 부분에 대해서 한 번 알아보는 기회를 갖도록 하자.
참고로 전통적인 파이썬 소켓도 하나의 스레드에서 여러 소켓들 동시에 수신하는 멀티플렉싱을 구현할 때에는 selector를 사용한다. (내부적으로 zmq 역시 셀렉터를 사용해서 구현되어 있을 듯 하다) 셀렉터를 사용한 소켓 멀티 플렉싱은 이 글을 참고하도록 하자.