Selector를 사용한 소켓 멀티플렉싱

zmq.Poller는 소켓을 멀티플렉싱할 때 사용하는 클래스입니다. 파이썬에서의 ZMQ Poller 구현은 파이썬의 내부 select 모듈을 사용합니다. 파이썬 3.4부터는 이 모듈을 개선하여 효율적인 I/O 멀티플렉싱에 대한 고수준 API를 제공하는 selectors 모듈이 있습니다.

selectors.BaseSelector는 복수 파일 객체에 대한 I/O 이벤트를 대기하기 위해 사용합니다. 이 클래스는 파일 스트림에 대한 등록, 해제와 대기에 관한 메소드들을 제공하며, selectors 모듈이 제공하는 여러 셀렉터 클래스들의 기반이 됩니다.

  • register(fileObj, events, data=None) : 셀렉터에 스트림을 등록합니다. xnix 시스템에서는 실제 파일이거나, 파일로 표현된 하드웨어일 수 있습니다. 윈도 플랫폼에서는 소켓만이 등록 가능합니다.
  • unregister(fileOj) : 등록된 스트림을 해제합니다. 스트림을 닫기 전에 반드시 해제해야 합니다.
  • select() : 등록된 스트림으로부터 이벤트가 발생할때까지 대기합니다. 이벤트가 발생하면 (키, 이벤트마스크)의 순서쌍의 리스트를 반환합니다.

이를 사용하면 다중 접속 소켓 서버를 싱글 스레드로 구현할 수 있습니다. 순서는 대략 다음과 같습니다.

  1. 소켓을 만들고 셀렉터에 등록합니다.
  2. 셀렉터가 최초 소켓에 들어오는 이벤트를 감지하면 핸들러를 호출하도록 합니다. 해당 핸들러에서 소켓의 accept()를 호출하여, 연결이 구성된 소켓 객체를 얻을 수 있습니다.
  3. 이 때 얻은 소켓을 다시 셀렉터에 등록합니다. 이제 셀렉터는 주 소켓 뿐만 아니라 연결이 구성된 소켓에 대한 이벤트도 함께 감지합니다. 들어오는 통신에 대한 이벤트는 각각 등록시에 넘겨준 방법에 의해 핸들러를 구분하여 처리할 수 있습니다.

예제를 통해서 실제로 어떻게 사용하는지 살펴보겠습니다.

먼저 소켓과 셀렉터를 만듭니다. 그리고 소켓을 셀렉터에 등록합니다. 이 때 소켓의 블럭킹 모드는 False로 설정합니다. 논블럭으로 설정된 소켓은 마치 timeout=0 으로 설정된 것과 동일하며, 소켓에서 전달받은 데이터를 읽어오거나 쓸 때 기다리지 않게 됩니다. 따라서 실제로는 에러 검출이라든지 이런 부분을 일일이 검사해야 하지만, 데이터를 읽는데 필요한 지연을 최소화할 수 있습니다.

import selectors
import socket

sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', 5551))
sock.listen(100)
sock.setblocking(False)

sel.register(sock, selectors.EVENT_READ, accept_handler)
# accept_handler는 아직 작성되지 않았습니다.

register()로 셀렉터에 연결할 때, 첫번째 인자는 대기할 스트림이되고, 세번째 인자는 스트림에 추가적으로 붙을 컨텍스트 데이터가 됩니다. 후에 select()를 통해서 리턴되는 튜플의 첫 요소에서 .fileObj로 소켓을 참조할 수 있으며, .data를 사용해서 data= 파라미터로 넘긴 값을 참조할 수 있습니다. 따라서 accept_handler.data 요소로 찾게 됩니다. 주 소켓을 듣는 루프를 다음과 같이 작성할 수 있겠네요.

while True:
  events = sel.select()
  for (key, mask) in events:
    key.data(key.fileObj, mask)

이 코드에서 key는 실제로는 selectors.SelectorKey이고 내부적으로는 namedtuple입니다.

select() 메소드는 셀렉터를 듣고 있다가 이벤트가 감지되면 해당 이벤트에 대한 키와 마스크 정보의 리스트를 넘겨받게 됩니다. 아마 최초로 발생하는 이벤트는 주 소켓에 연결 요청이 들어올 때가 됩니다. 각 소켓 객체의 핸들러 함수는 key.data에 맵핑되어 있으므로 필요한 정보를 넘기면 됩니다. 여기서는 스트림으로 추상화된 소켓과 마스크 정보를 넘겨줍니다. 그럼 accept_handler() 함수 역시 소켓과 마스크 정보를 인자로 받는 함수로 아래와 같이 작성할 수 있습니다.

def accept_handler(sock, mask):
  conn, addr = sock.accept()
  print(f'Accepted {conn} from {addr}')
  conn.setblocking(False)
  sel.register(conn, selectors.EVENT_READ, read_handler)
  # read_handler는 아직 작성되지 않았습니다.
   

accept_handler 가 하는 일은 단순합니다. 연결요청이 들어왔다는 사실을 이미 알고 있는 상황이므로 sock.accept()를 출하여 conn 소켓을 얻습니다. 이제 conn 소켓으로 데이터가 들어올때를 다른 소켓과 함께 기다리기 위해서 conn도 셀렉터에 등록합니다. 각각의 conn 은 실제로 데이터를 수신하고 에코잉해야 하므로 별도의 핸들러를 사용해야 합니다. 해당 핸들러의 형식은 aceept_handler와 동일할 것이기 때문에 별 어려운 상황은 아닙니다.

이번에는 read_handler() 함수를 작성하겠습니다. conn은 aceept_handler에서 생성된 후 sel에 등록됩니다. 이후 메인 루프의 sel.select()에서 키 객체가 conn에 해당한다면 해당 소켓으로 데이터가 들어오려는 것입니다. 이를 읽어서 출력하고, 다시 돌려보냅니다.

클라이언트가 접속을 끊고 소켓을 닫는 시점에, 연결이 수립되어 있다면 서버 쪽으로 빈 데이터를 보내서 시그널을 줍니다. 이 시그널을 받을 때에는 소켓을 닫습니다. 소켓을 닫기 전에는 sel.unregister()를 호출해서 등록되었던 내용을 제거해줍니다.

def read_handler(conn, mask):
  data = conn.recv(1024)
  if data:
    print(f'echoing {repr(data)} to {conn}')
    conn.sendall(data)
  else:
    print('closing...')
    sel.unregister(conn)
    conn.close()

설명을 위해서 코드의 순서는 저렇게 썼지만, register 하는 항목 앞에 해당 핸들러의 정의가 나와야 합니다. 결국 전체 코드는 다음과 같겠습니다.

https://gist.github.com/sooop/1ca2cbbd3cf42f718a8b67ce7702967c