Wireframe

Asyncio를 사용한 비동기 소켓 통신

이 블로그에서는 파이썬으로 소켓 통신을 구현하는 몇 가지 방법을 이미 살펴본 바 있습니다. 원시 소켓인 socket.socket을 사용하는 방법zmq의 REQ-REP 패턴을 사용한 방법이 있었고, 다중 접속을 허용하도록 스레드를 통해 처리하거나, 셀렉터를 사용하여 단일 스레드에서 멀티플렉싱하는 방법도 알아보았습니다. 이번 글에서는 asyncio에서는 과연 소켓 통신을 어떤식으로 구현하는지 살펴보고 역시나 간단한 비동기 다중 접속 에코 서버를 구현하는 과정을 함께 살펴보겠습니다.

빌딩블럭

asyncio에서는 비동기 소켓통신을 지원하기 위해 고수준 읽기/쓰기 API를 갖추고 있는 Reader/Writer 객체를 제공합니다. 연결 방법은 사실상 매우 간단하며, 통신에 필요한 기술은 이 Reader/Writer를 어떻게 사용하느냐가 전부입니다.

스트림리더(Stream Reader)

asyncio.StreamReader는 IO 스트림에서 데이터를 읽는 API를 제공하는 객체입니다. 이 클래스는 다음과 같은 속성들을 갖습니다. (↱ 표시가 있으면 코루틴입니다.)

이것은 마치 읽기 전용의 파일과 비슷합니다. 사실 파일이나 그외 입출력 스트림은 TextIO나 ByteIO와 같은 IO 스트림으로만 인식하기 때문에 asyncio에서는 이 StreamReader를 사용하여 소켓으로부터 비동기적으로 데이터를 읽어들이는 동작을 구현할 수 있습니다. 스트림 리더는 읽기동작만을 지원하며, 쓰기를 위해서는 스트림 라이터가 필요합니다.

스트림 라이터 (Stream Writer)

asyncio.StreamWriter는 IO 스트림에 대해 비동기로 바이트를 쓰는 API를 제공합니다. 특이한 점은 write() 메소드는 일반 함수이며, drain() 코루틴과 함께 쓰여야 한다는 점입니다. 이는 아마도 버퍼에 기록하는 작업은 즉시 수행하되, 실제 외부 IO가 일어나는 시점에서 비동기적으로 처리가 되도록 하는 것으로 보입니다.

클라이언트 만들기

저수준 이벤트 루프 API는 원래 트랜스포트와 프로토콜을 사용하여 통신 채널을 추상화하고 통신을 수행합니다. 하지만 이는 주로 라이브러리나 프레임워크 수준에서 사용되어야 하며, 프로덕트 레벨에서는 고수준 타입인 스트림을 사용할 것이 권장됩니다. 앞서 소개한 두 스트림 타입을 사용하여 소켓을 감싼 API를 사용할 수 있습니다. 하지만 asyncio에서는 이들 스트림 객체를 직접 생성하는 것은 추천하지 않습니다. asyncio 는 open_connection() 함수를 제공하고, 이를 통해 스트림을 만들것을 제안합니다.

open_connection() 코루틴은 주어진 호스트, 포트 정보를 통해 서버와 연결을 수립하고, 연결이 의존하는 소켓을 제거할 수 있는 (reader, writer) 객체 쌍을 리턴합니다. 클라이언트 구현 코드에서는 이 스트림 객체들을 사용해서 서버와 통신하면 됩니다.

간단한 tcp 에코 통신을 위한 비동기 클라이언트를 만들어보겠습니다. 클라이언트의 동작은 크게 4단계로 나눠집니다. 1) 서버와 연결하고 스트림을 생성합니다. 2) 쓰기 스트림을 통해 데이터를 전송합니다. 3) 서버로부터 응답을 기다리고, 읽습니다. 4) 소켓(스트림)을 닫습니다.

import asyncio
from random import random
async def run_client(host: str, port: int):
    # 서버와의 연결을 생성합니다.
    reader: asyncio.StreamReader
    writer: asyncio.StreamWriter
    reader, writer = await asyncio.open_connection(host, port)
    # show connection info
    print("[C] connected")
    # 루프를 돌면서 입력받은 내용을 서버로 보내고,
    # 응답을 받으면 출력합니다.
    while True:
        line = input("[C] enter message: ")
        if not line:
            break
        # 입력받은 내용을 서버로 전송
        payload = line.encode()
        writer.write(payload)
        await writer.drain()
        print(f"[C] sent: {len(payload)} bytes.\n")
        # 서버로부터 받은 응답을 표시
        data = await reader.read(1024)  # type: bytes
        print(f"[C] received: {len(data)} bytes")
        print(f"[C] message: {data.decode()}")
    # 연결을 종료합니다.
    print("[C] closing connection...")
    writer.close()
    await writer.wait_closed()

