Python - 线程间通信



线程间通信是指在 Python 多线程程序中实现线程之间的通信和同步的过程。

通常,Python 中的线程在一个进程内共享相同的内存空间,这允许它们通过共享变量、对象和 threading 模块提供的专用同步机制来交换数据并协调其活动。

为了促进线程间通信,threading 模块提供了各种同步原语,如 Locks、Events、Conditions 和 Semaphores 对象。在本教程中,您将学习如何使用 Event 和 Condition 对象来提供多线程程序中线程之间的通信。

事件对象

Event 对象管理内部标志的状态,以便线程可以等待或设置。Event 对象提供控制此标志状态的方法,从而允许线程根据共享条件同步其活动。

该标志最初为 false,使用 set() 方法变为 true,并使用 clear() 方法重置为 false。wait() 方法会阻塞,直到标志为 true。

以下是 Event 对象的主要方法 -

  • is_set():当且仅当内部标志为 true 时,返回 True。
  • set():将 internal 标志设置为 true。所有等待它变为真的线程都将被唤醒。一旦标志为 true,调用 wait() 的线程将根本不会阻塞。
  • clear():将内部标志重置为 false。随后,调用 wait() 的线程将阻塞,直到调用 set() 以再次将内部标志设置为 true。
  • wait(timeout=None):阻止,直到内部标志为 true。如果 internal 标志在输入时为 true,则立即返回。否则,将阻塞,直到另一个线程调用 set() 将标志设置为 true,或者直到发生可选的超时。当 timeout 参数存在且不存在 None 时,它应该是一个浮点数,以秒为单位指定操作的超时时间。

以下代码尝试模拟由流量信号 GREEN 或 RED 状态控制的流量。

程序中有两个线程,针对两个不同的函数。signal_state() 函数会定期设置和重置指示信号从 GREEN 变为 RED 的事件。

traffic_flow() 函数等待事件被设置,并运行一个循环直到它保持设置状态。


from threading import Event, Thread
import time

terminate = False

def signal_state():
	 	 global terminate
	 	 while not terminate:
	 	 	 	 time.sleep(0.5)
	 	 	 	 print("Traffic Police Giving GREEN Signal")
	 	 	 	 event.set()
	 	 	 	 time.sleep(1)
	 	 	 	 print("Traffic Police Giving RED Signal")
	 	 	 	 event.clear()

def traffic_flow():
	 	 global terminate
	 	 num = 0
	 	 while num < 10 and not terminate:
	 	 	 	 print("Waiting for GREEN Signal")
	 	 	 	 event.wait()
	 	 	 	 print("GREEN Signal ... Traffic can move")
	 	 	 	 while event.is_set() and not terminate:
	 	 	 	 	 	 num += 1
	 	 	 	 	 	 print("Vehicle No:", num," Crossing the Signal")
	 	 	 	 	 	 time.sleep(1)
	 	 	 	 print("RED Signal ... Traffic has to wait")

event = Event()
t1 = Thread(target=signal_state)
t2 = Thread(target=traffic_flow)
t1.start()
t2.start()

# Terminate the threads after some time
time.sleep(5)
terminate = True

# join all threads to complete
t1.join()
t2.join()

print("Exiting Main Thread")

输出

在执行上述代码时,您将获得以下输出 -

Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 1 Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Waiting for GREEN Signal
Traffic Police Giving GREEN Signal
GREEN Signal ... Traffic can move
Vehicle No: 2 Crossing the Signal
Vehicle No: 3 Crossing the Signal
Traffic Police Giving RED Signal
Traffic Police Giving GREEN Signal
Vehicle No: 4 Crossing the Signal
Traffic Police Giving RED Signal
RED Signal ... Traffic has to wait
Traffic Police Giving GREEN Signal
Traffic Police Giving RED Signal
Exiting Main Thread

Condition 对象

Python 的 threading 模块中的 Condition 对象提供了更高级的同步机制。它允许线程在继续之前等待来自另一个线程的通知。Condition 对象始终与锁相关联,并提供在线程之间发送信号的机制。

