Asyncio – 네트워크 입출력을 위한 스트림

asyncio는 네트워크 입출력을 위한 스트림이라는 타입을 제공하고 있다. 스트림은 네트워크 연결을 만들거나(클라이언트의 connect 동작) 서버를 시작하고(서버의 bind/listen 동작) 연결이 생성되면 해당 연결을 처리하는 핸들러의 인자로 넘겨지게 된다. 비동기 코루틴인 각각의 핸들러는 이 스트림을 이용해서 데이터를 읽거나 전송할 수 있다.

입출력 스트림은 내부적으로 소켓을 감싸고 있으며, 노출되는 API를 통해서 소켓을 기다리고 읽고 쓰는 일련의 작업을 상당히 고수준의 레벨에서 제공해주고 있다.

입력 스트림은 asyncio.StreamReader로, 출력 스트림은 asyncio.StreamWriter로 정의되어 있는데, 이들 클래스의 인스턴스를 직접 만드는 것은 권장되지 않는다. 대신 open_connection()이나 start_server() 등의 코루틴을 사용하여 내부적으로 자동생성된 스트림을 핸들러가 사용하는 형식으로 이용하는 것이 좋다.

입력 스트림

입력 스트림은 StreamReader 타입의 인스턴스가 된다. 내부에는 소켓과 버퍼가 존재하며, IO 스트림과 비슷한 방식으로 작동한다.  (목록에서 표시할 때 *를 붙인 것은 모두 코루틴이다.)

  • * read(n=-1) : 최대 n 바이트만큼을 읽고, 읽어들인 바이트를 반환한다. 최대 바이트가 생략되면 EOF까지 읽은 후 읽은 바이트를 반환한다. EOF를 읽은 후에 내부버퍼가 비어있다면 빈 바이트를 리턴한다.
  • * readline() : 개행(‘\n’)문자까지 읽은 후 해당 바이트를 리턴한다. EOF를 수신했고, 개행을 찾을 수 없다면 읽어들인 모든 바이트를 수신한다. EOF를 읽었고, 버퍼가 비어있으면 빈 바이트를 리턴한다.
  • * readexactly(n) : 정확히 n바이트를 읽는다. read(n)과 달리 EOF를 읽었고, 버퍼가 n 바이트보다 작다면 IncompleteReadError 예외를 일으킨다.
  • * readuntil(separator=b'\n') : 특정한 구분자까지 읽는다. 리턴되는 데이터는 구분자까지를 포함한다. 구분자를 읽기전에 EOF에 도달하면 IncompleteReadError가 발생한다.
  • at_eof() : EOF에 도달했는지를 확인한다.

출력 스트림

출력스트림은 송신을 위한 버퍼와 소켓을 감싼 타입이다. drain()을 제외하면 비동기 코루틴이 아닌 블럭킹 메소드들임에 유의하자.

  • can_write_eof() : 통신을 위한 하부전송구조가 write_eof()를 지원하는지 체크
  • write_eof() : 쓰기 버퍼를 플러시한 후 스트림을 닫는다.
  • write(data) : 데이터를 기록한다. 기록 후에 drain()을 호출해야 하고, drain() 시에 작업 전환이 일어날 수 있다.
  • writelines(data) : 바이트배열의 리스트를 한 번에 기록한다.
  • * drain() : 스트림에 다시 기록할 수 있을 때 까지 기다린다. 이 동작은 쓰기 버퍼의 수위가 높아지는 동안 버퍼에 과다한 데이터가 들어가는 것을 막는다.
  • close() : 스트림을 닫는다.
  • is_closing() : 스트림이 닫혔는지를 확인한다.
  • * wait_closed() : 스트림이 닫힐 때 까지 기다린다.
  • transport : 하부 전송 구조에 접근할 수 있게 한다.
  • get_extra_info(name) : 하부 전송 구조의 정보에 접근한다.

get_extra_info()를 통해 얻을 수 있는 정보 및 속성은 그 하부 전송구조에 따라 달라지는데, 다음과 같은 표로 정리할 수 있다.

