예제 – ZMQ + Asyncio 로 PUSH-PULL 구성

PUSH-PULL 구조를 사용한 분산처리를 구현한 예제를 asyncio 버전으로 재작성해보았다. 벤틸레이터가 보내는 값에 대해 각각의 워커는 그 값에 해당하는 시간만큼 지연시킨 후 싱크에게 짝/홀수 여부값을 전송한다.

예제 – ZMQ + Asyncio 로 PUSH-PULL 구성 더보기

ZMQ 예제 – Poller를 사용하여 종료 시점을 동기화하기

하나의 ZMQ 소켓은 여러 포트에 바인드하거나 커넥트할 수 있어서, 1:N의 연결을 쉽게 구성할 수 있습니다. 하지만 어떤 경우에는 이 다중 접속이 두 개 이상의 소켓을 사용하는 경우도 있습니다. 이런 경우 두 개의 소켓을 동시에 듣는 방법이 필요합니다. ZMQ소켓의 recv() 메소드는 블럭킹 함수이기 때문에 2개 이상의 소켓 중 데이터가 들어온 소켓을 처리하기 위해서는 소켓만으로는 처리할 수 없습니다. ZMQ는 이런 상황에 사용할 수 있는 Poller라는 수단을 제공합니다.

ZMQ 예제 – Poller를 사용하여 종료 시점을 동기화하기 더보기

Asyncio – 네트워크 입출력을 위한 스트림

asyncio는 네트워크 입출력을 위한 스트림이라는 타입을 제공하고 있다. 스트림은 네트워크 연결을 만들거나(클라이언트의 connect 동작) 서버를 시작하고(서버의 bind/listen 동작) 연결이 생성되면 해당 연결을 처리하는 핸들러의 인자로 넘겨지게 된다. 비동기 코루틴인 각각의 핸들러는 이 스트림을 이용해서 데이터를 읽거나 전송할 수 있다.

입출력 스트림은 내부적으로 소켓을 감싸고 있으며, 노출되는 API를 통해서 소켓을 기다리고 읽고 쓰는 일련의 작업을 상당히 고수준의 레벨에서 제공해주고 있다.

Asyncio – 네트워크 입출력을 위한 스트림 더보기

async with : 비동기 컨텍스트 매니저

파이썬의 컨텍스트 매니저with 블럭을 적용할 수 있는 객체를 말한다. 이러한 객체들은 with 절에서 마치 블럭에 대한 데코레이터처럼 동작한다. 가장 흔한 예가 open() 함수로 생성하는 파일 입출력스트림으로, with 구문 내에서 쓰이면 블럭을 빠져나갈 때 파일을 닫는 동작을 자동으로 수행하게 된다.

with open('data.txt') as f:
  for line in f:
    print(line)

컨텍스트 매니저 객체는 __enter__(), __exit__() 두 개의 내장 메소드를 가지고 있는 것으로 간주된다. 위 코드에서는 with 다음에 나오는 open('data.txt') 라는 코드는 파일에 대한 입출력 스트림을 반환한다. 그리고 with 문을 빠져나갈 때, 파일에 대해 __exit__()가 호출되고 여기서 파일이 닫힐 것이다.

async with : 비동기 컨텍스트 매니저 더보기

일반함수를 비동기 코루틴화하는 데코레이터 만들기

지난 글에서 urlopen()과 같은 표준 라이브러리 함수를 어떻게 비동기 코루틴처럼 asyncio에서 사용할 수 있는지 살펴보았다. aiohttp 등의 비동기 라이브러리를 사용해서 여러 핸들러를 작성해야 할 때, 이와 같은 처리를 많이 해야 한다면 빈번하게 런루프 메소드를 호출하는 것보다, 간단히 데코레이터를 만들어서 활용하는 것이 어떨까? 무엇이 되었든 파이썬 함수는 인자를 받고 결과를 내보내는 구조로 되어 있다. ( (*args, **kwds) -> Result  ) 물론 상황에 따라 인자는 생략되기도 하고 결과는 암시적으로 None 이 될 것이다. 따라서 이러한 함수를 데코레이터와 함께 작성하여 별도의 스레드에서 실행되는 비동기 코루틴으로 만들어보도록 하자.

  1. 런루프 및 executor는 생략되는 경우 기본 런루프와 디폴트 executor를 사용하지만, 명시적으로 넘겨지는 경우 그것을 사용한다.
  2. 위 조건은 데코레이터 선언 자체에 들어가야 한다. 따라서 우리가 작성해야하는 함수는 데코레이터가 아니라 데코레이터 생성함수이다.
  3. run_in_executor() 메소드는 키워드 인자를 넘겨주지는 못한다. 따라서 functools.partial 을 사용해서 키워드인자를 만들어주어야 한다.

여차저차해서 코드는 다음과 같이 작성될 수 있다.

from functools import partial, wraps
import asyncio

def run_async(loop=None, pool=None):
  _loop = loop if loop is not None else\
          asyncio.get_event_loop()
  def decorator(fn):
    @wraps(fn)
    async def wrapped(*args, **kwds):
      _fn = partial(fn, **kwds)
      result = await _loop.run_in_executor(pool, *args)
      return result
    return wrapped
  return decorator

실제 사용은 이런식으로 한다.

import sqlite3 as sql

@run_async()
def get_users(page=1, limit=100):
  conn = sql.connect(database)
  c = conn.execute('''SELECT * FROM users LIMIT = ? OFFSET = ?''', 
                   (limit, (page-1) * limit))
  return c.fetchall()

async def run():
  fs = {get_users(i) for i in range(1, 11)}
  asyncio.