Selector를 사용한 소켓 멀티플렉싱

소켓을 사용하여 간단한 서버를 만들 때에는 서버 소켓을 생성하고, 이를 특정한 네트워크 포트에 바인드한 다음, listen() 메소드를 사용해서 해당 포트로 들어올 수 있는 접속 대기열의 크기를 지정합니다. 그런 다음 해당 소켓의 accept() 메소드를 사용해서 클라이언트 소켓을 생성하고, 이 클라이언트 소켓을 통해 클라이언트가 보낸 요청을 읽고, 그에 대한 응답을 보내게 됩니다.

서버 소켓은 클라이언트가 접속할 때마다 ‘서버가 사용하는 클라이언트 소켓’을 따로 생성하고 실제 통신은 두 클라이언트 소켓 사이의 peer-to-peer 방식의 대화가 됩니다. 따라서 하나의 서버 소켓은 여러 클라이언트의 접속을 받을 수 있습니다.

만약 다중 접속을 허용하는 소켓 서버를 파이썬에서 구현한다면 가장 쉬운 방법은 스레드를 사용하는 것입니다. 클라이언트 소켓을 인자로 받는 핸들러 함수를 하나 작성하고, 서버 소켓의 accept() 메소드가 리턴하는 시점에 핸들러 함수에게 클라이언트 소켓을 주고 새로운 스레드에서 작동하도록 시작해주면 됩니다.

여기까지의 작동 모델은 ‘동기식 소켓’을 사용합니다. 동기식 소켓은 send(), recv(), accept() 등의 동작이 모두 블럭되는 소켓입니다. 따라서 스레드가 소켓의 입출력을 기다리는 동안에는 다른 일을 할 수가 없습니다. 그래서 서버 소켓과 클라이언트 소켓들이 동시에 작동할 수 없으니 스레드를 사용하는 것이겠죠.

소켓 라이브러리는 이와 다른 비동기 소켓을 지원하고 있습니다. 비동기 소켓은 소켓을 바인딩하기 전에 sock.setblocking(False)를 명시해서 블록킹 모드를 논블록킹으로 변경해줍니다. 이렇게 만들어진 비동기 소켓을 소켓 API만으로 사용할 수는 없습니다. Python How To 문서는 select.select() 를 사용할 것을 추천합니다만, 이는 문서가 오래되었음을 감안해야 하며 실제 파이썬 공식문서는 보다 고수준으로 설계되어 사용하기 쉬운 selectors 모듈을 쓸 것을 추천하고 있습니다.

이 글에서는 selectors 모듈을 사용하여, 단일 스레드에서 하나의 소켓 서버가 여러 클라이언트의 요청을 처리하는 멀티플렉싱을 어떻게 구현하는지 소개하며, 셀렉터 사용 방법에 대해서 살펴보겠습니다.

selectors 모듈의 핵심 객체

IO 멀티플렉싱은 Selector 객체에 의해 이뤄집니다. 셀렉터는 운영체제가 지원하는 select()함수 호출에 대한 엑세스를 제공해주는 객체입니다. 내부적으로 사용하는 커널 함수에 따라서 여러 가지 클래스가 제공되지만, 일반적으로 DefaultSelector를 생성해서 사용합니다. 이 클래스는 현재 운영체제가 지원하는 가장 일반적이고 효율적인 구현체를 가리키고 있습니다. 따라서 대부분의 경우에는 이 클래스를 사용해야 합니다.

다음은 이벤트 입니다. 이벤트는 읽기 가능한 이벤트와 쓰기 가능한 이벤트로 구분하며, 이들은 각각 selectors.EVENT_READ, selectors.EVENT_WRITE라는 상수로 정의되어 있습니다.

