异步打开和关闭Asyncio中的文件

2020-09-07 05:07:28

Python asyncio支持异步联网、子进程和进程间通信。但是,它没有用于异步文件操作-打开、读取、写入或关闭。这可能部分是因为操作系统本身也缺乏这些功能。如果文件操作需要很长时间(可能是因为文件在网络挂载上),则整个Pythonprocess将挂起。可以解决这个问题,所以让我们构建可以异步打开和关闭文件的autility。

解决操作系统对特定异步操作支持不足的通常方法是将线程专用于等待这些操作。通过使用线程池,我们甚至可以在需要线程时避免产生线程的开销。另外,Asyncio被设计成无论如何都能很好地处理线程池。

在我们开始之前,我们需要一些方法来测试它是否正常工作。我们需要一个速度很慢的文件系统。一种想法是使用ptrace拦截相关的系统调用,尽管这并不是那么简单。在等待打开(2)的线程暂停时,其他线程需要继续运行,但是ptrace会暂停整个进程。幸运的是,无论如何都有一个更简单的解决方案:LD_PRELOAD。

将LD_PRELOAD环境变量设置为共享对象的名称将导致加载程序优先加载此共享对象,从而允许该共享对象覆盖其他库。我使用的是x86-64Linux(Debian),所以我希望覆盖glibc中的open64(2)。这是我的开场白64.c:

#DEFINE_GNU_SOURCE#include<;dlfcn.h>;#include<;string.h>;#include<;unistd.h>;int open64(const char*path,int flag,int mode){if(!Strncmp(path,";/tmp/";,5){睡眠(3);}int(*f)(const char*,int,int)=dlsym(RTLD_NEXT,";open64";);return f(path,flag,mode);}。

现在,Python在打开文件时必须通过我的C函数。如果该文件位于/tmp/下,则打开该文件将延迟3秒。因为我仍然想实际打开一个文件,所以我使用dlsym()来访问glibc中真正的open64()。我是这样建造的:

为了测试它是否可以与Python一起使用,让我们计算一下打开/tmp/x需要多长时间:

太棒了!(注意:设置环境变量之前的放置时间有点奇怪,但这是因为我使用的是Bash,而且它的时间是特殊的,因为这是该命令的shell版本。)

Python的标准open()最常用作上下文管理器,因此无论发生什么,文件都会自动关闭。

我希望我的异步打开使用Asyncwith遵循此模式。与类似,但是上下文管理器是异步获取和释放的。我将把我的版本命名为AOPEN():

因此,AOPEN()将需要返回异步上下文管理器,这是一个具有方法__aenter__和__aexit__的对象,这两个方法都返回可等待对象。通常这是因为这些方法是协程函数,但是直接返回可等待的普通函数也可以工作,这就是我将为__aenter__所做的。

Class_AsyncOpen():def__init__(self,args,kwargs):...。定义__aenter__(自我):...。异步def__aexit__(self,exc_type,exc,tb):...。

最终,我们必须调用open()。Open()的参数将提供给稍后使用的构造函数。当您看到AOPEN()的定义时,这会更有意义。

当实际打开文件时,Python将调用__aenter__。我们不能直接调用open(),因为它会阻塞,所以我们将使用athread池来等待它。我们将使用当前事件循环附带的线程池,而不是创建线程池。Run_in_Executor()方法在线程池中运行函数-其中NONE表示使用默认池-返回表示未来结果的异步未来,在本例中为打开的文件对象。

Def__aenter__(Self):def thread_open():返回open(*sel.。_args,**self。_kwargs)循环=异步。Get_event_loop()self。_未来=循环。Run_in_Executor(NONE,THREAD_OPEN)返回SELF。_未来。

由于此__aenter__不是协程函数,因此它将直接返回未来作为其等待的结果。打电话的人会等着的。

默认线程池限制为每个核心一个线程,这是最明显的选择,尽管在这里并不理想。这对于CPU受限的操作很好,但对于I/O受限的操作就不行了。在真实程序中,我们可能希望使用更大的线程池。

关闭文件可能会阻塞,因此我们也将在线程池中执行此操作。首先,从将来拉出文件对象,然后在线程池中将其关闭,直到文件实际关闭:

异步def__aexit__(self,exc_type,exc,tb):file=等待自身。_Future定义THREAD_CLOSE():文件。CLOSE()LOOP=Asyncio。GET_EVENT_LOOP()正在等待循环。Run_in_Executor(NONE,THREAD_CLOSE)。

OPEN和CLOSE在此上下文管理器中成对出现,但它可能与任意数量的OTHER_AsyncOpen上下文管理器并发。打开的文件数量会有一些上限,所以我们需要注意不要同时使用太多这样的东西,这在使用unboundedqueue时很容易发生。在没有反压力的情况下,任务打开文件的速度比关闭文件的速度略快即可。

首先定义一个“心跳”任务,它将告诉我们在我们等待打开文件时异步循环仍在嘎嘎作响。

下面是AOPEN()的一个测试函数,它异步打开一个以整数命名的/tmp/下的文件,(同步)将该整数写入文件,然后异步关闭它。

Main()函数创建心跳任务,并通过截获的文件打开例程并发打开4个文件:

Async def main():BEAT=Asyncio。CREATE_TASK(HEADBEAT())TASKS=[Asyncio.。范围(4)内i的CREATE_TASK(WRITE(I))]等待异步。聚集(*任务)节拍。取消()异步。运行(main())

正如预期的那样,所有4个任务的3秒对应的6个心跳同时花费在截取的打开()上等待。如果你想亲自试一下,这里有完整的源代码:

只有打开和关闭文件是异步的。读取和写入保持不变,仍然是完全同步和阻塞的,所以这只是一个半解决方案。一个完整的解决方案几乎没有那么简单,因为异步是异步的/等待。异步读取和写入将需要具有不同颜色的所有新API。您需要一个aprint()来补充print(),依此类推,每个返回一个等待等待的。

这是异步/等待的不幸缺点之一。我更喜欢传统的、先发制人的并发,但我们并不总是有这样的奢侈品。

对这篇文章有什么评论吗?通过发送电子邮件至~Skeeto/[email protected][邮件列表礼仪]开始我的公共收件箱中的讨论,或查看现有讨论。