ZMQ의 기본 개념들

일전에 간단하게 ZMQ(Zero MQ)에 대한 내용을 간단히 정리해본 바 있는데, 이 때는 소켓에 대한 내용을 살펴보다가 흘러흘러 닿은 부분이라 제대로 설명하지 못하고 공식문서에 나오는 예제를 그대로 옮기는 수준이었다.  ZMQ는 소켓 프로그래밍 API를 대체할 수 있는 정말 괜찮은 라이브러리라는 생각이 들어서 활용할 폭이 넓다고 판단됐다. 다만 용어나 개념에 대한 약간의 선행지식이 필요한 부분이 있다. 오늘은 ZMQ에서 사용되는 기본적인 개념에 대해서 알아보고, ZMQ를 통해서 간단한 에코서버와 클라이언트로 소켓통신을 구현하는 방법에 대해 살펴보도록 하겠다. 그리고 ZMQ를 사용하면 전통적인 소켓 접속을 구현하는 것보다 얼마나 편하며 또 멋지게 돌아가는지도 살펴보도록 하겠다.

ZMQ란?

  • 코드를 모든 언어, 모든 플랫폼 상에서 연결
  • inproc, IPC, TCP, TIPC, 멀티캐스트를 통해 메시지들을 전달
  • pub-sub, push-pull 그리고 router-dealer와 같은 스마트한 패턴
  • I/O 비동기 엔진을 지원하는 소형 라이브러리
  • 대규모의 활동적인 오픈소스 커뮤니티 지원
  • 모든 현대 언어와 플랫폼을 지원
  • 중앙집중, 분산, 소규모 및 대규모의 모든 아키텍처를 구성 가능
  • 완전한 상업적 지원을 동반하는 무료 소프트웨어

위는 ZMQ 홈페이지에서 소개하는 ZMQ의 특징이다. ZMQ는 제로엠큐라 불리는 분산 메시징 플랫폼을 구현하기 위한 라이브러리이다. 흔히 소켓 통신 구현을 간단히 할 수 있는 라이브러리로 많이 소개되는데, ZMQ는 사실 훨씬 더 많은 것을 간편하게 구축할 수 있으며, 더군다나 빠르게 처리되도록 할 수 있다. ZMQ의 컨셉은 여러 가지 측면을 가지고 있지만, 그중에 가장 근간이 되는 키워드라 함은 ‘여러 노드를 편리하게 이어주는 분산형 메시징 플랫폼’이다.  위의 특징 소개글에서도 약간 엿볼 수 있지만, 다음과 같은 특징을 가진다.

  1. ZMQ는 단순히 소켓과 소켓을 연결하는 라이브러리가 아닌, 노드와 노드를 연결하여 메시지를 주고받을 수 있는 플랫폼을 구축하는데 쓰인다. 이 때 노드는 소켓외 스레드, 프로세스, TCP로 연결된 다른 머신이 될 수 있다. 즉 노드가 어디에 있든지에 대해서 상관하지 않도록 메시징의 양 끝단이 추상화되어 있다.
  2. 전달되는 데이터는 모두 ‘메시지’라는 형태로 불린다. 메시지는 그 속에 무엇이 들어있는지를 신경쓰지 않으며, 여러 언어/플랫폼간에 호환이 가능하도록 길이값+데이터의 형태로 묶여 있다.
  3. ZMQ의 Q는 Queue를 의미한다. 이는 노드들이 연결되어 만들어진 플랫폼 내에서의 모든 메시지 전달은 자동으로 큐를 거치게 된다. 소켓의 경우 서버가 소켓을 열기 전에 클라이언트가 접속하는 상황은 에러가 되지만, ZMQ에서 이러한 순서는 중요하지 않다. 수신측이 없거나 유효하지 않은 메시지는 모두 큐잉되며, 나중에 소비되거나 혹은 버려질 수 있다. 반대로 메시지를 받아들이는 부분에서도 큐가 사용된다. 모든 큐에 관한 관리는 라이브러리가 자동으로 관리하며, 큐 관리에 들어가는 비용은 없다. (그래서 Zero MQ이다.)

