콘텐츠로 건너뛰기
Home » async for 구문의 작동 원리

async for 구문의 작동 원리

비동기, 즉 작업이 완료되지 않더라도 리턴하는 함수는 프로그래밍에서 꽤 오랜 역사를 가지고 있는 아이디어입니다. . 비동기 함수는 ‘동시성’ 혹은 ‘병렬처리’를 위해 고안된 개념이기도 하고요. asyncio 가 비동기 처리에 관한 라이브러리라는 것은 그 이름부터 이미 알려주고 있습니다. 그런데 다시 말하지만 ‘비동기’는 동시성이나 병렬처리를 위해 고안된 개념이라는 것입니다. 컴퓨터 공학 분야에서 병렬처리나 분산처리에 대한 논의는 오래전부터 있었고, 지금 널리 사용되고 있는 다중 스레드의 개념도 이때 등장하여, 여러 상황이나 조건들에 의해 채택되고 지금까지 이어지고 있는 것입니다. 거꾸로 말하면 동시성 처리를 위해서 반드시 스레드만 사용해야 한다는 법이 아니라는 말이기도 합니다.

어쨌거나 (제법 오래된 이야기이지만요) 파이썬에 async for 구문을 사용할 수 있게 되면서, 많은 사람들이 파이썬에 희한한 기능이 생겼다고 이야기하기 시작했습니다. 이 구문에 관한 흔한 오해 중 하나는 async for를 쓰면 asyncio의 마법에 의해서 반복 구문이 자동으로 병렬화된다는 것입니다.

반복 가능 객체와 이터레이터

for 구문으로 리스트나 튜플, 문자열, 사전 등 집합과 비슷한 성격을 갖는 자료 구조들의 원소(요소)들을 순회할 수 있습니다. 이처럼 for 구문으로 내부 요소를 반복할 수 있는 타입들을 반복가능하다(iterable)고 합니다. 기술적으로 반복가능한 객체는 이터레이터(iterator, 옛날식 표현으로 ‘반복자’)를 만들 수 있는 객체를 말합니다. 그러면 이터레이터란 무엇이냐, next() 함수의 인자로 전달될 수 있는 타입을 말합니다. 반복 가능 객체는 __iter__() 메소드를 구현하여, 이 메소드에서 이터레이터를 리턴하면 되고, 이터레이터는 무엇이 됐든 __next__() 메소드를 구현하면 됩니다. 보통의 반복 가능 객체는 이 두 가지를 모두 구현하는 경우가 많습니다.

class ACountDown:
  def __init__(self, start):
    self.start = start

  def __next__(self):
    if self.start < 0:
      raise StopIteration()  # 반복을 종료해야 할 곳에서는 StopIteration 예외를 일으킨다. 
    r - self.start
    self.start -= 1
    return r

카운트 다운을 하는 이터레이터를 간단하게 구현해보았습니다. 외부에서 next() 함수에 이 이터레이터 객체를 전달하면, 이터레이터 객체의 __next__() 메소드가 호출됩니다. 그래서 매 번 차례에 맞는 원소를 리턴해주는 것입니다. 만약 천만에서부터 카운트 다운을 한다고 하더라도, 정수 천만 개를 메모리에 로드할 필요가 없으니 아주 가볍겠죠?

반복가능 객체는 보통 이터레이터를 리턴만 해주면 됩니다.

class MyCountDown:
  def __init__(self, start):
    self.start = start

  def __iter__(self):
    return ACountDown(self.start)

for x in MyCountDown(10):
  print(x)

# 10
# 9
# ...
# 1
# 0

그리고 보통은 이렇게 두 개의 타입을 각각 작성하기 보다는 하나에 다 작성하는 경우가 더 많습니다. 아무래도 아주 특별한 경우가 아니라면 굳이 이를 나눠서 구현할 필요가 있을까요?

class MyCountDown:
  def __init__(self, start):
    self.r = start + 1

  def __next__(self):
    if self.r < 0:
      raise StopIteration()
    self.r -= 1
    return self.r

  def __iter__(self):
    return self

