콘텐츠로 건너뛰기
Home » Asyncio를 사용한 비동기 소켓 통신

Asyncio를 사용한 비동기 소켓 통신

이 블로그에서는 파이썬으로 소켓 통신을 구현하는 몇 가지 방법을 이미 살펴본 바 있습니다. 원시 소켓인 socket.socket을 사용하는 방법zmq의 REQ-REP 패턴을 사용한 방법이 있었고, 다중 접속을 허용하도록 스레드를 통해 처리하거나, 셀렉터를 사용하여 단일 스레드에서 멀티플렉싱하는 방법도 알아보았습니다. 이번 글에서는 asyncio에서는 과연 소켓 통신을 어떤식으로 구현하는지 살펴보고 역시나 간단한 비동기 다중 접속 에코 서버를 구현하는 과정을 함께 살펴보겠습니다.

빌딩블럭

asyncio에서는 비동기 소켓통신을 지원하기 위해 고수준 읽기/쓰기 API를 갖추고 있는 Reader/Writer 객체를 제공합니다. 연결 방법은 사실상 매우 간단하며, 통신에 필요한 기술은 이 Reader/Writer를 어떻게 사용하느냐가 전부입니다.

스트림리더(Stream Reader)

asyncio.StreamReader는 IO 스트림에서 데이터를 읽는 API를 제공하는 객체입니다. 이 클래스는 다음과 같은 속성들을 갖습니다. (↱ 표시가 있으면 코루틴입니다.)

  • read(n=-1) : 최대 n바이트를 읽어들입니다. n이 -1 이면 EOF 까지 읽은 다음, 모든 바이트를 반환합니다. 이미 EOF를 수신했고 내부 버퍼가 비어있다면 빈 bytes를 반환합니다.
  • readline() : 개행(\n)으로 구분되는 한 줄을 읽습니다. EOF를 수신했고 개행을 찾을 수 없으면 남은 데이터를 반환합니다.
  • readexactly(n) : 정확히 n 바이트를 읽습니다. 이는 read와 달리 n 바이트를 읽기전에 EOF에 도달하면 IncompleteReadError 예외를 일으킵니다.
  • readuntil(separator=b'\n') : 구분자까지 읽어서 반환합니다. (구분자를 포함하여 리턴) read 와 달리 파일 끝에서 구분자가 발견되지 않으면 IncompleteReadError가 발생하며, 버퍼가 리셋됩니다.
  • at_eof() : 버퍼가 비었고 EOF에 도달하였는지를 검사합니다.

이것은 마치 읽기 전용의 파일과 비슷합니다. 사실 파일이나 그외 입출력 스트림은 TextIO나 ByteIO와 같은 IO 스트림으로만 인식하기 때문에 asyncio에서는 이 StreamReader를 사용하여 소켓으로부터 비동기적으로 데이터를 읽어들이는 동작을 구현할 수 있습니다. 스트림 리더는 읽기동작만을 지원하며, 쓰기를 위해서는 스트림 라이터가 필요합니다.

스트림 라이터 (Stream Writer)

<a rel="noreferrer noopener" aria-label="asyncio.StreamWriter (새탭으로 열기)" href="https://docs.python.org/ko/3/library/asyncio-stream.html#streamwriter" target="_blank">asyncio.StreamWriter</a>는 IO 스트림에 대해 비동기로 바이트를 쓰는 API를 제공합니다. 특이한 점은 write() 메소드는 일반 함수이며, drain() 코루틴과 함께 쓰여야 한다는 점입니다. 이는 아마도 버퍼에 기록하는 작업은 즉시 수행하되, 실제 외부 IO가 일어나는 시점에서 비동기적으로 처리가 되도록 하는 것으로 보입니다.

  • write(data) : 하부 스트림에 즉시 data를 기록합니다. 실패하는 경우 data는 보낼 수 있을 때까지 버퍼에 남게 됩니다. 이는 실제 쓰기 완료를 보장하지 않으므로 ‘즉시 버퍼에 쓴다’는 동작으로 이해해야 하며, drain() 메소드와 함께 사용해야 합니다.
  • drain() : 스트림에 기록하는 것이 가능해질 때까지 기다립니다. write() 메소드가 코루틴이 아닌 blocking callable임에 유의해야 합니다. 이 메소드는 쓰기 버퍼가 차올라 여유가 없으면 대기하여 동일 코루틴이 버퍼를 손상시키는 것을 막습니다.
  • writelines(data) : 바이트의 리스트를 기록합니다. 역시 버퍼에만 쓰므로 drain()과 함께 사용해야 합니다.
  • close() : 스트림을 닫습니다. 스트림을 닫으면 그 하부 소켓도 함께 닫힙니다. 소켓이 닫히는데에는 시간이 걸릴 수 있으므로 wait_closed() 와 함께 사용해야 합니다.
  • can_write_eof() : 하부 트랜스포트 구조가 write_eof() 메소드를 지원하는지 확인합니다.
  • write_eof() : 쓰기 스트림에 EOF를 삽입합니다.
  • transport : 하부 비동기 트랜스포트를 리턴합니다.
  • get_extra_info(name, default=None) : 트랜스 포트 정보를 조사합니다.
  • wait_closed() : close()를 호출한 후 스트림이 완전히 닫힐 때까지를 기다립니다.

