Python并发:棘手的问题

2021-02-18 00:18:49

探索Python中的线程,进程和协程,并提供一些有趣的示例,阐明它们之间的差异。

作为在软件工程上花费更多时间的数据科学家,我最近被迫面对我在Python知识方面的一个丑陋鸿沟:并发性。老实说,我从未完全理解异步,线程,池和协程这两个术语的区别,以及这些机制如何协同工作。每次我尝试学习该主题时,这些示例对我来说都太抽象了,并且我很难内化所有工作原理。

当我的一个朋友2推荐一位精通Python的教育家David Beazley进行现场编码演讲时,情况发生了变化。

由于该YouTube视频的限制,我无法将其嵌入本文中,因此您必须在其他窗口中打开它。

首先,这句话令人难以置信。它不仅从头开始进行实时编码,而且还立即跳入套接字编程,这是我作为数据科学家从未遇到过的。但是,如果您慢慢地学习它并理解所有组件(就像我们在此博客文章中所做的那样),那么它无疑是我所遇到的关于Python并发性的最佳教育材料。这篇博客文章记录了我在此过程中学到的知识,以便其他人也可以从中受益。

在开始之前,David设置了以下用于演示并发性的基础结构。

为了演示并发性,创建一个可以在一定时间内使CPU饱和的任务(例如数学运算)很有用。大卫使用计算斐波纳契数的函数。

对于较大的输入,此功能花费的时间要长得多,而对于较小的输入3,则使我们能够分析不同的工作量。

Web服务器是说明不同类型的并发的最佳方法之一。但是,要真正展示事物是如何工作的,使用足够低的层次以查看所有组件的工作方式将很有用。为此,David使用套接字编程来设置Web服务器。如果您不熟悉套接字编程,我将在下面解释重要的一点,但是如果您愿意,可以稍后再深入学习本教程。

首先,David从下面的代码开始(我突出了最有趣的部分):

#server-1.py from socket import *来自fib import fib def fib_server(address):sock =套接字(AF_INET,SOCK_STREAM)sock。 setsockopt(SOL_SOCKET,SO_REUSEADDR,1)袜子。绑定(地址)袜子。在True:客户端,addr = sock时收听(5)。 accept()#等待建立连接print(" Connection",addr)fib_handler(client)#将客户端传递给处理程序,该处理程序将监听输入数据。 def fib_handler(client):为True时:req = client。 recv(100)#等待客户端发送的数据。如果不是,则req:中断结果= fib(int(req))resp = str(结果)。编码(' ascii')+ b' \ n'客户 。 send(resp)#将数据发送回客户端。打印(" Closed")fib_server(('',25000))

6-9行是套接字编程样板。可以认为这是理所当然的设置套接字服务器的方式。这也与我上面链接的教程相匹配。

第11行等待来自客户端的传入连接。建立连接后,服务器即可开始从客户端接收数据。该代码将在此行上停止执行,直到建立连接为止。

第13行:建立连接后,客户端对象将传递给可以处理客户端发送的数据的函数。

第17行:等待客户端发送数据。该代码将在此行上停止执行,直到从客户端接收到数据为止。

第21行:服务器将响应发送回客户端。如果发送缓冲区已满,则代码可能会在此行停止执行,但是在此玩具示例中不太可能。

在上面的示例中,服务器将只能接受来自单个客户端的连接,因为对fib_handler的调用将永远不会返回(因为除非接收到终止信号,否则它将在无限循环中运行)。这意味着sock.accept()只能被调用一次。

您可以像David在他的视频中一样键入数字,并验证返回的斐波那契数字。但是,如果您尝试通过不同的终端会话同时与另一个客户端连接:

您会注意到第二个客户端只是挂起,没有从服务器返回任何内容。这是因为服务器只能接受单个连接。接下来,我们探索如何解决此问题。

我们可以使用线程解决此问题。您可以将线程添加到处理程序,以便接受更多连接,并以黄色突出显示以下代码:

从套接字导入*从fib导入从线程导入fib从线程导入fi def fib_server(地址):sock =套接字(AF_INET,SOCK_STREAM)sock。 setsockopt(SOL_SOCKET,SO_REUSEADDR,1)袜子。绑定(地址)袜子。在True:客户端,addr = sock时收听(5)。 accept()打印(" Connection",addr)线程(target = fib_handler,args =(client,))。 start()def fib_handler(client):而True:req = client。 recv(100)如果不是req:中断结果= fib(int(req))resp = str(结果)。编码(' ascii')+ b' \ n'客户 。发送(resp)打印(" Closed")fib_server(('',25000))

您可以通过在两个单独的终端窗口中运行以下命令,通过将两个单独的客户端连接到服务器来验证此方法是否有效:

通过在线程中执行fib_handler,fib_server中的main while循环将继续,从而允许sock.accept()接收其他客户端。如果您之前没有遇到过线程,那么本教程将对该主题进行很好的介绍。

当代码停止执行并等待外部事件发生(例如建立连接或发送数据)时,这通常称为阻塞。

线程的一项重要实用程序是,当不使用CPU时,它允许阻止任务释放对CPU的控制。但是,由于全局解释器锁定,Python解释器一次只能在一个线程上运行。由于Python在任何给定时间只能运行一个线程,因此线程中任何受CPU约束的工作都必须依次运行。

因此,您必须仔细考虑使用Python在线程中执行什么样的任务。如果您尝试执行受CPU限制的任务,这些任务将彼此放慢速度。 David通过以下脚本将请求发送到我们的线程服务器进行了演示:

#perf1.py from socket import *导入时间sock =套接字(AF_INET,SOCK_STREAM)sock。在True时连接((' localhost',25000)):start = time。时间()袜子。发送(b' 30')resp =袜子。 recv(100)结束=时间。 time()打印(结束-开始)

随着并行运行的脚本数量的增加,每个脚本的执行时间将线性增加。对于此特定任务,添加线程不会使任何操作变得更快。但为什么?这是因为fibonacci任务受CPU限制,因此线程将彼此竞争资源。

Python线程通过交错CPU上不同任务的执行来工作。 4一次只运行一个线程,并且有能力轮流以小位执行直到所有线程都完成为止。 GIL和您的操作系统执行线程处理如何交错的详细信息,因此您不必担心此详细信息(下面将提到一个例外)。交织一堆受CPU约束的任务不会加快这些任务的总运行时间。但是,如果您的任务涉及大量的非CPU时间,例如等待网络连接或磁盘I / O,则线程化任务可能会大大提高速度。在python中模拟非cpu绑定任务的一种典型方法是使用内置函数time.sleep()。

为了检查对线程和性能的了解,我运行了以下实验5,并将time.sleep(2)更改为fib(20),然后再次返回:

导入日志记录导入线程导入时间导入fib def thread_function(名称):日志记录。信息("线程%s:开始&#34 ;,名称)时间。 sleep(2)##将此代码行更改为fib(20)logging。如果__name__ ==" __ main __"信息(" Thread%s:finish",name) :格式=" %(asctime)s:%(message)s"开始=时间。 time()记录。 basicConfig(format = format,level = logging .INFO,datefmt ="%H:%M:%S")线程=范围(3)中的索引的列表():logging。 info(" Main:创建和启动线程%d。&#34 ;,索引)x = threading。线程(target = thread_function,args =(index,))线程。附加(x)x。 start()用于索引,enumerate(线程)中的线程:logging。 info(" Main:加入线程%d之前,&#34 ;, index)线程。 join()日志记录。 info(" Main:线程%d完成",index)end = time。 time()print(f&total time:{end-start}')

不出所料,增加运行时间时的线程数。sleep(2)不会增加程序的整体执行时间(程序运行大约2秒钟)。另一方面,用fib(20)替换time.sleep(2)会导致该程序的运行时间随着添加更多线程而增加。这是因为fib(20)是cpu绑定的任务,因此对任务进行交错实际上并没有太大帮助。您应该尝试运行相同的内容以自己查看。

您会经常听到Python不擅长并行性,并且一次只能在一个CPU内核上运行。这很可能是指线程和GIL的上述问题。因为限于一个线程,所以这意味着基于线程的任务一次只能使用一个CPU内核(单个线程不能跨多个CPU运行)。在Python之外,线程是并行化与CPU绑定的任务的一种流行选择,因为您可以同时为每个CPU内核运行一个单独的线程。但是,对于Python,您必须寻找其他方法来完成与CPU绑定的任务的并行性。

David讨论的另一个有趣但鲜为人知的方面是以下两种任务之间的关系:

perf1.py演示了在CPU上进行计算需要更长时间的事情,例如fib(30)。

如果要让它们竞争线程中的资源,Python GIL将优先考虑第一种任务,而牺牲第二种任务。您可以选择在此处进行演示。这很有趣,因为这与典型操作系统如何通过优先处理线程(通过支持运行时间较短的任务)来区分线程的优先级相反,并且是Python GIL的实现所特有的。更重要的是,此行为产生了非常实际的后果:如果您在运行大多数任务都相当快的Web服务器上运行,则昂贵的,受CPU约束的任务会使所有任务陷入瘫痪。

诱人的是将Python线程视为使事情运行更快的工具,但这不是唯一的用例。回想一下,套接字服务器使用线程来一次允许多个连接而没有任何加速。 David演示了另一种使用线程的方式,他的代码用于测量短期任务的运行时间:

#perf2.py#来自线程导入的快速请求/秒来自套接字导入的线程*导入时间sock =套接字(AF_INET,SOCK_STREAM)袜子。 connect((' localhost',25000))n = 0 def monitor():全局n而True:time。睡眠(1)打印(n,&reqs / sec')n = 0线程(目标=监视器)。在True(开始)时开始()。发送(b' 1')resp = sock。 recv(100)n + = 1

在这种情况下,David使用单个线程,并带有对sleep(1)的阻塞调用,以确保监视器每秒仅打印一次,同时允许程序的其余部分每秒发送请求数百次。换句话说,这是对线程和阻塞的巧妙使用,它允许程序的一部分以所需的时间间隔运行,同时允许程序的其余部分照常运行。 6

线程的这些不同角度使我可以更全面地理解线程。线程不仅可以使某些程序运行更快或并行运行,而且还可以控制程序的执行方式。

一个进程中始终包含一个线程,并且每个进程包含一个或多个线程。同一进程中的线程可以共享内存,这意味着它们可以轻松地通信并写入通用数据结构。线程在以下两种情况下很有用:

在Python之外,如果要通过在独立CPU内核上运行的各个线程之间拆分任务来并行化CPU绑定任务。

一个进程可以跨越多个CPU内核,但是一个线程只能使用一个CPU内核。

一般来说,在任何给定时间,只有一个线程可以在单个内核上运行CPU绑定的任务。如果多个线程共享一个CPU内核,则操作系统将交错这些线程。此规则有一些例外。例如,单个CPU内核能够通过使用SMT /超线程之类的东西同时运行多个线程,或者使用科学计算库中流行的SIMD并行计算数据。

另一方面,进程提供隔离,这在您具有不应该共享信息的不同用户或不同程序时很有用。由于在Python中一次不能运行多个线程,因此常见的解决方法是产生多个Python进程。这将在下面详细讨论。

本书的第2章从操作系统的角度更详细地讨论了哪些进程和线程。

解决GIL和与CPU绑定的任务争用资源的问题的一种方法是使用进程而不是线程。进程在以下方面与线程不同:

Python线程共享一个内存空间,而每个进程都有一个单独的内存空间。如果您需要在任务之间共享变量或数据,这是一个重要的考虑因素。

与线程相比,进程具有大量开销,因为必须在每个进程之间复制数据和程序状态。

与Python线程不同,进程不限于在单个CPU上运行,因此您可以在不同的内核上并行执行与CPU绑定的任务。

David在他的服务器示例中使用进程池使用python进程。 7相关代码行在下面突出显示:

#server-3.py#来自套接字导入的Fib微服务*来自线程导入的fib并发导入的fib从并发的线程导入。ProcessPoolExecutor作为池池=池(4)def fib_server(地址):袜子=套接字(AF_INET,SOCK_STREAM)袜子。 setsockopt(SOL_SOCKET,SO_REUSEADDR,1)袜子。绑定(地址)袜子。在True:客户端,addr = sock时收听(5)。 accept()print(" Connection",addr)线程(target = fib_handler,args =(client,),daemon = True)。 start()def fib_handler(client):而True:req = client。 recv(100)如果不是req:break n = int(req)future =池。提交(fib,n)结果=将来。结果()resp = str(结果)。编码(' ascii')+ b' \ n'客户 。发送(resp)打印(" Closed")fib_server(('',25000))

每秒请求数低于基于线程的版本数,因为执行池中的任务需要更多的开销。

但是,如果您还运行perf1.py,则不会严重干扰第一个任务(来自perf2.py),因为这不会争用同一CPU上的资源。

上面的示例涉及一个CPU绑定任务(计算斐波那契数)。但是,如果我们模拟一个非CPU限制的任务,例如time.sleep(),则使用进程而不是线程实际上会损害整体性能。以下部分提供了一个具体的示例。

这是一个现实的示例,可让您获得有关线程和进程如何工作的更多直观信息。本教程包含Python进程和线程的更多示例。

我发现许多数据科学家(以前包括我自己)都盲目地应用了流程并完全忽略了线程。我理解为什么-进程是一种最不常用的分母,无论您的任务是否受CPU约束,您都可以实现某种并行性。但是,我发现这种方法不是很理想,并且无法充分利用计算源。一些示例来阐明线程或进程在哪里更合适:

如果要从Internet下载大量文件,请考虑使用线程。这是因为您的大部分时间都花在网络I / O上,而不是CPU上。例如,本文演示了与下载文件的过程相比,使用线程时速度提高了50%。

如果要转换或清除大型数据集,则此工作主要受CPU限制,因此使用进程很有意义。唯一不受CPU限制的部分是读写数据到磁盘。

如果您只是想将一堆文件加载到内存中或将一堆文件写入磁盘,而没有真正进行任何转换,请考虑使用线程,因为工作主要是磁盘I / O而不是CPU约束。

请记住,由于线程的工作方式不同,它们可能比进程具有更高的内存效率。因此,在不需要时使用大量进程会导致内存膨胀。

最重要的是,尝试避免尽可能地考虑进程和线程,并尽可能使用numpy等科学计算库并编写矢量化操作。始终需要了解正在使用的库或框架(尤其是数值计算库和其他数据科学库)中可用的并发工具,并在适当时考虑使用它们。

回想一下,Python一次只能在一个线程上运行,并且操作系统会自动决定何时中断每个线程以允许这些线程轮流运行。这称为抢先式多任务处理,因为操作系统(而不是您)决定线程何时进行切换。当您不关心任务如何交错时,线程就很棒,因为您不必担心它们的调度方式。

但是,Python中存在第三种并发范式,您可以控制这种切换的发生方式:异步编程。这也称为协作多任务处理,这意味着每个任务都必须在要切换时宣布。实现协作多任务处理的一种方法是创建协程。

在Python中创建协程的一种方法是使用yield语句。 David在以下代码中提供了一些直觉来说明如何实现具有收益的多任务处理:

从集合中导入deque def倒数计时(n):当n> 0:屈服n n-= 1个任务= deque()任务。扩展([倒数(10),倒数(5),倒数(20)])def run():while任务:任务=任务。 popleft()尝试:x =下一个(任务)打印(x)任务。追加(任务)StopIteration除外:打印(" Task")

运行此代码时,从输出中可以看到三个倒计时任务被交错:

这种对yield的巧妙使用使您可以暂停任务的执行,并转移到诸如线程之类的其他任务上,但您不是由操作系统来控制如何交错计算。这是理解其余讨论的关键直觉,这进一步推动了该示例。

完成异步编程的最流行的方法之一是使用内置的各种实用程序。

......