참고로 첫번째 단락에서 튜플을 언패킹할 때, 타입 어노테이션을 붙이는 방법을 주목해주세요. 튜플 언패킹 문법에서는 개별 요소에 타입 어노테이션을 할 수 없으므로 이런식으로 어노테이션을 붙입니다.


서버 만들기

서버의 경우에는 asyncio.start_server() 코루틴을 사용하여 생성합니다. 소켓 서버는 기본적으로 런루프를 기반으로 싱글스레드에서 여러 클라이언트의 요청을 처리할 수 있도록 합니다. 셀렉터를 사용한 멀티 플렉싱 예제와 비슷하게, 서버는 생성될 때 요청을 처리할 핸들러를 필요로 합니다. start_server() 함수의 첫 인자는 요청을 처리하는 핸들러이며, 다시 이 핸들러는 소켓을 다루기 때문에 (StreamReader, StreamWriter) 쌍을 인자로 받게 됩니다.

start_server() 함수는 다시 내부적으로 현재 런루프의 loop.create_server() 메소드를 호출하여 서버를 생성합니다. 이 함수는 asyncio.Server의 인스턴스 객체를 리턴합니다. 생성된 서버 객체는 비동기 컨텍스트 관리자로 async with 절을 시작할 수 있습니다. with 절이 끝나면 서버가 닫히고 더 이상 연결을 받아들이지 않는 것이 보장됩니다. 참고로 서버의 속성은 다음과 같이 정리합니다.

서버 객체는 생성 후 start_serving() 이나 serve_forever()를 호출하여 연결 수락을 시작할 수 있습니다. 앞에서도 말했지만 async with 절 내에서 시작하는 것이 좋습니다.

async def run_server():
    # 서버를 생성하고 실행
    server = await asyncio.start_server(handler, host="127.0.0.1", port=_port)
    async with server:
        # serve_forever()를 호출해야 클라이언트와 연결을 수락합니다.
        await server.serve_forever()

핸들러 작성

그런데 서버는 연결을 받기만 할 뿐, 실제 처리는 핸들러에게 위임합니다. 핸들러 연결이 수락되어 소켓이 생성되면, 해당 소켓을 사용하여 클라이언트와 통신하는 과정을 수행합니다. 따라서 핸들러는 다시 읽기/쓰기 스트림을 인자로 받는 코루틴함수이며, 내부 구현은 스트림 리더를 통해서 데이터를 읽고 쓰면 됩니다.

async def handler(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    while True:
        # 클라이언트가 보낸 내용을 받기
        data: bytes = await reader.read(1024)
        # 받은 내용을 출력하고,
        # 가공한 내용을 다시 내보내기
        peername = writer.get_extra_info('peername')
        print(f"[S] received: {len(data)} bytes from {peername}")
        mes = data.decode()
        print(f"[S] message: {mes}")
        res = mes.upper()[::-1]
        await asyncio.sleep(random() * 2)
        writer.write(res.encode())
        await writer.drain()

최종적으로 서버와 클라이언트를 한 번에 돌려서 서로 통신하는지를 살펴보겠습니다.

async def test():
    await asyncio.wait([run_server(), run_client("127.0.0.1", _port)])
if __name__ == "__main__":
    asyncio.run(test(), debug=True))

별도의 콘솔에서 각각 run_server()run_client()를 사용해서 통신하는 테스트를 해봐도 잘 작동하는 것을 확인할 수 있습니다. 기본적으로 이 서버는 동시에 여러 클라이언트가 접근해도 비동기 동작을 통해서 개별적으로 통신을 처리할 수 있음을 볼 수 있습니다.

그리고…

참고로 비동기 코루틴은 내부에서 예외가 발생해도 제대로된 예외 출력을 해주지 않기 때문에 디버깅하기가 매우 까다롭습니다. asyncio.run() 함수의 debug=True 옵션을 사용하면 그나마 프로세스 전체가 중지됐을 때 (Future객체가 수집될 때) 트레이스 로그를 볼 수 있습니다.

Exit mobile version