클라이언트 만들기

저수준 이벤트 루프 API는 원래 트랜스포트와 프로토콜을 사용하여 통신 채널을 추상화하고 통신을 수행합니다. 하지만 이는 주로 라이브러리나 프레임워크 수준에서 사용되어야 하며, 프로덕트 레벨에서는 고수준 타입인 스트림을 사용할 것이 권장됩니다. 앞서 소개한 두 스트림 타입을 사용하여 소켓을 감싼 API를 사용할 수 있습니다. 하지만 asyncio에서는 이들 스트림 객체를 직접 생성하는 것은 추천하지 않습니다. asyncio 는 open_connection() 함수를 제공하고, 이를 통해 스트림을 만들것을 제안합니다.

open_connection() 코루틴은 주어진 호스트, 포트 정보를 통해 서버와 연결을 수립하고, 연결이 의존하는 소켓을 제거할 수 있는 (reader, writer) 객체 쌍을 리턴합니다. 클라이언트 구현 코드에서는 이 스트림 객체들을 사용해서 서버와 통신하면 됩니다.

간단한 tcp 에코 통신을 위한 비동기 클라이언트를 만들어보겠습니다. 클라이언트의 동작은 크게 4단계로 나눠집니다. 1) 서버와 연결하고 스트림을 생성합니다. 2) 쓰기 스트림을 통해 데이터를 전송합니다. 3) 서버로부터 응답을 기다리고, 읽습니다. 4) 소켓(스트림)을 닫습니다.

import asyncio
from random import random
async def run_client(host: str, port: int):
    # 서버와의 연결을 생성합니다.
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter
    reader, writer = await asyncio.open_connection(host, port)
    # show connection info
    print("[C] connected")
    # 루프를 돌면서 입력받은 내용을 서버로 보내고,
    # 응답을 받으면 출력합니다.
    while True:
        line = input("[C] enter message: ")
        if not line:
            break
        # 입력받은 내용을 서버로 전송
        payload = line.encode()
        writer.write(payload)
        await writer.drain()
        print(f"[C] sent: {len(payload)} bytes.\n")
        # 서버로부터 받은 응답을 표시
        data = await reader.read(1024)  # type: bytes
        print(f"[C] received: {len(data)} bytes")
        print(f"[C] message: {data.decode()}")
    # 연결을 종료합니다.
    print("[C] closing connection...")
    writer.close()
    await writer.wait_closed()

참고로 첫번째 단락에서 튜플을 언패킹할 때, 타입 어노테이션을 붙이는 방법을 주목해주세요. 튜플 언패킹 문법에서는 개별 요소에 타입 어노테이션을 할 수 없으므로 이런식으로 어노테이션을 붙입니다.


서버 만들기

서버의 경우에는 asyncio.start_server() 코루틴을 사용하여 생성합니다. 소켓 서버는 기본적으로 런루프를 기반으로 싱글스레드에서 여러 클라이언트의 요청을 처리할 수 있도록 합니다. 셀렉터를 사용한 멀티 플렉싱 예제와 비슷하게, 서버는 생성될 때 요청을 처리할 핸들러를 필요로 합니다. start_server() 함수의 첫 인자는 요청을 처리하는 핸들러이며, 다시 이 핸들러는 소켓을 다루기 때문에 (StreamReader, StreamWriter) 쌍을 인자로 받게 됩니다.

start_server() 함수는 다시 내부적으로 현재 런루프의 loop.create_server() 메소드를 호출하여 서버를 생성합니다. 이 함수는 asyncio.Server의 인스턴스 객체를 리턴합니다. 생성된 서버 객체는 비동기 컨텍스트 관리자로 async with 절을 시작할 수 있습니다. with 절이 끝나면 서버가 닫히고 더 이상 연결을 받아들이지 않는 것이 보장됩니다. 참고로 서버의 속성은 다음과 같이 정리합니다.

  • sockets : 서버가 리스닝하는 소켓의 리스트
  • is_serving() : 서버가 연결을 받는지
  • close() : 서버를 닫습니다. 실제 닫히는 것을 보장하려면 wait_closed() 와 함께 사용합니다.
  • wait_closed() : 서버가 완전히 닫힐 때까지 대기합니다.
  • get_loop() : 서버가 사용하는 이벤트 루프를 리턴합니다.
  • start_serving() : 연결을 받기 시작합니다. 이미 시작한 후에 호출해도 안전합니다.
  • serve_forever() : 연결을 받기 시작하며, 코루틴이 취소될때까지 계속합니다.