동기식 for 구문의 비밀은 이것이 끝입니다. 왠지 그 개념상 콜렉션이 아닌 파일 디스크립터 같은 객체도 for 구문에서 사용할 수 있는 것은 이처럼 “반복 가능”에 대한 프로토콜을 준수하기 때문입니다.

비동기 이터레이터

async for 구문은 비동기식 이터레이터를 위한 for 구문일 뿐입니다. 비동기 이터레이터는 __iter__(), __next__() 대신, __aiter__(), __anext__() 를 구현하고 있는 객체인데, __anext__() 가 awaitable입니다. 즉 어떤 값들을 순차적으로 내놓지만, 그 값들이 I/O 작업 등의 과정을 거쳐야하기 때문에 값이 생성되는데 시간이 걸릴 수 있는 것이죠.

class ACrawler:
    def __init__(self, urls: list[str]):
        self.urls = urls
        self.i = 0

    def __aiter__(self):
        self.i = 0
        return self

    async def get_html(self, url):
        res = await asyncio.to_thread(urlopen, url)
        return (url, res[:500])


    async def __anext__(self):
        if self.i >= len(self.urls):
            raise StopAsyncIteration
        self.i += 1
        return await self.get_html(self.urls[self.i - 1])

비동기 이터레이션은 async for 구문이 반복되는 사이, 즉 이터레이터로부터 다음 값을 기다리는 사이에 실행을 중단하고, 스케줄된 다른 비동기 작업으로 전환할 수 있게 해준다는 개념이지, 병렬처리와는 아무런 상관이 없습니다.

그런데 위 코드를 보면, 이런 생각을 할 수도 있습니다. “첫번째 URL을 요청하고 응답을 기다리는 동안, 다음 URL을 요청하는 식으로 한 번에 여러 URL을 요청한 다음 응답이 먼저 오는 것부터 처리하게 되는 게 아니냐”고 생각할 수 있습니다. 이렇게 작동한다면 실질적으로는 여러 URL에 대해서 병렬처리를 하는 것이 되겠지만, 실제로는 이렇게 작동하지 않습니다. async for 구문은 비동기 이터레이터를 기다리는 동안에는 이터레이터의 다음 요소를 요청하지 않습니다. 즉 async for 자체는 하나의 비동기 작업으로 취급되고 루프 외부의 다른 비동기 작업과 같이 작동할 뿐입니다.

get_html() 내에 임의의 시간 동안 대기 후 리턴하게 하는 코드를 추가해서 테스트해보면 결과를 알 수 있습니다.

class ACrawler:
    def __init__(self, urls: list[str]):
        self.urls = urls
        self.i = 0

    def __aiter__(self):
        self.i = 0
        return self

    async def get_html(self, url):
        res = await asyncio.to_thread(urlopen, url)
        await asyncio.sleep(random.random() * 8)
        return (url, res[:500])


    async def __anext__(self):
        if self.i >= len(self.urls):
            raise StopAsyncIteration
        self.i += 1
        return await self.get_html(self.urls[self.i - 1])


async def test():
    urls = ["https://www.python.org", "https://www.google.com", "https://www.bing.com"]
    async for (url, body) in ACrawler(urls):
        print(url)
        print(body)
        print("----")

asyncio.run(test())

asyncio.sleep()등으로 임의 시간 동안 지연을 부해도 결과가 출력되는 순서는 항상 urls의 순서로 일정하게 유지됩니다. 각각의 요청을 동시에 처리하여 빨리 끝나는 것부터 처리하는 동시성 처리의 스타일은 asyncio.as_completed()를 사용해야 합니다.

async def get_html(url):
    res = await asyncio.to_thread(urlopen, url)
    return (url, res.read().decode())

async def test2():
    urls = ["https://www.python.org", "https://www.google.com", "https://www.bing.com"]
    for aw in asyncio.as_completed(map(get_html, url)):
        (url, html) = await aw
        print(url)
        print(html[:500])
        print("----")

보너스

return 대신 yield를 사용하는 제너레이터 함수는 제너레이터 객체를 리턴하는데, 제너레이터 객체 역시 반복가능 프로토콜에 의해 작동합니다. 즉, 모든 제너레이터는 이터레이터입니다.

댓글 남기기