Python - 线程优先级



在 Python 中,目前 threading 模块不直接支持线程优先级。与 Java 不同,Python 不支持线程优先级、线程组或某些线程控制机制,例如销毁、停止、暂停、恢复或中断线程。

甚至认为 Python 线程设计简单,并且松散地基于 Java 的线程模型。这是因为 Python 的全局解释器锁 (GIL) 管理 Python 线程。

但是,您可以使用诸如休眠持续时间、线程中的自定义计划逻辑等技术或使用管理任务优先级的附加模块来模拟基于优先级的行为。

使用 sleep() 设置线程优先级

您可以通过引入延迟或使用其他机制来控制线程的执行顺序来模拟线程优先级。模拟线程优先级的一种常见方法是调整线程的休眠持续时间。

优先级较低的线程睡眠时间较长,优先级较高的线程睡眠时间较短。

这是一个简单的示例,用于演示如何使用 Python 线程中的延迟自定义线程优先级。在此示例中,Thread-2 在 Thread-1 之前完成,因为它的优先级值较低,从而导致睡眠时间更短。


import threading
import time

class DummyThread(threading.Thread):
	 	def __init__(self, name, priority):
	 	 	 threading.Thread.__init__(self)
	 	 	 self.name = name
	 	 	 self.priority = priority

	 	def run(self):
	 	 	 name = self.name
	 	 	 time.sleep(1.0 * self.priority)
	 	 	 print(f"{name} thread with priority {self.priority} is running")

# Creating threads with different priorities
t1 = DummyThread(name='Thread-1', priority=4)
t2 = DummyThread(name='Thread-2', priority=1)

# Starting the threads
t1.start()
t2.start()

# Waiting for both threads to complete
t1.join()
t2.join()

print('All Threads are executed')

输出

在执行上述程序时,您将获得以下结果 -

Thread-2 thread with priority 1 is running
Thread-1 thread with priority 4 is running
All Threads are executed

在 Windows 上调整 Python 线程优先级

在 Windows 操作系统上,您可以使用 ctypes 模块操作线程优先级,这是用于与 Windows API 交互的 Python 标准模块之一。

此示例演示如何在 Windows 系统上使用 ctypes 模块在 Python 中手动设置线程的优先级。


import threading
import ctypes
import time

# Constants for Windows API
w32 = ctypes.windll.kernel32
SET_THREAD = 0x20
PRIORITIZE_THE_THREAD = 1

class MyThread(threading.Thread):
	 	def __init__(self, start_event, name, iterations):
	 	 	 super().__init__()
	 	 	 self.start_event = start_event
	 	 	 self.thread_id = None
	 	 	 self.iterations = iterations
	 	 	 self.name = name

	 	def set_priority(self, priority):
	 	 	 if not self.is_alive():
	 	 	 	 	print('Cannot set priority for a non-active thread')
	 	 	 	 	return

	 	 	 thread_handle = w32.OpenThread(SET_THREAD, False, self.thread_id)
	 	 	 success = w32.SetThreadPriority(thread_handle, priority)
	 	 	 w32.CloseHandle(thread_handle)
	 	 	 if not success:
	 	 	 	 	print('Failed to set thread priority:', w32.GetLastError())

	 	def run(self):
	 	 	 self.thread_id = w32.GetCurrentThreadId()
	 	 	 self.start_event.wait()
	 	 	 while self.iterations:
	 	 	 	 	print(f"{self.name} running")
	 	 	 	 	start_time = time.time()
	 	 	 	 	while time.time() - start_time < 1:
	 	 	 	 	 	 pass
	 	 	 	 	self.iterations -= 1

# Create an event to synchronize thread start
start_event = threading.Event()

# Create threads
thread_normal = MyThread(start_event, name='normal', iterations=4)
thread_high = MyThread(start_event, name='high', iterations=4)

# Start the threads
thread_normal.start()
thread_high.start()

# Adjusting priority of 'high' thread
thread_high.set_priority(PRIORITIZE_THE_THREAD)

# Trigger thread execution
start_event.set()

输出

在 Python 解释器中执行此代码时,您将获得以下结果 -

high running
normal running
high running
normal running
high running
normal running
high running
normal running

使用 queue 模块确定 Python 线程的优先级

Python 标准库中的 queue 模块在线程编程中非常有用,当必须在多个线程之间安全地交换信息时。此模块中的 Priority Queue 类实现了所有必需的锁定语义。

使用优先级队列时,条目将保持排序(使用 heapq 模块),并首先检索值最低的条目。

Queue 对象具有以下方法来控制 Queue -

  • get() − get() 从队列中删除并返回一个项目。
  • put() − put 将项目添加到队列中。
  • qsize() − qsize() 返回当前队列中的项目数。
  • empty() − 如果队列为空,则 empty( ) 返回 True;否则为 False。
  • full() − 如果队列已满,则 full() 返回 True;否则为 False。

 queue.PriorityQueue(maxsize=0)

这是优先级队列的构造函数。maxsize 是一个整数,用于设置可放入队列中的项数的上限。如果 maxsize 小于或等于零,则队列大小为无限大。

首先检索最低值的条目(最低值的条目是 min(entries) 返回的条目)。条目的典型模式是 -


 (priority_number, data)

此示例演示了如何使用 queue 模块中的 PriorityQueue 类来管理两个线程之间的任务优先级。


from time import sleep
from random import random, randint
from threading import Thread
from queue import PriorityQueue

queue = PriorityQueue()

def producer(queue):
	 	print('Producer: Running')
	 	for i in range(5):

	 	 	 # create item with priority
	 	 	 value = random()
	 	 	 priority = randint(0, 5)
	 	 	 item = (priority, value)
	 	 	 queue.put(item)
	 	# wait for all items to be processed
	 	queue.join()

	 	queue.put(None)
	 	print('Producer: Done')

def consumer(queue):
	 	print('Consumer: Running')

	 	while True:

	 	 	 # get a unit of work
	 	 	 item = queue.get()
	 	 	 if item is None:
	 	 	 	 	break

	 	 	 sleep(item[1])
	 	 	 print(item)
	 	 	 queue.task_done()
	 	print('Consumer: Done')

producer = Thread(target=producer, args=(queue,))
producer.start()

consumer = Thread(target=consumer, args=(queue,))
consumer.start()

producer.join()
consumer.join()

输出

在执行时,它将产生以下输出 -

Producer: Running
Consumer: Running
(0, 0.15332707626852804)
(2, 0.4730737391435892)
(2, 0.8679231358257962)
(3, 0.051924220435665025)
(4, 0.23945882716108446)
Producer: Done
Consumer: Done