Selector를 사용한 소켓 멀티플렉싱

zmq.Poller는 소켓을 멀티플렉싱할 때 사용하는 클래스입니다. 파이썬에서의 ZMQ Poller 구현은 파이썬의 내부 select 모듈을 사용합니다. 파이썬 3.4부터는 이 모듈을 개선하여 효율적인 I/O 멀티플렉싱에 대한 고수준 API를 제공하는 selectors 모듈이 있습니다.

selectors.BaseSelector는 복수 파일 객체에 대한 I/O 이벤트를 대기하기 위해 사용합니다. 이 클래스는 파일 스트림에 대한 등록, 해제와 대기에 관한 메소드들을 제공하며, selectors 모듈이 제공하는 여러 셀렉터 클래스들의 기반이 됩니다.

다음은 selectors.BaseSelector가 제공하는 기능입니다. BaseSelector는 그 자체로 인스턴스를 만들지 않으며, selectors 모듈에는 이를 상속한 다른 클래스들이 있습니다. 특히 DefaultSelector는 해당 플랫폼에서 가장 일반적으로 사용할 수 있는 셀렉터 클래스를 가리키므로 우리는 보통 이것을 사용할 것입니다.

  • register(fileObj, events, data=None) : 셀렉터에 스트림을 등록합니다. xnix 시스템에서는 실제 파일이거나, 파일로 표현된 하드웨어일 수 있습니다. 윈도 플랫폼에서는 소켓만이 등록 가능합니다.
  • unregister(fileOj) : 등록된 스트림을 해제합니다. 스트림을 닫기 전에 반드시 해제해야 합니다.
  • select() : 등록된 스트림으로부터 이벤트가 발생할때까지 대기합니다. 이벤트가 발생하면 (키, 이벤트마스크)의 순서쌍의 리스트를 반환합니다.

셀렉터를 잘 활용하면 다중 접속 소켓 서버를 싱글 스레드로 구현할 수 있습니다. 일종의 런루프에서 이벤트를 풀링하는 방식이며, 구현 순서는 대략 다음과 같습니다.

  1. 소켓A를만들고 논블럭방식으로 설정하며, listen()을 호출합니다.
  2. listen()은 논블럭이기 때문에 바로 리턴됩니다. 이 소켓 A를 다시 셀렉터에 등록합니다.
  3. 루프를 돌면서 셀렉터의 select()를 호출합니다. 클라이언트가 접속하여 소켓에 이벤트가 발생할 때까지 기다리는 것은 여기서 실행됩니다.

예제를 통해서 실제로 어떻게 사용하는지 살펴보겠습니다.


소켓 등록

소켓을 생성하고 설정합니다. 이 때 소켓의 블럭킹 모드를 False로 설정합니다. 논블럭으로 설정된 소켓은 마치 timeout=0 으로 설정된 것과 동일하며, 소켓에서 전달받은 데이터를 읽어오거나 쓸 때 기다리지 않게 됩니다. 따라서 실제로는 에러 검출이라든지 이런 부분을 일일이 검사해야 하지만, 데이터를 읽는데 필요한 지연을 최소화할 수 있습니다.

import selectors
import socket

sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', 5551))
sock.listen(100)
sock.setblocking(False)

sel.register(sock, selectors.EVENT_READ, accept_handler)
# accept_handler는 아직 작성되지 않았습니다.

register()를 사용하여 셀렉터 객체에 스트림을 연결할 때, 첫번째 인자는 대기할 스트림이되고, 두번째 인자는 이벤트 타입 (이 경우 읽기이므로 EVENT_READ) 세번째 인자는 스트림에 추가적으로 붙을 컨텍스트 데이터가 됩니다. 나중에 필요한 정보는 이 세번째 인자로 넘겨주게 되는데, 여기서는 핸들러 함수를 넘겨주도록 합니다.

이후 셀러터의 select()가 리턴되는 튜플((키, 마스크))에서 첫번째 요소는 SelectorKey 인스턴스입니다. 이 클래스의 .fileObj속성은 스트림을 가리키므로 소켓이 될 것이고, .data 속성은 아까 넘겨준 세번째 인자입니다. 따라서 select()가 리턴하고 나면 이후 발생한 이벤트들은 루프를 돌면서 처리해줍니다.

접속이벤트 처리

핸들러가 event[0].data 로 들어있을 것이라는 점을 기억하고 아래의 코드를 더 추가합니다.

while True:
  events = sel.select()
  for (key, mask) in events:
    key.data(key.fileObj, sel)

