线程池是一种自动高效管理多个线程的机制,允许并发执行任务。Python 不直接通过 threading 模块提供线程池。
相反,它通过 multiprocessing.dummy 模块和 concurrent.futures 模块提供基于线程的池。这些模块为创建和管理线程池提供了方便的接口,从而可以更轻松地执行并发任务。
什么是线程池?
线程池是由池管理的线程的集合。池中的每个线程称为 worker 或 worker thread。这些线程可以重复使用来执行多个任务,从而减轻了重复创建和销毁线程的负担。
线程池控制线程的创建及其生命周期,使其在处理大量任务时更加高效。
我们可以使用以下类在 Python 中实现线程池 -
- Python ThreadPool 类
- Python ThreadPoolExecutor 类
使用 Python ThreadPool 类
multiprocessing.pool.ThreadPool 类在 multiprocessing 模块中提供线程池接口。它管理一个工作线程池,可以将作业提交到该池以进行并发执行。
ThreadPool 对象通过处理工作线程之间的任务创建和分配来简化多个线程的管理。它与 Pool 类共享一个接口,该类最初是为进程设计的,但已调整为也适用于线程。
ThreadPool 实例与 Pool 实例完全接口兼容,应作为上下文管理器或通过手动调用 close() 和 terminate() 进行管理。
例此示例演示了如何使用 Python 线程池对数字列表并行执行 square 和 cube 函数,其中每个函数都与最多 3 个线程同时应用于数字,每个线程在执行之间有 1 秒的延迟。
from multiprocessing.dummy import Pool as ThreadPool
import time
def square(number):
sqr = number * number
time.sleep(1)
print("Number:{} Square:{}".format(number, sqr))
def cube(number):
cub = number*number*number
time.sleep(1)
print("Number:{} Cube:{}".format(number, cub))
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(3)
pool.map(square, numbers)
pool.map(cube, numbers)
pool.close()
输出
在执行上述代码时,您将获得以下输出 -
Number:1 Square:1
Number:3 Square:9
Number:4 Square:16
Number:5 Square:25
Number:1 Cube:1
Number:2 Cube:8
Number:3 Cube:27
Number:4 Cube:64
Number:5 Cube:125
使用 Python ThreadPoolExecutor 类
Python 的 ThreadPoolExecutor 类 concurrent.futures 模块为使用线程异步执行函数提供了一个高级接口。concurrent.futures 模块包括 Future 类和两个 Executor 类 - ThreadPoolExecutor 和 ProcessPoolExecutor。
未来课堂
concurrent.futures.Future 类负责处理任何可调用对象(如函数)的异步执行。要获取 Future 对象,您应该对任何 Executor 对象调用 submit() 方法。它不应由其构造函数直接创建。
Future 类中的重要方法包括 -
- result(timeout=None):该方法返回调用返回的值。如果调用尚未完成,则此方法将等待最多超时秒。如果调用未在超时秒内完成,则将引发 TimeoutError。如果未指定 timeout,则等待时间没有限制。
- cancel():该方法,尝试取消调用。如果调用当前正在执行或已完成运行且无法取消,则该方法将返回布尔值 False。否则,调用将被取消,并且该方法返回 True。
- cancelled():如果调用成功取消,则返回 True。
- running():如果调用当前正在执行且无法取消,则返回 True。
- done():如果调用已成功取消或完成运行,则返回 True。
ThreadPoolExecutor 类
此类表示要异步执行调用的指定数量最大工作线程的池。
concurrent.futures.ThreadPoolExecutor(max_threads)
例
这是一个使用 concurrent.futures.ThreadPoolExecutor 类在 Python 中异步管理和执行任务的示例。具体来说,它展示了如何将多个任务提交到线程池以及如何检查它们的执行状态。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
def square(numbers):
for val in numbers:
ret = val*val
sleep(1)
print("Number:{} Square:{}".format(val, ret))
def cube(numbers):
for val in numbers:
ret = val*val*val
sleep(1)
print("Number:{} Cube:{}".format(val, ret))
if __name__ == '__main__':
numbers = [1,2,3,4,5]
executor = ThreadPoolExecutor(4)
thread1 = executor.submit(square, (numbers))
thread2 = executor.submit(cube, (numbers))
print("Thread 1 executed ? :",thread1.done())
print("Thread 2 executed ? :",thread2.done())
sleep(2)
print("Thread 1 executed ? :",thread1.done())
print("Thread 2 executed ? :",thread2.done())
它将产生以下输出 -
Thread 2 executed ? : False
Number:1 Square:1
Number:1 Cube:1
Thread 1 executed ? : False
Thread 2 executed ? : False
Number:2 Square:4
Number:2 Cube:8
Number:3 Square:9
Number:3 Cube:27
Number:4 Square:16
Number:4 Cube:64
Number:5 Square:25
Number:5 Cube:125