使用Asyncio在Python中并发的指南

2020-05-24 23:12:36

这是Python异步模块的快速指南,基于Python版本3.8。

因此,让我们从房间里的大象开始:Python标准库提供了许多模块来处理异步/并发/多进程代码…。

在这篇文章中,我们将重点介绍最后两个(主要是异步,并以concurrent.Futures结尾)。线程和多处理模块位于它们下面,因此我们不会详细介绍它们。

注意:THREAD模块实际上是一个非常低级的API,线程模块构建在该API之上(同样,这也是我们不会介绍它的原因)。

Asyncio模块提供高级和低级API。库和框架开发人员将被要求使用低级API,而所有其他用户则被鼓励使用高级API。

在概念上,它与更传统的线程或多进程异步代码执行方法的不同之处在于,它利用称为事件循环的东西来处理这些异步任务的调度。

我们稍后将解释“事件循环”,但首先让我们简要回顾一下并发。未来。

未来为线程和多处理模块提供了高级抽象,这就是为什么我们不会在这篇文章中详细讨论它们的原因。

一般来说,当您希望在线程池或子进程池中异步执行代码,同时还希望使用干净、现代的Python API(与更灵活但更低级的线程或多处理模块相对)时,您可以使用concurrent.Futures。

所有异步应用程序的核心元素是“事件循环”。事件循环是调度和运行异步任务的对象(它还处理网络IO操作和子进程的运行)。

注意:Futures是一种低级类型,因此如果您不是库/框架开发人员(因为您应该使用更高级的抽象API),那么您不应该太担心它。

基于生成器的协同例程函数(例如,通过使用@asyncio.coroutine修饰函数定义的那些函数)将被异步/等待语法取代,但在Python3.10-docs.python.org/3.8/library/asyncio-task.html.之前将继续受支持。

参考我的帖子“迭代器,生成器,协程”,了解更多关于基于生成器的协程及其异步历史的详细信息。

所有异步应用程序通常都(至少)具有单个“主”入口点任务,该任务将被调度为立即在事件循环上运行。这是使用asyncio.run函数完成的(请参见“运行异步程序”)。

应该将协程函数传递给asyncio.run,而在内部,asyncio将使用助手函数coroutines.iscoroutine来检查这一点(参见:源代码)。如果不是协程,则会引发错误,否则协程将传递给loop.run_Until_Complete(请参阅:源代码)。

Run_Until_Complete函数期望一个Future(有关什么是Future,请参阅下面的部分),并使用另一个助手函数futures.isFuture来检查提供的类型。如果不是Future,则使用低级API Assure_Future将协程转换为Future(参见源代码)。

注意:以下是验证函数是否为协程的各种方法的比较。结果并不一定如您所料。

在旧版本的Python中,如果您打算手动创建自己的Future并将其调度到事件循环中,那么您应该使用asyncio.sure_Future(现在被认为是一个低级API),但是在Python3.7+中,这已经被asyncio.create_task所取代。

此外,在Python3.7中,直接与事件循环交互的想法(例如,获取事件循环,使用create_task创建一个任务,然后将其传递给事件循环)已经被asyncio.run所取代,它为您抽象了所有这些(请参阅“运行异步程序”来理解这是什么意思)。

以下API可让您查看事件循环上运行的任务的状态:

Future是表示异步操作的最终结果的低级可等待对象。

打个比方:它就像一个空的邮箱。在未来的某个时候,邮递员会到达并把一封信塞进邮箱。

此API的存在是为了使基于回调的代码能够与异步/等待一起使用,而loop.run_in_ecutor是返回Future的异步低级API函数的示例(另请参阅并发函数中列出的一些API)。

import asyncioasync def foo():print(";foo!";)async def hello_world():await foo()#等待`foo()`完成打印(";Hello World!";)asyncio.run(hello_world())

run函数总是创建一个新的事件循环,并在结束时将其关闭。如果您使用的是较低级别的API,那么这将是您必须手动处理的事情(如下所示)。

在Python3.8之前,您不能在标准Python REPL中执行异步代码(它将要求您改用IPython REPL)。

要使用最新版本的Python执行此操作,您需要运行python-m asyncio。一旦REPL启动,您就不需要使用asyncio.run(),只需直接使用await语句即可。