서버 객체는 생성 후 start_serving() 이나 serve_forever()를 호출하여 연결 수락을 시작할 수 있습니다. 앞에서도 말했지만 async with 절 내에서 시작하는 것이 좋습니다.

async def run_server():
    # 서버를 생성하고 실행
    server = await asyncio.start_server(handler, host="127.0.0.1", port=_port)
    async with server:
        # serve_forever()를 호출해야 클라이언트와 연결을 수락합니다.
        await server.serve_forever()

핸들러 작성

그런데 서버는 연결을 받기만 할 뿐, 실제 처리는 핸들러에게 위임합니다. 핸들러 연결이 수락되어 소켓이 생성되면, 해당 소켓을 사용하여 클라이언트와 통신하는 과정을 수행합니다. 따라서 핸들러는 다시 읽기/쓰기 스트림을 인자로 받는 코루틴함수이며, 내부 구현은 스트림 리더를 통해서 데이터를 읽고 쓰면 됩니다.

async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    while True:
        # 클라이언트가 보낸 내용을 받기
        data: bytes = await reader.read(1024)
        # 받은 내용을 출력하고,
        # 가공한 내용을 다시 내보내기
        peername = writer.get_extra_info('peername')
        print(f"[S] received: {len(data)} bytes from {peername}")
        mes = data.decode()
        print(f"[S] message: {mes}")
        res = mes.upper()[::-1]
        await asyncio.sleep(random() * 2)
        writer.write(res.encode())
        await writer.drain()

최종적으로 서버와 클라이언트를 한 번에 돌려서 서로 통신하는지를 살펴보겠습니다.

async def test():
    await asyncio.wait([run_server(), run_client("127.0.0.1", _port)])
if __name__ == "__main__":
    asyncio.run(test(), debug=True))

별도의 콘솔에서 각각 run_server()run_client()를 사용해서 통신하는 테스트를 해봐도 잘 작동하는 것을 확인할 수 있습니다. 기본적으로 이 서버는 동시에 여러 클라이언트가 접근해도 비동기 동작을 통해서 개별적으로 통신을 처리할 수 있음을 볼 수 있습니다.

그리고…

참고로 비동기 코루틴은 내부에서 예외가 발생해도 제대로된 예외 출력을 해주지 않기 때문에 디버깅하기가 매우 까다롭습니다. asyncio.run() 함수의 debug=True 옵션을 사용하면 그나마 프로세스 전체가 중지됐을 때 (Future객체가 수집될 때) 트레이스 로그를 볼 수 있습니다.

import asyncio
from random import random
_port = 7770
# asyncio socket server/client
async def run_client(host: str, port: int):
# 서버와의 연결을 생성합니다.
reader: asyncio.StreamReader
writer: asyncio.StreamWriter
reader, writer = await asyncio.open_connection(host, port)
# show connection info
print("[C] connected")
# 루프를 돌면서 입력받은 내용을 서버로 보내고,
# 응답을 받으면 출력합니다.
while True:
line = input("[C] enter message: ")
if not line:
break
# 입력받은 내용을 서버로 전송
payload = line.encode()
writer.write(payload)
await writer.drain()
print(f"[C] sent: {len(payload)} bytes.\n")
# 서버로부터 받은 응답을 표시
data = await reader.read(1024) # type: bytes
print(f"[C] received: {len(data)} bytes")
print(f"[C] message: {data.decode()}")
# 연결을 종료합니다.
print("[C] closing connection…")
writer.close()
await writer.wait_closed()
async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
while True:
# 클라이언트가 보낸 내용을 받기
data: bytes = await reader.read(1024)
# 받은 내용을 출력하고,
# 가공한 내용을 다시 내보내기
peername = writer.get_extra_info('peername')
print(f"[S] received: {len(data)} bytes from {peername}")
mes = data.decode()
print(f"[S] message: {mes}")
res = mes.upper()[::-1]
await asyncio.sleep(random() * 2)
writer.write(res.encode())
await writer.drain()
async def run_server():
# 서버를 생성하고 실행
server = await asyncio.start_server(handler, host="127.0.0.1", port=_port)
async with server:
# serve_forever()를 호출해야 클라이언트와 연결을 수락합니다.
await server.serve_forever()
async def main():
await asyncio.wait([run_server(), run_client("127.0.0.1", _port)])
if __name__ == "__main__":
asyncio.run(main())
view raw stream01.py hosted with ❤ by GitHub