예제 – ZMQ + Asyncio 로 PUSH-PULL 구성

PUSH-PULL 구조를 사용한 분산처리 예제를 asyncio 버전으로 재작성해 보았다. 벤틸레이터가 보내는 각 값에 대해 워커는 그 값에 비례하는 시간(값 × 0.01초)만큼 지연한 후, 값이 짝수이면 '+', 홀수이면 '-'를 싱크에게 전송한다.

워커

import sys
import random
import asyncio
import zmq
import zmq.asyncio

# Module-level asyncio ZeroMQ context; run_worker() creates its sockets from it.
ctx = zmq.asyncio.Context()

async def run_worker(portA=5556, portB=5557):
    """Pull jobs, simulate work, and push each job's parity to the sink.

    Connects a PULL socket to ``tcp://localhost:{portA}`` and a PUSH socket
    to ``tcp://localhost:{portB}``, then loops forever. For every received
    object that is an ``int`` or ``float`` the worker sleeps for
    ``data * 0.01`` seconds; it then sends ``'+'`` when ``int(data)`` is even
    and ``'-'`` when it is odd.

    Args:
        portA: TCP port on localhost that jobs are pulled from.
        portB: TCP port on localhost that results are pushed to.

    The loop never returns normally. Both sockets are closed when the
    coroutine is cancelled (e.g. Ctrl-C under ``asyncio.run``) or when an
    exception propagates out of the loop.
    """
    sock_pull = ctx.socket(zmq.PULL)
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_pull.connect(f'tcp://localhost:{portA}')
        sock_push.connect(f'tcp://localhost:{portB}')

        while True:
            # NOTE(security): recv_pyobj() unpickles whatever arrives on the
            # socket; only connect this worker to a trusted sender.
            data = await sock_pull.recv_pyobj()
            if isinstance(data, (int, float)):
                # '*' marks a job in progress while the simulated work runs.
                sys.stdout.write('*')
                sys.stdout.flush()
                await asyncio.sleep(data * 0.01)
            x = '+' if int(data) % 2 == 0 else '-'
            # Backspace over the previous character (the '*' marker for
            # numeric jobs) and replace it with the result.
            sys.stdout.write(f'\b{x}')
            sys.stdout.flush()
            await sock_push.send_string(x)
    finally:
        # Release both sockets however the infinite loop is left.
        sock_pull.close()
        sock_push.close()

# Run the worker with its default ports when executed as a script.
if __name__ == '__main__':
    asyncio.run(run_worker())

싱크

import sys
import asyncio
import zmq
import zmq.asyncio

# Module-level asyncio ZeroMQ context; run_sink() creates its socket from it.
ctx = zmq.asyncio.Context()

async def run_sink(port=5557):
    """Collect results and echo each one to stdout.

    Binds a PULL socket on ``tcp://*:{port}``. The first message received is
    decoded as a big-endian unsigned integer giving the number of results to
    expect; that many string messages are then received and written to
    stdout, flushing after each one.

    Args:
        port: TCP port to bind the PULL socket on.

    The socket is closed when all results have been received or when an
    error or cancellation interrupts the coroutine.
    """
    sock = ctx.socket(zmq.PULL)
    try:
        sock.bind(f'tcp://*:{port}')

        # Block until the first message arrives; it carries the result count.
        amount = int.from_bytes((await sock.recv()), 'big')

        for _ in range(amount):
            result = await sock.recv_string()
            sys.stdout.write(result)
            sys.stdout.flush()
    finally:
        # Release the socket on both normal completion and failure.
        sock.close()

# Run the sink on its default port when executed as a script.
if __name__ == '__main__':
    asyncio.run(run_sink())

벤틸레이터

import random
import sys
import asyncio
import zmq
import zmq.asyncio

# Module-level asyncio ZeroMQ context; run_vent() creates its sockets from it.
ctx = zmq.asyncio.Context()

async def run_vent(portA=5556, portB=5557, amount=1_000):
    """Announce the job count to the sink, then push random jobs out.

    Binds a PUSH socket on ``tcp://*:{portA}`` for jobs and connects a second
    PUSH socket to ``tcp://localhost:{portB}``. After the user presses Enter,
    ``amount`` is sent on the second socket as a 4-byte big-endian integer,
    followed by ``amount`` random integers from ``range(10, 200)`` on the job
    socket. A second prompt then keeps the process alive until Enter is
    pressed again.

    Args:
        portA: TCP port to bind the job PUSH socket on.
        portB: TCP port on localhost the job count is sent to.
        amount: Number of jobs to send; must fit in 4 unsigned bytes.

    The defaults reproduce the previously hard-coded ports and job count.
    Both sockets are closed even if a prompt or a send raises.
    """
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_push.bind(f'tcp://*:{portA}')

        sock_cmd = ctx.socket(zmq.PUSH)
        try:
            sock_cmd.connect(f'tcp://localhost:{portB}')

            # NOTE: input() blocks the event loop; this coroutine is the only
            # thing this script schedules on it.
            input('PRESS ENTER TO START')
            await sock_cmd.send(amount.to_bytes(4, 'big'))
        finally:
            # The command socket is only needed for the count message.
            sock_cmd.close()

        for _ in range(amount):
            v = random.randrange(10, 200)
            await sock_push.send_pyobj(v)
    finally:
        sock_push.close()

    input('PRESS ENTER TO QUIT')

# Run the ventilator when executed as a script.
if __name__ == '__main__':
    asyncio.run(run_vent())