Python - 进程相互通信



进程相互通信是指进程之间的数据交换。为了开发并行应用程序,有必要在进程之间交换数据。下图显示了多个子进程之间同步的各种通信机制 -

Intercommunication

各种通信机制

在本节中,我们将了解各种通信机制。机制如下所述 -

队列

队列可以与多进程程序一起使用。multiprocessing 模块的 Queue 类类似于 Queue.Queue 类。因此,可以使用相同的 API。多处理。Queue 为我们提供了一种线程和进程安全的 FIFO (先进先出) 进程之间通信机制。

下面是一个摘自 python 官方文档 multiprocessing(多处理) 的简单示例,用于理解 multiprocessing 的 Queue 类的概念。


from multiprocessing import Process, Queue
import queue
import random
def f(q):
	 	q.put([42, None, 'hello'])
def main():
	 	q = Queue()
	 	p = Process(target = f, args = (q,))
	 	p.start()
	 	print (q.get())
if __name__ == '__main__':
	 	main()

输出

[42, None, 'hello']

管道

它是一种数据结构,用于多进程程序中的进程之间进行通信。Pipe() 函数返回一对由管道连接的连接对象,默认情况下为 duplex(双向)。它的工作方式如下 -

  • 它返回一对表示管道两端的连接对象。
  • 每个对象都有两个方法 – send() recv() ,用于在进程之间进行通信。

下面是一个摘自 python 官方文档的 multiprocessing(多处理) 的简单示例,用于理解 multiprocessing 的 Pipe() 函数的概念。


from multiprocessing import Process, Pipe

def f(conn):
	 	conn.send([42, None, 'hello'])
	 	conn.close()

if __name__ == '__main__':
	 	parent_conn, child_conn = Pipe()
	 	p = Process(target = f, args = (child_conn,))
	 	p.start()
	 	print (parent_conn.recv())
	 	p.join()

输出

[42, None, 'hello']

Manager

Manager 是一类多处理模块,它提供了一种在所有用户之间协调共享信息的方法。管理器对象控制服务器进程,该进程管理共享对象并允许其他进程处理它们。换句话说,管理器提供了一种创建可在不同流程之间共享的数据的方法。以下是管理器对象的不同属性 -

  • manager 的主要属性是控制管理共享对象的服务器进程。
  • 另一个重要属性是,当任何进程修改所有共享库时,更新所有共享库。

下面是一个示例,它使用 manager 对象在服务器进程中创建列表记录,然后在该列表中添加新记录。


import multiprocessing

def print_records(records):
	 	for record in records:
	 	 	 print("Name: {0}\nScore: {1}\n".format(record[0], record[1]))

def insert_record(record, records):
	 	records.append(record)
	 	 	 print("A New record is added\n")

if __name__ == '__main__':
	 	with multiprocessing.Manager() as manager:

	 	 	 records = manager.list([('Computers', 1), ('Histoty', 5), ('Hindi',9)])
	 	 	 new_record = ('English', 3)

	 	 	 p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
	 	 	 p2 = multiprocessing.Process(target = print_records, args = (records,))
		 	 	p1.start()
	 	 	 p1.join()
	 	 	 p2.start()
	 	 	 p2.join()

输出

A New record is added

Name: Computers
Score: 1

Name: Histoty
Score: 5

Name: Hindi
Score: 9

Name: English
Score: 3

Manager 中命名空间的概念

Manager Class 带有命名空间的概念,这是一种在多个进程之间共享多个属性的快速方法。命名空间没有任何可以调用的公共方法,但它们具有可写属性。

以下 Python 脚本示例可帮助我们利用命名空间在主进程和子进程之间共享数据 -


import multiprocessing

def Mng_NaSp(using_ns):

	 	using_ns.x +=5
	 	using_ns.y *= 10

if __name__ == '__main__':
	 	manager = multiprocessing.Manager()
	 	using_ns = manager.Namespace()
	 	using_ns.x = 1
	 	using_ns.y = 1

	 	print ('before', using_ns)
	 	p = multiprocessing.Process(target = Mng_NaSp, args = (using_ns,))
	 	p.start()
	 	p.join()
	 	print ('after', using_ns)

输出

before Namespace(x = 1, y = 1)
after Namespace(x = 6, y = 10)

ctypes-array 和 value

Multiprocessing 模块提供 Array 和 Value 对象,用于将数据存储在共享内存映射中。Array 是从共享内存分配的 ctypes 数组,Value 是从共享内存分配的 ctypes 对象。

要与 一起,请从 multiprocessing 导入 Process、Value、Array。

以下 Python 脚本是从 python 文档中获取的示例,该示例利用 Ctypes、Array 和 Value 在进程之间共享一些数据。


def f(n, a):
	 	n.value = 3.1415927
	 	for i in range(len(a)):
	 	a[i] = -a[i]

if __name__ == '__main__':
	 	num = Value('d', 0.0)
	 	arr = Array('i', range(10))

	 	p = Process(target = f, args = (num, arr))
	 	p.start()
	 	p.join()
	 	print (num.value)
	 	print (arr[:])

输出

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

通信顺序进程 (CSP)

CSP 用于说明系统与具有并发模型的其他系统的交互。CSP 是一个通过消息传递编写并发或程序的框架,因此它对于描述并发性非常有效。

Python 库 – PyCSP

为了实现 CSP 中的核心基元,Python 有一个名为 PyCSP 的库。它使实现非常简短且可读性强,因此很容易理解。以下是 PyCSP 的基本进程网络 -

PyCSP

在上面的 PyCSP 进程网络中,有两个进程 – Process1 和 Process2 。这些进程通过两个通道 – 通道 1 和 通道 2 传递消息来进行通信。

安装 PyCSP

借助以下命令,我们可以安装 Python 库 PyCSP -

pip install PyCSP

以下 Python 脚本是并行运行两个进程的简单示例。它是在 PyCSP python 库的帮助下完成的 -


from pycsp.parallel import *
import time
@process
def P1():
	 	time.sleep(1)
	 	print('P1 exiting')
@process
def P2():
	 	time.sleep(1)
	 	print('P2 exiting')
def main():
	 	Parallel(P1(), P2())
	 	print('Terminating')
if __name__ == '__main__':
	 	main()

在上面的脚本中,已经创建了 P1 P2 两个函数,然后用 @process 进行装饰,以便将它们转换为流程。

输出

P2 exiting
P1 exiting
Terminating