以下是线程的语法。Condition() 类 −


 threading.Condition(lock=None)

以下是 Condition 对象的关键方法 -

  • acquire(*args):获取底层锁。此方法在底层锁上调用相应的方法;返回值是该方法返回的任何值。
  • release():释放底层锁。此方法在底层锁上调用相应的方法;没有返回值。
  • wait(timeout=None):此方法释放底层锁,然后阻塞,直到它被另一个线程中相同条件变量的 notify() 或 notify_all() 调用唤醒,或者直到发生可选的超时。一旦被唤醒或超时,它将重新获取锁并返回。
  • wait_for(predicate, timeout=None):此实用程序方法可以重复调用 wait(),直到满足谓词或发生超时。返回值是谓词的最后一个返回值,如果方法超时,则计算结果为 False。
  • notify(n=1):该方法最多唤醒 n 个等待 condition 变量的线程;如果没有线程等待,则为 no-op。
  • notify_all():唤醒所有等待此条件的线程。此方法的作用类似于 notify(),但会唤醒所有等待的线程,而不是一个。如果在调用此方法时调用线程尚未获取锁,则会引发 RuntimeError。

此示例演示了使用 Python 的 threading 模块的 Condition 对象的一种简单形式的线程间通信。在这里,thread_a thread_b 使用 Condition 对象进行通信,thread_a 会等待,直到收到来自 thread_b 的通知。thread_b 在通知 thread_a 之前休眠 2 秒钟,然后结束。


from threading import Condition, Thread
import time

c = Condition()

def thread_a():
	 	 print("Thread A started")
	 	 with c:
	 	 	 	 print("Thread A waiting for permission...")
	 	 	 	 c.wait()
	 	 	 	 print("Thread A got permission!")
	 	 print("Thread A finished")

def thread_b():
	 	 print("Thread B started")
	 	 with c:
	 	 	 	 time.sleep(2)
	 	 	 	 print("Notifying Thread A...")
	 	 	 	 c.notify()
	 	 print("Thread B finished")

Thread(target=thread_a).start()
Thread(target=thread_b).start()

输出

在执行上述代码时,您将获得以下输出 -

Thread A started
Thread A waiting for permission...
Thread B started
Notifying Thread A...
Thread B finished
Thread A got permission!
Thread A finished

下面是另一个代码,演示如何使用 Condition 对象提供线程之间的通信。在这种情况下,线程 t2 运行 taskB() 函数,线程 t1 运行 taskA() 函数。t1 线程获取条件并通知它。

此时,t2 线程处于等待状态。条件释放后,等待线程将继续使用通知函数生成的随机数。


from threading import Condition, Thread
import time
import random

numbers = []

def taskA(c):
	 	 for _ in range(5):
	 	 	 	 with c:
	 	 	 	 	 	 num = random.randint(1, 10)
	 	 	 	 	 	 print("Generated random number:", num)
	 	 	 	 	 	 numbers.append(num)
	 	 	 	 	 	 print("Notification issued")
	 	 	 	 	 	 c.notify()
	 	 	 	 time.sleep(0.3)

def taskB(c):
	 	 for i in range(5):
	 	 	 	 with c:
	 	 	 	 	 	 print("waiting for update")
	 	 	 	 	 	 while not numbers:	
	 	 	 	 	 	 	 	 c.wait()
	 	 	 	 	 	 print("Obtained random number", numbers.pop())
	 	 	 	 time.sleep(0.3)

c = Condition()
t1 = Thread(target=taskB, args=(c,))
t2 = Thread(target=taskA, args=(c,))
t1.start()
t2.start()
t1.join()
t2.join()
print("Done")

当您执行此代码时,它将生成以下输出 -

waiting for update
Generated random number: 2
Notification issued
Obtained random number 2
Generated random number: 5
Notification issued
waiting for update
Obtained random number 5
Generated random number: 1
Notification issued
waiting for update
Obtained random number 1
Generated random number: 9
Notification issued
waiting for update
Obtained random number 9
Generated random number: 2
Notification issued
waiting for update
Obtained random number 2
Done