在阿森西奥等待

2020-05-23 13:44:14

使用asyncio的主要吸引力之一是能够发出多个协同例程并同时运行它们。你知道有多少种方法可以等待他们的结果?

他们有相当多的人!然而,不同的方式有不同的属性,它们都值得拥有自己的位置。不过,我经常得查一查才能找到合适的。

协程:正在运行的异步函数。因此,如果将函数定义为异步def():.。把它称为f(),你会得到一个协程,在这个意义上,这个术语在这篇文章中一直在使用。

可等待:任何可以与等待一起工作的东西:协程、异步.Futures、异步.Tasks、具有__Await__方法的对象。

在我的示例中,我将使用两个异步函数f和g。它们做什么并不重要,重要的是它们被定义为异步def():.。和Async def g():.。他们最终会终止。

但是g/coro_g的执行在等待之前不会开始,这使得它与第一个示例完全相同。对于这两个问题,您需要将协程封装在任务中。

Asyncio.Tasks包装协程,并在您将控制权交给它时由事件循环独立调度执行2.您可以使用asyncio.create_task()创建它们:

task_f=asyncio.create_task(f())task_g=asyncio.create_task(g())等待异步。睡眠(0.1)#<;-f()和g()已在运行!result_f=等待任务_fresult_g=等待任务g

您的任务现在并发运行,如果您决定不想等待task_f或task_g完成,可以分别使用task_f.ancel()或task_g.ancel()取消它们。请注意,您必须在等待第一个任务之前创建这两个任务-否则您将一无所获。然而,等待只需要收集结果和清理资源3。

但是像这样等他们每个人都是不太现实的。在实际的代码中,您经常甚至不知道需要争论多少个等待项。我们需要的是收集多个等待的结果。

asyncio.ather()将1个或多个等待项作为*args,必要时将它们包装在任务中,然后等待它们全部完成。然后,它返回所有等待项的结果,顺序与您传入等待项的顺序相同:

如果f()或g()引发异常,Gather()将立即引发该异常,但其他任务不受影响。但是,如果Gather()本身被取消,它正在收集的所有等待项-以及尚未完成的等待项-也都会被取消。

您还可以传递RETURN_EXCEPTIONS=True,然后像正常结果一样返回异常,并且您必须检查自己是否成功(例如,使用isInstance(Result,BaseException))。

以相同顺序返回结果列表。否则,如果其中一个等待对象引发异常,Gather()会立即将其传播给调用方。但其余任务仍在运行。

现在我们可以同时等待许多等待的东西了!但是,行为良好的分布式系统需要超时。由于gather()没有这样的选项,我们需要下一个帮助器。

waitfor()接受两个参数:一个是可等待的,另一个是超时(以秒为单位)。如果可等待对象是协程,则它将自动由任务包装。因此,下面的构造非常常见:

try:result_f,result_g=await asyncio.wait_for(asyncio.ather(f(),g()),timeout=5.0。)除异步外。超时错误:打印(";oops耗时超过5s!";)。

如果超时到期,内部任务将被取消。这对于Gather()意味着它正在收集的所有任务也被取消:在本例中是f()和g()。

请注意,仅将create_task()替换为wait_for()并将其称为一天是行不通的。create_task()是返回任务的常规函数;wait_for()是返回协程的异步函数。这意味着在您等待它之前,它不会开始执行:

#NOT CURRENT!cf=asyncio.wait_for(f(),timeout=0.5)cg=asyncio.wait_for(g(),timeout=0.5)#cf和cg都是COROUTINES,而不是任务!#此时,事件循环没有任何计划。等待cf#g()尚未执行!等待cg#wait_for在这里仅为g()创建任务。

如果您现在认为如果Gather()有一个超时选项,就不需要waitfor(),那么我们也是这么想的。

一种更优雅的超时方法是PyPI上的异步超时软件包。它为您提供了一个异步上下文管理器,允许您应用总超时,即使您需要按顺序执行协程:

有时候,你不想等到所有的等待都做完了。也许您希望在它们完成时对其进行处理,并向用户报告某种进度。

