PUSH-PULL 구조를 사용한 분산 처리 예제를 asyncio 버전으로 재작성해 보았다. 벤틸레이터가 보내는 값에 대해 각 워커는 그 값에 비례하는 시간(값 × 0.01초)만큼 지연한 후, 그 값이 짝수인지 홀수인지를 나타내는 기호('+' 또는 '-')를 싱크에게 전송한다.
워커
import sys
import random
import asyncio
import zmq
import zmq.asyncio
# Module-level asyncio-aware ZeroMQ context; run_worker creates its sockets from it.
ctx = zmq.asyncio.Context()
async def run_worker(portA=5556, portB=5557):
    """Pull numeric tasks, simulate work by sleeping, and push a parity marker.

    A PULL socket is connected to ``tcp://localhost:{portA}`` and a PUSH
    socket to ``tcp://localhost:{portB}``. The coroutine then loops forever.
    For every ``int`` or ``float`` it receives it:

    1. writes ``'*'`` to stdout as an "in progress" marker,
    2. sleeps for ``data * 0.01`` seconds,
    3. overwrites the marker (via a backspace) with ``'+'`` when
       ``int(data)`` is even or ``'-'`` when it is odd, and
    4. sends that same one-character string on the PUSH socket.

    Payloads of any other type are skipped without sending a result.

    Args:
        portA: Local TCP port the PULL socket connects to (incoming tasks).
        portB: Local TCP port the PUSH socket connects to (outgoing results).

    The loop has no exit condition, so this coroutine only ends when it is
    cancelled or an exception propagates; both sockets are closed either way.
    """
    sock_pull = ctx.socket(zmq.PULL)
    sock_push = ctx.socket(zmq.PUSH)
    try:
        sock_pull.connect(f'tcp://localhost:{portA}')
        sock_push.connect(f'tcp://localhost:{portB}')
        while True:
            # NOTE(security): recv_pyobj() unpickles the payload, so the peer
            # on portA must be trusted. Do not expose this port to untrusted
            # senders.
            data = await sock_pull.recv_pyobj()
            if not isinstance(data, (int, float)):
                # Non-numeric payloads carry no work; ignore them.
                continue
            sys.stdout.write('*')
            sys.stdout.flush()
            await asyncio.sleep(data * 0.01)
            # Floats are truncated toward zero before the parity check.
            x = '+' if int(data) % 2 == 0 else '-'
            # '\b' moves the cursor back so the result replaces the '*'.
            sys.stdout.write(f'\b{x}')
            sys.stdout.flush()
            await sock_push.send_string(x)
    finally:
        # Release both sockets even when the loop is interrupted
        # (e.g. task cancellation triggered by Ctrl-C under asyncio.run).
        sock_pull.close()
        sock_push.close()
# Start the worker coroutine when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(run_worker())
싱크
import sys
import asyncio
import zmq
import zmq.asyncio
# Module-level asyncio-aware ZeroMQ context; run_sink creates its socket from it.
ctx = zmq.asyncio.Context()
async def run_sink(port=5557):
    """Collect a known number of result strings and echo them to stdout.

    A PULL socket is bound to ``tcp://*:{port}``. The first message received
    is interpreted as a big-endian unsigned integer giving the number of
    results to expect. That many string messages are then received, each one
    written to stdout and flushed immediately.

    Args:
        port: TCP port the PULL socket binds to on all interfaces.

    The socket is closed when the coroutine finishes, is cancelled, or fails.
    """
    sock = ctx.socket(zmq.PULL)
    try:
        sock.bind(f'tcp://*:{port}')
        # Block until the count header arrives. Whatever message comes first
        # is decoded as the count, so it must be sent before any result.
        amount = int.from_bytes(await sock.recv(), 'big')
        for _ in range(amount):
            result = await sock.recv_string()
            sys.stdout.write(result)
            # Flush per result so progress is visible as it arrives.
            sys.stdout.flush()
    finally:
        sock.close()
# Start the sink coroutine when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(run_sink())
벤틸레이터
import random
import sys
import asyncio
import zmq
import zmq.asyncio
# Module-level asyncio-aware ZeroMQ context; run_vent creates its sockets from it.
ctx = zmq.asyncio.Context()
async def run_vent(portA=5556, portB=5557, amount=1_000):
    """Announce the task count to the sink, then push random tasks to workers.

    A PUSH socket is bound to ``tcp://*:{portA}`` for distributing tasks and
    a second PUSH socket is connected to ``tcp://localhost:{portB}`` to send
    the task count. After the user presses ENTER, the count is sent as a
    4-byte big-endian integer, followed by ``amount`` tasks, each a random
    integer from ``random.randrange(10, 200)`` sent with ``send_pyobj``.
    The coroutine then waits for ENTER once more before returning.

    Args:
        portA: TCP port the task PUSH socket binds to on all interfaces.
        portB: Local TCP port the count is sent to.
        amount: Number of tasks to send; must fit in 4 unsigned bytes.

    Raises:
        OverflowError: If ``amount`` is negative or does not fit in 4 bytes.
    """
    # Encode the header first so an invalid amount fails before any socket
    # is created or the user is prompted.
    header = amount.to_bytes(4, 'big')

    sock_push = ctx.socket(zmq.PUSH)
    sock_cmd = ctx.socket(zmq.PUSH)
    try:
        sock_push.bind(f'tcp://*:{portA}')
        sock_cmd.connect(f'tcp://localhost:{portB}')
        # NOTE: input() blocks the event loop; nothing else is scheduled on
        # it in this script, so the pause only serves as a manual start gate.
        input('PRESS ENTER TO START')
        # The count goes out before any task is pushed.
        await sock_cmd.send(header)
        for _ in range(amount):
            v = random.randrange(10, 200)
            await sock_push.send_pyobj(v)
    finally:
        # Close both sockets on success and on failure alike.
        sock_cmd.close()
        sock_push.close()
    # Keep the process (and therefore the context) alive until the user quits.
    input('PRESS ENTER TO QUIT')
# Start the ventilator coroutine when this file is executed as a script.
if __name__ == '__main__':
    asyncio.run(run_vent())