aiohttp에서 큰 파일을 업로드하는 법

파일 업로드는 보통 요청의 body에 인코딩된 파일 데이터를 넣어서 POST 요청으로 서버에 전달되는데, aiohttp에서는 다음과 같이 post를 처리하는 핸들러를 사용해서 이를 처리할 수 있다.

async def store_mp3_handler(request):
  data = await request.post()
  mp3 = data['mp3']
  filename = mp3.filename
  mp3_file = mp3.file
  content = mp3_file.read()
  return web.Response(body=content, headers=
     MultiDict({ 'CONTENT-DISPOSITION': mp3_file}))

여기서 문제는 request.post() 메소드가 요청 데이터를 한꺼번에 메모리로 읽어들이기 때문에 메모리 부족으로 서버가 죽을 수 있는 상황이 있다는 것이다. 따라서 aiohttp에서 일반적으로 처리할 수 있는 요청의 크기는 2MB로 제한된다. 하지만 이 크기는 어지간한 사진 하나의 용량도 감당하기 어렵기 때문에 뭔가 다른 방법이 필요하다. (보통은 일종의 옵션 값 같은 걸로 최대 처리 요청 크기를 변경할 수 있을 줄 알았는데, 없었다.)

request.multipart는 이러한 문제를 피하는 멀티파트 리더로 기능할 수 있다. 리더 객체를 생성한 다음에는 멀티파트 요청을 단위 콘텐츠 별로 읽어들일 수 있다. requrest.multipart() 메소드는 멀티파트 요청을 각 파트별로 리턴하는 비동기 제너레이터인데, 각각의 파트(필드)는 read() 메소드를 통해서 통째로 읽어들이거나, read_chunk() 메소드를 통해서 필드의 일부분을 순차적으로 버퍼에 읽어들일 수 있다.

BodyPartReader.read_chunk(size=chunk_size::int)


https://docs.aiohttp.org/en/stable/multipart_reference.html#aiohttp.BodyPartReader.read_chunk

다음 코드는 mp3 파일을 폼 필드에서 mp3라는 필드 이름으로 업로드했을 때, 이를 받아서 저장하는 서버쪽 코드이다. 바이너리 파일을 버퍼로 읽어들여서 순차적으로 저장하는 것과 동일한 방식으로 처리한다. 기존 코드와 차이점이 있다면 awiat request.read()는 HTTP요청 자체를 하나의 사전과 비슷한 객체로 읽어들이는 반면, 개별 필드를 각각 요청하고 처리해야 한다.

async def store_mp3_handler(request):
  reader = await request.multipart()

  field = await reader.next()
  assert field.name == 'name'
  name = await field.read(decode=True)
  
  field = await reader.next()
  assert field.name == 'mp3'
  filename = filed.filename

  size = 0
  with open(os.path.join('/spool/yarrr-media/mp3/', filename), 'wb') as f:
    while True:
      chunk = await field.read_chunk()
      if not chunck:
        break
      size += len(chunk)
      f.write(chunk)
  return web.Response(text=f'{filename} sized of {size} successfully stored.')

이 방법을 사용하여 메모리에 부담을 주지 않고 대용량 파일을 업로드 받을 수 있다.

ZMQ 멀티파트메시지

멀티파트 메시지는 하나의 메시지 프레임 내부에 여러 개의 독립적인 메시지 프레임이 들어 있는 것을 말한다. 이는 하나의 프레임에서 처리하기 힘든 데이터 조각들을 모아서 처리할 때 유용하다. 예를 들어 바이너리 파일 데이터를 전송하려는 경우에는 보내는 쪽이나 받는 쪽이나 전송하는 데이터가 이진데이터라는 것을 알고 있다 가정하여 바이트 스트림을 전송할 수 있다. 하지만 이렇게 하면 실제 데이터 외부에 있었던 정보, 이를 테면 파일 이름이나 생성한 날짜 같은 메타 정보를 전달하기가 어려워진다. 이런 경우 여러 정보들을 멀티 파트 메시지로 묶어서 하나의 프레임으로 전송하면 필요한 모든 정보를 같이 전달해 줄 수 있다.

멀티 파트 메시지 전송은 비단 ZMQ내에서만 사용되는 개념이 아니다. HTTP 규격에서도 멀티 파트 메시지 개념이 존재하며, 실제로 HTML 폼 전송에서 멀티 파트 전송이 널리 사용된다. 여러 개의 필드를 하나의 폼에서 묶어서 submit 한다거나, 여러 개의 파일을 한 번에 업로드하는 폼 들은 모두 enctype='multpart/formdata' 라는 폼 속성을 가지며, 실제 HTTP 전송 헤더 역시 Content-Type 에서 이 값이 사용된다.

ZMQ 에서도 여러 개의 메시지 혹은 프레임을 하나의 메시지로 묶어서 멀티파트로 전송할 수 있다. pyzmq를 사용하는 경우 이는 소켓 객체의 send_multipart() / recv_multipart()  두 개의 메소드로 간단히 구현된다. 다만 문자열이 아닌 버퍼(바이트 배열)를 주고 받아야 한다는 점에 주의만 하면 그외에는 매우 간단하다. 다음은 세 개의 문자열을 하나의 멀티파트 메시지로 전송하는 REQ-REP 예제이다.

## multimessage-server.py

import zmq

ctx = zmq.Context()

def run_server(port=5555):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  keys = 'abc'
  while True:
    msg = dict(zip(keys, (x.decode() for x in sock.recv_multipart())))
    for k, v in msg.items():
      print(f'{k}: {v}')
    ## 대문자화하고 하나의 문자열로 만들어서 되돌려준다.
    sock.send_string(' '.join(msg[k].upper() for k in keys))

if __name__ == '__main__':
  run_server()

recv_multipart() 에 의해 리턴되는 값이 바이트배열의 시퀀스임을 알고 있다면 위 코드는 충분히 이해되리라 본다. REQ-REP 패턴에서 클라이언트에서 서버로 멀티 파트 메시지를 전송했다 하더라도, 반드시 응답이 멀티파트여야 할 필요도 없다. 다음은 위 서버 코드와 짝을 맞출 클라이언트 코드이다. 세 번의 키보드 입력 후 해당 문자열들을 한 번에 전송한다. 그리고 서버가 보내준 단일 메시지를 출력하는 것을 반복하는 간단한 코드이다.

import zmq

ctx = zmq.Context()

def run_client(port=5555):
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    data = [input().encode() for _ in range(3)]
    sock.send_multipart(data)
    res = sock.recv_string()
    print(res)

if __name__ == '__main__':
  run_client()