aiohttp에서 큰 파일을 업로드하는 법

파일 업로드는 보통 요청의 body에 인코딩된 파일 데이터를 넣어서 POST 요청으로 서버에 전달되는데, aiohttp에서는 다음과 같이 post를 처리하는 핸들러를 사용해서 이를 처리할 수 있다.

async def store_mp3_handler(request):
  data = await request.post()
  mp3 = data['mp3']
  filename = mp3.filename
  mp3_file = mp3.file
  content = mp3_file.read()
  return web.Response(body=content, headers=
     MultiDict({ 'CONTENT-DISPOSITION': mp3_file}))

여기서 문제는 request.post() 메소드가 요청 데이터를 한꺼번에 메모리로 읽어들이기 때문에 메모리 부족으로 서버가 죽을 수 있는 상황이 있다는 것이다. 따라서 aiohttp에서 일반적으로 처리할 수 있는 요청의 크기는 2MB로 제한된다. 하지만 이 크기는 어지간한 사진 하나의 용량도 감당하기 어렵기 때문에 뭔가 다른 방법이 필요하다. (보통은 일종의 옵션 값 같은 걸로 최대 처리 요청 크기를 변경할 수 있을 줄 알았는데, 없었다.)

request.multipart는 이러한 문제를 피하는 멀티파트 리더로 기능할 수 있다. 리더 객체를 생성한 다음에는 멀티파트 요청을 단위 콘텐츠 별로 읽어들일 수 있다. 개별 파트의 헤더를 먼저 읽고, 다시 최종적으로 file 필드로부터 버퍼로 파일의 일부 내용을 순차적으로 읽어서 처리할 수 있다.

async def store_mp3_handler(request):
  reader = await request.multipart()

  field = await reader.next()
  assert field.name == 'name'
  name = await field.read(decode=True)
  
  field = await reader.next()
  assert field.name == 'mp3'
  filename = filed.filename

  size = 0
  with open(os.path.join('/spool/yarrr-media/mp3/', filename), 'wb') as f:
    while True:
      chunk = await field.read_chunk()
      if not chunck:
        break
      size += len(chunk)
      f.write(chunk)
  return web.Response(text=f'{filename} sized of {size} successfully stored.')

ZMQ 멀티파트메시지

멀티파트 메시지는 하나의 메시지 프레임 내부에 여러 개의 독립적인 메시지 프레임이 들어 있는 것을 말한다. 이는 하나의 프레임에서 처리하기 힘든 데이터 조각들을 모아서 처리할 때 유용하다. 예를 들어 바이너리 파일 데이터를 전송하려는 경우에는 보내는 쪽이나 받는 쪽이나 전송하는 데이터가 이진데이터라는 것을 알고 있다 가정하여 바이트 스트림을 전송할 수 있다. 하지만 이렇게 하면 실제 데이터 외부에 있었던 정보, 이를 테면 파일 이름이나 생성한 날짜 같은 메타 정보를 전달하기가 어려워진다. 이런 경우 여러 정보들을 멀티 파트 메시지로 묶어서 하나의 프레임으로 전송하면 필요한 모든 정보를 같이 전달해 줄 수 있다.

멀티 파트 메시지 전송은 비단 ZMQ내에서만 사용되는 개념이 아니다. HTTP 규격에서도 멀티 파트 메시지 개념이 존재하며, 실제로 HTML 폼 전송에서 멀티 파트 전송이 널리 사용된다. 여러 개의 필드를 하나의 폼에서 묶어서 submit 한다거나, 여러 개의 파일을 한 번에 업로드하는 폼 들은 모두 enctype='multpart/formdata' 라는 폼 속성을 가지며, 실제 HTTP 전송 헤더 역시 Content-Type 에서 이 값이 사용된다.

ZMQ 에서도 여러 개의 메시지 혹은 프레임을 하나의 메시지로 묶어서 멀티파트로 전송할 수 있다. pyzmq를 사용하는 경우 이는 소켓 객체의 send_multipart() / recv_multipart()  두 개의 메소드로 간단히 구현된다. 다만 문자열이 아닌 버퍼(바이트 배열)를 주고 받아야 한다는 점에 주의만 하면 그외에는 매우 간단하다. 다음은 세 개의 문자열을 하나의 멀티파트 메시지로 전송하는 REQ-REP 예제이다.

## multimessage-server.py

import zmq

ctx = zmq.Context()

def run_server(port=5555):
  sock = ctx.socket(zmq.REP)
  sock.bind(f'tcp://*:{port}')
  keys = 'abc'
  while True:
    msg = dict(zip(keys, (x.decode() for x in sock.recv_multipart())))
    for k, v in msg.items():
      print(f'{k}: {v}')
    ## 대문자화하고 하나의 문자열로 만들어서 되돌려준다.
    sock.send_string(' '.join(msg[k].upper() for k in keys))

if __name__ == '__main__':
  run_server()

recv_multipart() 에 의해 리턴되는 값이 바이트배열의 시퀀스임을 알고 있다면 위 코드는 충분히 이해되리라 본다. REQ-REP 패턴에서 클라이언트에서 서버로 멀티 파트 메시지를 전송했다 하더라도, 반드시 응답이 멀티파트여야 할 필요도 없다. 다음은 위 서버 코드와 짝을 맞출 클라이언트 코드이다. 세 번의 키보드 입력 후 해당 문자열들을 한 번에 전송한다. 그리고 서버가 보내준 단일 메시지를 출력하는 것을 반복하는 간단한 코드이다.

import zmq

ctx = zmq.Context()

def run_client(port=5555):
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    data = [input().encode() for _ in range(3)]
    sock.send_multipart(data)
    res = sock.recv_string()
    print(res)

if __name__ == '__main__':
  run_client()