참고로 이 electors.SelectorKey 타입은 사실 namedtuple입니다.

이 루프는 들어오는 연결을 기다리는 루프가 됩니다. 셀렉터를 사용하지 않는다면 소켓의 listen() 이 리턴한 후에는 소켓의 accept()를 호출하여 해당 연결과 통신할 또다른 소켓을 (보통 예제에서는 conn 으로 명명하는) 얻게 됩니다. 따라서 아직 구현하지 않는 accept_handler 함수는 소켓에 새로운 클라이언트가 접속했을 때 이 작업을 처리해주어야 합니다.


다음은 accept_handler 함수의 구현입니다. 클라이언트가 접근한 소켓에 대해서 accept를 호출하여 통신을 위한 또다른 소켓을 생성합니다. 이 때 이 소켓을 통해서 데이터를 주고받는 일에 대해서 생각해보겠습니다. accept가 끝나고 새로 만들어진 소켓을 통해 통신하는 사이에, 다른 클라이언트를 기다리고 있는 주 소켓은 어떻게 될까요? 당연히 아무것도 못하고 기다리게 됩니다. 따라서 각각의 연결에 대해서도 셀렉터에 등록해서 처리될 수 있도록 해줍니다. 즉 accept가 끝나서 얻게 되는 conn 객체(소켓입니다.)를 다시 셀렉터에 등록하며, 이 때에는 에코잉을 처리하는 핸들러를 같이 넘겨줍니다.

이 때문에 셀렉터 객체는 함수의 인자로 받도록 합니다. 핸들러의 코드는 다음과 같습니다.

def accept_handler(sock, sel):
  sock, addr = sock.accept()
  print(f'Accepted {sock.getpeername()} from {addr}')
  sock.setblocking(False)
  sel.register(sock, selectors.EVENT_READ, read_handler)
  # read_handler는 아직 작성되지 않았습니다.
   

이제 클라이언트와 연결이 만들어지고 위 핸들러가 잘 실행된다면 그 상황에서 셀렉터는 (a) 소켓을 통한 또 다른 새로운 연결을 기다리면서 (b) 연결된 클라이언트로부터 데이터를 수신할 수 있습니다.


이번에는 read_handler() 함수를 작성하겠습니다.

참고로 연결이 끊어질 때의 처리를 눈여겨보도록 합니다. 소켓 연결에서 클라이언트가 접속을 끊고 소켓을 닫는 시점에, 해당 소켓의 너머로 연결이 수립되어 있다면 서버 쪽으로 시그널을 줍니다. 파이썬 소켓은 이 경우 빈 바이트 객체를 리턴해주기 때문에, 이를 종료 시그널로 인식할 수 있습니다.

셀렉터는 등록된 소켓 객체에 대한 참조를 유지하기 때문에 메모리 누수를 방지하기 위해서는 연결되었던 소켓을 닫기 전에는 sel.unregister()를 호출해서 등록되었던 내용을 제거해주어야 합니다.

def read_handler(conn, sel):
  data = conn.recv(1024)
  if data:
    print(f'echoing {repr(data)} to '
          f'{conn.getpeername()}')
    conn.sendall(data)
  else:
    print('closing...')
    sel.unregister(conn)
    conn.close()

실행 가능한 예제의 전체 코드를 아래에 게재합니다. 이 예제에서 서버는 위와 동일한 방식으로 구현된 싱글스레드 에코서버입니다. 그리고 multiprocessing 모듈을 사용하여 여러개의 클라이언트를 순차적으로 생성하여 서버에 접속시킵니다.

에코서버가 입력을 멀티플렉싱으로 처리하지 못한다면 서버가 클라이언트 하나와 연결되어 있고 이를 처리하지 못한다면 연결수신과 데이터수신시의 메시지가 순차적으로 처리되거나 혹은 클라이언트 쪽에서 연결이 실패(서버의 listen()의 인자값이 너무 작은 경우)하는 경우가 있을 수 있습니다.

하지만 예제에서 보듯, 서버는 listen(1)을 쓰고 있음에도 불구하고 매우 짧은 시간 간격으로 접속한 모든 클라이언트를 잘 처리하고 있습니다. 또한 싱글 스레드이기 때문에 한 번에는 하나의 read_handler 만이 처리되며, read_handler 내에서 sleep 할 때에는 다른 핸들러가 처리되지 않습니다. 하나의 핸들러 내에서 I/O 대기가 발생할 때 다른 핸들러를 효율적으로 처리하려면 asyncio를 사용해야 합니다.