트랜스포트이름내용
소켓peername소켓이 연결된 원격주소
socket소켓 인스턴스(socket.socke)
sockname소켓자체주소
SSL소켓compression압축알고리즘
cipher암호화체계 이름
peercert피어 인증서
sslcontextssl.SSLContext 인스턴스
ssl_objectss.SSLObject 인스턴스
파이프pipe파이프 객체
서브프로세스subprocesssubprocess.Popen 인스턴스

서버만들기

asyncio. asyncio.start_server() 를 사용하여 소켓 서버를 시작할 수 있다. 서버를 시작할 때 첫 인자는 클라이언트가 접속했을 때 호출될 핸들러이다. 이 때 핸들러는 일반함수여도 되고, 비동기 코루틴일수도 있다. (코루틴인 경우 자동으로 Task가 생성됨) 그외에는 문서를 참고하는게 좋겠다.

다음은 이를 사용하여 에코 서버를 작성한 것이다. 소켓 사용법을 소개한 글에서 다중 접속을 허용하는 에코 서버와 비슷하게 동작한다. 대신에 비동기 코루틴을 사용하므로 스레드는 여전히 싱글 스레드이다.

import asyncio

async def handle_conn(reader, writer):
  # 입출력 핸들러는 항상 StreamReader, StreamWriter를 인자로 받는다.
  bufsize = 1024
  addr = writer.get_extra_info('peername')
  while True:
    data = await reader.read(bufsize)
    msg = data.decode()
    print("RECEIVED {} FROM {}".format(msg, addr))
    print("SENDING {}".format(msg))
    writer.write(data)
    await writer.drain()
    if msg == 'bye':
      print("CLOSING")
      writer.close()
      await write.wait_closed()
      break

async def run_server(host='127.0.0.1', port=7788):
  server = await asyncio.start_server(handle_conn, host, port)
  addr = server.sockets[0].getsockname()
  print("SERVING ON {}".format(addr))

  async with server:
    await server.serve_forever()

asyncio.get_event_loop()\
  .run_until_complete(run_server())

start_server() 코루틴 함수는 asycio.Server 객체를 리턴한다. 파이썬 3.7부터 이 타입은 비동기 컨텍스트 관리자이므로 async with server: 구문을 사용할 수 있다. async with 절을 빠져나오는 시점에 서버가 더 이상의 추가적인 연결을 받지 않는 것이 보장된다. 특이할 만한 사항은 서버를 미리 생성한 후 async with를 써야 한다는 것.

또 당연한 이야기이기는 하지만, 이 서버는 내부적으로 소켓을 사용하므로 소켓을 사용하여 구현한 클라이언트와 잘 맞물려 작동할 수 있다.

클라이언트 작성하기

이번에는 클라이언트를 스트림을 사용해서 작성해보겠다. 클라이언트의 경우, asyncio.open_connection(host, port)를 사용한다. 클라이언트는 핸들러 콜백을 따로 전달하지 않으며 open_connection() 호출의 결과로 (StreamReader, StreamWriter)의 쌍을 얻게 된다. 각각의 스트림의 사용법은 서버와 동일하다.

import asyncio

async def run_client(host='127.0.0.1', port=7788):
  reader, writer = await asyncio.open_connection(host, port)
  bufsize = 1024
  while True:
    message = input('>>')
    print('SENDING:", message)
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(bufsize)
    print('RECEIVED:', data.decode())
    if message == 'bye':
      writer.close()
      await writer.wait_closed()
      break

asyncio.get_event_loop()\
   .run_until_complete(run_client())

이상으로 asyncio를 통해서 소켓 통신을 할 때 사용할 수 있는 스트림 객체의 사용법에 대해서 알아보았다. 코드가 다소 익숙하지 않아서 어렵게 느껴질 수 있겠지만, IOWrapper 와 유사한 API를 제공하고 있고, 싱글 스레드에서 다중 접속을 쉽게 구현할 수 있기 때문에 잘 알아두면 아주 요긴하게 써먹을 데가 있을 듯 하다.