컨텍스트 (context)

모든 ZMQ 관련 코드는 컨텍스트 객체를 만드는 것으로 시작하고, 모든 ZMQ소켓은 컨텍스트를 통해서 생성된다. 소켓을 생성하고 관리하는 컨테이너이자 관리자이다. 컨텍스트는 스레드 안전하며, 프로세스 내에서 하나만 생성하고 사용하여야 한다.

컨텍스트를 통해서 생성되는 소켓에서 발생하는 입출력은 실제로는 백그라운드 스레드에서 비동기로 일어나며, 이 과정은 전부 컨텍스트에 의해서 처리된다. 컨텍스트로부터 명시적으로 생성되는 소켓은 실제로는 소켓이 아니며, 즉시 생성되지도 않는다. 모든 것은 내부에서 자동으로 관리된다.

소켓

이 글에서 아무런 단서 없이 언급되는 ‘소켓’이라는 명칭은 ZMQ 내에서의 소켓을 말한다. 전통적인 네트워킹에 사용되는 UNIX 소켓은 “UNIX 소켓” 혹은 “전통적인 소켓”이라고 언급할 것이다. 이와 같은 명명 관습은 ZMQ를 다루는 이 글과 그 후속글에서 공통적으로 적용할 예정이다.

전통적인 UNIX의 소켓은 네트워크 포트를 추상화한 것이다. 이에 빈해 ZMQ의 소켓은 메시지를 전달하고 받는 창구의 개념이다. UNIX 소켓은 수신측인 상대편 노드가 열려있는 소켓이거나 혹은 상대방 소켓이 이쪽으로 붙어서 연결되었음을 상정한다. (예를 들어 클라이언트쪽에서 열려있지 않은 서버의 소켓에 접속하려 시도하면 강제로 연결이 끊기면서 에러가 발생한다.) 이에 반해 ZMQ의 소켓은, 컨텍스트 내에서 생성되는 소켓의 재추상화된 버전이다. 소켓은 TCP등의 프로토콜을 사용하는 전통적인 UNIX 소켓일 수도 있으며, 상대방 노드의 종류에 따라 inproc(동일 프로세스 내에서 스레드간 통신을 위한 규격) 혹은 IPC(동일 시스템 내에서 프로세스간 통신을 위한 규격)일 수 있다. 실제 물리적인 소켓은 필요에 따라 생성된다.

실제로 소켓은 UNIX 소켓과 1:1로 대응하지 않는다. 우리가 바라볼 때 소켓의 뒤에는 수신자가 아닌 메시지를 주고 받기 위한 큐가 있을 뿐이다. 기술적으로 ZMQ에서는 하나의 소켓을 여러 개의 포트에 바인딩하거나, 여러 노드에 connect 할 수 있다. (이 경우에 송신할 때나 수신할 때, 여러 포트/노드로 번갈아가며 공평하게 하나씩 전송하거나 전송받는다.)

다음은 간단한 ZMQ에서의 소켓을 이용한 에코서버/클라이언트 구현이다. 컨텍스트가 어떤 소켓을 사용하고 어떤 세팅을 사용할 것인지에 대해서 우리는 큰 고민을 할 필요가 없으며, 몇 가지 핵심적인 코어 메시지 패턴 중에서 하나를 선택하면 된다. 우리가 만들려고 하는 에코 서버/클라이언트는 클라이언트가 서버에 요청을 보내고, 그 요청에 대한 응답을 받는 식으로 서로 한 걸음씩 짝을 맞추어 커뮤니케이션하는 패턴을 갖는다. 이를 ZMQ에서는 REQ-REP 패턴이라고 한다. 이 패턴에 기초해서 소켓을 만들고, 사용한다.

## echo-server.py
## 서버측 코드
import zmq, time

