Wireframe

ZMQ 소켓을 모니터링하기

zmq 소켓을 사용한 확장 패턴에서 zmq 프록시를 사용하는 경우, 일반적으로 두 개의 소켓을 사용하여 프론트엔드와 백엔드로 사용하는데, 세번째 소켓인 캡쳐를 추가할 수 있다. 캡쳐는 주로 publisher를 사용하는데, 이 소켓으로는 프록시 내부에서 흘러가는 모든 데이터가 캡쳐되어 발송된다. 이런 방식으로 우리는 zmq 프록시의 내용을 모니터링할 수 있다.

이와는 별도로 각각의 zmq 소켓은 그 자체로 모니터링 기능을 제공한다. 소켓은 프로그램과 네트워크 사이를 연결하는 인터페이스로, 소켓을 통과하는 데이터를 캡쳐링할 필요는 없다. 대신에 소켓의 모니터링 기능은 소켓이 네트워크에서 피어와 연결할 때 발생하는 이벤트를 감지한다.

소켓의 모니터링 기능은 다음과 같이 사용한다. 소켓에 대해서 monitor() 메소드를 호출하는데, 전달한 주소에 소켓 내부 이벤트를 외부로 내보내는 PAIR 소켓이 하나 바인딩된다.

import asyncio
import zmq
from zmq.asyncio import Context

async def server():
    sock = ctx.socket(zmq.REP)
    sock.bind("tcp://*:5555")
    sock.monitor("inproc://server.mon", zmq.EVENT_ALL)
    while True:
        data = await sock.recv()
        await asyncio.sleep(0.5)
        await sock.send(data)

모니터링 소켓으로부터 해당 소켓에서 일어나는 일을 감지할 수 있다. 별도의 PAIR 소켓을 여기에 연결해서 메시지를 수신하면, 어떤 이벤트와 이벤트가 발생한 주소를 알 수 있다. 이 때 모니터링 소켓이 발송하는 메시지는 [이벤트, 주소]로 구성되는 두 개의 프레임으로 나눠진 멀티파트 메시지가 된다.

각각의 이벤트는 아래와 같이 미리 정의된 상수로 표현된다. 만역 두 개 이상의 이벤트가 아주 짧은 간격에 발생됐다면, 두 개 이벤트가 OR 연산으로 묶여서 발송된다. (핸드셰이크 관련 이벤트는 api로는 노출되어 있는데, 문서상으로는 설명이 없음)

EVENT_CONNECTED : 1 - 소켓이 원격 피어에 성공적으로 연결됨
EVENT_CONNECT_DELAYED : 2 - 커넥트 요청이 보류되고 있음
EVENT_CONNECT_RETIRED : 4 - 커넥트 요청에 실패했고 재시도 중임.
EVENT_LISTENING : 8 -  소켓이 네트워크 인터페이스에 성공적으로 묶였음.
EVENT_BIND_FAILED : 16 - 소켓이 네트워크 인터페이스와 묶이는데 실패함
EVENT_ACCEPTED : 32 - 원격 피어의 연결 요청을 수락함
EVENT_ACCEPT_FAILED : 64 - 원격 피어의 연결 요청을 거절했음
EVENT_CLOSED : 128 - 소켓이 닫힘
EVENT_CLOSE_FAILED : 256 - 소켓을 닫는데 실패했음
EVENT_DISCONNECTED : 512 - 소켓이 예기치않게 연결이 끊겼음
EVENT_MONITOR_STOPPED : 1024 - 이 소켓에 대한 모니터링이 종료됨
EVENT_HANDSHAKE_FAILED_NO_DETAIL : 2048 - 핸드셰이크에 실패함
EVENT_HANDSHAKE_SUCCEEDED : 4096 - 핸드셰이크에 성공함
EVENT_HANDSHAKE_FAILED_PROTOCOL : 8192
EVENT_HANDSHAKE_FAILED_AUTH : 16384
EVENT_ALL : 65535

모니터링 소켓으로부터 데이터를 이벤트를 확인하는 코드는 아래와 같은 식으로 구성된다. 찾기를 원하는 이벤트가 있다면 해당 이벤트와 소켓이 보낸 이벤트를 bitwise AND 연산으로 비교하여 찾으면 된다. 실제로 테스트해보면 클라이언트가 연결됐을 때 accepted / handshake_succeeded 이벤트가 하나의 메시지로 전달되는 것을 확인할 수 있다. 대신 소켓이 피어와 성공적으로 연결된 후, 정상적으로 메시지를 주고 받을 때에는 메시지 내용을 따로 캡쳐하지는 않는다.

async def monitor():
    sock = Context.instace().socket(zmq.PAIR)
    sock.connect("inproc://mon")
    while True:
        ev, addr = sock.recv_multipart()
        event = int.from_bytes(ev, 'little')
        if zmq.EVENT_ACCEPTED & event > 0:
           print(f"Accepted: {addr.decode()}")
        if zmq.EVENT_HANDSHAKE_SUCCEEDED & event > 0:
           print("Handshake Succeeded: {addr.decode()}")
        # ... 
 

Exit mobile version