异步REPL 3.8.0+(HEADS/3.8:5f234538ab,2019年12月1,11:05:25)[clang 10.0.1(clang-1001.0.46.4)](直接在darwinUse";等待";而不是";asyncio.run()";.Type";Help";,";版权";";Credits";或";License&34;键入";帮助";,";版权";";信用";或";许可证&。导入异步定义Foo():&>;>;>;异步定义Foo():.。等待异步。睡眠(5)……。打印(";完成";).>;>;>;等待foo()完成。

请注意,REPL在启动时自动执行import asyncio,因此我们可以使用任何asyncio函数(如.sleep函数),而不必自己手动键入import语句。

如果由于某种原因您不想使用asyncio提供的事件循环(这是一个纯Python实现),您可以将其替换为另一个事件循环,如uvloop。

uvloop是内置异步事件循环的快速插入式替代。uvloop是用Cython实现的,并在幕后使用libuv。

根据uvloop的作者的说法,它在速度上可以与围棋程序相媲美!我推荐阅读他们关于其最初版本的博客文章。

如果您想使用uvloop,那么首先使用pip install uvloop安装它,然后添加对uvloop.install()的调用,如下所示:

以下函数有助于协调函数的并发运行,并根据应用程序的需要提供不同程度的控制。

等待:等待一系列等待,直到满足给定的“条件”。

注:Gather具有用于处理错误和取消的特定选项。例如,如果RETURN_EXCEPTIONS:FALSE,则将其中一个等待对象引发的第一个异常返回给GATHER的调用方,如果设置为True,则异常将与成功的结果一起聚合在列表中。如果Gather()被取消,则所有提交的等待项(尚未完成)也将被取消。

注意:您会发现在这些API中的大多数中都可以提供循环参数,以使您能够指示您想要利用的特定事件循环)。Python似乎已经在3.8中弃用了这个参数,并将在3.10中完全删除它。

导入异步def foo(N):等待异步。睡眠(5)#等待5s,然后再继续打印(f";n:{n}!";)异步def main():Tasks=[foo(1),foo(2),foo(3)]等待asyncio.ather(*Tasks)asyncio.run(main())。

下面的示例使用FIRST_COMPLETED选项,这意味着首先完成的任务就是将返回的任务。

从随机导入的随机导入导入异步easync def foo(N):S=randrange(5)print(f";{n}将休眠:{s}秒";)等待异步。睡眠打印(f";n:{n}!";)异步def main():Tasks=[foo(1),foo(2),foo(3)]result=await异步cio.wait

