EnsekiTT Blog

EnsekiTTが書くブログです。

Pythonで受信したデータを複数プロセスに分配してみた話

Pythonで受信したデータを複数プロセスに分配してみた話

つまりなにしたの?

TCPで受信したデータを複数プロセスに分配してそれぞれで処理をする

Python3.6を対象にやってみた
https://docs.python.jp/3/library/multiprocessing.html

用意したもの

manage.py : プロセスの起動を行う
tcp_recv.py : TCPの受信用(サーバとして動作する)
process.py : TCPの内容が分配されて、この中で処理が実行される
tcp_send.py : TCPの送信用(クライアントとして動作する)
github.com
↑ぜんぶはこちら

事前準備1: TCP通信を確立する

tcp_send.py
import socket
import sys
import time


HOST, PORT = "localhost", 9999

# Create a socket (SOCK_STREAM means a TCP socket)
def sendnum(target=100):
    count = 0
    for i in range(target):
        data = str(count)
        count += 1
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Connect to server and send data
            sock.connect((HOST, PORT))
            sock.sendall(bytes(data + "\n", "utf-8"))

            # Receive data from the server and shut down
            received = str(sock.recv(1024), "utf-8")

        print("Sent:     {}".format(data))
        print("Received: {}".format(received))
        time.sleep(0.1)


if __name__ == "__main__":
    sendnum()

0.1秒毎にカウントアップする数字を送信しようとする

tcp_recv.py
import socketserver

class MyTCPHandler(socketserver.BaseRequestHandler):

    def putter(self, value):
        for queue in self.server.queues:
            queue.put(value)

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        print("{0} wrote: {1}".format(self.client_address[0], int(self.data)))
        value = int(self.data)
        self.putter(value = value)
        # just send back the same data, but upper-cased
        self.request.sendall(("ok").encode())

def OpenRecv(queues):
    HOST, PORT = "localhost", 9999
    # Create the server, binding to localhost on port 9999
    server = socketserver.TCPServer((HOST, PORT), MyTCPHandler)
    server.queues = queues
    # Activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()

受信した数字を渡されたQueueのリストにそれぞれ突っ込む。

事前準備2: 分配先のプロセスを用意する

process.py
def func(name, queue):
    print("Start {0}".format(name))
    while True:
        print("{0}: {1}".format(name, queue.get()))

ここでは、Multiprocessing.Queueのインスタンスを受け取って、
入ってるものをただPrintする。

準備: 受信スレッドとプロセス2つを立ち上げる

manage.py
from multiprocessing import Process, Queue
import threading
import tcp_recv
import process

def main():
    queue1 = Queue()
    queue2 = Queue()
    queues = [queue1, queue2]
    thread = threading.Thread(target=tcp_recv.OpenRecv, args=(queues,))
    p1 = Process(target=process.func, args=("proc1", queue1,))
    p2 = Process(target=process.func, args=("proc2", queue2,))

    thread.start()
    p1.start()
    p2.start()
    input("Running>")
    thread.join()
    p1.join()
    p2.join()


if __name__ == '__main__':
    main()

ここでは、Multiprocessing用のキューを用意して受信スレッドと2つのプロセスを立ち上げている。
止めるときはCtrl+c

実行する

サーバ側
$ python manage.py
Running>Start proc1
Start proc2
127.0.0.1 wrote: 0
proc1: 0
proc2: 0
127.0.0.1 wrote: 1
proc2: 1
proc1: 1
127.0.0.1 wrote: 2
proc1: 2
proc2: 2
127.0.0.1 wrote: 3
proc1: 3
proc2: 3
クライアント側
$ python tcp_send.py
Sent:     0
Received: ok
Sent:     1
Received: ok
Sent:     2
Received: ok
Sent:     3
Received: ok

渡された値がそれぞれのプロセスで処理されていることがわかるのでやりたいことはできた。

クリエイティブ・コモンズ・ライセンス
この 作品 は クリエイティブ・コモンズ 表示 4.0 国際 ライセンスの下に提供されています。