Python - 线程同步



线程同步可以定义为一种方法,借助该方法,我们可以确保两个或多个并发线程不会同时访问称为关键部分的程序段。另一方面,正如我们所知,关键部分是程序中访问共享资源的部分。因此,我们可以说同步是确保两个或多个线程不会通过同时访问资源来相互交互的过程。下图显示 4 个线程尝试同时访问程序的关键部分。

Synchronizing

为了更清楚地说明,假设两个或多个线程尝试同时在列表中添加对象。这个行为不会导致成功的结束,因为它要么会删除一个或所有对象,要么会完全破坏列表的状态。此处同步的作用是一次只有一个线程可以访问该列表。

线程同步中的问题

在实现并发编程或应用同步原语时,我们可能会遇到问题。在本节中,我们将讨论两个主要问题。问题是 -

  • 僵局
  • 争用条件

争用条件

这是并发编程中的主要问题之一。对共享资源的并发访问可能会导致争用条件。争用条件可以定义为两个或多个线程可以访问共享数据,然后尝试同时更改其值时发生的条件。因此,变量的值可能是不可预测的,并且会根据流程的上下文切换时间而变化。

请考虑以下示例以了解争用条件的概念 -

第 1 步 - 在此步骤中,我们需要导入线程模块 -


import threading

第 2 步 - 现在,定义一个全局变量,比如 x,以及它的值 0 -


x = 0

第 3 步 - 现在,我们需要定义 increment_global() 函数,它将在这个全局函数 x 中增加 1 -


def increment_global():

	 	global x
	 	x += 1

第 4 步 - 在此步骤中,我们将定义 taskofThread() 函数,该函数将调用 increment_global() 函数指定次数;对于我们的示例,它是 50000 乘以 -


def taskofThread():

	 	for _ in range(50000):
	 	 	 increment_global()

第 5 步 - 现在,定义创建线程 t1 和 t2 的 main() 函数。两者都将在 start() 函数的帮助下启动,并等待它们在 join() 函数的帮助下完成工作。


def main():
	 	global x
	 	x = 0
	 	
	 	t1 = threading.Thread(target= taskofThread)
	 	t2 = threading.Thread(target= taskofThread)

	 	t1.start()
	 	t2.start()

	 	t1.join()
	 	t2.join()

第 6 步 - 现在,我们需要给出我们想要调用 main() 函数的迭代次数的范围。在这里,我们调用 5 次。


if __name__ == "__main__":
	 	for i in range(5):
	 	 	 main()
	 	 	 print("x = {1} after Iteration {0}".format(i,x))

在下面显示的输出中,我们可以看到竞争条件的影响,因为每次迭代后的 x 值预期为 100000。但是,该值有很多变化。这是由于线程对共享全局变量 x 的并发访问。

x = 100000 after Iteration 0
x = 54034 after Iteration 1
x = 80230 after Iteration 2
x = 93602 after Iteration 3
x = 93289 after Iteration 4

使用锁处理争用条件

正如我们在上面的程序中看到的 race condition 的效果,我们需要一个同步工具,它可以处理多个线程之间的 race condition。在 Python 中,<threading> 模块提供了 Lock 类来处理竞争条件。此外,Lock 类提供了不同的方法,借助这些方法,我们可以处理多个线程之间的竞争条件。方法如下所述 -

acquire() 方法

此方法用于获取,即阻止锁。锁可以是阻塞的,也可以是非阻塞的,具体取决于以下 true 或 false 值 -

  • 值设置为 True − 如果使用 True(默认参数)调用 acquire() 方法,则阻止线程执行,直到解锁锁。
  • 值设置为 False − 如果使用 False(不是默认参数)调用 acquire() 方法,则线程执行不会被阻止,直到它被设置为 true,即直到它被锁定。

release() 方法

该方法用于释放锁。以下是与此方法相关的一些重要任务 -

如果锁被锁定,则 release() 方法将解锁它。它的工作是在多个线程被阻塞并等待锁解锁时只允许一个线程继续。

如果 lock 已经解锁,它将引发 ThreadError 。

现在,我们可以使用 lock 类及其方法重写上述程序,以避免竞争条件。我们需要使用 lock 参数定义 taskofThread() 方法,然后需要使用 acquire() 和 release() 方法来阻塞和非阻塞锁,以避免竞争条件。

以下是 python 程序的示例,用于了解用于处理争用条件的锁的概念 -


import threading

x = 0

def increment_global():

	 	global x
	 	x += 1

def taskofThread(lock):

	 	for _ in range(50000):
	 	 	 lock.acquire()
	 	 	 increment_global()
	 	 	 lock.release()

def main():
	 	global x
	 	x = 0

	 	lock = threading.Lock()
	 	t1 = threading.Thread(target = taskofThread, args = (lock,))
	 	t2 = threading.Thread(target = taskofThread, args = (lock,))

	 	t1.start()
	 	t2.start()

	 	t1.join()
	 	t2.join()

if __name__ == "__main__":
	 	for i in range(5):
	 	 	 main()
	 	 	 print("x = {1} after Iteration {0}".format(i,x))