## 컨텍스트를 생성한다.
ctx = zmq.Context()

def run_server():
  ## zmq 소켓을 만들고 포트에 바인딩한다.
  sock = zmq.socket(zmq.REP)
  sock.bind('tcp://*:5555')
  while True:
    ## 소켓을 읽고 그 내용을 출력한 후 다시 되돌려 준다.
    msg = sock.recv()
    print(f'Recieved: {msg.decode()}')
    time.sleep(1)
    sock.send(msg)

run_server()

----
## echo-client.py
## 클라이언트 코드

import zmq, sys

## 동일하게 컨텍스트를 만들고 
ctx = zmq.Context()

def run_client(port=7777):
  ## 컨텍스트로부터 소켓을 만들고 서버에 연결한다.
  sock = ctx.socket(zmq.REQ)
  sock.connect(f'tcp://localhost:{port}')
  while True:
    ## 키보드로부터 입력받은 내용을 서버로 전송하고,
    ## 다시 서버의 응답을 출력한다.
    ## bye를 메시지로 보내고 받으면 소켓을 닫는다.
    line = input()
    sock.send(line.encode())
    rep = sock.recv()
    print(f'Reply: {rep.decode()}')
    if rep.decode() == 'bye':
      sock.close()
      break

port = sys.argv[1] if len(sys.argv) > 1 else 7777
run_client(port)

이 코드들은 전통적인 소켓을 이용한 에코서버 구현보다 훨씬 간단하다.

  1. 소켓을 생성하는데에는 어떤 패밀리나 타입 정보도 필요하지 않다. 단지 컨텍스트에게 메시지 패턴 정보만 알려줄 뿐이다.
  2. 바인드하는 액션외에 listen()이나 accept() 같은 작업은 필요하지 않다. 실제로 포트를 듣고, 연결을 수락하는 물리적인 포트에 대한 추상화된 객체가 존재할 것이나, 이 모든것은 컨텍스트 내부에 있으며, 자동으로 관리된다.
  3. 파이썬의 표준 소켓 api와 비교했을 때, sendall()은 존재하지 않는다. 모든 메시지는 마치 전체가 한 덩어리로 보내지는 것처럼 보인다.

실제 실행에서도 몇 가지 차이를 보이는데, 우선 여느 소켓 서버-클라이언트 예제와 달리 이 파일들에 대해서는 클라이언트를 먼저 실행해도 아무런 에러 없이 실행된다. 또한 클라이언트를 2개 이상 실행했을 때 동시 접속이 되는 것처럼 동작한다. 이는 소켓이 상대방 노드가 아니라 메시지 큐에 맞닿아있고, 큐를 통해 들어오는 메시지를 처리하기 때문에 가능한 일이다.

ZMQ의 소켓은 물리적인 포트에 묶이는 UNIX 소켓이 아니라고 했다. 따라서 메시징 타입이 같다면, 하나의 소켓이 (이미 바인딩되지 않은) 다른 포트들에 멀티로 바인딩되는 것도 가능하다.

sock.bind('tcp://*:5555')
sock.bind('tcp://*:7777')

이렇게 동일한 소켓 하나가 두 개 이상의 포트에 묶이도록 호출하는 것이 아무런 문제가 되지 않으며, 클라이언트를 쪼개서 각각 다른 포트들에 접속하도록 했을 때에도 서버 하나로 동작하는게 가능하다.

메시지

ZMQ 소켓을 통해서 주고 받는 데이터는 이진 raw 데이터 스트림이 아니다. ZMQ는 ‘메시지’라 불리는 길이값과 데이터를 결합한 단위의 데이터 타입을 주고 받는다. 따라서 recv()send()에서 별다른 인자 없이 데이터 전체를 하나의 메소드 호출로 주고 받을 수 있게 한다. 또한 이는 문자열을 다루는 매커니즘이 서로 다른 언어 구현간에 발생할 수 있는 문자열 데이터 전달을 처리하는 좋은 돌파구가 된다. 적어도 파이썬을 쓰는 한, 메시지에 대해서는 별다른 처리가 필요하지 않을 것이기 때문에 일단은 넘어가자.

