Pythonで受信したデータを複数プロセスに分配してみた話
つまりなにしたの?
TCPで受信したデータを複数プロセスに分配してそれぞれで処理をする
Python3.6を対象にやってみた
https://docs.python.jp/3/library/multiprocessing.html
用意したもの
manage.py : プロセスの起動を行う
tcp_recv.py : TCPの受信用(サーバとして動作する)
process.py : TCPの内容が分配されて、この中で処理が実行される
tcp_send.py : TCPの送信用(クライアントとして動作する)
github.com
↑ぜんぶはこちら
事前準備1: TCP通信を確立する
tcp_send.py
import socket import sys import time HOST, PORT = "localhost", 9999 # Create a socket (SOCK_STREAM means a TCP socket) def sendnum(target=100): count = 0 for i in range(target): data = str(count) count += 1 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock: # Connect to server and send data sock.connect((HOST, PORT)) sock.sendall(bytes(data + "\n", "utf-8")) # Receive data from the server and shut down received = str(sock.recv(1024), "utf-8") print("Sent: {}".format(data)) print("Received: {}".format(received)) time.sleep(0.1) if __name__ == "__main__": sendnum()
0.1秒毎にカウントアップする数字を送信しようとする
tcp_recv.py
import socketserver class MyTCPHandler(socketserver.BaseRequestHandler): def putter(self, value): for queue in self.server.queues: queue.put(value) def handle(self): # self.request is the TCP socket connected to the client self.data = self.request.recv(1024).strip() print("{0} wrote: {1}".format(self.client_address[0], int(self.data))) value = int(self.data) self.putter(value = value) # just send back the same data, but upper-cased self.request.sendall(("ok").encode()) def OpenRecv(queues): HOST, PORT = "localhost", 9999 # Create the server, binding to localhost on port 9999 server = socketserver.TCPServer((HOST, PORT), MyTCPHandler) server.queues = queues # Activate the server; this will keep running until you # interrupt the program with Ctrl-C server.serve_forever()
受信した数字を渡されたQueueのリストにそれぞれ突っ込む。
事前準備2: 分配先のプロセスを用意する
process.py
def func(name, queue): print("Start {0}".format(name)) while True: print("{0}: {1}".format(name, queue.get()))
ここでは、Multiprocessing.Queueのインスタンスを受け取って、
入ってるものをただPrintする。
準備: 受信スレッドとプロセス2つを立ち上げる
manage.py
from multiprocessing import Process, Queue import threading import tcp_recv import process def main(): queue1 = Queue() queue2 = Queue() queues = [queue1, queue2] thread = threading.Thread(target=tcp_recv.OpenRecv, args=(queues,)) p1 = Process(target=process.func, args=("proc1", queue1,)) p2 = Process(target=process.func, args=("proc2", queue2,)) thread.start() p1.start() p2.start() input("Running>") thread.join() p1.join() p2.join() if __name__ == '__main__': main()
ここでは、Multiprocessing用のキューを用意して受信スレッドと2つのプロセスを立ち上げている。
止めるときはCtrl+c
実行する
サーバ側
$ python manage.py Running>Start proc1 Start proc2 127.0.0.1 wrote: 0 proc1: 0 proc2: 0 127.0.0.1 wrote: 1 proc2: 1 proc1: 1 127.0.0.1 wrote: 2 proc1: 2 proc2: 2 127.0.0.1 wrote: 3 proc1: 3 proc2: 3
クライアント側
$ python tcp_send.py Sent: 0 Received: ok Sent: 1 Received: ok Sent: 2 Received: ok Sent: 3 Received: ok
渡された値がそれぞれのプロセスで処理されていることがわかるのでやりたいことはできた。