Python - 并发多处理



在本章中,我们将更多地关注多处理和多线程之间的比较。

多处理(Multiprocessing)

它是在单个计算机系统中使用两个或多个 CPU 单元。这是通过利用计算机系统中可用的全部 CPU 内核来充分发挥我们的硬件潜力的最佳方法。

多线程(Multithreading)

它是 CPU 通过并发执行多个线程来管理操作系统使用的能力。多线程的主要思想是通过将一个进程划分为多个线程来实现并行性。

下表显示了它们之间的一些重要区别 -

多处理 多程序编程
多处理是指多个 CPU 同时处理多个进程。 多程序编程将多个程序同时保存在主内存中,并使用单个 CPU 同时执行它们。
使用多个 CPU。 使用单个 CPU。
允许并行处理。 上下文切换发生。
处理作业所需的时间更少。 处理作业所用的更多时间。
有助于更有效地利用计算机系统的设备。 效率低于多处理。
通常更贵。 这样的系统更便宜。

消除全局解释器锁 (GIL) 的影响

在处理并发应用程序时,Python 中存在一个称为 GIL(全局解释器锁)的限制。GIL 从不允许我们使用 CPU 的多个内核,因此我们可以说 Python 中没有真正的线程。GIL 是互斥锁 —— 互斥锁,它使线程安全。换句话说,我们可以说 GIL 阻止了多个线程并行执行 Python 代码。锁一次只能由一个线程持有,如果我们想执行一个线程,那么它必须先获取锁。

通过使用多处理,我们可以有效地绕过 GIL 带来的限制 -

  • 通过使用多处理,我们正在利用多个进程的能力,因此我们正在利用 GIL 的多个实例。
  • 因此,任何时候在我们的程序中执行一个线程的字节码都没有限制。

在 Python 中启动进程

以下三种方法可用于在多处理模块中的 Python 中启动进程 -

  • Fork
  • Spawn
  • Forkserver

使用 Fork 创建流程

Fork 命令是 UNIX 中的标准命令。它用于创建称为子进程的新进程。此子进程与称为父进程的进程同时运行。这些子进程也与其父进程相同,并继承父进程可用的所有资源。使用 Fork 创建进程时,将使用以下系统调用 -

  • fork() − 它是一个通常在内核中实现的系统调用。它用于创建 process.p 的副本>
  • getpid() − 此系统调用返回调用进程的进程 ID (PID)。

以下 Python 脚本示例将帮助您了解如何创建新的子进程并获取子进程和父进程的 PID -


import os

def child():
	 	n = os.fork()
	 	
	 	if n > 0:
	 	 	 print("PID of Parent process is : ", os.getpid())

	 	else:
	 	 	 print("PID of Child process is : ", os.getpid())
child()

输出

PID of Parent process is : 25989
PID of Child process is : 25990

使用 Spawn 创建进程

Spawn 的意思是开始新的事情。因此,生成进程意味着父进程创建新进程。父进程继续异步执行,或等待子进程结束执行。请按照以下步骤生成进程 -

  • 导入 multiprocessing(多处理) 模块。
  • 创建对象进程。
  • 通过调用 start() 方法启动流程活动。
  • 等待进程完成其工作并通过调用 join() 方法退出。

以下 Python 脚本示例有助于生成三个进程


import multiprocessing

def spawn_process(i):
	 	print ('This is process: %s' %i)
	 	return

if __name__ == '__main__':
	 	Process_jobs = []
	 	for i in range(3):
	 	p = multiprocessing.Process(target = spawn_process, args = (i,))
	 	 	 Process_jobs.append(p)
	 	p.start()
	 	p.join()

输出

This is process: 0
This is process: 1
This is process: 2

使用 Forkserver 创建进程

Forkserver 机制仅在支持通过 Unix Pipes 传递文件描述符的选定 UNIX 平台上可用。考虑以下几点来了解 Forkserver 机制的工作原理 -

  • 使用 Forkserver 机制启动新进程时实例化服务器。
  • 然后,服务器接收命令并处理创建新进程的所有请求。
  • 为了创建一个新的进程,我们的 python 程序将向 Forkserver 发送一个请求,它将为我们创建一个进程。
  • 最后,我们可以在我们的程序中使用这个新创建的进程。

Python 中的守护进程

Python 多处理模块允许我们通过其 daemonic 选项拥有守护进程。守护进程或在后台运行的进程遵循与守护程序线程类似的概念。要在后台执行该进程,我们需要将 daemonic 标志设置为 true。只要主进程正在执行,守护进程就会继续运行,并且它将在完成执行或主程序被杀死时终止。

在这里,我们使用与 daemon 线程中使用的相同的示例。唯一的区别是将 module 从 multithreading 更改为 multiprocessing,并将 daemonic 标志设置为 true。但是,输出会发生变化,如下所示 -


import multiprocessing
import time