asyncio.as_Complete()获取可迭代的4个等待项,并返回一个迭代器,该迭代器按照等待项完成的顺序生成asyncio.Futures:

对于Asyncio.as_Complete([task_f,task_g],timeout=5.0)中的FUT:尝试:等待FUT打印(";一个任务关闭!";)例外:打印(";哎呀";)。

没有办法知道你在等待的是哪一个,尽管是5点。

最后,您可能需要对等待进行更多的控制,这将我们带到最后的等待原语。

wait()是API中最笨重的,但也是功能最强大的。它有点让人想起古老的select()系统调用。

与as_Complete()类似,它接受迭代量中的可等待变量。它将返回两个集合:已完成的等待项和仍在等待的项。这取决于你是否等待他们,并决定哪个结果属于什么:

Done,Pending=Await asyncio.Wait([task_f,task_g])for t in Done:try:if t is task_f:print(f";f()的结果是{await task_f}.";)异常除外,因为e:print(f";f()失败,出现{repr(E)}.";)#.,g()也是如此。

如果传入协例程并使用wait()将其封装在任务中,则此代码将不起作用,因为返回的等待对象将不同于您传入的等待对象,并且身份检查总是失败7。目前,wait()无论如何都会这样做,但它会警告您,因为它可能是一个bug。

您可以传递一个超时,在此之后Wait()将返回。与gather()不同的是,当超时到期时不会对等待对象执行任何操作。该函数只返回任务并将其排序到Done和Pending存储桶中。

您可以使用RETURN_WHEN参数告诉wait()不要等到所有等待事项都完成之后。默认情况下,它被设置为asyncio.ALL_COMPLETED,其功能与听起来完全一样。但您也可以将其设置为asyncio.FIRST_EXCEPTION,它也会等待所有等待对象完成,除非其中一个引发异常-然后它将使其立即返回。最后,asyncio.FIRST_COMPLETED返回任何等待项完成的时刻。

所有这些加在一起有点复杂,但是允许您构建强大的调度程序函数。通常使用WHILE循环,直到完成所有等待。

将不返回结果,但传递的等待项被排序为两个集,它们作为(Done,Pending)的元组返回。这取决于你的等待和派遣。

将包含任务,但会就此发出警告,因为这意味着您得到的等待项与您放入的不同。避免并且只通过任务!

在应该将任务排序到存储桶中并返回时为您提供细粒度的控制-它从不取消任何任务:传递一个return_When with:asyncio.FIRST_EXCEPTION:一旦所有任务都完成,或者一旦引发异常。

在所有可用的异步材料中,我只知道有两套内容全面、准确,而且是由具有丰富异步实践经验的人撰写的:

Łukasz langa在facebook和instagram做过大量的异步操作,现在为edgeDB(一个广泛使用异步操作的数据库)工作,他开始了一个视频系列,从无到有,一直发展到异步操作。第三集与本文特别相关,因为它展示了asyncio.wait()的实际用法。

对于更高级的异步生产建议,Spotify的林恩·鲁特(Lynn Root)就这个主题做了两次演讲:异步实践:我们做错了(2018年)和高级异步:解决现实世界生产问题(2019年),并提供了大量的书面教程。

如果您不使用所有的结果和异常,Asyncio将会抱怨。↩︎。

有漏洞,但它们很难看,而且依赖于未记录的实现细节。↩︎。

由于Done存储桶中的结果是可以保证完成的,因此您还可以使用Task.result()和Task.ception()检查它们的结果。但我发现等待他们更加得体。↩︎。

因为waitfor()返回协程,所以这也非常适用于它!除非将waitfor()调用包装到任务中,否则这两个函数不能混合。↩︎。

我的内容对你有帮助和/或有趣吗?请考虑表达你的感激之情!每一点都有助于激励我创作更多内容。你也可以在Ko-fi上给我买一杯咖啡-3欧元起,不需要账户。

如果你会说德语,请考虑从我的雇主Variomedia处获得你的域名和虚拟主机。如果没有他们的支持,我的社区产出将会相当少。