메시지 패턴

컨텍스트가 어떻게 소켓과 각 소켓의 입출력 동작을 마법처럼 관리할 수 있을까? 사실 컨텍스트는 메시지 패턴에 따라서 사용되어야 할 소켓이 어떻게 동작해야 할 것인지를 판단할 수 있다. 메시지 패턴은 두 노드 혹은 여러 노드간에서 메시지를 주고 받는 방향성과 흐름을 정의하는 방법이다. 네트워크 구성 노드들의 연결 방식에 따라서 전체 네트워크의 위상이 달라질 수 있겠지만, ZMQ는 하나의 소켓이 다른 소켓 혹은 소켓들과 어떻게 동작해야 하는지를 몇 개의 패턴으로 구분하고, 그에 맞게 최적화된다. 실질적으로는 매우 많은 패턴들이 있을 수 있겠지만, ZMQ는 몇 개의 기본적인 패턴들을 정의하고 있다.

  1. REQuest – REPly 패턴 : 서버-클라이언트가 각각 한 번씩의 요청과 응답을 주고 받는다.
  2. PUBlisher – SUBscriber 패턴 : 서버가 발행하는 메시지가 각각의 클라이언트로 분산되어 전파된다.
  3. Pipleline : PUSHPULL 패턴의 소켓이 연결되어 단일 방향으로 메시지를 개별전송한다.

이외에도 메시징 네트워크를 확장하기 위한 프록시나 Poller와 같은 디바이스 몇 가지가 정의되어 있다. 분명한 것은 ZMQ는 그렇게 많지 않은 패턴을 제공하고 있음에도 불구하고 매우 다양한 구조를 손쉽게 만들 수 있다는 점이다. 그것은 네트워크 내의 각 노드사이에서 데이터나 신호가 흘러가는 흐름을 디자인하기에 달려 있으며, 우리는 ZMQ를 이용하여 가장 쉽게 할 수 있는 것 (바로 두 노드 간의 통신을 구축하는 것)을 사용하여 얼마든지 다방향 네트워크를 구축할 수 있다는 점이다.

스레드를 이용한 데몬 만들기 – Python

이 블로그를 통해서 파이썬에서의 병렬처리에 대해서는 명시적으로 threading.Thread 대신에 concurrent.futures 에서 제공하는 API를 사용할 것을 여러 차례 권장해 왔다. 여기서 주목할 것은 바로 “병렬처리”라는 조건이다. 즉 concurrent.futures의 API는 일련의 데이터에 대해서 동일한 처리를 하려할 때, 이 “동일한 처리”를 여러 스레드 혹은 프로세스로 나눠서 동시에 진행하는 상황에 어울리는 기능이다. 하지만 실제 상황에서는 동시에 서로 다른 작업이 진행되어야 하는 경우가 존재한다.주로 메인 스레드와 백그라운드 스레드 (혹은 작업 스레드)에서 하는 일이 서로 다른 경우에 이러한 패턴이 필요할 수 있다.

이번 글에서는 파이썬의 threading 모듈을 사용해서 멀티스레드 프로그램을 어떻게 쉽게 작성하는지에 대해서 예제와 같이 살펴보도록 하겠다.

먼저 어떤 파일을 하나 열어서 한줄씩 읽는 프로그램을 생각해보자. 보통 파일을 열고 file.readline()을 사용했을 때, 파일의 마지막 라인을 읽은 후라면 None이 리턴되며 더 이상 내용을 읽지 않는다. 하지만 우리 컴퓨터에는 로그 파일과 같이 지속적으로 뭔가 내용이 계속 늘어나는 파일들이 있다. 따라서 파일을 열어 놓고 특정 시간 주기마다 파일로부터 한줄씩 읽어서 그 내용을 처리하는 프로그램을 작성한다고 생각해보자.