def nondaemonProcess():
	 	print("starting my Process")
	 	time.sleep(8)
	 	print("ending my Process")
def daemonProcess():
	 	while True:
	 	print("Hello")
	 	time.sleep(2)
if __name__ == '__main__':
	 	nondaemonProcess = multiprocessing.Process(target = nondaemonProcess)
	 	daemonProcess = multiprocessing.Process(target = daemonProcess)
	 	daemonProcess.daemon = True
	 	nondaemonProcess.daemon = False
	 	daemonProcess.start()
	 	nondaemonProcess.start()

输出

starting my Process
ending my Process

与守护程序线程生成的输出相比,输出是不同的,因为无守护程序模式下的进程具有输出。因此,守护进程在主程序结束后自动结束,以避免正在运行的进程持久化。

在 Python 中终止进程

我们可以使用 terminate() 方法立即终止或终止进程。我们将使用此方法终止子进程,该子进程是在 function 的帮助下创建的,在完成执行之前。


import multiprocessing
import time
def Child_process():
	 	print ('Starting function')
	 	time.sleep(5)
	 	print ('Finished function')
P = multiprocessing.Process(target = Child_process)
P.start()
print("My Process has terminated, terminating main thread")
print("Terminating Child Process")
P.terminate()
print("Child Process successfully terminated")

输出

My Process has terminated, terminating main thread
Terminating Child Process
Child Process successfully terminated

输出显示程序在执行在 Child_process() 函数的帮助下创建的子进程之前终止。这意味着子进程已成功终止。

在 Python 中识别当前进程

操作系统中的每个进程都具有称为 PID 的进程标识。在 Python 中,我们可以借助以下命令找出当前进程的 PID -


import multiprocessing
print(multiprocessing.current_process().pid)

以下 Python 脚本示例有助于找出主进程的 PID 以及子进程的 PID -


import multiprocessing
import time
def Child_process():
	 	print("PID of Child Process is: {}".format(multiprocessing.current_process().pid))
print("PID of Main process is: {}".format(multiprocessing.current_process().pid))
P = multiprocessing.Process(target=Child_process)
P.start()
P.join()

输出

PID of Main process is: 9401
PID of Child Process is: 9402

在 subclass 中使用进程

我们可以通过对线程进行子类化来创建线程。Thread 类。此外,我们还可以通过对 multiprocessing 进行子类化来创建进程。Process 类。要在 subclass 中使用进程,我们需要考虑以下几点 -

  • 我们需要定义 Process 类的新子类。
  • 我们需要覆盖 _init_(self [,args] ) 类。
  • 我们需要重写 run(self [,args] ) 方法来实现 what Process
  • 我们需要通过调用 start() 方法来启动该过程。


import multiprocessing
class MyProcess(multiprocessing.Process):
	 	def run(self):
	 	print ('called run method in process: %s' %self.name)
	 	return
if __name__ == '__main__':
	 	jobs = []
	 	for i in range(5):
	 	P = MyProcess()
	 	jobs.append(P)
	 	P.start()
	 	P.join()

输出

called run method in process: MyProcess-1
called run method in process: MyProcess-2
called run method in process: MyProcess-3
called run method in process: MyProcess-4
called run method in process: MyProcess-5

Python 多处理模块 – Pool 类

如果我们在 Python 应用程序中谈论简单的并行处理任务,那么 multiprocessing 模块为我们提供了 Pool 类。Pool 类的以下方法可用于在我们的主程序中启动子进程的数量

apply() 方法

此方法类似于 ThreadPoolExecutorThread PoolExecutor 中。它会阻塞,直到结果准备就绪。

apply_async() 方法

当我们需要并行执行任务时,我们需要使用 apply_async() 方法将任务提交到池。这是一个异步操作,在执行所有子进程之前不会锁定主线程。

map() 方法

就像 apply() 方法一样,它也会阻塞,直到结果准备好。它相当于内置的 map() 函数,该函数将可迭代数据拆分为多个块,并作为单独的任务提交到进程池。

map_async() 方法

它是 map() 方法的变体,就像 apply_async() 之于 apply() 方法一样。它返回一个 result 对象。当结果准备就绪时,将对其应用一个可调用对象。可调用对象必须立即完成;否则,处理结果的线程将被阻止。

以下示例将帮助您实现用于执行并行执行的进程池。通过多次处理应用 square() 函数,已经执行了数字平方的简单计算。Pool 方法。然后 pool.map() 被用来提交 5,因为 input 是一个从 0 到 4 的整数列表。结果将存储在 p_outputs 中并打印出来。


def square(n):
	 	result = n*n
	 	return result
if __name__ == '__main__':
	 	inputs = list(range(5))
	 	p = multiprocessing.Pool(processes = 4)
	 	p_outputs = pool.map(function_square, inputs)
	 	p.close()
	 	p.join()
	 	print ('Pool :', p_outputs)

输出

Pool : [0, 1, 4, 9, 16]