Barrier를 사용한 스레드/프로세스 동기화 예제

Barrier는 스레드를 사용하는 동시성 프로그래밍에서 특정 작업의 시작 시점을 동기화하는 수단으로, 여러 스레드가 특정한 시점까지 서로를 기다리다가 동시에 실행 흐름을 재개할 때 사용할 수 있다. 이는 보통 서버-클라이언트의 역할을 하는 각각의 스레드가 서로의 준비 과정을 기다리다 동시에 시작하도록 할 때 유용하게 사용될 수 있다.

여기서는 간단한 에코 서버-클라이언트를 구현하면서 Barrier를 사용하여 시작 시점을 동기화하는 예를 살펴보자.

먼저 Barrier를 사용하는 방법은 다음과 같다.

  1. threading.Barrier() 를 사용하여 Barrier 객체를 생성한다. 생성시에는 대기해야 할 스레드의 개수를 지정해준다.
  2. 각각의 스레드가 시작된다. 각 스레드에서는 준비 작업을 마치고 베리어의 wait()를 호출한다.
  3. 맨 마지막 스레드가 wait()를 호출하면 배리어가 해제되면서 대기 중이던 모든 스레드가 동시에 시작된다.

먼저 다음과 같이 필요한 모듈을 로드하고, 배리어를 생성한다.

from threading import Thread, Barrier
import socket
import time

# 서버, 클라이언트로 총 2개의 스레드에 대한 배리어를 생성한다.
barrier = Barrier(2)

PORT = 5555

다음은 서버 스레드가 동작할 코드이다. 소켓을 생성하고 바인드 한 후에 barrier.wait()를 호출하여 클라이언트가 시작되기를 기다린다.

def run_server():
  sock = socket.socket()
  sock.bind(('localhost', PORT))
  print('SERVER STARTED AND WAITING')
  barrier.wait()
  print('SERVER RESUMED')
  sock.listen(1)
  conn, addr = sock.accept()
  data = conn.recv(1024)
  print(f'RECEIVED {data!r} FROM {addr}')
  conn.sendall(data)
  conn.close()
  sock.close()

다음은 클라이언트 코드이다. 시작시에 1초를 대기한 후, connect를 하기 직전에 barrier.wait()를 호출한다.

def run_client():
  sock = socket.socket()
  time.sleep(1)
  print('CLIENT IS PREPARED')
  barrier.wait()
  sock.connect(('localhost', PORT))
  sock.sendall(b'hello')
  data = sock.recv(1024)
  print(f'RECEIVED {data!r}')
  sock.close()

이제 두 스레드를 돌려보도록 하자.

if __name__ == '__main__':
  t1 = Thread(target=run_server)
  t2 = Thread(target=run_client)
  t1.start()
  t2.start()
  t1.join()
  t2.join()

서버가 시작되었다는 메시지가 출력되고 잠깐 기다린 후, 클라이언트가 준비됐다는 메시지가 출력된다. 그리고 바로 서버가 재개된다는 메시지가 출력되고 데이터를 주고 받는 것을 확인할 수 있다.

보너스 – 멀티프로세스의 배리어

스레드 기반의 동시성 모듈인 threading에 대응하는 멀티프로세스 모듈로 multiprocessing이 있는데, 이 둘은 거의 동일한 API를 제공한다. 그리고 당연히 multiprocessing에도 Barrier가 존재한다. 물론 멀티프로세스를 사용하는 경우에는 배리어를 사용할 일이 좀 드물기는한데, 어떻게 사용하는지 threading 모듈과는 어떻게 달라지게 되는지 알아보자.

멀티 스레드 환경에서는 주 스레드에서 생성한 barrier를 다른 스레드에서 참조가 가능했다. 따라서 barrier를 전역으로 생성해두고 각각의 스레드에서는 그대로 참조만 하면 됐다. 하지만 멀티 프로세스에서는 새 프로세스를 생성하면, ‘깨끗한’ 파이썬 인터프리터 프로세스가 생성되고, 프로세스를 생성한 현재 모듈이 로드된다. 만약 배리어를 전역으로 설정했다면, 새로운 프로세스에서도 새로운 전역 객체로 배리어가 새로 생성될 것이다. 따라서 서버, 클라이언트 프로세스가 각각의 개별적인 배리어를 기다리는 상황이 되면서 교착 상태에 빠지게 된다.

따라서 약간의 디자인 수정이 필요하다.

  1. 배리어 생성 코드는 if __name__ == '__main__': 절이나 별도의 main() 함수에 들어가야 한다.
  2. 각 프로세스에서 실행될 타깃 함수들은 배리어를 인자로 받고, 인자로 받은 배리어를 사용해야 한다.

서버는 우선 다음과 같은 식으로 변경된다. 배리어를 인자로 받고, 인자로 받은 배리어가 존재하면 wait()를 한다.

def run_server(barrier=None):
    sock = socket.socket()
    sock.bind(('localhost', PORT))
    print(f'SERVER IS WAITING...')
    if barrier:
        barrier.wait()
    print(f'SERVER IS RESUMED...')
    sock.listen(1)
    conn, addr = sock.accept()
    data = conn.recv(1024)
    print(f'RECEIVED {data!r} from {addr}')
    conn.sendall(data)
    conn.close()
    sock.close()

클라이언트 코드도 동일한 방식으로 변경된다.

def run_client(barrier=None):
    sock = socket.socket()
    time.sleep(1)
    print(f'CLIENT IS PREPARED')
    if barrier:
        barrier.wait()
    sock.connect(('localhost', PORT))
    sock.sendall(b'hello')
    data = sock.recv(1024)
    print(data.decode())
    sock.close()

이제 나머지 코드들이다.

from multiprocessing import Process, Barrier
import socket
import time

PORT = 5555

def run_server(barrier=None):
  ...

def run_client(barrier=None):
  ...

def main():
    barrier = Barrier(2)
    p1 = Process(target=run_server, args=(barrier,))
    p2 = Process(target=run_client, args=(barrier,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == '__main__':
    main()