import time

def process_line(l):
  print(l)

with open('message.log') as f:
  while True:
    line = f.readline()
    if not line:
      time.sleep(0.5)
      continue
    process_line(line)

위 코드는 이렇게 동작한다.

  1. 파일 하나를 읽기 모드로 열었다.
  2. 한 줄씩 읽어서 process_line() 함수로 보내어 처리한다.
  3. 만약 마지막 줄을 읽은 후 더 읽을 내용이 없다면 0.5초 후에 재시도한다.

이 프로그램은 분명 무한루프를 돌게되지만, CPU 사용량이 치솟거나 하지는 않는다. 0.5초의 대기시간은 CPU 입장에서 봤을 때에는 무척이나 길고 지루한 시간이다. (쿨타임을 0.1초로 잡아도 마찬가지다.)

다음과 같이 빈 파일을 만든 후, 파이썬 코드를 백그라운드에서 실행해놓자. 이 상태에서 파일에 메시지를 추가하면 곧 추가된 라인들이 출력되는 것을 알 수 있다. (백그라운드 실행은 윈도 명령줄에서는 지원되지 않는다. 윈도에서 따라해본다면, 터미널 창을 2개 열어서 한쪽은 파이썬을 실행하고 다른 한쪽은 echo 명령으로 파일에 라인을 추가해보면 된다.)

$ touch message.log
$ python sample.py&
$ echo "hello" >> message.log

자 이 프로그램의 처리흐름을 다시 살펴보도록하자.  먼저 프로그램은 기본적으로 지정된 파일을 열어서 한 줄씩 읽고 그 내용을, 내용 처리를 담당하는 별도의 함수 (여기서는 데이터를 다루는 역할을 하고 있으니 ‘핸들러’라고 하자)로 넘겨준다. 핸들러 함수의 동작이 끝나면 다시 그 다음 줄을 읽는 식으로 루프를 도는데, 더 이상 읽어들일 내용이 없으면 지정된 시간(0.5초)만큼 기다린 후에 재시도 한다.

이 때 핸들러 함수의 실행에 필요한 시간이 매우 짧다면 문제가 되지 않겠지만 항상 그러하지는 못할 것이다. 하지만 텍스트 파일의 내용이 특정한 URL이고 핸들러가 해당 URL의 데이터를 다운로드 받는다면 어떨까? 위 예제의 구조에서는 핸들러는 블럭킹 함수로 동작하므로 현재 데이터를 처리하는 동안에는 추가된 데이터가 계속 남아있으리라고는 보장할 수 없다. 왜냐하면 해당 파일은 계속 내용이 추가되기 보다는 별도의 에디터에서 추가되었던 내용이 삭제처리 될 수도 있기 때문이다. 다음 코드와 그 실행 시나리오를 생각해보자.

from urllib.request import rulopen
import time