셀렉터를 사용하는 방법은 다음과 같습니다.

  1. DefaultSelector의 인스턴스를 생성합니다. 기본적으로 셀렉터는 생성시 아무런 인자를 필요로 하지 않습니다.
  2. 생성한 셀렉터에 소켓 객체를 등록합니다. 이 때 이벤트 타입과, 부가 데이터를 함께 등록합니다. 부가 데이터는 해당 소켓에 이벤트가 발생했을 때 사용되는 정보입니다. 파이썬에서는 함수도 객체이므로 여기에 함수를 넘겨줄 수 있습니다.
  3. 셀렉터의 select() 함수를 호출합니다. 해당 함수는 현재 스레드를 블럭하면서 셀렉터에 등록된 IO 장치(소켓)에 특정한 이벤트가 준비되었는지 확인합니다. 이벤트가 발생했다면 select() 함수는 (SelectorKey, EventMask) 튜플의 리스트를 리턴합니다.
  4. 셀렉터 키는 소켓이나 파일 객체, 이벤트, 등록시 전달한 데이터 객체등을 담고 있는 객체입니다. 이 객체를 통해서 소켓과 콜백 함수를 모두 얻을 수 있으므로 필요한 작업을 수행하면 됩니다.

특정한 소켓을 모두 사용했으면, 소켓을 닫고 파괴하기 전에 해당 소켓을 셀렉터로부터 등록해제하는 작업도 해주는 것이 좋습니다.


논블럭 소켓 등록

논블럭모드로 소켓을 생성하고, 이를 셀렉터에 등록하는 비동기 소켓 서버를 구현해보겠습니다.

import selectors
import socket

def server_run(port=5559):
  selector = selectors.DefaultSelector()
  # 서버 소켓 생성
  sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  sock.bind(('localhost', port))
  sock.setblocking(False)
  sock.listen(5)

  # 셀렉터에 서버 소켓 등록
  # 콜백인 accept_handler는 추후에 작성
  selector.register(sock, selectors.EVENT_READ, accept_hadler)
  # ... 이후에 계속
  pass

위 코드에서 sock은 서버 소켓입니다. 이 소켓에 접속이 들어오면 읽기 이벤트가 발생을 합니다.이 이벤트가 발생하기 전까지는 서버 소켓의 accept가 아닌 셀렉터의 select()를 기다려야 합니다. 코드의 남은 부분을 마저 완성해보겠습니다.

def server_run(port=5559):
  # ...
  selector.register(sock, selectors.EVENT_READ, accept_hadler)
  while True:
    for (key, mask) in selector.select():
      key: selectors.SelectorKey
      srv_sock, callback = key.fileobj, key.data
      callback(srv_sock, selector)

select() 함수는 호출후 특정 이벤트가 발생하는 것을 기다립니다. 이 이벤트란 어떤 file-like한 IO객체가 읽기 혹은 쓰기 가능한 상태가 되는 것을 말합니다. 루프를 시작하기 전에 셀렉터에 서버 소켓을 등록했기 때문에 클라이언트가 접속하려하면 해당 콜백이 호출될 것입니다.

이 함수의 리턴 값은 (SelectorKey, EventMask)로 구성되는 튜플의 리스트입니다. 이 때 SelectorKey는 셀렉터에 등록된 장치와 이벤트, 데이터에 대한 정보를 구성하고 있습니다.

SelectorKey.fileobj - 소켓
SelectorKey.fd      - 저수준 파일 디스크립터
SelectorKey.events  - 해당 IO가 대기하는 이벤트
SelectorKey.data    - 등록시 사용된 임의 데이터

그럼 콜백을 어떻게 작성해야할지 살펴보겠습니다.


접속 처리 핸들러 작성

클라이언트의 접속은 서버 소켓의 이벤트이며, 이를 처리하는 것은 accept() 메소드입니다. 이를 통해서 우리는 클라이언트 소켓을 얻게 됩니다. 이 때, 서버 소켓이 다른 클라이언트의 접속을 기다리는 동시에 클라이언트 소켓이 통신할 수 있게 하기 위해서는 같은 셀렉터에 클라이언트 소켓을 등록해줘야 합니다. 따라서 접속시 호출되는 콜백은 소켓 뿐만 아니라 셀렉터도 인자로 받아야 합니다.

def accept_handler(sock: socket.socket, sel: selectors.BaseSelector):
  # sel의 실제 정확한 타입을 알 수 없으므로, 추상 기반 클래스로 가정한다.
  # 많은 예제에서 보여주는 accept()의 첫번째 리턴값은
  # 클라이언트 소켓이다.
  conn: socket.socket
  conn, addr = sock.accept()
  
  # 클라이언트 소켓의 수신은 서버 소켓의 접속수용과 같은 스레드에서
  # 이뤄져야하므로 conn 역시 셀렉터에 등록해야 한다.
  # 역시 read_handler는 아직 작성 전이다.
  sel.register(conn, selectors.EVENT_READ, read_handler)
  
  