以下输出显示忽略了 race condition 的影响;因为每次迭代后的X值现在是100000,这是这个程序的预期。

输出

x = 100000 after Iteration 0
x = 100000 after Iteration 1
x = 100000 after Iteration 2
x = 100000 after Iteration 3
x = 100000 after Iteration 4

死锁 − Dining Philosophers 问题

死锁是设计并发系统时可能面临的一个麻烦问题。我们可以在 dining philosopher 问题的帮助下说明这个问题,如下所示 -

Edsger Dijkstra 最初提出了 dining philosopher 问题,这是并发系统最大的问题之一——死锁的著名例子之一。

在这个问题中,有五位著名的哲学家坐在圆桌旁,吃着他们碗里的食物。有五个叉子,五位哲学家可以用它们来吃他们的食物。然而,哲学家们决定同时使用两个叉子来吃他们的食物。

现在,哲学家有两个主要条件。首先,每个哲学家都可以处于进食或思考状态,其次,他们必须首先获得两个叉子,即左叉和右叉。当五位哲学家中的每一个都设法同时选择左叉时,问题就出现了。现在他们都在等待正确的叉子免费,但他们永远不会放弃他们的叉子,直到他们吃完了他们的食物,正确的叉子永远不会可用。因此,餐桌上将出现死锁状态。

并发系统中的死锁

现在,如果我们看到,同样的问题也可能出现在我们的并发系统中。上面例子中的分叉是系统资源,每个哲学家都可以代表正在竞争获取资源的过程。

使用 Python 程序的解决方案

这个问题的解决办法可以通过将哲学家分为两种类型来找到——贪婪的哲学家和慷慨的哲学家。主要是,一个贪婪的哲学家会试图捡起左边的叉子并等待它在那里。然后他会等待正确的叉子出现,捡起它,吃掉它,然后放下它。另一方面,一个慷慨的哲学家会尝试捡起左叉,如果它不存在,他会等待一段时间后再试。如果他们得到左叉,那么他们将尝试获得正确的叉子。如果他们也得到正确的叉子,那么它们就会吃掉并释放两个叉子。但是,如果他们没有得到正确的分叉,那么他们将释放左分叉。

以下 Python 程序将帮助我们找到解决 dining philosopher 问题的方法 -


import threading
import random
import time

class DiningPhilosopher(threading.Thread):

	 	running = True

	 	def __init__(self, xname, Leftfork, Rightfork):
	 	threading.Thread.__init__(self)
	 	self.name = xname
	 	self.Leftfork = Leftfork
	 	self.Rightfork = Rightfork

	 	def run(self):
	 	while(self.running):
	 	 	 time.sleep( random.uniform(3,13))
	 	 	 print ('%s is hungry.' % self.name)
	 	 	 self.dine()

	 	def dine(self):
	 	fork1, fork2 = self.Leftfork, self.Rightfork

	 	while self.running:
	 	 	 fork1.acquire(True)
	 	 	 locked = fork2.acquire(False)
		 	 	if locked: break
	 	 	 fork1.release()
	 	 	 print ('%s swaps forks' % self.name)
	 	 	 fork1, fork2 = fork2, fork1
	 	else:
	 	 	 return

	 	self.dining()
	 	fork2.release()
	 	fork1.release()

	 	def dining(self):
	 	print ('%s starts eating '% self.name)
	 	time.sleep(random.uniform(1,10))
	 	print ('%s finishes eating and now thinking.' % self.name)

def Dining_Philosophers():
	 	forks = [threading.Lock() for n in range(5)]
	 	philosopherNames = ('1st','2nd','3rd','4th', '5th')

	 	philosophers= [DiningPhilosopher(philosopherNames[i], forks[i%5], forks[(i+1)%5]) \
	 	 	 for i in range(5)]

	 	random.seed()
	 	DiningPhilosopher.running = True
	 	for p in philosophers: p.start()
	 	time.sleep(30)
	 	DiningPhilosopher.running = False
	 	print (" It is finishing.")

Dining_Philosophers()

上述程序使用了贪婪和慷慨的哲学家的概念。该程序还使用了 <threading> 模块的 Lock 类的 acquire() release() 方法。我们可以在下面的输出中看到解决方案 -

4th is hungry.
4th starts eating
1st is hungry.
1st starts eating
2nd is hungry.
5th is hungry.
3rd is hungry.
1st finishes eating and now thinking.3rd swaps forks
2nd starts eating
4th finishes eating and now thinking.
3rd swaps forks5th starts eating
5th finishes eating and now thinking.
4th is hungry.
4th starts eating
2nd finishes eating and now thinking.
3rd swaps forks
1st is hungry.
1st starts eating
4th finishes eating and now thinking.
3rd starts eating
5th is hungry.
5th swaps forks
1st finishes eating and now thinking.
5th starts eating
2nd is hungry.
2nd swaps forks
4th is hungry.
5th finishes eating and now thinking.
3rd finishes eating and now thinking.
2nd starts eating 4th starts eating
It is finishing.