def handler(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(urlopen(url))

with open('inputs.txt') as f:
  while True:
    line = f.readline()
    if not line:
       time.sleep(0.1)
       continue
    handler(*(line.split()[:2]))

위에서 소개한 코드와 완전히 동일한데, 한가지 차이는 핸들러가 URL요청과 디스크IO를 수행하는 제법 시간이 오래걸릴 수 있는 작업이라는 점에서 차이가 있다. inputs.txt의 파일의 내용이 실시간으로 편집되고 있고, 네트워크 사정이 그리 좋지 못하다면 다음의 시나리오가 가능해질 수 있다.

  1. 파일에 데이터 A, B, C가 추가됨
  2. 프로그램은 데이터 A를 처리함 (그리고 처리되는 동안 블럭됨)
  3. 그 사이에 외부 프로그램에 의해서 데이터 B, C가 제거됨
  4. A를 처리하고 난 후 파일을 읽으려 시도하면 더 이상 남은 데이터가 없거나 세로운 데이터 D, E 등을 처리하게 됨

이 시나리오에서는 핸들러가 블럭킹으로 동작하는 동안 이후에 추가된 데이터를 확보하지 못한다는 문제가 있다. 이를 해결하기 위해서는 핸들러가 논블럭킹(non-blocking)으로 동작해야 할 필요가 있다. 즉 핸들러 함수를 메인 스레드가 아닌 별개의 스레드에서 실행하게하면 핸들러를 호출하는 구문은 즉시 다름 라인으로 진행이 가능해진다.

threading 모듈

threading 모듈은 파이썬에서 스레드를 사용할 수 있게 해주는 API를 담고 있다. 스레드를 위한 클래스인 Thread와 여러 가지 동기화 도구인 Lock, RLock, Condition, Semaphore 등이 지원된다. 스레드를 사용하는 방법에 대해서 오랜된 문서나 책에서는 Thread 클래스를 오버라이드하여 사용하는 방법을 소개하고 있는데, 스레드를 통한 분기가 함수의 실행단위로 쓰여진다면 굳이 번거롭게 오버라이드를 할 필요는 없다.

Thread 객체를 생성할 때 target= 이라는 인자가 있다. 이 인자로 백그라운드 스레드에서 실행할 함수를 넘겨주면 해당 함수가 별도의 스레드에서 실행될 수 있다. 위의 ‘파일 처리’ 코드를 스레드를 사용하는 방식으로 수정해보면 다음과 같이 작성될 수 있다. 시나리오를 잠시 수정해보자면, 파일에는 한 줄에 공백으로 구분되는 URL과 파일명이 들어온다. 우리의 프로그램은 각 라인을 읽어서 주어진 URL의 데이터를 주어진 파일 이름으로 저장하는 동작을 처리할 것이다.

from threading import Thread
from urllib.request import urlopen
import time

## 1
def hanlder(url, filename):
  with urlopen(url) as res:
    with open(filename, 'wb') as f:
      f.write(res.read())

## 2
with open('inputs.txt') as f:
  f.seek(2) ## 3
  while True:
    line = f.readline()
    if not line: ## 4
      time.sleep(0.1)
      continue
    data = line.split()[:2] 
    t = Thread(target=handler, args=data) ## 5
    t.start() ## 6

그다지 복잡한 코드는 아니다. 코드는 다음과 같이 설명된다.

  1. handler 함수는 URL과 파일이름을 인자로 받아서 주어진 URL의 콘텐츠를 다운로드 받아 해당 파일 이름으로 저장한다.
  2. 파일의 내용을 조사하는 루프를 시작한다.
  3. 현재 시점까지의 파일의 내용은 무시해야 할 것이므로 seek(2)를 호출해서 파일 스트림의 끝으로 커서를 옮긴다.
  4. 새로운 라인이 없으면 0.1초를 대기한다.
  5. 새로운 스레드에서 핸들러 함수를 실행하도록 설정한다.
  6. 만들어진 스레드를 실행한다.

과도한 스레드 생성을 피하는 법

위 코드는 다시 새로운 문제를 야기한다. 스레드를 만든 후 시작하는 t.start() 부분은 논블럭킹이기 때문에 호출하게 되면 새로운 스레드에서 네트워크 요청을 생성하는 일련의 작업을 처리하는데, 이 작업의 완료여부와 별개로 스레드가 시작되면 t.start()는 그 즉시 리턴된다. 따라서 inputs.txt 파일에 새로운 입력이 수십~수백개가 쏟아져 들어갔다면 이 프로그램은 그만큼 많은 스레드를 생성하면서 전체적으로 매우 느려지게 될 것이다.

이 상황에서 어쩌면 누군가는 세마포어같은 동기화 도구를 생각할지도 모르겠다. 혹은 스레드 풀을 구현하여 한 번에 동작하는 스레드의 수를 제한할 수도 있을 것이다. 하지만 이때 가장 중요한 것은 ‘입력으로 들어온 데이터를 나중에 처리하더라도 잃지는 말자’는 것이다. 따라서 가장 간단히 처리하는 방법은 큐를 사용하는 것이다.

  1. 파일로부터 데이터를 입력받는 무한루프는 계속 파일을 체크하여, 추가로 입력된 데이터가 있으면 큐에 넣는다.
  2. 큐를 watching하는 무한루프는 큐에 들어온 데이터를 순서대로 꺼내어 하나씩 처리한다.

여기서 중요한 점은 두 개의 무한루프가 독립적으로 돌아간다는 것이다. 이를 위해서는 둘 중의 하나의 무한루프는 별도의 스레드에서 돌릴 필요가 있다. 그리고 두 스레드 사이의 데이터 교환은 동기화된 상태여야 한다. (한쪽에서 큐에서 데이터를 빼는 동시에 데이터를 집어넣어서는 곤란하다.) 스레드 사이의 동기화된 데이터 교환은 queue 모듈 내의 Queue를 사용하면 된다.

또한 별도 스레드에서의 동작이 무한루프가 되어야 하므로 해당 스레드는 ‘데몬’으로서 동작해야 한다. 데몬은 그 수명이 메인 스레드에 의존적인 스레드로, 그 자신이 무한루프를 돌고있다 하더라도 메인스레드가 종료되면 (즉 프로세스가 종료되면) 자동으로 종료되면서 리소스를 반납한다. 스레드를 데몬으로 만들고 싶다면 Thread 를 생성할 때 daemon=True 옵션을 지정하면 된다.

위 구조에서 어떤 것이 메인 스레드가 되고 어떤 것이 데몬이 될지는 본인의 판단에 달렸다. 여기서는 데이터 핸들링을 메인스레드에서 하고 데이터 마이닝을 데몬에서 수행하도록 구조를 만들어 보도록 하자.

아래는 최종 코드이며, 이 프로그램은 2개의 스레드를 써서 파일 내용을 감시하고, 추가 입력된 데이터를 처리한다.

from threading import Thread
from queue import Queue
from urllib.request import urlopen
from time import sleep

queue = Queue()

def daemon(filename):
  with open(filename) as f:
    f.seek(2)
    while True:
      line = f.readline()
      if not line:
        sleep(0.5)
        continue
      data = line.split()[:2]
      queue.put(data)

def mainLoop():
   while True:
     url, filename = queue.get()
     with urlopen(url) as res:
       with open(filenamem, 'wb') as f:
         f.write(res.read())

if __name__ == '__main__':
   t = Thread(target=daemon, args=('inputs.txt',), daemon=True)
   t.start()
   mainLoop()

정리

프로그램 내에서 두 개 이상의 무한루프가 독립적으로 동작하게 하려면 각각의 루프를 서로 다른 스레드에서 돌려야 한다. 이 때 프로세스 종료 시 작업 스레드들을 안전하게 종료하기 위해서는 이들 스레드는 생성 시 데몬으로 생성한다. 또한 보통 데몬을 사용하는 경우에는 한쪽 스레드에서 수집한 데이터를 다른 스레드(들)에게 넘겨주어야 할텐데, 이 때는 동기화된 큐를 쓰는 것이 권장되며 이 방법이 가장 간편하기도 하다.

더보기

threading 모듈을 통해서 멀티스레드를 사용하는 대표적인 예는 소켓통신의 다중접속 구현일 것이다. 소켓으로부터 접속을 받아들이는 코드는 보통 다음과 같은 모양이다.

sock = ... # 소켓 초기화 및 설정
sock.listen(host, port)
while True:
  conn, client = sock.accept() # 1
  ## 2
  recivedData = conn.recv(1024)
  ## ... 데이터를 처리하고 응답을 보내기

위 코드 중에서 sock.accept()에 의해서 리턴되는 값 중 첫번째 인자는 연결을 표현하는 소켓객체이다.  이 단일 루프 구조는 소켓이 연결을 기다리고, 연결을 받아들이면 해당 연결로부터 송수신을 처리하고 다시 루프의 시작으로 돌아가서 새로운 연결을 기다린다. 즉 한 번에 하나의 연결만 허용하는 구조가 된다. 하지만 이것은 소켓의 구조적인 한계라기보다는 그냥 코드가 선형으로 루프를 돌기 때문이다. 다음와 같이 처리한다면 어떨까?

while True:
  conn, client = sock.accept()
  t = Thread(target=handler, args=(conn, client), daemon=True)
  t.start()

위 코드에서는 서버 소켓이 연결을 받을 때마다 접속 소켓과 접속 정보를 핸들러에게 넘기고, 핸들러는 새로운 스레드에서 돌아가게 된다. 그리고 그 즉시 메인 루프는 다시 처음으로 돌아가서 새로운 연결을 기다릴 수 있다. 이 말은 방금 전에 접속한 소켓의 통신을 별도 스레드에서 처리하도록 하고 (소켓 통신을 주고/받기의 반복으로 돌아가는 일종의 무한 루프(최소한 끝이 정해지지 않은)라 볼 수 있으므로) 개별 접속 소켓의 처리를 데몬으로 만들어주는 것이다.

파이썬 소켓 연결 사용법

네트워크 프로그래밍 분야에서 소켓은 연결된 네트워크의 양 끝단을 추상화 시킨 개념이며, 컴퓨터의 관점에서는 네트워크로 통하는 컴퓨터의 외부와 컴퓨터 내부의 프로그램을 이어주는 인터페이스이다. 소켓의 개념에 대해서 이 글에서 모두 소상히 설명할 수는 없고, 네트워크를 통해서 바이트스트림을 주고 받을 수 있는 창구라 보면 된다. 다만 단순히 프로그램의 내부와 외부를 잇는 표준 입출력과는 달리 소켓은 네트워크의 반대편이 어디인지에 대한 정보를 가지고 있다. 즉 우리가 택배를 보낼 때 박스에 물건을 넣고 받는 사람 주소를 쓰는 것과 비슷하게 소켓은 어디로 보내지는 창구라는 것이 명시된 택배 상자 같은 것이다.

파이썬의 socket 모듈은 소켓 프로그래밍에 필요한 시스템 콜을 래핑하는 API를 제공하는 모듈이다.  소켓 통신을 위해서 물론 소켓을 생성해서 사용하는데, 서버와 클라이언트일 때가 조금 다르다.

공통

소켓을 생성하기

socket.socket() 함수를 이용해서 소켓 객체를 생성할 수 있다. 서버든 클라이언트등 동일하게 소켓을 이용한 네트워킹을 하기 위해서는 소켓을 먼저 생성할 필요가 있다. 이 함수는 두 가지 인자를 받는데, 하나는 패밀리이고 다른 하나는 타입이다.

  1. 패밀리: 첫번째 인자는 패밀리이다. 소켓의 패밀리란, “택배상자에 쓰는 주소 체계가 어떻게 되어 있느나”에 관한 것으로 흔히 AF_INET, AF_INET6를 많이 쓴다. 전자는 IP4v에 후자는 IP6v에 사용된다. 각각 socket.AF_INET, socket.AF_INET6로 정의되어 있다.
  2. 타입: 소켓 타입이다. raw 소켓, 스트림소켓, 데이터그램 소켓등이 있는데, 보통 많이 쓰는 것은 socket.SOCK_STREAM 혹은 socket.SOCK_DGRAM이다.

가장 흔히 쓰이는 socket.AF_INET, socket.SOCK_STREAM 조합은 socket.socket()의 인자 중에서 family=, type=에 대한 기본 인자값이다. 따라서 이 타입의 소켓을 생성하고자 하는 경우에는 인자 없이 socket.socket()만 써도 무방하다. 파이썬 소켓 연결 사용법 더보기