1将休眠:4秒2将休眠:2秒3将休眠:1秒sn:3!({<;任务已完成coro=<;foo()已完成,在wait.py:5>;result=NONE>;},{<;任务挂起coro=<;foo()在wait.py:8>;wait_for=<;未来挂起cb=[<;WAIT_FOR=<;将来挂起的CB=[<;TaskWakeupMethWrapper对象位于0x10322b4c8>;()]>;>;})。

下面的示例演示如何利用超时来防止无休止地等待异步任务完成。

导入异步def foo(N):等待异步。睡眠(10)打印(f";n:{n}!";)异步def main():try:等待异步。wait_for(foo(1),timeout=5)除异步外。TimeoutError:打印(";timeout!";)asyncio.run(main())。

注意:asyncio.TimeoutError不提供任何额外信息,因此没有必要尝试在输出中使用它(例如,将asyncio.TimeoutError用作err:print(Err)除外)。

下面的示例演示了AS_COMPLETE将如何生成要完成的第一个任务,然后是下一个最快的任务,然后是下一个任务,直到所有任务都完成。

从随机导入的随机导入导入异步easync def Foo(N):S=randrange(10)print(f";{n}将休眠:{s}秒&34;)等待异步。睡眠返回f";{n}!";异步def main():计数器=0任务=[foo(";a";),foo(";b";),foo(";b";),Foo(";a";),foo(";b";),foo(";a";),foo(";a";),foo。最快";如果计数器==0,否则";下一个最快";计数器+=1结果=等待将来打印(f";{n}结果是:{result}";)asyncio.run(main())。

c将休眠:9秒a将休眠:1秒b将休眠:0秒最快的结果是:b!次快的结果是:a!次快的结果是:c!

下面的示例演示如何将协程转换为Task,并将其调度到事件循环上。

导入异步异步def foo():等待异步。睡眠(10)打印(";foo!";)异步定义hello_world():task=asyncio.create_task(foo())print(Task)等待异步。sleep(5)打印(";Hello World!";)等待异步cio.sleep(10)打印(任务)异步运行(hello_。

我们可以从上面的程序中看到,我们使用CREATE_TASK将协程函数转换为任务。这会自动将任务安排在下一个可用刻度的事件循环上运行。

这与较低级别的API Assure_Future(创建新任务的首选方式)形成对比。与CREATE_TASK相比,SECURE_FOREND函数具有特定的逻辑分支,这使得它对更多的输入类型有用,CREATE_TASK只支持将协程调度到事件循环上,并将其包装在任务中(请参阅:SECURE_FOREND源代码)。

让我们回顾一下代码,并与上面的输出进行比较,我们可以看到…。

我们将foo()转换为Task,然后在创建后立即打印返回的Task。因此,当我们打印任务时,我们可以看到它的状态显示为“挂起”(因为它还没有执行)。

接下来,我们将休眠5秒钟,因为这将导致foo Task现在正在运行(因为当前的Task hello_world将被视为繁忙)。

在foo Task中,我们也会休眠,但是睡眠的时间比hello_world长,因此事件循环现在将上下文切换回hello_world任务,在那里休眠将过去,我们将打印输出字符串Hello World。

最后,我们又睡了十秒钟。这只是为了给foo Task足够的时间来完成并打印它自己的输出。如果我们没有这样做,那么hello_world任务将完成并关闭事件循环。hello_world的最后一行是打印foo Task,我们将在其中看到foo Task的状态现在将显示为“完成”。

在处理任务(实际上就是Future)时,一旦Future设置了值,您就可以执行“回调”函数。

import asyncioasync def foo():等待Asyncio.sleep(10)return";foo!";def get_result(Future):print(f";get result!{future.result()}";)async def hello_world():task=asyncio.create_task(foo())task.add_do_callback(Get_Result)print(Task)等待。)等待asyncio.sleep(10)打印(任务)asyncio.run(hello_world())。

请注意,在上面的程序中,我们添加了一个新的GET_RESULT函数,该函数期望接收Future类型,因此对Future调用.result()。

还要注意,要调用此函数,我们将其传递给.adddocallback(),该函数在create_task返回的Task上调用。

<;任务挂起coro=<;foo()在gather.py:4>;cb=[get_result()at ather.py:9]>;Hello World!得到结果!foo!<;任务完成coro=<;foo()完成,定义在gather.py:4>;result=';foo!&39;>;

在处理大量并发操作时,明智的做法是利用线程(和/或子进程)的“池”来防止耗尽应用程序的主机资源。

这就是concurrent.Futures模块的用武之地。它提供了一个称为Executor的概念来帮助实现这一点,它可以独立运行,也可以集成到现有的异步事件循环中(请参阅Executor文档)。

让我们看一下在这些执行器之一中执行代码的第一种方法,即使用异步事件循环来调度执行器的运行。

为此,您需要调用事件循环的.run_in_ecutor()函数,并将Executor类型作为第一个参数传入。如果未提供,则使用默认执行器(即ThreadPoolExecutor)。

import asyncioimport concurrent.futuresdef block_io():#文件操作(如日志记录)可能会阻塞#event循环:在线程池中运行它们。使用open(";/dev/urandom";,";rb";)as f:return f.read(100)def cpu_bind():#CPU绑定操作将阻塞事件循环:#通常,最好在#进程池中运行它们。return sum(i*i for i in range(10**7))async def main():loop=asyncio.get_running_loop()#1.在默认循环的执行器中运行:result=等待loop.run_in_ecutor(NONE,BLOCKING_IO)print(";默认线程池";,result)#2.在自定义线程池中运行:with concurrent.futures.ThreadPoolExecutor()。自定义线程池";,result)#3.在自定义进程池中运行:with concurrent.futures.ProcessPoolExecutor()as pool:result=await loop.run_in_ecutor(pool,cpu_bound)print(";自定义进程池";,result)asyncio.run(main())。

在这些执行器之一中执行代码的第二种方式是将要执行的代码直接发送到池中。这意味着我们不必获取当前事件循环来将池传递给它(正如前面的示例所演示的那样),但是它附带一个警告,即父程序不会等待任务完成,除非您显式地告诉它(我接下来将演示这一点)。

考虑到这一点,让我们来看看这种替代方法。它涉及到调用执行器的Submit()方法:

导入concurrent.futuresimport timedef low_op(*args):打印(f";参数:{args}";)时间。睡眠(5)打印(";慢速操作完成";)返回123def do_thing():带有concurrent.futures.ProcessPoolExecutor()as pool:Future=pool.mit(low_op,";a";,";b&。)对于concurrent.futures.as_Completed([Future])中的fut:assert future.do()和not future.cancated()print(f";get result from low_op:{fut.result()}";)if__name__==";__main__";:print(";Program started";)do_omething()print(";Program Complete";)(__name__==";__main__";:print(";program start";)do_omething()print(";Program Complete";)。

注意:使用全局流程执行器要小心(例如,将类似process_pool=concurrent.futures.ProcessPoolExecutor()的内容放在全局作用域中,并在do_thing()函数中使用该引用),因为这意味着当将程序复制到新流程中时,您将从Python解释器收到关于泄漏的信号量的错误。这就是我在函数中创建进程池执行器的原因。

这里值得注意的一件事是,如果我们没有使用WITH语句(就像我们在上面的示例中所做的那样),这意味着一旦池完成其工作,我们就不会关闭它,因此(取决于您的程序是否继续运行)您可能会发现资源没有被清理。

要解决这个问题,您可以调用.shutdown()方法,该方法通过其父类concurrent.futures.Executor向两种类型的执行器公开。

import concurrent.futuresTHREAD_POOL=concurrent.futures.ThreadPoolExecutor(max_workers=5)def SLOW_OP(*args):PRINT(f";参数:{args}";)PRINT(";某种慢速操作";)返回123def do_thing():Future=THREAD_POOL.SUBMIT(慢速_OP,";a";,";b";,";c";)THREAD_POOL.shutdown()assert future.do()和not future.cancedprint()print(f";get result from low_op:{future.result()}";)if__name__==";__main__";:print(";Program Started";)do_thing()print(";Program Complete";)。

现在,我没有在示例中使用time.silev(),这一次是因为我们使用的是线程池和time.silear()是一个CPU限制操作,否则会阻塞线程完成。

这意味着我们的示例很可能总是导致low_op()函数在我们开始检查future.do()之前完成。所以,是的,这不是最好的例子。您可以通过合并不阻塞的真正缓慢的操作来更实际地测试这一点。

但是假设我们有一个真正缓慢的操作发生,这意味着当我们检查future.do()时任务还没有完成。

在这种情况下,我们应该注意到,对.shutdown()的调用是在我们显式等待计划的任务完成之前进行的,但是,当我们断言返回的未来是否为.Done()时,我们会发现无论试图关闭线程池,任务都被标记为“完成”。

这是因为Shutdown方法的默认行为是WAIT=True,这意味着它将在关闭执行器池之前等待所有计划的任务完成。

因此,.shutdown()方法是一个同步调用(即,它确保所有任务在关闭之前都已完成,因此我们可以保证所有结果都可用)。

如果我们改为传递.shutdown(WAIT=FALSE),则对future.do()的调用将引发异常(因为在关闭线程池时计划任务仍在运行),因此在这种情况下,我们需要确保使用另一种机制来获取计划任务的结果(例如concurrent.futures.as_Completed或concurrent.futures.Wait)。在这种情况下,我们需要确保使用另一种机制来获取计划任务的结果(如concurrent.futures.as_Completed或concurrent.futures.Wait)。

最后值得一提的是,concurrent.futures.Future对象不同于异步.Future。

Asyncio.Future旨在与Asyncio的事件循环一起使用,并且可以等待。同时,未来,未来是不可等待的。

使用事件循环的.run_in_ecutor()方法将通过将concurrent.futures.Future类型包装在对asyncio.print_Future的调用中(有关详细信息,请参见下一节),从而在两个未来类型之间提供必要的互操作性。

从Python3.5开始,我们可以使用asyncio.print_Future将concurrent.futures.Future转换为asyncio.Future。这方面的示例可以在…下面看到。

从并发导入异步导入随机。未来导入线程池执行从时间导入睡眠def return_After_5_secs(Message):睡眠(5)返回messagepool=ThreadPoolExecutor(3)异步定义doit():Identify=随机.randint(1,100)Future=pool.mit(Return_After_5_Secs,(f";result:{Identify}";))waitable=异步。

..