어떤 클라이언트가 서버 소켓에 접속한다면, 서버 소켓에 의한 읽기 가능 이벤트가 발생할 것이고, 위 함수가 호출됩니다. 그러면 클라이언트 소켓이 생성되고, 해당 클라이언트 소켓 역시 셀렉터에 등록이 되겠죠. 이후에 새로운 클라이언트가 접속한다면, 서버 소켓은 클라이언트 소켓이 블럭하지 않으므로 즉시 새로운 접속을 수용할 수 있습니다.


이제, 클라이언트 소켓이 어떻게 접속을 처리하는지 살펴보겠습니다.

def read_handler(sock: socket.socket, sel: selectors.BaseSelector):
  # read from client and send reply
  payload = sock.recv(2048)
  time.sleep(.5)
  sock.sendall(payload)

  # unregister and close socket
  sel.unregister(sock)
  sock.close()

앞서 작성한 서버쪽 코드에 의해서, 서버 소켓이 읽기 가능한 상태가 되었을 때에는 accept_handler()가 서버 소켓과 함께 호출되며, 여기서 클라이언트 소켓을 생성해서 셀렉터에 등록해주기 때문에, 클라이언트 소켓이 읽기 가능한 상태가 되면 read_handler()가 해당 클라이언트 소켓과 함께 호출됩니다.

클라이언트 소켓을 읽고 쓰는 방법은 아무런 차이가 없습니다. 다만, 용도가 끝난 소켓을 닫을 때, 셀렉터에서 등록해제해야 합니다. 셀렉터는 등록된 소켓에 대해 강한 참조를 유지하기 때문에, 여기서 해제해주지 않는다면 우리는 닫힌 소켓을 영영 파괴할 수 없게 됩니다.


파이썬의 논블럭 소켓 사용은 셀렉터를 사용해야 하는 것을 제외하면 실질적으로 블럭킹 소켓을 사용하는 것과 코드의 차이가 없습니다. 논블럭 소켓에 대해서 저수준의 read() 함수는 즉시 리턴하기 때문에 별도의 루프를 도는 작업이 필요하겠지만, 파이썬 소켓의 recv()는 이러한 루프를 포함하고 있는 것처럼 보입니다.


from multiprocessing import Process
import selectors
import socket
import time
import random
def read_handler(
sock: socket.socket,
sel_: selectors.BaseSelector):
data = sock.recv(1000)
if data:
print(f'echoing: {repr(data)} | '
f'{sock.getpeername()}')
time.sleep(random.randrange(1, 10) / 10)
sock.send(data)
else:
print('closing…')
sel_.unregister(sock)
sock.close()
def accept_handler(
sock: socket.socket,
sel_: selectors.BaseSelector):
conn: socket.socket
addr: str
conn, addr = sock.accept()
print(f'accepted: {conn.getpeername()} from {addr}')
conn.setblocking(False)
sel_.register(conn, selectors.EVENT_READ, read_handler)
def start_server(port=5577):
sel = selectors.DefaultSelector()
sock = socket.socket()
sock.bind(('localhost', port))
sock.setblocking(False)
sock.listen(100)
sel.register(sock, selectors.EVENT_READ, accept_handler)
while True:
events = sel.select()
for key, mask in events:
callback = key.data
callback(key.fileobj, sel)
def start_client(word: str, port=5577):
sock = socket.socket()
sock.connect(('localhost', port))
sock.sendall(word.encode())
time.sleep(random.randrange(1, 5) / 10)
data = sock.recv(1024)
print(data.decode())
sock.close()
def main():
srv = Process(target=start_server)
srv.start()
words = 'apple hello mango world banana organe'.split()
for i in range(10):
Process(target=start_client,
args=(f'[{i+1}] ' + random.choice(words),))\
.start()
srv.join(9)
srv.kill()
if __name__ == '__main__':
main()
view raw multiserver.py hosted with ❤ by GitHub